MapReduce 2列データ昇順配列

7160 ワード


入力データ（2列、タブ区切り）:
3	3
3	2
3	1
2	2
2	1
1	1

2列のデータを昇順に並べ替える
 
static class SortMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
		/**
		 * Parses one tab-separated input line into two long columns and emits
		 * them as (key, value). Lines that do not have exactly two fields are
		 * skipped silently.
		 */
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
			String[] columns = value.toString().split("\t");
			if (columns.length != 2) {
				return; // malformed line — ignore
			}
			LongWritable first = new LongWritable(Long.parseLong(columns[0]));
			LongWritable second = new LongWritable(Long.parseLong(columns[1]));
			context.write(first, second);
		}
	}

	static class SortReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

		@Override
		protected void reduce(LongWritable key, Iterable<LongWritable> value, Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
			Iterator<LongWritable> iterator = value.iterator();
			while (iterator.hasNext()) {
				LongWritable next = iterator.next();
				context.write(key, next);
			}
		}
	}

出力結果
1	1
2	2
2	1
3	3
3	2
3	1

 
MapReduceのデフォルトではkeyのみがソートされるため、2列のデータを保持する新しいデータ型を作成します。新しい型はWritableComparableインタフェースを実装し、compareTo()をオーバーライドして2列のデータの比較を実現します。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import utils.HDPConstants;

/**
 * MapReduce job that sorts two-column (tab-separated) long data in ascending
 * order on BOTH columns, by packing each record into a composite
 * WritableComparable key and letting the shuffle phase do the sorting.
 */
public class SortApp2 extends Configured implements Tool {

	/**
	 * Maps each input line "first&lt;TAB&gt;second" to a composite DataBean key
	 * (with a NullWritable value) so the framework sorts on both columns.
	 * Lines without exactly two fields are skipped silently.
	 */
	static class SortMapper extends Mapper<LongWritable, Text, DataBean, NullWritable> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, DataBean, NullWritable>.Context context) throws IOException, InterruptedException {
			String[] fields = value.toString().split("\t");
			if (fields.length == 2) {
				context.write(new DataBean(Long.parseLong(fields[0]), Long.parseLong(fields[1])), NullWritable.get());
			}
		}
	}

	/**
	 * Unpacks each already-sorted composite key back into two output columns.
	 */
	static class SortReducer extends Reducer<DataBean, NullWritable, LongWritable, LongWritable> {
		@Override
		protected void reduce(DataBean key, Iterable<NullWritable> value, Reducer<DataBean, NullWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(key.getFirst()), new LongWritable(key.getSecond()));
		}
	}

	/**
	 * Composite key holding one two-column record. Orders by the first column
	 * and then by the second column, both ascending.
	 *
	 * Fields are primitive long: this avoids NPEs during serialization and the
	 * reference-equality pitfalls of boxed Long (the previous version compared
	 * Long fields with != in equals(), which is reference comparison).
	 */
	static class DataBean implements WritableComparable<DataBean> {
		private long first;
		private long second;

		/** Required by Hadoop's reflection-based deserialization. */
		public DataBean() {
		}

		public DataBean(long first, long second) {
			this.first = first;
			this.second = second;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(first);
			out.writeLong(second);
		}

		public long getFirst() {
			return this.first;
		}

		public void setFirst(long first) {
			this.first = first;
		}

		public long getSecond() {
			return this.second;
		}

		public void setSecond(long second) {
			this.second = second;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			// Must read fields in exactly the order write() emits them.
			first = in.readLong();
			second = in.readLong();
		}

		@Override
		public int compareTo(DataBean o) {
			// Compare, don't subtract: (this.first - o.first) cast to int can
			// overflow and invert the ordering for large values.
			if (this.first != o.first) {
				return this.first < o.first ? -1 : 1;
			}
			if (this.second != o.second) {
				return this.second < o.second ? -1 : 1;
			}
			return 0;
		}

		@Override
		public int hashCode() {
			final int prime = 31;
			int result = 1;
			result = prime * result + (int) (first ^ (first >>> 32));
			result = prime * result + (int) (second ^ (second >>> 32));
			return result;
		}

		@Override
		public boolean equals(Object obj) {
			if (this == obj)
				return true;
			if (obj == null)
				return false;
			if (getClass() != obj.getClass())
				return false;
			DataBean other = (DataBean) obj;
			return this.first == other.first && this.second == other.second;
		}

		@Override
		public String toString() {
			return "DataBean [first=" + this.first + ", second=" + this.second + "]";
		}

	}

	/**
	 * Configures and runs the sort job.
	 *
	 * @return 0 on success, 1 on failure (Tool contract; the previous version
	 *         returned these inverted)
	 */
	@Override
	public int run(String[] args) throws Exception {

		// Use the configuration ToolRunner injected via Configured; creating a
		// fresh Configuration here would discard -D command-line options.
		Configuration conf = getConf();
		Job job = new Job(conf, SortApp2.class.getSimpleName());
		job.setJarByClass(SortApp2.class);

		checkOutputPath(conf);

		job.setMapperClass(SortMapper.class);
		job.setReducerClass(SortReducer.class);

		// Map output types differ from reduce output types, so both must be
		// declared; declaring only DataBean/NullWritable as the job output
		// types made the reducer's LongWritable writes fail at runtime.
		job.setMapOutputKeyClass(DataBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);

		FileInputFormat.addInputPath(job, new Path(HDPConstants.SORT_APP_INPUT_PATH));
		FileOutputFormat.setOutputPath(job, new Path(HDPConstants.SORT_APP_OUTPUT_PATH));

		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Deletes the output directory if it already exists, because
	 * FileOutputFormat refuses to start when the target path is present.
	 */
	private void checkOutputPath(Configuration configuration) throws IOException, URISyntaxException {
		// Resolve the FileSystem from the OUTPUT path (the previous version
		// used the input path's URI, which breaks if they are on different
		// filesystems).
		final FileSystem fileSystem = FileSystem.get(new URI(HDPConstants.SORT_APP_OUTPUT_PATH), configuration);
		final Path outPath = new Path(HDPConstants.SORT_APP_OUTPUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
			System.out.println("delete -->" + HDPConstants.SORT_APP_OUTPUT_PATH);
		}
	}

	public static void main(String[] args) throws Exception {
		int status = ToolRunner.run(new SortApp2(), args);
		System.exit(status);
	}
}

出力結果
1	1
2	1
2	2
3	1
3	2
3	3