mapreduceプログラムreduce出力制御
1,hadoopではreduceは複数の出力をサポートしており、出力されるファイル名も制御可能であり、M u l t i p l e TextOutputFormatクラスを継承しgenerateFileNameForKeyメソッドを書き換えることである
新しいバージョンのhadoopAPIではJobクラスによって様々なパラメータが設定されていますが、Jobを呼び出します.M u l t i p l e TextOutputFormat()を使用してM u t i p l e TextOutputFormatを使用する場合、子orgを継承する必要があるため、エラーが発生する.apache.hadoop.mapreduce.OutputFormat.0.20.2致命的なバグの1つです.0.21にアップグレードすると解決します.
2.同じ行のデータを複数のファイルに同時に出力する必要がある場合は、MultipleOutputsクラスを使用します.
public class MultiFile extends Confi gured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper {
private MultipleOutputs mos;
private OutputCollector collector;
public void confi gure(JobConf conf) {
mos = new MultipleOutputs(conf);
}
public void map(LongWritable key, Text value,
OutputCollector output,
Reporter reporter) throws IOException {
String[] arr = value.toString().split(",", -1);
String chrono = arr[0] + "," + arr[1] + "," + arr[2];
String geo = arr[0] + "," + arr[4] + "," + arr[5];
collector = mos.getCollector("chrono", reporter);
collector.collect(NullWritable.get(), new Text(chrono));
collector = mos.getCollector("geo", reporter);
collector.collect(NullWritable.get(), new Text(geo));
}
public void close() throws IOException {
mos.close();
}
}
public int run(String[] args) throws Exception {
Confi guration conf = getConf();
JobConf job = new JobConf(conf, MultiFile.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("MultiFile");
job.setMapperClass(MapClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job,
"chrono",
TextOutputFormat.class,
NullWritable.class,
Text.class);
MultipleOutputs.addNamedOutput(job,
"geo",
TextOutputFormat.class,
NullWritable.class,
Text.class);
JobClient.runJob(job);
return 0;
}
}
このクラスはのmapを維持している.job構成にcollectorを追加し、reduceメソッドで対応するcollectorを取得してcollectorを呼び出すことができます.writeでいいです.
public class LzoHandleLogMr extends Configured implements Tool {
static class LzoHandleLogMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
try {
String[] sp = value.toString().split(",");
output.collect(new Text(sp[0]), value);
}catch (Exception e) {
e.printStackTrace();
}
}
}
static class LzoHandleLogReducer extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable> {
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, NullWritable> output, Reporter reporter)
throws IOException {
while (values.hasNext()) {
output.collect(values.next(), NullWritable.get());
}
}
}
public static class LogNameMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, NullWritable>
{
@Override
protected String generateFileNameForKeyValue(Text key,
NullWritable value, String name) {
String sp[] = key.toString().split(",");
String filename = sp[0];
if(sp[0].contains(".")) filename="000000000000";
return filename;
}
}
@Override
public int run(String[] args) throws Exception {
JobConf jobconf = new JobConf(LzoHandleLogMr.class);
jobconf.setMapperClass(LzoHandleLogMapper.class);
jobconf.setReducerClass(LzoHandleLogReducer.class);
jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setNumReduceTasks(12);
FileInputFormat.setInputPaths(jobconf,new Path(args[0]));
FileOutputFormat.setOutputPath(jobconf,new Path(args[1]));
FileOutputFormat.setCompressOutput(jobconf, true);
FileOutputFormat.setOutputCompressorClass(jobconf, LzopCodec.class);
JobClient.runJob(jobconf);
return 0;
}
}
新しいバージョンのhadoopAPIではJobクラスによって様々なパラメータが設定されていますが、Jobを呼び出します.M u l t i p l e TextOutputFormat()を使用してM u t i p l e TextOutputFormatを使用する場合、子orgを継承する必要があるため、エラーが発生する.apache.hadoop.mapreduce.OutputFormat.0.20.2致命的なバグの1つです.0.21にアップグレードすると解決します.
2.同じ行のデータを複数のファイルに同時に出力する必要がある場合は、MultipleOutputsクラスを使用します.
public class MultiFile extends Confi gured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper
private MultipleOutputs mos;
private OutputCollector
public void confi gure(JobConf conf) {
mos = new MultipleOutputs(conf);
}
public void map(LongWritable key, Text value,
OutputCollector
Reporter reporter) throws IOException {
String[] arr = value.toString().split(",", -1);
String chrono = arr[0] + "," + arr[1] + "," + arr[2];
String geo = arr[0] + "," + arr[4] + "," + arr[5];
collector = mos.getCollector("chrono", reporter);
collector.collect(NullWritable.get(), new Text(chrono));
collector = mos.getCollector("geo", reporter);
collector.collect(NullWritable.get(), new Text(geo));
}
public void close() throws IOException {
mos.close();
}
}
public int run(String[] args) throws Exception {
Confi guration conf = getConf();
JobConf job = new JobConf(conf, MultiFile.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("MultiFile");
job.setMapperClass(MapClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job,
"chrono",
TextOutputFormat.class,
NullWritable.class,
Text.class);
MultipleOutputs.addNamedOutput(job,
"geo",
TextOutputFormat.class,
NullWritable.class,
Text.class);
JobClient.runJob(job);
return 0;
}
}
このクラスは