Kafkaのコンシューマラグ(消費滞留量)をPythonコードで取得する
1434 ワード
Kafkaのコンシューマラグ(消費滞留量)をPythonコードで取得する
背景
Kafkaのメッセージ滞留状況(コンシューマラグ)に応じて、コンシューマクラスタのインスタンス数を自動的にスケールさせる。
コードおよびテスト
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka import TopicPartition
# kafka-python client reference:
# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# Broker address list shared by the producer and the consumer below.
bootstrap_server = ['localhost:9092']
# Name of the topic the consumer subscribes to.
top = 'share'

# Producer connected to the same brokers as the consumer.
producer = KafkaProducer(bootstrap_servers=bootstrap_server)

# Consumer subscribed to `top` as a member of the 'my-groups' consumer group.
consumer = KafkaConsumer(
    top,
    bootstrap_servers=bootstrap_server,
    group_id='my-groups',
)
# Lag
def get_lan():
    """Return the total consumer lag for topic ``top``.

    For each partition of ``top`` the lag is the partition's end offset
    minus the offset last committed by ``consumer``'s group; the
    per-partition lags are summed.  The committed offset and the end offset
    of every partition are printed, one line per partition.

    Returns:
        int: Sum of ``end_offset - committed_offset`` over all partitions
        that have a committed offset; ``0`` when no partitions are known.
    """
    partition_ids = producer.partitions_for(top)
    if not partition_ids:
        # Defensive: nothing to measure when no partition ids came back.
        return 0

    topic_partitions = [
        TopicPartition(topic=top, partition=pt) for pt in partition_ids
    ]
    # Fetch all end offsets in one call rather than once per partition.
    end_offsets = consumer.end_offsets(topic_partitions)

    total_lag = 0  # named so the builtin ``sum`` is not shadowed
    for tp in topic_partitions:
        committed = consumer.committed(tp)
        # Same diagnostic line as before: committed offset, {partition: end}.
        print(committed, {tp: end_offsets[tp]})
        if committed is None:
            # committed() yields None when the group has no committed offset
            # for this partition; subtracting it would raise TypeError.
            # NOTE(review): such partitions contribute 0 here, which matches
            # a consumer that starts from the log end; if the group is meant
            # to start from the earliest offset, count end - beginning
            # instead -- confirm the intended policy.
            continue
        total_lag += end_offsets[tp] - committed
    return total_lag
# test
# Consume messages from `consumer`; after each one print the message, a
# separator line and the current total lag reported by get_lan().
# NOTE(review): the original indentation was lost; the two trailing prints
# are assumed to belong to the loop body -- confirm.
for message in consumer:
    # Key and value arrive as raw bytes -- decode them if necessary,
    # e.g. `message.value.decode('utf-8')` for unicode text.
    fields = (
        message.topic,
        message.partition,
        message.offset,
        message.key,
        message.value,
    )
    print("%s:%d:%d: key=%s value=%s" % fields)
    print("======")
    print(get_lan())
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka import TopicPartition
# kafka-python client reference:
# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# Broker address list shared by the producer and the consumer below.
bootstrap_server = ['localhost:9092']
# Name of the topic the consumer subscribes to.
top = 'share'

# Producer connected to the same brokers as the consumer.
producer = KafkaProducer(bootstrap_servers=bootstrap_server)

# Consumer subscribed to `top` as a member of the 'my-groups' consumer group.
consumer = KafkaConsumer(
    top,
    bootstrap_servers=bootstrap_server,
    group_id='my-groups',
)
# Lag
def get_lan():
    """Return the total consumer lag for topic ``top``.

    For each partition of ``top`` the lag is the partition's end offset
    minus the offset last committed by ``consumer``'s group; the
    per-partition lags are summed.  The committed offset and the end offset
    of every partition are printed, one line per partition.

    Returns:
        int: Sum of ``end_offset - committed_offset`` over all partitions
        that have a committed offset; ``0`` when no partitions are known.
    """
    partition_ids = producer.partitions_for(top)
    if not partition_ids:
        # Defensive: nothing to measure when no partition ids came back.
        return 0

    topic_partitions = [
        TopicPartition(topic=top, partition=pt) for pt in partition_ids
    ]
    # Fetch all end offsets in one call rather than once per partition.
    end_offsets = consumer.end_offsets(topic_partitions)

    total_lag = 0  # named so the builtin ``sum`` is not shadowed
    for tp in topic_partitions:
        committed = consumer.committed(tp)
        # Same diagnostic line as before: committed offset, {partition: end}.
        print(committed, {tp: end_offsets[tp]})
        if committed is None:
            # committed() yields None when the group has no committed offset
            # for this partition; subtracting it would raise TypeError.
            # NOTE(review): such partitions contribute 0 here, which matches
            # a consumer that starts from the log end; if the group is meant
            # to start from the earliest offset, count end - beginning
            # instead -- confirm the intended policy.
            continue
        total_lag += end_offsets[tp] - committed
    return total_lag
# test
# Consume messages from `consumer`; after each one print the message, a
# separator line and the current total lag reported by get_lan().
# NOTE(review): the original indentation was lost; the two trailing prints
# are assumed to belong to the loop body -- confirm.
for message in consumer:
    # Key and value arrive as raw bytes -- decode them if necessary,
    # e.g. `message.value.decode('utf-8')` for unicode text.
    fields = (
        message.topic,
        message.partition,
        message.offset,
        message.key,
        message.value,
    )
    print("%s:%d:%d: key=%s value=%s" % fields)
    print("======")
    print(get_lan())