MapReduce: example of setup, map, and cleanup (the same applies to reduce)


When you extend the Mapper class, there are three methods you can override:
1. setup
  A method that runs when each map task is created; it executes only once and is commonly used for preparation work, such as retrieving the input file name.
2. map
  The method that actually performs the map processing, called once per input record.
3. cleanup
  A method that runs when the map task finishes; it executes only once and is commonly used for wrap-up work.
All three methods can emit data through the Context just as reduce does, and the Reducer class offers the same setup/reduce/cleanup trio. A minimal skeleton follows.
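
As a quick illustration, here is a minimal sketch of a Mapper overriding all three methods. LifecycleMapper and its key/value types are placeholders and not part of the example below; reading the input file name via FileSplit in setup is the kind of preparation work mentioned above:

package nuc.edu.ls;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class LifecycleMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

	private String fileName;

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Runs once per map task, before the first record.
		fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Runs once per input record.
		context.write(new Text(fileName + ":" + value.toString()), NullWritable.get());
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		// Runs once after the last record; like map, it can still emit data.
		context.write(new Text(fileName + ": done"), NullWritable.get());
	}
}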
Code: take 20 records per movie (written before the article on sorting, so the sort itself is not implemented):
Note the number of map tasks: with more than one map task, more than 20 records per movie can be emitted (see the sketch at the end of this post).
package nuc.edu.ls;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class MovieBean implements WritableComparable<MovieBean> {
    private String movie;
    private String rate;
    private String timeStamp;
    private String uid;
    
	public String getMovie() {
		return movie;
	}
	public void setMovie(String movie) {
		this.movie = movie;
	}
	public String getRate() {
		return rate;
	}
	public void setRate(String rate) {
		this.rate = rate;
	}
	public String getTimeStamp() {
		return timeStamp;
	}
	public void setTimeStamp(String timeStamp) {
		this.timeStamp = timeStamp;
	}
	public String getUid() {
		return uid;
	}
	public void setUid(String uid) {
		this.uid = uid;
	}
	
	public void set(String movie, String rate, String timeStamp, String uid) {
		this.movie = movie;
		this.rate = rate;
		this.timeStamp = timeStamp;
		this.uid = uid;
	}
	
	@Override
	public String toString() {
		return "MovieBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		// Deserialize fields in the same order they are written.
		movie = in.readUTF();
		rate = in.readUTF();
		timeStamp = in.readUTF();
		uid = in.readUTF();
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(movie);
		out.writeUTF(rate);
		out.writeUTF(timeStamp);
		out.writeUTF(uid);
	}
	@Override
	public int compareTo(MovieBean o) {
		// Sorting is intentionally not implemented here (see the note above).
		return 0;
	}
}
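
Each input line is parsed into this bean with fastjson, so a line of rating.json presumably looks like the following (the values are made up for illustration; only the field names are taken from the bean):

{"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"}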




package nuc.edu.ls;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.alibaba.fastjson.JSON;

public class TopN {

	// Buffer of beans per movie, filled by map() and flushed in cleanup().
	static Map<String, List<MovieBean>> map;

	public static class MapTask extends Mapper<LongWritable, Text, NullWritable, MovieBean> {
          
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// Runs once per map task: initialize the per-movie buffer.
			map = new HashMap<>();
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Parse one JSON line into a MovieBean and group it by movie id.
			MovieBean movieBean = JSON.parseObject(value.toString(), MovieBean.class);
			List<MovieBean> list = map.getOrDefault(movieBean.getMovie(), new ArrayList<>());
			list.add(movieBean);
			map.put(movieBean.getMovie(), list);
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			// Runs once after the last record: sort each movie's ratings and
			// emit at most 20 of them.
			Set<String> keySet = map.keySet();
			for (String movie : keySet) {
				List<MovieBean> list = map.get(movie);
				Collections.sort(list, new Comparator<MovieBean>() {

					@Override
					public int compare(MovieBean o1, MovieBean o2) {
						// Highest rate first (rate is assumed to be numeric).
						return Double.compare(Double.parseDouble(o2.getRate()),
								Double.parseDouble(o1.getRate()));
					}
				});
				int limit = Math.min(list.size(), 20);
				for (int i = 0; i < limit; i++) {
					context.write(NullWritable.get(), list.get(i));
				}
			}
		}

	}

	public static class ReduceTask extends Reducer<NullWritable, MovieBean, MovieBean, NullWritable> {

		@Override
		protected void reduce(NullWritable key, Iterable<MovieBean> values, Context context)
				throws IOException, InterruptedException {
			// Every map output shares the NullWritable key, so all beans arrive
			// here and are written straight through to the output file.
			for (MovieBean movieBean : values) {
				context.write(movieBean, NullWritable.get());
			}
		}
	}

	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "root");

		Configuration conf = new Configuration();
		// Small block size / minimum split size influence how many map tasks run
		// (see the note above about the number of maps).
		conf.set("dfs.block.size", "10567840");
		conf.set("dfs.replication", "1");
		conf.set("mapred.min.split.size", "1");
		Job job = Job.getInstance(conf, "eclipseToCluster");

		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(TopN.class);
		job.setMapOutputKeyClass(NullWritable.class);
		job.setMapOutputValueClass(MovieBean.class);
		job.setOutputKeyClass(MovieBean.class);
		job.setOutputValueClass(NullWritable.class);
		// job.setCombinerClass(ReduceTask.class);
		FileInputFormat.addInputPath(job, new Path("d:/rating.json"));
		FileOutputFormat.setOutputPath(job, new Path("d:/outdata/movie"));
		// Delete the output directory if it already exists; otherwise the job fails.
		File file = new File("d:/outdata/movie");
		if (file.exists()) {
			FileUtils.deleteDirectory(file);
		}
		boolean completion = job.waitForCompletion(true);
		System.out.println(completion ? 0 : 1);

	}
}


Result:
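
To address the caveat above (each map task emits up to 20 records per movie, so the final output can exceed 20), one option is to re-group and re-cap on the reduce side. This is only a sketch under the assumption that the mapper keys its output by movie id as Text, so the driver would also need job.setMapOutputKeyClass(Text.class) and a matching mapper; TopNReducer is a hypothetical name:

package nuc.edu.ls;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<Text, MovieBean, MovieBean, NullWritable> {

	@Override
	protected void reduce(Text movie, Iterable<MovieBean> values, Context context)
			throws IOException, InterruptedException {
		List<MovieBean> list = new ArrayList<>();
		for (MovieBean bean : values) {
			// Hadoop reuses the value object while iterating, so copy each bean
			// before buffering it.
			MovieBean copy = new MovieBean();
			copy.set(bean.getMovie(), bean.getRate(), bean.getTimeStamp(), bean.getUid());
			list.add(copy);
		}
		// Highest rate first (rate is assumed to be numeric), then keep at most 20.
		list.sort((o1, o2) -> Double.compare(Double.parseDouble(o2.getRate()),
				Double.parseDouble(o1.getRate())));
		int limit = Math.min(list.size(), 20);
		for (int i = 0; i < limit; i++) {
			context.write(list.get(i), NullWritable.get());
		}
	}
}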