ソース読解のstorm操作zookeeper-cluster.clj
20627 ワード
storm操作zookeeperの主な関数は、名前空間backtype.storm.cluster.cljファイル(cluster.cljファイル)に定義されます.backtype.storm.clusterは、2つの重要なprotocol:ClusterStateとStormClusterStateを定義します.
clojureのprotocolはjavaのインタフェースと見なすことができ,一連の方法をカプセル化した.ClusterStateプロトコルには、サブノード関数の取得、サブノードデータ関数の取得など、zookeeperと対話する基礎関数のセットがカプセル化されています.ClusterStateプロトコルの定義は次のとおりです.
ClusterStateプロトコル
StormClusterStateプロトコルには、stormがzookeeperと対話する関数のセットがカプセル化されており、StormClusterStateプロトコルの関数をClusterStateプロトコルの関数の「組合せ」と見なすことができます.StormClusterStateプロトコルの定義は次のとおりです.
StormClusterStateプロトコル
ネーミングスペースbacktype.storm.clusterは、ClusterStateとStormClusterStateの2つの重要なプロトコルを定義するほか、mk-distributed-cluster-stateとmk-storm-cluster-stateの2つの重要な関数を定義します.
mk-distributed-cluster-state関数は次のとおりです.
この関数はClusterStateプロトコルを実装したオブジェクトを返し、このオブジェクトによってzookeeperとインタラクティブになります.
mk-distributed-cluster-state関数
mk-storm-cluster-state関数は次のように定義されます.
mk-storm-cluster-state関数は非常に重要であり、StormClusterStateプロトコルを実装したインスタンスを返します.このインスタンスstormによりzookeeperとのインタラクションがより容易になります.
nimbusとsupervisorを起動する関数では、mk-storm-cluster-state関数が呼び出されます.nimbusとsupervisorの起動については後述する.
mk-storm-cluster-state関数
zookeeper.cljにおけるmk-client関数
mk-client関数は、CuratorFrameworkインスタンスを作成し、そのインスタンスにCuratorListenerを登録します.バックグラウンド操作が完了したり、指定したwatchがトリガーされたりすると、CuratorListenerのeventReceived()が実行されます.eventReceivedで呼び出されるwacher関数は、mk-distributed-cluster-stateのwatcherバインド関数です.
以上がstormとzookeeperのインタラクションのソースコード分析であり、最も重要な部分はzk clientに「wacher」をどのように追加するかであり、stormの多くの機能はzookeeperのwacherメカニズムによって実現されていると思います.例えば「情報の割り当て」です.「wacher」を追加するには、次の手順に従います.
mk-distributed-cluster-state関数は、zk clientを作成し、watcherがzk clientに「wacher」関数を指定します.この「wacher」関数は、ClusterStateのcallbacksセット内の関数を簡単に呼び出すだけです.この「wacher」関数が実行する関数が、ClusterStateインスタンスによって決定され、ClusterStateインスタンスがregister関数を提供してcallbacksセットを更新します.ClusterStateインスタンスはmk-storm-cluster-state関数に渡され、mk-storm-cluster-stateでregisterを呼び出して「watcher」関数のすべての論理mk-storm-cluster-stateに登録されている関数実行の具体的な内容はStormClusterStateインスタンスによって決定され、zookeeperノードに「オブザーバ」を追加するStormClusterStateインスタンスでも実装されています.これにより、StormClusterStateインスタンスを使用して、興味のあるノードに「観察」と「コールバック関数」を追加できます.ノードまたはノードデータが変化すると、zkサーバはzk clientに「通知」を送信し、zk clientの「wather」関数が呼び出され、登録した「戻り関数」が実行されます.
まとめ
この部分のソースコードはzookeeperと密接に関連しており、「データ観察」や「ノード観察」などの多くのzookeeperの概念と特性に関連しており、zookeeperのwacherメカニズムについては//www.jb 51.net/article/124295.htmlを参照してください.stormはzookeeperのapiを直接使用するのではなく、Curatorフレームワークを使用してzookeeperへのアクセスを簡素化しています.Curatorフレームワークについては、//www.jb 51.net/article/125785.htmを参照してください.
以上が本稿のソースコード読解のstorm操作zookeeper-cluster.cljに関するすべての内容であり、興味のある友人は、zookeeper watchメカニズムの理解、apache zookeeperの使用方法の例の詳細、zookeeperに対応するacl権限の構成などを参照して、大家に役立つことを望んでいる.皆さんの読書に感謝します!
clojureのprotocolはjavaのインタフェースと見なすことができ,一連の方法をカプセル化した.ClusterStateプロトコルには、サブノード関数の取得、サブノードデータ関数の取得など、zookeeperと対話する基礎関数のセットがカプセル化されています.ClusterStateプロトコルの定義は次のとおりです.
ClusterStateプロトコル
(defprotocol ClusterState
(set-ephemeral-node [this path data])
(delete-node [this path])
(create-sequential [this path data])
;; if node does not exist, create persistent with this data
(set-data [this path data])
(get-data [this path watch?])
(get-version [this path watch?])
(get-data-with-version [this path watch?])
(get-children [this path watch?])
(mkdirs [this path])
(close [this])
(register [this callback])
(unregister [this id]))
StormClusterStateプロトコルには、stormがzookeeperと対話する関数のセットがカプセル化されており、StormClusterStateプロトコルの関数をClusterStateプロトコルの関数の「組合せ」と見なすことができます.StormClusterStateプロトコルの定義は次のとおりです.
StormClusterStateプロトコル
(defprotocol StormClusterState
(assignments [this callback])
(assignment-info [this storm-id callback])
(assignment-info-with-version [this storm-id callback])
(assignment-version [this storm-id callback])
(active-storms [this])
(storm-base [this storm-id callback])
(get-worker-heartbeat [this storm-id node port])
(executor-beats [this storm-id executor->node+port])
(supervisors [this callback])
(supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
(setup-heartbeats! [this storm-id])
(teardown-heartbeats! [this storm-id])
(teardown-topology-errors! [this storm-id])
(heartbeat-storms [this])
(error-topologies [this])
(worker-heartbeat! [this storm-id node port info])
(remove-worker-heartbeat! [this storm-id node port])
(supervisor-heartbeat! [this supervisor-id info])
(activate-storm! [this storm-id storm-base])
(update-storm! [this storm-id new-elems])
(remove-storm-base! [this storm-id])
(set-assignment! [this storm-id info])
(remove-storm! [this storm-id])
(report-error [this storm-id task-id node port error])
(errors [this storm-id task-id])
(disconnect [this]))
ネーミングスペースbacktype.storm.clusterは、ClusterStateとStormClusterStateの2つの重要なプロトコルを定義するほか、mk-distributed-cluster-stateとmk-storm-cluster-stateの2つの重要な関数を定義します.
mk-distributed-cluster-state関数は次のとおりです.
この関数はClusterStateプロトコルを実装したオブジェクトを返し、このオブジェクトによってzookeeperとインタラクティブになります.
mk-distributed-cluster-state関数
(defn mk-distributed-cluster-state
;; conf storm.yaml , map
[conf]
;; zk zk client,Storm CuratorFramework Zookeeper
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
;; storm zookeeper , /storm
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
(.close zk))
;; callbacks , map
(let [callbacks (atom {})
;; active zookeeper
active (atom true)
;; zk zk client, zk client watcher, zookeeper ,zk server zk client event,zk client watcher callbacks event
;; nimbus ,callbacks , nimbus event ; supervisor ,callbacks , supervisor zk server event ,
;; mk-client zookeeper.clj ,
zk (zk/mk-client conf
(conf STORM-ZOOKEEPER-SERVERS)
(conf STORM-ZOOKEEPER-PORT)
:auth-conf conf
:root (conf STORM-ZOOKEEPER-ROOT)
;; :watcher , zk client watcher ,state zk client ;type ;path zookeeper znode
;; watcher callbacks ,callbacks mk-storm-cluster-state ClusterState register
:watcher (fn [state type path]
(when @active
(when-not (= :connected state)
(log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
(when-not (= :none type)
(doseq [callback (vals @callbacks)]
(callback type path))))))]
;; reify java implements,
(reify
ClusterState
;; register callbacks ,key 32
(register
[this callback]
(let [id (uuid)]
(swap! callbacks assoc id callback)
id))
;; unregister key callbacks
(unregister
[this id]
(swap! callbacks dissoc id))
;; zookeeper
(set-ephemeral-node
[this path data]
(zk/mkdirs zk (parent-path path))
(if (zk/exists zk path false)
(try-cause
(zk/set-data zk path data) ; should verify that it's ephemeral
(catch KeeperException$NoNodeException e
(log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
(zk/create-node zk path data :ephemeral)
))
(zk/create-node zk path data :ephemeral)))
;; zookeeper
(create-sequential
[this path data]
(zk/create-node zk path data :sequential))
;;
(set-data
[this path data]
;; note: this does not turn off any existing watches
(if (zk/exists zk path false)
(zk/set-data zk path data)
(do
(zk/mkdirs zk (parent-path path))
(zk/create-node zk path data :persistent))))
;;
(delete-node
[this path]
(zk/delete-recursive zk path))
;; 。path ;watch? , " ", watch?=true, set-data ,
;; zk client ,zk client , zk client watcher ( :watcher )
(get-data
[this path watch?]
(zk/get-data zk path watch?))
;; get-data , version,version
(get-data-with-version
[this path watch?]
(zk/get-data-with-version zk path watch?))
;; version,watch? get-data watch?
(get-version
[this path watch?]
(zk/get-version zk path watch?))
;; ,watch? get-data watch?
(get-children
[this path watch?]
(zk/get-children zk path watch?))
;; zookeeper
(mkdirs
[this path]
(zk/mkdirs zk path))
;; zk client
(close
[this]
(reset! active false)
(.close zk)))))
mk-storm-cluster-state関数は次のように定義されます.
mk-storm-cluster-state関数は非常に重要であり、StormClusterStateプロトコルを実装したインスタンスを返します.このインスタンスstormによりzookeeperとのインタラクションがより容易になります.
nimbusとsupervisorを起動する関数では、mk-storm-cluster-state関数が呼び出されます.nimbusとsupervisorの起動については後述する.
mk-storm-cluster-state関数
(defn mk-storm-cluster-state
[cluster-state-spec]
;; satisfies? java instanceof, cluster-state-spec ClusterState
(let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
[false cluster-state-spec]
[true (mk-distributed-cluster-state cluster-state-spec)])
;; topology id-> map, /assignments/{topology id} ,zk client assignment-info-callback topology id
assignment-info-callback (atom {})
;; assignment-info-with-version-callback assignment-info-callback
assignment-info-with-version-callback (atom {})
;; assignment-version-callback assignments-callback
assignment-version-callback (atom {})
;; /supervisors znode ,zk client supervisors-callback
supervisors-callback (atom nil)
;; /assignments znode ,zk client assignments-callback
assignments-callback (atom nil)
;; /storms/{topology id} znode ,zk client storm-base-callback topology id
storm-base-callback (atom {})
;; register " (fn ...)" cluster-state callbacks , uuid
state-id (register
cluster-state
;; " ",type ,path znode
(fn [type path]
;; subtree "assignments"、"storms"、"supervisors" ,args topology id
(let [[subtree & args] (tokenize-path path)]
;; condp java switch
(condp = subtree
;; subtree="assignments" , args , /assignments , assignments-callback ,
;; /assignments/{topology id} , assignment-info-callback
ASSIGNMENTS-ROOT (if (empty? args)
(issue-callback! assignments-callback)
(issue-map-callback! assignment-info-callback (first args)))
;; subtree="supervisors" , /supervisors , supervisors-callback
SUPERVISORS-ROOT (issue-callback! supervisors-callback)
;; subtree="storms" , /storms/{topology id} , storm-base-callback
STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
;; this should never happen
(exit-process! 30 "Unknown callback for subtree " subtree args)))))]
;; zookeeper storm topology znode
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
(mkdirs cluster-state p))
;; StormClusterState
(reify
StormClusterState
;; /assignments , callback , assignments-callback, /assignments " "
(assignments
[this callback]
(when callback
(reset! assignments-callback callback))
(get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
;; /assignments/{storm-id} , storm-id , callback , assignment-info-callback , /assignments/{storm-id} " "
(assignment-info
[this storm-id callback]
(when callback
(swap! assignment-info-callback assoc storm-id callback))
(maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
;; /assignments/{storm-id} version , callback , assignment-info-with-version-callback , /assignments/{storm-id} " "
(assignment-info-with-version
[this storm-id callback]
(when callback
(swap! assignment-info-with-version-callback assoc storm-id callback))
(let [{data :data version :version}
(get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
{:data (maybe-deserialize data)
:version version}))
;; /assignments/{storm-id} version , callback , assignment-version-callback , /assignments/{storm-id} " "
(assignment-version
[this storm-id callback]
(when callback
(swap! assignment-version-callback assoc storm-id callback))
(get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
;; storm topology id /storms
(active-storms
[this]
(get-children cluster-state STORMS-SUBTREE false))
;; storm topology id /workerbeats
(heartbeat-storms
[this]
(get-children cluster-state WORKERBEATS-SUBTREE false))
;; topology id /errors
(error-topologies
[this]
(get-children cluster-state ERRORS-SUBTREE false))
;; storm-id , /workerbeats/{storm-id}/{node-port}
(get-worker-heartbeat
[this storm-id node port]
(-> cluster-state
(get-data (workerbeat-path storm-id node port) false)
maybe-deserialize))
;;
(executor-beats
[this storm-id executor->node+port]
;; need to take executor->node+port in explicitly so that we don't run into a situation where a
;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
;; we avoid situations like that
(let [node+port->executors (reverse-map executor->node+port)
all-heartbeats (for [[[node port] executors] node+port->executors]
(->> (get-worker-heartbeat this storm-id node port)
(convert-executor-beats executors)
))]
(apply merge all-heartbeats)))
;; /supervisors , callback , supervisors-callback, /supervisors " "
(supervisors
[this callback]
(when callback
(reset! supervisors-callback callback))
(get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
;; /supervisors/{supervisor-id} , supervisor
(supervisor-info
[this supervisor-id]
(maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
;;
(worker-heartbeat!
[this storm-id node port info]
(set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
;;
(remove-worker-heartbeat!
[this storm-id node port]
(delete-node cluster-state (workerbeat-path storm-id node port)))
;; storm-id topology
(setup-heartbeats!
[this storm-id]
(mkdirs cluster-state (workerbeat-storm-root storm-id)))
;; storm-id topology
(teardown-heartbeats!
[this storm-id]
(try-cause
(delete-node cluster-state (workerbeat-storm-root storm-id))
(catch KeeperException e
(log-warn-error e "Could not teardown heartbeats for " storm-id))))
;; storm-id topology
(teardown-topology-errors!
[this storm-id]
(try-cause
(delete-node cluster-state (error-storm-root storm-id))
(catch KeeperException e
(log-warn-error e "Could not teardown errors for " storm-id))))
;; supervisor
(supervisor-heartbeat!
[this supervisor-id info]
(set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
;; /storms/{storm-id}
(activate-storm!
[this storm-id storm-base]
(set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
;; topology StormBase , /storm/{storm-id}
(update-storm!
[this storm-id new-elems]
;; base storm-id zookeeper StormBase
(let [base (storm-base this storm-id nil)
;; executors component -> map
executors (:component->executors base)
;; new-elems map,update map map
new-elems (update new-elems :component->executors (partial merge executors))]
;; StormBase map, zookeeper /storms/{storm-id}
(set-data cluster-state (storm-path storm-id)
(-> base
(merge new-elems)
Utils/serialize))))
;; storm-id StormBase , /storms/{storm-id} , callback , storm-base-callback, /storms/{storm-id} " "
(storm-base
[this storm-id callback]
(when callback
(swap! storm-base-callback assoc storm-id callback))
(maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
;; storm-id StormBase , /storms/{storm-id}
(remove-storm-base!
[this storm-id]
(delete-node cluster-state (storm-path storm-id)))
;; storm-id , /assignments/{storm-id}
(set-assignment!
[this storm-id info]
(set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
;; storm-id , StormBase , /assignments/{storm-id} /storms/{storm-id}
(remove-storm!
[this storm-id]
(delete-node cluster-state (assignment-path storm-id))
(remove-storm-base! this storm-id))
;; zookeeper
(report-error
[this storm-id component-id node port error]
;; path "/errors/{storm-id}/{component-id}"
(let [path (error-path storm-id component-id)
;; data , 、 、
data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
;; /errors/{storm-id}/{component-id}
_ (mkdirs cluster-state path)
;; /errors/{storm-id}/{component-id} ,
_ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
;; to-kill 10
to-kill (->> (get-children cluster-state path false)
(sort-by parse-error-path)
reverse
(drop 10))]
;; to-kill
(doseq [k to-kill]
(delete-node cluster-state (str path "/" k)))))
;; storm-id component-id
(errors
[this storm-id component-id]
(let [path (error-path storm-id component-id)
_ (mkdirs cluster-state path)
children (get-children cluster-state path false)
errors (dofor [c children]
(let [data (-> (get-data cluster-state (str path "/" c) false)
maybe-deserialize)]
(when data
(struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
)))
]
(->> (filter not-nil? errors)
(sort-by (comp - :time-secs)))))
;; , , cluster-state callbacks
(disconnect
[this]
(unregister cluster-state state-id)
(when solo?
(close cluster-state))))))
zookeeper.cljにおけるmk-client関数
mk-client関数は、CuratorFrameworkインスタンスを作成し、そのインスタンスにCuratorListenerを登録します.バックグラウンド操作が完了したり、指定したwatchがトリガーされたりすると、CuratorListenerのeventReceived()が実行されます.eventReceivedで呼び出されるwacher関数は、mk-distributed-cluster-stateのwatcherバインド関数です.
(defnk mk-client
[conf servers port
:root ""
:watcher default-watcher
:auth-conf nil]
(let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
(.. fk
(getCuratorListenable)
(addListener
(reify CuratorListener
(^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
(when (= (.getType e) CuratorEventType/WATCHED)
(let [^WatchedEvent event (.getWatchedEvent e)]
(watcher (zk-keeper-states (.getState event))
(zk-event-types (.getType event))
(.getPath event))))))))
(.start fk)
fk))
以上がstormとzookeeperのインタラクションのソースコード分析であり、最も重要な部分はzk clientに「wacher」をどのように追加するかであり、stormの多くの機能はzookeeperのwacherメカニズムによって実現されていると思います.例えば「情報の割り当て」です.「wacher」を追加するには、次の手順に従います.
mk-distributed-cluster-state関数は、zk clientを作成し、watcherがzk clientに「wacher」関数を指定します.この「wacher」関数は、ClusterStateのcallbacksセット内の関数を簡単に呼び出すだけです.この「wacher」関数が実行する関数が、ClusterStateインスタンスによって決定され、ClusterStateインスタンスがregister関数を提供してcallbacksセットを更新します.ClusterStateインスタンスはmk-storm-cluster-state関数に渡され、mk-storm-cluster-stateでregisterを呼び出して「watcher」関数のすべての論理mk-storm-cluster-stateに登録されている関数実行の具体的な内容はStormClusterStateインスタンスによって決定され、zookeeperノードに「オブザーバ」を追加するStormClusterStateインスタンスでも実装されています.これにより、StormClusterStateインスタンスを使用して、興味のあるノードに「観察」と「コールバック関数」を追加できます.ノードまたはノードデータが変化すると、zkサーバはzk clientに「通知」を送信し、zk clientの「wather」関数が呼び出され、登録した「戻り関数」が実行されます.
まとめ
この部分のソースコードはzookeeperと密接に関連しており、「データ観察」や「ノード観察」などの多くのzookeeperの概念と特性に関連しており、zookeeperのwacherメカニズムについては//www.jb 51.net/article/124295.htmlを参照してください.stormはzookeeperのapiを直接使用するのではなく、Curatorフレームワークを使用してzookeeperへのアクセスを簡素化しています.Curatorフレームワークについては、//www.jb 51.net/article/125785.htmを参照してください.
以上が本稿のソースコード読解のstorm操作zookeeper-cluster.cljに関するすべての内容であり、興味のある友人は、zookeeper watchメカニズムの理解、apache zookeeperの使用方法の例の詳細、zookeeperに対応するacl権限の構成などを参照して、大家に役立つことを望んでいる.皆さんの読書に感謝します!