Storm Localモード数単語
一、Spoutコード大放送
二、Blotコード大放送
三、主な方法大放送
public static class SentenceSpout extends BaseRichSpout {
SpoutOutputCollector spoutOutputCollector;
private Integer count = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
spoutOutputCollector = collector;
}
@Override
public void nextTuple() {
if (count > 0) {
Utils.sleep(5000);
return;
}
count++;
Utils.sleep(100);
String message = "hello storm hello spark hello lwj hello smj hello bigdata hello flume hello hbase hello sb hello buffer";
List<Integer> source = spoutOutputCollector.emit(new Values(message), "source");
LOG.info("SOURCE ========== " + source);
}
@Override
public void ack(Object id) {
LOG.info("ack ID ==== " + id.toString());
}
@Override
public void fail(Object id) {
LOG.info("fail ID ==== " + id.toString());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
二、Blotコード大放送
public static final Log LOG = LogFactory.getLog(WordCountTopology.class);
public static class WordCountBlot extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void cleanup() {
LOG.info("======= ========");
LOG.info(counts);
LOG.info("======= ========");
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//hello storm hello spark
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static class SplitSentenceBolt implements IRichBolt {
OutputCollector outputCollector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
outputCollector = collector;
}
@Override
public void execute(Tuple input) {
String message = input.getString(0);
String[] split = message.split(" ");
for (String s : split) {
outputCollector.emit(new Values(s));
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
三、主な方法大放送
TopologyBuilder builder = new TopologyBuilder();
// key-value key
builder.setSpout("sentence", new SentenceSpout(), 1);
builder.setBolt("word", new SplitSentenceBolt(), 1).shuffleGrouping("sentence");
builder.setBolt("count", new WordCountBlot(), 1).shuffleGrouping("word");
Config conf = new Config();
conf.setMaxTaskParallelism(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();