【菜鳥シリーズ】spark常用演算子まとめ(scala、java)--groupByKey,reduceByKey
12754 ワード
groupByKeyとreduceByKeyはよく使われる集約関数であり、作用するデータセットはPairRDDである.
scala reduceByKey関数プロトタイプ
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
入力パラメータは2つで、1つ目のパラメータはパーティション関数で、2つ目はvalueに作用する関数であり、1つ目のパラメータが空の場合、デフォルトのhashPartitionerが使用されることがわかります.
reduceByKeyサンプル
groupByKey関数プロトタイプ
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { //groupByKey shouldn’t use map side combine because map side combine does not //reduce the amount of data shuffled and requires all map side data be inserted //into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }
groupByKey入力パラメータはパーティション化方式であり,パラメータは空であり,デフォルトのhashPartitionrを用い,戻り値はRDD[(K,Iterable[V])であることを示した.
実はreduceByKeyとgroupByKeyの下部で呼び出されたのはcombineByKeyWithClassTagで、reduceByKeyはパーティション内で先に合併し、shuffle混洗を行っていることを知っています.一方groupByKeyは直接shuffle混洗を行い、効率的にはreduceByKeyの方が優れているに違いありません.次にcombineByKeyWithClassTagを見てみると、mapSideCombineというパラメータがあります.このパラメータはshuffleの前にパーティション内重合操作を行うかどうかを制御し、reduceByKeyのデフォルトはtrueです.
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, “mergeCombiners must be defined”)//required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException(“Cannot use map-side combining with array keys.”) } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException(“Default partitioner cannot partition array keys.”) } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }
groupByKeyサンプル
===========================================================
groupByKeyサンプル
後続は更に多くの常用演算子を補充することができて、注目して下さい、この文のオリジナル、転載は出典を明記してください
scala reduceByKey関数プロトタイプ
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
入力パラメータは2つで、1つ目のパラメータはパーティション関数で、2つ目はvalueに作用する関数であり、1つ目のパラメータが空の場合、デフォルトのhashPartitionerが使用されることがわかります.
reduceByKeyサンプル
val conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.makeRDD(List("pandas","numpy","pip","pip","pip"))
//mapToPair
val dataPair = data.map((_,1))
//reduceByKey
val result1 = dataPair.reduceByKey(_+_)
//
// val result2 = dataPair.reduceByKey((x,y)=>(x+y))
groupByKey関数プロトタイプ
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { //groupByKey shouldn’t use map side combine because map side combine does not //reduce the amount of data shuffled and requires all map side data be inserted //into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }
groupByKey入力パラメータはパーティション化方式であり,パラメータは空であり,デフォルトのhashPartitionrを用い,戻り値はRDD[(K,Iterable[V])であることを示した.
実はreduceByKeyとgroupByKeyの下部で呼び出されたのはcombineByKeyWithClassTagで、reduceByKeyはパーティション内で先に合併し、shuffle混洗を行っていることを知っています.一方groupByKeyは直接shuffle混洗を行い、効率的にはreduceByKeyの方が優れているに違いありません.次にcombineByKeyWithClassTagを見てみると、mapSideCombineというパラメータがあります.このパラメータはshuffleの前にパーティション内重合操作を行うかどうかを制御し、reduceByKeyのデフォルトはtrueです.
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, “mergeCombiners must be defined”)//required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException(“Cannot use map-side combining with array keys.”) } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException(“Default partitioner cannot partition array keys.”) } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }
groupByKeyサンプル
val conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.makeRDD(List("pandas","numpy","pip","pip","pip"))
//mapToPair
val dataPair = data.map((_,1))
//groupByKey
val result3 = dataPair.groupByKey()
// RDD:String,Iterable, x._2
val result4 = result3.map(x=>(x._1,x._2.sum))
===========================================================
SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
// JavaRDD
JavaRDD lines = jsc.parallelize(Arrays.asList("pandas","numpy","pip","pip","pip"));
JavaPairRDD<String,Integer> mapToPairResult = lines.mapToPair(new PairFunction<String,String,Integer>() {
@Override
public Tuple2<String,Integer> call(String o) throws Exception {
Tuple2 tuple2 = new Tuple2(o,1);
//System.out.println(tuple2._1()+":"+tuple2._2());
return tuple2;
}
});
//reduceByKey
JavaPairRDD reduceByKeyResult = mapToPairResult.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1+i2;
}
});
groupByKeyサンプル
SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
// JavaRDD
JavaRDD lines = jsc.parallelize(Arrays.asList("pandas","numpy","pip","pip","pip"));
JavaPairRDD<String,Integer> mapToPairResult = lines.mapToPair(new PairFunction<String,String,Integer>() {
@Override
public Tuple2<String,Integer> call(String o) throws Exception {
Tuple2 tuple2 = new Tuple2(o,1);
//System.out.println(tuple2._1()+":"+tuple2._2());
return tuple2;
}
});
//groupByKey
JavaPairRDD groupByKeyResult = mapToPairResult.groupByKey();
// JavaPairRDD
//[(pip,[1, 1, 1]), (pandas,[1]), (numpy,[1])]
System.out.println(groupByKeyResult.collect());
JavaPairRDD<String,Integer> gr = groupByKeyResult.mapToPair(new PairFunction<Tuple2<String,Iterable>,String,Integer>(){
public Tuple2<String,Integer> call(Tuple2<String,Iterable> tuple2){
int sum = 0;
Iterator<Integer> it = tuple2._2.iterator();
while(it.hasNext()){
sum += it.next();
}
return new Tuple2<String,Integer>(tuple2._1,sum);
}
});
System.out.println(gr.collect());
// [(pip,3), (pandas,1), (numpy,1)]
後続は更に多くの常用演算子を補充することができて、注目して下さい、この文のオリジナル、転載は出典を明記してください