PythonでKafkaのコンシューマーラグ(未消費メッセージの滞留数)を取得するコード

1434 ワード

PythonでKafkaのコンシューマーラグ(未消費メッセージの滞留数)を取得するコード


 

背景


Kafkaのメッセージ滞留状況(コンシューマーラグ)に応じて、コンシューマークラスタのインスタンス数を自動的に増やしたい。そのために、まずラグの値をPythonから取得する。

コードおよびテスト

from kafka import KafkaConsumer

from kafka  import KafkaProducer
from kafka import TopicPartition

# KafkaProducer API reference:
# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# Broker address list shared by the producer and the consumer below.
bootstrap_server = ['localhost:9092']
# Name of the topic that is consumed and whose lag is measured.
top = 'share'

# Producer client connected to the same brokers; in the code shown here it
# is only used to look up the partition ids of the topic.
producer = KafkaProducer(bootstrap_servers=bootstrap_server)

# Consumer subscribed to the topic as a member of the group 'my-groups'.
consumer = KafkaConsumer(
    top,
    bootstrap_servers=bootstrap_server,
    group_id='my-groups',
)


# Lag: number of messages in the topic not yet consumed by the group
def get_lan():
    """Return the total consumer lag of topic ``top`` for the consumer's group.

    For every partition of the topic, the lag is the log-end offset (the
    offset the next produced message will get) minus the group's last
    committed offset. The per-partition values are added up.

    A partition for which the group has never committed an offset has no
    committed position (``consumer.committed`` returns ``None``). It is
    measured from the earliest offset still available in the log, so the
    whole retained backlog of that partition counts as lag.

    Side effect: prints the committed offset and the end offset of each
    partition, as the original implementation did.

    Returns:
        int: Sum of the non-negative per-partition lags; ``0`` when the
        topic has no known partitions.
    """
    partition_ids = producer.partitions_for(top)
    if not partition_ids:
        return 0

    topic_partitions = [
        TopicPartition(topic=top, partition=partition_id)
        for partition_id in sorted(partition_ids)
    ]
    # Ask for all end offsets in a single call instead of one per partition.
    end_offsets = consumer.end_offsets(topic_partitions)

    total_lag = 0
    for tp in topic_partitions:
        committed = consumer.committed(tp)
        if committed is None:
            # No commit yet for this partition: subtracting None would raise
            # TypeError, so fall back to the earliest available offset.
            committed = consumer.beginning_offsets([tp])[tp]
        end = end_offsets[tp]
        # Diagnostic output kept in the original "<committed> {tp: end}" form.
        print(committed, {tp: end})

        # Clamp at zero: a committed offset beyond the current log end
        # (e.g. after the topic was recreated) must not reduce the total.
        total_lag += max(end - committed, 0)

    return total_lag




# test
# For every record received from the consumer, print the record and then
# the lag reported by get_lan().
for record in consumer:
    # Keys and values arrive as raw bytes -- decode them when text is
    # needed, e.g. `record.value.decode('utf-8')` for unicode.
    fields = (
        record.topic,
        record.partition,
        record.offset,
        record.key,
        record.value,
    )
    print("%s:%d:%d: key=%s value=%s" % fields)

    print("======")
    lag = get_lan()
    print(lag)