Zoo KeeperのWatchメカニズムを分析します。

17023 ワード

WatchはZoo Keeperのリリース/購読機能を実現する最も核心的なキャラクターです。我々はZoo Keeperのあるノードの変化に対して後続の処理を行う必要がある場合、Watchに使用する必要がある。
Zoo KeeperのWatchメカニズムは、総じて三つの流れに分けられます。client登録Watch、server処理Watch、clientフィードバックWatch。次に、上記の三つの流れからWatchがどのように働いているかを分析します。
Client登録Watch
ClientにはZKWatch Managerが登録されているすべてのWatchを保存します。
1.標準のコンストラクタを使って登録します。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
このWatchはセッション全体のデフォルトのWatchとしてdefault Wathcerに保存されます。2.getData、exist方法で登録し、ここでgetDataを例にして分析します。getDataには2つの重負荷の方法があります。
public byte[]getData(String path,boot watch,Stt stat)
このbootlean変数はデフォルトのWatchを登録するかどうかを表します。
public byte[]getData(final String path、Watch Watch、Stt stat)
public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        //WatchRegistration     Watch path     
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        //  request      ,         watcher 
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        //  ReplyHeader
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
Watch RegistrationはWatchとpathの対応関係を保存するために使用されます。Zoo Keeperでは、一番小さい通信ユニットはPacketですので、Watch RegistrationをPacketで包装する必要があります。Packetに包装してから、列に入れて発送を待ちます。
ClientはこのPacketを送信した後、SendThreadスレッドのreadResponse()によって応答を受けて、finish Packetメソッドを呼び出してpacketからWathcerを提出し、ZKWatch Managerに登録します。
private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            //  watcher
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        。。。
    }
Watch Registration.register方法。
// Wathcer,     ZKWatcherManager 。
public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map> watches = getWatches(rc);
                synchronized(watches) {
                    Set watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }
サービスエンド処理Watch
クライアント登録Watchは、完全なWatchを登録するのではなく、Request Headerとrequstの二つの属性をネットワーク転送するだけです。
サービスが最後のステップで要求された処理を行う場合、FinalRequest Processe ort.processRequest()方法は、このときの要求がWatch登録が必要かどうかを判断します。
case OpCode.getData: {
                ...
                //             Watcher 。
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);
               ...
            }
登録が必要なら、Watchに対応したServerCnxnとpathをWatch Managerに格納します。
Watchトリガー
上記の2ステップを通じて、WatchとclientとServerの端に存在しますが、Watchはどうやってclientをリセットしますか?
私たちはWatcher Managerのtrigger Watchに入ります。
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        //  WatchedEvent 
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            //  Path  Watcher 
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    //  Watcher ,    Watcher      
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            //  process      Watcher 
            w.process(e);
        }
        return watchers;
    }
私たちはprocessの方法を見て何をしましたか?
abstract public void process(WatchedEvent event);
ウォッチのprocessはabstract方法です。私たちはServerCnxnがimplement Watchインターフェースであることを知っています。また、NIOSTER CnxnがServerCnxnを実現しました。だからNIOSTerverCnxnのプロcessを見てみます。
synchronized public void process(WatchedEvent event) {
        //    -1,        
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        //   WatcherEvent ,         
        WatcherEvent e = event.getWrapper();
        //        
        sendResponse(h, e, "notification");
    }
これにより、processはWatchの業務方法を本当に実行するのではなく、通知の役割を果たしていることが分かります。本当にコールバック業務ロジックを実行するのはClient側です。
Client側でコールバックを実行します。
Client端末はClienntCnxnを通じてクライアントの下のネットワーク通信を管理する。また、内部クラスSendThreadは、メッセージの受信および送信を行うために使用される。
SendThread.readRespose()は、コードの一部を切り取りました。
if (replyHdr.getXid() == -1) {
                // -1 means notification
                //-1        ,    process  
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                //     WatcherEvent
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                //  chrootPath ,          
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else
                        event.setPath(serverPath.substring(chrootPath.length()));
                }
                //WatcherEvent  WatchedEvent 
                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                // WatchedEvent   eventThread    
                eventThread.queueEvent( we );
                return;
            }
これから分かるように、SendThreadは通知を受けるだけで、本当のWatchがReadを実行するのはeventThreadの中です。
EventThread.queueEvent
public void queueEvent(WatchedEvent event) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();

            // materialize the watchers based on the event
            //   WatchedEvent    watchers
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            //  waitingEvents   
            waitingEvents.add(pair);
        }
このWatch edventに関するすべてのwatch ersを見つけたら、watch ersをwaitingEventsの列に入れます。EventThreadのrunは常に列から任務を取り出して実行します。
EventThread.run()
@Override
        public void run() {
           try {
              isRunning = true;
              while (true) {
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                     //  event
                    processEvent(event);
                 }
                 ......
        }

private void processEvent(Object event) {
          try {
              if (event instanceof WatcherSetEventPair) {
                  // each watcher will process the event
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  //     Watcher,        process  
                  //                 
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
                  ....
流れのまとめ
1.クライアントにはZKWatcher Managerが登録のためのWatch 2を記憶するためのものがあります。クライアントが登録Watchを含む要求を開始したら、まずこの要求を必要登録Watchと表記してから、Packetとしてパッケージし、サービス端末3に送信します。サービス端末が要求を受信したら、登録する必要があるWatchを読み取ります。WatchのようなServerCnxtをWatch Managerに入れます。4.Watch対応のノードにEventイベントが発生した場合、サービス端末から通知が送信され、クライアントパスにXXEventが発生したと伝えられます。5.クライアントが通知を受け取ったら(SendThreadは通知を受けます)、関連のWatchを探してWatch SetEventPairと包装して、waiting行列に入れます。6.クライアントのEventThreadはRun方法を実行し、Watch SetEventPairをwaitingから取り出し、すべてのWatchを見つけて本格的なコールバック業務の実行を行う。