pythonによるkafkaの基本操作
-- coding:utf-8 --
from kafka import KafkaProducerfrom kafka import KafkaConsumerfrom kafka.structs import TopicPartitionimport time
bootstrap_servers = []class OperateKafka:def init(self,bootstrap_servers,topic):self.bootstrap_servers = bootstrap_serversself.topic = topic
def main():bootstrap_servers = ['192.168.124.201:9092']topic = "TEST"operateKafka = OperateKafka(bootstrap_servers,topic)operateKafka.produce()#operateKafka.consume()#operateKafka.consume2()operateKafka.consume3()main()
from kafka import KafkaProducerfrom kafka import KafkaConsumerfrom kafka.structs import TopicPartitionimport time
bootstrap_servers = []class OperateKafka:def init(self,bootstrap_servers,topic):self.bootstrap_servers = bootstrap_serversself.topic = topic
""" """
def produce(self):
producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
for i in range(4):
msg = "msg%d" %i
producer.send(self.topic,key=str(i),value=msg)
producer.close()
""" topic"""
def consume(self):
#consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
print consumer.partitions_for_topic(self.topic) # test
print consumer.topics() #
print consumer.subscription() #
print consumer.assignment() # topic、
print consumer.beginning_offsets(consumer.assignment()) #
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1) # , 1
for message in consumer:
print ("%s:%d:%d: key=%s value=%s"
% (message.topic,message.partition,message.offset, message.key,message.value))
""" topic """
def consume2(self):
consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2')) #
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
""" ( )"""
def consume3(self):
consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
message = consumer.poll(timeout_ms=5) # kafka
if message:
print message
time.sleep(1)
def main():bootstrap_servers = ['192.168.124.201:9092']topic = "TEST"operateKafka = OperateKafka(bootstrap_servers,topic)operateKafka.produce()#operateKafka.consume()#operateKafka.consume2()operateKafka.consume3()main()