java言語を使ってsparkのwodCount入門プログラムを実現します.


sparkはscalaを使って書いているので、scalaを使ってsparkプログラムを作るのは正常ですが、ほとんどの場合はjava言語を使ってsparkプログラムを作成します.
scalaを学んだ後にsparkのソースコードを読むことができるためです!
以下はプログラムです
package com.john.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.junit.Test;
import scala.Serializable;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class JavaSpark01 implements Serializable {

    @Test
    public void testJavaSpark () {

        //1.  Spark     SparkConf,      spark     
        SparkConf sparkConf = new SparkConf().setAppName("spark_java01").setMaster("local[2]");

        //2.  SparkContext  ,SparkContext Spark           (java:JavaSparkContext)
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //        
        sc.setLogLevel("WARN");

        //3.    
        JavaRDD lineRdd = sc.textFile("D:\\Develop\\spark\\javaSpark\\input");

        /**
         * 4.          
         * new FlatMapFunction  string           
         * Override call               ,     Iterable   
         *
         * flatmap         spark  ,           rdd                rdd  
         *
         */
        JavaRDD wordsRdd = lineRdd.flatMap(new FlatMapFunction() {
            public Iterator call(String s) throws Exception {

                String[] words = s.split(" ");

                return Arrays.asList(words).iterator();
            }
        });

        /**
         * 5.          
         * map     ,   MR map  
         * pairFunction: T:    ;K,V:     
         *     call      
         */
        JavaPairRDD wordAndOneRdd = wordsRdd.mapToPair(new PairFunction() {

            public Tuple2 call(String s) throws Exception {
                //       :
                return new Tuple2(s, 1);
            }
        });

        /**
         * 6.        
         * reduceByKey  ,   MR reduce
         *         (       ones) KV     ,      key       ,     
         */
        JavaPairRDD wordAndCountRdd = wordAndOneRdd.reduceByKey(new Function2() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //7.  
        JavaPairRDD sortedRdd = wordAndCountRdd.sortByKey();

        //8.collect     spark RDD          java    
        List> finalResult = sortedRdd.collect();

        //9.       
        for (Tuple2 tuple2 : finalResult) {
            System.out.println(tuple2._1 + "---->" + tuple2._2);
        }

        //10.             
        sortedRdd.saveAsTextFile("D:\\Develop\\spark\\javaSpark\\output");

        //11.  sparkContxt
        sc.stop();

        }

}
注意が必要です.flatMapメソッドにおけるコール方法におけるリターン結果(return Arays.asList(words).iterator();
場合によっては、後のフォローが必要です.ネット上にある答えはそのままです.return Arays.asList;