Storm custom grouping


Storm ships with a number of grouping schemes. The Storm wiki describes the various groupings as follows:

Stream groupings


Part of defining a topology is specifying for each bolt which streams it should receive as input. A stream grouping defines how that stream should be partitioned among the bolt's tasks.
There are seven built-in stream groupings in Storm, and you can implement a custom stream grouping by implementing the CustomStreamGrouping interface:
Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
All grouping: The stream is replicated across all the bolt's tasks. Use this grouping with care.
Global grouping: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
None grouping: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the emitDirect methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
Resources:
TopologyBuilder: use this class to define topologies
InputDeclarer: this object is returned whenever setBolt is called on TopologyBuilder and is used for declaring a bolt's input streams and how those streams should be grouped
CoordinatedBolt: this bolt is useful for distributed RPC topologies and makes heavy use of direct streams and direct groupings
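As a plain-Java sketch of the idea behind fields grouping (no Storm dependency; the task ids and the hash-then-modulo step below are illustrative assumptions, not Storm's exact internals), equal field values always map to the same task because the choice is a pure function of the field's hash:

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingSketch {
    // Pick a task id from the grouping field's hash, mirroring how
    // fields grouping keeps equal field values on the same task.
    static int chooseTask(Object fieldValue, List<Integer> targetTasks) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(fieldValue.hashCode(), targetTasks.size());
        return targetTasks.get(index);
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(4, 5, 6, 7); // hypothetical task ids
        // Equal field values always choose the same task.
        System.out.println(chooseTask("user-42", tasks) == chooseTask("user-42", tasks)); // true
    }
}
```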
We recently hit a business requirement: group users' uids to their corresponding tasks according to our segmentation rules. Using uid % k, uids with the same modulo value are routed to the same task for processing. To do this we implemented our own ModStreamGrouping; the code is below.
package storm.starter;

import java.util.Arrays;
import java.util.List;

import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

public class ModStreamGrouping implements CustomStreamGrouping {

	private TopologyContext _ctx;
	private Fields _fields;
	private List<Integer> _targetTasks;

	public ModStreamGrouping() {
	}
	
	@Override
	public void prepare(TopologyContext context, Fields outFields,
			List<Integer> targetTasks) {
		// Remember the task ids of the target bolt for use in chooseTasks.
		_ctx = context;
		_fields = outFields;
		_targetTasks = targetTasks;
	}

	@Override
	public List<Integer> chooseTasks(List<Object> values) {
		// Route the tuple by uid % (number of target tasks), so uids with
		// the same modulo value always go to the same task.
		Long groupingKey = Long.valueOf(values.get(0).toString());
		int index = (int) (groupingKey % _targetTasks.size());
		return Arrays.asList(_targetTasks.get(index));
	}

}
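The modulo step in chooseTasks can be exercised on its own. A minimal standalone sketch (the task ids below are made up for illustration) shows how uid % k selects a task:

```java
import java.util.Arrays;
import java.util.List;

public class ModIndexDemo {
    // Same arithmetic as ModStreamGrouping.chooseTasks:
    // uid modulo the number of target tasks picks the index.
    static int chooseTask(long uid, List<Integer> targetTasks) {
        int index = (int) (uid % targetTasks.size());
        return targetTasks.get(index);
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(2, 3, 4); // hypothetical ids for 3 tasks
        System.out.println(chooseTask(7L, tasks)); // 7 % 3 = 1 -> task id 3
        System.out.println(chooseTask(9L, tasks)); // 9 % 3 = 0 -> task id 2
    }
}
```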

Test code:
package storm.starter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ModGroupingTest {
	public static class TestUidSpout extends BaseRichSpout {
	    boolean _isDistributed;
	    SpoutOutputCollector _collector;
	        
	    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
	        _collector = collector;
	    }
	    
	    public void close() {
	        
	    }
	        
	    public void nextTuple() {
	        Utils.sleep(100);

	        // Emit a random uid roughly every 100 ms.
	        final Random rand = new Random();
	        final int uid = rand.nextInt(100000000);

	        _collector.emit(new Values(uid));
	    }
	    
	    public void ack(Object msgId) {

	    }

	    public void fail(Object msgId) {
	        
	    }
	    
	    public void declareOutputFields(OutputFieldsDeclarer declarer) {
	    	declarer.declare(new Fields("uid"));
	    }

   
	}
	
	public static class modGroupBolt extends BaseRichBolt {
        OutputCollector _collector;
        String _ComponentId;
        int _TaskId;
        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        	_collector = collector;
        	_ComponentId = context.getThisComponentId();
        	_TaskId = context.getThisTaskId();
        }

        @Override
        public void execute(Tuple tuple) {
        	// Log which component/task received the uid to verify the grouping.
        	System.out.println(_ComponentId + ":" + _TaskId + " received: " + tuple.getInteger(0));

        	// Emit the uid value itself (anchored to the input tuple), not the
        	// Tuple object wrapped in Values, then ack.
        	_collector.emit(tuple, new Values(tuple.getInteger(0)));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("uid"));
        }


    }
	
	public static void main(String args[]){
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("uid", new TestUidSpout());
		builder.setBolt("process", new modGroupBolt(), 10).customGrouping("uid", new ModStreamGrouping());
		
		Config config = new Config();
		config.setDebug(true);
		
		config.setNumWorkers(3);
		LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", config, builder.createTopology());
//        Utils.sleep(30000);
//        cluster.killTopology("test");
//        cluster.shutdown();    
	}
}