RCFile読み書き操作

4250 ワード

http://smallboby.iteye.com/blog/1592531
RCFileを読み出します
Job job = new Job();
// setJarByClass only needs some class packaged in the job jar. The original left the
// driver class name blank ("( .class)"), which does not compile, so the mapper is used.
job.setJarByClass(ReadTestMapper.class);
// Read the input as RCFile: each record arrives as (LongWritable, BytesRefArrayWritable).
job.setInputFormatClass(RCFileInputFormat.class);
// Write the result as plain text.
job.setOutputFormatClass(TextOutputFormat.class);
// Input path (RCFile data).
RCFileInputFormat.addInputPath(job, new Path(srcpath));
// Alternative registration when several input formats are mixed:
//MultipleInputs.addInputPath(job, new Path(srcpath), RCFileInputFormat.class);
// Output directory.
TextOutputFormat.setOutputPath(job, new Path(respath));
// Output key type: the tab-joined row built by ReadTestMapper.
job.setOutputKeyClass(Text.class);
// Output value type: NullWritable, so TextOutputFormat writes only the key on each line.
job.setOutputValueClass(NullWritable.class);
// Mapper that flattens each RCFile row into one line of text.
job.setMapperClass(ReadTestMapper.class);
// No reducer class is set, so Hadoop's default Reducer passes the mapper's Text/NullWritable
// pairs through unchanged. NOTE(review): the original comment here was garbled; confirm intent.
code = (job.waitForCompletion(true)) ? 0 : 1;
      
    // mapper          
    pulic class ReadTestMapper extends 
                Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {  
              
            @Override  
            protected void map(LongWritable key, BytesRefArrayWritable value, Context context) 
                    throws IOException, InterruptedException {  
                // TODO Auto-generated method stub  
                Text txt = new Text();   
            //  RcFile       ,           ,Value    ,  ,  。  
                StringBuffer sb = new StringBuffer();  
                for (int i = 0; i < value.size(); i++) {  
                    BytesRefWritable v = value.get(i);  
                    txt.set(v.getData(), v.getStart(), v.getLength());  
                    if(i==value.size()-1){  
                        sb.append(txt.toString());  
                    }else{  
                        sb.append(txt.toString()+"\t");  
                    }  
                }  
                context.write(new Text(sb.toString()),NullWritable.get());  
            }  
     }
RCFileを書きます
Job job = new Job();
Configuration conf = job.getConfiguration();
// Each output row has 4 columns; this must agree with the 4 fields OutPutTestMapper emits.
RCFileOutputFormat.setColumnNumber(conf, 4);
// setJarByClass only needs some class packaged in the job jar. The original left the
// driver class name blank ("( .class)"), which does not compile, so the mapper is used.
job.setJarByClass(OutPutTestMapper.class);

// Input: plain text files. Output: RCFile.
FileInputFormat.setInputPaths(job, new Path(srcpath));
RCFileOutputFormat.setOutputPath(job, new Path(respath));

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(RCFileOutputFormat.class);

// Map output: (input line's byte offset, row of column bytes).
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(BytesRefArrayWritable.class);

job.setMapperClass(OutPutTestMapper.class);

// Day to keep; the mapper compares it with the first 10 characters of each converted timestamp.
// NOTE(review): `line` and DATE are defined outside this snippet (presumably a parsed
// command line) — confirm.
conf.set("date", line.getOptionValue(DATE));
// Gzip-compress the output. These are the older mapred.* property names; newer Hadoop
// releases treat them as deprecated aliases of mapreduce.output.fileoutputformat.compress*.
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");

code = (job.waitForCompletion(true)) ? 0 : 1;

/**
 * Mapper that converts space-separated text lines into 4-column RCFile rows, keeping only
 * lines whose converted timestamp (second field) falls on the day stored under "date".
 */
public class OutPutTestMapper
        extends Mapper<LongWritable, Text, LongWritable, BytesRefArrayWritable> {

    /** Leading fields copied into each row; must match setColumnNumber(conf, 4) in the driver. */
    private static final int COLUMN_COUNT = 4;

    /**
     * Emits the first four space-separated fields of {@code value} as one RCFile row, if the
     * line is non-empty, has at least four fields, and its date matches the configured day.
     *
     * @param key     byte offset of the line; passed through as the output key
     * @param value   one input text line
     * @param context job context; supplies the "date" setting and receives the output row
     * @throws IOException if encoding a field or writing the row fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.isEmpty()) {
            return;
        }
        String[] fields = line.split(" ", -1);
        if (fields.length < COLUMN_COUNT) {
            return;
        }
        // timeStampDate is defined outside this snippet. NOTE(review): its result presumably
        // starts with a 10-character "yyyy-MM-dd" date — confirm.
        String times = timeStampDate(fields[1]);
        String d = times.substring(0, 10);
        // NOTE(review): throws NullPointerException if the driver did not set "date".
        String day = context.getConfiguration().get("date");
        if (!day.equals(d)) {
            return;
        }
        // Build and write the row only for matching lines. In the original, a misplaced brace
        // put this code outside the date check and out of scope of `record`.
        BytesRefArrayWritable bytes = new BytesRefArrayWritable(COLUMN_COUNT);
        for (int i = 0; i < COLUMN_COUNT; i++) {
            byte[] cell = fields[i].getBytes("UTF-8");
            bytes.set(i, new BytesRefWritable(cell, 0, cell.length));
        }
        context.write(key, bytes);
    }
}