Kafkaソース分析のSender

4118 ワード

Senderはproduce要求をKafkaクラスタに送信するバックグラウンドスレッドを処理する.このスレッドはクラスタメタデータを更新し、produce要求を適切なノードに送信する.
まず、メンバー変数を見てみましょう.
    /* the state of each nodes connection */
    //          KafkaClient  client
    private final KafkaClient client;

    /* the record accumulator that batches records */
    //           RecordAccumulator  accumulator
    private final RecordAccumulator accumulator;

    /* the metadata for the client */
    //       Metadata  metadata
    private final Metadata metadata;

    /* the maximum request size to attempt to send to the server */
    //      server        maxRequestSize
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    //  server              acks
    private final short acks;

    /* the number of times to retry a failed request before giving up */
    //                  retries
    private final int retries;

    /* the clock instance used for getting the time */
    //        Time  time
    private final Time time;

    /* true while the sender thread is still running */
    // Sender        , true  Sender       
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    //         forceClose
    private volatile boolean forceClose;

    /* metrics */
    //     
    private final SenderMetrics sensors;

    /* param clientId of the client */
    //     clientId
    private String clientId;

    /* the max time to wait for the server to respond to the request*/
    //   server          requestTimeout
    private final int requestTimeout;
スレッドである以上、主な実行論理run()メソッドを見てみましょう.コードは以下の通りです.
    /**
     * The main run loop for the sender thread
     * sender      
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        //    ,      close   
        while (running) {//    running true,     
            try {
            	//       run()  
                run(time.milliseconds());
            } catch (Exception e) {
            	
            	//        err  log  ,    
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        //         ,      accumulator       ,     client      (in-flight)   
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
            	//         run()      
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        
        //        ,       accumulator abortIncompleteBatches(),         
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        
        //      
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }
Senderスレッドの主ループはrun()メソッド内であり、その主な処理ロジックは以下の通りである.
1、まずwhileメインループに入り、フラグビットrunningがtrueの場合、closeが呼び出されるまでループします.
パラメータ付きrun(long now)メソッドを呼び出し、メッセージの送信を処理する.
2、closeが呼び出されるとrunningはfalseに設定され、while主循環は終了する.
2.1、強制的に閉じていない場合、メッセージアキュムレータaccumulatorはまだメッセージが送信されていないか、クライアントclientはまだ処理中の要求がある場合、別のwhileサイクルに入り、パラメータ付きrun(long now)方法を呼び出し、まだ送信されていないメッセージの送信を処理する.
2.2、強制的に閉じる場合、メッセージアキュムレータaccumulatorのabortIncompleteBatches()を呼び出し、未処理の要求を放棄する.
2.3、クライアントを閉じる.