Hadoop reduce複数出力
転入先:Hadoop in Action
hadoopではreduceが複数の出力をサポートするには2つの実装方式がある.
1つ目は、MultipleTextOutputFormatクラスを継承し、generateFileNameForKeyメソッドを書き換えることです.
このクラスはのmapを維持している.job構成にcollectorを追加し、reduceメソッドで対応するcollectorを取得してcollectorを呼び出すことができます.writeでいいです.
最後に、reduceのすべての出力がnamed collectorにある場合、フレームワークが最後にcounterを統計すると、reduce records=0が得られます(ただし、実際には正常な出力があります).
hadoopではreduceが複数の出力をサポートするには2つの実装方式がある.
1つ目は、MultipleTextOutputFormatクラスを継承し、generateFileNameForKeyメソッドを書き換えることです.
public static class PartitionByCountryMTOF
extends MultipleTextOutputFormat<Text,Text>
{
protected String generateFileNameForKeyValue(Text key,
Text value, String filename)
{
String[] arr = value.toString().split(",", -1);
String country = arr[4].substring(1,3);
return country + "/" + filename;
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, MultiFile.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName(“MultiFile”);
job.setMapperClass(MapClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(PartitionByCountryMTOF.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
JobClient.runJob(job);
return 0;
}
この方法の制限は明らかであり、出力するファイルは各行のデータに従ってしか決定できず、各行のデータに対して1つの出力ファイルしか決定できない.同じ行のデータを複数のファイルに同時に出力する必要がある場合は、それはできません.この場合、MultipleOutputsクラスを使用できます.public class MultiFile extends Confi gured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, NullWritable, Text> {
private MultipleOutputs mos;
private OutputCollector<NullWritable, Text> collector;
public void confi gure(JobConf conf) {
mos = new MultipleOutputs(conf);
}
public void map(LongWritable key, Text value,
OutputCollector<NullWritable, Text> output,
Reporter reporter) throws IOException {
String[] arr = value.toString().split(",", -1);
String chrono = arr[0] + "," + arr[1] + "," + arr[2];
String geo = arr[0] + "," + arr[4] + "," + arr[5];
collector = mos.getCollector("chrono", reporter);
collector.collect(NullWritable.get(), new Text(chrono));
collector = mos.getCollector("geo", reporter);
collector.collect(NullWritable.get(), new Text(geo));
}
public void close() throws IOException {
mos.close();
}
}
public int run(String[] args) throws Exception {
Confi guration conf = getConf();
JobConf job = new JobConf(conf, MultiFile.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("MultiFile");
job.setMapperClass(MapClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job,
"chrono",
TextOutputFormat.class,
NullWritable.class,
Text.class);
MultipleOutputs.addNamedOutput(job,
"geo",
TextOutputFormat.class,
NullWritable.class,
Text.class);
JobClient.runJob(job);
return 0;
}
}
このクラスは
最後に、reduceのすべての出力がnamed collectorにある場合、フレームワークが最後にcounterを統計すると、reduce records=0が得られます(ただし、実際には正常な出力があります).