hive結合python:Transformの使用



使用シーンは次のとおりです.
1行を複数行に変換する方法は、次のとおりです.最初の列の値はセミコロンで区切られた複数の属性です.属性値文字列です.次に、この列を属性と属性値の2列に分割して表示します.
# ["  :RN1-10/51;  :RN1-10/52;  :RN1-10/53", '11', '22']
# ["  ", "RN1-10/51", '11', '22']
# ["  ", "RN1-10/52", '11', '22']
# ["  ", "RN1-10/53", '11', '22']
 
hiveではTransformを使ってこのようなデータ処理シーンを完成させることができますが、文法フォーマットに注意してください.
Transformの値を入力としてpythonスクリプトに渡し、pythonの処理後、元の3列を
4列表示
select
  TRANSFORM(p.joint_attr_values, p.collect_product_id, p.released_id)
  USING 'python split_product_attrs.py'
  as (custom_attr , custom_attr_value, collect_product_id, released_product_id)
 
もう一つ注意しなければならないことがあります.
 
select
  TRANSFORM(p.joint_attr_values, p.collect_product_id, p.released_id)
  USING 'python split_product_attrs.py'
  as (custom_attr , custom_attr_value, collect_product_id, released_product_id)
from 
 (           select  ,  Transform   ,        ,       )
また、このpythonスクリプトを呼び出すときは、必ずソースを付けて、文法は次のようにします.
hive>add file'pythonファイルパス'
 
完全な例:
 
select
  TRANSFORM(p.joint_attr_values, p.collect_product_id, p.released_id)
  USING 'python split_product_attrs.py'
  as (custom_attr , custom_attr_value, collect_product_id, released_product_id)
from 
  (select pc.joint_attr_values, pm.collect_product_id, pm.released_id from 
    products_compared_${hiveconf:wid_version} pc
    left join products_merged_${hiveconf:wid_version} pm
    on pc.collect_id = pm.collect_product_id
    where pm.released_id is not null
    and pc.joint_attr_values is not null and pc.joint_attr_values != '' and pc.joint_attr_values != 'null') as p ;  
 
以下はpythonのスクリプトで、3列を4列に変換するために使用されます.ここでは簡単です.主にテストに使用されます.コードは勝手に書きました.
 
#!/usr/bin/python
# #_*_ coding: utf-8 _*_

import sys
import datetime
# "  :RN1-10/50;  :RN1-10/50;  :RN1-10/50"
# ["  :RN1-10/51;  :RN1-10/52;  :RN1-10/53", '11', '22']
# ["  ", "RN1-10/51", '11', '22']
# ["  ", "RN1-10/52", '11', '22']
# ["  ", "RN1-10/53", '11', '22']

for line in sys.stdin:
        values = line.split('\t')
        values = [ i.strip() for i in values ]
        tmp = values[0]
        key_values = tmp.split(";")
        for kv in key_values:
                k = kv.split(":")[0]
                v = kv.split(":")[1]
                print '\t'.join([k,v,values[1],values[2]])
 
次はネット上で1つの列を探して、私とこの差は多くありません
hive streamingのmapスクリプトを使用してデータを処理
 
hiveをselectクエリーするときにpython、php、c++などのスクリプトを作成して対応するデータ処理を行うことができます.hiveのTRANSFORMとusingを使用します.
例を見てみましょう.
add file /www/FCCS_Data/ComETL/hive/sql_map/demo.py ;
from access_fccs select TRANSFORM (time) using 'python demo.py' where week=41 limit 10 ;
または、
add file /www/FCCS_Data/ComETL/hive/sql_map/demo.py;
select TRANSFORM (time) using 'python demo.py' as (time) from (select * from access_fccs where week=41 limit 10) a ;
SQLはクエリ結果セットのtimeをdemoを通過することを意味します.pyが処理を行って対応する結果を返します.どのような処理をしたかは、次のdemoを見てください.pyコード.
ここで注意すべきは、mapスクリプトを分散キャッシュに追加する必要があります.そうしないとmetadataがエラーになります.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script. atというエラー
特にここの経路はローカル経路が分布式HDFS経路ではなく、初心者の方が混同しやすいことを示しています
そして私たちのdemoを見てみましょう.pyはどう書きますか.
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys
import datetime
import time

#           
def timestamp_toString(stamp):
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stamp))

for line in sys.stdin:
print timestamp_toString(float(line))

董pythonの友达はここが私たちのタイムスタンプを日付フォーマットに変換して出力していることを見るのは難しくありません.
したがって、hive SQLを実行した結果は次のとおりです.
Total MapReduce CPU Time Spent: 23 seconds 880 msec
OK
2013-10-11 13:11:47
2013-10-11 13:11:47
2013-10-11 13:11:47
2013-10-11 13:11:47
2013-10-11 13:11:47
2013-10-11 13:11:48
2013-10-11 13:11:48
2013-10-11 13:11:48
2013-10-11 13:11:48
2013-10-11 13:11:48
Time taken: 33.411 seconds