Writing Kafka offsets to MySQL

package SparkStreamingKafKa.OffSetMysql

import java.sql.{DriverManager, ResultSet}

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

import scala.collection.mutable

object StreamingKafkaWCMysqlOffset1 {
  // Keep Spark's own logging down to WARN
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Spark configuration: local mode, app name derived from the class name
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(this.getClass.getSimpleName)
    // StreamingContext with a 3-second batch interval
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    val groupId = "hello_topic_group0"
    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "Linux00:9092,Linux01:9092,Linux04:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topic = "he8"
    val topics = Array(topic)
    val config: Config = ConfigFactory.load()
    // Starting offsets recovered from MySQL, keyed by TopicPartition
    val offsets = mutable.HashMap[TopicPartition, Long]()
    val conn1 = DriverManager.getConnection(config.getString("db.url"), config.getString("db.user"), config.getString("db.password"))

    val pstm = conn1.prepareStatement("select * from mysqloffset_copy where groupId = ? and topic = ? ")
    pstm.setString(1, groupId)
    pstm.setString(2, topic)

    val result: ResultSet = pstm.executeQuery()
    while (result.next()) {
      // Rebuild the starting offset of each partition from its stored row
      val p = result.getInt("partition")
      val f = result.getLong("untilOffset")
      // equivalent: offsets += (new TopicPartition(topic, p) -> f)
      val partition: TopicPartition = new TopicPartition(topic, p)
      offsets.put(partition, f)
    }
    result.close()
    pstm.close()
    conn1.close()

    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // Process each micro-batch RDD
    stream.foreachRDD(rdd => {
      // The offset ranges consumed in this batch
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      println("offset ranges in this batch = " + ranges.length)
      ranges.foreach(println)
      val result: RDD[(String, Int)] = rdd.map(_.value()).map((_, 1)).reduceByKey(_ + _)
      result.foreach(println)
      result.foreachPartition(p => {
        val jedis: Jedis = ToolsRedisMysql.getJedis()
        p.foreach(t => {
          jedis.hincrBy("wc1", t._1, t._2)
        })
        jedis.close()
      })
      val conn = DriverManager.getConnection(config.getString("db.url"), config.getString("db.user"), config.getString("db.password"))

      // Write this batch's offset ranges back to MySQL
      ranges.foreach(t => {
        // REPLACE INTO overwrites the existing row for this topic/partition/group, so only the latest untilOffset is kept

        val pstm = conn.prepareStatement("replace into mysqloffset_copy values (?,?,?,?)")
        pstm.setString(1, t.topic)
        pstm.setInt(2, t.partition)
        pstm.setLong(3, t.untilOffset)
        pstm.setString(4, groupId)
        pstm.execute()
        pstm.close()
      })
      conn.close()
    })
    ssc.start()
    ssc.awaitTermination()

  }
}
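
The listing assumes a mysqloffset_copy table holding one row per topic, partition and consumer group, with connection settings read by ConfigFactory.load() from the db.url, db.user and db.password keys of an application.conf on the classpath. A minimal sketch of how such a table could be created is shown below; the column types and the composite primary key are assumptions (the key is what makes replace into overwrite the previous offset), and the column order matches the positional values (?,?,?,?) used above.

package SparkStreamingKafKa.OffSetMysql

import java.sql.DriverManager

import com.typesafe.config.{Config, ConfigFactory}

// One-off setup: creates the offset table used by StreamingKafkaWCMysqlOffset1.
// Column names match the main listing; types and the primary key are assumptions.
object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    val config: Config = ConfigFactory.load()
    val conn = DriverManager.getConnection(config.getString("db.url"), config.getString("db.user"), config.getString("db.password"))
    val stmt = conn.createStatement()
    // `partition` is a reserved word in MySQL, hence the backticks
    stmt.execute(
      """CREATE TABLE IF NOT EXISTS mysqloffset_copy (
        |  topic       VARCHAR(255) NOT NULL,
        |  `partition` INT          NOT NULL,
        |  untilOffset BIGINT       NOT NULL,
        |  groupId     VARCHAR(255) NOT NULL,
        |  PRIMARY KEY (topic, `partition`, groupId)
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}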
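
ToolsRedisMysql.getJedis() is referenced in foreachPartition but not shown in the listing. A minimal sketch, assuming a plain JedisPool against a single Redis node (the host and port here are placeholders), could look like this:

package SparkStreamingKafKa.OffSetMysql

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Minimal sketch of the helper used in foreachPartition; the Redis host/port are placeholders.
object ToolsRedisMysql {
  // One pool per executor JVM; each partition borrows a connection and returns it via close()
  private lazy val pool = new JedisPool(new JedisPoolConfig(), "Linux00", 6379)

  def getJedis(): Jedis = pool.getResource
}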