MapReduceプログラミング事例 その10:joinアルゴリズムの改善


MapReduceプログラミング事例 その10:joinアルゴリズムの改善
Partitioner + compareTo + GroupingComparator を組み合わせた効率的な実装
背景については事例その9を参照してください。
以下に実装コードをそのまま示します。
JoinBeanクラス
package cn.edu360.mr.join.improve;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.WritableComparable;

public class JoinBean implements WritableComparable{
	
	private String orderId;
	private String userId;
	private String userName;
	private int userAge;
	private String userFriend;
	private String tableName;
	
	
	
	
	public void set(String orderId, String userId, String userName, int userAge, String userFriend,String tableName) {

		this.orderId = orderId;
		this.userId = userId;
		this.userName = userName;
		this.userAge = userAge;
		this.userFriend = userFriend;
		this.tableName = tableName;
	}
	
	
	
	public String getTableName() {
		return tableName;
	}

	public void setTableName(String tableName) {
		this.tableName = tableName;
	}



	public String getOrderId() {
		return orderId;
	}
	public void setOrderId(String orderId) {
		this.orderId = orderId;
	}
	public String getUserId() {
		return userId;
	}
	public void setUserId(String userId) {
		this.userId = userId;
	}
	public String getUserName() {
		return userName;
	}
	public void setUserName(String userName) {
		this.userName = userName;
	}
	public int getUserAge() {
		return userAge;
	}
	public void setUserAge(int userAge) {
		this.userAge = userAge;
	}
	public String getUserFriend() {
		return userFriend;
	}
	public void setUserFriend(String userFriend) {
		this.userFriend = userFriend;
	}
	
	@Override
	public String toString() {
		return this.orderId +","+ this.userId + ","+this.userAge + ","+this.userName + ","+this.userFriend;
	}



	public void readFields(DataInput in) throws IOException {
		this.orderId = in.readUTF();
		this.userId = in.readUTF();
		this.userName = in.readUTF();
		this.userAge = in.readInt();
		this.userFriend = in.readUTF();
		this.tableName = in.readUTF();
		
		
	}



	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.orderId);
		out.writeUTF(this.userId);
		out.writeUTF(this.userName);
		out.writeInt(this.userAge);
		out.writeUTF(this.userFriend);
		out.writeUTF(this.tableName);
		
	}



	public int compareTo(JoinBean o) {
		return o.getUserId().compareTo(this.getUserId()) == 0 ? o.getTableName().compareTo(this.getTableName()) : o.getUserId().compareTo(this.getUserId());
    }
	
}

ReduceSideJoinGroupingComparatorクラス
package cn.edu360.mr.join.improve;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator that treats two {@link JoinBean} keys as equal when
 * their user ids are equal, ignoring every other field of the key.
 */
public class ReduceSideJoinGroupingComparator extends WritableComparator{

	/**
	 * Registers {@link JoinBean} as the key class. The {@code true} flag is
	 * passed straight to the {@code WritableComparator} constructor
	 * (presumably asking it to create key instances for deserialization;
	 * confirm against the Hadoop API documentation).
	 */
	public ReduceSideJoinGroupingComparator() {
		super(JoinBean.class,true);
	}

	/**
	 * Compares two keys by user id only.
	 *
	 * @param a first key; must be a {@link JoinBean}
	 * @param b second key; must be a {@link JoinBean}
	 * @return the result of comparing the user id of {@code a} with the user
	 *         id of {@code b} via {@link String#compareTo(String)}
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		final JoinBean left = (JoinBean) a;
		final JoinBean right = (JoinBean) b;
		final String leftUserId = left.getUserId();
		return leftUserId.compareTo(right.getUserId());
	}

}

ReduceSideJoinPartitionerクラス
package cn.edu360.mr.join.improve;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions {@link JoinBean} keys by user id only, so that all keys sharing
 * a user id are assigned the same partition number regardless of their other
 * fields.
 */
public class ReduceSideJoinPartitioner extends Partitioner<JoinBean, NullWritable>{

	/**
	 * Computes the partition from the hash of the key's user id.
	 *
	 * @param key           the map output key
	 * @param value         the map output value (not used)
	 * @param numPartitions total number of partitions
	 * @return a partition number in the range {@code [0, numPartitions)}
	 */
	@Override
	public int getPartition(JoinBean key, NullWritable value, int numPartitions) {
		// Masking with Integer.MAX_VALUE clears the sign bit so the modulo result is never negative.
		return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
	}

}

ReduceSideJoin2クラス
package cn.edu360.mr.join.improve;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * Driver, mapper and reducer for a reduce-side join of order records with
 * user records.
 *
 * <p>The mapper emits every input line as a {@link JoinBean} key. The job is
 * configured with {@link ReduceSideJoinPartitioner} and
 * {@link ReduceSideJoinGroupingComparator}, and the reducer relies on the
 * user record of a group arriving before that group's order records.
 */
public class ReduceSideJoin2 {
	
	/** Input directory used when no command-line argument is supplied. */
	private static final String DEFAULT_INPUT_PATH = "F:\\mrdata\\join\\input";
	
	/** Output directory used when fewer than two command-line arguments are supplied. */
	private static final String DEFAULT_OUTPUT_PATH = "F:\\mrdata\\join\\out2";
	
	/**
	 * Turns each comma-separated input line into a {@link JoinBean} key.
	 * Files whose name starts with {@code "order"} are parsed as order
	 * records ({@code orderId,userId}); every other file is parsed as user
	 * records ({@code userId,userName,userAge,userFriend}).
	 */
	public static class ReduceSideJoin2Mapper extends Mapper<LongWritable, Text, JoinBean, NullWritable>{
		
		// Reused for every record; context.write serializes it immediately.
		JoinBean bean = new JoinBean();
		String fileName = null;
		
		/**
		 * Remembers the name of the file backing this task's input split.
		 * The split is cast to {@code FileSplit}, so a file-based input
		 * format is required.
		 */
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			FileSplit inputSplit = (FileSplit)context.getInputSplit();
			fileName = inputSplit.getPath().getName();
		}
		
		/**
		 * Parses one line and emits it as a key with a {@code NullWritable}
		 * value. Fields that do not exist for the record's table are filled
		 * with the placeholders {@code "NULL"} and {@code -1}.
		 */
		@Override
		protected void map(LongWritable key, Text value,
				Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split(",");
			if(fileName.startsWith("order")) {
				bean.set(fields[0],fields[1],"NULL",-1,"NULL","order");
			}else {
				bean.set("NULL", fields[0], fields[1], Integer.parseInt(fields[2]), fields[3], "user");
			}
			context.write(bean, NullWritable.get());
		}
		
	}
	
	/**
	 * Joins the records of one user id. A {@code "user"} record is remembered,
	 * and every other record is written out enriched with the remembered
	 * user's name, age and friend.
	 */
	public static class ReduceSideJoin2Reducer extends Reducer<JoinBean, NullWritable, JoinBean, NullWritable>{
		
		JoinBean orderBean = new JoinBean();
		JoinBean userBean = new JoinBean();
		
		/**
		 * Processes one group of keys.
		 *
		 * @throws IOException if writing the output fails or if the bean
		 *         properties cannot be copied
		 */
		@Override
		protected void reduce(JoinBean bean, Iterable<NullWritable> values,
				Context context)
				throws IOException, InterruptedException {
			
			// Reset the remembered user so that a group containing only order
			// records is not joined with the previous group's user.
			userBean.set("NULL", bean.getUserId(), "NULL", -1, "NULL", "user");
			
			try {
				// The loop variable is unused; iterating is what advances the
				// framework to the next record of the group, whose key is then
				// read through `bean`.
				for (NullWritable v : values) {
					if("user".equals(bean.getTableName())) {
						BeanUtils.copyProperties(userBean, bean);
					}else{
						BeanUtils.copyProperties(orderBean, bean);
						orderBean.setUserName(userBean.getUserName());
						orderBean.setUserAge(userBean.getUserAge());
						orderBean.setUserFriend(userBean.getUserFriend());
						context.write(orderBean,NullWritable.get());
					}
				}
			} catch (IllegalAccessException e) {
				// Fail the task instead of silently dropping or corrupting records.
				throw new IOException("Failed to copy JoinBean properties", e);
			} catch (InvocationTargetException e) {
				throw new IOException("Failed to copy JoinBean properties", e);
			}
		}
	}
	
	/**
	 * Configures and runs the join job, then exits with status 0 on success
	 * and 1 on failure.
	 *
	 * @param args optional: {@code args[0]} is the input path and
	 *             {@code args[1]} the output path; the built-in default
	 *             paths are used for whichever is missing
	 * @throws Exception if the job cannot be set up or submitted
	 */
	public static void main(String[] args) throws Exception {
		
		String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
		String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;
		
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		
		job.setPartitionerClass(ReduceSideJoinPartitioner.class);
		job.setGroupingComparatorClass(ReduceSideJoinGroupingComparator.class);

		job.setJarByClass(ReduceSideJoin2.class);

		job.setMapperClass(ReduceSideJoin2Mapper.class);
		job.setReducerClass(ReduceSideJoin2Reducer.class);
		
		job.setNumReduceTasks(2);

		job.setMapOutputKeyClass(JoinBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setOutputKeyClass(JoinBean.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
	
}