ラベル生成

8757 ワード

目的: Java コード(Spark)で団体購入サイトのレビュータグ(ラベル)生成を実現する
最終結果:
83644298=============>   :1
82317795=============>   :1
77705462=============>    :3,  :2,   :1
85766086=============>    :2,   :1
74145782=============>    :18,   :14,   :13,    :12,   :11,    :6,    :5,   :4,   :3,   :2
71039150=============>  :1
70611801=============>    :4,   :3,   :2,   :1
73963176=============>   :15,    :12,   :11,    :10,   :7,    :6,   :4,    :3,    :1
84270191=============>    :2,    :1
89223651=============>    :8,    :7,    :5,    :4,   :3,    :2
82016443=============>   :3,   :2,    :1
77287793=============>    :29,    :26,    :25,    :19,    :18,   :16,    :13,     :1
79197522=============>    :2,    :1
83084036=============>    :1
73879078=============>   :3,   :2,   :1
88284865=============>    :1
83073343=============>    :17,   :16,    :15,    :11,   :9,    :8,   :7,   :6,    :4,   :1
76114040=============>    :1
86913510=============>  :1
88496862=============>   :5,   :4,   :3,    :2,   :1
78477325=============>   :8,   :7,    :5,    :4,    :3,    :2,   :1
83981222=============>    :4,    :3,    :2
82705919=============>   :3,   :2,    :1
87994574=============>   :12,    :8,    :7,   :5,    :4,    :3,    :2,   :1
77373671=============>   :1
75144086=============>    :38,   :30,   :22,    :21,   :19,    :18,    :13,   :12,    :10,      :3
85648235=============>   :17,    :15,    :13,   :12,   :11,    :10,   :9,    :8,   :7,    :1
73607905=============>    :16,   :15,    :14,   :13,   :11,    :7,   :5,   :2,    :1
76893145=============>    :10,    :7,   :5,   :4,    :3,    :2,   :1
78824187=============>    :13,   :11,   :10,    :8,    :7,   :6,   :5,   :4,   :1

コード:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

/**
 * java             
 */
public class ReviewTagsJava {
    public static void main(String [] args){
        SparkConf conf = new SparkConf();
        conf.setMaster("local[4]");
        conf.setAppName("ReviewTagsJava");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD rdd1 = jsc.textFile("file:///d:/scala/taggen/temptags.txt");

        // \t   String  
        JavaRDD rdd2 = rdd1.map(new Function() {
            public String[] call(String s) throws Exception {
                return s.split("\t");
            }
        });

        //    
        JavaRDD rdd3 = rdd2.filter(new Function() {
            public Boolean call(String[] v) throws Exception {
                return v.length == 2;
            }
        });

        //     ,ID-->   ,    ,  
        JavaPairRDD rdd4 = rdd3.mapToPair(new PairFunction() {
            public Tuple2 call(String[] t) throws Exception {
                return new Tuple2(t[0],ReviewTags.extractTags(t[1]));
            }
        });

        //     
        JavaPairRDD rdd5 = rdd4.filter(new Function, Boolean>() {
            public Boolean call(Tuple2 t) throws Exception {
                return t._2.length() > 0;
            }
        });

        // V    ,V    
        JavaPairRDD rdd6 = rdd5.mapToPair(new PairFunction, String,String[]>() {
            public Tuple2 call(Tuple2 v) throws Exception {
                return new Tuple2(v._1(),v._2().split(","));
            }
        });

        //V    ,    。ID->     ID->      .......
        JavaPairRDD rdd7 = rdd6.flatMapValues(new Function>() {
            public Iterable call(String[] v) throws Exception {
                List list = new ArrayList();
                for(String  v1 : v){
                    list.add(v1);
                }
                return list;
            }
        });

        //K:ID         V:1......
        JavaPairRDD, Integer> rdd8 = rdd7.mapToPair(new PairFunction, Tuple2, Integer>() {
            public Tuple2, Integer> call(Tuple2 v) throws Exception {
                return new Tuple2, Integer>(v,1);
            }
        });

        JavaPairRDD,Integer> rdd9 = rdd8.reduceByKey(new Function2() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //K:ID      V:   ,1 ......
        JavaPairRDD> rdd10 = rdd9.mapToPair(new PairFunction,Integer>, String, Tuple2>() {
            public Tuple2> call(Tuple2, Integer> v) throws Exception {
                return new Tuple2>(v._1()._1(),new Tuple2(v._1()._2(),v._2()));
            }
        });

        // V    ,    
        JavaPairRDD>> rdd11 = rdd10.mapToPair(new PairFunction>, String, List>>() {
            public Tuple2>> call(Tuple2> v) throws Exception {
                List> list = new ArrayList>();
                list.add(v._2());
                 return new Tuple2>>(v._1(),list);
            }
        });
        //
        JavaPairRDD>> rdd12 = rdd11.reduceByKey(new Function2>, List>, List>>() {
            public List> call(List> v1, List> v2) throws Exception {
                v1.addAll(v2);
                return  v1;
            }
        });

        //    ,  10  
        JavaPairRDD rdd13 = rdd12.mapToPair(new PairFunction>>, String, String>() {
            public Tuple2 call(Tuple2>> v) throws Exception {
                //  ,
                TreeSet> ts = new TreeSet>(new Tuple2Comparator());
                ts.addAll(v._2());
                // 10  
                Iterator> it = ts.iterator();
                int index = 0;
                String str = "";
                while(it.hasNext()){
                    if(index > 9){
                        break;
                    }
                    //    10    t0
                    Tuple2 t0 = it.next();
                    //V:    :12,  :13
                    str = str + t0._1() + ":" + t0._2() + ",";
                    index++;
                }
                //      ","
                str = str.substring(0,str.length()-1);
                //K:ID,       V:   +12    
                return  new Tuple2(v._1(),str);
            }
        });
        //collect  ---->List
        List> list = rdd13.collect();
        //      
        for(Tuple2 l : list){
            System.out.println(l._1() + "=============>" + l._2());
        }
    }
}

出現回数の降順で上位10件のタグを取り出すには、次のようなコンパレータ(Comparator)を用意する必要があります。
import scala.Tuple2;

import java.util.Comparator;

/**
 *    ,    ,  
 */
public class Tuple2Comparator implements Comparator> {
    public int compare(Tuple2 o1, Tuple2 o2) {
        return o2._2() - o1._2();
    }
}