Sparkでよく使われるAction演算子
35627 ワード
1.reduce(function)
reduceはRDDの要素の2つを入力関数に渡し、同時に新しい値を生成し、新しい値とRDDの次の要素は最後に1つの値しかないまで入力関数に渡される.
2.collect()
1つのRDDをArray配列としてすべての要素に戻します.(詳細は以下を参照:
https://blog.csdn.net/Fortuna_i/article/details/80851775)
3.count()
データセット要素の数、デフォルトのLongタイプを返します
4.take(n)
データセットの前のn要素を含む配列(0からn-1までの要素)を返し、ソートしません.
5.saveAsTextFile(path)
dataSetの要素をテキストファイルとしてローカルファイルシステムやHDFSなどに書き込む.Sparkは各要素に対してtoStringメソッドを呼び出し、データ要素をテキストファイルの1行のレコードに変換します.
ファイルをローカルファイルシステムに保存すると、executorが存在するマシンのローカルディレクトリにのみ保存されます.
6.countByKey()
RDD[K,V]のK当たりの数を統計し,key当たりのカウントを持つ(k,int)pairsのhashMapを返す.
reduceはRDDの要素の2つを入力関数に渡し、同時に新しい値を生成し、新しい値とRDDの次の要素は最後に1つの値しかないまで入力関数に渡される.
/**
* Reduce
*/
private static void reduce() {
// SparkConf JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("reduce")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// , 1 10,10 , 10
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// reduce
// reduce :
// , call() , , , 1 + 2 = 3
// call() , , 3 + 3 = 6
//
// reduce , ,
int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(sum);
// JavaSparkContext
sc.close();
}
2.collect()
1つのRDDをArray配列としてすべての要素に戻します.(詳細は以下を参照:
https://blog.csdn.net/Fortuna_i/article/details/80851775)
/**
* Collect java
*/
private static void collect(){
// SparkConf JavaSparkContext
SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// , 1 10,10 , 10
List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// map 2
JavaRDD<Integer> doubleNumbers = numbers.map(
new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
// foreach action , rdd
// collect , doubleNumbers RDD
// , , rdd , 1
// , ,
// , , rdd , oom ,
// , , foreach action , rdd
List<Integer> doubleNumberList = doubleNumbers.collect();
for(Integer num : doubleNumberList) {
System.out.println(num);
}
// JavaSparkContext
sc.close();
}
3.count()
データセット要素の数、デフォルトのLongタイプを返します
/**
* Count java
*/
private static void count() {
// SparkConf JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("count")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// , 1 10,10 , 10
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// rdd count ,
long count = numbers.count();
System.out.println(count);
// JavaSparkContext
sc.close();
}
4.take(n)
データセットの前のn要素を含む配列(0からn-1までの要素)を返し、ソートしません.
/**
* Take java
*/
private static void take() {
// SparkConf JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("take")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// , 1 10,10 , 10
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// rdd count ,
// take , collect , , rdd
// collect rdd ,take n
List<Integer> top3Numbers = numbers.take(3);
for(Integer num : top3Numbers) {
System.out.println(num);
}
// JavaSparkContext
sc.close();
}
5.saveAsTextFile(path)
dataSetの要素をテキストファイルとしてローカルファイルシステムやHDFSなどに書き込む.Sparkは各要素に対してtoStringメソッドを呼び出し、データ要素をテキストファイルの1行のレコードに変換します.
ファイルをローカルファイルシステムに保存すると、executorが存在するマシンのローカルディレクトリにのみ保存されます.
/**
* saveAsTextFile java
*/
private static void saveAsTextFile() {
// SparkConf JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("saveAsTextFile").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// , 1 10,10 , 10
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// map 2
JavaRDD<Integer> doubleNumbers = numbers.map(
new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
// rdd , HFDS
// , ,
// , /double_number.txt/part-00000
//doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");
doubleNumbers.saveAsTextFile("C:\\Users\\Desktop\\spark");
// JavaSparkContext
sc.close();
}
6.countByKey()
RDD[K,V]のK当たりの数を統計し,key当たりのカウントを持つ(k,int)pairsのhashMapを返す.
/**
* countByKey java
*/
private static void countByKey() {
// SparkConf
SparkConf conf = new SparkConf()
.setAppName("countByKey")
.setMaster("local");
// JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//
List<Tuple2<String, String>> scoreList = Arrays.asList(
new Tuple2<String, String>("class1", "leo"),
new Tuple2<String, String>("class2", "jack"),
new Tuple2<String, String>("class1", "marry"),
new Tuple2<String, String>("class2", "tom"),
new Tuple2<String, String>("class2", "david"));
// , JavaPairRDD
JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);
// rdd countByKey , , key
// countByKey
// countByKey , Map
Map<String, Object> studentCounts = students.countByKey();
for(Map.Entry<String, Object> studentCount : studentCounts.entrySet()) {
System.out.println(studentCount.getKey() + ": " + studentCount.getValue());
}
// JavaSparkContext
sc.close();
}