Spark と Kafka によるリアルタイム消費の実装

6175 ワード

以下に完全なコードをそのまま掲載します（Scala で作成）。
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.typesafe.config.{Config, ConfigFactory}
import xxxxxx.JsonUtil

import scala.collection._
import kafka.serializer.StringDecoder



/**
 * Example Spark Streaming job that consumes messages from Kafka through the
 * direct-stream API, keeps the messages containing `"code":"aaa"`, and parses
 * each of them as JSON.
 *
 * All settings are read from `app.conf` via Typesafe Config: `name`,
 * `streaming.duration`, `streaming.checkpointDir`, and the `kafka` section
 * (`brokers`, `topics`, `offset`).
 *
 * Mixes in Spark's `Logging` trait.
 *
 * @author iris_new
 */
object Example extends Logging {

  /**
   * Builds (or restores from the checkpoint directory) the streaming context,
   * starts it, and blocks until it terminates.
   *
   * @param args command-line arguments; not used by this method
   */
  def startJob(args: Array[String]): Unit = {
    // Load app.conf with Typesafe Config.
    val appConf = ConfigFactory.load("app.conf")

    // NOTE(review): the SparkContext is created outside the factory below.
    // When a checkpoint exists, getOrCreate restores the StreamingContext from
    // it instead of calling the factory — confirm that the Spark version in use
    // reuses this already-running SparkContext on recovery.
    val sc = new SparkContext(new SparkConf().setAppName(appConf.getString("name")))

    val streamConf = appConf.getConfig("streaming")
    // Read once: used both for enabling checkpointing and for recovery.
    val checkpointDir = streamConf.getString("checkpointDir")

    // Factory used by getOrCreate when a new context has to be built.
    def createContext(): StreamingContext = {
      // Batch interval, in seconds, comes from streaming.duration.
      val context = new StreamingContext(sc, Seconds(streamConf.getInt("duration")))

      // Wire up the Kafka stream and its processing.
      doXxx(appConf, sc, context)

      // Enable checkpointing (the original note mentions an HDFS location).
      context.checkpoint(checkpointDir)
      context
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Defines the Kafka direct stream on `ssc` and the processing applied to it.
   *
   * Expects a `kafka` section in the configuration, for example:
   * {{{
   * kafka {
   *     brokers = "22.2.22.22:9092,22.2.22.23:9092,22.2.22.24:9092,22.2.22.25:9092"
   *     topics = "example"
   *     offset = "largest"
   * }
   * }}}
   *
   * @param appConf application configuration containing the `kafka` section
   * @param sc      the Spark context; not used directly by this method
   * @param ssc     the streaming context the Kafka stream is attached to
   */
  def doXxx(appConf: Config, sc: SparkContext, ssc: StreamingContext): Unit = {
    val kafkaConf = appConf.getConfig("kafka")
    val brokers = kafkaConf.getString("brokers")
    val offset = kafkaConf.getString("offset")
    val topicSet = parseTopics(kafkaConf.getString("topics"))

    // Kafka consumer parameters for the direct stream.
    val kafkaParams = immutable.Map[String, String](
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> offset
    )

    // Direct stream of (key, value) pairs; only the message value is kept.
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
      .map(_._2)

    // Cheap textual pre-filter before parsing, then parse each message as JSON.
    // JsonUtil.read presumably returns a Map[String, Any]-like value — confirm.
    val minuteLines = lines
      .filter(_.contains("\"code\":\"aaa\""))
      .map(message => JsonUtil.read(message))

    minuteLines.foreachRDD { rdd =>
      rdd.foreach { jsonData =>
        val code = jsonData("code").asInstanceOf[String]
        val name = jsonData("name").asInstanceOf[String]
        //do something
      }
    }
  }

  /**
   * Splits a comma-separated topic list into a set of topic names, trimming
   * surrounding whitespace and dropping empty entries (e.g. from a trailing comma).
   */
  private def parseTopics(raw: String): immutable.Set[String] =
    raw.split(",").map(_.trim).filter(_.nonEmpty).toSet

  /** Program entry point; delegates to [[startJob]]. */
  def main(args: Array[String]): Unit = {
    startJob(args)
  }
}

上記で使用した config ツール（Typesafe Config）の Maven 依存関係は以下のとおりです。
<dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.2.1</version>
</dependency>