Storm Source Analysis - Topology Submit - Task - TopologyContext (backtype.storm.task)
1. GeneralTopologyContext
Records the basic information of the topology: the StormTopology and StormConf, plus what is derived from them, namely the mapping between tasks and components, each component's streams, and its input/output information.
public class GeneralTopologyContext implements JSONAware {
    private StormTopology _topology;
    private Map<Integer, String> _taskToComponent;
    private Map<String, List<Integer>> _componentToTasks;
    private Map<String, Map<String, Fields>> _componentToStreamToFields; // ComponentCommon.streams, map<string, StreamInfo>
    private String _stormId;   // topology id
    protected Map _stormConf;
}
The StormTopology is read by the worker from stormcode.ser on disk:
struct StormTopology {
  //ids must be unique across maps
  // #workers to use is in conf
  1: required map<string, SpoutSpec> spouts;
  2: required map<string, Bolt> bolts;
  3: required map<string, StateSpoutSpec> state_spouts;
}
The StormConf is read by the worker from stormconf.ser on disk.
taskToComponent, componentToTasks: the mapping between tasks and components.
componentToStreamToFields: which streams each component declares, and which fields each stream carries.
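To make these maps concrete, here is a small illustrative sketch (not Storm source; the component names, task ids, and stream contents are made up) of what taskToComponent, componentToTasks, and componentToStreamToFields might hold for a one-spout, one-bolt topology:
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import backtype.storm.tuple.Fields;

public class ContextMapsExample {
    public static void main(String[] args) {
        // task id -> component id (_taskToComponent)
        Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
        taskToComponent.put(1, "spout");
        taskToComponent.put(2, "spout");
        taskToComponent.put(3, "bolt");

        // component id -> task ids (_componentToTasks)
        Map<String, List<Integer>> componentToTasks = new HashMap<String, List<Integer>>();
        componentToTasks.put("spout", Arrays.asList(1, 2));
        componentToTasks.put("bolt", Arrays.asList(3));

        // component id -> stream id -> fields of that stream (_componentToStreamToFields)
        Map<String, Map<String, Fields>> componentToStreamToFields =
                new HashMap<String, Map<String, Fields>>();
        Map<String, Fields> spoutStreams = new HashMap<String, Fields>();
        spoutStreams.put("default", new Fields("word"));
        componentToStreamToFields.put("spout", spoutStreams);

        System.out.println(taskToComponent);  // {1=spout, 2=spout, 3=bolt}
        System.out.println(componentToTasks); // {spout=[1, 2], bolt=[3]}
        System.out.println(componentToStreamToFields);
    }
}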
Besides the obvious accessors, there are also operations for obtaining a component's inputs and outputs:
/**
 * Gets the declared inputs to the specified component.
 *
 * @return A map from subscribed component/stream to the grouping subscribed with.
 */
public Map<GlobalStreamId, Grouping> getSources(String componentId) {
    return getComponentCommon(componentId).get_inputs(); // ComponentCommon.inputs, map<GlobalStreamId, Grouping>
}

/**
 * Gets information about who is consuming the outputs of the specified component,
 * and how.
 *
 * @return Map from stream id to component id to the Grouping used.
 */
public Map<String, Map<String, Grouping>> getTargets(String componentId) {
    Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
    for(String otherComponentId: getComponentIds()) { // every component id in the topology
        Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs(); // that component's inputs
        for(GlobalStreamId id: inputs.keySet()) { // each input's GlobalStreamId
            if(id.get_componentId().equals(componentId)) { // the input's source component is the specified component
                Map<String, Grouping> curr = ret.get(id.get_streamId());
                if(curr==null) curr = new HashMap<String, Grouping>();
                curr.put(otherComponentId, inputs.get(id));
                ret.put(id.get_streamId(), curr);
            }
        }
    }
    return ret; // [stream-id, [target-component-id, grouping]]
}
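As a quick illustration of how these two accessors can be used from application code (a minimal sketch; the helper class and the componentId argument are assumptions, not Storm source), a bolt can inspect its own wiring through the context it is given:
import java.util.Map;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.task.TopologyContext;

public class WiringDump {
    // componentId is assumed to be the id the component was registered with
    // when the topology was built (e.g. "mybolt")
    public static void dump(TopologyContext context, String componentId) {
        // everything this component subscribes to, and with which grouping
        Map<GlobalStreamId, Grouping> sources = context.getSources(componentId);
        // everyone consuming this component's streams:
        // stream id -> (consumer component id -> grouping)
        Map<String, Map<String, Grouping>> targets = context.getTargets(componentId);
        System.out.println("sources of " + componentId + ": " + sources);
        System.out.println("targets of " + componentId + ": " + targets);
    }
}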
Don't be misled by getComponentCommon and getComponentIds here: although they come from the ThriftTopologyUtils class, they do not go to nimbus over the thrift API to fetch information; they simply read it out of the StormTopology object. The StormTopology class itself is generated by thrift, so it has a metaDataMap, and the implementation looks like this:
public static Set<String> getComponentIds(StormTopology topology) {
    Set<String> ret = new HashSet<String>();
    for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) {
        Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
        ret.addAll(componentMap.keySet());
    }
    return ret;
}
Via metaDataMap it reads out the fields of StormTopology, i.e. spouts, bolts, and state_spouts.
The benefit of writing it this way is that it is dynamic: if StormTopology changes, this code does not need to change. An ordinary Java class could not offer that, whereas in a dynamic language such as Python it would be trivial.
Of course, it does not have to be written this way here; the ids could also be read directly from StormTopology's spouts, bolts, and state_spouts, as in the sketch below.
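For comparison, here is a minimal sketch of that direct, non-dynamic alternative. It assumes the usual thrift-generated getters (get_spouts, get_bolts, get_state_spouts); the class itself is illustrative, not part of Storm:
import java.util.HashSet;
import java.util.Set;
import backtype.storm.generated.StormTopology;

public class ComponentIdsDirect {
    // Reads the three component maps explicitly instead of iterating
    // metaDataMap; has to be updated if StormTopology ever gains a field.
    public static Set<String> getComponentIds(StormTopology topology) {
        Set<String> ret = new HashSet<String>();
        ret.addAll(topology.get_spouts().keySet());
        ret.addAll(topology.get_bolts().keySet());
        ret.addAll(topology.get_state_spouts().keySet());
        return ret;
    }
}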
Looking at the definition of ComponentCommon in storm.thrift makes the two functions above easier to understand. getTargets is worth a closer look because it derives the outputs from the inputs. ComponentCommon only records the stream ids and fields of a component's output streams; it does not record which components those streams are sent to. In the inputs, however, each stream id is a GlobalStreamId, and a GlobalStreamId contains not only the stream id but also the component id of the source component, so the targets can be computed in reverse: if an input's source component is the current component, then the component owning that input is a target component of the current one. For example, if bolt "b" subscribes to stream "default" of component "spout", then getTargets("spout") contains the entry {"default" -> {"b" -> grouping}}.
struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; //key is stream id, outputs
  3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
  4: optional string json_conf;
}

struct SpoutSpec {
  1: required ComponentObject spout_object;
  2: required ComponentCommon common;
  // can force a spout to be non-distributed by overriding the component configuration
  // and setting TOPOLOGY_MAX_TASK_PARALLELISM to 1
}

struct Bolt {
  1: required ComponentObject bolt_object;
  2: required ComponentCommon common;
}
2. WorkerTopologyContext
WorkerTopologyContext encapsulates worker-related information:
public class WorkerTopologyContext extends GeneralTopologyContext {
    public static final String SHARED_EXECUTOR = "executor";
    private Integer _workerPort;        // the port this worker listens on
    private List<Integer> _workerTasks; // ids of all tasks running in this worker
    private String _codeDir;            // code directory on the supervisor, stormdist/{storm-id}
    private String _pidDir;             // directory where the worker writes its pids, {worker-id}/pids
    Map<String, Object> _userResources;
    Map<String, Object> _defaultResources;
}
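A minimal usage sketch of the worker-level information, assuming the conventional accessor names (getThisWorkerPort, getThisWorkerTasks, getCodeDir, getPIDDir) that expose the fields above; the helper class itself is illustrative, not Storm source:
import java.util.List;
import backtype.storm.task.WorkerTopologyContext;

public class WorkerInfoDump {
    public static void dump(WorkerTopologyContext context) {
        Integer port = context.getThisWorkerPort();         // _workerPort
        List<Integer> tasks = context.getThisWorkerTasks(); // _workerTasks
        String codeDir = context.getCodeDir();              // _codeDir
        String pidDir = context.getPIDDir();                // _pidDir
        System.out.println("worker on port " + port + " runs tasks " + tasks
                + ", code dir " + codeDir + ", pid dir " + pidDir);
    }
}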
3. TopologyContext
As the class comment says, a TopologyContext is passed to a bolt's prepare (and a spout's open) method as a parameter; hence openOrPrepareWasCalled, which records whether prepare/open has already been called on this TopologyContext.
registerMetric can be used to register metrics into _registeredMetrics; its structure is [timeBucketSizeInSecs, [taskId, [name, metric]]].
_hooks is used to register task hooks.
/**
 * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
 * methods, respectively. This object provides information about the component's
 * place within the topology, such as task ids, inputs and outputs, etc.
 *
 * <p>The TopologyContext is also used to declare ISubscribedState objects to
 * synchronize state with StateSpouts this object is subscribed to.</p>
 */
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<String, Object>();
    private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;
    private clojure.lang.Atom _openOrPrepareWasCalled;

    public TopologyContext(StormTopology topology, Map stormConf,
            Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
            Map<String, Map<String, Fields>> componentToStreamToFields,
            String stormId, String codeDir, String pidDir, Integer taskId,
            Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
            Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics,
            clojure.lang.Atom openOrPrepareWasCalled) {
        super(topology, stormConf, taskToComponent, componentToSortedTasks,
                componentToStreamToFields, stormId, codeDir, pidDir,
                workerPort, workerTasks, defaultResources, userResources);
        _taskId = taskId;
        _executorData = executorData;
        _registeredMetrics = registeredMetrics;
        _openOrPrepareWasCalled = openOrPrepareWasCalled;
    }
}
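To tie this together, here is a hedged sketch of how a bolt's prepare() might use the TopologyContext it receives: registering a metric (which ends up in _registeredMetrics under the chosen time bucket) and a task hook (appended to _hooks). The helper class and the 60-second bucket are illustrative choices, not Storm source:
import java.util.Map;
import backtype.storm.hooks.BaseTaskHook;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.TopologyContext;

public class PrepareSketch {
    public static void prepare(Map conf, TopologyContext context) {
        int taskId = context.getThisTaskId(); // _taskId of this task

        // stored in _registeredMetrics as [60, [taskId, ["my-counter", metric]]]
        CountMetric counter = context.registerMetric("my-counter", new CountMetric(), 60);
        counter.incr();

        // appended to _hooks; its callbacks fire on emit/ack/fail for this task
        context.addTaskHook(new BaseTaskHook() { });

        System.out.println("prepared task " + taskId);
    }
}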
4. Usage
mk-task-data creates a topology context for each task:
user-context (user-topology-context (:worker executor-data) executor-data task-id)
(defn user-topology-context [worker executor-data tid]
  ((mk-topology-context-builder
    worker
    executor-data
    (:topology worker))
   tid))

(defn mk-topology-context-builder [worker executor-data topology]
  (let [conf (:conf worker)]
    #(TopologyContext.
      topology
      (:storm-conf worker)
      (:task->component worker)
      (:component->sorted-tasks worker)
      (:component->stream->fields worker)
      (:storm-id worker)
      (supervisor-storm-resources-path
        (supervisor-stormdist-root conf (:storm-id worker)))
      (worker-pids-root conf (:worker-id worker))
      (int %)
      (:port worker)
      (:task-ids worker)
      (:default-shared-resources worker)
      (:user-shared-resources worker)
      (:shared-executor-data executor-data)
      (:interval->task->metric-registry executor-data)
      (:open-or-prepare-was-called? executor-data))))