SparkがListのデータを読み取ることによるテスト感想
3745 ワード
//spark hadoop counter
final Accumulator<Integer> accumulator = javaSparkContext.accumulator(0);
// , disk hdfs , 。
JavaPairRDD<String, String> pairRDD1 = javaSparkContext.parallelize(Arrays.asList("2016-02-25", "2016-02-24", "2016-02-26")).mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) throws Exception {
accumulator.add(1);
return new Tuple2<String, String>(s, "1");
}
});
System.out.println("pairRDD1 :" + pairRDD1.count());
// pariRDD1
pairRDD1.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
@Override
public void call(Iterator<Tuple2<String, String>> tuple2Iterator) throws Exception {
List<String> list = new ArrayList<String>();
while (tuple2Iterator.hasNext()) {
list.add(tuple2Iterator.next()._1());
}
System.out.println("list1:::::" + list);
}
});
// pariRDD1
pairRDD1.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
@Override
public void call(Iterator<Tuple2<String, String>> tuple2Iterator) throws Exception {
List<String> list = new ArrayList<String>();
while (tuple2Iterator.hasNext()) {
list.add(tuple2Iterator.next()._1());
}
System.out.println("list2:::::" + list);
}
});
// collect driver
List<Tuple2<String, String>> collect = pairRDD1.collect();
System.out.println("collect data : " + collect);
System.out.println("accumulator : " + accumulator.localValue());
一、pairRDD 1のデータをcacheしないで、以下のコード結果を実行する.
pairRDD1 :3
-----------------------------------------------------------------------------------------
list1:::::[2016-02-25]
list1:::::[2016-02-24, 2016-02-26]
-----------------------------------------------------------------------------------------
list2:::::[2016-02-24, 2016-02-26]
list2:::::[2016-02-25]
-----------------------------------------------------------------------------------------
collect data : [(2016-02-25,1), (2016-02-24,1), (2016-02-26,1)]
-----------------------------------------------------------------------------------------
accumulator : 12
二、pairRDD 1のデータcacheを、実行コードの結果は以下の通りである.
pairRDD1 :3
-----------------------------------------------------------------------------------------
list1:::::[2016-02-25]
list1:::::[2016-02-24, 2016-02-26]
-----------------------------------------------------------------------------------------
list2:::::[2016-02-24, 2016-02-26]
list2:::::[2016-02-25]
-----------------------------------------------------------------------------------------
collect data : [(2016-02-25,1), (2016-02-24,1), (2016-02-26,1)]
-----------------------------------------------------------------------------------------
accumulator : 3
結論1,上記の例について,実行結果を観察したところ,唯一の違いはaccumulatorの数値であり,1つは12,1つは3であることが分かった.なぜなら、cacheを実行すると、データはrddに永続化されるため、他の場所でこのRDDを再使用すると、RDDから直接データを取り出し、外部デバイスからデータをロードしないからです.
結論2,あるRDDを複数の場合に使用する必要がある場合は,cacheする.例えば、RDDはRDD 1とRDD 2を組み合わせて計算されるが、RDDはまた多くの場所で使用されるので、RDDcacheを立てるべきである.そうしないと、RDD 1がどこから来たのか、RDD 2がどこから来たのかについて再計算される.
結論3、cacheは慎重に使用し、sparkが操作するデータ量が1 Tであれば、すべてのcacheをメモリに入れることは不可能なので....