MapReduce map side joinの実例


1.問題の説明
既存の大きなテーブル(約2億件以上のレコード)には、セットトップボックスユーザーの毎日の視聴記録が保存されており、視聴したテレビ局名と開始時間は含まれているが、番組名は含まれていない。もう1つの小さなテーブル(数十万件)は、クローラーで取得した毎日のテレビ局の番組表情報である。ユーザーの視聴記録を番組情報に関連付ける必要がある。すなわち、視聴記録中のテレビ局名と開始時間に基づいて番組名を決定する。
2.reduce side joinかmap side joinか
小さいテーブルはデータ量が比較的少なく、すべてメモリに載せることができるため、map side joinを採用する。Mapperクラスを継承したクラスのsetup()メソッドで小さいテーブルのデータを読み込んでメモリに保持し、map()メソッドで大きいテーブルの各レコードを番組情報と関連付ける。
3.完全なコード
main関数
public class AddRelation2 {

    /**
     * Configures and submits the map-only join job.
     *
     * <p>Expects exactly one argument: the input data path. The job registers the
     * schedule file as a cache file, uses {@code JoinMapper} as its mapper, runs
     * with zero reduce tasks, and writes through {@code MultiTableOutputFormat}.
     * The process exits with 0 when the job completes successfully, 1 when it
     * does not, and 2 when the argument count is wrong.
     *
     * @param args command-line arguments; {@code args[0]} is the input path
     * @throws IOException            propagated from job setup or submission
     * @throws ClassNotFoundException propagated from {@code waitForCompletion}
     * @throws InterruptedException   propagated from {@code waitForCompletion}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
        Configuration conf = HBaseConfiguration.create();
        // Layer the cluster's Hadoop site files on top of the HBase configuration.
        final String[] siteFiles = {
            "/usr/local/cluster/hadoop/etc/hadoop/core-site.xml",
            "/usr/local/cluster/hadoop/etc/hadoop/hdfs-site.xml",
            "/usr/local/cluster/hadoop/etc/hadoop/mapred-site.xml"
        };
        for (String siteFile : siteFiles) {
            conf.addResource(new Path(siteFile));
        }

        if (args.length != 1) {
            System.out.println("1 args <datapath>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf);
        job.setJobName("iadd Relation 2");
        job.setJarByClass(AddRelation2.class);

        // Large table: the records under the path given on the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Small table: shipped to every mapper as a cache file.
        Path scheduleFile = new Path("hdfs://bigdata/tvInfo/tvinfo.txt");
        job.addCacheFile(scheduleFile.toUri());

        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        // Map-only job: the mapper output goes straight to the output format.
        job.setNumReduceTasks(0);

        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.addDependencyJars(job.getConfiguration());

        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            System.exit(0);
        } else {
            System.exit(1);
        }
    }

Mapperクラス
/**
 * Map-side join mapper that attaches a programme name to each viewing record.
 *
 * <p>{@code setup} loads the schedule file registered as the first cache file
 * into per-channel in-memory tables. {@code map} then finds, for every
 * qualifying input record, the schedule entry that started most recently
 * before the record's start time and emits an HBase {@code Put} carrying the
 * channel name and the programme name.
 */
public class JoinMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put>{

        // Per-channel schedule tables filled from the cache file. Every entry is a
        // two-element list: [0] = "<field 3> <field 1>" of the schedule line (later
        // parsed as "yyyy-MM-dd HH:mm"), [1] = field 2 (returned as the programme name).
        static ArrayList<ArrayList<String>> cctv1 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv2 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv3 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv4 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv5 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv6 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv7 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv8 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv9 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv10 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv11 = new ArrayList<ArrayList<String>>();
        static ArrayList<ArrayList<String>> cctv12 = new ArrayList<ArrayList<String>>();
        // No longer consulted by map(); kept so existing references still compile.
        public static final String usefulMenu="CCTV1CCTV2CCTV3CCTV4CCTV5CCTV6CCTV7CCTV8CCTV9CCTV10CCTV11CCTV12";

        // Exact channel name -> schedule table. Replaces the two if/else chains.
        private static final java.util.Map<String, ArrayList<ArrayList<String>>> SCHEDULES =
                new java.util.HashMap<String, ArrayList<ArrayList<String>>>();
        static {
            SCHEDULES.put("CCTV1", cctv1);
            SCHEDULES.put("CCTV2", cctv2);
            SCHEDULES.put("CCTV3", cctv3);
            SCHEDULES.put("CCTV4", cctv4);
            SCHEDULES.put("CCTV5", cctv5);
            SCHEDULES.put("CCTV6", cctv6);
            SCHEDULES.put("CCTV7", cctv7);
            SCHEDULES.put("CCTV8", cctv8);
            SCHEDULES.put("CCTV9", cctv9);
            SCHEDULES.put("CCTV10", cctv10);
            SCHEDULES.put("CCTV11", cctv11);
            SCHEDULES.put("CCTV12", cctv12);
        }

        // A schedule entry only matches when it started at most this long (10 hours) before the record.
        private static final long MAX_LOOKBACK_MILLIS = 36000000L;
        // map() reads fields 0, 5, 7, 8 and 9 of an input record.
        private static final int MIN_RECORD_FIELDS = 10;
        // readCacheFile() reads fields 0..3 of a schedule line.
        private static final int MIN_SCHEDULE_FIELDS = 4;

        private static final String COUNTER_GROUP = "JoinMapper";
        private static final byte[] COLUMN_FAMILY = Bytes.toBytes("cf");
        private static final byte[] COL_STATION = Bytes.toBytes("tvStation");
        private static final byte[] COL_MENU = Bytes.toBytes("tvMenu");

        // The tables above are static, so they are shared by every mapper instance in
        // this JVM; this flag keeps a second setup() from appending the file again.
        private static boolean scheduleLoaded = false;

        // Output key naming the destination table; identical for every record.
        private final ImmutableBytesWritable outputTable =
                new ImmutableBytesWritable(Bytes.toBytes("play_record_file_jn_programme"));

        /**
         * Loads the schedule cache file into the static per-channel tables, once per JVM.
         *
         * @throws IOException if no cache file is registered or the file cannot be read
         */
        @Override
        protected void setup(Context context) throws IOException,InterruptedException {
            super.setup(context);
            synchronized (JoinMapper.class) {
                if (scheduleLoaded) {
                    return;
                }
                URI[] localCacheFiles = context.getCacheFiles();
                if (localCacheFiles == null || localCacheFiles.length == 0) {
                    throw new IOException("No cache file registered; the TV schedule cannot be loaded");
                }
                // Drop anything left behind by an earlier load that failed part-way.
                for (ArrayList<ArrayList<String>> schedule : SCHEDULES.values()) {
                    schedule.clear();
                }
                Path tvinfoSetPath = new Path(localCacheFiles[0]);
                // Resolve the filesystem from the path itself so the lookup also works
                // when the cache URI is not on the configured default filesystem.
                FileSystem fs = tvinfoSetPath.getFileSystem(context.getConfiguration());
                FSDataInputStream in = fs.open(tvinfoSetPath);
                // NOTE(review): assumes the schedule file is UTF-8 encoded - confirm.
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(in, java.nio.charset.Charset.forName("UTF-8")));
                readCacheFile(br);
                scheduleLoaded = true;
            }
        }

        /**
         * Reads '|'-separated schedule lines and files each one under its channel table.
         * Lines with fewer than four fields, or whose first field is not a known
         * channel name, are skipped. The reader is always closed.
         *
         * @param br reader over the schedule file; closed by this method
         * @throws IOException if reading fails
         */
        private static void readCacheFile( BufferedReader br) throws IOException {
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] detail = line.split("\\|");
                    if (detail.length < MIN_SCHEDULE_FIELDS) {
                        continue;
                    }
                    ArrayList<ArrayList<String>> schedule = SCHEDULES.get(detail[0]);
                    if (schedule == null) {
                        continue;
                    }
                    ArrayList<String> entry = new ArrayList<String>(2);
                    entry.add(detail[3] + " " + detail[1]);   // start time, "yyyy-MM-dd HH:mm"
                    entry.add(detail[2]);                     // programme name
                    schedule.add(entry);
                }
            } finally {
                br.close();
            }
        }

        /**
         * Joins one '|'-separated viewing record against the in-memory schedule and
         * emits a {@code Put} for it.
         *
         * <p>Records are dropped when they have fewer than ten fields (counted under
         * {@code MALFORMED_RECORDS}), when the user id does not start with "0531" or
         * contains "test", or when the channel is not one of the known channels. If
         * a time cannot be parsed the record is still written, with an empty
         * programme name, and {@code TIME_PARSE_ERRORS} is incremented.
         */
        @Override
        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            String[] detail = value.toString().split("\\|");
            if (detail.length < MIN_RECORD_FIELDS) {
                // One truncated line must not fail the whole task.
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                return;
            }
            String collectTime = detail[0];   //collect_time
            String uid = detail[5];           //user_id
            String channelName = detail[7];
            String channelId = detail[8];     //channel_id
            String startTime = detail[9];     //start_time

            // Original author's note: the "0531" prefix marks rows belonging to Ji'nan.
            if (!uid.startsWith("0531") || uid.contains("test")) {
                return;
            }
            ArrayList<ArrayList<String>> cctvTemp = SCHEDULES.get(channelName);
            if (cctvTemp == null) {
                return;
            }

            String menuName = "";
            try {
                menuName = formJoin(startTime, cctvTemp);
            } catch (ParseException e) {
                // Best effort: keep the record without a programme name and count the
                // failure instead of printing a stack trace for every bad row.
                context.getCounter(COUNTER_GROUP, "TIME_PARSE_ERRORS").increment(1);
            }

            String channelId6 = lastChars(channelId, 6);
            String startTime8 = lastChars(startTime, 8);

            // Row key = md5(collect_time)[0..8) + collect_time + user_id
            //           + last 6 chars of channel_id + last 8 chars of start_time
            byte[] time = Bytes.toBytes(collectTime);
            String hashPrefix = MD5Hash.getMD5AsHex(time).substring(0,8);
            byte[] rowKeyHead = Bytes.add(Bytes.toBytes(hashPrefix), time, Bytes.toBytes(uid));
            byte[] rowKey = Bytes.add(rowKeyHead, Bytes.toBytes(channelId6), Bytes.toBytes(startTime8));

            Put p1 = new Put(rowKey);
            p1.add(COLUMN_FAMILY, COL_STATION, Bytes.toBytes(channelName));
            p1.add(COLUMN_FAMILY, COL_MENU, Bytes.toBytes(menuName));
            if(!p1.isEmpty()){
                context.write(outputTable, p1);
            }
        }

        /** Returns the last {@code count} characters of {@code text}, or all of it when it is not longer than that. */
        private static String lastChars(String text, int count) {
            int length = text.length();
            return length > count ? text.substring(length - count) : text;
        }

        /**
         * Finds the programme that was on air at {@code time}: the schedule entry
         * whose start is closest to, and not after, the play time, and less than
         * ten hours before it.
         *
         * @param time     play start time, "yyyy-MM-dd HH:mm:ss"
         * @param cctvTemp schedule table of one channel; entries are
         *                 [start time "yyyy-MM-dd HH:mm", programme name]
         * @return the matching programme name, or an empty string when the table is
         *         null or empty or no entry falls inside the window
         * @throws ParseException if {@code time} or any schedule start time cannot be parsed
         */
        public String formJoin(String time,ArrayList<ArrayList<String>> cctvTemp) throws ParseException{
            // NOTE(review): every call re-parses every schedule entry; pre-parsing the
            // start times at load time would need a different table layout.
            DateFormat playFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            DateFormat menuFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm");
            long playMillis = playFormat.parse(time).getTime();
            if (cctvTemp == null) {
                return "";
            }
            long smallestGap = MAX_LOOKBACK_MILLIS;
            int bestIndex = -1;   // -1 = nothing matched; must not fall back to entry 0
            for (int i = 0; i < cctvTemp.size(); i++) {
                long gap = playMillis - menuFormat.parse(cctvTemp.get(i).get(0)).getTime();
                if (gap >= 0 && gap < smallestGap) {
                    smallestGap = gap;
                    bestIndex = i;
                }
            }
            if (bestIndex < 0) {
                return "";
            }
            return cctvTemp.get(bestIndex).get(1);
        }


    }