pythonによるkafkaの基本操作

2568 ワード

# -*- coding:utf-8 -*-
import time

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.structs import TopicPartition
bootstrap_servers = []


class OperateKafka:
    """Demonstrates basic kafka-python producer and consumer operations.

    Each method opens its own client against ``self.bootstrap_servers`` and
    closes it again when the method exits (normally or via an exception).
    """

    def __init__(self, bootstrap_servers, topic):
        """Remember the broker list and the default topic.

        Args:
            bootstrap_servers: list of ``"host:port"`` broker addresses.
            topic: name of the topic used by ``produce`` and ``consume``.
        """
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic

    @staticmethod
    def _format_record(message):
        """Render a consumed record as ``topic:partition:offset: key=.. value=..``."""
        return "%s:%d:%d: key=%s value=%s" % (
            message.topic,
            message.partition,
            message.offset,
            message.key,
            message.value,
        )

    def produce(self):
        """Send four sample messages (``msg0`` .. ``msg3``) to ``self.topic``."""
        producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
        try:
            for i in range(4):
                msg = "msg%d" % i
                # No serializers are configured, so key and value must be
                # bytes; encoding explicitly keeps this working on Python 3.
                producer.send(
                    self.topic,
                    key=str(i).encode("utf-8"),
                    value=msg.encode("utf-8"),
                )
        finally:
            # Close even if a send fails so the client is not leaked.
            producer.close()

    def consume(self):
        """Print consumer metadata for ``self.topic``, then print every record.

        Iterating the consumer blocks waiting for new records, so this method
        only returns when the iteration is interrupted.
        """
        # Alternative: join a consumer group and start from the earliest offset.
        # consumer = KafkaConsumer(self.topic, auto_offset_reset='earliest',
        #                          group_id="testgroup",
        #                          bootstrap_servers=self.bootstrap_servers)
        consumer = KafkaConsumer(
            self.topic, bootstrap_servers=self.bootstrap_servers
        )
        try:
            print(consumer.partitions_for_topic(self.topic))  # partitions of the topic
            print(consumer.topics())  # topics visible to this consumer
            print(consumer.subscription())  # topics currently subscribed to
            print(consumer.assignment())  # partitions currently assigned
            print(consumer.beginning_offsets(consumer.assignment()))  # first offsets
            # NOTE(review): seek() expects the partition to be assigned already;
            # with subscription-based assignment that may only happen after the
            # first poll -- confirm this call does not raise before iteration.
            consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)
            for message in consumer:
                print(self._format_record(message))
        finally:
            consumer.close()

    def consume2(self):
        """Subscribe to the topics ``TEST`` and ``TEST2`` and print every record."""
        consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
        try:
            consumer.subscribe(topics=('TEST', 'TEST2'))  # several topics at once
            print(consumer.topics())
            # NOTE(review): position() likewise expects an assigned partition;
            # verify it is safe to call before the first poll.
            print(consumer.position(TopicPartition(topic='TEST', partition=0)))
            for message in consumer:
                print(self._format_record(message))
        finally:
            consumer.close()

    def consume3(self):
        """Pull records in batches of at most three with ``poll`` and print them.

        Loops forever; interrupt the process to stop it.
        """
        consumer = KafkaConsumer(
            group_id="mygroup",
            max_poll_records=3,
            bootstrap_servers=self.bootstrap_servers,
        )
        try:
            consumer.subscribe(topics=('TEST', 'TEST2'))
            while True:
                message = consumer.poll(timeout_ms=5)  # fetch a batch from Kafka
                if message:
                    print(message)
                    # Pause after each non-empty batch so the output is readable.
                    time.sleep(1)
        finally:
            consumer.close()

def main():
    """Run the demo: publish the sample messages, then poll for them in batches."""
    bootstrap_servers = ['192.168.124.201:9092']
    topic = "TEST"
    operateKafka = OperateKafka(bootstrap_servers, topic)
    operateKafka.produce()
    # Alternative consumer demos; enable one at a time, each blocks forever.
    # operateKafka.consume()
    # operateKafka.consume2()
    operateKafka.consume3()


# Guard the entry point so importing this module does not start the
# endless consumer loop.
if __name__ == "__main__":
    main()