hive処理ログ、カスタムinputformat

7775 ワード

オープン環境、hadoop-00.2、hive-0.6
1.ログ区切り記号
Xmlコード 
收藏代码
  • 2010-05-31 10:50:17𞓜𞓜61.132.8.82𞓜𞓜http://www.360buy.com/product/201185.html  
  • 2010-05-31 10:50:17|||61.132.4.82|||http://www.360buy.com/product/201185.html
    セパレータは、ログ本文がセパレータと同じ文字であることをできるだけ防ぐために、データが混同される。
    hiveの内部セパレータは「\001」ですので、切り替えが必要です。
    2.カスタムInput Formatを作成する
    Javaコード 
    收藏代码
  • package comp.jd.cloud.lickstore;  
  •   
  • import java.io.IOException  
  •   
  • import org.apache.hadoop.io.LongWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapred.FileSplit;  
  • import org.apache.hadoop.mapred.InputSplit;  
  • import org.apache.hadoop.mapred.JobConf;  
  • import org.apache.hadoop.mapred.JobConfigrable;  
  • import org.apache.hadoop.mapred.RecordReader;  
  • import org.apache.hadoop.mapred.Reporter;  
  • import org.apache.hadoop.mapred.TextInput Format;  
  •   
  • /* 
  •  * カスタムhadoopの org.apache.hadoop.mapred.Input Format 
  •  *  
  •  * @author wiston 
  •  *  
  •  */  
  • pblic クラス Clickstream Input Format extens TextInput Format implements  
  •         JobConfigrable {  
  •   
  •     public RecordReader get RecordReader(  
  •             Input Split genelity JobConf job、 レポート レポート  
  •             ローソン IOException {  
  •   
  •         report.set Storts(generity.toString();  
  •         return new Clickstream RecordReader(job、 (FileSplit) genelity;  
  •     }  
  • }  
  • package com.jd.cloud.clickstore;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    
    /**
     *    hadoop  org.apache.hadoop.mapred.InputFormat
     * 
     * @author winston
     * 
     */
    public class ClickstreamInputFormat extends TextInputFormat implements
    		JobConfigurable {
    
    	public RecordReader<LongWritable, Text> getRecordReader(
    			InputSplit genericSplit, JobConf job, Reporter reporter)
    			throws IOException {
    
    		reporter.setStatus(genericSplit.toString());
    		return new ClickstreamRecordReader(job, (FileSplit) genericSplit);
    	}
    }
    
    3.Clickstream RecordReaderをカスタマイズし、RecordReaderインターフェースを実現し、next方法を書き換える。
     
    Javaコード 
    收藏代码
  • /* リード a. ライン. */  
  •   public synchronized bollan next(LongWritable キー、 Text value)  
  •     ローソン IOException {  
  •   
  •     while (pos < end) {  
  •       key.set(pos)  
  •   
  •       要点 newSize = i.readline(value、 maxLine Length、  
  •                                 Math.max(int)Math.min(Integer.MAXUVALE) end-pos)、  
  •                                          maxLine Length)  
  •         
  •       //start  
  •       String streplace = value.toString().toLowerCase().replacceAll(\\\124\\|\\124;」 , "\001" );  
  •       Text txt Replace = new Text()  
  •       txt Replace.set(streplace) );  
  •       value.set(txtReplace.getByttes() 0, txtReplace.getLength()  
  •       //end  
  •         
  •         
  •       if (newSize == 0) {  
  •         return false;  
  •       }  
  •       pos += newSize;  
  •       if (newSize < maxLine Length) {  
  •         return true;  
  •       }  
  •   
  •       // ライン too long. try アゲイン  
  •       LOG.info("スキpped" ライン 保存先 size " + newSize + " at。 pos " + (pos - newSize)  
  •     }  
  •   
  •     return false;  
  •   }  
  • /** Read a line. */
      public synchronized boolean next(LongWritable key, Text value)
        throws IOException {
    
        while (pos < end) {
          key.set(pos);
    
          int newSize = in.readLine(value, maxLineLength,
                                    Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                             maxLineLength));
          
          //start
          String strReplace = value.toString().toLowerCase().replaceAll("\\|\\|\\|" , "\001" );
    	  Text txtReplace = new Text();
          txtReplace.set(strReplace );
          value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
          //end
          
          
          if (newSize == 0) {
            return false;
          }
          pos += newSize;
          if (newSize < maxLineLength) {
            return true;
          }
    
          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
    
        return false;
      }
    私たちは直接Linese RecordReaderを使って、next方法を修正できます。
    3.hiveを起動し、私たち自身が追加したクラスを追加します。
    4.データベースの作成
    Javaコード 
    收藏代码
  • クリアー テーブル clickstream able ストリングス ip ストリングス url string stored as INPUT FOREMAT 'comp.jd.clodc.lickstore.lickstream Input Format' OUT FOREMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKey TextOutput Format' LOCATION '/data/clickstream';  
  • create table clickstream_table(time string, ip string, url string) stored as INPUTFORMAT 'com.jd.cloud.clickstore.ClickstreamInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/data/clickstream_20110216.txt';
    5.データの導入
    Javaコード 
    收藏代码
  • LOAD データ LOCAL INPATH '/data/clickstremu 2010216.txt' OVERWRITE INTO TABLE clickstream able;  
  • LOAD DATA LOCAL INPATH '/data/clickstream_20110216.txt' OVERWRITE INTO TABLE clickstream_table;
    6.入荷したばかりのデータを調べる
    select*from clickstream able;
    hive处理日志,自定义inputformat_第1张图片
    参照http://wiki.apache.org/hadoop/Hive/SerDe