Kafka消費者-ノート2


文書ディレクトリ
  • Kafka消費者-ノート2
  • 消費者、消費グループ
  • 消費者クライアント
  • パラメータ構成
  • トピックとパーティションの購読
  • トピック
  • パーティション
  • 逆シーケンス化
  • メッセージ消費モード
  • シフトコミット
  • 再均衡
  • 消費者遮断器
  • 消費者クライアントパラメータ
  • Kafka消費者-ノート2
    消費者グループ
    1つの消費者グループは1つの消費者だけが情報を消費することができます
    消費者Consumerは、kafkaのトピックTopicを購読し、購読したトピックからメッセージを引き出す責任を負います.各消費者には対応するコンシューマグループConsumer Groupがあり、メッセージがトピックに公開されると、各コンシューマグループの1つの消費者のみが購読されます.
    消費者クライアントパラメータpartition.assignment.strategyによって、消費者とサブスクリプショントピックとの間のパーティション割り当てポリシーを設定することができる
    kafkaは2つのメッセージ配信モードをサポートする:ポイント対ポイントP 2 P(すべての消費者が1つの消費グループに属する)、Pub/Subモードを公開/購読する(すべての消費者が異なる消費グループに属する)
    消費者クライアントパラメータgroup.id消費グループの構成
    コンシューマクライアント
    正常な消費ロジック:
  • 消費者クライアントパラメータを構成し、消費者インスタンス
  • を作成する.
  • トピック
  • の購読
  • メッセージ消費
  • 消費シフト
  • を提出する.
  • 消費者インスタンス
  • を閉じる.
    demo
        public static final String brokerList = "localhost:9092";
        public static final String topic = "topic-demo";
        public static final String groupId = "group.demo";
        public static final AtomicBoolean isRunning = new AtomicBoolean(true);
    
        public static Properties initConfig() {
            Properties props = new Properties();
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("client.id", "consumer.client.id.demo");
            return props;
        }
    
        public static void main(String[] args) {
            Properties props = initConfig();
            KafkaConsumer consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
            try {
                while (isRunning.get()) {
                    ConsumerRecords records =
                            consumer.poll(Duration.ofMinutes(2));
    
                    //               
                    for (ConsumerRecord record : records) {
                        System.out.println("topic = " + record.topic()
                                + ", partition = " + record.partition()
                                + ", offset = " + record.offset());
                        System.out.println("key = " + record.key()
                                + ", value = " + record.value());
                        //do something to process record.
                        System.out.println(record.headers());
                    }
                    
                }
            } catch (Exception e) {
                log.error("occur exception ", e);
            } finally {
                consumer.close();
            }
        }
    

    KafkaConsumer(subscribeメソッドサブスクリプショントピック)に構成されていることを初期化する必要があります(pollメソッドはメッセージリストを引きます)
    パラメータ構成
    必要なパラメータ:
    bootstrap.servers:  kafka    
    group.id:         id
    key.deserializer value.deserializer:               
    client.id:  KafkaConsumer      id,          consumer-1
           ConsumerConfig
    

    トピックとパーティションの購読
    消費者は1つ以上のトピックを購読することができ、subscribeメソッドを使用してトピックを購読することができ、購読方法には配列と正則がある.
    テーマ
    subscribeサブスクリプショントピック:
    //         
    public void subscribe(Collection topics, ConsumerRebalanceListener listener)
    public void subscribe(Collection topics)
    //         
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
    public void subscribe(Pattern pattern)
    
      :subscribe               
    ConsumerRebalanceListener             
    

    正規subscribeサブスクリプショントピックのインスタンス:
        topic-1          
    consumer.subscribe(Pattern.compile("topic-.*"));
    

    unsubscribeの購読をキャンセルするには、次の手順に従います.
    public void unsubscribe()     assign     
    

    パーティション
    KafkaConsumerはassignメソッドで特定のトピックのパーティションを直接購読します.
    public void assign(Collection partitions) 
    

    TopicPartition:パーティションを表すオブジェクト
    public final class TopicPartition implements Serializable {
        private int hash = 0;
        private final int partition;
        //       
        private final String topic;
    ...
    }
    

    assignインスタンス:
        topic-test   0  
    consumer.assign(Arrays.asList(new TopicPartition("topic-test", 0)));
    

    KafkaConsumer:partitionsFor
                 PartitionInfo        
    public List partitionsFor(String topic) 
    
        public class PartitionInfo {
            private final String topic;
            private final int partition;
            //   leader       
            private final Node leader;
    		//    AR  
    		private final Node[] replicas;
            //    ISR  
            private final Node[] inSyncReplicas;
            //    OSR  
            private final Node[] offlineReplicas;
            ...
        }
    

    すべてのトピックのすべてのパーティションをassignおよびpartitionsForで購読します.
    consumer.assign(consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toList()));
    

    subscribe法によるサブスクリプションテーマは消費者の自動再均衡機能を持ち、消費グループ内の消費者数が変化し、区分分配関係が自動的に調整される.assignメソッドがパーティションを購読する場合、消費者の自動均衡機能を備えていない
    逆シーケンス化
    逆シーケンス化にはDeserializerが必要です
    public interface Deserializer extends Closeable {
    
        void configure(Map var1, boolean var2);
    
        T deserialize(String var1, byte[] var2);
    
        void close();
    }
    

    kafkaが提供するシーケンサと逆シーケンサはアプリケーションのニーズを満たすことができない前提の下で、Avro、JSON、Thrift、ProtoBufあるいはProtostuffなどの共通のシーケンサ化ツールを使って包装することをお勧めします
    一般的なシーケンス化ツールを使用して、SerializerとDeserializerインタフェースも実装します.
    
    	io.protostuff
    	protostuff-core
    	1.5.6
    
    
       :ProtostuffIOUtil
    public static  byte[] toByteArray(T message, io.protostuff.Schema schema, LinkedBuffer buffer) 
    
    
    Schema schema = (Schema) RuntimeSchema.getSchema(        .getClass());
    LinkedBuffer buffer =
            LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    byte[] protostuff = null;
    try {
        protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
    
        :ProtostuffIOUtil
    public static  void mergeFrom(byte[] data, T message, io.protostuff.Schema schema)
    
    
    Schema schema = RuntimeSchema.getSchema(         .class);
             obj = new         ();
    ProtostuffIOUtil.mergeFrom(byteArray, obj, schema);
    

    メッセージ消費モード
    Kafkaの消費はpoll引き抜きモードに基づいている.メッセージのメッセージには、一般に、1プッシュモード(サービス側がメッセージを消費者に自発的にプッシュする)と2プルモード(消費者がサービス側に要求してメッセージを引き出す)のモードがある.
    消費者はpollメソッドを呼び出すポーリングを必要とし、pollメソッドは購読したトピックまたはパーティション上のメッセージのセットを返す.
    poll:消費シフト、消費者コーディネータ、グループコーディネータ、消費者の選挙、区分分配の配布、再均衡の論理、心拍などに関する
    timeout        
    public ConsumerRecords poll(Duration timeout)
    

    消費者消費の各メッセージConsumer Record:
    public class ConsumerRecord {
        public static final long NO_TIMESTAMP = -1L;
        public static final int NULL_SIZE = -1;
        public static final int NULL_CHECKSUM = -1;
        private final String topic;
        private final int partition;
        private final long offset;
        //    
        private final long timestamp;
        //         :CreateTime LogAppendTime
        private final TimestampType timestampType;
        //         key value          
        private final int serializedKeySize;
        private final int serializedValueSize;
        private final Headers headers;
        private final K key;
        //            value
        private final V value;
        private volatile Long checksum;
        ...
    }
    

    ConsumerRecords:消費者メッセージの集合
                  
    public List> records(TopicPartition partition)
               
    public Set partitions()
             
    public Iterable> records(String topic)
                count(),          isEmpty(),empty()          
    

    シフトコミット
    Kafkaはパーティション秩序を維持し、パーティション内のメッセージにはoffsetがあり、メッセージのパーティション内の位置を表し、一般的にオフセット量と呼ばれ、消費者にもoffsetがあり、パーティション内のメッセージが存在する位置を表し、一般的に消費シフトと呼ばれている.
    消費シフトはKafkaの内部テーマに格納される_consumer_offsetsでは,消費シフトを永続化する方法はcommitコミットであり,消費者はメッセージを消費したときに消費シフトのコミットを実行する必要がある.
    消費されたシフト=コミットされたシフト-1
    名詞の解釈:
    committed offset:          
    	KafkaConsumer     
    	OffsetAndMetadata committed(TopicPartition partition)
    position:           
    	KafkaConsumer     
    	long position(TopicPartition partition)
    lastConsumedOffet:        
    
        position = committed offset = lastConsumedOffset + 1      
    

    kafkaのデフォルトの消費シフトコミット方式は自動コミットであり、消費者クライアントパラメータはenable.auto.commit、デフォルトtrueである.コミットサイクル時間はクライアントパラメータauto.commit.interval.msによって構成され、デフォルト5 s自動シフトコミットの論理は方法pollにある.
    手動コミットシフト:enable.auto.commitセットfalse同期コミット
    // commitSync     poll              ,                  
    void commitSync();
    
    void commitSync(Duration timeout);
    // offsets  ,           
    void commitSync(Map offsets);
        for (TopicPartition partition : records.partitions()) {
            List> partitionRecords =
                    records.records(partition);
            for (ConsumerRecord record : partitionRecords) {
                //do some logical processing.
                System.out.println(record.value());
            }
            long lastConsumedOffset = partitionRecords
                    .get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition,
                    new OffsetAndMetadata(lastConsumedOffset + 1)));
        }	
        
    void commitSync(final Map offsets, final Duration timeout);
    
    

    非同期コミット
    void commitAsync();
    
    void commitAsync(OffsetCommitCallback callback);
    //          callback
    void commitAsync(Map offsets, OffsetCommitCallback callback);
        ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord record : records) {
            //do some logical processing.
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map offsets,
                                   Exception exception) {
                if (exception == null) {
                    // success
                } else {
                    log.error("fail to commit offsets {}", offsets, exception);
                }
            }
        });
    

    消費の制御または停止:pauseメソッドとresumeメソッドにより、一部のパーティションがプル操作時にクライアントにデータを返し、一部のパーティションがクライアントにデータを返す操作を一時停止することを実現します.
    KafkaConsumer
          
    public void pause(Collection partitions)
    public void resume(Collection partitions)
    
              
    public Set paused()
    

    KafkaConsumer非スレッドセキュリティ、wakeupメソッドは他のスレッドから安全に呼び出す唯一の方法です
    指定シフト消費消費消費消費消費者が記録された消費シフトが見つからない場合、消費者クライアントパラメータauto.offset.resetの構成に基づいて消費を開始する場所が決定され、デフォルトlatestは、パーティションの末尾から消費メッセージが開始されることを示す(earliestパーティションの先頭にnoneが異常を報告する)
    消費の位置をより細かく制御するために、KafkaConsumerのseekメソッドを使用して、消費を追跡したり遡及したりすることができます.
  • poll内割当パーティション
  • seek消費者が割り当てるパーティションの消費位置
  • をリセットする.
      seek        poll  
    public void seek(TopicPartition partition, long offset)
    

    パーティションの末尾から消費を指定
    KafkaConsumer consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    Set assignment = new HashSet<>();
    //1   poll    
    while (assignment.size() == 0) {
        consumer.poll(Duration.ofMillis(100));
        assignment = consumer.assignment();
    }
    //2 endOffsets                   timeout              `request.timeout.ms     30000ms`
    Map offsets = consumer.endOffsets(assignment);
    for (TopicPartition tp : assignment) {
        consumer.seek(tp, offsets.get(tp));
    }
    //2     seekToEnd  ,     
    consumer.seekToEnd(assignment);
    
    while (true) {
        ConsumerRecords records =
                consumer.poll(Duration.ofMillis(1000));
        //consume the record.
        for (ConsumerRecord record : records) {
            System.out.println(record.offset() + ":" + record.value());
        }
    }
    
    

    消費指定時間のシフト
    // map  key   value                                     ,      OffsetAndTimestamp offset timestamp  
    public Map offsetsForTimes(Map timestampsToSearch)
    public Map offsetsForTimes(Map timestampsToSearch, Duration timeout)
    
    //            
    Map partitionTimestamp = new HashMap<>(10);
    for (TopicPartition partition : consumer.assignment()) {
    	//   Map       
        partitionTimestamp.put(partition, LocalDateTime.now().minusDays(2).toInstant(ZoneOffset.of("+8")).toEpochMilli());
    }
    Map offsetsForTimes = consumer.offsetsForTimes(partitionTimestamp);
    for (Map.Entry entry : offsetsForTimes.entrySet()) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
    

    さいへいこう
    再均衡とは、パーティションの所属権が1人の消費者から別の消費者に移行する行為を指し、再均衡が発生すると、消費グループ内の消費者がメッセージを読み取ることができず、再均衡が重複消費を招く可能性がある.
    再等化リスナー:
    public interface ConsumerRebalanceListener {
    	//                              DB
        void onPartitionsRevoked(Collection partitions);
    	//                           DB      
        void onPartitionsAssigned(Collection partitions);
    }
    
    subscribe        
    

    コンシューマブロッキング
    コンシューマブロッカーは、メッセージを消費したり、コンシューマシフトをコミットしたりする際にいくつかのカスタマイズされた操作を行い、Consumer Interceptorを実現する必要がある.
    public interface ConsumerInterceptor extends Configurable {
    
    // KafkaConsumer poll           onConsume           ,   ,      
        public ConsumerRecords onConsume(ConsumerRecords records);
    //                onCommit  ,            
    	public void onCommit(Map offsets);
        
        public void close();
    }
    

    消費者ブロッカーはTTLブロッキングを実現し、情報のtimestamp判断メッセージが期限切れであるか否かを判断する
    public class ConsumerInterceptorTTL implements
            ConsumerInterceptor {
        private static final long EXPIRE_INTERVAL = 10 * 1000;
    
        //       
        @Override
        public ConsumerRecords onConsume(
                ConsumerRecords records) {
            System.out.println("before:" + records);
            long now = System.currentTimeMillis();
            Map>> newRecords
                    = new HashMap<>();
            for (TopicPartition tp : records.partitions()) {
                List> tpRecords = records.records(tp);
                List> newTpRecords = new ArrayList<>();
                for (ConsumerRecord record : tpRecords) {
                    if (now - record.timestamp() < EXPIRE_INTERVAL) {
                        newTpRecords.add(record);
                    }
                }
                if (!newTpRecords.isEmpty()) {
                    newRecords.put(tp, newTpRecords);
                }
            }
            return new ConsumerRecords<>(newRecords);
        }
    
        // commit       
        @Override
        public void onCommit(Map offsets) {
            System.out.println("    ");
            offsets.forEach((tp, offset) ->
                    System.out.println(tp + ":" + offset.offset()));
        }
    
        @Override
        public void close() {
        }
    
        @Override
        public void configure(Map configs) {
        }
    }
    

    カスタムブロッキングはinterceptor.classesを構成する必要があり、消費者ブロッキングは、コミットメソッドが誤ったシフト情報をコミットした可能性があることに注意する必要がある.あるいは、再びメッセージpollにおいて、最大オフセット量を含む可能性のあるメッセージが消費者ブロッキングによってフィルタリングされる
    消費者ブロッキングにもブロッキングチェーンが存在し、interceptor.classesパラメータで構成されたブロッキングの順序で実行され、ブロッキングチェーンのいずれかのブロッキングが失敗すると、次のブロッキングは前の実行に成功したブロッキングから実行され続けます(失敗は実行され続けます)
    コンシューマクライアントパラメータ
    1.fetch.min.bytes:  Consumer       (  poll  )   Kafka         ,  1B
    2.fetch.max.bytes:     ,  50MB,           ,           ;Kafka                     `message.max.bytes`        max.message.bytes   
    3.fetch.max.wait.ms:   1  ,  Kafka     ,  500ms
    4.max.partition.fetch.bytes:           Consumer      ,  1MB,   2  ,  4                ,2              
    5.max.poll.records:  poll         ,  500 
    6.connections.max.idle.ms:           
    7.exclude.internal.topics:    __consumer_offsets   __transaction_state  ,false            ,  true (      )
    8.receive.buffer.bytes
    9.send.buffer.bytes
    10.request.timeout.ms:  Consumer              30000ms
    11.metadata.max.age.ms:          ,  5  
    12.reconnect.backoff.ms:                      50ms
    13.retry.backoff.ms:                              100ms
    14.isolation.level:            ,   read_uncommitted(  )   read_committed,            ,             ,      LSO   ,       HW