Hadoopシリーズ(4)MapReduceはカスタムbeanを渡し、結果のソート、mapperとreducerの同時数の分析

8354 ワード

実例要求:携帯電話番号の上りと下りの流量統計を実現し、グループテストデータを下図に示す:
解析:通常、mapperとreducerの入出力タイプはLongWritable、Textなどであり、カスタムbeanを渡す場合はhadoopのシーケンス化仕様に合致する必要があります.LongWritableソースコードを表示すると、WritableComparableインタフェースが実装されていることがわかります.
/** A WritableComparable for longs. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LongWritable implements WritableComparable {...}

同様に,我々がカスタマイズしたbeanをhadoopのmapreduceフレームワークに伝達するには,同様のインタフェースを実現する必要がある.実際、Writable ComparableインタフェースはWritableインタフェースとComparableインタフェースの組み合わせインタフェースであり、beanをそれぞれシーケンス化して比較することができます.
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable extends Writable, Comparable {
}

jdkのデフォルトのシーケンス化方式とは異なり、hadoopではbeanの継承構造と実装インタフェースのシーケンス化が除去され、bean内部のフィールドのみが保持され、ネットワーク伝送の帯域幅が節約されます.
次にhadoopシーケンス化仕様に合致するbeanを実現します.
FlowBean:(対応するフィールドのsetterとgetterメソッドは省略)
public class FlowBean implements Writable {
    
    private String phone;
    private long upStream;
    private long downStream;
    private long sumStream;
    
    /**
     *  , 
     */
    public FlowBean() {}
    
    public FlowBean(String phone, long upStream, long downStream) {
        super();
        this.phone = phone;
        this.upStream = upStream;
        this.downStream = downStream;
        this.sumStream = upStream + downStream;
    }

    /**
     *  
     *  
     */
    public void readFields(DataInput input) throws IOException {
        phone = input.readUTF();
        upStream = input.readLong();
        downStream = input.readLong();
        sumStream = input.readLong();
    }

    /**
     *  
     */
    public void write(DataOutput output) throws IOException {
        output.writeUTF(phone);
        output.writeLong(upStream);
        output.writeLong(downStream);
        output.writeLong(sumStream);
    }

    /**
     * reduce 
     */
    @Override
    public String toString() {
        return "" + upStream + "\t" + downStream + "\t" + sumStream;
    }

}

FlowMapper:
public class FlowMapper extends Mapper {
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");
        
        String phone = fields[1];
        long upStream = Long.parseLong(fields[7]);
        long downStream = Long.parseLong(fields[8]);
        
        context.write(new Text(phone), new FlowBean(phone, upStream, downStream));
        
    }

}

FlowReducer:
public class FlowReducer extends Reducer {
    
    @Override
    protected void reduce(Text key, Iterable values, Context context)
            throws IOException, InterruptedException {
        
        long upStreamCounter = 0;
        long downStreamCounter = 0;
        
        for (FlowBean bean: values) {
            upStreamCounter += bean.getUpStream();
            downStreamCounter += bean.getDownStream();
        }
        
        context.write(key, new FlowBean(key.toString(), upStreamCounter, downStreamCounter));
        
    }

}

FlowRunner:Runnerの標準的な実装方式は、Configuredクラスを継承し、Toolインタフェースを実装することである.
public class FlowRunner extends Configured implements Tool{

    public int run(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(FlowRunner.class);
        
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        return job.waitForCompletion(true)?0:1;
    }
    
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowRunner(), args);
        System.exit(res);
    }

}

出力結果のソートは、例えば、総トラフィックに従って高から低にソートされると、FlowBeanは、WritableComparableインタフェースを直接実現することができる.
public class FlowBean implements WritableComparable {...}

compareToメソッドを同時に書き換える:
    public int compareTo(FlowBean o) {
        return sumStream > o.sumStream ? -1 : 1;
    }

mapperとreducerコードを変更するには、次のようにします.
public class SortMR {
    
    public static class SortMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            String phone = fields[0];
            long upStream = Long.parseLong(fields[1]);
            long downStream = Long.parseLong(fields[2]);
            
            context.write(new FlowBean(phone, upStream, downStream), NullWritable.get());
        }
    }
    
    public static class SortReducer extends Reducer {
        @Override
        protected void reduce(FlowBean key, Iterable values, Context context)
                throws IOException, InterruptedException {
            
            String phone = key.getPhone();
            context.write(new Text(phone), key);
        }
    }
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(SortMR.class);
        
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

yarnにコミットされた実行結果は次のとおりです.
実行結果をグループ化する場合、すなわち、異なるセグメントの携帯電話番号流量統計結果を異なるファイルに出力する場合は、Reducerの同時タスク数を設定する必要があります.まず、partitionerクラスをカスタマイズします.次のようにします.
public class AreaPartitioner extends Partitioner{

    private static HashMap areaMap = new HashMap();
    
    static{
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }
    
    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // key , , 
        int areaCoder  = areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3));
        return areaCoder;
    }

}

次にconfigurationで次のように構成します.
    //  
    job.setPartitionerClass(AreaPartitioner.class);
    
    //  reduce , ; , reducer , ;
    //  , ; 1, , reducer , reducer 。
    job.setNumReduceTasks(6);


これによりjob実行の結果は以下のようになる.
テストデータを4つコピーすると、次のようになります.
タスクをyarn処理にコミットし、mapタスクが起動してまだ完了していない前にjavaプロセスを表示します.
同時に5つのYarnChildプロセスがmapタスクを実行していることがわかります.各小さなファイルが1つのblockを占めるため、各blockは1つのプロセスでmapタスク処理を行う必要があります.このようにファイルの数が多ければ多いほど、mapタスクプロセスが多ければ多いほど、リソースを消費するほど、効率が低くなります.
実際,mapタスクの同時数はスライスの数によって決定される.スライスがいくつあるかは、mapタスクを起動して実行します.スライスは論理概念であり、ファイル内のデータのオフセット量を指す.スライスの具体的なサイズは、処理するファイルのサイズに応じて調整する必要があります.具体的にはmapからreduceへの入出力タスク処理プロセスをshuffleと呼ぶ.