
12754 ワード

scala reduceByKey関数プロトタイプ
/**
 * Merges the values of each key with `func` and partitions the result with `partitioner`.
 *
 * Delegates to `combineByKeyWithClassTag`, using the first value seen for a key as the
 * initial combiner and `func` for both merge steps.
 *
 * @param partitioner partitioner passed through to `combineByKeyWithClassTag`
 * @param func        function that merges two values of the same key into one
 * @return an RDD of (key, merged value) pairs
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // The combiner type equals the value type, so a value is its own initial combiner.
  val createCombiner = (value: V) => value
  combineByKeyWithClassTag[V](createCombiner, func, func, partitioner)
}
  // Local-mode Spark context for the demo.
  val conf = new SparkConf()
    .setAppName("jiangtao_demo")
    .setMaster("local")
  val sc = new SparkContext(conf)

  // Pair every word with an initial count of 1.
  val data = sc.makeRDD(List("pandas", "numpy", "pip", "pip", "pip"))
  val dataPair = data.map(word => (word, 1))

  // Sum the counts of identical words.
  val result1 = dataPair.reduceByKey((left, right) => left + right)
  // Placeholder shorthand for the same reduction:
  // val result2 = dataPair.reduceByKey(_ + _)

/**
 * Groups the values of each key into a single `Iterable`, partitioning the result
 * with `partitioner`.
 *
 * Values are accumulated into a `CompactBuffer` per key through
 * `combineByKeyWithClassTag` with map-side combining disabled.
 *
 * @param partitioner partitioner passed through to `combineByKeyWithClassTag`
 * @return an RDD of (key, all values of that key) pairs
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  // Re-type the buffers as Iterable without copying them.
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/**
 * Combines the values of each key into a combined value of type `C` using the three
 * supplied functions.
 *
 * When this RDD is already partitioned by `partitioner`, values are combined inside
 * each partition with `mapPartitions`; otherwise a `ShuffledRDD` configured with the
 * aggregator, serializer and map-side-combine flag is returned.
 *
 * @param createCombiner turns a value into an initial combiner
 * @param mergeValue     merges a value into an existing combiner
 * @param mergeCombiners merges two combiners; must not be null
 * @param partitioner    partitioner for the resulting RDD
 * @param mapSideCombine whether to combine on the map side (default `true`)
 * @param serializer     serializer handed to the `ShuffledRDD` (default `null`)
 * @return an RDD of (key, combined value) pairs
 * @throws IllegalArgumentException if `mergeCombiners` is null
 * @throws SparkException if keys are arrays and either map-side combining is requested
 *                        or `partitioner` is a `HashPartitioner`
 */
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    // Already partitioned as requested: combine inside each existing partition.
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
  // Local-mode Spark context for the demo.
  val conf = new SparkConf()
    .setAppName("jiangtao_demo")
    .setMaster("local")
  val sc = new SparkContext(conf)

  // Pair every word with an initial count of 1.
  val data = sc.makeRDD(List("pandas", "numpy", "pip", "pip", "pip"))
  val dataPair = data.map(word => (word, 1))

  // Collect every count belonging to the same word.
  val result3 = dataPair.groupByKey()
  // Each element is a (String, Iterable[Int]) pair, so the counts in x._2 still
  // have to be summed to get the per-word total.
  val result4 = result3.map { case (word, counts) => (word, counts.sum) }

SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //      JavaRDD
        JavaRDD lines = jsc.parallelize(Arrays.asList("pandas","numpy","pip","pip","pip"));
JavaPairRDD<String,Integer> mapToPairResult = lines.mapToPair(new PairFunction<String,String,Integer>() {
            public Tuple2<String,Integer> call(String o) throws Exception {
                Tuple2 tuple2 = new Tuple2(o,1);
                return tuple2;
        JavaPairRDD reduceByKeyResult = mapToPairResult.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1+i2;

SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //      JavaRDD
        JavaRDD lines = jsc.parallelize(Arrays.asList("pandas","numpy","pip","pip","pip"));
JavaPairRDD<String,Integer> mapToPairResult = lines.mapToPair(new PairFunction<String,String,Integer>() {
            public Tuple2<String,Integer> call(String o) throws Exception {
                Tuple2 tuple2 = new Tuple2(o,1);
                return tuple2;
        JavaPairRDD groupByKeyResult = mapToPairResult.groupByKey();
        //        JavaPairRDD
        //[(pip,[1, 1, 1]), (pandas,[1]), (numpy,[1])]
        JavaPairRDD<String,Integer> gr = groupByKeyResult.mapToPair(new PairFunction<Tuple2<String,Iterable>,String,Integer>(){

            public Tuple2<String,Integer> call(Tuple2<String,Iterable> tuple2){
                int sum = 0;
                Iterator<Integer> it = tuple2._2.iterator();
                    sum += it.next();
                return new Tuple2<String,Integer>(tuple2._1,sum);
        //    [(pip,3), (pandas,1), (numpy,1)]
