stormデータストリームグループ
2871 ワード
カスタムデータストリームグループbacktypeを実装できます.storm.grouping.CustomstreamGroupインタフェースは、カスタムデータストリームグループを作成し、どのboltがどのメタグループを受信するかを自分で決定します.単語カウンタの例を変更して、同じアルファベットの単語を同じboltで受信します.
これはCustomStreamGroupの簡単な実装であり,ここでは単語の頭文字の整数値とタスク数の残数を用いて,受信メタグループのboltを決定する.
このカスタムデータストリームグループをword-normalizerで修正すると使用できます.
ダイレクト・データ・ストリーム・グループこれは特殊なデータ・ストリーム・グループであり、データ・ソースは、どのコンポーネントがメタ・グループを受信するかを決定するために使用できます.前の例と同様に、データソースは、単語の頭文字に基づいて、どのboltがメタグループを受信するかを決定する.ダイレクト・データ・ストリーム・グループを使用するには、WordNormalizer boltでemitの代わりにemitDirectメソッドを使用します.
prepareメソッドでのタスク数の計算
トポロジ定義でデータ・ストリームが直接グループ化されることを指定します.
public class ModuleGrouping mplents CustormStreamGrouping, Serializable{
int numTasks = 0;
@Override
public List<Integer> chooseTasks(List<Object> values) {
List<Integer> boltIds = new ArrayList<Integer>();
if(values.size()>0){
String str = values.get(0).toString();
if(str.isEmpty()){
boltIds.add(0);
}else{
boltIds.add(str.charAt(0) % numTasks);
}
}
return boltIds;
}
@Override
public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
numTasks = targetTasks.size();
}
}
これはCustomStreamGroupの簡単な実装であり,ここでは単語の頭文字の整数値とタスク数の残数を用いて,受信メタグループのboltを決定する.
このカスタムデータストリームグループをword-normalizerで修正すると使用できます.
builder.setBolt("word-normalizer", new WordNormalizer())
.customGrouping("word-reader", new ModuleGrouping());
ダイレクト・データ・ストリーム・グループこれは特殊なデータ・ストリーム・グループであり、データ・ソースは、どのコンポーネントがメタ・グループを受信するかを決定するために使用できます.前の例と同様に、データソースは、単語の頭文字に基づいて、どのboltがメタグループを受信するかを決定する.ダイレクト・データ・ストリーム・グループを使用するには、WordNormalizer boltでemitの代わりにemitDirectメソッドを使用します.
public void execute(Tuple input) {
...
for(String word : words){
if(!word.isEmpty()){
...
collector.emitDirect(getWordCountIndex(word),new Values(word));
}
}
//
collector.ack(input);
}
public Integer getWordCountIndex(String word) {
word = word.trim().toUpperCase();
if(word.isEmpty()){
return 0;
}else{
return word.charAt(0) % numCounterTasks;
}
}
prepareメソッドでのタスク数の計算
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.numCounterTasks = context.getComponentTasks("word-counter");
}
トポロジ定義でデータ・ストリームが直接グループ化されることを指定します.
builder.setBolt("word-counter", new WordCounter(),2)
.directGrouping("word-normalizer");