spark streamingカスタムkafka topicのoffset(python)を読み込む
2500 ワード
kafkaデータをspark streamingで処理すると、プログラムに異常が発生したり、プログラムを変更して再実行する必要がある場合があります. kafkaのデータが読み出され、zookeeperには読み出されたoffsetが保存されていますが、データ処理に異常が発生した場合、プログラムを修正してから再実行してもこのデータは処理されません. 従来のプログラムは修正してから実行する必要があり、killが落ちてから実行する場合、kafkaのoffsetがzookeeperに提出されていない可能性があり、プログラムを修正してから再実行すると一部のデータが繰り返し処理される.
これらの問題から,kafkaにおけるデータ読み出しの状態を自分で手動で管理することが望ましい.
kafkaストリームは、次の方法で作成されました.
この方法はzookeeperを用いてkafkaにおけるtopicにおける各partition読み出しのoffsetを管理する.
インターネットでいくつかの資料を探してみると、createDirectStreamを使用して定義設定から読み取るoffsetを使用することができます.
ここで、fromOffsetはtopic partitionに対応するoffsetの情報を保存し、例ではtestというtopicは1つのpartitionしかなく、offset=0の位置から読むように設定している.
データの処理が完了したら、データの読み出し位置の状態を保存しておきたい.
例のstoreOffsetRanges関数は、データ読み出しのオフセット情報をoffsetRangesに保存し、printOffsetRangesにrddのtopic、partition、読み出しデータ最小のoffset、最大のoffsetを印刷する.
これによりmysqlやmongodbなどにも保存でき、後でspark streamingタスクを再起動する必要がある場合は、データベースに保存した前回のoffsetからデータの読み取りを開始できます.
api:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils
github:https://github.com/Stratio/spark-kafka
stackoverflow: http://stackoverflow.com/questions/33268689/how-to-create-inputdstream-with-offsets-in-pyspark-using-kafkautils-createdirec
これらの問題から,kafkaにおけるデータ読み出しの状態を自分で手動で管理することが望ましい.
kafkaストリームは、次の方法で作成されました.
kvs = KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:2182/kafka", group_id, {topic:1},{'auto.offset.reset':'smallest'})
この方法はzookeeperを用いてkafkaにおけるtopicにおける各partition読み出しのoffsetを管理する.
インターネットでいくつかの資料を探してみると、createDirectStreamを使用して定義設定から読み取るoffsetを使用することができます.
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
spark_conf = SparkConf()
sc = SparkContext(conf=spark_conf)
ssc = StreamingContext(sc, 30)
topic = "test"
partition = 0
start = 0
topicPartion = TopicAndPartition(topic,partition)
fromOffset = {topicPartion: long(start)}
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 'xxx.xxx.xxx.xxx:9092'}, fromOffsets=fromOffset)
ここで、fromOffsetはtopic partitionに対応するoffsetの情報を保存し、例ではtestというtopicは1つのpartitionしかなく、offset=0の位置から読むように設定している.
データの処理が完了したら、データの読み出し位置の状態を保存しておきたい.
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
print rdd.count()
for o in offsetRanges:
print "__________________________________"
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
print "__________________________________"
directKafkaStream\
.transform(storeOffsetRanges)\
.foreachRDD(printOffsetRanges)
例のstoreOffsetRanges関数は、データ読み出しのオフセット情報をoffsetRangesに保存し、printOffsetRangesにrddのtopic、partition、読み出しデータ最小のoffset、最大のoffsetを印刷する.
これによりmysqlやmongodbなどにも保存でき、後でspark streamingタスクを再起動する必要がある場合は、データベースに保存した前回のoffsetからデータの読み取りを開始できます.
api:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils
github:https://github.com/Stratio/spark-kafka
stackoverflow: http://stackoverflow.com/questions/33268689/how-to-create-inputdstream-with-offsets-in-pyspark-using-kafkautils-createdirec