Kafka消費者-ノート2
文書ディレクトリ Kafka消費者-ノート2 消費者、消費グループ 消費者クライアント パラメータ構成 トピックとパーティションの購読 トピック パーティション 逆シーケンス化 メッセージ消費モード シフトコミット 再均衡 消費者遮断器 消費者クライアントパラメータ Kafka消費者-ノート2
消費者グループ
1つの消費者グループは1つの消費者だけが情報を消費することができます
消費者Consumerは、kafkaのトピックTopicを購読し、購読したトピックからメッセージを引き出す責任を負います.各消費者には対応するコンシューマグループConsumer Groupがあり、メッセージがトピックに公開されると、各コンシューマグループの1つの消費者のみが購読されます.
消費者クライアントパラメータ
kafkaは2つのメッセージ配信モードをサポートする:ポイント対ポイントP 2 P(すべての消費者が1つの消費グループに属する)、Pub/Subモードを公開/購読する(すべての消費者が異なる消費グループに属する)
消費者クライアントパラメータgroup.id消費グループの構成
コンシューマクライアント
正常な消費ロジック:消費者クライアントパラメータを構成し、消費者インスタンス を作成する.トピック の購読メッセージ消費 消費シフト を提出する.消費者インスタンス を閉じる.
demo
KafkaConsumer(subscribeメソッドサブスクリプショントピック)に構成されていることを初期化する必要があります(pollメソッドはメッセージリストを引きます)
パラメータ構成
必要なパラメータ:
トピックとパーティションの購読
消費者は1つ以上のトピックを購読することができ、subscribeメソッドを使用してトピックを購読することができ、購読方法には配列と正則がある.
テーマ
subscribeサブスクリプショントピック:
正規subscribeサブスクリプショントピックのインスタンス:
unsubscribeの購読をキャンセルするには、次の手順に従います.
パーティション
KafkaConsumerはassignメソッドで特定のトピックのパーティションを直接購読します.
TopicPartition:パーティションを表すオブジェクト
assignインスタンス:
KafkaConsumer:partitionsFor
すべてのトピックのすべてのパーティションをassignおよびpartitionsForで購読します.
subscribe法によるサブスクリプションテーマは消費者の自動再均衡機能を持ち、消費グループ内の消費者数が変化し、区分分配関係が自動的に調整される.assignメソッドがパーティションを購読する場合、消費者の自動均衡機能を備えていない
逆シーケンス化
逆シーケンス化にはDeserializerが必要です
kafkaが提供するシーケンサと逆シーケンサはアプリケーションのニーズを満たすことができない前提の下で、Avro、JSON、Thrift、ProtoBufあるいはProtostuffなどの共通のシーケンサ化ツールを使って包装することをお勧めします
一般的なシーケンス化ツールを使用して、SerializerとDeserializerインタフェースも実装します.
メッセージ消費モード
Kafkaの消費はpoll引き抜きモードに基づいている.メッセージのメッセージには、一般に、1プッシュモード(サービス側がメッセージを消費者に自発的にプッシュする)と2プルモード(消費者がサービス側に要求してメッセージを引き出す)のモードがある.
消費者はpollメソッドを呼び出すポーリングを必要とし、pollメソッドは購読したトピックまたはパーティション上のメッセージのセットを返す.
poll:消費シフト、消費者コーディネータ、グループコーディネータ、消費者の選挙、区分分配の配布、再均衡の論理、心拍などに関する
消費者消費の各メッセージConsumer Record:
ConsumerRecords:消費者メッセージの集合
シフトコミット
Kafkaはパーティション秩序を維持し、パーティション内のメッセージにはoffsetがあり、メッセージのパーティション内の位置を表し、一般的にオフセット量と呼ばれ、消費者にもoffsetがあり、パーティション内のメッセージが存在する位置を表し、一般的に消費シフトと呼ばれている.
消費シフトはKafkaの内部テーマに格納される_consumer_offsetsでは,消費シフトを永続化する方法はcommitコミットであり,消費者はメッセージを消費したときに消費シフトのコミットを実行する必要がある.
消費されたシフト=コミットされたシフト-1
名詞の解釈:
kafkaのデフォルトの消費シフトコミット方式は自動コミットであり、消費者クライアントパラメータは
手動コミットシフト:
非同期コミット
消費の制御または停止:pauseメソッドとresumeメソッドにより、一部のパーティションがプル操作時にクライアントにデータを返し、一部のパーティションがクライアントにデータを返す操作を一時停止することを実現します.
KafkaConsumer非スレッドセキュリティ、wakeupメソッドは他のスレッドから安全に呼び出す唯一の方法です
指定シフト消費消費消費消費消費者が記録された消費シフトが見つからない場合、消費者クライアントパラメータ
消費の位置をより細かく制御するために、KafkaConsumerのseekメソッドを使用して、消費を追跡したり遡及したりすることができます. poll内割当パーティション seek消費者が割り当てるパーティションの消費位置 をリセットする.
パーティションの末尾から消費を指定
消費指定時間のシフト
さいへいこう
再均衡とは、パーティションの所属権が1人の消費者から別の消費者に移行する行為を指し、再均衡が発生すると、消費グループ内の消費者がメッセージを読み取ることができず、再均衡が重複消費を招く可能性がある.
再等化リスナー:
コンシューマブロッキング
コンシューマブロッカーは、メッセージを消費したり、コンシューマシフトをコミットしたりする際にいくつかのカスタマイズされた操作を行い、Consumer Interceptorを実現する必要がある.
消費者ブロッカーはTTLブロッキングを実現し、情報のtimestamp判断メッセージが期限切れであるか否かを判断する
カスタムブロッキングは
消費者ブロッキングにもブロッキングチェーンが存在し、
コンシューマクライアントパラメータ
消費者グループ
1つの消費者グループは1つの消費者だけが情報を消費することができます
消費者Consumerは、kafkaのトピックTopicを購読し、購読したトピックからメッセージを引き出す責任を負います.各消費者には対応するコンシューマグループConsumer Groupがあり、メッセージがトピックに公開されると、各コンシューマグループの1つの消費者のみが購読されます.
消費者クライアントパラメータ
partition.assignment.strategy
によって、消費者とサブスクリプショントピックとの間のパーティション割り当てポリシーを設定することができるkafkaは2つのメッセージ配信モードをサポートする:ポイント対ポイントP 2 P(すべての消費者が1つの消費グループに属する)、Pub/Subモードを公開/購読する(すべての消費者が異なる消費グループに属する)
消費者クライアントパラメータgroup.id消費グループの構成
コンシューマクライアント
正常な消費ロジック:
demo
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("client.id", "consumer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (isRunning.get()) {
ConsumerRecords records =
consumer.poll(Duration.ofMinutes(2));
//
for (ConsumerRecord record : records) {
System.out.println("topic = " + record.topic()
+ ", partition = " + record.partition()
+ ", offset = " + record.offset());
System.out.println("key = " + record.key()
+ ", value = " + record.value());
//do something to process record.
System.out.println(record.headers());
}
}
} catch (Exception e) {
log.error("occur exception ", e);
} finally {
consumer.close();
}
}
KafkaConsumer(subscribeメソッドサブスクリプショントピック)に構成されていることを初期化する必要があります(pollメソッドはメッセージリストを引きます)
パラメータ構成
必要なパラメータ:
bootstrap.servers: kafka
group.id: id
key.deserializer value.deserializer:
client.id: KafkaConsumer id, consumer-1
ConsumerConfig
トピックとパーティションの購読
消費者は1つ以上のトピックを購読することができ、subscribeメソッドを使用してトピックを購読することができ、購読方法には配列と正則がある.
テーマ
subscribeサブスクリプショントピック:
//
public void subscribe(Collection topics, ConsumerRebalanceListener listener)
public void subscribe(Collection topics)
//
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
:subscribe
ConsumerRebalanceListener
正規subscribeサブスクリプショントピックのインスタンス:
topic-1
consumer.subscribe(Pattern.compile("topic-.*"));
unsubscribeの購読をキャンセルするには、次の手順に従います.
public void unsubscribe() assign
パーティション
KafkaConsumerはassignメソッドで特定のトピックのパーティションを直接購読します.
public void assign(Collection partitions)
TopicPartition:パーティションを表すオブジェクト
public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
//
private final String topic;
...
}
assignインスタンス:
topic-test 0
consumer.assign(Arrays.asList(new TopicPartition("topic-test", 0)));
KafkaConsumer:partitionsFor
PartitionInfo
public List partitionsFor(String topic)
public class PartitionInfo {
private final String topic;
private final int partition;
// leader
private final Node leader;
// AR
private final Node[] replicas;
// ISR
private final Node[] inSyncReplicas;
// OSR
private final Node[] offlineReplicas;
...
}
すべてのトピックのすべてのパーティションをassignおよびpartitionsForで購読します.
consumer.assign(consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toList()));
subscribe法によるサブスクリプションテーマは消費者の自動再均衡機能を持ち、消費グループ内の消費者数が変化し、区分分配関係が自動的に調整される.assignメソッドがパーティションを購読する場合、消費者の自動均衡機能を備えていない
逆シーケンス化
逆シーケンス化にはDeserializerが必要です
public interface Deserializer extends Closeable {
void configure(Map var1, boolean var2);
T deserialize(String var1, byte[] var2);
void close();
}
kafkaが提供するシーケンサと逆シーケンサはアプリケーションのニーズを満たすことができない前提の下で、Avro、JSON、Thrift、ProtoBufあるいはProtostuffなどの共通のシーケンサ化ツールを使って包装することをお勧めします
一般的なシーケンス化ツールを使用して、SerializerとDeserializerインタフェースも実装します.
io.protostuff
protostuff-core
1.5.6
:ProtostuffIOUtil
public static byte[] toByteArray(T message, io.protostuff.Schema schema, LinkedBuffer buffer)
Schema schema = (Schema) RuntimeSchema.getSchema( .getClass());
LinkedBuffer buffer =
LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
byte[] protostuff = null;
try {
protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
:ProtostuffIOUtil
public static void mergeFrom(byte[] data, T message, io.protostuff.Schema schema)
Schema schema = RuntimeSchema.getSchema( .class);
obj = new ();
ProtostuffIOUtil.mergeFrom(byteArray, obj, schema);
メッセージ消費モード
Kafkaの消費はpoll引き抜きモードに基づいている.メッセージのメッセージには、一般に、1プッシュモード(サービス側がメッセージを消費者に自発的にプッシュする)と2プルモード(消費者がサービス側に要求してメッセージを引き出す)のモードがある.
消費者はpollメソッドを呼び出すポーリングを必要とし、pollメソッドは購読したトピックまたはパーティション上のメッセージのセットを返す.
poll:消費シフト、消費者コーディネータ、グループコーディネータ、消費者の選挙、区分分配の配布、再均衡の論理、心拍などに関する
timeout
public ConsumerRecords poll(Duration timeout)
消費者消費の各メッセージConsumer Record:
public class ConsumerRecord {
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
//
private final long timestamp;
// :CreateTime LogAppendTime
private final TimestampType timestampType;
// key value
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
// value
private final V value;
private volatile Long checksum;
...
}
ConsumerRecords:消費者メッセージの集合
public List> records(TopicPartition partition)
public Set partitions()
public Iterable> records(String topic)
count(), isEmpty(),empty()
シフトコミット
Kafkaはパーティション秩序を維持し、パーティション内のメッセージにはoffsetがあり、メッセージのパーティション内の位置を表し、一般的にオフセット量と呼ばれ、消費者にもoffsetがあり、パーティション内のメッセージが存在する位置を表し、一般的に消費シフトと呼ばれている.
消費シフトはKafkaの内部テーマに格納される_consumer_offsetsでは,消費シフトを永続化する方法はcommitコミットであり,消費者はメッセージを消費したときに消費シフトのコミットを実行する必要がある.
消費されたシフト=コミットされたシフト-1
名詞の解釈:
committed offset:
KafkaConsumer
OffsetAndMetadata committed(TopicPartition partition)
position:
KafkaConsumer
long position(TopicPartition partition)
lastConsumedOffet:
position = committed offset = lastConsumedOffset + 1
kafkaのデフォルトの消費シフトコミット方式は自動コミットであり、消費者クライアントパラメータは
enable.auto.commit
、デフォルトtrueである.コミットサイクル時間はクライアントパラメータauto.commit.interval.ms
によって構成され、デフォルト5 s自動シフトコミットの論理は方法pollにある.手動コミットシフト:
enable.auto.commit
セットfalse同期コミット// commitSync poll ,
void commitSync();
void commitSync(Duration timeout);
// offsets ,
void commitSync(Map offsets);
for (TopicPartition partition : records.partitions()) {
List> partitionRecords =
records.records(partition);
for (ConsumerRecord record : partitionRecords) {
//do some logical processing.
System.out.println(record.value());
}
long lastConsumedOffset = partitionRecords
.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(lastConsumedOffset + 1)));
}
void commitSync(final Map offsets, final Duration timeout);
非同期コミット
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
// callback
void commitAsync(Map offsets, OffsetCommitCallback callback);
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
//do some logical processing.
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets,
Exception exception) {
if (exception == null) {
// success
} else {
log.error("fail to commit offsets {}", offsets, exception);
}
}
});
消費の制御または停止:pauseメソッドとresumeメソッドにより、一部のパーティションがプル操作時にクライアントにデータを返し、一部のパーティションがクライアントにデータを返す操作を一時停止することを実現します.
KafkaConsumer
public void pause(Collection partitions)
public void resume(Collection partitions)
public Set paused()
KafkaConsumer非スレッドセキュリティ、wakeupメソッドは他のスレッドから安全に呼び出す唯一の方法です
指定シフト消費消費消費消費消費者が記録された消費シフトが見つからない場合、消費者クライアントパラメータ
auto.offset.reset
の構成に基づいて消費を開始する場所が決定され、デフォルトlatestは、パーティションの末尾から消費メッセージが開始されることを示す(earliestパーティションの先頭にnoneが異常を報告する)消費の位置をより細かく制御するために、KafkaConsumerのseekメソッドを使用して、消費を追跡したり遡及したりすることができます.
seek poll
public void seek(TopicPartition partition, long offset)
パーティションの末尾から消費を指定
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Set assignment = new HashSet<>();
//1 poll
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
//2 endOffsets timeout `request.timeout.ms 30000ms`
Map offsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
consumer.seek(tp, offsets.get(tp));
}
//2 seekToEnd ,
consumer.seekToEnd(assignment);
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(1000));
//consume the record.
for (ConsumerRecord record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
消費指定時間のシフト
// map key value , OffsetAndTimestamp offset timestamp
public Map offsetsForTimes(Map timestampsToSearch)
public Map offsetsForTimes(Map timestampsToSearch, Duration timeout)
//
Map partitionTimestamp = new HashMap<>(10);
for (TopicPartition partition : consumer.assignment()) {
// Map
partitionTimestamp.put(partition, LocalDateTime.now().minusDays(2).toInstant(ZoneOffset.of("+8")).toEpochMilli());
}
Map offsetsForTimes = consumer.offsetsForTimes(partitionTimestamp);
for (Map.Entry entry : offsetsForTimes.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
さいへいこう
再均衡とは、パーティションの所属権が1人の消費者から別の消費者に移行する行為を指し、再均衡が発生すると、消費グループ内の消費者がメッセージを読み取ることができず、再均衡が重複消費を招く可能性がある.
再等化リスナー:
public interface ConsumerRebalanceListener {
// DB
void onPartitionsRevoked(Collection partitions);
// DB
void onPartitionsAssigned(Collection partitions);
}
subscribe
コンシューマブロッキング
コンシューマブロッカーは、メッセージを消費したり、コンシューマシフトをコミットしたりする際にいくつかのカスタマイズされた操作を行い、Consumer Interceptorを実現する必要がある.
public interface ConsumerInterceptor extends Configurable {
// KafkaConsumer poll onConsume , ,
public ConsumerRecords onConsume(ConsumerRecords records);
// onCommit ,
public void onCommit(Map offsets);
public void close();
}
消費者ブロッカーはTTLブロッキングを実現し、情報のtimestamp判断メッセージが期限切れであるか否かを判断する
public class ConsumerInterceptorTTL implements
ConsumerInterceptor {
private static final long EXPIRE_INTERVAL = 10 * 1000;
//
@Override
public ConsumerRecords onConsume(
ConsumerRecords records) {
System.out.println("before:" + records);
long now = System.currentTimeMillis();
Map>> newRecords
= new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List> tpRecords = records.records(tp);
List> newTpRecords = new ArrayList<>();
for (ConsumerRecord record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
if (!newTpRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
// commit
@Override
public void onCommit(Map offsets) {
System.out.println(" ");
offsets.forEach((tp, offset) ->
System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {
}
@Override
public void configure(Map configs) {
}
}
カスタムブロッキングは
interceptor.classes
を構成する必要があり、消費者ブロッキングは、コミットメソッドが誤ったシフト情報をコミットした可能性があることに注意する必要がある.あるいは、再びメッセージpollにおいて、最大オフセット量を含む可能性のあるメッセージが消費者ブロッキングによってフィルタリングされる消費者ブロッキングにもブロッキングチェーンが存在し、
interceptor.classes
パラメータで構成されたブロッキングの順序で実行され、ブロッキングチェーンのいずれかのブロッキングが失敗すると、次のブロッキングは前の実行に成功したブロッキングから実行され続けます(失敗は実行され続けます)コンシューマクライアントパラメータ
1.fetch.min.bytes: Consumer ( poll ) Kafka , 1B
2.fetch.max.bytes: , 50MB, , ;Kafka `message.max.bytes` max.message.bytes
3.fetch.max.wait.ms: 1 , Kafka , 500ms
4.max.partition.fetch.bytes: Consumer , 1MB, 2 , 4 ,2
5.max.poll.records: poll , 500
6.connections.max.idle.ms:
7.exclude.internal.topics: __consumer_offsets __transaction_state ,false , true ( )
8.receive.buffer.bytes
9.send.buffer.bytes
10.request.timeout.ms: Consumer 30000ms
11.metadata.max.age.ms: , 5
12.reconnect.backoff.ms: 50ms
13.retry.backoff.ms: 100ms
14.isolation.level: , read_uncommitted( ) read_committed, , , LSO , HW