RCFile storage and read operations


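At work I use RCFile to store and read data in RCFile format, so I am writing down how it is done.
RCFile, developed by Facebook, combines the advantages of row storage and column storage: it achieves a high compression ratio, reads individual columns quickly, and plays an important role in large-scale data processing in MapReduce environments.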
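To make the "row storage plus column storage" idea concrete, here is a minimal plain-Java sketch. It is only a toy model, not the real RCFile on-disk format: the class name RowColumnarSketch and its methods are made up for illustration. It splits rows into row groups and then stores each group column by column, which is why a single column can be read (and compressed) without touching the others.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a row-columnar layout. Hypothetical names; not the RCFile file format. */
final class RowColumnarSketch {

    /**
     * Splits the rows into groups of at most groupSize rows, then transposes
     * each group so that all values of one column sit next to each other.
     * Result shape: groups -> columns -> values. Assumes every row has the same width.
     */
    static List<List<List<String>>> toRowGroups(List<List<String>> rows, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        List<List<List<String>>> groups = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += groupSize) {
            int end = Math.min(start + groupSize, rows.size());
            int columnCount = rows.get(start).size();
            List<List<String>> columns = new ArrayList<>();
            for (int c = 0; c < columnCount; c++) {
                List<String> column = new ArrayList<>();
                for (int r = start; r < end; r++) {
                    column.add(rows.get(r).get(c));
                }
                columns.add(column);
            }
            groups.add(columns);
        }
        return groups;
    }

    /** Collects one column across all groups, skipping every other column. */
    static List<String> readColumn(List<List<List<String>>> groups, int columnIndex) {
        List<String> values = new ArrayList<>();
        for (List<List<String>> group : groups) {
            values.addAll(group.get(columnIndex));
        }
        return values;
    }
}
```

With three two-column rows and a group size of 2, this yields two groups, and readColumn(groups, 1) returns only the second column's values in row order.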
Read operation:

Job setup:
    Job job = new Job();
    // The driver class name is missing in the original post.
    job.setJarByClass( .class);
    // Read the input as RCFile
    job.setInputFormatClass(RCFileInputFormat.class);
    // Write the output as plain text
    job.setOutputFormatClass(TextOutputFormat.class);
    // Input path
    RCFileInputFormat.addInputPath(job, new Path(srcpath));
    //MultipleInputs.addInputPath(job, new Path(srcpath), RCFileInputFormat.class);
    // Output path
    TextOutputFormat.setOutputPath(job, new Path(respath));
    // Output key type
    job.setOutputKeyClass(Text.class);
    // Output value type
    job.setOutputValueClass(NullWritable.class);
    // Mapper class
    job.setMapperClass(ReadTestMapper.class);
    // No reducer class is set: the mapper already emits the final Text lines.

    code = (job.waitForCompletion(true)) ? 0 : 1;


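Mapper class:

    import java.io.IOException;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ReadTestMapper extends Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, BytesRefArrayWritable value, Context context) throws IOException, InterruptedException {
            Text txt = new Text();
            // Each RCFile row arrives as one value that holds an entry per column.
            // Decode every column and join them with tabs into a single text line.
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < value.size(); i++) {
                BytesRefWritable v = value.get(i);
                // Copy only this column's slice of the backing buffer.
                txt.set(v.getData(), v.getStart(), v.getLength());
                if (i > 0) {
                    sb.append("\t");
                }
                sb.append(txt.toString());
            }
            context.write(new Text(sb.toString()), NullWritable.get());
        }
    }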
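The mapper passes getData(), getStart() and getLength() together because a column value is a slice of a byte buffer, and the buffer may hold more than that one value. The following plain-Java sketch shows the same decode-and-join step without Hadoop. ColumnJoinSketch and its Slice class are hypothetical stand-ins for illustration, not Hive classes, and UTF-8 is assumed as the encoding.

```java
import java.nio.charset.StandardCharsets;

/** Hadoop-free sketch of joining column slices with tabs. Hypothetical names. */
final class ColumnJoinSketch {

    /** One column value described as (buffer, start, length), like the mapper's input. */
    static final class Slice {
        final byte[] data;
        final int start;
        final int length;

        Slice(byte[] data, int start, int length) {
            this.data = data;
            this.start = start;
            this.length = length;
        }
    }

    /** Decodes each slice as UTF-8 (an assumption) and joins the results with tabs. */
    static String joinWithTabs(Slice[] columns) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columns.length; i++) {
            if (i > 0) {
                sb.append('\t');
            }
            Slice s = columns[i];
            // Read only the bytes in [start, start + length), never the whole buffer.
            sb.append(new String(s.data, s.start, s.length, StandardCharsets.UTF_8));
        }
        return sb.toString();
    }
}
```

If three slices point into one shared buffer, decoding the whole buffer for each column would repeat the full row three times; honoring start and length gives one clean value per column.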


Writing the output compressed in RCFile format:

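Job setup:
    Job job = new Job();
    Configuration conf = job.getConfiguration();
    // Number of columns in each RCFile row; must match what the mapper writes
    RCFileOutputFormat.setColumnNumber(conf, 4);
    // The driver class name is missing in the original post.
    job.setJarByClass( .class);

    FileInputFormat.setInputPaths(job, new Path(srcpath));
    RCFileOutputFormat.setOutputPath(job, new Path(respath));

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(RCFileOutputFormat.class);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(BytesRefArrayWritable.class);

    job.setMapperClass(OutPutTestMapper.class);

    // Pass the target day to the mapper. `line` and `DATE` come from the
    // driver's command-line parsing, which the post does not show.
    conf.set("date", line.getOptionValue(DATE));
    // Compress the output with gzip. Hadoop 2 and later name these properties
    // mapreduce.output.fileoutputformat.compress and
    // mapreduce.output.fileoutputformat.compress.codec.
    conf.setBoolean("mapred.output.compress", true);
    conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");

    code = (job.waitForCompletion(true)) ? 0 : 1;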

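Mapper class:
    import java.io.IOException;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class OutPutTestMapper extends Mapper<LongWritable, Text, LongWritable, BytesRefArrayWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Target day, set by the driver under the "date" property
            String day = context.getConfiguration().get("date");
            if (!line.equals("")) {
                // The limit -1 keeps trailing empty fields.
                String[] lines = line.split(" ", -1);
                if (lines.length > 3) {
                    String time_temp = lines[1];
                    // timeStampDate is a helper that the post does not show; its
                    // result has to start with a 10-character date.
                    String times = timeStampDate(time_temp);
                    String d = times.substring(0, 10);
                    // Keep only the records that belong to the target day.
                    if (day.equals(d)) {
                        // The first four fields become the four RCFile columns.
                        byte[][] record = {lines[0].getBytes("UTF-8"), lines[1].getBytes("UTF-8"), lines[2].getBytes("UTF-8"), lines[3].getBytes("UTF-8")};

                        BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);

                        for (int i = 0; i < record.length; i++) {
                            BytesRefWritable cu = new BytesRefWritable(record[i], 0, record[i].length);
                            bytes.set(i, cu);
                        }
                        context.write(key, bytes);
                    }
                }
            }
        }
    }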
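The post does not show timeStampDate, so the following is only a guess at what such a helper might look like, together with the mapper's filtering rule in plain Java. It assumes the second field is a Unix timestamp in seconds and that the helper formats it as "yyyy-MM-dd HH:mm:ss"; both are assumptions, as are the class name TimestampSketch, the matchesDay method and the explicit ZoneId parameter. The real helper may differ.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical stand-in for the helper and filter used by the write mapper. */
final class TimestampSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Assumption: the input is Unix epoch seconds; the output starts with yyyy-MM-dd. */
    static String timeStampDate(String epochSeconds, ZoneId zone) {
        long seconds = Long.parseLong(epochSeconds.trim());
        return FORMAT.withZone(zone).format(Instant.ofEpochSecond(seconds));
    }

    /**
     * Mirrors the mapper's filter: a non-empty line, more than three
     * space-separated fields, and a timestamp in field 1 that falls on the given day.
     */
    static boolean matchesDay(String line, String day, ZoneId zone) {
        if (line.isEmpty()) {
            return false;
        }
        // The limit -1 keeps trailing empty fields, so "a b c " has four fields.
        String[] fields = line.split(" ", -1);
        if (fields.length <= 3) {
            return false;
        }
        String date = timeStampDate(fields[1], zone).substring(0, 10);
        return day.equals(date);
    }
}
```

Passing the time zone explicitly is a design choice of this sketch: a record close to midnight lands on different days in different zones, so relying on the cluster's default zone could make the day filter behave differently from machine to machine.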



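If you repost this article, please credit the original source and author with a hyperlink.
Permanent link: http://smallboby.iteye.com/blog/1592531
Thank you. If anything here is wrong, corrections are welcome.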