Spark RDD Action演算子の基本使用(Java)
Spark RDD Action演算子の基本使用(Java)
最近、Spark RDD の Action 演算子の使い方をまとめました。基本的な使用例をいくつか挙げていますので、参考にしていただければ、すぐに使い始められます。
最近、Spark RDD の Action 演算子の使い方をまとめました。基本的な使用例をいくつか挙げていますので、参考にしていただければ、すぐに使い始められます。
package com.edward.spark.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import java.util.*;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import java.io.Serializable;
/**
 * Runnable tour of the basic Spark RDD actions available through the Java API.
 *
 * <p>Each {@code api_*} method builds a small in-memory RDD on the shared local
 * context, runs one action and prints the result to standard output.
 * {@link #main(String[])} runs every demo, grouped into four families, and then
 * stops the context.
 */
public class ApiTestAction {
    /** Sample integers used by the single-value RDD demos. */
    private static final List<Integer> NUMBERS = Arrays.asList(1, 2, 3, 4, 5, 11, 23);

    /** Sample strings used by the single-value RDD demos. */
    private static final List<String> NAMES = Arrays.asList("Edward", "CiCi", "Della", "Mystique");

    /** Small integer sample (sum 7, four elements) used by the aggregation and foreach demos. */
    private static final List<Integer> SMALL_NUMBERS = Arrays.asList(1, 1, 2, 3);

    /** Sample (id, name) pairs; key 1 appears twice on purpose. */
    private static final List<Tuple2<Integer, String>> ID_NAME_PAIRS = Arrays.asList(
            new Tuple2<>(1, "Edward"), new Tuple2<>(1, "CiCi"), new Tuple2<>(3, "Della"));

    /** Configuration for a single-threaded local master. */
    private static final SparkConf conf = new SparkConf()
            .setMaster("local[1]")
            .setAppName("ApiTest");

    /** Context shared by every demo; created when the class is loaded, stopped by {@code main}. */
    private static final JavaSparkContext jsc = new JavaSparkContext(conf);

    /**
     * {@code count()}: prints the number of elements in the RDD as a {@code long}.
     */
    private static void api_count() {
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        long value = names.count();
        System.out.println(value);
    }

    /**
     * {@code countByValue()}: prints, for each distinct element, how many times it
     * occurs. The result is returned to the driver as a map of element to count.
     */
    private static void api_countByValue() {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 1);
        Map<Integer, Long> counts = numbers.countByValue();
        counts.forEach((k, v) -> {
            System.out.println("key:" + k + " value:" + v);
        });
    }

    /**
     * {@code countByKey()}: prints the number of pairs per key of a
     * {@code JavaPairRDD}. For comparison it also prints {@code countByValue()} on
     * the same RDD, which counts whole (key, value) tuples instead.
     */
    private static void api_countByKey() {
        JavaPairRDD<Integer, String> pairs = jsc.parallelizePairs(ID_NAME_PAIRS, 3);
        Map<Integer, Long> perKey = pairs.countByKey();
        Map<Tuple2<Integer, String>, Long> perTuple = pairs.countByValue();
        perKey.forEach((k, v) -> {
            System.out.println("key:" + k + " count:" + v);
        });
        perTuple.forEach((k, v) -> {
            System.out.println("value:" + k + " count:" + v);
        });
    }

    /**
     * {@code max(comparator)}: prints the largest integer and the largest string
     * according to their natural ordering.
     */
    private static void api_max() {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        String value2 = names.max(Comparator.naturalOrder());
        int value = numbers.max(Comparator.naturalOrder());
        System.out.println("value=" + value);
        System.out.println("value2=" + value2);
    }

    /**
     * {@code min(comparator)}: prints the smallest integer and the smallest string
     * according to their natural ordering.
     */
    private static void api_min() {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        // Must be min() for both RDDs; the string case previously called max().
        String value2 = names.min(Comparator.naturalOrder());
        int value = numbers.min(Comparator.naturalOrder());
        System.out.println("value=" + value);
        System.out.println("value2=" + value2);
    }

    /**
     * {@code first()}: prints the first element of the RDD.
     */
    private static void api_first() {
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        String firstValue = names.first();
        System.out.println(firstValue);
    }

    /**
     * {@code collect()}: brings every element back to the driver as a {@code List}
     * and prints it. The whole data set is materialised in driver memory, so this
     * is only appropriate for small RDDs such as this sample.
     */
    private static void api_collect() {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        List<Integer> output = numbers.collect();
        System.out.println(output);
    }

    /**
     * {@code collectPartitions(ids)}: fetches the contents of the requested
     * partitions (here partitions 0 and 1 of 3) and prints one list per partition.
     */
    private static void api_collectPartitions() {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        List<Integer>[] partitions = numbers.collectPartitions(new int[] {0, 1});
        for (List<Integer> partition : partitions) {
            System.out.println(partition);
        }
    }

    /**
     * {@code lookup(key)}: prints the list of all values stored under key 1 of a
     * {@code JavaPairRDD}.
     */
    private static void api_lookup() {
        JavaPairRDD<Integer, String> pairs = jsc.parallelizePairs(ID_NAME_PAIRS, 3);
        // Run the lookup once and print that result rather than executing it twice.
        List<String> out = pairs.lookup(1);
        System.out.println(out);
    }

    /**
     * {@code take(num)}: prints the first three elements of the RDD as a
     * {@code List}, without sorting.
     */
    private static void api_take() {
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        List<String> list = names.take(3);
        System.out.println(list);
    }

    /** Serializable comparator ordering (name, number) tuples by their number. */
    private static class TestComparator implements Serializable, Comparator<Tuple2<String, Integer>> {
        @Override
        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
            return o1._2.compareTo(o2._2);
        }
    }

    /** Serializable comparator ordering integers ascending. */
    private static class TestComparator2 implements Serializable, Comparator<Integer> {
        @Override
        public int compare(Integer o1, Integer o2) {
            return o1.compareTo(o2);
        }
    }

    /** Serializable comparator ordering strings by {@link String#compareTo(String)}. */
    private static class TestComparator3 implements Serializable, Comparator<String> {
        @Override
        public int compare(String o1, String o2) {
            return o1.compareTo(o2);
        }
    }

    /**
     * {@code takeOrdered(num[, comparator])}: prints the smallest N elements, first
     * using natural ordering and then using explicit serializable comparators.
     */
    private static void api_takeOrdered() {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        JavaRDD<String> names = jsc.parallelize(NAMES, 2);
        List<Integer> output = numbers.takeOrdered(2);
        System.out.println(output);
        System.out.println(names.takeOrdered(2, new TestComparator3()));
        System.out.println(numbers.takeOrdered(3, new TestComparator2()));
    }

    /**
     * {@code top(num, comparator)}: prints the two largest tuples when compared by
     * their integer component. Two of the sample tuples tie on that component, so
     * which of them takes second place is not determined by the comparator.
     */
    private static void api_top() {
        List<Tuple2<String, Integer>> pairs = Arrays.asList(
                new Tuple2<>("A", 1), new Tuple2<>("A", 2), new Tuple2<>("B", 1));
        JavaPairRDD<String, Integer> rdd = jsc.parallelizePairs(pairs, 3);
        List<Tuple2<String, Integer>> result = rdd.top(2, new TestComparator());
        System.out.println(result);
    }

    /**
     * {@code takeSample(withReplacement, num)}: prints a random sample of three
     * elements drawn without replacement. A three-argument overload additionally
     * accepts a seed; it is not used here, so the output varies between runs.
     */
    private static void api_takeSample() {
        JavaRDD<Integer> numbers = jsc.parallelize(NUMBERS, 3);
        List<Integer> out = numbers.takeSample(false, 3);
        System.out.println(out);
    }

    /**
     * {@code collectAsMap()}: prints the pair RDD collected into a {@code Map}.
     * The sample contains key 1 twice, so only one of its values survives in the
     * resulting map.
     */
    private static void api_collectAsMap() {
        JavaPairRDD<Integer, String> pairs = jsc.parallelizePairs(ID_NAME_PAIRS, 2);
        Map<Integer, String> map = pairs.collectAsMap();
        System.out.println(map);
    }

    /**
     * {@code aggregate(zeroValue, seqOp, combOp)}: {@code seqOp} folds elements
     * into an accumulator and {@code combOp} merges accumulators.
     *
     * <p>The first call accumulates a (sum, count) tuple starting from (0, 0); for
     * the sample 1, 1, 2, 3 the result is (7, 4). The second call uses plain
     * addition for both operators and prints the sum, 7.
     */
    private static void api_aggregate() {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        Tuple2<Integer, Integer> result = numbers.aggregate(new Tuple2<>(0, 0),
                (x, y) -> new Tuple2<>(x._1 + y, x._2 + 1),
                (x, y) -> new Tuple2<>(x._1 + y._1, x._2 + y._2));
        System.out.println(result);
        int result2 = numbers.aggregate(0, (x, y) -> (x + y), (x, y) -> (x + y));
        System.out.println(result2);
    }

    /**
     * {@code fold(zeroValue, op)}: like {@code aggregate}, but a single operator is
     * used both to fold elements and to merge partial results. Prints the sum.
     */
    private static void api_fold() {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        int result = numbers.fold(0, (x, y) -> (x + y));
        System.out.println(result);
    }

    /**
     * {@code reduce(op)}: like {@code fold}, but without an initial value. Prints
     * the sum.
     */
    private static void api_reduce() {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        int result = numbers.reduce((x, y) -> (x + y));
        System.out.println(result);
    }

    /**
     * {@code foreach(f)}: applies a function to every element; here each element
     * is printed.
     */
    private static void api_foreach() {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        // Keep the lambda: a System.out::println method reference would capture the
        // non-serializable PrintStream instance in the task closure.
        numbers.foreach(x -> System.out.println(x));
    }

    /**
     * {@code foreachPartition(f)}: invokes the function once per partition with an
     * iterator over that partition's elements; here each element is printed.
     * Useful when per-call setup (for example opening a network connection) should
     * be paid once per partition rather than once per element.
     */
    private static void api_foreachPartition() {
        JavaRDD<Integer> numbers = jsc.parallelize(SMALL_NUMBERS, 2);
        numbers.foreachPartition(x -> x.forEachRemaining(y -> System.out.println(y)));
    }

    /** Runs the counting and extremum actions. */
    private static void action_type1() {
        api_count();
        api_countByValue();
        api_countByKey();
        api_max();
        api_min();
    }

    /** Runs the actions that fetch elements back to the driver. */
    private static void action_type2() {
        api_first();
        api_collect();
        api_collectPartitions();
        api_lookup();
        api_take();
        api_takeOrdered();
        api_top();
        api_takeSample();
        api_collectAsMap();
    }

    /** Runs the aggregating actions. */
    private static void action_type3() {
        api_aggregate();
        api_fold();
        api_reduce();
    }

    /** Runs the side-effecting per-element and per-partition actions. */
    private static void action_type4() {
        api_foreach();
        api_foreachPartition();
    }

    /**
     * Runs every demo group in order and stops the Spark context afterwards.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            action_type1();
            action_type2();
            action_type3();
            action_type4();
        } finally {
            // Release the context even when one of the demos throws.
            jsc.stop();
        }
    }
}