UDF/UDAF/UDTFのMap Reduceコードフレームワークテンプレート

4810 ワード

1.UDFステップ
1>orgを継承する必要がある.apache.Hadoop.hive.ql.exec.UDF
2>evaluate関数を実装する必要があります.evaluate関数はリロードをサポートします.
import org.apache.Hadoop.hive.ql.exec.UDF   
      
public class helloword extends UDF{   
	
     public String evaluate(){   
          return "hello world!";   
     }   
  
     public String evaluate(String str){   
          return "hello world: " + str;   
     }   
}
 
2.UDAFステップ
1>継承する必要があります
org.apache.Hadoop.hive.ql.exec.UDAF(関数クラス継承)
org.apache.Hadoop.hive.ql.exec.UDAFEvaluator(内部クラスEvaluator実装UDAFEvaluatorインタフェース)
2>Evaluatorは、init、iterate、terminatePartial、merge、terminateといったいくつかの関数を実装する必要があります
Init():UDAFの初期化のための構造関数と同様
iterate():入力されたパラメータを受信し、内部の回転を行います.戻りタイプはbooleanです
terminatePartial():iterate関数のローテーションが終了した後、ローテーションデータを返します.iterateとterminatePartialはHadoopのCombiner(iterate-mapper;terminatePartial--reducer)に似ています.
merge():terminatePartialの戻り結果を受信し、booleanタイプのデータmerge操作を行う
terminate():最終的な集計関数の結果を返します.
import org.apache.Hadoop.hive.ql.exec.UDAF;
import org.apache.Hadoop.hive.ql.exec.UDAFEvaluator;

public class myAVG extends UDAF {

	public static class avgScore {
		private long pSum;
		private double pCount;
	}

	public static class AvgEvaluator extends UDAFEvaluator {
		avgScore score;

		public AvgEvaluator() {
			score = new avgScore();
			init();
		}

		/*
		 * init , UDAF 
		 */
		public void init() {
			score.pSum = 0;
			score.pCount = 0;
		}

		/*
		 * iterate , 。 boolean Combiner mapper
		 */
		public boolean iterate(Double in) {
			if (in != null) {
				score.pSum += in;
				score.pCount++;
			}
			return true;
		}

		/*
		 * terminatePartial , iterate , Combiner reducer
		 */
		public avgScore terminatePartial() {
			return score.pCount == 0 ? null : score;
		}

		/*
		 * merge terminatePartial , merge , boolean
		 */
		public boolean merge(avgScore in) {
			if (in != null) {
				score.pSum += in.pSum;
				score.pCount += in.pCount;
			}
			return true;
		}

		/*
		 * terminate 
		 */
		public Double terminate() {
			return score.pCount == 0 ? null : Double.valueof(score.pSum
					/ score.pCount);
		}
	}
}

 3.UDTFステップ
1>orgを継承する必要がある.apache.Hadoop.hive.ql.udf.generic.GenericUDTF
2>initialize,process,closeの3つの方法の実装
3>UDTFはまず
a.initializeメソッドを呼び出し、このメソッドはUDTFの戻り行の情報(戻り個数、タイプ)を返す
b.初期化が完了すると、プロセスメソッドが呼び出され、入力されたパラメータが処理され、forword()メソッドで結果が返されます
c.最後にclose()メソッド呼び出し、クリーンアップする必要があるメソッドをクリーンアップする
public class GenericUDTFExplode extends GenericUDTF {   
      
      private ListObjectInspector listOI = null;   
      
      @Override  
      public void close() throws HiveException {   
      }   
      
      @Override  
      public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {   
        if (args.length != 1) {   
          throw new UDFArgumentException("explode() takes only one argument");   
        }   
      
        if (args[0].getCategory() != ObjectInspector.Category.LIST) {   
          throw new UDFArgumentException("explode() takes an array as a parameter");   
        }   
        listOI = (ListObjectInspector) args[0];   
      
        ArrayList<String> fieldNames = new ArrayList<String>();   
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();   
        fieldNames.add("col");   
        fieldOIs.add(listOI.getListElementObjectInspector());   
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,   
            fieldOIs);   
      }   
      
      private final Object[] forwardObj = new Object[1];   
      
      @Override  
      public void process(Object[] o) throws HiveException {   
        List<?> list = listOI.getList(o[0]);   
        if(list == null) {   
          return;   
        }   
        for (Object r : list) {   
          forwardObj[0] = r;   
          forward(forwardObj);   
        }   
      }   
      
      @Override  
      public String toString() {   
        return "explode";   
      }   
}