Storm Localモードで単語数をカウントする(WordCount)


一、Spoutのコード
public static class SentenceSpout extends BaseRichSpout {
        SpoutOutputCollector spoutOutputCollector;
        private Integer count = 0;
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            spoutOutputCollector = collector;
        }
        @Override
        public void nextTuple() {
            if (count > 0) {
                Utils.sleep(5000);
                return;
            }
            count++;
            Utils.sleep(100);
            String message = "hello storm hello spark hello lwj hello smj hello bigdata hello flume hello hbase hello sb hello buffer";
            List<Integer> source = spoutOutputCollector.emit(new Values(message), "source");
            LOG.info("SOURCE ========== " + source);
        }

        @Override
        public void ack(Object id) {
            LOG.info("ack ID ==== " + id.toString());
        }

        @Override
        public void fail(Object id) {
            LOG.info("fail ID ==== " + id.toString());
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    
二、Boltのコード
// Logger keyed to WordCountTopology; used by SentenceSpout and WordCountBlot in this listing.
public static final Log LOG = LogFactory.getLog(WordCountTopology.class);

    public static class WordCountBlot extends BaseBasicBolt {

        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void cleanup() {
            LOG.info("======= ========");
            LOG.info(counts);
            LOG.info("======= ========");
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            //hello storm hello spark
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }


    public static class SplitSentenceBolt implements IRichBolt {
        OutputCollector outputCollector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            outputCollector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String message = input.getString(0);
            String[] split = message.split(" ");
            for (String s : split) {
                outputCollector.emit(new Values(s));
            }
        }

        @Override
        public void cleanup() {

        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

三、mainメソッドのコード
TopologyBuilder builder = new TopologyBuilder();

        //    key-value      key 
        builder.setSpout("sentence", new SentenceSpout(), 1);
        builder.setBolt("word", new SplitSentenceBolt(), 1).shuffleGrouping("sentence");
        builder.setBolt("count", new WordCountBlot(), 1).shuffleGrouping("word");

        Config conf = new Config();

        conf.setMaxTaskParallelism(1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());

        Thread.sleep(10000);

        cluster.shutdown();