MapReduceでJoin操作を実現

8302 ワード

要件
オーダーデータテーブルt_orderのデータは次のとおりです.
id
date
pid
amount
1001
20150710
P0001
2
1002
20150710
P0001
2
1002
20150710
P0002
3
商品情報テーブルt_productのデータは以下のとおりです。
id
name
category_id
price
P0001
小米5
C01
2
P0002
ハンマーT 1
C01
3
次のSQLと同等の処理をMapReduceで実現します。
select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
構想
t_orderのpidとt_productのidが一致するレコードを、同じReduce呼び出しに集めます（pidをMap出力のキーにします）。
エンティティクラスの設計
InfoBean.java
package tech.mrbcy.bigdata.mr.rjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Value type that carries either one order record or one product record.
 *
 * <p>The {@code flag} field tells the two kinds apart: the order constructor sets it to
 * {@link #FLAG_ORDER}, the product constructor sets it to {@link #FLAG_PRODUCT}, and the
 * no-argument constructor leaves it at {@code -1}. Fields that do not belong to the record
 * kind keep their defaults ({@code 0} or the empty string).
 *
 * <p>String fields are never {@code null}: constructors and setters replace {@code null}
 * with the empty string, because {@link DataOutput#writeUTF(String)} cannot serialize
 * {@code null}.
 */
public class InfoBean implements Writable{

    /** Flag value of a bean built from an order record. */
    public static final int FLAG_ORDER = 0;

    /** Flag value of a bean built from a product record. */
    public static final int FLAG_PRODUCT = 1;

    private int order_id = 0;
    private String dateString = "";
    private String p_id = "";
    private int amount = 0;
    private String pname = "";
    private String category_id = "";
    private float price = 0;
    private int flag = -1; // FLAG_ORDER (0) = order record, FLAG_PRODUCT (1) = product record

    /** Creates an empty bean: every field at its default and {@code flag} at {@code -1}. */
    public InfoBean(){}

    /**
     * Creates a bean for an order record and marks it with {@link #FLAG_ORDER}.
     *
     * @param order_id   order id
     * @param dateString order date as text; {@code null} is stored as ""
     * @param p_id       id of the ordered product; {@code null} is stored as ""
     * @param amount     ordered amount
     */
    public InfoBean(int order_id, String dateString, String p_id, int amount) {
        this.order_id = order_id;
        this.dateString = nullToEmpty(dateString);
        this.p_id = nullToEmpty(p_id);
        this.amount = amount;
        this.flag = FLAG_ORDER;
    }

    /**
     * Creates a bean for a product record and marks it with {@link #FLAG_PRODUCT}.
     *
     * @param p_id        product id; {@code null} is stored as ""
     * @param pname       product name; {@code null} is stored as ""
     * @param category_id product category id; {@code null} is stored as ""
     * @param price       product price
     */
    public InfoBean(String p_id, String pname, String category_id, float price) {
        this.p_id = nullToEmpty(p_id);
        this.pname = nullToEmpty(pname);
        this.category_id = nullToEmpty(category_id);
        this.price = price;
        this.flag = FLAG_PRODUCT;
    }

    /** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public String getDateString() {
        return dateString;
    }

    /** Sets the order date; {@code null} is stored as the empty string. */
    public void setDateString(String dateString) {
        this.dateString = nullToEmpty(dateString);
    }

    public String getP_id() {
        return p_id;
    }

    /** Sets the product id; {@code null} is stored as the empty string. */
    public void setP_id(String p_id) {
        this.p_id = nullToEmpty(p_id);
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    /** Sets the product name; {@code null} is stored as the empty string. */
    public void setPname(String pname) {
        this.pname = nullToEmpty(pname);
    }

    public String getCategory_id() {
        return category_id;
    }

    /** Sets the category id; {@code null} is stored as the empty string. */
    public void setCategory_id(String category_id) {
        this.category_id = nullToEmpty(category_id);
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    /**
     * Serializes all fields to {@code out}.
     *
     * <p>The field order here must stay identical to the order in
     * {@link #readFields(DataInput)}.
     *
     * @param out sink to write to
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeUTF(dateString);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(category_id);
        out.writeFloat(price);
        out.writeInt(flag);
    }

    /**
     * Overwrites all fields with values read from {@code in}, in the same order that
     * {@link #write(DataOutput)} emits them.
     *
     * @param in source to read from
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        order_id = in.readInt();
        dateString = in.readUTF();
        p_id = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        category_id = in.readUTF();
        price = in.readFloat();
        flag = in.readInt();
    }

    /** Renders every field as a comma-separated list of {@code name=value} pairs. */
    @Override
    public String toString() {
        return "order_id=" + order_id + ", dateString=" + dateString
                + ", p_id=" + p_id + ", amount=" + amount + ", pname=" + pname
                + ", category_id=" + category_id + ", price=" + price
                + ", flag=" + flag;
    }


}

MapReduceプログラムの実装
package tech.mrbcy.bigdata.mr.rjoin;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class RJoin {
    static class RJoinMapper extends Mapper{

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();

            InfoBean bean = null;
            //             
            if(name.startsWith("order")){
                bean = new InfoBean(Integer.parseInt(fields[0]), fields[1],
                        fields[2], Integer.parseInt(fields[3]));
                context.write(new Text(fields[2]), bean);
            }else{
                bean = new InfoBean(fields[0], fields[1], 
                        fields[2], Float.parseFloat(fields[3]));
                context.write(new Text(fields[0]), bean);
            }

        }

    }

    static class RJoinReducer extends Reducer{

        @Override
        protected void reduce(Text pid, Iterable beans,Context context)
                throws IOException, InterruptedException {
            InfoBean productBean = new InfoBean();
            List orderBeans = new ArrayList();

            try {
                for(InfoBean bean : beans){
                    if(bean.getFlag() == 0){
                        InfoBean orderBean = new InfoBean();
                        BeanUtils.copyProperties(orderBean, bean);      
                        orderBeans.add(orderBean);
                    }else{
                        BeanUtils.copyProperties(productBean, bean);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            for(InfoBean orderBean : orderBeans){
                //  orderBean      productBean  
                productBean.setOrder_id(orderBean.getOrder_id());
                productBean.setDateString(orderBean.getDateString());
                productBean.setP_id(orderBean.getP_id());
                productBean.setAmount(orderBean.getAmount());
                context.write(productBean, NullWritable.get());
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"Flow Sum");
        job.setJarByClass(RJoin.class);
        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

実行結果
入力データ:
order.txt
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0001,3

product.txt
P0001,小米5,C01,2000
P0002,锤子T1,C01,3000

出力:
order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=C01, price=2000.0, flag=1
order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=C01, price=2000.0, flag=1
order_id=1001, dateString=20150710, p_id=P0001, amount=2, pname=小米5, category_id=C01, price=2000.0, flag=1