Spark Streaming による Kafka のリアルタイム消費の実装
6175 ワード
以下に完全なコード（Scala）をそのまま掲載します。
コードで使用している設定ライブラリ（Typesafe Config）の Maven 依存関係は、コードの後に掲載します。
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.typesafe.config.{Config, ConfigFactory}
import xxxxxx.JsonUtil
import scala.collection._
import kafka.serializer.StringDecoder
/**
 * Consumes Kafka messages in real time with Spark Streaming, using the direct
 * (receiver-less) stream of the Kafka 0.8 integration.
 *
 * Settings are read from `app.conf` via Typesafe Config:
 *  - `name`                    : Spark application name
 *  - `streaming.duration`      : batch interval, in seconds
 *  - `streaming.checkpointDir` : checkpoint directory (e.g. on HDFS)
 *  - `kafka.*`                 : see [[doXxx]]
 *
 * @author iris_new
 */
// Mixes in Spark's Logging trait for logInfo / logWarning helpers.
object Example extends Logging {

  /**
   * Builds the streaming context (or recovers it from the checkpoint directory)
   * and blocks until the streaming job terminates.
   *
   * @param args command-line arguments (currently unused)
   */
  def startJob(args: Array[String]): Unit = {
    val appConf = ConfigFactory.load("app.conf")
    val streamConf = appConf.getConfig("streaming")
    val checkpointDir = streamConf.getString("checkpointDir")

    // Only invoked when no checkpoint exists. The SparkContext is created here rather than
    // up front: on recovery, getOrCreate rebuilds the context (and its SparkConf) from the
    // checkpoint, so a SparkContext created beforehand would be unused at best and, on Spark
    // versions that construct a fresh SparkContext during recovery, would clash with it.
    // All DStream wiring must also happen inside this function so it is captured by the
    // checkpoint.
    def functionToCreateContext(): StreamingContext = {
      val sparkConf = new SparkConf().setAppName(appConf.getString("name"))
      val context = new StreamingContext(sparkConf, Seconds(streamConf.getInt("duration")))
      doXxx(appConf, context.sparkContext, context)
      context.checkpoint(checkpointDir)
      context
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Registers the Kafka input stream and its processing on `ssc`.
   *
   * Expected `kafka` section of app.conf:
   * {{{
   * kafka {
   *   brokers = "22.2.22.22:9092,22.2.22.23:9092,22.2.22.24:9092,22.2.22.25:9092"
   *   topics  = "example"
   *   offset  = "largest"
   * }
   * }}}
   *
   * Records that are not valid JSON, or that lack String `code` / `name` fields, are
   * skipped with a warning instead of failing the batch.
   *
   * @param appConf application configuration; must contain a `kafka` section
   * @param sc      Spark context (not used by this method; kept for API compatibility)
   * @param ssc     streaming context on which the DStream graph is registered
   */
  def doXxx(appConf: Config, sc: SparkContext, ssc: StreamingContext): Unit = {
    val kafkaConf = appConf.getConfig("kafka")
    val brokers = kafkaConf.getString("brokers")
    val offset = kafkaConf.getString("offset")
    // Tolerate whitespace and empty entries such as "a, b," in the comma-separated topic list.
    val topicSet = kafkaConf.getString("topics").split(",").map(_.trim).filter(_.nonEmpty).toSet
    val kafkaParams = immutable.Map[String, String](
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> offset)

    // Each element is a (key, message) pair; only the message value is kept.
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
      .map(_._2)

    // Cheap substring pre-filter before the (more expensive) JSON parse.
    // NOTE: this only matches the compact form "code":"aaa" (no whitespace around the colon).
    val codeMarker = "\"code\":\"aaa\""
    val records = lines
      .filter(_.contains(codeMarker))
      .flatMap(message => parseRecord(message).toList)

    records.foreachRDD { rdd =>
      rdd.foreach { jsonData =>
        (jsonData.get("code"), jsonData.get("name")) match {
          case (Some(code: String), Some(name: String)) =>
            // do something with `code` and `name`
          case _ =>
            logWarning("Skipping record without String fields 'code' and 'name'")
        }
      }
    }
  }

  /**
   * Parses one Kafka message with `JsonUtil.read`.
   *
   * @return `Some(parsed)` on success, `None` (after logging a warning) if parsing throws a
   *         non-fatal exception, so one malformed message cannot fail the whole batch
   */
  private def parseRecord(message: String) =
    scala.util.Try(JsonUtil.read(message)) match {
      case scala.util.Success(json) => Some(json)
      case scala.util.Failure(e) =>
        logWarning(s"Skipping Kafka message that could not be parsed as JSON: ${e.getMessage}")
        None
    }

  /** Program entry point; delegates to [[startJob]]. */
  def main(args: Array[String]): Unit = startJob(args)
}
上のコードで使用している設定ライブラリ（Typesafe Config）の Maven 依存関係を以下に貼り付けます。
<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.2.1</version>
</dependency>