Hadoop reduce複数出力


転入先:Hadoop in Action
hadoopではreduceが複数の出力をサポートするには2つの実装方式がある.
1つ目は、MultipleTextOutputFormatクラスを継承し、generateFileNameForKeyメソッドを書き換えることです.
public static class PartitionByCountryMTOF
    extends MultipleTextOutputFormat<Text,Text>
{
    protected String generateFileNameForKeyValue(Text key,
            Text value, String filename)
    {
        String[] arr = value.toString().split(",", -1);
        String country = arr[4].substring(1,3);
        return country + "/" + filename;
    }
}


public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    JobConf job = new JobConf(conf, MultiFile.class);
    Path in = new Path(args[0]);
    Path out = new Path(args[1]);
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.setJobName(“MultiFile”);
    job.setMapperClass(MapClass.class);
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(PartitionByCountryMTOF.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(0);
    JobClient.runJob(job);
    return 0;
}

この方法の制限は明らかであり、出力するファイルは各行のデータに従ってしか決定できず、各行のデータに対して1つの出力ファイルしか決定できない.同じ行のデータを複数のファイルに同時に出力する必要がある場合は、それはできません.この場合、MultipleOutputsクラスを使用できます.
public class MultiFile extends Confi gured implements Tool {
    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, Text> {
            private MultipleOutputs mos;

            private OutputCollector<NullWritable, Text> collector;
            public void confi gure(JobConf conf) {
                mos = new MultipleOutputs(conf);
            }

            public void map(LongWritable key, Text value,
                    OutputCollector<NullWritable, Text> output,
                    Reporter reporter) throws IOException {
                String[] arr = value.toString().split(",", -1);
                String chrono = arr[0] + "," + arr[1] + "," + arr[2];
                String geo = arr[0] + "," + arr[4] + "," + arr[5];
                collector = mos.getCollector("chrono", reporter);
                collector.collect(NullWritable.get(), new Text(chrono));
                collector = mos.getCollector("geo", reporter);
                collector.collect(NullWritable.get(), new Text(geo));
            }

            public void close() throws IOException {
                mos.close();
            }
    }

    public int run(String[] args) throws Exception {
        Confi guration conf = getConf();
        JobConf job = new JobConf(conf, MultiFile.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("MultiFile");
        job.setMapperClass(MapClass.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        MultipleOutputs.addNamedOutput(job,
                "chrono",
                TextOutputFormat.class,
                NullWritable.class,
                Text.class);
        MultipleOutputs.addNamedOutput(job,
                "geo",
                TextOutputFormat.class,
                NullWritable.class,
                Text.class);
        JobClient.runJob(job);
        return 0;
    }
}

このクラスはのmapを維持している.job構成にcollectorを追加し、reduceメソッドで対応するcollectorを取得してcollectorを呼び出すことができます.writeでいいです.
最後に、reduceのすべての出力がnamed collectorにある場合、フレームワークが最後にcounterを統計すると、reduce records=0が得られます(ただし、実際には正常な出力があります).