Manually maintaining Kafka offsets

Case: manually maintaining the offsets in MySQL
●API
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html 
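The guide linked above also documents the simpler variant of committing offsets back to Kafka itself instead of an external store. A minimal sketch of that variant, where `stream` stands for the InputDStream returned by KafkaUtils.createDirectStream (as in the full example further below):

    // Sketch: store the offsets in Kafka itself via commitAsync (from the 0-10 integration guide)
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch here ...
      // CanCommitOffsets is only implemented by the stream returned by createDirectStream
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

Committing to Kafka avoids the extra database, but the commit is asynchronous and not transactional with the processing, which is one reason the example below keeps the offsets in MySQL instead.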
 
  • Commit offsets manually to guarantee that no data is lost
  • Concrete code implementation
    import java.sql.{DriverManager, ResultSet}
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{OffsetRange, _}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    /**
      *
      * Date 2019/8/8 10:47
      * Desc: Spark-Kafka-0-10 direct-mode demo that consumes data from Kafka and manually maintains the offsets in MySQL
      */
    object SparkKafkaDemo2 {
      def main(args: Array[String]): Unit = {
        //1. Create the StreamingContext
        //spark.master should be set as local[n], n > 1
        val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        val ssc = new StreamingContext(sc,Seconds(5))// batch interval of 5 seconds: the data received every 5 seconds forms one RDD
        // Prepare the Kafka connection parameters
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "node-01:9092,node-02:9092,node-03:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "SparkKafkaDemo",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val topics = Array("spark_kafka")
        //2. Use KafkaUtils to connect to Kafka and get the data
        // Note:
        // If MySQL has no offset saved for this group/topic, this is the first connection: start consuming from latest
        // If MySQL already has a saved offset, resume consuming from that offset
        val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("SparkKafkaDemo","spark_kafka")
        val recordDStream: InputDStream[ConsumerRecord[String, String]] = if(offsetMap.size > 0){// an offset has been recorded before
          println("MySQL has a saved offset, resume consuming from that offset")
          KafkaUtils.createDirectStream[String, String](ssc,
          LocationStrategies.PreferConsistent,// location strategy: the recommended one, distributes the topic partitions evenly across the available executors
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetMap))// consumer strategy: subscribe to the topics, starting from the offsets loaded from MySQL
        }else{// no offset has been recorded yet
          println("MySQL has no saved offset, first connection, start consuming from latest")
          // /opt/soft/kafka/bin/kafka-console-producer.sh --broker-list node-01:9092 --topic spark_kafka
          KafkaUtils.createDirectStream[String, String](ssc,
          LocationStrategies.PreferConsistent,// location strategy: the recommended one, distributes the topic partitions evenly across the available executors
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// consumer strategy: subscribe to the topics, auto.offset.reset (latest) decides where to start
        }
        }
        //3. Process the data
        // Note: since we maintain the offsets ourselves, every small batch of data we consume should be followed by saving its offsets
        // such a small batch shows up in the DStream as one RDD, so we need to operate on the RDDs inside the DStream
        // the APIs for operating on the RDDs inside a DStream are transform (a transformation) and foreachRDD (an action)
        recordDStream.foreachRDD(rdd=>{
          if(rdd.count() > 0){// this batch actually contains data
            rdd.foreach(record => println("Record received from Kafka: " + record))
            // Record received from Kafka: ConsumerRecord(topic = spark_kafka, partition = 1, offset = 6, CreateTime = 1565400670211, checksum = 1551891492, serialized key size = -1, serialized value size = 43, key = null, value = hadoop spark ...)
            // Note: the offsets are read and saved only after the batch has been processed, which is what guarantees that no data is lost
            // the real processing of the data would go here .... it could also be written with transform
            // read the offsets: only the first-hand RDD obtained directly from createDirectStream carries offset information and can be cast to HasOffsetRanges; once Spark has transformed the RDD, that information is gone
            val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            for (o <- offsetRanges){
              println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
            }
            // manually save the offsets of this batch to MySQL
            OffsetUtil.saveOffsetRanges("SparkKafkaDemo", offsetRanges)
          }
        })
        //4. Start the streaming job and wait for termination
        ssc.start()
        ssc.awaitTermination()
      }

      /**
        * Helper object that maintains the offsets in the MySQL table t_offset
        */
      object OffsetUtil {

        /**
          * Read the saved offsets for the given group and topic from the database
          */
        def getOffsetMap(groupid: String, topic: String) = {
          val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
          val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
          pstmt.setString(1, groupid)
          pstmt.setString(2, topic)
          val rs: ResultSet = pstmt.executeQuery()
          val offsetMap = mutable.Map[TopicPartition, Long]()
          while (rs.next()) {
            offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
          }
          rs.close()
          pstmt.close()
          connection.close()
          offsetMap
        }
    
        /**
          * Save the offset ranges of the current batch to the database
          */
        def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
          val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
          //replace into: replace the row if one already exists, otherwise insert it
          val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
          for (o <- offsetRange) {
            pstmt.setString(1, o.topic)
            pstmt.setInt(2, o.partition)
            pstmt.setString(3, groupid)
            pstmt.setLong(4, o.untilOffset)// save the end offset of the range so consumption resumes after the last processed record
            pstmt.executeUpdate()
          }
          pstmt.close()
          connection.close()
        }
      }
    }
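
Note: the example assumes that a t_offset table already exists in the bigdata database. Its exact schema is not shown in the original post; a plausible one-off setup, with the columns and key inferred from the select / replace into statements above (topic, partition and groupid as the primary key so that replace into overwrites the previous offset), could look like this:

    import java.sql.DriverManager

    // One-off setup sketch with an assumed schema inferred from the queries above
    object CreateOffsetTable {
      def main(args: Array[String]): Unit = {
        val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
        val stmt = connection.createStatement()
        stmt.executeUpdate(
          """CREATE TABLE IF NOT EXISTS t_offset (
            |  `topic`     VARCHAR(255) NOT NULL,
            |  `partition` INT          NOT NULL,
            |  `groupid`   VARCHAR(255) NOT NULL,
            |  `offset`    BIGINT,
            |  PRIMARY KEY (`topic`, `partition`, `groupid`)
            |)""".stripMargin)
        stmt.close()
        connection.close()
      }
    }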