Python メッセージミドルウェア Kafka
3600 ワード
ダウンロードとインストール
メッセージの送信と受信
# Download the Kafka 2.2.1 release (Scala 2.12 build) from the official Apache
# archive over HTTPS, unpack it, and change into the extracted directory.
# The archive keeps every past release, so this URL stays valid after
# third-party mirrors drop old versions.
wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzvf kafka_2.12-2.2.1.tgz
cd kafka_2.12-2.2.1
# Start the ZooKeeper server using the configuration file shipped in the Kafka distribution.
bin/zookeeper-server-start.sh config/zookeeper.properties
# See note @1: a JDK has to be installed before these scripts can run.
# Start the Kafka broker.
bin/kafka-server-start.sh config/server.properties
Kafka の実行には JDK が必要
# Download JDK 8 from the page below, then unpack it and set the environment variables.
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
# Unpack the downloaded JDK archive and move the extracted directory to /opt/jdk1.8.
tar -xzvf jdk-8u211-linux-x64.tar.gz
mv jdk1.8.0_211 /opt/jdk1.8
# Open /etc/profile in vim; the JAVA lines below are presumably what gets added to that file.
vim /etc/profile
## JAVA
export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
# Save and quit vim with :wq, then reload the profile in the current shell.
source /etc/profile
# Verify the installation by printing the Java version.
java -version
メッセージの送信と受信
# Create the topic test1 through the ZooKeeper instance at localhost:2181.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
# --topic test1            name of the topic to create
# --replication-factor 1   number of replicas kept for the topic
# --partitions 1           number of partitions the topic is created with
Created topic test1. # output printed when the topic has been created
# List the topics currently known to Kafka.
bin/kafka-topics.sh --list --zookeeper localhost:2181
# With the topic in place, messages can be sent and received with the console clients.
# Start a console producer that publishes to topic test1 via the broker at localhost:9092.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
# Start a console consumer on topic test1; --from-beginning asks for the topic's messages from the start.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
@1 Kafka の起動には JDK 8 が必要
開発
# pip install kafka-python names
# Producer example: send ten generated full names to the 'test' topic and
# print the result returned for each send.
# NOTE(review): the shell steps in this article create and read 'test1',
# while this snippet uses 'test' -- confirm which topic is intended.
from kafka import KafkaProducer
import names

# No arguments are passed, so the client's default connection settings apply.
producer = KafkaProducer()
for _ in range(10):
    name = names.get_full_name()
    # The message value is sent as bytes, so the name is encoded first.
    future = producer.send('test', bytes(name, 'ascii'))
    # Block (timeout 60) until the send has completed, then show its result.
    result = future.get(60)
    print(result)
# Consumer example: subscribe to the 'test' topic as consumer group 'test01'
# and print every message received.
# NOTE(review): the shell steps in this article create and read 'test1',
# while this snippet uses 'test' -- confirm which topic is intended.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', group_id='test01')
# Iterating over the consumer yields messages one at a time.
for msg in consumer:
    print(msg)