Spark Streaming: extract, filter, and transform Kafka data in real time and store it in HDFS
Original article: https://blog.51cto.com/simplelife/2311296?source=dra
Overview: this example shows Spark Streaming consuming Kafka messages; the job extracts, filters, and transforms the data in real time and stores it in HDFS.
Example code
package com.fwmagic.test

import com.alibaba.fastjson.{JSON, JSONException}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
  * created by fwmagic
  */
object RealtimeEtl {

  private val logger = LoggerFactory.getLogger(RealtimeEtl.getClass)

  def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME", "hadoop")

    val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]")

    val spark = SparkSession.builder().config(conf).getOrCreate()

    val streamContext = new StreamingContext(spark.sparkContext, Seconds(5))

    // Kafka consumer parameters.
    // auto.offset.reset: earliest (consume from the beginning when no committed offset exists),
    // latest (consume only new messages, starting from the latest offset)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fwmagic",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // The Kafka topic to subscribe to
    val topics = Array("access")

    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      streamContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // With the Spark Streaming + Kafka direct approach, process each batch with
    // foreachRDD on kafkaDStream so the offset ranges can be read and committed per batch
    kafkaDStream.foreachRDD(kafkaRDD => {
      if (!kafkaRDD.isEmpty()) {
        // Get the offset ranges of the current batch's RDD
        val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        // Extract the message values from Kafka
        val lines = kafkaRDD.map(_.value())

        // Parse each JSON line into a LogBean
        val logBeanRDD = lines.map(line => {
          var logBean: LogBean = null
          try {
            logBean = JSON.parseObject(line, classOf[LogBean])
          } catch {
            case e: JSONException => {
              // Log the record that failed to parse
              logger.error("JSON parse error! line:" + line, e)
            }
          }
          logBean
        })

        // Filter out records that failed to parse
        val filteredRDD = logBeanRDD.filter(_ != null)

        // Convert the RDD to a DataFrame; this works because the RDD holds case class instances
        import spark.implicits._
        val df = filteredRDD.toDF()

        df.show()

        // Write the data to HDFS, e.g. hdfs://hd1:9000/360, passed in as args(0)
        df.repartition(1).write.mode(SaveMode.Append).parquet(args(0))

        // After the batch has been written, commit the offsets back to Kafka
        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    // Start the streaming job and block until termination
    streamContext.start()
    streamContext.awaitTermination()
    streamContext.stop()
  }
}

case class LogBean(time: String,
                   longitude: Double,
                   latitude: Double,
                   openid: String,
                   page: String,
                   evnet_type: Int)
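To exercise the job end to end, a record shaped like LogBean can be pushed onto the access topic with a plain Kafka producer. The sketch below is not part of the original post: the TestProducer object and the sample field values are made up for illustration, while the broker list and topic name are copied from the consumer configuration above.

package com.fwmagic.test

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper, not from the original post: sends one sample record
// to the "access" topic so RealtimeEtl has something to consume.
object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Broker list assumed to match the consumer configuration above
    props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Sample values are invented; the field names follow the LogBean case class
    val json = """{"time":"2018-10-01 12:00:00","longitude":116.31,"latitude":39.98,"openid":"o-test-001","page":"home","evnet_type":1}"""
    producer.send(new ProducerRecord[String, String]("access", json))

    producer.flush()
    producer.close()
  }
}

With these assumptions, running RealtimeEtl with the HDFS output directory as its single argument while the producer feeds the access topic should result in a Parquet file being appended every five-second batch.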
Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fwmagic.360</groupId>
    <artifactId>fwmagic-360</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.7</scala.version>
        <spark.version>2.2.2</spark.version>
        <hadoop.version>2.7.7</hadoop.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.39</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>