sparkStreamingカスタムデータソース
10929 ワード
カスタムデータソースを要求し、あるポート番号を監視し、そのポート番号の内容を取得するにはReceiverを継承し、onStart、onStop方法を実現してデータソースの採集をカスタマイズする必要がある.
コード実装
カスタム・データ・ソースを使用してデータを収集
コード実装
package com.atguigu
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
// , , : Spark
override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}
// Spark
def receive(): Unit = {
// Socket
var socket: Socket = new Socket(host, port)
// ,
var input: String = null
// BufferedReader
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
input = reader.readLine()
// receiver , Spark
while (!isStopped() && input != null) {
store(input)
input = reader.readLine()
}
//
reader.close()
socket.close()
//
restart("restart")
}
override def onStop(): Unit = {}
}
カスタム・データ・ソースを使用してデータを収集
object FileStream {
def main(args: Array[String]): Unit = {
//1. Spark
Val sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("StreamWordCount")
//2. SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(5))
//3. receiver Streaming
val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))
//4. ,
val wordStreams = lineStream.flatMap(_.split("\t"))
//5. (word,1)
val wordAndOneStreams = wordStreams.map((_, 1))
//6.
val wordAndCountStreams] = wordAndOneStreams.reduceByKey(_ + _)
//7.
wordAndCountStreams.print()
//8. SparkStreamingContext
ssc.start()
ssc.awaitTermination()
}
}