Kafka生産者と消費者学習ノート


最近kafkaクラスタ環境を構築し,アプリケーションログを収集し,個性的な処理を行うためにkafka生産者と消費者pythonプログラムの実現を学習した.この文章はkafkaの学習ノートだと思います.
一、kafkaクラスタの構築
テストを容易にするために,MacOS単機上に3つのkafkaノードを有するクラスタを構築した.本番にkafkaを配備する場合は、kafkaクラスタを異なる物理マシンに配備します.
1.kafkaミラーをダウンロードする
wurstmeister/kafkaミラーを使用してkafkaを配置します.kafkaはzookeeperに依存するため、zookeeperとkafkaの2つのミラーをダウンロードする必要があります.
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper

2.docker-compopseを作成する.yml
我々が構築したkafkaクラスタにはzookerノードと3つのkakfaノードが含まれている.
使用ifconfig命令取得自機のipは192.168.0.104、完全なdocker-compose.ymlファイルは以下の通りです.docker-composeの使用については、『Docker Component入門チュートリアル』を参照してください.
version: '3'
services:
  zookeeper:
    image: docker.io/wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - "2181:2181"

  kafka1:
    image: docker.io/wurstmeister/kafka
    container_name: kafka1
    restart: always
    ports:
      - "9095:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9095

  kafka2:
    image: docker.io/wurstmeister/kafka
    container_name: kafka2
    restart: always
    ports:
      - "9096:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9096

  kafka3:
    image: docker.io/wurstmeister/kafka
    container_name: kafka3
    restart: always
    ports:
      - "9097:9094"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9097
docker-compose up -d関連するコンテナを使用して、kafkaクラスタの構築を完了します.
二、Kafka生産者
私たちはkafka-pythonパッケージを使ってkafka生産者を書きます.以下のコマンドでダウンロードkafka-pythonパッケージ:
pip install kafka-python

Kafka生産者のソースコードは以下の通りです.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["192.168.0.104:9095", '192.168.0.104:9096', '192.168.0.104:9097']) 

for _ in range(10):
    producer.send('topic_test', b'hello kafka')
producer.flush()

まず、KafkaProducerオブジェクトproducerKafkaProducerkafkaクラスタの各ノードのアドレスを指定します.そしてKafkaProducerオブジェクトを介してtopictopic_testへメッセージを送信する.
三、Kafka消費者
生産者からのメッセージを受け取るため、消費者KafkaConsumer対象consumerを定義します.定義KafkaConsumerオブジェクトの場合、topicをtopic_testと指定し、上記のプロダクションプログラムを実行すると、メッセージを正常に受信できるようにします.
from kafka import KafkaConsumer

consumer = KafkaConsumer('topic_test', group_id="my_group", bootstrap_servers=["192.168.0.104:9095", '192.168.0.104:9096', '192.168.0.104:9097'])

for msg in consumer:
    print(msg.value)

まず消費者プログラムを実行し、生産者プログラムを実行すると、消費者プログラムの出力が表示されます.
b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’
複数の消費者
複数の消費者を起動して生産者が送信したメッセージを消費し、負荷の均衡を実現する必要がある場合、topicに複数のpartitionを設定することができる.便宜上、topicのpartition数と消費者数を2に設定します.
どちらかのkafkaコンテナに入ります.
docker exec -it kafka1 /bin/bash

容器内で、表示topic_testのpartition数:
cd /opt/kafka
bin/kafka-topics.sh --describe --zookeeper 192.168.0.104:2181 --topic topic_test

見られるtopic_testのpartition数は1です.修正topic_testpartition数2:
bin/kafka-topics.sh --zookeeper 192.168.0.104:2181 --alter --topic topic_test --partitions 2

2つの消費者プログラムを起動し、1つの生産者プログラムを起動します.1つの消費者出力が表示されます.
b’hello kafka’ b’hello kafka’ b’hello kafka’
もう1つの消費者出力:
b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’
生産者が送信したメッセージは、デフォルトの負荷等化ポリシーが使用されているため、2人の消費者によって消費され、異なる消費者が受信したメッセージの数が一致しない.生産者プログラムを複数回実行すると、消費者が受信した総メッセージの数がほぼ一致していることがわかります.
参考資料
  • http://kafka.apache.org/
  • http://wurstmeister.github.io/kafka-docker/
  • https://www.cnblogs.com/answerThe/p/11267129.html
  • https://www.jianshu.com/p/fe73765ef74d
  • https://pypi.org/project/kafka-python/
  • https://www.cnblogs.com/small-office/p/9399907.html