python3.6操作kafka、生産者消費者キュー
2216 ワード
使用シーンを紹介します。私はここでredisを生産者の消費者の列にしていました。それから、redisの容量が大きくなく、アップグレードコストも高いので、kafkaをメッセージの列に使いました。データはタイムリーに生産され、消費されているので、あまり深くはありません。topicをredisのkeyに使いました。
あとでテストしてみましたが、pykafkaよりkafkaの方が速度が10倍ほど速く、コードも比較的簡便なので、kafkaで接続しましょう、サンプルコード:
from kafka import KafkaConsumer
from kafka import KafkaProducer
server_list = [ "192.168.0.1:xxxx","192.168.0.2:xxxx"]
#
producer = KafkaProducer(bootstrap_servers=server_list, compression_type='gzip')
msg = {"name": " 1", "text": " 1"}
bmsg = bytes(str(msg).encode('utf-8'))
producer.send('xiaofei_test', bmsg)
#
consumer = KafkaConsumer('xiaofei_test', auto_offset_reset='earliest', bootstrap_servers=server_list)
print(consumer)
for msg in consumer:
print(msg)
生産者(pykafka)
from pykafka import KafkaClient
import json
hosts = "192.168.0.1:xxxx,192.168.0.2:xxxx"
client = KafkaClient(hosts=hosts)
print(client.topics)
key = "test"
key = bytes(key, encoding='utf8')
topic = client.topics[key]
## , , topic redis key
producer = topic.get_producer(sync=True)
producer.start()
print(producer)
#
msg_dict ={"test": " "}
msg = json.dumps(msg_dict)
with topic.get_sync_producer() as producer:
producer.produce(bytes(msg, encoding='utf8'))
print(msg)
print(' ')
消費者(pykafka)
from pykafka import KafkaClient
hosts = "192.168.0.1:xxxx,192.168.0.2:xxxx"
client = KafkaClient(hosts=hosts)
print("Kafka client:", client.topics)
#
key = "chengdu-cdfgjtj-research_details"
key = bytes(key, encoding='utf8')
topic = client.topics[key]
# https://www.cnblogs.com/jun1019/p/6656223.html
consumer = topic.get_simple_consumer(auto_commit_enable=True)
# consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, consumer_id='test')
for message in consumer:
if message is not None:
print("consumer message:", message.offset)
print(message.value)