Partitionerプログラミング——通信事業者(キャリア)のグループごとにユーザーのインターネットトラフィックを集計する
6984 ワード
パーティションの数は誰が決めるのか?——reducerです!!
reducerの数だけパーティションが作られる(パーティション数 = reducerタスク数)
public class DataCount {
/**
 * Configures and submits the traffic-statistics MapReduce job, then waits
 * for it to finish.
 *
 * <p>If fewer than three arguments are supplied a usage message is printed
 * and the process exits with status 2. If the job reports failure the
 * process exits with status 1.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output
 *             path and {@code args[2]} the number of reduce tasks
 * @throws Exception if the job cannot be configured or submitted, or if
 *                   {@code args[2]} is not a valid integer
 */
public static void main(String[] args) throws Exception {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
        System.err.println("Usage: DataCount <input path> <output path> <num reduce tasks>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(DataCount.class);

    // Map stage: emits (Text, DataInfo) pairs.
    job.setMapperClass(DCMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DataInfo.class);

    // Reduce stage: emits (Text, DataInfo) pairs.
    job.setReducerClass(DCReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DataInfo.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Custom partitioner; the number of partitions is the number of reduce
    // tasks requested on the command line. DCPartitioner uses partition
    // indices 0 through 3, so four reduce tasks yield one output per group.
    job.setPartitionerClass(DCPartitioner.class);
    job.setNumReduceTasks(Integer.parseInt(args[2]));

    // Surface job failure through the process exit code so that calling
    // scripts and schedulers can detect it.
    if (!job.waitForCompletion(true)) {
        System.exit(1);
    }
}
/**
 * Map stage. Each input value is one tab-separated text line; the mapper
 * reads field 1 as the phone number and fields 8 and 9 as the upstream and
 * downstream traffic figures, and emits the phone number as the key with a
 * {@code DataInfo} bean built from those three values.
 *
 * <p>A line with fewer than ten fields, or with a non-numeric value in
 * field 8 or 9, makes {@code map} throw an unchecked exception.
 */
public static class DCMapper extends Mapper<LongWritable, Text, Text, DataInfo>{
    /** Output key instance, reused across calls to {@code map}. */
    private Text outKey = new Text();

    /**
     * Parses one input line and emits a (phone number, DataInfo) pair.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one tab-separated input line
     * @param context sink for the emitted pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");

        String phone = columns[1];
        long upstream = Long.parseLong(columns[8]);
        long downstream = Long.parseLong(columns[9]);

        DataInfo payload = new DataInfo(phone, upstream, downstream);
        outKey.set(phone);
        context.write(outKey, payload);
    }
}
/**
 * Chooses the reduce partition for a record from the first three characters
 * of its key (the phone number emitted by the mapper).
 *
 * <p>Known prefixes are grouped into partitions 1, 2 and 3; every other key,
 * including keys shorter than three characters, goes to partition 0. The
 * number of partitions is the number of reduce tasks configured on the job,
 * so the chosen index is folded into {@code [0, numPartitions)} to stay
 * valid when fewer than four reduce tasks are configured.
 */
public static class DCPartitioner extends Partitioner<Text, DataInfo>{
    /** Number of leading key characters used to look up the partition. */
    private static final int PREFIX_LENGTH = 3;

    /** Partition used for keys whose prefix is not in the lookup table. */
    private static final int DEFAULT_PARTITION = 0;

    /**
     * Prefix-to-partition lookup table. Static so it is built once per JVM
     * rather than once per record.
     */
    private static final Map<String,Integer> PARTITION_BY_PREFIX = new HashMap<String,Integer>();
    static{
        PARTITION_BY_PREFIX.put("138", 1);
        PARTITION_BY_PREFIX.put("139", 1);
        PARTITION_BY_PREFIX.put("152", 2);
        PARTITION_BY_PREFIX.put("153", 2);
        PARTITION_BY_PREFIX.put("182", 3);
        PARTITION_BY_PREFIX.put("183", 3);
    }

    /**
     * Returns the partition index for the given key.
     *
     * @param key           record key; its first three characters select the partition
     * @param value         record value; not consulted
     * @param numPartitions total number of partitions for the job
     * @return an index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Text key, DataInfo value, int numPartitions) {
        String tel = key.toString();

        // Keys too short to carry a prefix would make substring() throw;
        // treat them like any other unknown prefix.
        Integer mapped = null;
        if (tel.length() >= PREFIX_LENGTH) {
            mapped = PARTITION_BY_PREFIX.get(tel.substring(0, PREFIX_LENGTH));
        }
        int partition = (mapped == null) ? DEFAULT_PARTITION : mapped.intValue();

        // The framework rejects indices outside [0, numPartitions); folding
        // leaves the result unchanged when at least four partitions exist.
        return partition % numPartitions;
    }
}
/**
 * Reduce stage. For each key it adds up the upstream and downstream values
 * of all {@code DataInfo} records received for that key and emits a single
 * {@code DataInfo} holding the two totals under the same key.
 */
public static class DCReducer extends Reducer<Text, DataInfo, Text, DataInfo>{
    /**
     * Sums the traffic figures of every record grouped under {@code key}.
     *
     * @param key     the grouping key, written through unchanged
     * @param values  all records the framework grouped under {@code key}
     * @param context sink for the aggregated pair
     */
    @Override
    protected void reduce(Text key, Iterable<DataInfo> values, Context context)
            throws IOException, InterruptedException {
        long totalUp = 0;
        long totalDown = 0;
        for (DataInfo info : values) {
            totalUp += info.getUpPayLoad();
            totalDown += info.getDownPayLoad();
        }
        // The output bean is built with an empty first argument; the key
        // itself is written alongside it.
        context.write(key, new DataInfo("", totalUp, totalDown));
    }
}
}