spark 2.x浅入深底シリーズ六のRDD java apiから詳しく説明する


sparkのどんな技術を学ぶ前に、まずsparkを正しく理解して、参考にすることができます:sparkを正しく理解します
以下に、RDDの3つの作成方法、単一タイプRDDの基本的なtransformation api、サンプリングApi、pipe操作についてjava apiについて説明する.
一、RDDの三つの作成方式
  • 安定したファイルストレージシステムから、local file SystemやhdfsなどのRDDが作成される.
    // hdfs     
    JavaRDD textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt");
    
    //           ,  file:         ///,    ,     
    //           ,     RDD       ,                
    //              
    JavaRDD textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt" 2 );

    2.  transformation apiを介して既存のRDDから新しいRDDを作成できます.以下はmapという変換apiです.
    JavaRDD mapRDD = textFileRDD.map(new Function() {
        @Override
        public String call(String s) throws Exception {
            return s + "test";
        }
    });
    System.out.println("mapRDD = " + mapRDD.collect());

    3.  メモリ内のリストデータからRDDを作成し、RDDのパーティション数を指定できます.指定しない場合は、Executorのすべてのcores数を取得します.
    //        JavaRDD
    JavaRDD integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3, 4), 2);
    System.out.println("integerJavaRDD = " + integerJavaRDD.glom().collect());
    
    //           Double JavaRDD
    JavaDoubleRDD doubleJavaDoubleRDD = sc.parallelizeDoubles(Arrays.asList(2.0, 3.3, 5.6));
    System.out.println("doubleJavaDoubleRDD = " + doubleJavaDoubleRDD.collect());
    
    //    key-value   RDD
    import scala.Tuple2;
    JavaPairRDD javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));
    System.out.println("javaPairRDD = " + javaPairRDD.collect());

    注意:第3のケースでは、scalaにはmakeRDD apiも用意されています.このapiは、spark core RDD scala apiを参照して、RDDの各パーティションを作成するマシンを指定することができます.
    二、単一タイプRDD基本的なtransformation api
    メモリ内のデータに基づいてRDDを作成する
    JavaRDD integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
  •  map操作は、integerJavaRDDの各要素にカスタム関数インタフェースを適用することを示し、以下に各要素に1:
  • を追加する.
    JavaRDD mapRDD = integerJavaRDD.map(new Function() {
        @Override
        public Integer call(Integer element) throws Exception {
            return element + 1;
        }
    });
    //  :[2, 3, 4, 4]
    System.out.println("mapRDD = " + mapRDD.collect());

    なお、mapオペレーションは、RDDとは異なるタイプのデータを返すことができ、以下のように、カスタムUserオブジェクトを返すことができる.
    public class User implements Serializable {
        private String userId;
    
        private Integer amount;
    
        public User(String userId, Integer amount) {
            this.userId = userId;
            this.amount = amount;
        }
        //getter setter....
        @Override
        public String toString() {
            return "User{" +
                    "userId='" + userId + '\'' +
                    ", amount=" + amount +
                    '}';
        }
    }
    JavaRDD userJavaRDD = integerJavaRDD.map(new Function() {
        @Override
        public User call(Integer element) throws Exception {
            if (element  
      

    2.  flatMap , integerJavaRDD FlatMapFunction, ,flatMap

    JavaRDD flatMapJavaRDD = integerJavaRDD.flatMap(new FlatMapFunction() {
        @Override
        public Iterator call(Integer element) throws Exception {
            //    list,  list     0 element
            List list = new ArrayList<>();
            int i = 0;
            while (i <= element) {
                list.add(i);
                i++;
            }
            return list.iterator();
        }
    });
    //  : [0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3]
    System.out.println("flatMapJavaRDD = " + flatMapJavaRDD.collect());

    3.  filter では、integerJavaRDDの にカスタムフィルタ を し、 な をフィルタします. のように、1に しくない をフィルタします.
    JavaRDD filterJavaRDD = integerJavaRDD.filter(new Function() {
        @Override
        public Boolean call(Integer integer) throws Exception {
            return integer != 1;
        }
    });
    //   :[2, 3, 3]
    System.out.println("filterJavaRDD = " + filterJavaRDD.collect());

    4.  glom 、integerJavaRDD パーティションに する データを
    JavaRDD> glomRDD = integerJavaRDD.glom();
    //  : [[1, 2], [3, 3]],   integerJavaRDD     ,         1 2,         3 3
    System.out.println("glomRDD = " + glomRDD.collect());

    5.  mapPartitions は、integerJavaRDDの パーティションのデータにカスタマイズされた インタフェースメソッドを します. に を する があると します.この の には がかかります.この 、mapPartitionsを すると、 のように きなメリットがあります.
    //            ,          
    public static Integer getInitNumber(String source) {
        System.out.println("get init number from " + source + ", may be take much time........");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }
    
    JavaRDD mapPartitionTestRDD = integerJavaRDD.mapPartitions(new FlatMapFunction, Integer>() {
        @Override
        public Iterator call(Iterator integerIterator) throws Exception {
            //            ,integerJavaRDD     ,       getInitNumber  
            //                 ,            ,     mapPartitions             ,      map  
            Integer initNumber = getInitNumber("mapPartitions");
    
            List list = new ArrayList<>();
            while (integerIterator.hasNext()) {
                list.add(integerIterator.next() + initNumber);
            }
            return list.iterator();
        }
    });
    //   : [2, 3, 4, 4]
    System.out.println("mapPartitionTestRDD = " + mapPartitionTestRDD.collect());
    
    JavaRDD mapInitNumberRDD = integerJavaRDD.map(new Function() {
        @Override
        public Integer call(Integer integer) throws Exception {
            //                  ,  integerJavaRDD  4   ,    getInitNumber      4 ,        ,  mapPartitions   
            Integer initNumber = getInitNumber("map");
            return integer + initNumber;
        }
    });
    //   :[2, 3, 4, 4]
    System.out.println("mapInitNumberRDD = " + mapInitNumberRDD.collect());

    6.  mapPartitionsWithIndex はintegerJavaRDDの パーティションのデータに して のカスタマイズした インタフェースの を して、 インタフェースの を する パーティションの を って、つまりあなたが しているのが のパーティションのデータであることを っています
    JavaRDD mapPartitionWithIndex = integerJavaRDD.mapPartitionsWithIndex(new Function2, Iterator>() {
        @Override
        public Iterator call(Integer partitionId, Iterator integerIterator) throws Exception {
            //partitionId               
            System.out.println("partition id = " + partitionId);
            List list = new ArrayList<>();
            while (integerIterator.hasNext()) {
                list.add(integerIterator.next() + partitionId);
            }
            return list.iterator();
        }
    }, false);
    //   [1, 2, 4, 4]
    System.out.println("mapPartitionWithIndex = " + mapPartitionWithIndex.collect());

    、サンプリングApi
    メモリ のデータに づいてRDDを する
    JavaRDD listRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
  •  sample
  • //      withReplacement
    //  withReplacement=true          ,          
    //  withReplacement=false          ,           
    
    //      :fraction,                ,                
    //   100      ,fraction=0.2,         100 * 0.2 = 20   ,
    //    100             0.2;           ,        
    // withReplacement=true   fraction>=0
    // withReplacement=false    0  sampleRDD = listRDD.sample(false, 0.5, 100);
    //  : [1, 3]
    System.out.println("sampleRDD = " + sampleRDD.collect());

    2.  randomSplit
    //     RDD        ,           RDD
    //               ,         ,      RDD
    JavaRDD[] splitRDDs = listRDD.randomSplit(new double[]{0.4, 0.6});
    //   2
    System.out.println("splitRDDs.length = " + splitRDDs.length);
    //   [2, 3]       
    System.out.println("splitRDD(0) = " + splitRDDs[0].collect());
    //   [1, 3]        
    System.out.println("splitRDD(1) = " + splitRDDs[1].collect());

    3.  takeSample
    //             
    //      withReplacement
    //  withReplacement=true          ,          
    //  withReplacement=false          ,           
    //         ,         
       [2, 3]
    System.out.println(listRDD.takeSample(false, 2));

    4. サンプリング、key-valueタイプのRDDをサンプリング
    //    key value   RDD
    import scala.Tuple2;
    JavaPairRDD javaPairRDD =
            sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3),
                    new Tuple2("kkk", 3), new Tuple2("kkk", 3)));
    //     key     
    Map fractions = new HashMap<>();
    fractions.put("test", 0.5);
    fractions.put("kkk", 0.4);
    //    key    
    //    [(test,3), (kkk,3)]
    //sampleByKey          ,        
    System.out.println(javaPairRDD.sampleByKey(true, fractions).collect());
    //    [(test,3), (kkk,3)]
    //sampleByKeyExtra            ,           ,        。
    System.out.println(javaPairRDD.sampleByKeyExact(true, fractions).collect());

    サンプリングの の は、spark core RDD apiを することができる.これらの なものは ではあまり しにくい.
    、pipeは、RDD ストリームのあるステップでpythonやshellスクリプトなどの のスクリプトを することを す
    JavaRDD dataRDD = sc.parallelize(Arrays.asList("hi", "hello", "how", "are", "you"), 2);
    
    //  echo.py       
    Map env = new HashMap<>();
    env.put("env", "envtest");
    
    List commands = new ArrayList<>();
    commands.add("python");
    //       spark   ,    echo.py                    
    commands.add("/Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py");
    
    JavaRDD result = dataRDD.pipe(commands, env);
    //   : [slave1-hi-envtest, slave1-hello-envtest, slave1-how-envtest, slave1-are-envtest, slave1-you-envtest]
    System.out.println(result.collect());

    echo.pyの は のとおりです.
    import sys
    import os
    
    #input = "test"
    input = sys.stdin
    env_keys = os.environ.keys()
    env = ""
    if "env" in env_keys:
       env = os.environ["env"]
    for ele in input:
       output = "slave1-" + ele.strip('
    ') + "-" + env        print (output) input.close

    pipeの 、およびどのように するかについて、 :spark core RDD api、この でまたどのように でスクリプトをすべての にコピーする を するかを らかにしました
    システム spark:1、[ ]Spark 2.xの Spark Core:https://edu.51cto.com/sd/88429  2、[ ]Spark 2.xの Spark SQL :https://edu.51cto.com/sd/16f3d  3、[ ]Scala シリーズのテーマ:https://edu.51cto.com/sd/8e85b  4、[ ]Spark 2.xの Spark Streamig:https://edu.51cto.com/sd/8c525  5、[ ]Spark 2.x コース:https://edu.51cto.com/sd/ff9a4  6、ScalaからSpark 2.xまでのテーマ:https://edu.51cto.com/sd/d72af