spark streaming socketデータソースの使用
11927 ワード
1. socketを傍受するアナログsocketプログラムを作成する 2. SocketWordCountを作成する 3. 状態に基づく単語の累計出現回数 4. ウィンドウベースの単語累計出現回数
1.socketを傍受するアナログsocketプログラムの作成
2.SocketWordCountの作成
3.状態に基づく単語累計出現回数
4.ウィンドウベースの単語累計出現回数
1.socketを傍受するアナログsocketプログラムの作成
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
object DataFlowSimulator {
  /** Returns a uniformly random index in [0, length). */
  def index(length: Int): Int = {
    import java.util.Random
    val rdm = new Random()
    rdm.nextInt(length)
  }

  /**
   * Simple socket data-source simulator for Spark Streaming tests:
   * listens on a port and, for every client that connects, sends a
   * randomly chosen line of a text file at a fixed interval.
   *
   * args(0): path of the text file to sample lines from
   * args(1): port to listen on
   * args(2): interval in milliseconds between emitted lines
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      // BUG FIX: usage message was truncated to just "Usage ".
      System.err.println("Usage: DataFlowSimulator <file> <port> <millis>")
      System.exit(-1)
    }
    // Load the whole file once; each client samples from this list.
    val filename = args(0)
    val lines = Source.fromFile(filename).getLines().toList
    val filerow = lines.length

    // Accept connections forever; one sender thread per client.
    val lister = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = lister.accept()
      new Thread() {
        override def run(): Unit = {
          println("Got client connection from:" + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          try {
            while (true) {
              Thread.sleep(args(2).toLong)
              // Pick a random line and push it to the client.
              val content = lines(index(filerow))
              println(content)
              // BUG FIX: the newline was a char literal split across two
              // source lines ('<newline>'), which does not compile.
              out.write(content + "\n")
              out.flush()
            }
          } finally {
            // BUG FIX: the close calls sat after while(true) and could never
            // run; close resources when the client disconnects (write throws).
            out.close()
            socket.close()
          }
        }
      }.start()
    }
  }
}
2.SocketWordCountの作成
package streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SocketSparkStreaming {
  /**
   * Per-batch word count over a socket text stream.
   * Reads comma-separated tokens from host "spark02" port 8089 and prints
   * the counts of each 5-second micro-batch.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("socketSparkStreaming").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // Quiet down Spark / Jetty logging so batch output stays readable.
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Split each incoming line on commas and count token occurrences per batch.
    val stream = ssc.socketTextStream("spark02", 8089, StorageLevel.MEMORY_ONLY)
    val tokens = stream.flatMap(line => line.split(","))
    val counts = tokens
      .map(token => (token, 1))
      .reduceByKey(_ + _)
    counts.print()
    //counts.saveAsTextFiles("file:///home/spark/test/sparktest")

    ssc.start()
    ssc.awaitTermination()
  }
}
3.状態に基づく単語累計出現回数
package streaming
import org.apache.hadoop.hdfs.server.common.Storage
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StateWorldCount {
  /**
   * Stateful word count: totals accumulate across batches via
   * updateStateByKey.
   *
   * args(0): socket host, args(1): socket port.
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Merge this batch's counts (newValues) into the saved running total.
    val updateFunc = (newValues: Seq[Int], runningTotal: Option[Int]) => {
      Some(newValues.sum + runningTotal.getOrElse(0))
    }

    val conf = new SparkConf().setAppName("stateSparkStreaming").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // updateStateByKey requires a checkpoint directory for the state RDDs.
    ssc.checkpoint("file:///d:/checkpoint")

    // Comma-separated tokens -> (word, 1) pairs.
    val pairs = ssc
      .socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY)
      .flatMap(_.split(","))
      .map(word => (word, 1))

    // Fold each batch into the per-key running state and print the totals.
    val totals = pairs.updateStateByKey(updateFunc)
    totals.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
4.ウィンドウベースの単語累計出現回数
package streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowWordCount {
  /**
   * Windowed word count (10s window, sliding every 10s) over a socket text
   * stream, using the incremental (invReduceFunc) variant of
   * reduceByKeyAndWindow. Counts are printed per window.
   */
  def main(args: Array[String]): Unit = {
    // if(args.length != 4){
    // System.err.println("Usage: WindowWorldCount " )
    // System.exit(-1)
    // }
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("window").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // The inverse-reduce variant requires checkpointing.
    ssc.checkpoint("file:///H:/checkpoint")
    val lines = ssc.socketTextStream("spark02", 8089, StorageLevel.MEMORY_ONLY)
    val words = lines.flatMap(_.split(",")).map((_, 1))
    // Full-recompute variant (no inverse function needed):
    //val worldCounts = words.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(10),Seconds(10))
    // BUG FIX: with invReduceFunc, keys whose count falls back to 0 stay in
    // the internal state forever unless a filterFunc is supplied (see the
    // Spark Streaming guide). Drop zero-count keys explicitly.
    val worldCounts = words.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,  // add counts of data entering the window
      (a: Int, b: Int) => a - b,  // subtract counts of data leaving the window
      Seconds(10),                // window length
      Seconds(10),                // slide interval
      filterFunc = (kv: (String, Int)) => kv._2 > 0
    )
    worldCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}