Two ways to get a data source for Spark Streaming (listening on a port, and Kafka integration)

Method 1: listen on a port. On Linux, first start a netcat listener with nc -lk <port> (for this example, nc -lk 8888 on the host linux01). Spark Streaming then pulls lines from that socket and processes them in real time; a sample test session is shown after the listing. The code is as follows:
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // batch interval: one micro-batch every 5 seconds
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // receiver reading text lines from the nc listener on linux01:8888
    val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("linux01", 8888)
    // classic word count on each batch: split into words, pair with 1, sum by word
    val dStream2: DStream[String] = dStream.flatMap(_.split(" "))
    val dStream3: DStream[(String, Int)] = dStream2.map((_, 1))
    val reduced: DStream[(String, Int)] = dStream3.reduceByKey(_ + _)
    reduced.print()
    // start the computation and block until it is stopped
    ssc.start()
    ssc.awaitTermination()
  }
}
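To try it out, start the listener before submitting the job, then type whitespace-separated words into the nc session; each 5-second batch prints the counts for whatever arrived in that interval. For example, typing the following into the listener (host and port as assumed in the code above):

nc -lk 8888
hello world hello

produces this output in the next batch:

(hello,2)
(world,1)

Counts are per batch; no state is carried between batches.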

Method 2: integrate Spark Streaming with Kafka. This approach requires a Kafka cluster that is up and running.
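The direct stream API used below lives in a separate artifact that must be on the classpath. A minimal sbt sketch; the version shown is an assumption and should match your Spark build:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8"  // assumed version

With the dependency in place, the code is as follows: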
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    // the default log level (INFO) is noisy; raise it to WARN so the batch output is readable
    ssc.sparkContext.setLogLevel("WARN")
    val kafkaParams = Map[String, Object](
      // Kafka broker addresses
      "bootstrap.servers" -> "test01:9092,test02:9092,test03:9092",
      // key/value deserializers for the consumed records
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // consumer group id; consumers sharing an id split the topic's partitions
      "group.id" -> "1",
      // with no committed offset, start reading from the beginning of the topic
      "auto.offset.reset" -> "earliest",
      // let the consumer commit offsets automatically
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val topics = Array("wordcount")
    /*
      createDirectStream signature:
        ssc: StreamingContext,
        locationStrategy: LocationStrategy,
        consumerStrategy: ConsumerStrategy[K, V]
     */
    // Create a direct DStream with KafkaUtils.createDirectStream. The direct approach
    // uses Kafka's consumer API to read from the partition leaders without a separate
    // receiver, which is more efficient.
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      // first argument: the StreamingContext
      ssc,
      // second argument: location strategy; PreferConsistent distributes the Kafka
      // partitions evenly across the available executors
      LocationStrategies.PreferConsistent,
      // third argument: consumer strategy; Subscribe takes the Kafka topics to consume
      // and the Kafka parameters defined above
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // ConsumerRecord is not serializable, so shipping the records themselves into
    // operations that serialize their input raises an "object not serializable" error.
    // Extract the fields you need instead (here the value; the key is also available).
    val lines: DStream[String] = kafkaStream.map(_.value())
    lines.print()
    ssc.start()
    ssc.awaitTermination()

  }
}
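Despite the class name, the listing above only prints the raw record values. To make it an actual word count, the same per-batch chain from method 1 can be applied to the value stream; a minimal sketch, replacing lines.print() above:

    // same word-count chain as method 1, applied to the Kafka value stream
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordPairs: DStream[(String, Int)] = words.map((_, 1))
    val counts: DStream[(String, Int)] = wordPairs.reduceByKey(_ + _)
    counts.print()

As in method 1, the counts are computed independently for each 5-second batch.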