spark streamingテストの2ネットワークデータソースの使用
テストの考え方:
まず、ネットワークデータソースデータ送信器(プログラム1)を作成する.
次に、spark受信データプログラム(プログラム2)を作成する.
次に、プログラムをパッケージ化し、サーバ上で実行します.ここには3つのパラメータがあります.送信するデータファイルは、どのポート番号で送信され、何ミリ秒おきにデータが送信されますか.
最後にsparkプログラムを実行し,ここでは5秒おきにデータを処理する.2つのパラメータがあります.リスニングされたポート番号で、数ミリ秒ごとにデータが受信されます.
効果を観察する.
プログラム1:
プログラム2:
まず、ネットワークデータソースデータ送信器(プログラム1)を作成する.
次に、spark受信データプログラム(プログラム2)を作成する.
次に、プログラムをパッケージ化し、サーバ上で実行します.ここには3つのパラメータがあります.送信するデータファイルは、どのポート番号で送信され、何ミリ秒おきにデータが送信されますか.
最後にsparkプログラムを実行し,ここでは5秒おきにデータを処理する.2つのパラメータがあります.リスニングされたポート番号で、数ミリ秒ごとにデータが受信されます.
効果を観察する.
プログラム1:
sparkStreaming
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
object SalaSimulation {
(length: ) = {
java.util.Random
rdm = Random
rdm.nextInt(length)
}
(args: Array[]){
(args.length != ){
System..println()
System.()
}
filename = args()
lines = Source.(filename).getLines.toList
filerow = lines.length
listener = ServerSocket(args().toInt)
(){
socket = listener.accept()
Thread(){
= {
(+socket.getInetAddress)
out = PrintWriter(socket.getOutputStream())
(){
Thread.(args().toLong)
content = lines((filerow))
(content)
out.write(content +)
out.flush()
}
socket.close()
}
}.start()
}
}
}
プログラム2:
sparkStreaming
import org.apache.log4j.{LoggerLevel}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{SecondsStreamingContext}
import org.apache.spark.{SparkContextSparkConf}
import org.apache.spark.streaming.StreamingContext._
object NetworkWordCount {
def main(args: Array[]){
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
conf = SparkConf().setAppName().setMaster()
sc = SparkContext(conf)
ssc = StreamingContext(sc())
lines = ssc.socketTextStream(args()args().toIntStorageLevel.)
words = lines.flatMap(_.split())
wordCounts = words.map(x=>(x)).reduceByKey(_+_)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}