Kafkaソース分析のSender
4118 ワード
Senderはproduce要求をKafkaクラスタに送信するバックグラウンドスレッドを処理する.このスレッドはクラスタメタデータを更新し、produce要求を適切なノードに送信する.
まず、メンバー変数を見てみましょう.
1、まずwhileメインループに入り、フラグビットrunningがtrueの場合、closeが呼び出されるまでループします.
パラメータ付きrun(long now)メソッドを呼び出し、メッセージの送信を処理する.
2、closeが呼び出されるとrunningはfalseに設定され、while主循環は終了する.
2.1、強制的に閉じていない場合、メッセージアキュムレータaccumulatorはまだメッセージが送信されていないか、クライアントclientはまだ処理中の要求がある場合、別のwhileサイクルに入り、パラメータ付きrun(long now)方法を呼び出し、まだ送信されていないメッセージの送信を処理する.
2.2、強制的に閉じる場合、メッセージアキュムレータaccumulatorのabortIncompleteBatches()を呼び出し、未処理の要求を放棄する.
2.3、クライアントを閉じる.
まず、メンバー変数を見てみましょう.
/* the state of each nodes connection */
// KafkaClient client
private final KafkaClient client;
/* the record accumulator that batches records */
// RecordAccumulator accumulator
private final RecordAccumulator accumulator;
/* the metadata for the client */
// Metadata metadata
private final Metadata metadata;
/* the maximum request size to attempt to send to the server */
// server maxRequestSize
private final int maxRequestSize;
/* the number of acknowledgements to request from the server */
// server acks
private final short acks;
/* the number of times to retry a failed request before giving up */
// retries
private final int retries;
/* the clock instance used for getting the time */
// Time time
private final Time time;
/* true while the sender thread is still running */
// Sender , true Sender
private volatile boolean running;
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
// forceClose
private volatile boolean forceClose;
/* metrics */
//
private final SenderMetrics sensors;
/* param clientId of the client */
// clientId
private String clientId;
/* the max time to wait for the server to respond to the request*/
// server requestTimeout
private final int requestTimeout;
スレッドである以上、主な実行論理run()メソッドを見てみましょう.コードは以下の通りです. /**
* The main run loop for the sender thread
* sender
*/
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
// , close
while (running) {// running true,
try {
// run()
run(time.milliseconds());
} catch (Exception e) {
// err log ,
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
// , accumulator , client (in-flight)
while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
try {
// run()
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// , accumulator abortIncompleteBatches(),
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
this.accumulator.abortIncompleteBatches();
}
//
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
Senderスレッドの主ループはrun()メソッド内であり、その主な処理ロジックは以下の通りである.1、まずwhileメインループに入り、フラグビットrunningがtrueの場合、closeが呼び出されるまでループします.
パラメータ付きrun(long now)メソッドを呼び出し、メッセージの送信を処理する.
2、closeが呼び出されるとrunningはfalseに設定され、while主循環は終了する.
2.1、強制的に閉じていない場合、メッセージアキュムレータaccumulatorはまだメッセージが送信されていないか、クライアントclientはまだ処理中の要求がある場合、別のwhileサイクルに入り、パラメータ付きrun(long now)方法を呼び出し、まだ送信されていないメッセージの送信を処理する.
2.2、強制的に閉じる場合、メッセージアキュムレータaccumulatorのabortIncompleteBatches()を呼び出し、未処理の要求を放棄する.
2.3、クライアントを閉じる.