Storm starter - Overview
13079 ワード
Stormのstarterの例は、すべて誠意を持っていて、例だけではなく、実際のシーンに直接使用することができます.そして、SlidingWindowCounter、TimeCacheMapなどの役に立つtoolを高めることができます.だからstarterはstormベースのプログラミングのフレームワークを高めたと言えるので、真剣に研究する価値があります.
ExclamationTopology、基本的なTopology
特に何もない、標準的な例
RollingTopWords
TopNとスライドウィンドウ機能を実現したこの例のBolt実現は指導的意義があり,Storm starter - RollingTopWords
SingleJoinExample
TimeCacheMapによりmemoryベースのjoin,Storm starter - SingleJoinExampleを実現
BasicDRPCTopology, ReachTopology
DRPCの例については、Twitter Storm – DRPC参照
TransactionalGlobalCount, TransactionalWords
Transactional Topology, Storm - Transactional-topologies
TransactionalGlobalCountは比較的簡単で、TransactionalWordsがwordカウントに加えてword count分布統計を加えているのを見てみましょう.
Count_の使用Databaseはwordのカウントを記録します
Bucket_の使用Databaseはwordカウントの分布を記録します.例えば、0~9回のwordがどれだけあるか、10~20のwordがどれだけあるかなどです.
KeyedCountUpdaterについては前述の簡単な例と大きな違いはなく、executeではwordをcountし、finishBatchでは直接commitからCount_へDatabase出力、new Fields(「id」,「key」,「count」,「prev-count」)その他はよく理解できますが、なぜprev-countが必要ですか?Bucket_を更新しているのでDatabaseは、wordのbucketが移行したかどうかを知る必要があるので、前のcountを知る必要があります.
Bucketize,count/BUCKET_によるSIZEは、どのbucketに属するべきかを算出し、新しいwordであれば、あるbucket+1で直接wordのbucketが変化すれば、新しいbucket+1では、古いbucket–1が変化しなければ、出力する必要はありません
BucketCountUpdater、つまり上のbucketの更新をBucket_に更新Database
Topologyは次のように定義されています.
WordCountTopology、多言語サポート
Stormマルチ言語サポート
ShellBoltとBaseBasicBoltをそれぞれ使用してpythonとJavaで実装されたBlotを宣言します.
Topologyを定義する際には、ShellBoltとBaseBasicBoltをそのまま混ぜ合わせることができ、とても便利です
ExclamationTopology、基本的なTopology
特に何もない、標準的な例
/**
* This is a basic example of a Storm topology.
*/
public class ExclamationTopology {
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
Config conf = new Config();
conf.setDebug(true);
if(args!=null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
RollingTopWords
TopNとスライドウィンドウ機能を実現したこの例のBolt実現は指導的意義があり,Storm starter - RollingTopWords
SingleJoinExample
TimeCacheMapによりmemoryベースのjoin,Storm starter - SingleJoinExampleを実現
BasicDRPCTopology, ReachTopology
DRPCの例については、Twitter Storm – DRPC参照
TransactionalGlobalCount, TransactionalWords
Transactional Topology, Storm - Transactional-topologies
TransactionalGlobalCountは比較的簡単で、TransactionalWordsがwordカウントに加えてword count分布統計を加えているのを見てみましょう.
public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
Count_の使用Databaseはwordのカウントを記録します
Bucket_の使用Databaseはwordカウントの分布を記録します.例えば、0~9回のwordがどれだけあるか、10~20のwordがどれだけあるかなどです.
public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter
KeyedCountUpdaterについては前述の簡単な例と大きな違いはなく、executeではwordをcountし、finishBatchでは直接commitからCount_へDatabase出力、new Fields(「id」,「key」,「count」,「prev-count」)その他はよく理解できますが、なぜprev-countが必要ですか?Bucket_を更新しているのでDatabaseは、wordのbucketが移行したかどうかを知る必要があるので、前のcountを知る必要があります.
Bucketize,count/BUCKET_によるSIZEは、どのbucketに属するべきかを算出し、新しいwordであれば、あるbucket+1で直接wordのbucketが変化すれば、新しいbucket+1では、古いbucket–1が変化しなければ、出力する必要はありません
public static class Bucketize extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
int curr = tuple.getInteger(2);
Integer prev = tuple.getInteger(3);
int currBucket = curr / BUCKET_SIZE;
Integer prevBucket = null;
if(prev!=null) {
prevBucket = prev / BUCKET_SIZE;
}
if(prevBucket==null) {
collector.emit(new Values(attempt, currBucket, 1));
} else if(currBucket != prevBucket) {
collector.emit(new Values(attempt, currBucket, 1));
collector.emit(new Values(attempt, prevBucket, -1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("attempt", "bucket", "delta"));
}
}
BucketCountUpdater、つまり上のbucketの更新をBucket_に更新Database
Topologyは次のように定義されています.
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
builder.setBolt("count", new KeyedCountUpdater(), 5)
.fieldsGrouping("spout", new Fields("word"));
builder.setBolt("bucketize", new Bucketize())
.noneGrouping("count");
builder.setBolt("buckets", new BucketCountUpdater(), 5)
.fieldsGrouping("bucketize", new Fields("bucket"));
WordCountTopology、多言語サポート
Stormマルチ言語サポート
ShellBoltとBaseBasicBoltをそれぞれ使用してpythonとJavaで実装されたBlotを宣言します.
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if(count==null) count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
Topologyを定義する際には、ShellBoltとBaseBasicBoltをそのまま混ぜ合わせることができ、とても便利です
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));