Source code reading: how Storm operates ZooKeeper (cluster.clj)


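Storm's main functions for operating ZooKeeper are defined in the namespace backtype.storm.cluster (the file cluster.clj). backtype.storm.cluster defines two important protocols: ClusterState and StormClusterState.
A Clojure protocol can be regarded as the counterpart of a Java interface: it declares a set of methods. The ClusterState protocol declares the basic functions for interacting with ZooKeeper, such as getting a node's children and getting a node's data. It is defined as follows:
ClusterState protocol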

(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  ;; if node does not exist, create persistent with this data
  (set-data [this path data])
  (get-data [this path watch?])
  (get-version [this path watch?])
  (get-data-with-version [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
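Because a protocol behaves like a Java interface, anything that implements its methods can stand in for the ZooKeeper-backed implementation. As an illustration only, the sketch below defines a hypothetical, reduced protocol `MiniClusterState` (four methods modelled on ClusterState, without the watch? argument) and implements it with `reify` over an atom that maps paths to data. It is not part of Storm and does not talk to ZooKeeper.

```clojure
;; Hypothetical reduced protocol, modelled on a few ClusterState methods.
(defprotocol MiniClusterState
  (set-data [this path data])
  (get-data [this path])
  (get-children [this path])
  (delete-node [this path]))

(defn- parent-of
  "Parent of an absolute path: \"/a/b\" -> \"/a\", \"/a\" -> \"/\"."
  [path]
  (let [i (.lastIndexOf ^String path "/")]
    (if (pos? i) (subs path 0 i) "/")))

(defn mk-memory-cluster-state
  "Returns an object implementing MiniClusterState, backed by an atom
  mapping absolute paths to data. Not a real ZooKeeper client."
  []
  (let [nodes (atom {})]
    (reify MiniClusterState
      (set-data [this path data]
        ;; like ClusterState's set-data: creates the node if it is missing
        (swap! nodes assoc path data)
        nil)
      (get-data [this path]
        (get @nodes path))
      (get-children [this path]
        ;; direct children only, returned as names rather than full paths
        (let [prefix-len (inc (count (if (= path "/") "" path)))]
          (->> (keys @nodes)
               (filter #(= path (parent-of %)))
               (map #(subs % prefix-len))
               sort
               vec)))
      (delete-node [this path]
        ;; recursive delete: drop the node and everything below it
        (swap! nodes (fn [m]
                       (into {} (remove (fn [[k _]]
                                          (or (= k path)
                                              (.startsWith ^String k (str path "/"))))
                                        m))))
        nil))))
```

An in-memory stand-in like this can be handy for unit-testing code written against a protocol, since callers only ever see the protocol methods.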

The StormClusterState protocol declares the functions Storm uses to interact with ZooKeeper; each of them can be regarded as a "combination" of ClusterState functions. It is defined as follows:
StormClusterState protocol

(defprotocol StormClusterState
 (assignments [this callback])
 (assignment-info [this storm-id callback])
 (assignment-info-with-version [this storm-id callback])
 (assignment-version [this storm-id callback])
 (active-storms [this])
 (storm-base [this storm-id callback])
 (get-worker-heartbeat [this storm-id node port])
 (executor-beats [this storm-id executor->node+port])
 (supervisors [this callback])
 (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
 (setup-heartbeats! [this storm-id])
 (teardown-heartbeats! [this storm-id])
 (teardown-topology-errors! [this storm-id])
 (heartbeat-storms [this])
 (error-topologies [this])
 (worker-heartbeat! [this storm-id node port info])
 (remove-worker-heartbeat! [this storm-id node port])
 (supervisor-heartbeat! [this supervisor-id info])
 (activate-storm! [this storm-id storm-base])
 (update-storm! [this storm-id new-elems])
 (remove-storm-base! [this storm-id])
 (set-assignment! [this storm-id info])
 (remove-storm! [this storm-id])
 (report-error [this storm-id task-id node port error])
 (errors [this storm-id task-id])
 (disconnect [this]))
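To make the idea of "combination" concrete, here is a simplified sketch that is not Storm's code: each StormClusterState-style function only builds a znode path and then calls one or two generic store operations, serializing on write and deserializing on read. The paths follow the layout used in this article (/assignments/{storm-id}, /storms/{storm-id}, /workerbeats/{storm-id}/{node-port}); the helper names ending in `*`, the `{node}-{port}` format and the use of `pr-str`/`clojure.edn/read-string` in place of Storm's `Utils/serialize`/`maybe-deserialize` are assumptions for illustration.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical path helpers following the znode layout described in the article.
(defn assignment-path* [storm-id] (str "/assignments/" storm-id))
(defn storm-path* [storm-id] (str "/storms/" storm-id))
(defn workerbeat-path* [storm-id node port]
  (str "/workerbeats/" storm-id "/" node "-" port))

;; A generic "ClusterState-like" store: three operations on an atom.
(defn store-set! [store path data] (swap! store assoc path data) nil)
(defn store-get [store path] (get @store path))
(defn store-delete! [store path] (swap! store dissoc path) nil)

;; "StormClusterState-like" functions are combinations of the generic ones:
;; build the path, then serialize on write and deserialize on read.
;; pr-str / edn/read-string stand in for Utils/serialize / maybe-deserialize.
(defn set-assignment!* [store storm-id info]
  (store-set! store (assignment-path* storm-id) (pr-str info)))

(defn assignment-info* [store storm-id]
  (some-> (store-get store (assignment-path* storm-id)) edn/read-string))

(defn activate-storm!* [store storm-id storm-base]
  (store-set! store (storm-path* storm-id) (pr-str storm-base)))

(defn storm-base* [store storm-id]
  (some-> (store-get store (storm-path* storm-id)) edn/read-string))

(defn remove-storm!* [store storm-id]
  ;; mirrors remove-storm!: delete the assignment, then the StormBase
  (store-delete! store (assignment-path* storm-id))
  (store-delete! store (storm-path* storm-id)))
```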

Besides the ClusterState and StormClusterState protocols, the namespace backtype.storm.cluster also defines two important functions: mk-distributed-cluster-state and mk-storm-cluster-state.
mk-distributed-cluster-state is shown below.
It returns an object that implements the ClusterState protocol, and Storm interacts with ZooKeeper through this object.
mk-distributed-cluster-state function

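(defn mk-distributed-cluster-state
  ;; conf: the configuration read from storm.yaml, as a map
  [conf]
  ;; zk: a temporary ZooKeeper client (Storm uses the Curator framework to talk to ZooKeeper)
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    ;; create Storm's root directory in ZooKeeper (/storm by default), then close the temporary client
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  ;; callbacks: the registered callback functions, as a map of id -> fn
  (let [callbacks (atom {})
        ;; active: whether this ZooKeeper client is still active
        active (atom true)
        ;; zk: the real client, created with a watcher. When a watched znode changes, the ZooKeeper
        ;; server sends the client an event, and the watcher passes it to every function in callbacks.
        ;; When nimbus starts, no real handlers are registered, so nimbus effectively ignores these
        ;; events; when a supervisor starts, callbacks are registered, so the supervisor handles the
        ;; events it receives from the ZooKeeper server.
        ;; mk-client is defined in zookeeper.clj and is analysed later in this article.
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         ;; :watcher binds the client's watcher function: state is the client's connection
                         ;; state, type is the event type, path is the znode that triggered the event.
                         ;; The watcher simply calls every function in callbacks; those functions are added
                         ;; by mk-storm-cluster-state through ClusterState's register method.
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    ;; reify is similar to Java's implements: it returns an anonymous object implementing ClusterState
    (reify
      ClusterState
      ;; register adds a callback to callbacks under a random UUID key and returns that key
      (register
        [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))
      ;; unregister removes the callback with the given key from callbacks
      (unregister
        [this id]
        (swap! callbacks dissoc id))
      ;; create an ephemeral node in ZooKeeper, or overwrite its data if it already exists
      (set-ephemeral-node
        [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          (try-cause
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))
      ;; create a sequential node in ZooKeeper
      (create-sequential
        [this path data]
        (zk/create-node zk path data :sequential))
      ;; set a node's data; if the node does not exist, create it (and its parents) as a persistent node
      (set-data
        [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      ;; delete a node together with all of its children
      (delete-node
        [this path]
        (zk/delete-recursive zk path))
      ;; get a node's data. path is the node path; watch? says whether to set a watch on the node.
      ;; If watch? is true, then when the node's data is later changed (e.g. by set-data), the ZooKeeper
      ;; server notifies the zk client, which runs its watcher function (the one bound via :watcher).
      ;; A ZooKeeper watch fires only once.
      (get-data
        [this path watch?]
        (zk/get-data zk path watch?))
      ;; like get-data, but also returns the node's version, which increases each time its data is modified
      (get-data-with-version
        [this path watch?]
        (zk/get-data-with-version zk path watch?))
      ;; get only the node's version; watch? means the same as in get-data
      (get-version
        [this path watch?]
        (zk/get-version zk path watch?))
      ;; get the node's children; watch? means the same as in get-data
      (get-children
        [this path watch?]
        (zk/get-children zk path watch?))
      ;; create the path in ZooKeeper, including any missing parents
      (mkdirs
        [this path]
        (zk/mkdirs zk path))
      ;; mark this state inactive and close the zk client
      (close
        [this]
        (reset! active false)
        (.close zk)))))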
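The callbacks/watcher part of mk-distributed-cluster-state does not depend on ZooKeeper itself, so it can be modelled with plain atoms. In the hypothetical sketch below, `make-callback-registry`, `register!`, `unregister!`, `watcher` and `close!` are invented names; the logic mirrors the code above (callbacks keyed by a random UUID, an active flag, a warning when not connected, and :none events skipped), but the watcher is called directly instead of by a Curator listener. The event keywords such as :node-data-changed are assumed to match what Storm's zookeeper.clj produces.

```clojure
(defn make-callback-registry
  "Hypothetical model of the callbacks/active atoms in mk-distributed-cluster-state."
  []
  {:callbacks (atom {})
   :active (atom true)})

(defn register!
  "Store callback under a random UUID string and return that id."
  [{:keys [callbacks]} callback]
  (let [id (str (java.util.UUID/randomUUID))]
    (swap! callbacks assoc id callback)
    id))

(defn unregister! [{:keys [callbacks]} id]
  (swap! callbacks dissoc id)
  nil)

(defn watcher
  "Same shape as the :watcher fn: do nothing once closed, warn when not
  connected, and fan out every non-:none event to all registered callbacks."
  [{:keys [callbacks active]} state type path]
  (when @active
    (when-not (= :connected state)
      (println "Received event" state ":" type ":" path "with disconnected Zookeeper."))
    (when-not (= :none type)
      (doseq [callback (vals @callbacks)]
        (callback type path)))))

(defn close! [{:keys [active]}]
  (reset! active false))
```

Keeping callbacks in an atom lets register and unregister run at any time: the watcher iterates over the immutable map it dereferenced, so a concurrent swap! does not disturb the loop.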

mk-storm-cluster-state is defined as follows.
mk-storm-cluster-state is very important: it returns an instance implementing the StormClusterState protocol, and this instance makes it much easier for Storm to interact with ZooKeeper.
The functions that start nimbus and supervisor both call mk-storm-cluster-state; how nimbus and supervisor are started will be covered later.
mk-storm-cluster-state function

(defn mk-storm-cluster-state
  [cluster-state-spec]
  ;; satisfies? is similar to Java's instanceof: it checks whether cluster-state-spec implements ClusterState.
  ;; If it does, use it as is; otherwise treat it as a conf map and create a distributed cluster state.
  ;; solo? is true when this function created the cluster state itself (it is then closed in disconnect).
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        ;; map of topology id -> callback. When the data of /assignments/{topology id} changes,
        ;; the zk client runs the callback stored under that topology id in assignment-info-callback
        assignment-info-callback (atom {})
        ;; assignment-info-with-version-callback: same structure as assignment-info-callback
        assignment-info-with-version-callback (atom {})
        ;; assignment-version-callback: also a map of topology id -> callback
        assignment-version-callback (atom {})
        ;; when the children of /supervisors change, the zk client runs supervisors-callback
        supervisors-callback (atom nil)
        ;; when the children of /assignments change, the zk client runs assignments-callback
        assignments-callback (atom nil)
        ;; when the data of /storms/{topology id} changes, the zk client runs the callback
        ;; stored under that topology id in storm-base-callback
        storm-base-callback (atom {})
        ;; register adds the dispatch function (fn ...) below to cluster-state's callbacks
        ;; and returns the uuid it was stored under
        state-id (register
                   cluster-state
                   ;; the dispatch function: type is the event type, path is the znode that triggered it
                   (fn [type path]
                     ;; subtree is the top-level directory ("assignments", "storms" or "supervisors");
                     ;; args holds the rest of the path, i.e. the topology id
                     (let [[subtree & args] (tokenize-path path)]
                       ;; condp works like Java's switch
                       (condp = subtree
                         ;; subtree = "assignments": if args is empty, the children of /assignments changed,
                         ;; so run assignments-callback; otherwise the data of /assignments/{topology id}
                         ;; changed, so run that topology's callback in assignment-info-callback
                         ASSIGNMENTS-ROOT (if (empty? args)
                                            (issue-callback! assignments-callback)
                                            (issue-map-callback! assignment-info-callback (first args)))
                         ;; subtree = "supervisors": the children of /supervisors changed, so run supervisors-callback
                         SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                         ;; subtree = "storms": the data of /storms/{topology id} changed,
                         ;; so run that topology's callback in storm-base-callback
                         STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                         ;; this should never happen
                         (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
    ;; create the znodes Storm needs for running topologies:
    ;; /assignments, /storms, /supervisors, /workerbeats and /errors
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    ;; return an instance implementing the StormClusterState protocol
    (reify
      StormClusterState
      ;; get the children of /assignments; if callback is not nil, store it in assignments-callback
      ;; and set a watch on the children of /assignments
      (assignments
        [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
      ;; get the data of /assignments/{storm-id}, i.e. the assignment of that topology; if callback is
      ;; not nil, store it in assignment-info-callback and set a watch on /assignments/{storm-id}
      (assignment-info
        [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
      ;; get the data of /assignments/{storm-id} together with its version; if callback is not nil,
      ;; store it in assignment-info-with-version-callback and set a watch on /assignments/{storm-id}
      (assignment-info-with-version
        [this storm-id callback]
        (when callback
          (swap! assignment-info-with-version-callback assoc storm-id callback))
        (let [{data :data version :version}
              (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
          {:data (maybe-deserialize data)
           :version version}))
      ;; get only the version of /assignments/{storm-id}; if callback is not nil, store it in
      ;; assignment-version-callback and set a watch on /assignments/{storm-id}
      (assignment-version
        [this storm-id callback]
        (when callback
          (swap! assignment-version-callback assoc storm-id callback))
        (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
      ;; get the ids of all active topologies, i.e. the children of /storms
      (active-storms
        [this]
        (get-children cluster-state STORMS-SUBTREE false))
      ;; get the ids of all topologies that have worker heartbeats, i.e. the children of /workerbeats
      (heartbeat-storms
        [this]
        (get-children cluster-state WORKERBEATS-SUBTREE false))
      ;; get the ids of all topologies that have errors, i.e. the children of /errors
      (error-topologies
        [this]
        (get-children cluster-state ERRORS-SUBTREE false))
      ;; get the heartbeat of one worker of the topology, i.e. the data of /workerbeats/{storm-id}/{node-port}
      (get-worker-heartbeat
        [this storm-id node port]
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))
      ;; get the heartbeats of the topology's executors, read only from the workers they are assigned to
      (executor-beats
        [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)))]
          (apply merge all-heartbeats)))
      ;; get the children of /supervisors; if callback is not nil, store it in supervisors-callback
      ;; and set a watch on the children of /supervisors
      (supervisors
        [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
      ;; get the data of /supervisors/{supervisor-id}, i.e. that supervisor's info
      (supervisor-info
        [this supervisor-id]
        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
      ;; write a worker heartbeat
      (worker-heartbeat!
        [this storm-id node port info]
        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
      ;; delete a worker heartbeat
      (remove-worker-heartbeat!
        [this storm-id node port]
        (delete-node cluster-state (workerbeat-path storm-id node port)))
      ;; create the heartbeat directory of the topology, /workerbeats/{storm-id}
      (setup-heartbeats!
        [this storm-id]
        (mkdirs cluster-state (workerbeat-storm-root storm-id)))
      ;; delete the heartbeat directory of the topology
      (teardown-heartbeats!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
      ;; delete the error directory of the topology
      (teardown-topology-errors!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))
      ;; write a supervisor heartbeat as an ephemeral node, which disappears when the supervisor's session ends
      (supervisor-heartbeat!
        [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
      ;; create /storms/{storm-id} holding the topology's StormBase
      (activate-storm!
        [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
      ;; update the topology's StormBase, i.e. the data of /storms/{storm-id}
      (update-storm!
        [this storm-id new-elems]
        ;; base: the StormBase of storm-id currently stored in ZooKeeper
        (let [base (storm-base this storm-id nil)
              ;; executors: map of component id -> number of executors
              executors (:component->executors base)
              ;; new-elems: the fields to change; update merges the existing component->executors
              ;; map with the one in new-elems (entries in new-elems win)
              new-elems (update new-elems :component->executors (partial merge executors))]
          ;; merge the new fields into the StormBase and write it back to /storms/{storm-id}
          (set-data cluster-state (storm-path storm-id)
                    (-> base
                        (merge new-elems)
                        Utils/serialize))))
      ;; get the StormBase of storm-id, i.e. the data of /storms/{storm-id}; if callback is not nil,
      ;; store it in storm-base-callback and set a watch on /storms/{storm-id}
      (storm-base
        [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
      ;; delete the StormBase of storm-id, i.e. the node /storms/{storm-id}
      (remove-storm-base!
        [this storm-id]
        (delete-node cluster-state (storm-path storm-id)))
      ;; write the assignment of storm-id, i.e. the data of /assignments/{storm-id}
      (set-assignment!
        [this storm-id info]
        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
      ;; delete the assignment and the StormBase of storm-id: /assignments/{storm-id} and /storms/{storm-id}
      (remove-storm!
        [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))
      ;; record an error of a component in ZooKeeper
      (report-error
        [this storm-id component-id node port error]
        ;; path is "/errors/{storm-id}/{component-id}"
        (let [path (error-path storm-id component-id)
              ;; data: the error record (time, error message, host and port)
              data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
              ;; make sure /errors/{storm-id}/{component-id} exists
              _ (mkdirs cluster-state path)
              ;; create a sequential child node under it that holds the error record
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; to-kill: every error node except the 10 most recent ones
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          ;; delete the nodes in to-kill
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k)))))
      ;; get the errors of the given storm-id and component-id, newest first
      (errors
        [this storm-id component-id]
        (let [path (error-path storm-id component-id)
              _ (mkdirs cluster-state path)
              children (get-children cluster-state path false)
              errors (dofor [c children]
                            (let [data (-> (get-data cluster-state (str path "/" c) false)
                                           maybe-deserialize)]
                              (when data
                                (struct TaskError (:error data) (:time-secs data) (:host data) (:port data)))))]
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))
      ;; disconnect: remove the dispatch function from cluster-state's callbacks, and close
      ;; cluster-state if this instance created it (solo? is true)
      (disconnect
        [this]
        (unregister cluster-state state-id)
        (when solo?
          (close cluster-state))))))
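The dispatch function passed to register routes each event by the first path segment. The following sketch models that routing with plain atoms; it is not Storm's code. `tokenize` stands in for tokenize-path, string literals replace the *-ROOT constants, and the two issue helpers are written here as one-shot: they remove the stored callback before running it, which matches ZooKeeper's one-time watches, but their exact bodies in Storm are an assumption. An unknown subtree throws instead of calling exit-process!.

```clojure
(require '[clojure.string :as str])

(defn tokenize
  "\"/assignments/topo-1\" -> [\"assignments\" \"topo-1\"] (stand-in for tokenize-path)."
  [path]
  (vec (remove str/blank? (str/split path #"/"))))

(defn issue-callback!
  "Run and clear a single stored callback (one-shot, like a ZooKeeper watch)."
  [cb-atom]
  (let [cb @cb-atom]
    (reset! cb-atom nil)
    (when cb (cb))))

(defn issue-map-callback!
  "Run and remove the callback stored under id in a map atom."
  [cb-atom id]
  (let [cb (get @cb-atom id)]
    (swap! cb-atom dissoc id)
    (when cb (cb id))))

(defn make-dispatcher
  "Returns (fn [type path]) that routes events like the fn given to register."
  [{:keys [assignments-callback assignment-info-callback
           supervisors-callback storm-base-callback]}]
  (fn [_type path]
    (let [[subtree & args] (tokenize path)]
      (condp = subtree
        "assignments" (if (empty? args)
                        (issue-callback! assignments-callback)
                        (issue-map-callback! assignment-info-callback (first args)))
        "supervisors" (issue-callback! supervisors-callback)
        "storms" (issue-map-callback! storm-base-callback (first args))
        (throw (ex-info "Unknown callback for subtree" {:subtree subtree :args args}))))))
```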

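The mk-client function in zookeeper.clj
mk-client creates a CuratorFramework instance and registers a CuratorListener on it. The listener's eventReceived() is called when a background operation completes or when a watch fires; eventReceived forwards only fired watches (events of type CuratorEventType/WATCHED) to the watcher function, which is the function bound to :watcher in mk-distributed-cluster-state.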

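(defnk mk-client
  [conf servers port
   :root ""
   :watcher default-watcher
   :auth-conf nil]
  ;; create the CuratorFramework client (with optional ZooKeeper auth info)
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    ;; register a CuratorListener on the client
    (.. fk
        (getCuratorListenable)
        (addListener
          (reify CuratorListener
            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
              ;; only WATCHED events (fired watches) are forwarded; results of background
              ;; operations are ignored
              (when (= (.getType e) CuratorEventType/WATCHED)
                (let [^WatchedEvent event (.getWatchedEvent e)]
                  ;; translate ZooKeeper's state and event type into keywords,
                  ;; then call the watcher bound via :watcher
                  (watcher (zk-keeper-states (.getState event))
                           (zk-event-types (.getType event))
                           (.getPath event))))))))
    ;; start the client and return it
    (.start fk)
    fk))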
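Curator delivers both background-operation results and fired watches to the same listener, and mk-client forwards only the latter. The sketch below models the listener body without Curator: events are plain maps (an assumed shape), and `keeper-states`/`event-types` are hypothetical, partial stand-ins for zk-keeper-states and zk-event-types, mapping ZooKeeper's state and event-type names to keywords.

```clojure
;; Hypothetical, partial lookup tables; the real ones map ZooKeeper enum values to keywords.
(def keeper-states {"SyncConnected" :connected
                    "Disconnected" :disconnected
                    "Expired" :expired})

(def event-types {"None" :none
                  "NodeCreated" :node-created
                  "NodeDeleted" :node-deleted
                  "NodeDataChanged" :node-data-changed
                  "NodeChildrenChanged" :node-children-changed})

(defn make-listener
  "Returns a fn playing the role of CuratorListener.eventReceived.
  Events are maps like {:curator-type :watched :state \"SyncConnected\"
  :type \"NodeDataChanged\" :path \"/storms/t1\"}."
  [watcher]
  (fn [event]
    ;; background-operation results (any other :curator-type) are ignored
    (when (= :watched (:curator-type event))
      (watcher (keeper-states (:state event))
               (event-types (:type event))
               (:path event)))))
```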

That concludes the source analysis of how Storm interacts with ZooKeeper. In my view the most important part is how a "watcher" is attached to the zk client, because many Storm features, such as distributing assignments, are built on ZooKeeper's watch mechanism. The watcher is wired up as follows.
mk-distributed-cluster-state creates the zk client and gives it a watcher function through :watcher. This watcher simply calls the functions in the ClusterState instance's callbacks collection, so what it actually does is decided by the ClusterState instance, which provides register for updating callbacks. The ClusterState instance is then passed to mk-storm-cluster-state, which calls register to add a dispatch function containing all of the watcher's real logic. What that dispatch function ends up running is decided by the StormClusterState instance, which is also where watches on ZooKeeper nodes are set. As a result, a StormClusterState instance can be used to attach a watch and a callback function to any node of interest: when the node or its data changes, the ZooKeeper server sends a notification to the zk client, the client's watcher is called, and the registered callback runs.
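The whole chain can be simulated in one small program. Everything below is a hypothetical model rather than Storm or ZooKeeper code: a fake server stores node data and a set of one-shot data watches; reading with watch? set arms a watch, and writing a watched node consumes the watch and calls the client's watcher. The watcher fans out to the ClusterState-level callbacks, one of which is the StormClusterState-level dispatcher, which finally runs the per-topology callback. Only the /assignments/{storm-id} path is modelled, and a fixed key replaces the UUID returned by register.

```clojure
(require 'clojure.string)

;; Fake ZooKeeper: node data plus one-shot data watches.
(defn make-fake-zk
  "Hypothetical fake server. watcher is the client's watcher fn (state type path)."
  [watcher]
  {:data (atom {}) :watches (atom #{}) :watcher watcher})

(defn zk-get-data [{:keys [data watches]} path watch?]
  (when watch?
    (swap! watches conj path))      ; arm a watch on this path
  (get @data path))

(defn zk-set-data [{:keys [data watches watcher]} path value]
  (swap! data assoc path value)
  (when (contains? @watches path)
    (swap! watches disj path)       ; a watch fires only once
    (watcher :connected :node-data-changed path)))

;; ClusterState layer: the client's watcher fans out to every registered callback.
(defn make-cluster-state []
  (let [callbacks (atom {})
        zk (make-fake-zk (fn [_state type path]
                           (when-not (= :none type)
                             (doseq [cb (vals @callbacks)]
                               (cb type path)))))]
    {:zk zk :callbacks callbacks}))

;; StormClusterState layer: registers one dispatcher and keeps per-topology callbacks.
(defn make-storm-cluster-state [{:keys [zk callbacks]}]
  (let [assignment-info-callback (atom {})]
    ;; plays the role of register: add the dispatch fn to the ClusterState callbacks
    (swap! callbacks assoc :dispatcher
           (fn [_type path]
             (let [[_ root storm-id] (clojure.string/split path #"/")]
               (when (and (= "assignments" root) storm-id)
                 (when-let [cb (get @assignment-info-callback storm-id)]
                   (swap! assignment-info-callback dissoc storm-id)
                   (cb storm-id))))))
    {:assignment-info
     (fn [storm-id callback]
       (when callback
         (swap! assignment-info-callback assoc storm-id callback))
       ;; set a watch only when a callback was supplied, like (not-nil? callback)
       (zk-get-data zk (str "/assignments/" storm-id) (not (nil? callback))))
     :set-assignment!
     (fn [storm-id info]
       (zk-set-data zk (str "/assignments/" storm-id) info))}))
```

A second write produces no notification because the first one consumed the watch; to keep receiving events, a callback has to read the node again with a callback, which re-arms the watch.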
Summary
This part of the source code is closely tied to ZooKeeper and involves many of its concepts and features, such as data watches and child watches. For ZooKeeper's watch mechanism, see //www.jb51.net/article/124295.html. Storm does not use the ZooKeeper API directly; it uses the Curator framework to simplify access to ZooKeeper. For Curator, see //www.jb51.net/article/125785.htm.