Spark Streaming開発入門――WordCount(Java&Scal)


一、Java方式の開発
1、開発前準備
Sparkクラスタを構築したと仮定します。
2、開発環境はeclipse mavenプロジェクトを採用しており、Spark Streaming依存を追加する必要があります。
    <dependency>
      <groupId>org.apache.sparkgroupId>
      <artifactId>spark-streaming_2.10artifactId>
      <version>1.6.0version>
    dependency>
3、Spark streamingはSpark Coreに基づいて計算します。注意事項が必要です。
1.localモードは、local後は2以上の数字でなければなりません。receiverが一つを占めたからです。なぜなら、SparkStreamingは、実行時に少なくとも1つのスレッドが繰り返してデータを受信し、少なくとも1つのスレッドが受信したデータを処理する必要があるからです。スレッドが一つしかないと、受信したデータは処理されません。
クラスタにとって、exccturごとに、一般的には一つのThreadだけでなく、Spark Streamingアプリケーションを処理するには、各exectorは、一般的にどのぐらいのcoreを割り当てるべきですか?私達の過去の経験によると、5つぐらいのcoreが一番いいです。(段:奇数のcoreに割り当てられた表現が一番いいです。例えば、3つ、5つ、7つのcoreなど)
次に、Javaコードを書き始めましょう。
ステップ1:
SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("wordCountOnline");
ステップ2:
私たちは配置ファイルに基づいてSparkStreaming Contectオブジェクトを作成します。
    /**
     *    :  SparkStreamingContext,   SparkStreaming                    
     * SparkStreamingContext       SparkConf  ,        SparkStreamingContext        
     * (      Driver       ,  Spark Streaming    7*24          ,
     *      Drver           ,              Checkpoint)
     * 
     * 
     * 2.   SparkStreaming          SparkStreamingContext  ,     SparkStreaming    
     *           SparkStreamingContext     ,  ,        :
     * 
     * SparkStreaming      Spark Core        。
     *      Spark          。
     *       Spark,  SparkStreaming    。
     * 
     * java    os           c。c++    。
     * 
     *            ssc
     * 
     * 
     */
    JavaStreamingContext jsc = new  JavaStreamingContext(conf,Durations.seconds(5));
第3ステップ、Spark Streaming入力データソースを作成します。
データソースをローカルポート9999に設定します。
    /**
     *    :  Spark Streaming       :input Stream
     * 1.          File、HDFS、Flume、Kafka、Socket
     * 
     * 2.              Socket  ,
     * Spark Streaming                       (             ,                    )。
     * 
     *                    (         )
     * 
     * 3.        5         ,       job             。              。
     *                  Job         ,      ,     Job;
     * 
     * 
     */
    JavaReceiverInputDStream  lines = jsc.socketTextStream("Master", 9999);
第四ステップ:私たちはRDDをプログラミングするようにDStreamに基づいてプログラミングします。原因はDStreamはRDDが生成したテンプレートです。Spark Streamingが計算される前に、その実質は各BatchのDStreamの操作をRDD操作に翻訳しました。DStreamはRDDを抽象化した。DataFrameのようにRDDを抽象化する。JavaRDD->Java DStream
1、flatMap操作:
    JavaDStream<String> words =  lines.flatMap(new FlatMapFunction<String, String>() {

        @Override
        public Iterable<String> call(String line) throws Exception {
            // TODO Auto-generated method stub
            return Arrays.asList(line.split(" "));
        }
    });
2、mapToPair操作:
    JavaPairDStream pairs = words.mapToPair(new PairFunction() {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2 call(String word) throws Exception {
            // TODO Auto-generated method stub
            return new Tuple2(word,1);
        }
    });
3、reduceByKey操作:
    JavaPairDStream<String, Integer>  wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            // TODO Auto-generated method stub
            return v1 + v2;
        }
    });
4、printなどの操作:
    /**
     *    print       Job   ,          Spark Streaming        。
     *         Job         Duration        。
     * 
     * 
     * Spark           Job DStream    output Stream  。
     * ouput Stream           , print、savaAsTextFile、saveAsHadoopFiles ,      foreachRDD,  Spark Streamimg         
     *   Redis、DB、DashBoard   ,foreachRDD  
     *             ,                  。
     * 
     */
    wordsCount.print();

    jsc.start();

    jsc.awaitTermination();
    jsc.close();
print(print)方法が処理したデータを出力する以外に、他の方法も非常に重要であり、開発においては、SaveAsTextFile、SaveAsHadoopFileなど、最も重要なのはforeachRDD方法であり、この方法はRedis、DB、DashBoardなどにデータを書き込むことができます。
二、Scara方式の開発
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

/**
 * @author lxh
 */
object WordCountOnline {

  def main(args: Array[String]): Unit = {

    /**
   *   SparkConf
   */
  val conf =  new SparkConf().setMaster("spark://Master:7077").setAppName("wordcountonline")

  val ssc  = new StreamingContext(conf,Seconds(1))

  val lines = ssc.socketTextStream("Master", 9999, StorageLevel.MEMORY_AND_DISK)

  /**
   *      
   */
  val words = lines.flatMap { line => line.split(" ") }

  /**
   *     word  tuple
   */
  val wordCount  = words.map { word => (word,1) }

  /**
   * (key1,1) (key1,1)
   * key     。
   */
  wordCount.reduceByKey(_+_)

  wordCount.print()
  ssc.start()

  /**
   *       
   */
  ssc.awaitTermination()
  ssc.stop(true)

  }
}