[Kafka] Kafka Consumer 0.9 Notes


Changes

  • Supports SSL/SASL
  • Supports the group management protocol, so consumer groups can scale as the number of brokers grows
  • Lighter dependencies: the client no longer needs to depend on kafka core

    Concepts

  • Consumer groups: a consumer group is a set of consumers consuming the same topics. Each consumer joining or leaving triggers a rebalance of how partitions are assigned across the consumers. A broker acts as the coordinator, storing the partition assignments and the group membership.
  • Offset management: the starting offset (earliest or latest) is read from configuration, and offsets can be committed either automatically or manually. In automatic mode, commits happen at a fixed interval.

    Configuration

  • Core Configuration: always set bootstrap.servers and client.id.
  • Group Configuration
  • Set group.id.
  • session.timeout.ms: usually 30 s. If the program processes records on the same thread that polls, raise this value to avoid overly frequent rebalances. The only downside is that detecting a failed consumer takes longer, so consumption of its partitions lags until then; a consumer that exits cleanly, however, notifies the coordinator immediately.
  • heartbeat.interval.ms: raise it to reduce rebalance overhead.
  • Offset Management
  • enable.auto.commit
  • auto.commit.interval.ms
  • auto.offset.reset (earliest/latest)
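
    As a minimal sketch, the settings above might be collected into a Properties object as follows (the broker address, client/group ids, and String deserializers are placeholder assumptions):

    Properties config = new Properties();
    // Core configuration (host:9092 is a placeholder broker address)
    config.put("bootstrap.servers", "host:9092");
    config.put("client.id", "my-client");           // hypothetical client id
    // Group configuration
    config.put("group.id", "my-group");             // hypothetical group name
    config.put("session.timeout.ms", "30000");      // raise if processing happens in the poll thread
    config.put("heartbeat.interval.ms", "3000");
    // Offset management
    config.put("enable.auto.commit", "false");      // manual commit mode
    config.put("auto.offset.reset", "earliest");
    // Deserializers (assuming String keys and values)
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");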


    Administration

  • List Groups
    bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list

  • Describe Group
    bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo


  • Basic Poll Loop

    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public abstract class BasicConsumeLoop implements Runnable {
      private final KafkaConsumer<String, String> consumer;
      private final List<String> topics;
      private final AtomicBoolean shutdown;
      private final CountDownLatch shutdownLatch;

      public BasicConsumeLoop(Properties config, List<String> topics) {
        this.consumer = new KafkaConsumer<>(config);
        this.topics = topics;
        this.shutdown = new AtomicBoolean(false);
        this.shutdownLatch = new CountDownLatch(1);
      }

      public abstract void process(ConsumerRecord<String, String> record);

      public void run() {
        try {
          consumer.subscribe(topics);
          while (!shutdown.get()) {
            // Alternatively, block indefinitely with poll(Long.MAX_VALUE) and
            // break out of the blocking call via consumer.wakeup() from the
            // shutdown thread:
            //   ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            ConsumerRecords<String, String> records = consumer.poll(500);
            records.forEach(record -> process(record));
          }
        } finally {
          consumer.close();
          shutdownLatch.countDown();
        }
      }

      public void shutdown() throws InterruptedException {
        shutdown.set(true);
        // Wait until the poll loop has exited and consumer.close() has completed
        shutdownLatch.await();
      }
    }
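
    A usage sketch for the loop above: subclass it with a concrete process() and drive it from a plain thread (the topic name and the println body are illustrative assumptions):

    BasicConsumeLoop loop = new BasicConsumeLoop(config, Arrays.asList("my-topic")) {
      @Override
      public void process(ConsumerRecord<String, String> record) {
        // Illustrative processing: print offset and value
        System.out.println(record.offset() + ": " + record.value());
      }
    };
    new Thread(loop).start();
    // ... later, from another thread:
    loop.shutdown();  // blocks until consumer.close() has completed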
    

    Committing Offsets


    The problem with auto-commit is that a restart can reprocess data that was consumed but not yet committed; shrinking the commit interval reduces this duplication. Synchronous commits are the safest:
    private void doCommitSync() {
      try {
        consumer.commitSync();
      } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
      } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
      }
    }
    
    public void run() {
      try {
        consumer.subscribe(topics);

        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
          records.forEach(record -> process(record));
          doCommitSync();
        }
      } catch (WakeupException e) {
        // ignore, we're closing
      } catch (Exception e) {
        log.error("Unexpected error", e);
      } finally {
        consumer.close();
        shutdownLatch.countDown();
      }
    }
    

    If the group has already rebalanced, the commit fails with a CommitFailedException. There are two ways to avoid session timeouts while processing records:
  • Make session.timeout.ms large enough, and reduce max.partition.fetch.bytes so that each batch returns fewer records.
  • Move record processing to a separate thread, e.g. by putting records into a BlockingQueue (see the sketch below). The catch is that heartbeat requests are only sent between two poll() calls, so if the hand-off (offer) blocks for too long the consumer can still be kicked out of the group. Reference: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
  • Option 1 is recommended.
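
    A minimal sketch of the second option, reusing consumer, shutdown, process, and log from the poll-loop example above; the queue capacity, timeouts, and method names are illustrative assumptions:

    // Hand-off buffer shared between the poll thread and the worker threads
    private final BlockingQueue<ConsumerRecords<String, String>> queue =
        new ArrayBlockingQueue<>(10);  // arbitrary capacity

    // Poll thread: consume and hand records off to the workers
    private void pollLoop() throws InterruptedException {
      while (!shutdown.get()) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // While this timed offer() blocks, no poll() runs and no heartbeats
        // are sent, which is exactly the caveat described above.
        while (!queue.offer(records, 1, TimeUnit.SECONDS)) {
          log.warn("Processing backlog, retrying hand-off");
        }
      }
    }

    // Worker thread: drain the queue and process records
    private void workerLoop() throws InterruptedException {
      while (!shutdown.get()) {
        ConsumerRecords<String, String> batch = queue.poll(1, TimeUnit.SECONDS);
        if (batch != null)
          batch.forEach(record -> process(record));
      }
    }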

    Delivery Guarantees

  • at least once: auto-commit
  • at most once: commit before processing, as in the sketch below

    private boolean doCommitSync() {
      try {
          consumer.commitSync();
          return true;
      } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
        return false;
      }
    }
    
    public void run() {
      try {
        consumer.subscribe(topics);

        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
          // Commit first; process only if the commit succeeded (at most once)
          if (doCommitSync())
            records.forEach(record -> process(record));
        }
      } catch (WakeupException e) {
        // ignore, we're closing
      } catch (Exception e) {
        log.error("Unexpected error", e);
      } finally {
        consumer.close();
        shutdownLatch.countDown();
      }
    }
    
  • Exactly-once delivery is not supported.

    Asynchronous Offset Commits


    Asynchronous commits improve throughput, but a failed commit is not retried; record failed offsets yourself in the callback:
    public void run() {
      try {
        consumer.subscribe(topics);

        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
          records.forEach(record -> process(record));
          consumer.commitAsync(new OffsetCommitCallback() {
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
              if (exception != null)
                log.debug("Commit failed for offsets {}", offsets, exception);
            }
          });
        }
      } catch (WakeupException e) {
        // ignore, we're closing
      } catch (Exception e) {
        log.error("Unexpected error", e);
      } finally {
        consumer.close();
        shutdownLatch.countDown();
      }
    }
    

    A failed offset commit is usually harmless because it does not by itself cause data to be reprocessed (a later commit supersedes it), so the common pattern is to commit asynchronously in the poll loop and synchronously on rebalance and on shutdown:
    private void doCommitSync() {
      try {
        consumer.commitSync();
      } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
      } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
      }
    }
    
    public void run() {
      try {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
          public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Commit synchronously before giving up partitions
            doCommitSync();
          }

          public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
        });

        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
          records.forEach(record -> process(record));
          consumer.commitAsync();
        }
      } catch (WakeupException e) {
        // ignore, we're closing
      } catch (Exception e) {
        log.error("Unexpected error", e);
      } finally {
        try {
          doCommitSync();
        } finally {
          consumer.close();
          shutdownLatch.countDown();
        }
      }
    }
    

    Asynchronous commits can only give at-least-once semantics; for at most once you would have to confirm that the commit succeeded before processing, which is impossible without some way to "unread" the data.

    Multithreading


    The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized; un-synchronized access will result in a ConcurrentModificationException. The only exception to this rule is wakeup(), which can safely be used from an external thread to interrupt an active operation; in that case a WakeupException will be thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. The following snippet shows the typical pattern:
    public class KafkaConsumerRunner implements Runnable {
      private final AtomicBoolean closed = new AtomicBoolean(false);
      private final KafkaConsumer<String, String> consumer;

      public KafkaConsumerRunner(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
      }

      public void run() {
        try {
          consumer.subscribe(Arrays.asList("topic"));
          while (!closed.get()) {
            ConsumerRecords<String, String> records = consumer.poll(10000);
            // Handle new records
          }
        } catch (WakeupException e) {
          // Ignore exception if closing
          if (!closed.get()) throw e;
        } finally {
          consumer.close();
        }
      }

      // Shutdown hook which can be called from a separate thread
      public void shutdown() {
        closed.set(true);
        consumer.wakeup();
      }
    }
    

    Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer:
     closed.set(true);
     consumer.wakeup();
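
    A fragment sketching how several such runners might be driven from an executor (the thread count, the shared Properties config, and the constructor taking a KafkaConsumer are assumptions; this is the "one consumer per thread" model discussed below):

    int numThreads = 3;  // assumption: at most the number of partitions
    List<KafkaConsumerRunner> runners = new ArrayList<>();
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    for (int i = 0; i < numThreads; i++) {
      // Each runner owns its own KafkaConsumer built from the shared config
      KafkaConsumerRunner runner = new KafkaConsumerRunner(new KafkaConsumer<>(config));
      runners.add(runner);
      executor.submit(runner);
    }
    // On shutdown: wake up every consumer, then stop the pool
    runners.forEach(KafkaConsumerRunner::shutdown);
    executor.shutdown();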
    

    We have intentionally avoided implementing a particular threading model for processing. This leaves several options for implementing multi-threaded processing of records.
  • One Consumer Per Thread. A simple option is to give each thread its own consumer instance (see the executor sketch above). Pros and cons of this approach:
    PRO: It is the easiest to implement.
    PRO: It is often the fastest, since no inter-thread coordination is needed.
    PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just processes messages in the order it receives them).
    CON: More consumers mean more TCP connections to the cluster (one per thread). In general Kafka handles connections very efficiently, so this is usually a small cost.
    CON: Multiple consumers mean more requests being sent to the server and slightly less batching of data, which can cause some drop in I/O throughput.
    CON: The total number of threads across all processes is limited by the total number of partitions.
  • Decouple Consumption and Processing. Another alternative is to have one or more consumer threads do all the data consumption and hand ConsumerRecords instances off to a blocking queue consumed by a pool of processor threads that handle the actual record processing. Pros and cons:
    PRO: It allows independently scaling the number of consumers and processors, so a single consumer can feed many processor threads, avoiding any limit from the partition count.
    CON: Guaranteeing order across the processors requires particular care: the threads execute independently, so an earlier chunk of data may actually be processed after a later chunk purely due to thread scheduling. For processing with no ordering requirements this is not a problem.
    CON: Manually committing the position becomes harder, as it requires all threads to coordinate to ensure that processing is complete for the partition.
    There are many possible variations on this approach. For example, each processor thread can have its own queue, and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commits, as in the sketch below.
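
    A sketch of that last variation (the worker count, queue capacity, and routeRecords helper are illustrative assumptions, not from the original): hashing on TopicPartition keeps each partition's records on a single worker, preserving per-partition order.

    private final int numWorkers = 4;  // assumption
    private final List<BlockingQueue<ConsumerRecord<String, String>>> queues =
        new ArrayList<>();  // one ArrayBlockingQueue per worker, created at startup

    // Poll thread: route each record by its TopicPartition so that a given
    // partition is always handled by the same worker
    private void routeRecords(ConsumerRecords<String, String> records)
        throws InterruptedException {
      for (ConsumerRecord<String, String> record : records) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        int worker = Math.floorMod(tp.hashCode(), numWorkers);
        queues.get(worker).put(record);  // blocks if that worker falls behind
      }
    }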
    Reference: http://docs.confluent.io/2.0.1/clients/consumer.html#asynchronous-commits