ラベル生成
8757 ワード
目的:Java(Spark)のコードで、団体購入サイトのレビューからタグ(ラベル)を生成する
最終結果:
コード:
出現回数の降順で上位10件のタグを取り出すには、比較用のコンパレータ(Comparator)クラスを用意する必要があります。
最終結果:
83644298=============> :1
82317795=============> :1
77705462=============> :3, :2, :1
85766086=============> :2, :1
74145782=============> :18, :14, :13, :12, :11, :6, :5, :4, :3, :2
71039150=============> :1
70611801=============> :4, :3, :2, :1
73963176=============> :15, :12, :11, :10, :7, :6, :4, :3, :1
84270191=============> :2, :1
89223651=============> :8, :7, :5, :4, :3, :2
82016443=============> :3, :2, :1
77287793=============> :29, :26, :25, :19, :18, :16, :13, :1
79197522=============> :2, :1
83084036=============> :1
73879078=============> :3, :2, :1
88284865=============> :1
83073343=============> :17, :16, :15, :11, :9, :8, :7, :6, :4, :1
76114040=============> :1
86913510=============> :1
88496862=============> :5, :4, :3, :2, :1
78477325=============> :8, :7, :5, :4, :3, :2, :1
83981222=============> :4, :3, :2
82705919=============> :3, :2, :1
87994574=============> :12, :8, :7, :5, :4, :3, :2, :1
77373671=============> :1
75144086=============> :38, :30, :22, :21, :19, :18, :13, :12, :10, :3
85648235=============> :17, :15, :13, :12, :11, :10, :9, :8, :7, :1
73607905=============> :16, :15, :14, :13, :11, :7, :5, :2, :1
76893145=============> :10, :7, :5, :4, :3, :2, :1
78824187=============> :13, :11, :10, :8, :7, :6, :5, :4, :1
コード:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
/**
* java
*/
public class ReviewTagsJava {
public static void main(String [] args){
SparkConf conf = new SparkConf();
conf.setMaster("local[4]");
conf.setAppName("ReviewTagsJava");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD rdd1 = jsc.textFile("file:///d:/scala/taggen/temptags.txt");
// \t String
JavaRDD rdd2 = rdd1.map(new Function() {
public String[] call(String s) throws Exception {
return s.split("\t");
}
});
//
JavaRDD rdd3 = rdd2.filter(new Function() {
public Boolean call(String[] v) throws Exception {
return v.length == 2;
}
});
// ,ID--> , ,
JavaPairRDD rdd4 = rdd3.mapToPair(new PairFunction() {
public Tuple2 call(String[] t) throws Exception {
return new Tuple2(t[0],ReviewTags.extractTags(t[1]));
}
});
//
JavaPairRDD rdd5 = rdd4.filter(new Function, Boolean>() {
public Boolean call(Tuple2 t) throws Exception {
return t._2.length() > 0;
}
});
// V ,V
JavaPairRDD rdd6 = rdd5.mapToPair(new PairFunction, String,String[]>() {
public Tuple2 call(Tuple2 v) throws Exception {
return new Tuple2(v._1(),v._2().split(","));
}
});
//V , 。ID-> ID-> .......
JavaPairRDD rdd7 = rdd6.flatMapValues(new Function>() {
public Iterable call(String[] v) throws Exception {
List list = new ArrayList();
for(String v1 : v){
list.add(v1);
}
return list;
}
});
//K:ID V:1......
JavaPairRDD, Integer> rdd8 = rdd7.mapToPair(new PairFunction, Tuple2, Integer>() {
public Tuple2, Integer> call(Tuple2 v) throws Exception {
return new Tuple2, Integer>(v,1);
}
});
JavaPairRDD,Integer> rdd9 = rdd8.reduceByKey(new Function2() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//K:ID V: ,1 ......
JavaPairRDD> rdd10 = rdd9.mapToPair(new PairFunction,Integer>, String, Tuple2>() {
public Tuple2> call(Tuple2, Integer> v) throws Exception {
return new Tuple2>(v._1()._1(),new Tuple2(v._1()._2(),v._2()));
}
});
// V ,
JavaPairRDD>> rdd11 = rdd10.mapToPair(new PairFunction>, String, List>>() {
public Tuple2>> call(Tuple2> v) throws Exception {
List> list = new ArrayList>();
list.add(v._2());
return new Tuple2>>(v._1(),list);
}
});
//
JavaPairRDD>> rdd12 = rdd11.reduceByKey(new Function2>, List>, List>>() {
public List> call(List> v1, List> v2) throws Exception {
v1.addAll(v2);
return v1;
}
});
// , 10
JavaPairRDD rdd13 = rdd12.mapToPair(new PairFunction>>, String, String>() {
public Tuple2 call(Tuple2>> v) throws Exception {
// ,
TreeSet> ts = new TreeSet>(new Tuple2Comparator());
ts.addAll(v._2());
// 10
Iterator> it = ts.iterator();
int index = 0;
String str = "";
while(it.hasNext()){
if(index > 9){
break;
}
// 10 t0
Tuple2 t0 = it.next();
//V: :12, :13
str = str + t0._1() + ":" + t0._2() + ",";
index++;
}
// ","
str = str.substring(0,str.length()-1);
//K:ID, V: +12
return new Tuple2(v._1(),str);
}
});
//collect ---->List
List> list = rdd13.collect();
//
for(Tuple2 l : list){
System.out.println(l._1() + "=============>" + l._2());
}
}
}
出現回数の降順で上位10件のタグを取り出すには、比較用のコンパレータ(Comparator)クラスを用意する必要があります。
import scala.Tuple2;
import java.util.Comparator;
/**
* , ,
*/
public class Tuple2Comparator implements Comparator> {
public int compare(Tuple2 o1, Tuple2 o2) {
return o2._2() - o1._2();
}
}