KafkaController Analysis 2: NetworkClient Analysis

  • NetworkClient: as the name suggests, a client wrapper for network connections and message sending. The source code comment reads:
  • A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.
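  • Where it is used:
  • 1. Kafka itself ships Java implementations of the producer and consumer, and their network connections and request sending are implemented with NetworkClient.
    2. The KafkaController communicates with the other brokers through NetworkClient.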
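    InFlightRequests class
  • File: clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
  • Implements the set of in-flight requests: those currently being sent, and those already sent whose response has not yet been received.
  • Main member variable: private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>(); stores all requests for each connection in a Deque, which is a double-ended queue.
  • Adding a new request: a new request is always placed at the head of the queue via addFirst.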
    public void add(ClientRequest request) {
            Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
            if (reqs == null) {
                reqs = new ArrayDeque<>();
                this.requests.put(request.request().destination(), reqs);
            }
            reqs.addFirst(request);
        }
    
  • Retrieving the oldest sent request: it is taken from the tail of the queue via pollLast().
    public ClientRequest completeNext(String node) {
            return requestQueue(node).pollLast();
        }
    
  • public boolean canSendMore(String node) decides whether another request may be sent to the given node through NetworkClient. If the most recently queued request has not yet been fully written to the underlying socket, or the number of in-flight requests has reached maxInFlightRequestsPerConnection, no new request is allowed.
    ```
    public boolean canSendMore(String node) {
            Deque<ClientRequest> queue = requests.get(node);
            return queue == null || queue.isEmpty() ||
                   (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
        }
    ```
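
The queue discipline described above (newest request at the head, oldest completed from the tail, sending gated on the head request) can be tried out in isolation. The following is a minimal sketch, not the Kafka class itself: `SketchRequest`, `InFlightSketch`, and the `sendCompleted` flag are hypothetical stand-ins for `ClientRequest`, `InFlightRequests`, and `request().completed()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for ClientRequest: an id plus a flag set once the bytes are on the socket. */
class SketchRequest {
    final int id;
    boolean sendCompleted;

    SketchRequest(int id) {
        this.id = id;
    }
}

/** Simplified per-node in-flight tracker (an illustration, not the Kafka implementation). */
class InFlightSketch {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<SketchRequest>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    /** New requests always go to the head of the node's deque. */
    void add(String node, SketchRequest request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    /** The oldest request sits at the tail, so responses are matched in FIFO order. */
    SketchRequest completeNext(String node) {
        Deque<SketchRequest> queue = requests.get(node);
        return queue == null ? null : queue.pollLast();
    }

    /** Sending is allowed only if the newest request is fully written and the limit is not reached. */
    boolean canSendMore(String node) {
        Deque<SketchRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty()
            || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightPerConnection);
    }
}

class InFlightSketchDemo {
    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(2);
        SketchRequest first = new SketchRequest(1);
        inFlight.add("node-0", first);
        System.out.println(inFlight.canSendMore("node-0")); // false: newest request not yet written
        first.sendCompleted = true;
        System.out.println(inFlight.canSendMore("node-0")); // true: one in flight, limit is two
        inFlight.add("node-0", new SketchRequest(2));
        System.out.println(inFlight.completeNext("node-0").id); // 1: the oldest request completes first
    }
}
```

Because requests enter at the head and leave from the tail, responses are matched to requests in the order the requests were sent, which is what lets `completeNext` pair a received response with its request without a lookup.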
    
    # ClusterConnectionStates
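    * File: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
    * Records the connection state of each broker node:
    `private final Map<String, NodeConnectionState> nodeState`
    * After the connection to a node is lost, a reconnect is not attempted until a backoff interval has elapsed:
    `private final long reconnectBackoffMs`
    * The connection states: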
    ```
    ConnectionState.DISCONNECTED -- not connected
    ConnectionState.CONNECTING -- connection in progress
    ConnectionState.CONNECTED -- connected
    ```
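    * `canConnect`: whether a connection to the given node may be initiated. This is the case when no state is recorded for the node yet, or when the node is disconnected and at least `reconnectBackoffMs` has passed since the last connection attempt;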
    ```
    public boolean canConnect(String id, long now) {
            NodeConnectionState state = nodeState.get(id);
            if (state == null)
                return true;
            else
                return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
        }
    ```
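
To see the backoff rule in action, here is a small self-contained sketch. It is an illustration under assumptions rather than the Kafka class: `BackoffSketch`, its `State` enum, and the `connecting`/`disconnected` helpers are hypothetical names, and time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified connection-state tracker with a reconnect backoff (illustration only). */
class BackoffSketch {
    enum State { DISCONNECTED, CONNECTING, CONNECTED }

    private static final class NodeState {
        State state;
        long lastConnectAttemptMs;

        NodeState(State state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final long reconnectBackoffMs;
    private final Map<String, NodeState> nodeState = new HashMap<>();

    BackoffSketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    /** Records that a connection attempt to the node started at time now. */
    void connecting(String id, long now) {
        nodeState.put(id, new NodeState(State.CONNECTING, now));
    }

    /** Marks the node as disconnected; the attempt timestamp is kept for the backoff check. */
    void disconnected(String id) {
        NodeState state = nodeState.get(id);
        if (state != null)
            state.state = State.DISCONNECTED;
    }

    /** Unknown nodes may connect at once; known nodes must be disconnected and past the backoff. */
    boolean canConnect(String id, long now) {
        NodeState state = nodeState.get(id);
        return state == null
            || (state.state == State.DISCONNECTED && now - state.lastConnectAttemptMs >= reconnectBackoffMs);
    }
}

class BackoffSketchDemo {
    public static void main(String[] args) {
        BackoffSketch states = new BackoffSketch(50);
        System.out.println(states.canConnect("broker-1", 0));    // true: never seen before
        states.connecting("broker-1", 1000);
        states.disconnected("broker-1");
        System.out.println(states.canConnect("broker-1", 1049)); // false: only 49 ms since the attempt
        System.out.println(states.canConnect("broker-1", 1050)); // true: backoff of 50 ms has elapsed
    }
}
```

Measuring the backoff from the last connection attempt, as the original `canConnect` does, keeps a broker that refuses connections from being retried in a tight loop.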
    
    # NetworkClient
    * File: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
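    * Implements `KafkaClient`
    * Uses `org.apache.kafka.common.network.Selector` to handle network IO, [details => Kafka source analysis](http://www.jianshu.com/p/8cbc7618abcb)
    * Manages the connections to each broker node; the source code comment reads: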
    >A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.
    * Core method: `poll`
      It calls `selector.poll` to perform the actual socket reads and writes;
    ```
            long metadataTimeout = metadataUpdater.maybeUpdate(now);
            try {
                this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
            } catch (IOException e) {
                log.error("Unexpected error during I/O", e);
            }
    ```
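      After `selector.poll` returns, the **completed request sends**, the **received responses**, the **disconnected connections**, and the **newly established connections** are all recorded in the `selector` and are processed next;
    * Handling completed request sends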
    ```
    private void handleCompletedSends(List<ClientResponse> responses, long now) {
            // if no response is expected then when the send is completed, return it
            for (Send send : this.selector.completedSends()) {
                ClientRequest request = this.inFlightRequests.lastSent(send.destination());
                if (!request.expectResponse()) {
                    this.inFlightRequests.completeLastSent(send.destination());
                    responses.add(new ClientResponse(request, now, false, null));
                }
            }
        }
    ```
      If a request does not expect a response, it is removed from `inFlightRequests` as soon as the send completes;
    * Handling received responses
    ```
    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
            for (NetworkReceive receive : this.selector.completedReceives()) {
                String source = receive.source();
                ClientRequest req = inFlightRequests.completeNext(source);
                ResponseHeader header = ResponseHeader.parse(receive.payload());
                // Always expect the response version id to be the same as the request version id
                short apiKey = req.request().header().apiKey();
                short apiVer = req.request().header().apiVersion();
                Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
                correlate(req.request().header(), header);
                if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                    responses.add(new ClientResponse(req, now, false, body));
            }
        }
    ```
      If it is a `metadata` response, `metadataUpdater.maybeHandleCompletedReceive` is called to handle the metadata update;
    * Handling newly established connections
    ```
     private void handleConnections() {
            for (String node : this.selector.connected()) {
                log.debug("Completed connection to node {}", node);
                this.connectionStates.connected(node);
            }
        }
    ```
    * After the `handle***` methods have run, the collected responses are processed and their callbacks invoked
    ```
            List<ClientResponse> responses = new ArrayList<>();
            handleCompletedSends(responses, updatedNow);
            handleCompletedReceives(responses, updatedNow);
            handleDisconnections(responses, updatedNow);
            handleConnections();
            handleTimedOutRequests(responses, updatedNow);
    
            // invoke callbacks
            for (ClientResponse response : responses) {
                if (response.request().hasCallback()) {
                    try {
                        response.request().callback().onComplete(response);
                    } catch (Exception e) {
                        log.error("Uncaught error in request completion:", e);
                    }
                }
            }
    ```
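
The collect-then-dispatch shape of `poll` can be sketched on its own: handlers first append to one list, then every callback is invoked inside a try/catch so that one failing callback does not prevent the others from running. The sketch below is a simplified illustration; `DispatchSketch` and its `Response` type are hypothetical stand-ins for `NetworkClient` and `ClientResponse`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified collect-then-dispatch loop (illustration only). */
class DispatchSketch {
    /** Hypothetical response: a payload plus an optional completion callback. */
    static final class Response {
        final String payload;
        final Consumer<Response> callback; // null when no callback was registered

        Response(String payload, Consumer<Response> callback) {
            this.payload = payload;
            this.callback = callback;
        }
    }

    /** Invokes every registered callback and returns how many of them threw. */
    static int dispatch(List<Response> responses) {
        int failures = 0;
        for (Response response : responses) {
            if (response.callback == null)
                continue;
            try {
                response.callback.accept(response);
            } catch (RuntimeException e) {
                // A failing callback is reported but does not stop the remaining ones.
                failures++;
                System.err.println("Uncaught error in request completion: " + e);
            }
        }
        return failures;
    }
}

class DispatchSketchDemo {
    public static void main(String[] args) {
        List<DispatchSketch.Response> responses = new ArrayList<>();
        responses.add(new DispatchSketch.Response("a", r -> { throw new IllegalStateException("boom"); }));
        responses.add(new DispatchSketch.Response("b", null));
        responses.add(new DispatchSketch.Response("c", r -> System.out.println("completed " + r.payload)));
        int failures = DispatchSketch.dispatch(responses);
        System.out.println("failed callbacks: " + failures);
    }
}
```

Running the callbacks only after all handlers have finished means a callback always sees a consistent view of the connection and in-flight state for that `poll` iteration.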
    
    # NetworkClientBlockingOps
    * File: core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
    * Builds on `NetworkClient` to provide blocking operations;
    * Blocking until `Client.ready` succeeds
    ```
    def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
        client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
          if (client.isReady(node, now))
            true
          else if (client.connectionFailed(node))
            throw new IOException(s"Connection to $node failed")
          else false
        }
      }
    ```
    * Sending a request and blocking until its response is received
    ```
    def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
        client.send(request, time.milliseconds())
    
        pollUntilFound(timeout) { case (responses, _) =>
          val response = responses.find { response =>
            response.request.request.header.correlationId == request.request.header.correlationId
          }
          response.foreach { r =>
            if (r.wasDisconnected) {
              val destination = request.request.destination
              throw new IOException(s"Connection to $destination was disconnected before the response was read")
            }
          }
          response
        }
      }
    ```
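
Both methods above rely on `pollUntil` / `pollUntilFound`, which are not shown in this article. As a rough idea of how blocking behaviour can be layered on a non-blocking `poll`, here is a minimal Java sketch. It is an assumption-laden illustration, not the Scala implementation: `BlockingPollSketch`, the injected clock, and the `pollOnce` step are hypothetical, and the clock is injected so the loop can be exercised without real waiting.

```java
import java.util.function.LongPredicate;
import java.util.function.LongSupplier;

/** Simplified "block until a condition holds or a timeout expires" loop (illustration only). */
class BlockingPollSketch {
    /**
     * Runs one non-blocking poll step at a time until done returns true
     * or the deadline derived from timeoutMs has passed.
     */
    static boolean pollUntil(long timeoutMs, LongSupplier clock, Runnable pollOnce, LongPredicate done) {
        long deadline = clock.getAsLong() + timeoutMs;
        while (true) {
            pollOnce.run();
            long now = clock.getAsLong();
            if (done.test(now))
                return true;
            if (now >= deadline)
                return false;
        }
    }
}

class BlockingPollSketchDemo {
    public static void main(String[] args) {
        long[] fakeClock = {0};
        int[] polls = {0};
        // Each poll step advances the fake clock by 10 ms; the condition holds after three polls.
        boolean ready = BlockingPollSketch.pollUntil(
            100,
            () -> fakeClock[0],
            () -> { polls[0]++; fakeClock[0] += 10; },
            now -> polls[0] >= 3);
        System.out.println(ready + " after " + polls[0] + " polls");
    }
}
```

A caller such as `blockingReady` then only has to supply the condition (for example "the node is ready") and decide what a `false` result, meaning a timeout, should translate to.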
    
    ##### [Kafka source analysis](http://www.jianshu.com/p/aa274f8fe00f)