MapReduceプログラミング事例10：joinアルゴリズムの改善
Partitioner + compareTo + GroupingComparator による効率的な実装
事例9も参照してください。
実装コード：
JoinBean
ReduceSideJoinGroupingComparatorクラス
ReduceSideJoinPartitionerクラス
ReduceSideJoin2クラス
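Partitioner + compareTo + GroupingComparator による効率的な実装
（事例9も参照してください。）userId でパーティション分割とグルーピングを行い、compareTo でユーザーレコードが注文レコードより先に並ぶようにソートすることで、reducer はユーザー情報を保持したまま同じ userId の注文を順に結合できます。
実装コード：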
JoinBean
package cn.edu360.mr.join.improve;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class JoinBean implements WritableComparable{
private String orderId;
private String userId;
private String userName;
private int userAge;
private String userFriend;
private String tableName;
public void set(String orderId, String userId, String userName, int userAge, String userFriend,String tableName) {
this.orderId = orderId;
this.userId = userId;
this.userName = userName;
this.userAge = userAge;
this.userFriend = userFriend;
this.tableName = tableName;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public int getUserAge() {
return userAge;
}
public void setUserAge(int userAge) {
this.userAge = userAge;
}
public String getUserFriend() {
return userFriend;
}
public void setUserFriend(String userFriend) {
this.userFriend = userFriend;
}
@Override
public String toString() {
return this.orderId +","+ this.userId + ","+this.userAge + ","+this.userName + ","+this.userFriend;
}
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.userId = in.readUTF();
this.userName = in.readUTF();
this.userAge = in.readInt();
this.userFriend = in.readUTF();
this.tableName = in.readUTF();
}
public void write(DataOutput out) throws IOException {
out.writeUTF(this.orderId);
out.writeUTF(this.userId);
out.writeUTF(this.userName);
out.writeInt(this.userAge);
out.writeUTF(this.userFriend);
out.writeUTF(this.tableName);
}
public int compareTo(JoinBean o) {
return o.getUserId().compareTo(this.getUserId()) == 0 ? o.getTableName().compareTo(this.getTableName()) : o.getUserId().compareTo(this.getUserId());
}
}
ReduceSideJoinGroupingComparatorクラス
package cn.edu360.mr.join.improve;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator for the reduce-side join.
 *
 * <p>Two {@link JoinBean} keys land in the same reduce() call exactly when their user ids are
 * equal. The table name is ignored, so a user record and all of its orders are grouped together.
 */
public class ReduceSideJoinGroupingComparator extends WritableComparator {

    /** Registers {@link JoinBean} as the key type, with key-instance creation enabled. */
    public ReduceSideJoinGroupingComparator() {
        super(JoinBean.class, true);
    }

    /** Compares only the userId of the two keys. */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftUserId = ((JoinBean) a).getUserId();
        String rightUserId = ((JoinBean) b).getUserId();
        return leftUserId.compareTo(rightUserId);
    }
}
ReduceSideJoinPartitionerクラス
package cn.edu360.mr.join.improve;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitioner for the reduce-side join.
 *
 * <p>It sends every {@link JoinBean} with the same userId to the same reduce task, regardless of
 * table name, so a user record and its orders meet in one reducer.
 */
public class ReduceSideJoinPartitioner extends Partitioner<JoinBean, NullWritable> {

    /**
     * @param key map-output key; only its userId is used
     * @param value unused
     * @param numPartitions number of reduce tasks
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(JoinBean key, NullWritable value, int numPartitions) {
        // Mask off the sign bit so negative hash codes still yield a non-negative index.
        return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
ReduceSideJoin2クラス
package cn.edu360.mr.join.improve;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Reduce-side join of order records with user records on userId.
 *
 * <p>Input files whose name starts with {@code "order"} are parsed as {@code orderId,userId}.
 * Every other input file is parsed as {@code userId,userName,userAge,userFriend}.
 * {@link ReduceSideJoinPartitioner} and {@link ReduceSideJoinGroupingComparator} route all records
 * of one userId into a single reduce() call. {@link JoinBean#compareTo} sorts the user record
 * ahead of the order records within that call.
 */
public class ReduceSideJoin2 {

    /** Input directory used when no first command-line argument is given. */
    private static final String DEFAULT_INPUT_DIR = "F:\\mrdata\\join\\input";
    /** Output directory used when no second command-line argument is given. */
    private static final String DEFAULT_OUTPUT_DIR = "F:\\mrdata\\join\\out2";
    /** Counter group/name for input lines the mapper skips. */
    private static final String COUNTER_GROUP = "ReduceSideJoin2";
    private static final String SKIPPED_LINES = "SKIPPED_LINES";

    /** Tags each input line with its table name and emits it keyed by a {@link JoinBean}. */
    public static class ReduceSideJoin2Mapper
            extends Mapper<LongWritable, Text, JoinBean, NullWritable> {
        // Reused across map() calls; Hadoop serializes the key on write, so reuse is the usual practice.
        JoinBean bean = new JoinBean();
        String fileName = null;

        /** Records the name of the file backing this split; it decides how lines are parsed. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            fileName = inputSplit.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Blank lines (e.g. a trailing newline) would otherwise crash on fields[1].
            if (line.trim().isEmpty()) {
                context.getCounter(COUNTER_GROUP, SKIPPED_LINES).increment(1);
                return;
            }
            String[] fields = line.split(",");
            if (fileName.startsWith("order")) {
                if (fields.length < 2) {
                    context.getCounter(COUNTER_GROUP, SKIPPED_LINES).increment(1);
                    return;
                }
                bean.set(fields[0], fields[1], "NULL", -1, "NULL", "order");
            } else {
                if (fields.length < 4) {
                    context.getCounter(COUNTER_GROUP, SKIPPED_LINES).increment(1);
                    return;
                }
                bean.set("NULL", fields[0], fields[1], Integer.parseInt(fields[2]), fields[3], "user");
            }
            context.write(bean, NullWritable.get());
        }
    }

    /**
     * Joins each order with the user record of the same userId.
     *
     * <p>Hadoop updates the key object in place as the values iterator advances. Inside the loop,
     * {@code bean} therefore reflects the record behind the current value. The user record, if
     * present, arrives first (see {@link JoinBean#compareTo}).
     */
    public static class ReduceSideJoin2Reducer
            extends Reducer<JoinBean, NullWritable, JoinBean, NullWritable> {
        JoinBean orderBean = new JoinBean();
        JoinBean userBean = new JoinBean();

        @Override
        protected void reduce(JoinBean bean, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Reset per group: without this, orders whose user is missing would inherit
            // the previous group's user data. Placeholders match those the mapper uses.
            userBean.set("NULL", bean.getUserId(), "NULL", -1, "NULL", "user");
            for (NullWritable ignored : values) {
                if ("user".equals(bean.getTableName())) {
                    userBean.set(bean.getOrderId(), bean.getUserId(), bean.getUserName(),
                            bean.getUserAge(), bean.getUserFriend(), bean.getTableName());
                } else {
                    orderBean.set(bean.getOrderId(), bean.getUserId(), userBean.getUserName(),
                            userBean.getUserAge(), userBean.getUserFriend(), bean.getTableName());
                    context.write(orderBean, NullWritable.get());
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args optional {@code [inputDir [outputDir]]}; the original hard-coded paths are the
     *     defaults
     */
    public static void main(String[] args) throws Exception {
        String inputDir = args.length > 0 ? args[0] : DEFAULT_INPUT_DIR;
        String outputDir = args.length > 1 ? args[1] : DEFAULT_OUTPUT_DIR;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setPartitionerClass(ReduceSideJoinPartitioner.class);
        job.setGroupingComparatorClass(ReduceSideJoinGroupingComparator.class);
        job.setJarByClass(ReduceSideJoin2.class);
        job.setMapperClass(ReduceSideJoin2Mapper.class);
        job.setReducerClass(ReduceSideJoin2Reducer.class);
        job.setNumReduceTasks(2);
        job.setMapOutputKeyClass(JoinBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(JoinBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(inputDir));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}