SparkがListのデータを読み取ることによるテスト感想

3745 ワード

        //spark hadoop counter
        final Accumulator<Integer> accumulator = javaSparkContext.accumulator(0);
        // , disk hdfs , 。
        JavaPairRDD<String, String> pairRDD1 = javaSparkContext.parallelize(Arrays.asList("2016-02-25", "2016-02-24", "2016-02-26")).mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                accumulator.add(1);
                return new Tuple2<String, String>(s, "1");
            }
        });

        System.out.println("pairRDD1   :" + pairRDD1.count());

        // pariRDD1 
        pairRDD1.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
            @Override
            public void call(Iterator<Tuple2<String, String>> tuple2Iterator) throws Exception {
                List<String> list = new ArrayList<String>();
                while (tuple2Iterator.hasNext()) {
                    list.add(tuple2Iterator.next()._1());
                }
                System.out.println("list1:::::" + list);
            }
        });

        // pariRDD1 
        pairRDD1.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
            @Override
            public void call(Iterator<Tuple2<String, String>> tuple2Iterator) throws Exception {
                List<String> list = new ArrayList<String>();
                while (tuple2Iterator.hasNext()) {
                    list.add(tuple2Iterator.next()._1());
                }
                System.out.println("list2:::::" + list);
            }
        });

        // collect driver 
        List<Tuple2<String, String>> collect = pairRDD1.collect();
        System.out.println("collect data : " + collect);

        System.out.println("accumulator : " + accumulator.localValue());

一、pairRDD 1のデータをcacheしないで、以下のコード結果を実行する.
pairRDD1   :3
-----------------------------------------------------------------------------------------
list1:::::[2016-02-25]
list1:::::[2016-02-24, 2016-02-26]
-----------------------------------------------------------------------------------------
list2:::::[2016-02-24, 2016-02-26]
list2:::::[2016-02-25]
-----------------------------------------------------------------------------------------
collect data : [(2016-02-25,1), (2016-02-24,1), (2016-02-26,1)]
-----------------------------------------------------------------------------------------
accumulator : 12

二、pairRDD 1のデータcacheを、実行コードの結果は以下の通りである.
pairRDD1   :3
-----------------------------------------------------------------------------------------
list1:::::[2016-02-25]
list1:::::[2016-02-24, 2016-02-26]
-----------------------------------------------------------------------------------------
list2:::::[2016-02-24, 2016-02-26]
list2:::::[2016-02-25]
-----------------------------------------------------------------------------------------
collect data : [(2016-02-25,1), (2016-02-24,1), (2016-02-26,1)]
-----------------------------------------------------------------------------------------
accumulator : 3

結論1,上記の例について,実行結果を観察したところ,唯一の違いはaccumulatorの数値であり,1つは12,1つは3であることが分かった.なぜなら、cacheを実行すると、データはrddに永続化されるため、他の場所でこのRDDを再使用すると、RDDから直接データを取り出し、外部デバイスからデータをロードしないからです.
結論2,あるRDDを複数の場合に使用する必要がある場合は,cacheする.例えば、RDDはRDD 1とRDD 2を組み合わせて計算されるが、RDDはまた多くの場所で使用されるので、RDDcacheを立てるべきである.そうしないと、RDD 1がどこから来たのか、RDD 2がどこから来たのかについて再計算される.
結論3、cacheは慎重に使用し、sparkが操作するデータ量が1 Tであれば、すべてのcacheをメモリに入れることは不可能なので....