spark-TopKアルゴリズム


  • Case:入力:テキストファイル出力:(158,)(28,the)(19,to)(18,Spark)(17,and)(11,Hadoop)(10,…)(8,with)
  • アルゴリズム:まずwordcountを実現し、topk実現はwordcountを基礎として、分類統計が完了したらkey/valueを交換し、そしてsortByKeyを呼び出して並べ替えを行う.
  • java
  • import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.regex.Pattern;
    
    
    /**
     * Spark Top-K word count driver.
     *
     * <p>Reads a text file, counts word occurrences (classic word count),
     * swaps each (word, count) pair to (count, word), and prints the K
     * most frequent words. Uses the pre-lambda Spark Java API (anonymous
     * function classes), matching the style of the surrounding article.
     */
    public class TopK {
        /** Number of top entries to print. */
        private static final int K = 5;

        public static final Pattern SPACE = Pattern.compile(" ");

        public static void main(String[] args) throws Exception {
            // Fail fast on bad arguments. The original printed the usage
            // message but kept going with a null path, which crashed later
            // with an NPE inside textFile(). Also: no destination argument
            // is ever used, so the usage string advertises only <src>.
            if (args.length != 1) {
                System.err.println("Usage: <src>");
                System.exit(1);
            }
            String inPath = args[0];

            SparkConf sparkConf = new SparkConf().setAppName("Word Count");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            JavaRDD<String> lines = jsc.textFile(inPath);

            // Split each line on single spaces into individual words.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String s) throws Exception {
                    return Arrays.asList(SPACE.split(s));
                }
            });

            // Emit (word, 1) for every word occurrence.
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });

            // Sum the per-word counts.
            JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) throws Exception {
                    return i1 + i2;
                }
            });

            // Swap to (count, word) so the count becomes the sort key.
            JavaPairRDD<Integer, String> converted = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                    return new Tuple2<Integer, String>(tuple._2(), tuple._1());
                }
            });

            // NOTE(review): top() already selects the K largest elements via
            // the comparator, so this sortByKey is redundant; it is kept here
            // to mirror the algorithm description in the article.
            JavaPairRDD<Integer, String> sorted = converted.sortByKey(true, 1);
            List<Tuple2<Integer, String>> topK = sorted.top(K, new Comp());

            for (Tuple2<Integer, String> top : topK) {
                System.out.println(top._2() + ": " + top._1());
            }

            jsc.stop();
        }
    }
    
    /**
     * Orders (count, word) tuples by ascending count.
     *
     * <p>Serializable so Spark can ship it to executors. Uses
     * {@link Integer#compare(int, int)} instead of a hand-rolled three-way
     * branch — same result, and it avoids the subtraction-overflow trap
     * that manual comparators often fall into.
     */
    class Comp implements Comparator<Tuple2<Integer, String>>, Serializable {

        @Override
        public int compare(Tuple2<Integer, String> o1, Tuple2<Integer, String> o2) {
            return Integer.compare(o1._1(), o2._1());
        }
    }
  • scala
  • 
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    
    
    /**
     * Spark Top-K word count (Scala version).
     *
     * Usage: <src> <num> — reads the text file at `src`, counts words,
     * and prints the `num` most frequent as "(word,count)".
     */
    object TopK {
      def main(args: Array[String]) {
        if (args.length != 2) {
          System.out.println("Usage: <src> <num>")
          System.exit(1)
        }

        val conf = new SparkConf().setAppName("TopK")
        val sc = new SparkContext(conf)

        val lines = sc.textFile(args(0))
        // Classic word count: split on spaces, emit (word, 1), sum by key.
        val ones = lines.flatMap(_.split(" ")).map(word => (word, 1))
        val count = ones.reduceByKey((a, b) => a + b)
        // Swap to (count, word) so the count is the sort/selection key.
        // NOTE(review): top() already picks the largest by key, so the
        // sortByKey is redundant; kept to mirror the article's algorithm.
        val convert = count.map {
          case (key, value) => (value, key)
        }.sortByKey(true, 1)
        convert.top(args(1).toInt).foreach(a => System.out.println("(" + a._2 + "," + a._1 + ")"))
      }
    }
  • アプリケーションシーン:TopKモデルは消費者の人気消費分析、ウェブサイト/ブログのクリック数、ユーザーの閲覧量分析、最新のホットワードと人気のある検索などの分析処理によく使われています.