カスタムUDTFとhiveカスタム関数の永続登録

10086 ワード

まず、私の参考ブログを宣言します.http://blog.csdn.net/kent7306/article/details/50200061    http://blog.csdn.net/bdchome/article/details/45843559
1:
私のビジネス全体のニーズを説明します.
会社はnginxログデータをflumeでHDFSに保存し、hive処理でソーステーブルに挿入するように要求した.
nginxログデータフォーマット:16/Jun/2017:16:42:18+0800^{'data':[{'product':'aapp','userAgent':'864682030329437|LON-AL 00','clickElement':'e_login_login','userId':null,'clickTime':'1497602537'}}
その後hiveのloadメソッドで分割子「^」loadを指定して一時テーブルに挿入し、カスタム関数でデータを処理して自分のニーズに合ったデータフォーマットを得る.
二:hiveカスタム関数は三つの形式がある:UDF、UDAF、UDTF
UDF:一進一出
UDAF:sum()、min()、avg()、max()
UDTF:一進多出、通俗的には一つのフィールドに入って複数のフィールドを返し、本ニーズに合う
三:まず、画像を通じて、本需要の前後の対比を直感的に理解する.
nginxログデータ:
自定义UDTF和hive自定义函数的永久注册_第1张图片
 
loadメソッドloadによってテンポラリ・テーブルに入力されるデータ(time_stamp string,name stringの2つのフィールドのみ)nameがjson形式のデータ
自定义UDTF和hive自定义函数的永久注册_第2张图片
カスタム関数で処理されたデータ:(nameフィールドのみを処理)
自定义UDTF和hive自定义函数的永久注册_第3张图片
四:本需要について詳しく理解した後、くだらないことをあまり言わずに直接コードを引く
カスタムUDTFコード:
 
package myUDF;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
 *  APPlog       UDTF
 *     json  
 * {'data':[{'product':'aAPP','userAgent':'860209036790711|vivo X6D','clickElement':'e_main_moreInfo','userId':'3916482','clickTime':'1497502470'}]}
 * {'data':[{'product':'iAPP|1.2.0','userAgent':'fc95b2415cc54dd5b3999891acee985e|iPhone6s','clickElement':'e_main_moreInfo','userId':'3960209','clickTime':'1497502470'}]}
 * @author 
 *
 */
public class AppJsonTransformUDTF extends GenericUDTF {  
	  
    private PrimitiveObjectInspector stringOI = null;  

    /**
     *     、      
     */
    @Override  
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {  

      if (args.length != 1) {  
        throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");  
      }  

      if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE  
          && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {  
        throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");  
      }  
        
      //     (inspectors)  
      stringOI = (PrimitiveObjectInspector) args[0];  

      //     (inspectors) --         
      List fieldNames = new ArrayList(7);  
      List fieldOIs = new ArrayList(7);  
      fieldNames.add("product");  //  
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
      fieldNames.add("version");  //APP  
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); 
      fieldNames.add("device_number");  //   
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
      fieldNames.add("model");  //  
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
      fieldNames.add("clickElement");  //    
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
      fieldNames.add("userId");  //  ID
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
      fieldNames.add("clickTime");  //    
      fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
      return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);  
    }  
         
    /**
     *          
     * @param APP_log_data
     * @return
     */
    public ArrayList processInputRecord(String APP_log_data){  
    	
    	String[] jsonKey = new String[]{"product","userAgent","clickElement","userId","clickTime"};
          ArrayList result = new ArrayList();  
          
          //   null      
          if (APP_log_data == null || APP_log_data.isEmpty()) {  
            return result;  
          }  
           
          JSONObject jsonObject = JSONObject.fromObject(APP_log_data);
    		JSONArray jsonArray = jsonObject.getJSONArray("data");
    		JSONObject log_data = jsonArray.getJSONObject(0);
    		
    		String product1 ;//  
        	String product2 ;//APP  
    		
    		String product = log_data.getString(jsonKey[0]);
            if(product.indexOf("|")!=-1){
            	//iOS      
            	product1 = product.split("\\|")[0];
           	 product2 = product.split("\\|")[1]; 
            }else {
            	//         
            	product1 = product;
                product2 = "null";
            }
            
    		String userAgents = log_data.getString(jsonKey[1]);
    		String device_number = userAgents.split("\\|")[0];   //   
    		String model = userAgents.split("\\|")[1];       //    
    		String clickElement = log_data.getString(jsonKey[2]);
    		String userId = log_data.getString(jsonKey[3]);
    		String clickTime_stamp = log_data.getString(jsonKey[4]); 
    		
    		/**
    		 *    java        
    		 */
    		SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long lt = new Long(clickTime_stamp);
            Date date = new Date(lt* 1000L);
            String clickTime = simpleDateFormat.format(date);
             
            //     
            result.add(new Object[] { product1, product2, device_number, model, clickElement, userId, clickTime});
            return result;  
    }  
      
    /**
     *             
     */
    @Override  
    public void process(Object[] record) throws HiveException { 
    	

      final String name = stringOI.getPrimitiveJavaObject(record[0]).toString();  
      //        
      ArrayList results = processInputRecord(name);  

      Iterator it = results.iterator();  
        
      //       
      while (it.hasNext()){  
          Object[] r = it.next();  
          forward(r);  
      }  
    }  

    @Override  
    public void close() throws HiveException {  
      // do nothing  
    }  
}  

初めて書くと、コードは少し冗長かもしれませんが、ビジネスが実現すればOKです.
 
具体的な各メソッドはどのような役割を果たしていますか.http://blog.csdn.net/kent7306/article/details/50200061
五:カスタムUDTFをパッケージアップロードする
推奨$HIVE_HOMEディレクトリの下にauxlibディレクトリを作成し、jarパッケージをauxlibディレクトリの下に転送すると、hiveのコマンドラインでadd jar///pathを叩くたびに使用できなくなります.
一時関数の作成names
 
hive> CREATE TEMPORARY FUNCTION process_names as 'myUDF.AppJsonTransformUDTF';

一時関数の使用
 
 
select process_names(name) from test_wcf.test_udtf;

自定义UDTF和hive自定义函数的永久注册_第4张图片テスト中にclass not foundのエラーが複数回発生しました.主にjsonデータを解析するにはjarパケットが不足しているためです.
 
すべてのjar:
自定义UDTF和hive自定义函数的永久注册_第5张图片
また、javaコードを変更してアップロードを再パッケージすればよい場合も複数あります.jarパケットを再アップロードした後、hiveのコマンドラインを終了して再アクセスし、関数を作成することに注意してください.そうしないと、新しいjarパケットが機能しない可能性があります.
六:一時関数をテストした後、永久関数を作成することができます.
 
create function  default.AppJsTsfmUDTF as 'myUDF.AppJsonTransformUDTF'

関数の表示:
 
 
hive> show functions;

自定义UDTF和hive自定义函数的永久注册_第6张图片
 
 
 
 
 
カスタム関数を見つければOKですが、不安ならhiveコマンドラインを終了して再入力し、show functionsでいいです.