Spark RDD Action演算子の基本使用(Java)

84942 ワード

Spark RDD Action演算子の基本使用(Java)
最近、Spark RDD の関連演算子の使い方をまとめました。基本的な使用例をいくつか挙げておくので、参考にすればすぐに使い始められます。

package com.edward.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import java.util.*;

import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.io.Serializable;

public class ApiTestAction {
    /** Sample integers shared by the single-value RDD demos. */
    private static final List<Integer> NUMBERS =
            Collections.unmodifiableList(Arrays.asList(1, 2, 3, 4, 5, 11, 23));

    /** Sample strings shared by the single-value RDD demos. */
    private static final List<String> NAMES =
            Collections.unmodifiableList(Arrays.asList("Edward", "CiCi", "Della", "Mystique"));

    /**
     * Sample key/value pairs. Key 1 appears twice on purpose, so countByKey, lookup and
     * collectAsMap show how duplicate keys are handled.
     */
    private static final List<Tuple2<Integer, String>> PAIRS =
            Collections.unmodifiableList(Arrays.<Tuple2<Integer, String>>asList(
                    new Tuple2<>(1, "Edward"), new Tuple2<>(1, "CiCi"), new Tuple2<>(3, "Della")));

    /** Small input used by the aggregation and foreach demos. */
    private static final List<Integer> SMALL_NUMBERS =
            Collections.unmodifiableList(Arrays.asList(1, 1, 2, 3));

    // Run Spark locally with a single worker thread.
    private static final SparkConf conf = new SparkConf()
            .setMaster("local[1]")
            .setAppName("ApiTest");
    // Created when the class is loaded; stopped at the end of main().
    private static final JavaSparkContext jsc = new JavaSparkContext(conf);

    /**
     * count(): returns the number of elements in the RDD as a {@code long}.
     */
    private static void api_count()
    {
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        long total = names.count();
        System.out.println(total);
    }

    /**
     * countByValue(): counts the occurrences of each distinct element and returns the result
     * to the driver as a {@code Map<element, count>}.
     */
    private static void api_countByValue()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 1);
        Map<Integer, Long> counts = numbers.countByValue();
        counts.forEach((k, v) -> System.out.println("key:" + k + " value:" + v));
    }

    /**
     * countByKey(): counts the records for each key. It is only available on a
     * {@link JavaPairRDD}.
     * For comparison, countByValue() on the same pair RDD counts each whole (key, value)
     * tuple.
     */
    private static void api_countByKey()
    {
        JavaPairRDD<Integer, String> pairs = jsc.parallelizePairs(PAIRS, 3);
        Map<Integer, Long> perKey = pairs.countByKey();
        Map<Tuple2<Integer, String>, Long> perPair = pairs.countByValue();
        perKey.forEach((k, v) -> System.out.println("key:" + k + " count:" + v));
        perPair.forEach((k, v) -> System.out.println("value:" + k + " count:" + v));
    }

    /**
     * max(comparator): returns the largest element according to the given comparator.
     * The comparator is captured in the job's closure, so it must be serializable.
     */
    private static void api_max()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        String value2 = names.max(Comparator.naturalOrder());
        int value = numbers.max(Comparator.naturalOrder());
        System.out.println("value=" + value);
        System.out.println("value2=" + value2);
    }

    /**
     * min(comparator): returns the smallest element according to the given comparator.
     * The comparator is captured in the job's closure, so it must be serializable.
     */
    private static void api_min()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        // Previously called max() here by mistake, so the "min" demo printed the maximum.
        String value2 = names.min(Comparator.naturalOrder());
        int value = numbers.min(Comparator.naturalOrder());
        System.out.println("value=" + value);
        System.out.println("value2=" + value2);
    }

    /**
     * first(): returns the first element of the RDD.
     */
    private static void api_first()
    {
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        String firstValue = names.first();
        System.out.println(firstValue);
    }

    /**
     * collect(): returns every element of the RDD to the driver as a {@code List}.
     * All the data must fit in driver memory, so use it only on small RDDs.
     */
    private static void api_collect()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        List<Integer> output = numbers.collect();
        System.out.println(output);
    }

    /**
     * collectPartitions(int[]): returns the contents of only the requested partitions.
     * The result contains one {@code List} per requested partition id.
     */
    private static void api_collectPartitions()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        List<Integer>[] parts = numbers.collectPartitions(new int[]{0, 2});
        System.out.println(Arrays.toString(parts));
    }

    /**
     * lookup(key): returns, as a {@code List}, every value stored under the given key of a
     * pair RDD.
     */
    private static void api_lookup()
    {
        JavaPairRDD<Integer, String> pairs = jsc.parallelizePairs(PAIRS, 3);
        // Run the lookup job once and print its result, instead of launching it twice.
        List<String> out = pairs.lookup(1);
        System.out.println(out);
    }

    /**
     * take(num): returns the first {@code num} elements of the RDD as a {@code List}.
     * The results are brought back to the driver, so keep {@code num} small.
     */
    private static void api_take()
    {
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        List<String> list = names.take(3);
        System.out.println(list);
    }

    /** Orders (String, Integer) tuples by their Integer part; Serializable so Spark can ship it. */
    private static class TestComparator implements Serializable, Comparator<Tuple2<String, Integer>> {
        @Override
        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
            return o1._2.compareTo(o2._2);
        }
    }

    /** Natural ordering for Integer, implemented as a Serializable comparator. */
    private static class TestComparator2 implements Serializable, Comparator<Integer> {
        @Override
        public int compare(Integer o1, Integer o2) {
            return o1.compareTo(o2);
        }
    }

    /** Natural ordering for String, implemented as a Serializable comparator. */
    private static class TestComparator3 implements Serializable, Comparator<String> {
        @Override
        public int compare(String o1, String o2) {
            return o1.compareTo(o2);
        }
    }

    /**
     * takeOrdered(num[, comparator]): returns the {@code num} smallest elements in ascending
     * order. It uses the natural ordering unless a serializable comparator is supplied.
     */
    private static void api_takeOrdered()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        List<Integer> output = numbers.takeOrdered(2);
        System.out.println(output);
        System.out.println(names.takeOrdered(2, new TestComparator3()));
        System.out.println(numbers.takeOrdered(3, new TestComparator2()));
    }

    /**
     * top(num, comparator): returns the {@code num} largest elements in descending order
     * according to the comparator. Here, pairs are compared by their Integer part only.
     */
    private static void api_top()
    {
        List<Tuple2<String, Integer>> input = Arrays.asList(new Tuple2<>("A", 1),
                new Tuple2<>("A", 2), new Tuple2<>("B", 1));
        JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(input, 3);
        List<Tuple2<String, Integer>> result = pairs.top(2, new TestComparator());
        System.out.println(result);
    }

    /**
     * takeSample(withReplacement, num[, seed]): returns {@code num} randomly sampled elements.
     * <ul>
     *   <li>withReplacement = true: an element may be picked more than once.</li>
     *   <li>withReplacement = false: each element is picked at most once.</li>
     *   <li>seed: random seed. The two-argument form used here picks a random seed, so the
     *       output changes between runs. Pass a fixed seed to get reproducible samples.</li>
     * </ul>
     */
    private static void api_takeSample()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        List<Integer> out = numbers.takeSample(false, 3);
        System.out.println(out);
    }

    /**
     * collectAsMap(): returns a pair RDD to the driver as a {@code Map}.
     * This is not a multimap: when several records share a key (key 1 here), only one of
     * their values is kept.
     */
    private static void api_collectAsMap()
    {
        JavaPairRDD<Integer, String> pairs = jsc.parallelizePairs(PAIRS, 2);
        Map<Integer, String> map = pairs.collectAsMap();
        System.out.println(map);
    }

    /**
     * aggregate(zeroValue, seqOp, combOp): folds each partition with {@code seqOp}, starting
     * from {@code zeroValue}. It then merges the per-partition results with {@code combOp},
     * again starting from {@code zeroValue}. The result type may differ from the element type.
     *
     * <p>The first call computes (sum, count) of [1, 1, 2, 3]. Assuming parallelize splits the
     * list as [1, 1] | [2, 3]:
     * <pre>
     *   partition 0: (0,0) -&gt; (0+1,0+1)=(1,1) -&gt; (1+1,1+1)=(2,2)
     *   partition 1: (0,0) -&gt; (0+2,0+1)=(2,1) -&gt; (2+3,1+1)=(5,2)
     *   combOp:      (0,0) + (2,2) + (5,2) = (7,4)
     * </pre>
     * The second call aggregates directly into an Integer sum.
     */
    private static void api_aggregate()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        Tuple2<Integer, Integer> result = numbers.aggregate(new Tuple2<>(0, 0),
                (x, y) -> new Tuple2<>(x._1 + y, x._2 + 1),
                (x, y) -> new Tuple2<>(x._1 + y._1, x._2 + y._2));
        System.out.println(result);
        int result2 = numbers.aggregate(0, (x, y) -> (x + y), (x, y) -> (x + y));
        System.out.println(result2);
    }

    /**
     * fold(zeroValue, op): a simplified aggregate. The result has the element type, and the
     * same function combines elements within a partition and also merges the partition
     * results. {@code zeroValue} is used once per partition and once more in the final merge,
     * so it should be the identity of {@code op} (0 for +).
     */
    private static void api_fold()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        int result = numbers.fold(0, (x, y) -> (x + y));
        System.out.println(result);
    }

    /**
     * reduce(op): like fold, but with no zero value. It therefore fails on an empty RDD.
     */
    private static void api_reduce()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        int result = numbers.reduce((x, y) -> (x + y));
        System.out.println(result);
    }

    /**
     * foreach(f): runs {@code f} on every element. It runs on the executors, not the driver.
     */
    private static void api_foreach()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        // Keep a lambda here: the method reference System.out::println would capture the
        // (non-serializable) PrintStream, and the task would then fail to serialize.
        numbers.foreach(x -> System.out.println(x));
    }

    /**
     * foreachPartition(f): calls {@code f} once per partition, passing an iterator over that
     * partition's elements.
     * Compared with foreach, expensive per-call setup (for example, opening a connection for
     * TCP/IO) happens once per partition instead of once per element.
     */
    private static void api_foreachPartition()
    {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        numbers.foreachPartition(it -> it.forEachRemaining(y -> System.out.println(y)));
    }

    /** Counting and min/max actions. */
    private static void action_type1()
    {
        api_count();
        api_countByValue();
        api_countByKey();
        api_max();
        api_min();
    }

    /** Actions that bring elements (or a subset of them) back to the driver. */
    private static void action_type2()
    {
        api_first();
        api_collect();
        api_collectPartitions();
        api_lookup();
        api_take();
        api_takeOrdered();
        api_top();
        api_takeSample();
        api_collectAsMap();
    }

    /** Aggregation actions. */
    private static void action_type3()
    {
        api_aggregate();
        api_fold();
        api_reduce();
    }

    /** Side-effecting per-element / per-partition actions. */
    private static void action_type4()
    {
        api_foreach();
        api_foreachPartition();
    }

    /**
     * Runs every demo group in order, then stops the Spark context.
     * The context is stopped even if a demo throws.
     */
    public static void main(String[] args) {
        try {
            action_type1();
            action_type2();
            action_type3();
            action_type4();
        } finally {
            jsc.stop();
        }
    }
}