zookeeperソース分析の3クライアント送信要求プロセス

23098 ワード

znodeは、このディレクトリノードに格納されているデータの変更、サブノードディレクトリの変化などを含めてモニタリングを設定するクライアントに通知することができ、この機能はzookeeperがアプリケーションにとって最も重要な特性であり、この特性によって実現できる機能は、構成の集中管理、クラスタ管理、分散ロックなどを含む.
知識の準備:
zookeeper定義のステータスは次のとおりです.
Unknown (-1),Disconnected (0),NoSyncConnected (1),SyncConnected (3),AuthFailed (4),ConnectedReadOnly (5),SaslAuthenticated(6),Expired (-112);
 
 

事件定义的的类型有:None (-1),NodeCreated (1),NodeDeleted (2),NodeDataChanged (3),NodeChildrenChanged (4),DataWatchRemoved (5),ChildWatchRemoved (6);

watcher定义的的类型有Children(1), Data(2), Any(3);

在上一篇

zookeeper源码分析之一客户端

中,我们连接zookeeper时,启动了一个MyWatcher

protected void connectToZK(String newHost) throws InterruptedException, IOException {
        if (zk != null && zk.getState().isAlive()) {
            zk.close();
        }
        host = newHost;
        boolean readOnly = cl.getOption("readonly") != null;
        if (cl.getOption("secure") != null) {
            System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
            System.out.println("Secure connection is enabled");
        }
 zk = new ZooKeeper(host,
                 Integer.parseInt(cl.getOption("timeout")),
                 new MyWatcher(), readOnly);
    }

zookeeperのサンプルを作成するときにwatchManagerを使用します.
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider)
            throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        hostProvider = aHostProvider;

 cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

転送されたMyWatcherをデフォルトのwatcherとしてwatchManagerに格納し、ClientCnxnでパッケージしてスレッドを起動します.
では、ClientCnxnについて説明しましょう.ClientCnxnはクライアントsocketのioを管理しています.接続可能なserverのセットと、変換が必要なときに透明に変換できるserverのセットを維持しています.
まずsocketの入手方法を理解しましょう.
    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        String clientCnxnSocketName = System
                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName) .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }

次にClientCnxnのstart()メソッドを起動し、このメソッドで2つのスレッドを起動します.
    public void start() {
        sendThread.start();
        eventThread.start();
    }

ここで、SendThreadクラスは、送信された要求キューにサービスを提供し、心拍数を生成する.ReadThreadも生成されます.
SendThreadのrunメソッドの主体を見てみましょう.
                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent == true) {
                                eventThread.queueEvent(new WatchedEvent(
                                      Watcher.Event.EventType.None,
                                      authState,null));
                            }
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    
                    if (to <= 0) {
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small 
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }

                    // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }

                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                

ClientCnxnSocketNettyは、サーバに接続し、ネットワークトラフィックを読み取り/書き込み、ネットワークデータ層とより高いpacket層の中間層として機能するClientCnxnSocketの抽象的な方法を実現します.ライフサイクルは次のとおりです.
     loop:
     - try:
     - - !isConnected()
     - - - connect()
     - - doTransport()
     - catch:
     - - cleanup()
     close()

上記の説明から、ClientCnxnSocketのワークフローを見ることができ、まず接続するかどうかを判断し、接続がなければconnectメソッドを呼び出して接続し、接続があれば直接使用する.その後doTransportメソッドを呼び出して通信し、接続中に異常が発生した場合、cleanup()メソッドを呼び出す.最後に接続を閉じます.したがって、最も主要なプロセスはdoTransport()メソッドです.
 @Override
    void doTransport(int waitTimeOut,
                     List<Packet> pendingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
                // adding back the patck to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId)
                        + " is lost");
            }
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }

上のプログラムを簡略化します.一つはaddBack(head)、もう一つの正常なプロセスdoWrite(pendingQueue,head,cnxn)です.まず異常を捨てて、正常なプロセスを見てみましょう.
Packetを先に取得:
Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }

ここで、protected LinkedBlockingDequeoutgoingQueueはチェーンテーブルブロックキューであり、発行された要求を保存する.
次にdoWriteメソッドを実行します.
 /**
     * doWrite handles writing the packets from outgoingQueue via network to server.
     */
    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        while (true) {
            if (p != WakeupPacket.getInstance()) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
   sendPkt(p);
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }

dowriteメソッドはoutgoingQueueのメッセージをネットワークを介してサーバに書き込む責任を負う.送信メッセージプログラムは赤のように表示されます.
    private void sendPkt(Packet p) {
        // Assuming the packet will be sent out successfully. Because if it fails,
        // the channel will close and clean up queues.
        p.createBB();
        updateLastSend();
        sentCount++;
        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    }

1.Packetメッセージの構成は次のとおりです.
 /**
     * This class allows us to pass the headers and the relevant records around.
     */
    static class Packet {
        RequestHeader requestHeader;

        ReplyHeader replyHeader;

        Record request;

        Record response;

        ByteBuffer bb;

        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Servers's view of the path (may differ due to chroot) **/
        String serverPath;

        boolean finished;

        AsyncCallback cb;

        Object ctx;

        WatchRegistration watchRegistration;

        public boolean readOnly;

        WatchDeregistration watchDeregistration;

        /** Convenience ctor */
        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration) {
            this(requestHeader, replyHeader, request, response,
                 watchRegistration, false);
        }

        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration, boolean readOnly) {

            this.requestHeader = requestHeader;
            this.replyHeader = replyHeader;
            this.request = request;
            this.response = response;
            this.readOnly = readOnly;
            this.watchRegistration = watchRegistration;
        }

        public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();

            sb.append("clientPath:" + clientPath);
            sb.append(" serverPath:" + serverPath);
            sb.append(" finished:" + finished);

            sb.append(" header:: " + requestHeader);
            sb.append(" replyHeader:: " + replyHeader);
            sb.append(" request:: " + request);
            sb.append(" response:: " + response);

            // jute toString is horrible, remove unnecessary newlines
            return sb.toString().replaceAll("\r*
+", " "); } }

createBB法から,最下位の実際のネットワーク伝送シーケンス化ではzookeeperはrequestHeaderとrequestの2つの属性のみをシーケンス化し,すなわち,これら2つだけが最下位のバイト配列にシーケンス化されてネットワーク伝送され,watchRegistrationに関する情報はネットワーク伝送されないことを示した.
2.最終送信updateLastSendの更新
    void updateLastSend() {
        this.lastSend = now;
    }

3.nio channelを使用してserverにバイトキャッシュを送信
channel.write(ChannelBuffers.wrappedBuffer(p.bb));
ここでbbのタイプはByteBufferであり,packetで初期化されている.
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();

 
まとめ:
zookeeeperクライアントとサーバの接続は主にClientCnxnSocketによって実現され、2つの具体的な実装クラスClientCnxnSocketNettyとClientCnxnSocketNIOがあり、そのワークフローは以下の通りである.
まず接続するかどうかを判断し、接続がなければconnectメソッドを呼び出して接続し、接続があれば次のステップに進む.
その後doTransportメソッドを呼び出して通信し、接続中に異常が発生した場合、cleanup()メソッドを呼び出す.
最後に接続を閉じます.
上記の知見は,SendThreadのrun法で具現化できる.
 
また、Zookeeperの特性--』順序整合性:クライアントが要求を送信する順序でデータを更新します.さらにsendThreadでは、次のように順序の一貫性を保証するために、複数の更新タイムスタンプが表示されます.