[Rookie Series] Summary of commonly used Spark operators (Scala, Java) -- groupByKey, reduceByKey


groupByKey and reduceByKey are commonly used aggregation operators; both work on datasets of key-value pairs (PairRDDs).
Scala reduceByKey function prototype
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
It takes two parameters: the first is the partitioner, and the second is the function applied to the values. If no partitioner is supplied, the default HashPartitioner is used.
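For reference, here is a minimal sketch (assuming an existing SparkContext `sc`; the data and the partition count 4 are arbitrary choices for illustration) of the three common ways to call reduceByKey:

  import org.apache.spark.HashPartitioner

  // assuming an existing SparkContext named sc
  val pairs = sc.makeRDD(List(("pip", 1), ("pip", 1), ("numpy", 1)))

  val r1 = pairs.reduceByKey(_ + _)                          // reduceByKey(func): default HashPartitioner
  val r2 = pairs.reduceByKey(_ + _, 4)                       // reduceByKey(func, numPartitions)
  val r3 = pairs.reduceByKey(new HashPartitioner(4), _ + _)  // reduceByKey(partitioner, func): the prototype above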
reduceByKey sample
  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local")
  val sc = new SparkContext(conf)
  val data = sc.makeRDD(List("pandas","numpy","pip","pip","pip"))
  // map each word to a (word, 1) pair
  val dataPair = data.map((_, 1))

  // reduceByKey: sum the counts for each key
  val result1 = dataPair.reduceByKey(_ + _)
  // equivalent, with the function written out explicitly:
  // val result2 = dataPair.reduceByKey((x, y) => x + y)
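Collecting result1 on this input yields the per-word counts, for example (the order of the tuples may vary between runs):

  println(result1.collect().mkString(", "))
  // e.g. (pandas,1), (pip,3), (numpy,1)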

groupByKey function prototype
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
groupByKey's input parameter is the partitioning scheme; when the argument is omitted, the default HashPartitioner is used, and the return value is RDD[(K, Iterable[V])].
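Like reduceByKey, groupByKey can be called with no argument, with a number of partitions, or with an explicit Partitioner; a minimal sketch, reusing the `pairs` RDD and the HashPartitioner import from the earlier sketch:

  val g1 = pairs.groupByKey()                        // default HashPartitioner
  val g2 = pairs.groupByKey(4)                       // groupByKey(numPartitions)
  val g3 = pairs.groupByKey(new HashPartitioner(4))  // groupByKey(partitioner): the prototype above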
In fact, both reduceByKey and groupByKey call combineByKeyWithClassTag under the hood. reduceByKey first combines values within each partition and only then shuffles, whereas groupByKey shuffles the raw records directly, so in terms of efficiency reduceByKey is clearly the better choice. Looking at combineByKeyWithClassTag below, there is a parameter named mapSideCombine that controls whether per-partition (map-side) aggregation is performed before the shuffle; it defaults to true, which is what reduceByKey uses, while groupByKey explicitly passes false.
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
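To make the relationship concrete, here is a minimal sketch of the same word count written against the public combineByKey API, which goes through this combineByKeyWithClassTag path; with these three functions it behaves like reduceByKey(_ + _), and map-side combining stays at its default of true:

  // assuming dataPair is the (word, 1) pair RDD built in the samples of this article
  val viaCombine = dataPair.combineByKey(
    (v: Int) => v,                  // createCombiner: the first value seen for a key in a partition
    (acc: Int, v: Int) => acc + v,  // mergeValue: fold further values into the partition-local sum
    (a: Int, b: Int) => a + b       // mergeCombiners: merge the partial sums across partitions
  )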
groupByKey sample
  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local")
  val sc = new SparkContext(conf)
  val data = sc.makeRDD(List("pandas","numpy","pip","pip","pip"))
  // map each word to a (word, 1) pair
  val dataPair = data.map((_, 1))
  // groupByKey: returns RDD[(String, Iterable[Int])]
  val result3 = dataPair.groupByKey()
  // the result is (String, Iterable[Int]); to get the counts you have to sum each x._2 yourself
  val result4 = result3.map(x => (x._1, x._2.sum))
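A slightly more idiomatic way to write the same summation is mapValues, since only the values change; it also preserves result3's partitioner:

  val result5 = result3.mapValues(_.sum)  // same counts as result4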

===========================================================
reduceByKey sample (Java)
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
// create a JavaRDD from a local collection
JavaRDD<String> lines = jsc.parallelize(Arrays.asList("pandas","numpy","pip","pip","pip"));
// mapToPair: map each word to a (word, 1) pair
JavaPairRDD<String,Integer> mapToPairResult = lines.mapToPair(new PairFunction<String,String,Integer>() {
    @Override
    public Tuple2<String,Integer> call(String o) throws Exception {
        Tuple2<String,Integer> tuple2 = new Tuple2<>(o, 1);
        //System.out.println(tuple2._1()+":"+tuple2._2());
        return tuple2;
    }
});
// reduceByKey: sum the counts for each key
JavaPairRDD<String,Integer> reduceByKeyResult = mapToPairResult.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) throws Exception {
        return i1 + i2;
    }
});

groupByKey sample (Java)
// (imports as in the reduceByKey sample above, plus java.util.Iterator)
SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
// create a JavaRDD from a local collection
JavaRDD<String> lines = jsc.parallelize(Arrays.asList("pandas","numpy","pip","pip","pip"));
// mapToPair: map each word to a (word, 1) pair
JavaPairRDD<String,Integer> mapToPairResult = lines.mapToPair(new PairFunction<String,String,Integer>() {
    @Override
    public Tuple2<String,Integer> call(String o) throws Exception {
        Tuple2<String,Integer> tuple2 = new Tuple2<>(o, 1);
        //System.out.println(tuple2._1()+":"+tuple2._2());
        return tuple2;
    }
});
// groupByKey: group the values for each key
JavaPairRDD<String,Iterable<Integer>> groupByKeyResult = mapToPairResult.groupByKey();
// the grouped JavaPairRDD looks like:
// [(pip,[1, 1, 1]), (pandas,[1]), (numpy,[1])]
System.out.println(groupByKeyResult.collect());
// sum each group of 1s to get the per-word counts
JavaPairRDD<String,Integer> gr = groupByKeyResult.mapToPair(new PairFunction<Tuple2<String,Iterable<Integer>>,String,Integer>() {
    @Override
    public Tuple2<String,Integer> call(Tuple2<String,Iterable<Integer>> tuple2) {
        int sum = 0;
        Iterator<Integer> it = tuple2._2.iterator();
        while (it.hasNext()) {
            sum += it.next();
        }
        return new Tuple2<String,Integer>(tuple2._1, sum);
    }
});
System.out.println(gr.collect());
// output: [(pip,3), (pandas,1), (numpy,1)]

More commonly used operators will be added later, so stay tuned. This article is original; please credit the source when reposting.