Map/ReduceのGroup pingComparatorソートの概要


一、背景
ソートはMRにとって核心的な内容であり、どのようにソートするかは非常に重要であり、ここ数日いくつか書いて、後で読むためにまとめた.
二、準備
1、hadoopバージョンは0.20.2
2、入力したデータフォーマット(これは重要で、フォーマットがはっきり見える)、名前はsecondary.txt:
abc     123
acb     124
cbd     523
abc     234
nbc     563
fds     235
khi     234
cbd     675
fds     971
hka     862
ubd     621
khi     123
fds     321

よく ると、データファイルの の はアルファベットで、2 の は で、 がしなければならないのはこのデータと びつけていくつかのソートのテストを うことです.
3、コードフレームワーク、 のテスト はすべて のコードに する であるため、フレームワークのコードは されないので、まず なコードをここに り けます.
コードは2つの に かれています.カスタムkeyとメインフレームコード( い を てください).まずメインフレームコードを り けます.
MyGrouping.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.run.lenged.business.TextPair;

public class MyGrouping {

	/**
	 * Map
	 * 
	 * @author Administrator
	 */
	public static class MyGroupingMap extends Mapper<LongWritable, Text, TextPair, Text> {
		protected void map(LongWritable key, Text value,
				org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, TextPair, Text>.Context context)
				throws java.io.IOException, InterruptedException {
			String arr[] = value.toString().split("/t");
			if (arr.length != 2) {
				return;
			}
			TextPair tp = new TextPair();
			tp.set(new Text(arr[0]), new Text(arr[1]));
			context.write(tp, new Text(arr[1]));
		}
	}

	/**
	 *  Hashcode 
	 * 
	 * @author Administrator
	 */
	public static class MyGroupingPartition extends Partitioner<TextPair, Text> {
		@Override
		public int getPartition(TextPair key, Text value, int numPartitions) {
			return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
		}
	}

	/**
	 * group 
	 * 
	 * @author Administrator
	 */
	@SuppressWarnings("unchecked")
	public static class MyGroupingGroup extends WritableComparator {
		// 
	}

	/**
	 * reduce
	 * 
	 * @author Administrator
	 */
	public static class MyGroupingReduce extends Reducer<TextPair, Text, Text, Text> {
		protected void reduce(TextPair key, java.lang.Iterable<Text> value,
				org.apache.hadoop.mapreduce.Reducer<TextPair, Text, Text, Text>.Context context)
				throws java.io.IOException, InterruptedException {
			StringBuffer sb = new StringBuffer();
			while (value.iterator().hasNext()) {
				sb.append(value.iterator().next().toString() + "_");
			}
			context.write(key.getFirst(), new Text(sb.toString().substring(0, sb.toString().length() - 1)));
		}
	}

	public static void main(String args[]) throws Exception {
		Configuration conf = new Configuration();
		GenericOptionsParser parser = new GenericOptionsParser(conf, args);
		String[] otherArgs = parser.getRemainingArgs();
		if (args.length != 2) {
			System.err.println("Usage: NewlyJoin <inpath> <output>");
			System.exit(2);
		}

		Job job = new Job(conf, "MyGrouping");
		//  job
		job.setJarByClass(MyGrouping.class);
		//  Map 
		job.setMapperClass(MyGroupingMap.class);
		job.setMapOutputKeyClass(TextPair.class);
		job.setMapOutputValueClass(Text.class);
		job.setPartitionerClass(MyGroupingPartition.class);
		
		job.setGroupingComparatorClass(MyGroupingGroup.class);
		
		//  reduce
		job.setReducerClass(MyGroupingReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		//  
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		//  , 
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

TextPair.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {

	private Text first;
	private Text second;

	public TextPair() {
		set(new Text(), new Text());
	}

	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}

	public Text getFirst() {
		return first;
	}

	public Text getSecond() {
		return second;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}

	@Override
	public int compareTo(TextPair o) {
		int cmp = first.compareTo(o.first);
		if (cmp != 0) {
			return cmp;
		} else {
			return second.compareTo(o.second);
		}
	}
}

、テストの
1、まず を して、 たちは と びつけてテストして、それから します. は、1 のカラムの が じ 、2 のカラムの が され、2 のカラムの が にソートされます. に するときは、 のカラム の に を べ えます.2、 . の に づいて,1 のフィールドと2 のフィールドを べ える がある ,MRフレームワークを に してkey をvalue することはできないと できる.valueはソートされていないからです.だから たちはいくつかの をして、keyを に するように する があります.TextPair.JAvaクラスはカスタムkeyです. にkeyとvalueを にソートする 、カスタムコンビネーションkeyのフォーマットの の は のフィールドであり、2 の は2 のフィールドです.3、ではjobを します.setGroupingComparatorClass(MyGroupingGroup.class);コードは のとおりです.
public static class MyGroupingGroup extends WritableComparator {
		public int compare(WritableComparable a, WritableComparable b) {
			return mip1.getFirst().compareTo(mip2.getFirst());
	}


		protected MyGroupingGroup() {
			super(TextPair.class, true);
		}

		@Override
			TextPair mip1 = (TextPair) a;
			TextPair mip2 = (TextPair) b;
		}

された コンポーネントの の のみをソートします. の は のとおりです.
abc 123_234
cbd 523_675
khi 123_234
ubd 621
nbc 563
acb 124
fds 235_321_971
hka 862

4、 を ると、 のニーズを に たしていることがわかります.では に、MRのソート を するためのテストを います.
、Groupは つ のフィールド で べ えテストを う
1、groupの べ え を し、2 の に して べ えを い、コードは の りである.
public static class MyGroupingGroup extends WritableComparator {
		protected MyGroupingGroup() {
			super(TextPair.class, true);
		}

		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			TextPair mip1 = (TextPair) a;
			TextPair mip2 = (TextPair) b;
			return mip1.getSecond().compareTo(mip2.getSecond());
			//return mip1.getFirst().compareTo(mip2.getFirst());
		}
	}

2、reduceの は し して、2 のフィールドも して、 に て、コードは の りです.
context.write(key.getFirst(), new Text(sb.toString().substring(0, sb.toString().length() - 1)));

reduce の :
abc_123	123
abc_234	234
acb_124	124
cbd_523	523
cbd_675	675
fds_235	235
fds_321	321
fds_971	971
hka_862	862
khi_123	123
khi_234	234
nbc_563	563
ubd_621	621

3、 を て、 は の に っていないで、 でソート を います.
はそうではありません.この は かにgroupのソートが われていますが、 データに していないということです.だから、ソートしていないように えます.
ここにはgroupがいつソートされたのかという があります. はこのように かれています.
Job.setGroupingComparatorClass(Class cls)  Define the comparator that controls which keys are grouped together  for a single call to Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
は してみました( のレベルは に られていて、 っているところは さんに してもらいたいです):
reduceの び し にcomparatorを し、グループ されたkeyをソートします.
なぜkhi_123,123およびabc_123,123は なっていません.
、まとめ
1、ここにはgroupのソートしか かれていません.sortは かれていません. で きます. かもしれません.
2、そのうちMRの フローを いて、 を いて、 ってみてください.
3、このランキングについては も して もないので、 っているところがあるかもしれません. が って してくれるのを ています.
4、 やフォローが い は、メールでコミュニケーションすることができます[email protected]