Kafka生産者と消費者学習ノート
最近kafkaクラスタ環境を構築し,アプリケーションログを収集し,個性的な処理を行うためにkafka生産者と消費者pythonプログラムの実現を学習した.この文章はkafkaの学習ノートだと思います.
一、kafkaクラスタの構築
テストを容易にするために,MacOS単機上に3つのkafkaノードを有するクラスタを構築した.本番にkafkaを配備する場合は、kafkaクラスタを異なる物理マシンに配備します.
1.kafkaミラーをダウンロードする
wurstmeister/kafkaミラーを使用してkafkaを配置します.kafkaはzookeeperに依存するため、zookeeperとkafkaの2つのミラーをダウンロードする必要があります.
2.docker-compopseを作成する.yml
我々が構築したkafkaクラスタにはzookerノードと3つのkakfaノードが含まれている.
使用
二、Kafka生産者
私たちは
Kafka生産者のソースコードは以下の通りです.
まず、
三、Kafka消費者
生産者からのメッセージを受け取るため、消費者
まず消費者プログラムを実行し、生産者プログラムを実行すると、消費者プログラムの出力が表示されます.
b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’
複数の消費者
複数の消費者を起動して生産者が送信したメッセージを消費し、負荷の均衡を実現する必要がある場合、topicに複数のpartitionを設定することができる.便宜上、topicのpartition数と消費者数を2に設定します.
どちらかのkafkaコンテナに入ります.
容器内で、表示
見られる
2つの消費者プログラムを起動し、1つの生産者プログラムを起動します.1つの消費者出力が表示されます.
b’hello kafka’ b’hello kafka’ b’hello kafka’
もう1つの消費者出力:
b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’
生産者が送信したメッセージは、デフォルトの負荷等化ポリシーが使用されているため、2人の消費者によって消費され、異なる消費者が受信したメッセージの数が一致しない.生産者プログラムを複数回実行すると、消費者が受信した総メッセージの数がほぼ一致していることがわかります.
参考資料 http://kafka.apache.org/ http://wurstmeister.github.io/kafka-docker/ https://www.cnblogs.com/answerThe/p/11267129.html https://www.jianshu.com/p/fe73765ef74d https://pypi.org/project/kafka-python/ https://www.cnblogs.com/small-office/p/9399907.html
一、kafkaクラスタの構築
テストを容易にするために,MacOS単機上に3つのkafkaノードを有するクラスタを構築した.本番にkafkaを配備する場合は、kafkaクラスタを異なる物理マシンに配備します.
1.kafkaミラーをダウンロードする
wurstmeister/kafkaミラーを使用してkafkaを配置します.kafkaはzookeeperに依存するため、zookeeperとkafkaの2つのミラーをダウンロードする必要があります.
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper
2.docker-compopseを作成する.yml
我々が構築したkafkaクラスタにはzookerノードと3つのkakfaノードが含まれている.
使用
ifconfig
命令取得自機のipは192.168.0.104
、完全なdocker-compose.ymlファイルは以下の通りです.docker-composeの使用については、『Docker Component入門チュートリアル』を参照してください.version: '3'
services:
zookeeper:
image: docker.io/wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- "2181:2181"
kafka1:
image: docker.io/wurstmeister/kafka
container_name: kafka1
restart: always
ports:
- "9095:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9095
kafka2:
image: docker.io/wurstmeister/kafka
container_name: kafka2
restart: always
ports:
- "9096:9093"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9096
kafka3:
image: docker.io/wurstmeister/kafka
container_name: kafka3
restart: always
ports:
- "9097:9094"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9097
docker-compose up -d
関連するコンテナを使用して、kafkaクラスタの構築を完了します.二、Kafka生産者
私たちは
kafka-python
パッケージを使ってkafka生産者を書きます.以下のコマンドでダウンロードkafka-python
パッケージ:pip install kafka-python
Kafka生産者のソースコードは以下の通りです.
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=["192.168.0.104:9095", '192.168.0.104:9096', '192.168.0.104:9097'])
for _ in range(10):
producer.send('topic_test', b'hello kafka')
producer.flush()
まず、
KafkaProducer
オブジェクトproducer
、KafkaProducer
kafkaクラスタの各ノードのアドレスを指定します.そしてKafkaProducer
オブジェクトを介してtopictopic_test
へメッセージを送信する.三、Kafka消費者
生産者からのメッセージを受け取るため、消費者
KafkaConsumer
対象consumer
を定義します.定義KafkaConsumer
オブジェクトの場合、topicをtopic_test
と指定し、上記のプロダクションプログラムを実行すると、メッセージを正常に受信できるようにします.from kafka import KafkaConsumer
consumer = KafkaConsumer('topic_test', group_id="my_group", bootstrap_servers=["192.168.0.104:9095", '192.168.0.104:9096', '192.168.0.104:9097'])
for msg in consumer:
print(msg.value)
まず消費者プログラムを実行し、生産者プログラムを実行すると、消費者プログラムの出力が表示されます.
b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’
複数の消費者
複数の消費者を起動して生産者が送信したメッセージを消費し、負荷の均衡を実現する必要がある場合、topicに複数のpartitionを設定することができる.便宜上、topicのpartition数と消費者数を2に設定します.
どちらかのkafkaコンテナに入ります.
docker exec -it kafka1 /bin/bash
容器内で、表示
topic_test
のpartition数:cd /opt/kafka
bin/kafka-topics.sh --describe --zookeeper 192.168.0.104:2181 --topic topic_test
見られる
topic_test
のpartition数は1です.修正topic_test
partition数2:bin/kafka-topics.sh --zookeeper 192.168.0.104:2181 --alter --topic topic_test --partitions 2
2つの消費者プログラムを起動し、1つの生産者プログラムを起動します.1つの消費者出力が表示されます.
b’hello kafka’ b’hello kafka’ b’hello kafka’
もう1つの消費者出力:
b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’
生産者が送信したメッセージは、デフォルトの負荷等化ポリシーが使用されているため、2人の消費者によって消費され、異なる消費者が受信したメッセージの数が一致しない.生産者プログラムを複数回実行すると、消費者が受信した総メッセージの数がほぼ一致していることがわかります.
参考資料