MapReduce---Join Operations--Reduce-Side Join

A reduce-side join does not require the input datasets to have any particular structure, so it is more general than a map-side join. However, because both datasets being joined must pass through the MapReduce shuffle, a reduce-side join is usually less efficient.
Basic idea: the mapper tags each record with its source and uses the join key as the map output key, so that records with the same key end up at the same reducer.
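
For concreteness, the steps below assume two comma-separated input files under one input directory. The exact layout is an assumption chosen to match the field positions the mapper reads (customer id in column 0 of the customers file; order id in column 0 and customer id in column 3 of the orders file):

customers.txt (hypothetical sample)
1,tom,12
2,tomas,13

orders.txt (hypothetical sample)
1,no001,12.23,1
2,no002,18.99,1
3,no003,23.00,2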
1. Define the composite key CombKey
package hadoop.join.reduce;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class CombKey implements WritableComparable<CombKey> {
	// record source: 0 - customer, 1 - order
	public int type = -1 ;
	//customer id
	public int cid = -1 ;
	//order id
	public int oid = -1;

	/**
	 * Ordering: by cid first; within the same cid the customer record
	 * sorts before its orders, and orders sort by oid.
	 */
	public int compareTo(CombKey o) {
		int otype = o.type ;
		int ocid = o.cid ;
		int ooid = o.oid ;
		// same record type
		if(type == otype){
			if(type == 0){
				return cid - ocid ;
			}
			else{
				// same customer: order by oid
				if(cid == ocid){
					return oid - ooid ;
				}
				// different customers: order by cid
				else{
					return cid - ocid ;
				}
			}
		}
		// different record types
		else{
			// this key is the customer record
			if(type == 0){
				// same cid: the customer record sorts first
				if(cid == ocid){
					return -1 ;
				}
				else{
					return cid - ocid ;
				}
			}
			else{
				if(cid == ocid){
					return 1 ;
				}
				else{
					return cid - ocid ;
				}
			}
		}
	}

	public void write(DataOutput out) throws IOException {
		out.writeInt(type);
		out.writeInt(cid);
		out.writeInt(oid);

	}

	public void readFields(DataInput in) throws IOException {
		this.type = in.readInt() ;
		this.cid = in.readInt() ;
		this.oid = in.readInt() ;
	}
}
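
To see the ordering in action, here is a minimal sketch (not part of the original job; CombKeySortDemo is a hypothetical helper class) that sorts a few keys in memory:

package hadoop.join.reduce;

import java.util.Arrays;

public class CombKeySortDemo {
	public static void main(String[] args) {
		CombKey cust = new CombKey();
		cust.type = 0; cust.cid = 1;                      // customer 1
		CombKey order2 = new CombKey();
		order2.type = 1; order2.cid = 1; order2.oid = 2;  // order 2 of customer 1
		CombKey order1 = new CombKey();
		order1.type = 1; order1.cid = 1; order1.oid = 1;  // order 1 of customer 1

		CombKey[] keys = {order2, cust, order1};
		Arrays.sort(keys);                                // uses CombKey.compareTo
		for (CombKey k : keys) {
			// prints the customer first, then its orders by oid
			System.out.println("type=" + k.type + " cid=" + k.cid + " oid=" + k.oid);
		}
	}
}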

2. JoinMapper
package hadoop.join.reduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Mapper: tags each record with its source file and emits a CombKey.
 */
public class JoinMapper extends Mapper<LongWritable, Text, CombKey, Text> {

	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// parse the input line
		String line = value.toString();
		String[] arr = line.split(",") ;

		// determine which file this split came from
		FileSplit split = (FileSplit) context.getInputSplit();
		String path = split.getPath().getName() ;

		CombKey keyOut = new CombKey() ;
		//customer
		if(path.contains("customers")){
			keyOut.type = 0 ;
			keyOut.cid = Integer.parseInt(arr[0]) ;
		}
		// order
		else{
			keyOut.type = 1;
			keyOut.cid = Integer.parseInt(arr[3]);
			keyOut.oid = Integer.parseInt(arr[0]);
		}
		context.write(keyOut,value);
	}
}
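
With the hypothetical sample data above, the mapper would emit (keys shown as type/cid/oid):

1,tom,12        -> key (0, 1, -1), value "1,tom,12"
1,no001,12.23,1 -> key (1, 1, 1),  value "1,no001,12.23,1"

Note that the test path.contains("customers") assumes the customer file's name contains "customers"; every other file in the input directory is treated as order data.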

3. JoinReducer
package hadoop.join.reduce;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Reducer: joins each customer record with that customer's orders.
 */
public class JoinReducer extends Reducer<CombKey, Text, Text, NullWritable> {

	protected void reduce(CombKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		System.out.println("================================");
		//customer
		Iterator<Text> it = values.iterator() ;
		if(key.type == 0){
			// the first value is the customer record (the sort puts it first)
			String custInfo = it.next().toString() ;
			System.out.println(custInfo);
			while(it.hasNext()){
				String orderInfo = it.next().toString();

				System.out.println(custInfo + "," + orderInfo);
				context.write(new Text(custInfo + "," + orderInfo),NullWritable.get());
			}
		}
		// orders with no matching customer record
		else{
			while (it.hasNext()) {
				String orderInfo = it.next().toString();
				System.out.println("NULL," + orderInfo);
				context.write(new Text("NULL," + orderInfo), NullWritable.get());
			}
		}
	}
}
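
Because step 5's grouping comparator groups values by cid and the full-key ordering puts the customer record first, each reduce() call for a matched customer sees the customer line followed by all of that customer's orders; orders whose cid has no customer record fall into the else branch and are emitted with a NULL prefix. With the hypothetical sample data, the joined output would look like:

1,tom,12,1,no001,12.23,1
1,tom,12,2,no002,18.99,1
2,tomas,13,3,no003,23.00,2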

4. Custom CIDPartitioner
package hadoop.join.reduce;

import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.io.Text;

/**
 * Partitions by customer id so a customer and its orders reach the same reducer.
 */
public class CIDPartitioner extends Partitioner<CombKey, Text> {

	public int getPartition(CombKey key, Text text, int numPartitions) {
		return key.cid % numPartitions ;
	}
}
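
Partitioning on cid alone (rather than on the whole composite key) is what keeps a customer record and all of its orders on the same reducer. The ids parsed here are non-negative, so key.cid % numPartitions is safe; if the key value could ever be negative, a defensive variant would be:

		// mask the sign bit to guarantee a non-negative partition index
		return (key.cid & Integer.MAX_VALUE) % numPartitions ;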

5. Custom CIDGroupComparator
package hadoop.join.reduce;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Groups keys by cid so a customer and its orders enter one reduce() call.
 */
public class CIDGroupComparator extends WritableComparator{
	protected CIDGroupComparator() {
		super(CombKey.class, true);
	}

	public int compare(WritableComparable k1, WritableComparable k2) {
		CombKey ck1 = (CombKey) k1;
		CombKey ck2 = (CombKey) k2;
		return ck1.cid - ck2.cid ;

	}
}
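
Without this comparator, grouping would fall back to CombKey.compareTo, so the customer key and each order key (which differ in type and oid) would form separate reduce groups and the join would never see both sides together. Grouping on cid alone merges them: the sample keys (0,1,-1), (1,1,1), (1,1,2) become a single group for cid 1, while the full-key sort from step 1 still fixes the order of values inside that group.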

6. App
package hadoop.join.reduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the reduce-side join job.
 */
public class App {
	public static void main(String[] args) throws Exception {
		// hard-coded local paths for testing; remove to use command-line args
		args = new String[]{"d:/java/mr/join", "d:/java/mr/out"} ;
		Configuration conf = new Configuration();

		// delete the output directory if it already exists
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1]))){
			fs.delete(new Path(args[1]),true);
		}

		Job job = Job.getInstance(conf);

		job.setJobName("join-reduce");
		job.setJarByClass(App.class);

		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);

		// input path
		FileInputFormat.addInputPath(job,new Path(args[0]));
		// output path
		FileOutputFormat.setOutputPath(job,new Path(args[1]));

		// map output key/value types
		job.setMapOutputKeyClass(CombKey.class);
		job.setMapOutputValueClass(Text.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		job.setGroupingComparatorClass(CIDGroupComparator.class);
		job.setPartitionerClass(CIDPartitioner.class);

		job.setNumReduceTasks(2);

		// submit the job and wait for completion
		job.waitForCompletion(true) ;
	}
}
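
With setNumReduceTasks(2), the output is split across two part files, partitioned by cid. To run on a cluster instead of the hard-coded local paths, drop the args override and submit the job in the usual way, e.g. (join.jar and the HDFS paths are hypothetical names):

hadoop jar join.jar hadoop.join.reduce.App /user/hadoop/join /user/hadoop/out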