Partitionerプログラミング——運営者グループによってユーザーのインターネットトラフィックを統計する

6984 ワード

  • Partitionerはpartionerのベースクラスであり、カスタムpartionerが必要な場合もクラスを継承する必要があります.
  • HashPartitionrはmapreduceのデフォルトpartitionrです.計算方法はwhich reducer=(key.hashCode()&Integer.MAX_VALUE)%numReduceTasksであり、現在の目的reducerを得る.
  • (例jar形式で実行)
  • 並べ替えとグループ化
  • mapとreduceの段階でソートする場合、k 2を比較する.v 2はソート比較に関与しない.v 2もソートするには、k 2とv 2を新しいクラスに組み立て、k 2として比較に参加する必要があります.
  • パケットの場合もk 2で比較した.

  • パーティションの数は誰が決めますか?—reducer!!
    reducerがあるだけpartitionerがある
    public class DataCount {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(DataCount.class);
    
            job.setMapperClass(DCMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DataInfo.class);
    
            job.setReducerClass(DCReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DataInfo.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //  partitioner    
            job.setPartitionerClass(DCPartitioner.class);
    
            job.setNumReduceTasks(Integer.parseInt(args[2]));
    
    
            job.waitForCompletion(true);
    
        }
        //Map   k1:   v1:     k2:    v2:       javaBean
        public static class DCMapper extends Mapper<LongWritable, Text, Text, DataInfo>{
    
            private Text k = new Text();
    
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, DataInfo>.Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] fields = line.split("\t");
                String tel = fields[1];
                long up = Long.parseLong(fields[8]);
                long down = Long.parseLong(fields[9]);
                DataInfo dataInfo = new DataInfo(tel,up,down);
                k.set(tel);
                context.write(k, dataInfo);
    
            }
    
        }
        //Partition   
        /** * partition        ?----reducer !! *     reducer     partitioner */
        public static class DCPartitioner extends Partitioner<Text, DataInfo>{
            //    map             
            //static        
            private static Map<String,Integer> provider = new HashMap<String,Integer>();
    
            static{
                provider.put("138", 1);
                provider.put("139", 1);
                provider.put("152", 2);
                provider.put("153", 2);
                provider.put("182", 3);
                provider.put("183", 3);
            }
            /** *   :int    ,       map */
            @Override
            public int getPartition(Text key, DataInfo value, int numPartitions) {
                //            
                String tel_sub = key.toString().substring(0,3);
                //        ,          
                Integer count = provider.get(tel_sub);
                if(count == null){
                    count = 0;
                }
                //    
                return count;
            }
    
        }
    
        //Reduce   k2:    v2:dataInfo    k3:    v3:dataInfo
        public static class DCReducer extends Reducer<Text, DataInfo, Text, DataInfo>{
    
            @Override
            protected void reduce(Text key, Iterable<DataInfo> values,Reducer<Text, DataInfo, Text, DataInfo>.Context context)
                    throws IOException, InterruptedException {
                long up_sum = 0;
                long down_sum = 0;
                for(DataInfo d : values){
                    up_sum += d.getUpPayLoad();
                    down_sum += d.getDownPayLoad();
                }
                DataInfo dataInfo = new DataInfo("",up_sum,down_sum);
    
                context.write(key, dataInfo);
            }
    
        }
    
    
    }