flinkとsparkはScalaプログラミングのコードの対比を使います
17100 ワード
Flink():リアルタイム性が高く、スループットが高く、オフライン+リアルタイム演算子が豊富
Spark Streaming():遅延(ミリ秒レベル)があり、スループットが高く、オフライン+リアルタイム演算子が豊富で、機械学習が可能で、図計算(生態圏)
flinkのバッチ処理は実際の上流処理であり、1つのバッチ処理を大きなストリームと見なすだけであるが、sparkのリアルタイム実時間バッチ処理は、極小のロットであり、例えば1秒の1つのロットを1回処理することで、ストリーム処理と見なすことができるが、遅延Sparkがリアルタイム計算を行うことがあり、もとは1回に1つの大きなRDDを提出し、リアルタイム計算をするには、絶えずデータを読み取り、複数の小さなRDDを形成する必要がある.一定時間ごとに小さなRDDが生成され、小さなRDDがクラスタにコミットされる.
次に、Scalaを使用してflinkとsparkをプログラミングするときの比較を示します.
flinkリアルタイムwordcount:
sparkのリアルタイムwordcount
flinkのバッチ:
sparkのバッチ処理
Spark Streaming():遅延(ミリ秒レベル)があり、スループットが高く、オフライン+リアルタイム演算子が豊富で、機械学習が可能で、図計算(生態圏)
flinkのバッチ処理は実際の上流処理であり、1つのバッチ処理を大きなストリームと見なすだけであるが、sparkのリアルタイム実時間バッチ処理は、極小のロットであり、例えば1秒の1つのロットを1回処理することで、ストリーム処理と見なすことができるが、遅延Sparkがリアルタイム計算を行うことがあり、もとは1回に1つの大きなRDDを提出し、リアルタイム計算をするには、絶えずデータを読み取り、複数の小さなRDDを形成する必要がある.一定時間ごとに小さなRDDが生成され、小さなRDDがクラスタにコミットされる.
次に、Scalaを使用してflinkとsparkをプログラミングするときの比較を示します.
flinkリアルタイムwordcount:
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Author: zxl
* @Date: 2019/1/12 10:16
* @Version 1.0
* , wordcount
*/
object FlinkDemo01 {
def main(args: Array[String]): Unit = {
// flink ,
val tool = ParameterTool.fromArgs(args)
// hostname,
val hostname = tool.get("hostname","192.168.136.150")
val port = tool.getInt("port",9999)
// env spark StreamContext
val env = StreamExecutionEnvironment.getExecutionEnvironment
//
val data = env.socketTextStream(hostname,port)
//
import org.apache.flink.api.scala._
val result = data.flatMap(line=>line.split(","))
.map((_,1))
.keyBy(0) // 0
.timeWindow(Time.seconds(6),Time.seconds(2))// 2
.sum(1) // sum
//
result.print().setParallelism(1)
// , job
env.execute("FlinkDemo01")
}
}
sparkのリアルタイムwordcount
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author: zxl
* @Date: 2019/1/12 10:59
* @Version 1.0
*/
object SparkDemo02 {
def main(args: Array[String]): Unit = {
// 2
val conf = new SparkConf().setAppName("SparkDemo02").setMaster("local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(1))
//Dstream, DStream
val data: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.136.150",9999)
val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
print("--------------------")
result.print()
ssc.start()
ssc.awaitTermination()
}
}
flinkのバッチ:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
* @Author: zxl
* @Date: 2019/1/12 10:46
* @Version 1.0
*/
object FlinkDemo02 {
def main(args: Array[String]): Unit = {
// env
val env = StreamExecutionEnvironment.getExecutionEnvironment
//
val file: DataStream[String] = env.readTextFile("d:/yue/applist.txt")
//
import org.apache.flink.api.scala._
val result = file.flatMap(line => line.split("\t"))
.map((_, 1))
.keyBy(0)
.sum(1)
result.print().setParallelism(1)
env.execute("FlinkDemo02")
}
}
sparkのバッチ処理
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author: zxl
* @Date: 2019/1/12 10:54
* @Version 1.0
*/
object SparkDemo01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkDemo01").setMaster("local")
val sc = new SparkContext(conf) //
val file = sc.textFile("d:/yue/applist.txt")
val result = file.flatMap(_.split("\t"))
.map((_, 1)).reduceByKey(_ + _)
val tuples: Array[(String, Int)] = result.collect()
tuples.foreach(println)
sc.stop()
}
}