Spark Streaming開発入門――WordCount(Java&Scal)
9207 ワード
一、Java方式の開発
1、開発前準備
Sparkクラスタを構築したと仮定します。
2、開発環境はeclipse mavenプロジェクトを採用しており、Spark Streaming依存を追加する必要があります。
1.localモードは、local後は2以上の数字でなければなりません。receiverが一つを占めたからです。なぜなら、SparkStreamingは、実行時に少なくとも1つのスレッドが繰り返してデータを受信し、少なくとも1つのスレッドが受信したデータを処理する必要があるからです。スレッドが一つしかないと、受信したデータは処理されません。
次に、Javaコードを書き始めましょう。
ステップ1:
私たちは配置ファイルに基づいてSparkStreaming Contectオブジェクトを作成します。
データソースをローカルポート9999に設定します。
1、flatMap操作:
二、Scara方式の開発
1、開発前準備
Sparkクラスタを構築したと仮定します。
2、開発環境はeclipse mavenプロジェクトを採用しており、Spark Streaming依存を追加する必要があります。
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-streaming_2.10artifactId>
<version>1.6.0version>
dependency>
3、Spark streamingはSpark Coreに基づいて計算します。注意事項が必要です。1.localモードは、local後は2以上の数字でなければなりません。receiverが一つを占めたからです。なぜなら、SparkStreamingは、実行時に少なくとも1つのスレッドが繰り返してデータを受信し、少なくとも1つのスレッドが受信したデータを処理する必要があるからです。スレッドが一つしかないと、受信したデータは処理されません。
:
クラスタにとって、exccturごとに、一般的には一つのThreadだけでなく、Spark Streamingアプリケーションを処理するには、各exectorは、一般的にどのぐらいのcoreを割り当てるべきですか?私達の過去の経験によると、5つぐらいのcoreが一番いいです。(段:奇数のcoreに割り当てられた表現が一番いいです。例えば、3つ、5つ、7つのcoreなど)次に、Javaコードを書き始めましょう。
ステップ1:
SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("wordCountOnline");
ステップ2:私たちは配置ファイルに基づいてSparkStreaming Contectオブジェクトを作成します。
/**
* : SparkStreamingContext, SparkStreaming
* SparkStreamingContext SparkConf , SparkStreamingContext
* ( Driver , Spark Streaming 7*24 ,
* Drver , Checkpoint)
*
*
* 2. SparkStreaming SparkStreamingContext , SparkStreaming
* SparkStreamingContext , , :
*
* SparkStreaming Spark Core 。
* Spark 。
* Spark, SparkStreaming 。
*
* java os c。c++ 。
*
* ssc
*
*
*/
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
第3ステップ、Spark Streaming入力データソースを作成します。データソースをローカルポート9999に設定します。
/**
* : Spark Streaming :input Stream
* 1. File、HDFS、Flume、Kafka、Socket
*
* 2. Socket ,
* Spark Streaming ( , )。
*
* ( )
*
* 3. 5 , job 。 。
* Job , , Job;
*
*
*/
JavaReceiverInputDStream lines = jsc.socketTextStream("Master", 9999);
第四ステップ:私たちはRDDをプログラミングするようにDStreamに基づいてプログラミングします。原因はDStreamはRDDが生成したテンプレートです。Spark Streamingが計算される前に、その実質は各BatchのDStreamの操作をRDD操作に翻訳しました。DStreamはRDDを抽象化した。DataFrameのようにRDDを抽象化する。JavaRDD->Java DStream1、flatMap操作:
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line) throws Exception {
// TODO Auto-generated method stub
return Arrays.asList(line.split(" "));
}
});
2、mapToPair操作: JavaPairDStream pairs = words.mapToPair(new PairFunction() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(String word) throws Exception {
// TODO Auto-generated method stub
return new Tuple2(word,1);
}
});
3、reduceByKey操作: JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});
4、printなどの操作: /**
* print Job , Spark Streaming 。
* Job Duration 。
*
*
* Spark Job DStream output Stream 。
* ouput Stream , print、savaAsTextFile、saveAsHadoopFiles , foreachRDD, Spark Streamimg
* Redis、DB、DashBoard ,foreachRDD
* , 。
*
*/
wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
:
print(print)方法が処理したデータを出力する以外に、他の方法も非常に重要であり、開発においては、SaveAsTextFile、SaveAsHadoopFileなど、最も重要なのはforeachRDD方法であり、この方法はRedis、DB、DashBoardなどにデータを書き込むことができます。二、Scara方式の開発
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
/**
* @author lxh
*/
object WordCountOnline {
def main(args: Array[String]): Unit = {
/**
* SparkConf
*/
val conf = new SparkConf().setMaster("spark://Master:7077").setAppName("wordcountonline")
val ssc = new StreamingContext(conf,Seconds(1))
val lines = ssc.socketTextStream("Master", 9999, StorageLevel.MEMORY_AND_DISK)
/**
*
*/
val words = lines.flatMap { line => line.split(" ") }
/**
* word tuple
*/
val wordCount = words.map { word => (word,1) }
/**
* (key1,1) (key1,1)
* key 。
*/
wordCount.reduceByKey(_+_)
wordCount.print()
ssc.start()
/**
*
*/
ssc.awaitTermination()
ssc.stop(true)
}
}