spark streamingカスタムkafka topicのoffset(python)を読み込む

2500 ワード

kafkaデータをspark streamingで処理すると、プログラムに異常が発生したり、プログラムを変更して再実行する必要がある場合があります.
  • kafkaのデータが読み出され、zookeeperには読み出されたoffsetが保存されていますが、データ処理に異常が発生した場合、プログラムを修正してから再実行してもこのデータは処理されません.
  • 従来のプログラムは修正してから実行する必要があり、killが落ちてから実行する場合、kafkaのoffsetがzookeeperに提出されていない可能性があり、プログラムを修正してから再実行すると一部のデータが繰り返し処理される.

  • これらの問題から,kafkaにおけるデータ読み出しの状態を自分で手動で管理することが望ましい.
    kafkaストリームは、次の方法で作成されました.
    kvs = KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:2182/kafka", group_id, {topic:1},{'auto.offset.reset':'smallest'})

    この方法はzookeeperを用いてkafkaにおけるtopicにおける各partition読み出しのoffsetを管理する.
    インターネットでいくつかの資料を探してみると、createDirectStreamを使用して定義設定から読み取るoffsetを使用することができます.
    from pyspark.streaming import StreamingContext
    from pyspark import SparkContext,SparkConf
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    
    
    spark_conf = SparkConf()
    
    sc = SparkContext(conf=spark_conf)
    ssc = StreamingContext(sc, 30)
    
    topic = "test"
    partition = 0
    start = 0
    
    topicPartion = TopicAndPartition(topic,partition)
    fromOffset = {topicPartion: long(start)}
    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],  {"metadata.broker.list": 'xxx.xxx.xxx.xxx:9092'}, fromOffsets=fromOffset)

    ここで、fromOffsetはtopic partitionに対応するoffsetの情報を保存し、例ではtestというtopicは1つのpartitionしかなく、offset=0の位置から読むように設定している.
    データの処理が完了したら、データの読み出し位置の状態を保存しておきたい.
    offsetRanges = []
    def storeOffsetRanges(rdd):
        global offsetRanges
        offsetRanges = rdd.offsetRanges()
        return rdd
    
    def printOffsetRanges(rdd):
        print rdd.count()
        for o in offsetRanges:
            print "__________________________________"
            print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
            print "__________________________________"
    
    
    directKafkaStream\
        .transform(storeOffsetRanges)\
        .foreachRDD(printOffsetRanges)

    例のstoreOffsetRanges関数は、データ読み出しのオフセット情報をoffsetRangesに保存し、printOffsetRangesにrddのtopic、partition、読み出しデータ最小のoffset、最大のoffsetを印刷する.
    これによりmysqlやmongodbなどにも保存でき、後でspark streamingタスクを再起動する必要がある場合は、データベースに保存した前回のoffsetからデータの読み取りを開始できます.
    api:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils
    github:https://github.com/Stratio/spark-kafka
    stackoverflow: http://stackoverflow.com/questions/33268689/how-to-create-inputdstream-with-offsets-in-pyspark-using-kafkautils-createdirec