【Kafka公式文書翻訳】5.5.1.API設計

5148 ワード

原文住所:https://kafka.apache.org/0101/documentation.html#apidesign

Producer APIs


Producer APIには、最下位の2つのProducerがカプセル化されています.
  • kafka.producer.SyncProducer
  • kafka.producer.async.AsyncProducer
  •     class Producer {
    
        /* Sends the data, partitioned by key to the topic using either the */
        /* synchronous or the asynchronous producer */
        public void send(kafka.javaapi.producer.ProducerData producerData);
    
        /* Sends a list of data, partitioned by key to the topic using either */
        /* the synchronous or the asynchronous producer */
        public void send(java.util.List> producerData);
    
        /* Closes the producer and cleans up */
        public void close();
    
        }
    

    この目的は,簡単なAPI暴露によってすべてのProducerの機能をClientに暴露することである.KafkaのProducerは次のことができます.
  • 複数の送信要求をキュー/キャッシュし、非同期の一括配布:kafka.producer.Producerは、複数の送信要求(producer.type=async)をバッチ量子化し、その後、シーケンス化して送信するパーティションの能力を提供する.バッチ・サイズは、いくつかの構成パラメータで設定できます.イベントをqueueに追加すると、queueが満たされるまでqueueにバッファリングされます.timeまたはbatch.sizeは構成の値に達します.バックグラウンドスレッド(kafka.producer.async.ProducerSendThread)はqueueからデータを取得しkafkaを使用する.producer.EventHandlerは、データをシーケンス化し、適切なパーティションに送信します.イベントを通過できます.handlerパラメータは、カスタムevent handlerプログラムをプラグインとして追加します.Producer queue pipeline処理の各段階でコールバックを注入し、カスタムログ/トラッキングコードまたはモニタリングロジックに使用できます.これはkafkaを実現することによって実現できる.producer.async.CallbackHandlerインタフェースでcallbackを設定.handlerパラメータで実現します.
  • ユーザ指定Encoderを使用してデータをシーケンス化する:
  •     interface Encoder {
        public Message toMessage(T data);
        }
    

    デフォルトではkafkaが使用されます.serializer.DefaultEncoder.
  • は、ユーザが選択可能なPartitionerによって乱負荷等化を実現する:Partitionのルーティングはkafka.producer.Partitioner決定.
  •     interface Partitioner {
        int partition(T key, int numPartitions);
        }
    

    パーティション選択APIは、keyとパーティション総数を使用して最終的なパーティションを選択する(選択したパーティションidを返す).idは、ソートされたpartitionリストから最終的なパーティションを選択してデータを送信するために使用される.デフォルトのパーティションポリシーはhash(key)%numPartitionsです.keyがnullの場合、ランダムにパーティションが選択されます.partitioner.を通じてclassパラメータは、特定のパーティション選択ポリシーを構成します.

    Consumer APIs


    私たちは2つのレベルのConsumer APIを持っています.低レベルの「単純」APIと単一Brokerとの間にはリンクが保持され、サービス側に送信されるネットワーク要求と密接な対応関係がある.このAPIはステータスがなく、各リクエストにoffset情報が含まれており、ユーザーがこのメタデータを維持できるようにします.ハイレベルのAPIはConsumer側でBrokerの詳細を非表示にし,下位層のトポロジー構造に関心を持たずにクラスタからデータを消費することを可能にした.同様に「どのデータが消費されたか」の状態を維持している.高レベルのAPIはまた、式によって購読されるTopicの機能(例えば、ホワイトリストまたはブラックリストによって購読される)を提供する.

    Low-level API

        class SimpleConsumer {
    
        /* Send fetch request to a broker and get back a set of messages. */
        public ByteBufferMessageSet fetch(FetchRequest request);
    
        /* Send a list of fetch requests to a broker and get back a response set. */
        public MultiFetchResponse multifetch(List fetches);
    
        /**
        * Get a list of valid offsets (up to maxSize) before the given time.
        * The result is a list of offsets, in descending order.
        * @param time: time in millisecs,
        *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
        *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
        */
        public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
        }
    

    低レベルのAPIは、高レベルのAPIを実装するために使用され、ステータスに特別な要件がある「オフライン」Consumerも直接使用される.

    High-level API

        /* create a connection to the cluster */
        ConsumerConnector connector = Consumer.create(consumerConfig);
    
        interface ConsumerConnector {
    
        /**
        * This method is used to get a list of KafkaStreams, which are iterators over
        * MessageAndMetadata objects from which you can obtain messages and their
        * associated metadata (currently only topic).
        *  Input: a map of 
        *  Output: a map of 
        */
        public Map> createMessageStreams(Map topicCountMap);
    
        /**
        * You can also obtain a list of KafkaStreams, that iterate over messages
        * from topics that match a TopicFilter. (A TopicFilter encapsulates a
        * whitelist or a blacklist which is a standard Java regex.)
        */
        public List createMessageStreamsByFilter(
            TopicFilter topicFilter, int numStreams);
    
        /* Commit the offsets of all messages consumed so far. */
        public commitOffsets()
    
        /* Shut down the connector */
        public shutdown()
        }
    

    このAPIは反復器の周りにあり,KafkaStreamクラスによって実現される.1つのKafkaStreamは、1つまたは複数のパーティション(異なるBroker上に分散可能)からなるメッセージストリームを表す.各Streamは単一スレッドで処理され、クライアントはストリームの作成時に必要な数を指定できます.これにより、1つのストリームの背後には複数のパーティションがありますが、1つのパーティションは1つのストリームにのみ属します.createMessageStreams呼び出しはConsumerをTopicに登録し、Consumer/Brokerの再割り当てを促します.APIは、1回の呼び出しで複数のStreamを作成することを奨励し、チャージの回数を減らす.createMessageStreamsByFilterメソッドの呼び出し(さらに)は、watcherを登録してフィルタルールに一致するtopicを発見するために使用される.createMessageStreamsByFilterが返す反復器は、複数のTopicのメッセージを反復することができます(複数のTopicがフィルタルールに合致する場合).