MapReduce Example: Using ChainMapper


As described in the API documentation:
/**
 * The ChainMapper class allows the use of multiple Mapper classes within a
 * single Map task.
 *
 * The Mapper classes are invoked in a chained (or piped) fashion: the output
 * of the first becomes the input of the second, and so on until the last
 * Mapper, whose output is written to the task's output.
 *
 * The key functionality of this feature is that the Mappers in the chain do
 * not need to be aware that they are executed in a chain. This enables having
 * reusable specialized Mappers that can be combined to perform composite
 * operations within a single task.
 *
 * Special care has to be taken when creating chains: the key/values output by
 * a Mapper must be valid for the following Mapper in the chain. It is assumed
 * that all Mappers and the Reducer in the chain use matching output and input
 * key and value classes, as no conversion is done by the chaining code.
 *
 * Using the ChainMapper and ChainReducer classes, it is possible to compose
 * Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]. An immediate benefit
 * of this pattern is a dramatic reduction in disk IO.
 *
 * IMPORTANT: There is no need to specify the output key/value classes for the
 * ChainMapper; this is done by the addMapper call for the last mapper in the
 * chain.
 */
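
The [MAP+ / REDUCE MAP*] notation means one or more chained mappers, exactly one reducer, and zero or more mappers that run after the reducer, all within a single task. Below is a minimal driver sketch of that full shape, in the spirit of the Hadoop Javadoc example; the classes AMap, BMap, XReduce, and CMap are hypothetical placeholders, not part of the example program that follows.

JobConf job = new JobConf(getConf());

// MAP+ : one or more chained mappers.
ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
    Text.class, Text.class, true, new JobConf(false));
ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
    LongWritable.class, Text.class, true, new JobConf(false));

// REDUCE : exactly one reducer.
ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class,
    Text.class, Text.class, true, new JobConf(false));

// MAP* : zero or more mappers that run after the reducer.
ChainReducer.addMapper(job, CMap.class, Text.class, Text.class,
    LongWritable.class, Text.class, false, new JobConf(false));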

Example code:
package com.joey.mapred.chainjobs;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainJobs extends Configured implements Tool {

	// First mapper in the chain: splits each input line into tokens and
	// emits (word, 1) pairs.
	public static class TokenizerMapper extends MapReduceBase implements
	    Mapper<LongWritable, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(LongWritable key, Text value,
		    OutputCollector<Text, IntWritable> output, Reporter reporter)
		    throws IOException {
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				output.collect(word, one);
			}
		}
	}

	// Second mapper in the chain: receives the (word, 1) pairs emitted by
	// TokenizerMapper, uppercases the word, and re-emits it with a count of 1.
	public static class UppercaseMapper extends MapReduceBase implements
	    Mapper<Text, IntWritable, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Text key, IntWritable value,
		    OutputCollector<Text, IntWritable> output, Reporter reporter)
		    throws IOException {
			String line = key.toString();
			word.set(line.toUpperCase());
			output.collect(word, one);
		}
	}

	// Standard word-count reducer: sums the counts for each key.
	public static class Reduce extends MapReduceBase implements
	    Reducer<Text, IntWritable, Text, IntWritable> {

		public void reduce(Text key, Iterator<IntWritable> values,
		    OutputCollector<Text, IntWritable> output, Reporter reporter)
		    throws IOException {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
	}

	public int run(String[] args) throws IOException {

		Configuration conf = getConf();
		JobConf job = new JobConf(conf);
		
		job.setJarByClass(ChainJobs.class);

		job.setJobName("TestforChainJobs");
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// First link in the chain; the JobConf(false) passed to addMapper
		// carries mapper-local configuration only.
		JobConf map1Conf = new JobConf(false);
		ChainMapper.addMapper(job, TokenizerMapper.class, LongWritable.class, Text.class,
		    Text.class, IntWritable.class, true, map1Conf);

		// Second link: its input key/value classes must match the first
		// mapper's output classes (Text, IntWritable).
		JobConf map2Conf = new JobConf(false);
		ChainMapper.addMapper(job, UppercaseMapper.class, Text.class, IntWritable.class,
		    Text.class, IntWritable.class, true, map2Conf);

		// The single reducer that closes the chain; it also fixes the job's
		// output key/value classes.
		JobConf reduceConf = new JobConf(false);
		ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class,
		    Text.class, IntWritable.class, true, reduceConf);

		JobClient.runJob(job);
		return 0;

	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new ChainJobs(), args);
		System.exit(res);
	}

}
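
The example above uses the old org.apache.hadoop.mapred API. For reference, here is a minimal sketch of the same chain against the newer org.apache.hadoop.mapreduce chain classes, assuming Hadoop 2.x; the class ChainJobsNewApi and the ported mapper/reducer classes are illustrative, not part of the original example.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainJobsNewApi {

	// New-API port of TokenizerMapper: splits lines and emits (word, 1).
	public static class TokenizerMapper
	    extends Mapper<LongWritable, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private final Text word = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
		    throws IOException, InterruptedException {
			StringTokenizer tokenizer = new StringTokenizer(value.toString());
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				context.write(word, one);
			}
		}
	}

	// New-API port of UppercaseMapper: uppercases the incoming key and
	// passes the count through.
	public static class UppercaseMapper
	    extends Mapper<Text, IntWritable, Text, IntWritable> {
		private final Text word = new Text();

		@Override
		protected void map(Text key, IntWritable value, Context context)
		    throws IOException, InterruptedException {
			word.set(key.toString().toUpperCase());
			context.write(word, value);
		}
	}

	// New-API port of Reduce: sums the counts per key.
	public static class Reduce
	    extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
		    throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable v : values) {
				sum += v.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "TestforChainJobs");
		job.setJarByClass(ChainJobsNewApi.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Same chain as before; the new API takes a plain Configuration
		// for each link instead of a JobConf.
		ChainMapper.addMapper(job, TokenizerMapper.class, LongWritable.class,
		    Text.class, Text.class, IntWritable.class, new Configuration(false));
		ChainMapper.addMapper(job, UppercaseMapper.class, Text.class,
		    IntWritable.class, Text.class, IntWritable.class, new Configuration(false));
		ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class,
		    Text.class, IntWritable.class, new Configuration(false));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}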

Input data:
BROWN CORPUS


A Standard Corpus of Present-Day Edited American
English, for use with Digital Computers.


by W. N. Francis and H. Kucera (1964)
Department of Linguistics, Brown University
Providence, Rhode Island, USA


Revised 1971, Revised and Amplified 1979


http://www.hit.uib.no/icame/brown/bcm.html


Distributed with the permission of the copyright holder,
redistribution permitted.

Output (note that UppercaseMapper uppercases every key, so tokens differing only in case, e.g. "BROWN" and "Brown", are merged into a single count):
(1964)  1
1971,   1
1979    1
A       1
AMERICAN        1
AMPLIFIED       1
AND     2
BROWN   2
BY      1
COMPUTERS.      1
COPYRIGHT       1
CORPUS  2
DEPARTMENT      1
DIGITAL 1
DISTRIBUTED     1
EDITED  1
ENGLISH,        1
FOR     1
FRANCIS 1
H.      1
HOLDER, 1
HTTP://WWW.HIT.UIB.NO/ICAME/BROWN/BCM.HTML      1
ISLAND, 1
KUCERA  1
LINGUISTICS,    1
N.      1
OF      3
PERMISSION      1
PERMITTED.      1
PRESENT-DAY     1
PROVIDENCE,     1
REDISTRIBUTION  1
REVISED 2
RHODE   1
STANDARD        1
THE     2
UNIVERSITY      1
USA     1
USE     1
W.      1
WITH    2

Execution log:
14/01/11 18:52:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/01/11 18:52:10 WARN snappy.LoadSnappy: Snappy native library not loaded
14/01/11 18:52:10 INFO mapred.FileInputFormat: Total input paths to process : 1
14/01/11 18:52:10 INFO mapred.JobClient: Running job: job_201312251053_53092
14/01/11 18:52:11 INFO mapred.JobClient:  map 0% reduce 0%
14/01/11 18:52:15 INFO mapred.JobClient:  map 100% reduce 0%
14/01/11 18:52:23 INFO mapred.JobClient:  map 100% reduce 100%
14/01/11 18:52:23 INFO mapred.JobClient: Job complete: job_201312251053_53092
14/01/11 18:52:23 INFO mapred.JobClient: Counters: 28
14/01/11 18:52:23 INFO mapred.JobClient:   Job Counters 
14/01/11 18:52:23 INFO mapred.JobClient:     Launched reduce tasks=1
14/01/11 18:52:23 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=7975
14/01/11 18:52:23 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/01/11 18:52:23 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/01/11 18:52:23 INFO mapred.JobClient:     Rack-local map tasks=3
14/01/11 18:52:23 INFO mapred.JobClient:     Launched map tasks=4
14/01/11 18:52:23 INFO mapred.JobClient:     Data-local map tasks=1
14/01/11 18:52:23 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=8379
14/01/11 18:52:23 INFO mapred.JobClient:   FileSystemCounters
14/01/11 18:52:23 INFO mapred.JobClient:     FILE_BYTES_READ=398
14/01/11 18:52:23 INFO mapred.JobClient:     HDFS_BYTES_READ=1423
14/01/11 18:52:23 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=281090
14/01/11 18:52:23 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=5
14/01/11 18:52:23 INFO mapred.JobClient:   Map-Reduce Framework
14/01/11 18:52:23 INFO mapred.JobClient:     Map input records=15
14/01/11 18:52:23 INFO mapred.JobClient:     Reduce shuffle bytes=416
14/01/11 18:52:23 INFO mapred.JobClient:     Spilled Records=98
14/01/11 18:52:23 INFO mapred.JobClient:     Map output bytes=294
14/01/11 18:52:23 INFO mapred.JobClient:     CPU time spent (ms)=4430
14/01/11 18:52:23 INFO mapred.JobClient:     Total committed heap usage (bytes)=1258291200
14/01/11 18:52:23 INFO mapred.JobClient:     Map input bytes=387
14/01/11 18:52:23 INFO mapred.JobClient:     Combine input records=0
14/01/11 18:52:23 INFO mapred.JobClient:     SPLIT_RAW_BYTES=448
14/01/11 18:52:23 INFO mapred.JobClient:     Reduce input records=49
14/01/11 18:52:23 INFO mapred.JobClient:     Reduce input groups=1
14/01/11 18:52:23 INFO mapred.JobClient:     Combine output records=0
14/01/11 18:52:23 INFO mapred.JobClient:     Physical memory (bytes) snapshot=959954944
14/01/11 18:52:23 INFO mapred.JobClient:     Reduce output records=1
14/01/11 18:52:23 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=4436779008
14/01/11 18:52:23 INFO mapred.JobClient:     Map output records=49