Storm Source Analysis - Topology Submit - Task - TopologyContext (backtype.storm.task)


1. GeneralTopologyContext


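GeneralTopologyContext records the basic information about a topology: the StormTopology and the StormConf, plus what is derived from them, namely the task/component mappings and each component's streams and input/output information.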
public class GeneralTopologyContext implements JSONAware {
    private StormTopology _topology; 
    private Map<Integer, String> _taskToComponent;
    private Map<String, List<Integer>> _componentToTasks;
    private Map<String, Map<String, Fields>> _componentToStreamToFields; //ComponentCommon.streams, map<string, StreamInfo>
    private String _stormId;   // topology id
    protected Map _stormConf;  

}

StormTopology: the worker reads it from stormcode.ser on disk.
struct StormTopology {
  //ids must be unique across maps
  // #workers to use is in conf
  1: required map<string, SpoutSpec> spouts;
  2: required map<string, Bolt> bolts;
  3: required map<string, StateSpoutSpec> state_spouts;
}

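StormConf: the worker reads it from stormconf.ser on disk.
taskToComponent, componentToTasks: the mapping between tasks and components.
componentToStreamToFields: which streams each component contains, and which fields each stream contains.
Besides the obvious accessors, there are also operations for obtaining a component's inputs and outputs.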
    /**
     * Gets the declared inputs to the specified component.
     *
     * @return A map from subscribed component/stream to the grouping subscribed with.
     */
    public Map<GlobalStreamId, Grouping> getSources(String componentId) {
        return getComponentCommon(componentId).get_inputs();  //ComponentCommon.inputs,map<GlobalStreamId, Grouping>
    }
    /**
     * Gets information about who is consuming the outputs of the specified component,
     * and how.
     *
     * @return Map from stream id to component id to the Grouping used.
     */
    public Map<String, Map<String, Grouping>> getTargets(String componentId) {
        Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
        for(String otherComponentId: getComponentIds()) {  // every component id in the topology
            Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();  // that component's inputs
            for(GlobalStreamId id: inputs.keySet()) {  // each input's global stream id
                if(id.get_componentId().equals(componentId)) {    // the stream's source component is the requested component
                    Map<String, Grouping> curr = ret.get(id.get_streamId());
                    if(curr==null) curr = new HashMap<String, Grouping>();
                    curr.put(otherComponentId, inputs.get(id));
                    ret.put(id.get_streamId(), curr);
                }
            }
        }
        return ret; // [stream-id, [target-component-id, grouping]]
    }

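Do not misread getComponentCommon and getComponentIds here, which come from the ThriftTopologyUtils class: they do not go to nimbus through the thrift API to fetch information, they only read it from the StormTopology. The StormTopology class itself is generated by thrift and therefore has a metaDataMap, which allows the following implementation.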
    public static Set<String> getComponentIds(StormTopology topology) {
        Set<String> ret = new HashSet<String>();
        for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) {
            Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
            ret.addAll(componentMap.keySet());
        }
        return ret;
    }

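It uses metaDataMap to enumerate the fields of StormTopology: spouts, bolts, and state_spouts.
The advantage is that this is dynamic: if StormTopology changes, this code does not need to change. A plain Java class carries no such field metadata, so the same effect would require reflection, whereas in a dynamic language such as Python it is easy.
Of course, it does not have to be written this way; reading directly from StormTopology.spouts... would also work.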
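To make the metaDataMap idea concrete, here is a minimal self-contained sketch. `MiniTopology` and `Field` are hypothetical stand-ins for the thrift-generated `StormTopology` and `StormTopology._Fields`; they are not Storm classes and only imitate the shape of the generated code.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class MetaDataSketch {
    // Hypothetical stand-in for StormTopology._Fields.
    enum Field { SPOUTS, BOLTS, STATE_SPOUTS }

    // Hypothetical stand-in for a thrift-generated struct with a metaDataMap.
    static final class MiniTopology {
        // One entry per declared field; the value is just a description here.
        static final Map<Field, String> metaDataMap = new EnumMap<>(Field.class);
        static {
            for (Field f : Field.values()) {
                metaDataMap.put(f, f.name());
            }
        }

        // Each field holds a map from component id to its spec.
        private final Map<Field, Map<String, Object>> values = new EnumMap<>(Field.class);

        void put(Field f, String componentId, Object spec) {
            values.computeIfAbsent(f, k -> new HashMap<>()).put(componentId, spec);
        }

        Map<String, Object> getFieldValue(Field f) {
            return values.getOrDefault(f, new HashMap<>());
        }
    }

    // Same shape as getComponentIds: walk the field metadata and union the keys,
    // so a newly added field is picked up without touching this method.
    static Set<String> getComponentIds(MiniTopology topology) {
        Set<String> ret = new HashSet<>();
        for (Field f : MiniTopology.metaDataMap.keySet()) {
            ret.addAll(topology.getFieldValue(f).keySet());
        }
        return ret;
    }

    public static void main(String[] args) {
        MiniTopology t = new MiniTopology();
        t.put(Field.SPOUTS, "words", "spout-spec");
        t.put(Field.BOLTS, "split", "bolt-spec");
        System.out.println(getComponentIds(t));
    }
}
```

Adding a fourth constant to `Field` would extend the result of `getComponentIds` with no change to the method itself, which is the property the text describes.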
 
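Once you have looked at the definition of ComponentCommon in storm.thrift, the two functions above are easy to understand. The implementation of getTargets deserves a closer look, because it derives the outputs from the inputs. For outputs, ComponentCommon records only the stream id and fields, so it does not know which component a stream is sent to. For inputs, however, the stream id is of type GlobalStreamId, which contains not only the stream id but also the component id of the source component, so the targets can be worked out backwards: if the source component of an input is the current component, then the component declaring that input is a target of the current component.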
struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; //key is stream id, outputs
  3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
  4: optional string json_conf;
}

struct SpoutSpec {
  1: required ComponentObject spout_object;
  2: required ComponentCommon common;
  // can force a spout to be non-distributed by overriding the component configuration
  // and setting TOPOLOGY_MAX_TASK_PARALLELISM to 1
}

struct Bolt {
  1: required ComponentObject bolt_object;
  2: required ComponentCommon common;
}
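The reverse derivation used by getTargets can be sketched without any Storm dependency. In the sketch below, `StreamRef` is a hypothetical stand-in for `GlobalStreamId`, groupings are reduced to plain strings, and the topology is reduced to a map from component id to its declared inputs; all of these names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class TargetsSketch {
    // Hypothetical stand-in for GlobalStreamId: source component id plus stream id.
    static final class StreamRef {
        final String componentId;
        final String streamId;

        StreamRef(String componentId, String streamId) {
            this.componentId = componentId;
            this.streamId = streamId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof StreamRef)) return false;
            StreamRef other = (StreamRef) o;
            return componentId.equals(other.componentId) && streamId.equals(other.streamId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(componentId, streamId);
        }
    }

    // Scan every component's inputs; whenever an input's source is componentId,
    // the component declaring that input is a target of componentId.
    static Map<String, Map<String, String>> getTargets(
            Map<String, Map<StreamRef, String>> inputsByComponent, String componentId) {
        Map<String, Map<String, String>> ret = new HashMap<>();
        for (Map.Entry<String, Map<StreamRef, String>> e : inputsByComponent.entrySet()) {
            String otherComponentId = e.getKey();
            for (Map.Entry<StreamRef, String> in : e.getValue().entrySet()) {
                StreamRef id = in.getKey();
                if (id.componentId.equals(componentId)) {
                    ret.computeIfAbsent(id.streamId, k -> new HashMap<>())
                       .put(otherComponentId, in.getValue());
                }
            }
        }
        return ret; // [stream-id, [target-component-id, grouping]]
    }

    // Tiny example topology: words -> split -> count.
    static Map<String, Map<StreamRef, String>> sampleInputs() {
        Map<String, Map<StreamRef, String>> inputs = new HashMap<>();
        inputs.put("words", new HashMap<>()); // a spout declares no inputs
        Map<StreamRef, String> splitIn = new HashMap<>();
        splitIn.put(new StreamRef("words", "default"), "shuffle");
        inputs.put("split", splitIn);
        Map<StreamRef, String> countIn = new HashMap<>();
        countIn.put(new StreamRef("split", "default"), "fields");
        inputs.put("count", countIn);
        return inputs;
    }

    public static void main(String[] args) {
        System.out.println(getTargets(sampleInputs(), "words"));
    }
}
```

Note that the lookup costs a full scan over all components on every call, which is the price of not storing the output-to-target relation explicitly.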

 

2. WorkerTopologyContext


WorkerTopologyContext encapsulates the worker-related information.
public class WorkerTopologyContext extends GeneralTopologyContext {
    public static final String SHARED_EXECUTOR = "executor";
    
    private Integer _workerPort;         // worker port
    private List<Integer> _workerTasks;  // task ids running in this worker
    private String _codeDir;             // code directory on the supervisor, under stormdist/<storm-id>
    private String _pidDir;              // directory holding the worker's process pids, <worker-id>/pids
    Map<String, Object> _userResources;
    Map<String, Object> _defaultResources;

}

 

3. TopologyContext


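As the class comment says, a TopologyContext is passed as a parameter to the prepare (or open) method of bolts and spouts, so _openOrPrepareWasCalled indicates whether prepare/open has already been called with this TopologyContext.
registerMetric registers a metric into _registeredMetrics, whose structure is [timeBucketSizeInSecs, [taskId, [name, metric]]].
_hooks is used to register task hooks.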
/**
 * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
 * methods, respectively. This object provides information about the component's
 * place within the topology, such as task ids, inputs and outputs, etc.
 *
 * <p>The TopologyContext is also used to declare ISubscribedState objects to
 * synchronize state with StateSpouts this object is subscribed to.</p>
 */
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<String, Object>();
    private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
    private Map<String, Object> _executorData;
    private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
    private clojure.lang.Atom _openOrPrepareWasCalled;
    public TopologyContext(StormTopology topology, Map stormConf,
            Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
            Map<String, Map<String, Fields>> componentToStreamToFields,
            String stormId, String codeDir, String pidDir, Integer taskId,
            Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
            Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics,
            clojure.lang.Atom openOrPrepareWasCalled) {
        super(topology, stormConf, taskToComponent, componentToSortedTasks,
                componentToStreamToFields, stormId, codeDir, pidDir,
                workerPort, workerTasks, defaultResources, userResources);
        _taskId = taskId;
        _executorData = executorData;
        _registeredMetrics = registeredMetrics;
        _openOrPrepareWasCalled = openOrPrepareWasCalled;
    }
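The nested layout of _registeredMetrics, [timeBucketSizeInSecs, [taskId, [name, metric]]], can be illustrated with a small standalone sketch. `MetricRegistrySketch` is a hypothetical class, not Storm's `TopologyContext`. The two guards (rejecting registration after prepare/open has completed, and rejecting duplicate names) are assumptions about how such a flag and registry would typically be used, not something the excerpt above shows.

```java
import java.util.HashMap;
import java.util.Map;

class MetricRegistrySketch {
    // timeBucketSizeInSecs -> taskId -> metric name -> metric
    private final Map<Integer, Map<Integer, Map<String, Object>>> registered = new HashMap<>();
    private final int taskId;
    private boolean openOrPrepareWasCalled = false;

    MetricRegistrySketch(int taskId) {
        this.taskId = taskId;
    }

    // Would be flipped by the runtime once prepare/open has returned (assumption).
    void markOpenOrPrepareCalled() {
        openOrPrepareWasCalled = true;
    }

    <T> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        if (openOrPrepareWasCalled) {
            // Assumed guard: metrics may only be registered during prepare/open.
            throw new IllegalStateException("registerMetric called after prepare/open");
        }
        Map<String, Object> byName = registered
                .computeIfAbsent(timeBucketSizeInSecs, k -> new HashMap<>())
                .computeIfAbsent(taskId, k -> new HashMap<>());
        if (byName.containsKey(name)) {
            // Assumed guard: one metric per name within a bucket and task.
            throw new IllegalArgumentException("metric already registered: " + name);
        }
        byName.put(name, metric);
        return metric;
    }

    // Returns the registered metric, or null if none exists at that position.
    Object lookup(int timeBucketSizeInSecs, int task, String name) {
        Map<Integer, Map<String, Object>> byTask = registered.get(timeBucketSizeInSecs);
        if (byTask == null) return null;
        Map<String, Object> byName = byTask.get(task);
        return byName == null ? null : byName.get(name);
    }

    public static void main(String[] args) {
        MetricRegistrySketch ctx = new MetricRegistrySketch(7);
        ctx.registerMetric("emitted", new Object(), 60);
        System.out.println(ctx.lookup(60, 7, "emitted") != null);
    }
}
```

Keying the outermost level by bucket size lets a consumer collect all metrics that share one reporting interval in a single pass.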

 

4. Usage


mk-task-data creates a topology context for each task:
user-context (user-topology-context (:worker executor-data) executor-data task-id)
(defn user-topology-context [worker executor-data tid]
  ((mk-topology-context-builder
    worker
    executor-data
    (:topology worker))
   tid))

(defn mk-topology-context-builder [worker executor-data topology]
  (let [conf (:conf worker)]
    #(TopologyContext.
      topology
      (:storm-conf worker)
      (:task->component worker)
      (:component->sorted-tasks worker)
      (:component->stream->fields worker)
      (:storm-id worker)
      (supervisor-storm-resources-path
        (supervisor-stormdist-root conf (:storm-id worker)))
      (worker-pids-root conf (:worker-id worker))
      (int %)
      (:port worker)
      (:task-ids worker)
      (:default-shared-resources worker)
      (:user-shared-resources worker)
      (:shared-executor-data executor-data)
      (:interval->task->metric-registry executor-data)
      (:open-or-prepare-was-called? executor-data))))
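The builder pattern above, where mk-topology-context-builder closes over the worker-level data and returns a function of the task id, translates roughly to the following Java sketch. `WorkerData` and `TaskContext` are hypothetical, heavily reduced stand-ins for the worker map and `TopologyContext`; only the closure structure is the point.

```java
import java.util.function.IntFunction;

class ContextBuilderSketch {
    // Hypothetical reduction of the worker map to two fields.
    static final class WorkerData {
        final String stormId;
        final int port;

        WorkerData(String stormId, int port) {
            this.stormId = stormId;
            this.port = port;
        }
    }

    // Hypothetical reduction of TopologyContext: shared worker data plus one task id.
    static final class TaskContext {
        final String stormId;
        final int port;
        final int taskId;

        TaskContext(String stormId, int port, int taskId) {
            this.stormId = stormId;
            this.port = port;
            this.taskId = taskId;
        }
    }

    // Counterpart of mk-topology-context-builder: capture the worker-level data
    // once and return a function that only needs the task id.
    static IntFunction<TaskContext> mkContextBuilder(WorkerData worker) {
        return taskId -> new TaskContext(worker.stormId, worker.port, taskId);
    }

    // Counterpart of user-topology-context: build the context for one task.
    static TaskContext userTopologyContext(WorkerData worker, int tid) {
        return mkContextBuilder(worker).apply(tid);
    }

    public static void main(String[] args) {
        WorkerData worker = new WorkerData("topo-1", 6700);
        TaskContext ctx = userTopologyContext(worker, 3);
        System.out.println(ctx.stormId + ":" + ctx.port + " task " + ctx.taskId);
    }
}
```

Every task in the same worker gets its own context object, while the worker-level values are shared through the captured reference.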