Offset管理

8023 ワード

offset管理:
    checkpoint     zk、nosql、rdbms     kafka
一、CheckPoint:以下の要求があるアプリケーションに対して、チェックポイントを有効にする必要がある:1.ステータス変換の使用-アプリケーションでupdateStateByKeyまたはreduceByKey Andを使用する場合.(逆関数を有する)では、周期的なRDD検査を可能にするためにCheckPointを提供する必要がある.               2.アプリケーションを実行しているドライバの障害からリカバリします.メタデータチェックポイントは、進捗情報を使用してリカバリするために使用されます.             
注:上記のステータス変換のない単純なストリームアプリケーションは、チェックポイントを有効にしないで実行できます.この場合、driverからの障害復旧も部分的に行われます(一部の受信されたが未処理のデータが失われる可能性があります).これは通常受け入れられ、多くの人がSparkStreamingアプリケーションをこのように実行しています.
CheckPointの定義と使用:フォールトトレランス、信頼性の高いファイルシステム(例えばHDFS、S 3など)にディレクトリを設定することでチェックポイント機能を有効にすることができ、チェックポイント情報が保存される.これは、フローコンテキストチェックポイント(チェックポイントディレクトリ)を使用して行います.これにより、前述したステータス変換を使用できます.また、Driverプログラムの障害からアプリケーションをリカバリする場合は、次のような動作をするようにストリームアプリケーションを書き換える必要があります.             
1.プログラムが最初に起動すると、新しいStreamingContextが作成され、すべてのストリームが設定され、start()が呼び出されます.2.プログラムが失敗した後に再起動すると、チェックポイントディレクトリのチェックポイントデータからStreamingContextが再作成されます. 
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

注意:RDDSのcheckpointsは、信頼性の高いストレージに保存するコストを考慮する必要があります.これはRDDSがcheckpointsを得るバッチ処理時間の増加をもたらす可能性がある.そのため、checkpointsの間隔を細かく設定する必要があります.小ロット(例えば1秒)では、各ロットのcheckpointsは、動作スループットを著しく低減することができる.逆にcheckpointsが少なすぎるとlineageとtaskのサイズが増加し、不利な影響を及ぼす可能性があります.ステータス変換にはRDD checkpointsが必要であり、デフォルト間隔はバッチ間隔の倍数であり、少なくとも10秒である.dstreamを使用することができます.checkpoint(checkpointInterval)で設定します.通常、1つのcheckpoint intervalは、5〜10個のDStreamのスライド間隔が好ましい. 
例:
package com.ruozedata.spark.streaming.day04

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

/**
  * Created by ruozedata on 2018/9/15.
  */
object OffsetApp1 {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("OffsetApp1")
    val kafkaParams =  Map[String, String](
      "metadata.broker.list"->"ip:port",
      "auto.offset.reset" -> "smallest"
    )
    val topics = "ruoze_g3_offset".split(",").toSet
    val checkpointDirectory = "hdfs://hadoop000:8020/g3_offset/"

    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkConf,Seconds(10))   // new context
      val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
      ssc.checkpoint(checkpointDirectory)
      messages.checkpoint(Duration(8*10*1000))

      messages.foreachRDD(rdd=>{
        if(!rdd.isEmpty()) {
          println("         :"+ rdd.count())
        }
      })

      ssc
    }
    
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

二、KafkaのOffset管理:offsetはconsumer positionであり、Topicの各パーティションにはそれぞれoffsetがある.消費者は自分でoffsetを保持し、kafkaからメッセージを取得するときは、現在のoffset以降のメッセージだけを引く必要があります.Kafkaのscala/java版のclientはこの部分の論理を実現し、offsetをzookeeperに保存した.詳細は、Kafkaのoffset管理
例:プロファイル:
# MySQL example
//db.default.driver="com.mysql.jdbc.Driver"
//db.default.url="jdbc:mysql://hadoop000:3306/ruozedata?characterEncoding=utf-8"
//db.default.user="root"
//db.default.password="root"

metadata.broker.list="ip:port"
auto.offset.reset="smallest"
group.id="ruoze_offset_group_pk"
kafka.topics="ruoze_g3_offset"



db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop000:3306/g3?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"

プロファイルからkeyでvalueを取得するには、次の手順に従います.
package com.ruozedata.spark.streaming.day04
import com.typesafe.config.ConfigFactory
import org.apache.commons.lang3.StringUtils

/**
  * Created by ruozedata on 2018/9/15.
  */
object ValueUtils {
  val load = ConfigFactory.load()
  def getStringValue(key:String, defaultValue:String="") = {
    val value = load.getString(key)
    if(StringUtils.isNotEmpty(value)){
      value
    }else{
      defaultValue
    }
  }
}

MySQLでOffsetを管理するには:
package com.ruozedata.spark.streaming.day04

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import scalikejdbc.config.DBs
import scalikejdbc._

/**
  * 1) StreamingContext
  * 2)  kafka          (  offset)
  * 3)          
  * 4)              (  offset)
  * 5)    ,      
  *
  */
object JdbcOffsetApp {
  def main(args: Array[String])
  {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcOffsetApp")
    val kafkaParams =  Map[String, String](
      "metadata.broker.list"-> ValueUtils.getStringValue("metadata.broker.list"),
      "auto.offset.reset" -> ValueUtils.getStringValue("auto.offset.reset"),
      "group.id" -> ValueUtils.getStringValue("group.id")
    )
    val topics = ValueUtils.getStringValue("kafka.topics").split(",").toSet
    val ssc = new StreamingContext(sparkConf,Seconds(10))   // new context

    DBs.setup()
    val fromOffsets = DB.readOnly{ implicit  session => {
      sql"select * from offsets_storage".map(rs => {
        (TopicAndPartition(rs.string("topic"),rs.int("partitions")), rs.long("offset"))
      }).list().apply()
    }
    }.toMap


    /**
      *       ,     smallest,      ,      
      *
      * true   
      */
    val messages =  if(fromOffsets.size == 0) {
      println("~~~~~~      ~~~~~~~~~")

      KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    } else { //      offset    
      println("~~~~~~     offset    ~~~~~~~~~")

//    val fromOffsets = Map[TopicAndPartition, Long]()
    val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(),mm.message())
    KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)
  }

    messages.foreachRDD(rdd=>{
      println("         :"+ rdd.count())

      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      offsetRanges.foreach(x =>{
        // TODO...      MySQL   
        println(s"---${x.topic},${x.partition},${x.fromOffset},${x.untilOffset}--")

        DB.autoCommit{
          implicit session => {
            sql"replace into offsets_storage(topic,groupid,partitions,offset) values(?,?,?,?)"
              .bind(x.topic,ValueUtils.getStringValue("group.id"),x.partition,x.untilOffset).update().apply()
          }
        }
      })

      //      if(!rdd.isEmpty()) {
//        println("         :"+ rdd.count())
//      }


    })
    ssc.start()
    ssc.awaitTermination()
  }
}

MySQLテーブル:
create table offsets_storage( topic varchar(32), groupid varchar(50), partitions int, offset bigint, primary key(topic,groupid,partitions) );