Using the Spark Streaming socket data source

1. Writing a mock socket program that listens on a port
2. Writing SocketWordCount
3. Stateful cumulative word counts
4. Window-based word counts
1. Writing a mock socket program that listens on a port
import java.io.PrintWriter
import java.net.ServerSocket

import scala.io.Source

object DataFlowSimulator {

  // Return a random index in [0, length)
  def index(length: Int) = {
    import java.util.Random
    val rdm = new Random()
    rdm.nextInt(length)
  }

  def main(args: Array[String]): Unit = {

    // Validate the arguments: the file to serve, the port to listen on, and the send interval (unit: milliseconds)
    if (args.length != 3) {
      System.err.println("Usage: DataFlowSimulator <filename> <port> <interval-ms>")
      System.exit(-1)
    }

    // Read the source file into memory and record its line count
    val filename = args(0)
    val lines = Source.fromFile(filename).getLines().toList
    val filerow = lines.length

    // Listen on the given port and serve each client connection in its own thread
    val lister = new ServerSocket(args(1).toInt)
    while (true){
      val socket = lister.accept()
      new Thread(){
        override def run(): Unit ={
          println("Got client connection from:"+socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream,true)
          while(true){
            Thread.sleep(args(2).toLong)
            // Pick a random line from the file and send it to the client
            val content = lines(index(filerow))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          out.close()
          socket.close()
        }
      }.start()
    }
  }
}
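
To try the simulator, run it with three arguments: the text file to serve, the port to listen on, and the send interval in milliseconds, e.g. scala DataFlowSimulator /home/spark/data/words.txt 8089 1000 (the file path here is only an illustration; any text file works). Every connected client then receives one randomly chosen line from the file per interval.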

2. Writing SocketWordCount
package streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketSparkStreaming {

  def main(args: Array[String]): Unit = {

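    // Use at least two local threads: the socket receiver permanently occupies one,
    // leaving the other to process the batches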
    val conf = new SparkConf().setAppName("socketSparkStreaming").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(5))

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Connect to the socket server and receive lines as a DStream
    val lines = ssc.socketTextStream("spark02",8089,StorageLevel.MEMORY_ONLY)

    val words = lines.flatMap(_.split(","))

    // Count each word within the current batch only
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    //wordCounts.saveAsTextFiles("file:///home/spark/test/sparktest")
    ssc.start()
    ssc.awaitTermination()
  }
}
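
Note that reduceByKey here aggregates only within a single 5-second batch: each print() shows the counts for the lines received during that batch, and nothing carries over to the next one. Accumulating counts across batches requires state, which the next example adds.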

3. Stateful cumulative word counts
package streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StateWordCount {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)


    // Update function: values holds this batch's new counts for a key,
    // state holds the count accumulated over previous batches
    val updateFunc = (values:Seq[Int],state:Option[Int])=>{
      val currentCount = values.foldLeft(0)(_+_)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setAppName("stateSparkStreaming").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(5))
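    // updateStateByKey requires a checkpoint directory to persist state between batches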
    ssc.checkpoint("file:///d:/checkpoint")
    val lines = ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_ONLY)

    val wordCounts = lines.flatMap(_.split(",")).map((_,1))

    // updateStateByKey keeps per-key state across batches, so counts accumulate for the life of the stream
    val stateDStream = wordCounts.updateStateByKey(updateFunc)
    stateDStream.print()

    ssc.start()
    ssc.awaitTermination()

  }
}
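
updateStateByKey scans every key with state on every batch, even keys that received no new data. On Spark 1.6+, mapWithState processes only the keys that appear in the current batch. A minimal sketch of the same running count, assuming the wordCounts DStream from the listing above:

import org.apache.spark.streaming.{State, StateSpec}

// Mapping function: fold this batch's value for a key into its running total
val spec = StateSpec.function((word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
})

// A checkpoint directory is still required, as with updateStateByKey
val stateDStream = wordCounts.mapWithState(spec)
stateDStream.print()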

4. Window-based word counts
package streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowWordCount {
  def main(args: Array[String]): Unit = {

//    if(args.length != 4){
//      System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
//      System.exit(-1)
//    }

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("window").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(5))

    ssc.checkpoint("file:///H:/checkpoint")

    val lines = ssc.socketTextStream("spark02",8089,StorageLevel.MEMORY_ONLY)
    val words = lines.flatMap(_.split(",")).map((_,1))

    // Window operation: the last two arguments are the window length and the slide interval;
    // both must be multiples of the batch interval (5 seconds here)
    //val wordCounts = words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(10))
    val wordCounts = words.reduceByKeyAndWindow(_ + _, _ - _, Seconds(10), Seconds(10))

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()

  }
}
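
Of the two reduceByKeyAndWindow variants, the commented-out one recomputes the whole window from scratch on every slide, while the variant with the inverse function (_ - _) updates the previous window's result incrementally by subtracting the values that slid out of the window. The incremental form is cheaper when the window is much longer than the slide interval, but it requires the checkpoint directory set above, and keys whose count has fallen to zero stay in the output unless the overload that also takes a filter function is used.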