SparkStreamingデータソースを取得する2つの方法(ポート番号の傍受とkafkaの統合)
15566 ワード
方法1:ポート番号を傍受し、linux上でnc-lkポート番号サービスを開始する必要があります.その後、SparkStreamingはこのポートからデータを引き出し、リアルタイムで処理することができます.コードは以下の通りです.
方式2:SparkStreamingはkafkaを統合し、この方式はkafkaクラスタが正常に運行する必要があり、コードは以下の通りである.
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object StreamingWordCount {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("linux01", 8888)
val dStream2: DStream[String] = dStream.flatMap(_.split(" "))
val dStream3: DStream[(String, Int)] = dStream2.map((_, 1))
val reduced: DStream[(String, Int)] = dStream3.reduceByKey(_ + _)
reduced.print()
ssc.start()
ssc.awaitTermination()
}
}
方式2:SparkStreamingはkafkaを統合し、この方式はkafkaクラスタが正常に運行する必要があり、コードは以下の通りである.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingKafkaWordCount {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
// Info, , , WARN,
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "test01:9092,test02:9092,test03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "1",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (true: java.lang.Boolean)
)
val topics = Array("wordcount")
/*
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
*/
// DStream, KafkaUtils createDirectStream . Kafka API,Consumer Leader,
val value: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
// :StreamingContext
ssc,
// : , kafka Worker , ,
LocationStrategies.PreferConsistent,
// : , , 1 kafka topic, 2 kafka
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// map, object not serializable , InputStream ConsumerRecord , , Key Value
val value1: DStream[String] = value.map(cr => {
cr.value()
})
value1.print()
ssc.start()
ssc.awaitTermination()
}
}