UDF/UDAF/UDTFのMap Reduceコードフレームワークテンプレート
4810 ワード
1.UDFステップ
1>orgを継承する必要がある.apache.Hadoop.hive.ql.exec.UDF
2>evaluate関数を実装する必要があります.evaluate関数はリロードをサポートします.
2.UDAFステップ
1>継承する必要があります
org.apache.Hadoop.hive.ql.exec.UDAF(関数クラス継承)
org.apache.Hadoop.hive.ql.exec.UDAFEvaluator(内部クラスEvaluator実装UDAFEvaluatorインタフェース)
2>Evaluatorは、init、iterate、terminatePartial、merge、terminateといったいくつかの関数を実装する必要があります
Init():UDAFの初期化のための構造関数と同様
iterate():入力されたパラメータを受信し、内部の回転を行います.戻りタイプはbooleanです
terminatePartial():iterate関数のローテーションが終了した後、ローテーションデータを返します.iterateとterminatePartialはHadoopのCombiner(iterate-mapper;terminatePartial--reducer)に似ています.
merge():terminatePartialの戻り結果を受信し、booleanタイプのデータmerge操作を行う
terminate():最終的な集計関数の結果を返します.
3.UDTFステップ
1>orgを継承する必要がある.apache.Hadoop.hive.ql.udf.generic.GenericUDTF
2>initialize,process,closeの3つの方法の実装
3>UDTFはまず
a.initializeメソッドを呼び出し、このメソッドはUDTFの戻り行の情報(戻り個数、タイプ)を返す
b.初期化が完了すると、プロセスメソッドが呼び出され、入力されたパラメータが処理され、forword()メソッドで結果が返されます
c.最後にclose()メソッド呼び出し、クリーンアップする必要があるメソッドをクリーンアップする
1>orgを継承する必要がある.apache.Hadoop.hive.ql.exec.UDF
2>evaluate関数を実装する必要があります.evaluate関数はリロードをサポートします.
import org.apache.Hadoop.hive.ql.exec.UDF
public class helloword extends UDF{
public String evaluate(){
return "hello world!";
}
public String evaluate(String str){
return "hello world: " + str;
}
}
2.UDAFステップ
1>継承する必要があります
org.apache.Hadoop.hive.ql.exec.UDAF(関数クラス継承)
org.apache.Hadoop.hive.ql.exec.UDAFEvaluator(内部クラスEvaluator実装UDAFEvaluatorインタフェース)
2>Evaluatorは、init、iterate、terminatePartial、merge、terminateといったいくつかの関数を実装する必要があります
Init():UDAFの初期化のための構造関数と同様
iterate():入力されたパラメータを受信し、内部の回転を行います.戻りタイプはbooleanです
terminatePartial():iterate関数のローテーションが終了した後、ローテーションデータを返します.iterateとterminatePartialはHadoopのCombiner(iterate-mapper;terminatePartial--reducer)に似ています.
merge():terminatePartialの戻り結果を受信し、booleanタイプのデータmerge操作を行う
terminate():最終的な集計関数の結果を返します.
import org.apache.Hadoop.hive.ql.exec.UDAF;
import org.apache.Hadoop.hive.ql.exec.UDAFEvaluator;
public class myAVG extends UDAF {
public static class avgScore {
private long pSum;
private double pCount;
}
public static class AvgEvaluator extends UDAFEvaluator {
avgScore score;
public AvgEvaluator() {
score = new avgScore();
init();
}
/*
* init , UDAF
*/
public void init() {
score.pSum = 0;
score.pCount = 0;
}
/*
* iterate , 。 boolean Combiner mapper
*/
public boolean iterate(Double in) {
if (in != null) {
score.pSum += in;
score.pCount++;
}
return true;
}
/*
* terminatePartial , iterate , Combiner reducer
*/
public avgScore terminatePartial() {
return score.pCount == 0 ? null : score;
}
/*
* merge terminatePartial , merge , boolean
*/
public boolean merge(avgScore in) {
if (in != null) {
score.pSum += in.pSum;
score.pCount += in.pCount;
}
return true;
}
/*
* terminate
*/
public Double terminate() {
return score.pCount == 0 ? null : Double.valueof(score.pSum
/ score.pCount);
}
}
}
3.UDTFステップ
1>orgを継承する必要がある.apache.Hadoop.hive.ql.udf.generic.GenericUDTF
2>initialize,process,closeの3つの方法の実装
3>UDTFはまず
a.initializeメソッドを呼び出し、このメソッドはUDTFの戻り行の情報(戻り個数、タイプ)を返す
b.初期化が完了すると、プロセスメソッドが呼び出され、入力されたパラメータが処理され、forword()メソッドで結果が返されます
c.最後にclose()メソッド呼び出し、クリーンアップする必要があるメソッドをクリーンアップする
public class GenericUDTFExplode extends GenericUDTF {
private ListObjectInspector listOI = null;
@Override
public void close() throws HiveException {
}
@Override
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
if (args.length != 1) {
throw new UDFArgumentException("explode() takes only one argument");
}
if (args[0].getCategory() != ObjectInspector.Category.LIST) {
throw new UDFArgumentException("explode() takes an array as a parameter");
}
listOI = (ListObjectInspector) args[0];
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("col");
fieldOIs.add(listOI.getListElementObjectInspector());
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
fieldOIs);
}
private final Object[] forwardObj = new Object[1];
@Override
public void process(Object[] o) throws HiveException {
List<?> list = listOI.getList(o[0]);
if(list == null) {
return;
}
for (Object r : list) {
forwardObj[0] = r;
forward(forwardObj);
}
}
@Override
public String toString() {
return "explode";
}
}