Sparkでよく使われるAction演算子

35627 ワード

1.reduce(function)
reduceはRDDの要素の2つを入力関数に渡し、同時に新しい値を生成し、新しい値とRDDの次の要素は最後に1つの値しかないまで入力関数に渡される.
/**
     * Reduce  
     */
    private static void reduce() {
        //   SparkConf JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("reduce")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //      ,   1 10,10   ,    10       
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);

        //   reduce             
        // reduce     :
        //             ,  call()  ,    ,       ,  1 + 2 = 3
        //               call()  ,    ,  3 + 3 = 6
        //     
        //   reduce     ,    ,            
        int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }

        });

        System.out.println(sum);

        //   JavaSparkContext
        sc.close();
    }

2.collect()
1つのRDDをArray配列としてすべての要素に戻します.(詳細は以下を参照:
https://blog.csdn.net/Fortuna_i/article/details/80851775)
  /**
     *   Collect java  
     */
    private static void collect(){
        //   SparkConf JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //      ,   1 10,10   ,    10       
        List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        JavaRDD<Integer> numbers =   sc.parallelize(numberList);

        //  map            2
        JavaRDD<Integer> doubleNumbers = numbers.map(

                new Function<Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }

                });

        //   foreach action  ,        rdd    
        //    collect  ,          doubleNumbers RDD        
        //     ,       ,    rdd          ,    1  
        //         ,              ,        
        //   ,     ,    rdd           ,  oom  ,    
        //   ,  ,      foreach action  ,     rdd      
        List<Integer> doubleNumberList = doubleNumbers.collect();
        for(Integer num : doubleNumberList) {
            System.out.println(num);
        }

        //   JavaSparkContext
        sc.close();

    }

3.count()
データセット要素の数、デフォルトのLongタイプを返します
 /**
     *  Count java  
     */
    private static void count() {
        //   SparkConf JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("count")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //      ,   1 10,10   ,    10       
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);

        //  rdd  count  ,         
        long count = numbers.count();
        System.out.println(count);
        //   JavaSparkContext
        sc.close();
    }

4.take(n)
データセットの前のn要素を含む配列(0からn-1までの要素)を返し、ソートしません.
 /**
     *  Take   java  
     */
    private static void take() {
        //   SparkConf JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("take")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //      ,   1 10,10   ,    10       
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);

        //  rdd  count  ,         
        // take  , collect  ,        ,  rdd   
        //   collect   rdd     ,take     n   
        List<Integer> top3Numbers = numbers.take(3);
        for(Integer num : top3Numbers) {
            System.out.println(num);
        }

        //   JavaSparkContext
        sc.close();
    }

5.saveAsTextFile(path)
dataSetの要素をテキストファイルとしてローカルファイルシステムやHDFSなどに書き込む.Sparkは各要素に対してtoStringメソッドを呼び出し、データ要素をテキストファイルの1行のレコードに変換します.
ファイルをローカルファイルシステムに保存すると、executorが存在するマシンのローカルディレクトリにのみ保存されます.
/**
 * saveAsTextFile java  
 */
private static void saveAsTextFile() {
    //   SparkConf JavaSparkContext
    SparkConf conf = new SparkConf()
            .setAppName("saveAsTextFile").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    //      ,   1 10,10   ,    10       
    List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);

    //   map            2
    JavaRDD<Integer> doubleNumbers = numbers.map(

            new Function<Integer, Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(Integer v1) throws Exception {
                    return v1 * 2;
                }

            });

    //    rdd    ,   HFDS   
    //      ,           ,     
    //      ,        /double_number.txt/part-00000  
    //doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");
    doubleNumbers.saveAsTextFile("C:\\Users\\Desktop\\spark");
    //   JavaSparkContext
    sc.close();
}

6.countByKey()
RDD[K,V]のK当たりの数を統計し,key当たりのカウントを持つ(k,int)pairsのhashMapを返す.
  /**
     *  countByKey java  
     */
    private static void countByKey() {
        //   SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("countByKey")
                .setMaster("local");
        //   JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //     
        List<Tuple2<String, String>> scoreList = Arrays.asList(
                new Tuple2<String, String>("class1", "leo"),
                new Tuple2<String, String>("class2", "jack"),
                new Tuple2<String, String>("class1", "marry"),
                new Tuple2<String, String>("class2", "tom"),
                new Tuple2<String, String>("class2", "david"));

        //      ,  JavaPairRDD
        JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);

        //  rdd  countByKey  ,           ,       key       
        //    countByKey   
        // countByKey     ,    Map
        Map<String, Object> studentCounts = students.countByKey();

        for(Map.Entry<String, Object> studentCount : studentCounts.entrySet()) {
            System.out.println(studentCount.getKey() + ": " + studentCount.getValue());
        }

        //   JavaSparkContext
        sc.close();
    }