[Kafka] Kafka Consumer 0.9 Notes
Changes
Concepts
Configuration
Administration
bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list
Examples
Basic Poll Loop
public abstract class BasicConsumeLoop<K, V> implements Runnable {
    private final KafkaConsumer<K, V> consumer;
    private final List<String> topics;
    private final AtomicBoolean shutdown;
    private final CountDownLatch shutdownLatch;

    public BasicConsumeLoop(Properties config, List<String> topics) {
        this.consumer = new KafkaConsumer<>(config);
        this.topics = topics;
        this.shutdown = new AtomicBoolean(false);
        this.shutdownLatch = new CountDownLatch(1);
    }

    public abstract void process(ConsumerRecord<K, V> record);

    public void run() {
        try {
            consumer.subscribe(topics);
            while (!shutdown.get()) {
                // Alternative: block with consumer.poll(Long.MAX_VALUE) and have
                // shutdown() call consumer.wakeup() to interrupt the poll.
                ConsumerRecords<K, V> records = consumer.poll(500);
                records.forEach(record -> process(record));
            }
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }

    public void shutdown() throws InterruptedException {
        shutdown.set(true);
        // run() closes the consumer; wait until it has finished.
        shutdownLatch.await();
    }
}
Committing Offsets
The problem with auto-commit offsets is that a restart can cause duplicate processing of data; reducing the commit interval narrows this window of duplication.
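For reference, auto-commit behavior is controlled by two consumer properties. This is a minimal sketch; the bootstrap server and group id are placeholders, and the interval value is illustrative:

```java
import java.util.Properties;

public class AutoCommitConfig {
    public static Properties build() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "host:9092");   // placeholder
        config.put("group.id", "example-group");        // hypothetical group id
        // Shrinking the interval from the 5000 ms default reduces how
        // many records can be re-processed after a restart.
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "1000");
        return config;
    }
}
```

The trade-off is more frequent commit traffic to the coordinator in exchange for a smaller duplicate window.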
Synchronous commits are the safest:
private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
    }
}
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            doCommitSync();
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
If the group has already been rebalanced, the commit fails with a CommitFailedException. If processing the records takes longer than the session timeout, there are two ways to handle it: increase the session timeout, or reduce the amount of data handled on each iteration of the poll loop.
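Both knobs are plain consumer properties in 0.9. A sketch, with illustrative values:

```java
import java.util.Properties;

public class RebalanceTuning {
    public static Properties build() {
        Properties config = new Properties();
        // Option 1: allow more time for processing between polls.
        config.put("session.timeout.ms", "60000");
        // Option 2: fetch less data per partition on each poll, so each
        // loop iteration finishes well within the session timeout.
        config.put("max.partition.fetch.bytes", "131072"); // 128 KB, illustrative
        return config;
    }
}
```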
Option 1) is the recommended approach.
Delivery Guarantees
private boolean doCommitSync() {
    try {
        consumer.commitSync();
        return true;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
        return false;
    }
}
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            if (doCommitSync())
                records.forEach(record -> process(record));
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
Asynchronous Offset Commits
Asynchronous commits can improve throughput, but a failed commit is not retried. Record failed offsets yourself in the callback:
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception exception) {
                    if (exception != null)
                        log.debug("Commit failed for offsets {}", offsets, exception);
                }
            });
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}
Because a failed offset commit is usually harmless (it rarely forces data to be re-processed), a common pattern is to commit asynchronously in the poll loop and synchronously on rebalance and shutdown:
private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
    }
}
public void run() {
    try {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                doCommitSync();
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
        });
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            consumer.commitAsync();
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        try {
            doCommitSync();
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }
}
Asynchronous commits only make sense for at-least-once delivery. For at-most-once, you cannot proceed without confirming that the commit succeeded, since there is no way to "unread" a message.
Multi-threading
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized; un-synchronized access will result in a ConcurrentModificationException. The only exception to this rule is wakeup(), which can safely be used from an external thread to interrupt an active operation. In this case, a WakeupException will be thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. The following snippet shows the typical pattern:
public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
closed.set(true);
consumer.wakeup();
We have intentionally avoided implementing a particular threading model for processing. This leaves several options for implementing multi-threaded processing of records.
One option is to decouple consumption and processing: one or more consumer threads do all the data consumption and hand records off to a pool of processor threads.
PRO: This option allows independently scaling the number of consumers and processors, making it possible to have a single consumer feed many processor threads and avoiding any limitation on partitions.
CON: Guaranteeing order across the processors requires particular care, since the threads execute independently: an earlier chunk of data may actually be processed after a later chunk purely due to thread scheduling. For processing that has no ordering requirements this is not a problem.
CON: Manually committing the position becomes harder, as it requires all threads to coordinate to ensure that processing is complete for that partition.
There are many possible variations on this approach. For example, each processor thread can have its own queue, and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commits.
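The queue-per-thread variation can be sketched without any Kafka classes. Here the TopicPartition is stood in for by a plain int partition id and record values by Strings; the point is that hashing the partition to a fixed queue keeps all records from one partition in order on a single worker:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PartitionedDispatcher {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    public PartitionedDispatcher(int numWorkers) {
        // One queue per processor thread; each worker drains its own queue.
        for (int i = 0; i < numWorkers; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    // Hash the partition to a fixed queue. In a real consumer the key
    // would be the record's TopicPartition, so a given partition always
    // maps to the same worker and stays in order.
    public void dispatch(int partition, String value) throws InterruptedException {
        queues.get(indexFor(partition)).put(value);
    }

    public BlockingQueue<String> queueFor(int partition) {
        return queues.get(indexFor(partition));
    }

    private int indexFor(int partition) {
        return Math.abs(Integer.hashCode(partition)) % queues.size();
    }
}
```

The consumer thread calls dispatch() for each polled record; commit coordination (e.g. tracking the lowest fully processed offset per partition) is left out of this sketch.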
http://docs.confluent.io/2.0.1/clients/consumer.html#asynchronous-commits