Kafkaを使用して順序性を保証する方法
一、topicは単一パーティションのみ(partition)
Kafka自体はpartitonにおけるメッセージの順序性を保証しているので,単一パーティションでは特に順序性の問題を考慮する必要はない.
比較的特殊なメッセージ再試行の場合、メッセージ1が失敗すると、メッセージ2,3がメッセージ1の前に並ぶ.これを防ぐには、Kafkaのmax.in.flight.requests.per.connectionパラメータを1に設定する必要があります.
二、topicは複数のpartitionを有する
メッセージが異なるパーティションに分散している場合、Kafkaはその順序性を保証できません.ただし,順序性を要求する複数のメッセージが同じpartitonに送信されることを確保するだけで,その順序性を満たすことができる.
Kafkaのソースコードを解読すると、keyと同じメッセージが同じpartitionに送信され、対応するソースコードは以下の通りです.
また、Kafkaの各partitionは、同時に2つの消費者インスタンスによって消費されることはなく、これにより、メッセージの焼き出しから処理への順序性を保証することができる.
まとめ
1、max.in.flight.requests.per.connectionパラメータを12に設定し、順序性を保証するメッセージは同じkeyを使用する必要がある
Kafka自体はpartitonにおけるメッセージの順序性を保証しているので,単一パーティションでは特に順序性の問題を考慮する必要はない.
比較的特殊なメッセージ再試行の場合、メッセージ1が失敗すると、メッセージ2,3がメッセージ1の前に並ぶ.これを防ぐには、Kafkaのmax.in.flight.requests.per.connectionパラメータを1に設定する必要があります.
二、topicは複数のpartitionを有する
メッセージが異なるパーティションに分散している場合、Kafkaはその順序性を保証できません.ただし,順序性を要求する複数のメッセージが同じpartitonに送信されることを確保するだけで,その順序性を満たすことができる.
Kafkaのソースコードを解読すると、keyと同じメッセージが同じpartitionに送信され、対応するソースコードは以下の通りです.
// org.apache.kafka.clients.producer.KafkaProducer#doSend
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
int partition = partition(record, serializedKey, serializedValue, cluster);
// org/apache/kafka/clients/producer/KafkaProducer.java:1106
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
// org/apache/kafka/clients/producer/internals/DefaultPartitioner.java:69
// keyBytes serializedKey
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
また、Kafkaの各partitionは、同時に2つの消費者インスタンスによって消費されることはなく、これにより、メッセージの焼き出しから処理への順序性を保証することができる.
まとめ
1、max.in.flight.requests.per.connectionパラメータを12に設定し、順序性を保証するメッセージは同じkeyを使用する必要がある