Spark Streaming: extracting, filtering, and transforming Kafka data in real time and storing it in HDFS


Original article: https://blog.51cto.com/simplelife/2311296?source=dra
Overview: This example shows Spark Streaming consuming Kafka messages. It extracts, filters, and transforms the data in real time and stores the result in HDFS.
Example code
package com.fwmagic.test

import com.alibaba.fastjson.{JSON, JSONException}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
  * created by fwmagic
  */
object RealtimeEtl {

  private val logger = LoggerFactory.getLogger(RealtimeEtl.getClass)

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop")

    val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]")

    val spark = SparkSession.builder().config(conf).getOrCreate()

    val streamContext = new StreamingContext(spark.sparkContext, Seconds(5))

    // Kafka consumer parameters for the subscribed topic.
    // "auto.offset.reset": earliest (consume from the beginning of the topic),
    // latest (consume starting from the most recent offset).
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fwmagic",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("access")

    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      streamContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Spark Streaming reads from Kafka directly, so we work on kafkaDStream itself with foreachRDD.
    kafkaDStream.foreachRDD(kafkaRDD => {
      if (!kafkaRDD.isEmpty()) {
        // Get the offset ranges of the current RDD.
        val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        // Extract the message values from Kafka.
        val lines = kafkaRDD.map(_.value())
        // Each line is expected to be a JSON string; parse it into a LogBean.
        val logBeanRDD = lines.map(line => {
          var logBean: LogBean = null
          try {
            logBean = JSON.parseObject(line, classOf[LogBean])
          } catch {
            case e: JSONException => {
              // Log the malformed record and keep going.
              logger.error("JSON parse failed! line: " + line, e)
            }
          }
          logBean
        })

        // Filter out records that failed to parse.
        val filteredRDD = logBeanRDD.filter(_ != null)

        // Convert the RDD to a DataFrame; this works because the RDD holds case class instances.
        import spark.implicits._

        val df = filteredRDD.toDF()

        df.show()
        // Write the batch to HDFS, e.g. hdfs://hd1:9000/360 (the output path is taken from args(0)).
        df.repartition(1).write.mode(SaveMode.Append).parquet(args(0))

        // After the batch has been processed, commit the offsets back to Kafka.
        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
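        // Note: committing offsets only after a successful write gives
        // at-least-once semantics; a failure between the write and the
        // commit replays the batch and can leave duplicates in HDFS.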
      }
    })

    // Start the streaming context and block until termination.
    streamContext.start()
    streamContext.awaitTermination()
    streamContext.stop()

  }

}

case class LogBean(time:String,
                   longitude:Double,
                   latitude:Double,
                   openid:String,
                   page:String,
                   evnet_type:Int)
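
For reference, here is a minimal sketch (the sample field values are made up) showing how the fastjson call used in the job maps a single JSON record onto the LogBean case class above. The JSON keys must match the field names exactly, including the evnet_type spelling used in the case class:

import com.alibaba.fastjson.JSON

object LogBeanParseExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical record shaped to match the LogBean fields above.
    val line = """{"time":"2018-10-01 12:00:00","longitude":116.39,"latitude":39.91,"openid":"o_abc123","page":"/index","evnet_type":1}"""
    // The same call the streaming job makes for every message.
    val bean = JSON.parseObject(line, classOf[LogBean])
    println(bean)
  }
}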

Dependencies (pom.xml)


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fwmagic.360</groupId>
    <artifactId>fwmagic-360</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.7</scala.version>
        <spark.version>2.2.2</spark.version>
        <hadoop.version>2.7.7</hadoop.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark Streaming Kafka 0.10 connector -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- fastjson for JSON parsing -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.39</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- Scala compiler plugin -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- Java compiler plugin -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Build an uber jar; exclude signature files so the merged jar is not rejected -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

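To run the job, build the uber jar with mvn package and submit it with spark-submit, passing the HDFS output directory as the first program argument (the job writes to args(0), e.g. hdfs://hd1:9000/360 as in the comment above; the host names and paths are specific to this example's cluster).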