MapReduceプログラミング-joinアルゴリズム実装

15547 ワード

注文書テーブルt_orderと商品テーブルt_productの2つのデータベース・テーブルがあると仮定し、関連クエリーが必要になります。このようなSQL文は書きやすいです.
select  a.id,a.date,b.name,b.category_id,b.price 
from t_order a left outer join t_product b 
on a.pid = b.id

では、どのようにmapreduceで実現しますか?関連する条件をmapとして出力するkey(pid,商品id,受注テーブルと商品テーブルは多対一関係)により,join条件を満たす2つのテーブルのデータをデータのソースとなるファイル情報を携帯し,同じreduce taskに送り,reduceでデータの直列接続を行い,最後に1つのファイルに書き込む.
2つのテーブルのすべてのフィールド情報をカプセル化するBeanをカスタマイズし、最後にファイルに書き出すときにそのbeanを出力すればよいです.
public class InfoBean implements Writable{

    private String oid;//  id
    private String date;
    private String pid;//  id
    private int amount;
    private String pname;//    
    private int category_id;//    
    private int price;

    //0:            1:    
    private int flag;
    //     
    public void write(DataOutput out) throws IOException {
        out.writeUTF(oid);
        out.writeUTF(date);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeInt(category_id);
        out.writeInt(price);
        out.writeInt(flag);
    }
    //      
    public void readFields(DataInput in) throws IOException {
        this.oid=in.readUTF();
        this.date=in.readUTF();
        this.pid=in.readUTF();
        this.amount=in.readInt();
        this.pname=in.readUTF();
        this.category_id=in.readInt();
        this.price=in.readInt();
        this.flag=in.readInt();
    }
    //     
    public void setInfoBean(String oid, String date, String pid, int amount, String pname, int category_id, int price,
            int flag) {
        this.oid = oid;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
        this.flag = flag;
    }
    //  toString  ,       
    @Override
    public String toString() {
        return "oid=" + oid + ", date=" + date + ", pid=" + pid + ", amount=" + amount + ", pname=" + pname
                + ", category_id=" + category_id + ", price=" + price ;
    }
    //            
    public InfoBean() {
    }
    getset
}
/**
 * Reduce-side join of an order file and a product file.
 *
 * The mapper tags each record with its source file and emits it keyed by
 * product id (pid), so all records sharing a pid — many orders, one
 * product — arrive at the same reduce call, where they are stitched
 * together into fully-joined rows.
 */
public class MapReduceJoin {
    static class MapReduceJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
        // Reused across map() calls — a standard Hadoop optimization;
        // the framework serializes the bean on context.write, so reuse is safe.
        InfoBean bean = new InfoBean();
        Text text = new Text();

        /**
         * Parses one CSV line from either input file and emits it keyed by pid.
         * Order lines:   oid,date,pid,amount
         * Product lines: pid,pname,category_id,price
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Determine which table this record belongs to from the
            // name of the file backing the current input split
            // (order file: order.txt, product file: product.txt).
            FileSplit inputSplit = (FileSplit)context.getInputSplit();
            String name = inputSplit.getPath().getName();
            String pid = "";
            if(name.startsWith("order")){
                String[] fields = line.split(",");
                pid = fields[2];
                // Order record: product-side fields left as placeholders, flag = 0.
                bean.setInfoBean(fields[0], fields[1], pid,
                        Integer.parseInt(fields[3]), "", 0, 0, 0);
            }else {
                String[] fields = line.split(",");
                pid = fields[0];
                // Product record: order-side fields left as placeholders, flag = 1.
                bean.setInfoBean("", "", pid,
                        0, fields[1], Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), 1);
            }
            text.set(pid);
            context.write(text, bean);
        }
    }

    static class MapReduceJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{
        /**
         * Receives every bean sharing one pid: exactly one product bean
         * (flag == 1) and any number of order beans (flag == 0). Copies the
         * product fields onto each order bean and emits the joined rows.
         */
        @Override
        protected void reduce(Text key, Iterable<InfoBean> beans,
                Context context) throws IOException, InterruptedException {
            // Holds the single product-side record for this pid.
            InfoBean productBean = new InfoBean();
            // Collects all order-side records for this pid.
            List<InfoBean> orderList = new ArrayList<>();
            for (InfoBean bean : beans) {
                // Hadoop reuses the bean instance between iterations, so each
                // record must be copied out before advancing the iterator.
                if(bean.getFlag() == 1){
                    // Product record: keep one copy of it.
                    try {
                        BeanUtils.copyProperties(productBean, bean);
                    } catch (Exception e) {
                        // Best-effort: log and continue so one bad record
                        // does not fail the whole task.
                        e.printStackTrace();
                    }
                }else {
                    // Order record: copy and accumulate.
                    InfoBean orderBean = new InfoBean();
                    try {
                        BeanUtils.copyProperties(orderBean, bean);
                        orderList.add(orderBean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            // Stitch the product fields onto every order record and emit.
            for (InfoBean orderBean : orderList) {
                orderBean.setPname(productBean.getPname());
                orderBean.setCategory_id(productBean.getCategory_id());
                orderBean.setPrice(productBean.getPrice());
                context.write(orderBean, NullWritable.get());
            }
        }
    }

    /**
     * Configures and submits the join job.
     * args[0] = input directory, args[1] = output directory (must not exist).
     */
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapReduceJoin.class);
        // Wire our mapper and reducer into the job.
        job.setMapperClass(MapReduceJoinMapper.class);
        job.setReducerClass(MapReduceJoinReducer.class);

        // Mapper output kv types (differ from the final output types,
        // so both must be declared explicitly).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);

        // Final (reducer) output kv types.
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input directory containing the raw order/product files.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output directory for the joined result.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit to the cluster (e.g. YARN) and block until completion;
        // exit code reflects job success/failure.
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

テスト:Linux上で入力ディレクトリを作成し、受注ファイルと商品ファイルを作成してフィールド情報を編集し、入力ディレクトリへファイルを転送してプログラムを実行し、出力ファイルの内容を表示します。
[root@mini1 ~]# hadoop fs -mkdir -p /mrjoin/input
[root@mini1 ~]# vi order.txt
1001,20170710,P0001,1
1002,20170710,P0001,3
1003,20170710,P0002,3
1003,20170710,P0002,4
[root@mini1 ~]# vi product.txt 
P0001,xiaomi4,1000,2
P0002,iphone6s,1000,3
[root@mini1 ~]# hadoop fs -put order.txt product.txt /mrjoin/input/
[root@mini1 ~]# hadoop jar mrjoin.jar com.scu.hadoop.rjoin.MapReduceJoin /mrjoin/input /mrjoin/output
[root@mini1 ~]# hadoop fs -cat /mrjoin/output/part-r-00000
oid=1002, date=20170710, pid=P0001, amount=3, pname=xiaomi4, category_id=1000, price=2
oid=1001, date=20170710, pid=P0001, amount=1, pname=xiaomi4, category_id=1000, price=2
oid=1003, date=20170710, pid=P0002, amount=4, pname=iphone6s, category_id=1000, price=3
oid=1003, date=20170710, pid=P0002, amount=3, pname=iphone6s, category_id=1000, price=3