KafkaソースコードのKafkaConsumer分析のpoll方法分析
2231 ワード
私たちが情報を取得するのはpoll方法です.今、消費の流れを全体的に見てみましょう.
この方法では、まずacquireが同時動作を防止するために呼び出され、次に、メッセージがプルされた場合、ブロックなしで中断できない要求が実行され、ブロック処理されたメッセージのセットに戻るように、所定の時間内にメッセージをプルアップしようと試みる.
public ConsumerRecords poll(long timeout) {
//
acquire();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
//
long start = time.milliseconds();
long remaining = timeout;
do {
//
Map>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// , ,
fetcher.sendFetches();
client.pollNoWakeup();
//
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
//
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
//
release();
}
}
この方法では、まずacquireが同時動作を防止するために呼び出され、次に、メッセージがプルされた場合、ブロックなしで中断できない要求が実行され、ブロック処理されたメッセージのセットに戻るように、所定の時間内にメッセージをプルアップしようと試みる.
private Map>> pollOnce(long timeout) {
// coordinator
coordinator.ensureCoordinatorReady();
// rebalace
if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();
// position
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
long now = time.milliseconds();
//
client.executeDelayedTasks(now);
// completed
Map>> records = fetcher.fetchedRecords();
//
if (!records.isEmpty())
return records;
//
fetcher.sendFetches();
client.poll(timeout, now);
return fetcher.fetchedRecords();
}