stormデータストリームグループ

2871 ワード

カスタムデータストリームグループbacktypeを実装できます.storm.grouping.CustomstreamGroupインタフェースは、カスタムデータストリームグループを作成し、どのboltがどのメタグループを受信するかを自分で決定します.単語カウンタの例を変更して、同じアルファベットの単語を同じboltで受信します.
    public class ModuleGrouping mplents CustormStreamGrouping, Serializable{
        int numTasks = 0;

        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }

        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }

これはCustomStreamGroupの簡単な実装であり,ここでは単語の頭文字の整数値とタスク数の残数を用いて,受信メタグループのboltを決定する.
 
このカスタムデータストリームグループをword-normalizerで修正すると使用できます.
    builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());

 
 
 
 
 
 
ダイレクト・データ・ストリーム・グループこれは特殊なデータ・ストリーム・グループであり、データ・ソースは、どのコンポーネントがメタ・グループを受信するかを決定するために使用できます.前の例と同様に、データソースは、単語の頭文字に基づいて、どのboltがメタグループを受信するかを決定する.ダイレクト・データ・ストリーム・グループを使用するには、WordNormalizer boltでemitの代わりにemitDirectメソッドを使用します.   
public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //       
        collector.ack(input);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }

prepareメソッドでのタスク数の計算
    public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }

トポロジ定義でデータ・ストリームが直接グループ化されることを指定します.
    builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");