mapreduceプログラムreduce出力制御


1,hadoopではreduceは複数の出力をサポートしており、出力されるファイル名も制御可能であり、M u l t i p l e TextOutputFormatクラスを継承しgenerateFileNameForKeyメソッドを書き換えることである
public class LzoHandleLogMr extends Configured implements Tool {

	 static class LzoHandleLogMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
       
	  
		public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
	    	try {
	    	    String[] sp = value.toString().split(",");
	    		output.collect(new Text(sp[0]), value);
	    	}catch (Exception e) {
			   e.printStackTrace();
		    }    	
		}


	}
	static class LzoHandleLogReducer  extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable> {
        


		@Override
		public void reduce(Text key, Iterator<Text> values,
				OutputCollector<Text, NullWritable> output, Reporter reporter)
				throws IOException {
			while (values.hasNext()) {
		   		  output.collect(values.next(), NullWritable.get());   
		   	   }
			
		}	
	}
	
	public static class LogNameMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, NullWritable> 
	   {


		@Override
		protected String generateFileNameForKeyValue(Text key,
				NullWritable value, String name) {
			String sp[] = key.toString().split(",");
			String filename = sp[0];
			if(sp[0].contains(".")) filename="000000000000";
			return filename;
		}
		
	}
    


	@Override
	public int run(String[] args) throws Exception {
		 
		    JobConf jobconf = new JobConf(LzoHandleLogMr.class);
		    jobconf.setMapperClass(LzoHandleLogMapper.class);
		    jobconf.setReducerClass(LzoHandleLogReducer.class);
		    jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);
		    jobconf.setOutputKeyClass(Text.class);
		    jobconf.setNumReduceTasks(12);
		    
		    
		 FileInputFormat.setInputPaths(jobconf,new Path(args[0]));
	    	FileOutputFormat.setOutputPath(jobconf,new Path(args[1]));
	    	FileOutputFormat.setCompressOutput(jobconf, true);
	    	FileOutputFormat.setOutputCompressorClass(jobconf, LzopCodec.class);  
	    	
	    	JobClient.runJob(jobconf);
	      return 0;
			
	}
}

新しいバージョンのhadoopAPIではJobクラスによって様々なパラメータが設定されていますが、Jobを呼び出します.M u l t i p l e TextOutputFormat()を使用してM u t i p l e TextOutputFormatを使用する場合、子orgを継承する必要があるため、エラーが発生する.apache.hadoop.mapreduce.OutputFormat.0.20.2致命的なバグの1つです.0.21にアップグレードすると解決します.
2.同じ行のデータを複数のファイルに同時に出力する必要がある場合は、MultipleOutputsクラスを使用します.
public class MultiFile extends Confi gured implements Tool {  
    public static class MapClass extends MapReduceBase  
        implements Mapper {  
            private MultipleOutputs mos;  
  
            private OutputCollector collector;  
            public void confi gure(JobConf conf) {  
                mos = new MultipleOutputs(conf);  
            }  
  
            public void map(LongWritable key, Text value,  
                    OutputCollector output,  
                    Reporter reporter) throws IOException {  
                String[] arr = value.toString().split(",", -1);  
                String chrono = arr[0] + "," + arr[1] + "," + arr[2];  
                String geo = arr[0] + "," + arr[4] + "," + arr[5];  
                collector = mos.getCollector("chrono", reporter);  
                collector.collect(NullWritable.get(), new Text(chrono));  
                collector = mos.getCollector("geo", reporter);  
                collector.collect(NullWritable.get(), new Text(geo));  
            }  
  
            public void close() throws IOException {  
                mos.close();  
            }  
    }  
  
    public int run(String[] args) throws Exception {  
        Confi guration conf = getConf();  
        JobConf job = new JobConf(conf, MultiFile.class);  
        Path in = new Path(args[0]);  
        Path out = new Path(args[1]);  
        FileInputFormat.setInputPaths(job, in);  
        FileOutputFormat.setOutputPath(job, out);  
        job.setJobName("MultiFile");  
        job.setMapperClass(MapClass.class);  
        job.setInputFormat(TextInputFormat.class);  
        job.setOutputKeyClass(NullWritable.class);  
        job.setOutputValueClass(Text.class);  
        job.setNumReduceTasks(0);  
        MultipleOutputs.addNamedOutput(job,  
                "chrono",  
                TextOutputFormat.class,  
                NullWritable.class,  
                Text.class);  
        MultipleOutputs.addNamedOutput(job,  
                "geo",  
                TextOutputFormat.class,  
                NullWritable.class,  
                Text.class);  
        JobClient.runJob(job);  
        return 0;  
    }  
}  
このクラスはのmapを維持している.job構成にcollectorを追加し、reduceメソッドで対応するcollectorを取得してcollectorを呼び出すことができます.writeでいいです.