hive処理ログ、カスタムinputformat
7775 ワード
オープン環境、hadoop-00.2、hive-0.6
1.ログ区切り記号
Xmlコード
2010-05-31 10:50:17𞓜𞓜61.132.8.82𞓜𞓜http://www.360buy.com/product/201185.html
hiveの内部セパレータは「\001」ですので、切り替えが必要です。
2.カスタムInput Formatを作成する
Javaコード
package comp.jd.cloud.lickstore; import java.io.IOException import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigrable; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInput Format; /* * カスタムhadoopの org.apache.hadoop.mapred.Input Format * * @author wiston * */ pblic クラス Clickstream Input Format extens TextInput Format implements JobConfigrable { public RecordReader get RecordReader( Input Split genelity JobConf job、 レポート レポート ローソン IOException { report.set Storts(generity.toString(); return new Clickstream RecordReader(job、 (FileSplit) genelity; } }
Javaコード
/* リード a. ライン. */ public synchronized bollan next(LongWritable キー、 Text value) ローソン IOException { while (pos < end) { key.set(pos) 要点 newSize = i.readline(value、 maxLine Length、 Math.max(int)Math.min(Integer.MAXUVALE) end-pos)、 maxLine Length) //start String streplace = value.toString().toLowerCase().replacceAll(\\\124\\|\\124;」 , "\001" ); Text txt Replace = new Text() txt Replace.set(streplace) ); value.set(txtReplace.getByttes() 0, txtReplace.getLength() //end if (newSize == 0) { return false; } pos += newSize; if (newSize < maxLine Length) { return true; } // ライン too long. try アゲイン LOG.info("スキpped" ライン 保存先 size " + newSize + " at。 pos " + (pos - newSize) } return false; }
3.hiveを起動し、私たち自身が追加したクラスを追加します。
4.データベースの作成
Javaコード
クリアー テーブル clickstream able ストリングス ip ストリングス url string stored as INPUT FOREMAT 'comp.jd.clodc.lickstore.lickstream Input Format' OUT FOREMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKey TextOutput Format' LOCATION '/data/clickstream';
Javaコード
LOAD データ LOCAL INPATH '/data/clickstremu 2010216.txt' OVERWRITE INTO TABLE clickstream able;
select*from clickstream able;
参照http://wiki.apache.org/hadoop/Hive/SerDe
1.ログ区切り記号
Xmlコード
2010-05-31 10:50:17|||61.132.4.82|||http://www.360buy.com/product/201185.html
セパレータは、ログ本文がセパレータと同じ文字であることをできるだけ防ぐために、データが混同される。hiveの内部セパレータは「\001」ですので、切り替えが必要です。
2.カスタムInput Formatを作成する
Javaコード
package com.jd.cloud.clickstore;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
/**
* hadoop org.apache.hadoop.mapred.InputFormat
*
* @author winston
*
*/
public class ClickstreamInputFormat extends TextInputFormat implements
JobConfigurable {
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new ClickstreamRecordReader(job, (FileSplit) genericSplit);
}
}
3.Clickstream RecordReaderをカスタマイズし、RecordReaderインターフェースを実現し、next方法を書き換える。Javaコード
/** Read a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
while (pos < end) {
key.set(pos);
int newSize = in.readLine(value, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
//start
String strReplace = value.toString().toLowerCase().replaceAll("\\|\\|\\|" , "\001" );
Text txtReplace = new Text();
txtReplace.set(strReplace );
value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
//end
if (newSize == 0) {
return false;
}
pos += newSize;
if (newSize < maxLineLength) {
return true;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}
return false;
}
私たちは直接Linese RecordReaderを使って、next方法を修正できます。3.hiveを起動し、私たち自身が追加したクラスを追加します。
4.データベースの作成
Javaコード
create table clickstream_table(time string, ip string, url string) stored as INPUTFORMAT 'com.jd.cloud.clickstore.ClickstreamInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/data/clickstream_20110216.txt';
5.データの導入Javaコード
LOAD DATA LOCAL INPATH '/data/clickstream_20110216.txt' OVERWRITE INTO TABLE clickstream_table;
6.入荷したばかりのデータを調べるselect*from clickstream able;
参照http://wiki.apache.org/hadoop/Hive/SerDe