flinkとsparkはScalaプログラミングのコードの対比を使います

17100 ワード

Flink():リアルタイム性が高く、スループットが高く、オフライン+リアルタイム演算子が豊富
Spark Streaming():遅延(ミリ秒レベル)があり、スループットが高く、オフライン+リアルタイム演算子が豊富で、機械学習が可能で、図計算(生態圏)
flinkのバッチ処理は実際の上流処理であり、1つのバッチ処理を大きなストリームと見なすだけであるが、sparkのリアルタイム実時間バッチ処理は、極小のロットであり、例えば1秒の1つのロットを1回処理することで、ストリーム処理と見なすことができるが、遅延Sparkがリアルタイム計算を行うことがあり、もとは1回に1つの大きなRDDを提出し、リアルタイム計算をするには、絶えずデータを読み取り、複数の小さなRDDを形成する必要がある.一定時間ごとに小さなRDDが生成され、小さなRDDがクラスタにコミットされる.
次に、Scalaを使用してflinkとsparkをプログラミングするときの比較を示します.
flinkリアルタイムwordcount:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time


/**
  * @Author: zxl
  * @Date: 2019/1/12 10:16
  * @Version 1.0
  *               ,     wordcount
  */
object FlinkDemo01 {

  def main(args: Array[String]): Unit = {
  // flink       ,      
    val tool = ParameterTool.fromArgs(args)
//   hostname,         
    val hostname = tool.get("hostname","192.168.136.150")
    val port = tool.getInt("port",9999)
    // env    spark  StreamContext   
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //               
    val data = env.socketTextStream(hostname,port)
    //         
    import org.apache.flink.api.scala._

    val result = data.flatMap(line=>line.split(","))
      .map((_,1))
      .keyBy(0)  //   0       
      .timeWindow(Time.seconds(6),Time.seconds(2))//  2         
      .sum(1) //         sum

    //         
    result.print().setParallelism(1)

    //   ,   job 
    env.execute("FlinkDemo01")

  }
}


sparkのリアルタイムwordcount

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @Author: zxl
  * @Date: 2019/1/12 10:59
  * @Version 1.0
  */
object SparkDemo02 {
  def main(args: Array[String]): Unit = {
    //       2    
    val conf = new SparkConf().setAppName("SparkDemo02").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(1))
    //Dstream,   DStream
    val data: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.136.150",9999)

   val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    print("--------------------")

    result.print()
    ssc.start()

    ssc.awaitTermination()


  }

}


flinkのバッチ:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
  * @Author: zxl
  * @Date: 2019/1/12 10:46
  * @Version 1.0
  */
object FlinkDemo02 {
  def main(args: Array[String]): Unit = {
  //  env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //      
    val file: DataStream[String] = env.readTextFile("d:/yue/applist.txt")
//      
    import org.apache.flink.api.scala._

    val result = file.flatMap(line => line.split("\t"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    result.print().setParallelism(1)

    env.execute("FlinkDemo02")
  }

}

sparkのバッチ処理

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @Author: zxl
  * @Date: 2019/1/12 10:54
  * @Version 1.0
  */
object SparkDemo01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDemo01").setMaster("local")
    val sc = new SparkContext(conf) //   
    val file = sc.textFile("d:/yue/applist.txt")
    val result = file.flatMap(_.split("\t"))
      .map((_, 1)).reduceByKey(_ + _)

    val tuples: Array[(String, Int)] = result.collect()
    tuples.foreach(println)
    sc.stop()

  }

}