pythonメッセージミドルウェアkafka

3600 ワード

ダウンロードとインストール
wget http://apache.fayea.com/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzvf kafka_2.12-2.2.1.tgz 
cd kafka_2.12-2.2.1

# zookeeper を起動(kafka に同梱)
bin/zookeeper-server-start.sh config/zookeeper.properties
# エラーが出た場合は @1 を参照して jdk をインストールする

# kafka を起動
bin/kafka-server-start.sh config/server.properties
Kafka
# jdk をダウンロードし、展開する
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
tar -xzvf jdk-8u211-linux-x64.tar.gz
mv jdk1.8.0_211 /opt/jdk1.8


vim /etc/profile

## JAVA

export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
# :wq で保存して終了

source /etc/profile
# インストールの確認
java -version

メッセージの送信と受信
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

# --topic: 作成するトピック名 (test1)
# --replication-factor: レプリカ数 (1)
# --partitions: トピックのパーティション数 (1)

Created topic test1. # 作成に成功したときの出力

# kafka に存在する topic を一覧表示
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 作成した topic を使い、プロデューサーを起動し、コンシューマーを起動して、メッセージを送受信する

# プロデューサー(メッセージ送信側)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
# コンシューマー(メッセージ受信側)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
@1 JDK8開発
# pip install kafka-python names

from kafka import KafkaProducer
import names

# Publish ten randomly generated full names to the 'test' topic and print the
# result returned for each send.
# NOTE(review): the console walkthrough earlier in this file creates 'test1',
# while this script targets 'test' - presumably this relies on the broker
# auto-creating topics; confirm.
producer = KafkaProducer()  # no arguments: the client's default settings are used

try:
    for _ in range(10):
        name = names.get_full_name()
        # UTF-8 produces the same bytes as ASCII for plain names, but does not
        # raise if a generated name ever contains a non-ASCII character.
        future = producer.send('test', name.encode('utf-8'))
        # Wait up to 60 seconds for this send to complete before continuing.
        result = future.get(timeout=60)
        print(result)
finally:
    # Release the producer's resources even when a send fails or times out.
    producer.close()
from kafka import KafkaConsumer

# Subscribe to the 'test' topic as a member of consumer group 'test01' and
# print every record received.
consumer = KafkaConsumer('test', group_id='test01')

try:
    # Iterating the consumer yields records as they arrive; with the settings
    # used here the loop is expected to run until interrupted (e.g. Ctrl-C).
    for msg in consumer:
        print(msg)
finally:
    # Close the consumer on the way out so it does not linger after the
    # script is interrupted or fails.
    consumer.close()