Hive Streaming

1. What Hive Streaming is
Implementing a UDF, UDTF, or UDAF, as shown earlier, is not trivial and requires a working knowledge of Java, yet Hive was designed precisely so that people who do not write Java can use it easily. Hive therefore offers another way to process data: Streaming. It removes the need to write Java code, and in practice a streaming script can be written in many languages. The trade-off is efficiency: Streaming is usually slower than writing the corresponding UDF or a custom InputFormat, because every row has to be serialized into a pipe and deserialized back out of it, and a streaming pipeline is also generally harder to debug as a whole.
Hive provides several syntaxes for using Streaming:
  • MAP()
  • REDUCE()
  • TRANSFORM()

Note, however, that MAP() does not actually force the streaming step to run in the mapper phase, nor does REDUCE() force it to run in the reducer phase. For the same functionality it is therefore usually recommended to use the TRANSFORM() clause, which avoids that confusion.
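Whichever keyword is used, a streaming query has the same overall shape: the selected columns are written to the external program's standard input, and whatever the program writes to standard output is parsed back into the named output columns. A minimal sketch with hypothetical table, column, and command names (the concrete examples follow in the next section):

    -- Hypothetical shape only: col1/col2 are piped to the program's stdin as
    -- tab-separated text; its stdout lines become the columns out1 and out2.
    SELECT TRANSFORM(col1, col2)
    USING 'some_command_or_script'
    AS out1, out2
    FROM some_table;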
2. Creating and using a Streaming query
Streaming is implemented with the TRANSFORM() clause and the USING keyword: the arguments of TRANSFORM() are the table columns to feed to the external program, and USING names the command or script to run. The data for this section is the employee table used in Hive UDF Tutorial (1); a sketch of the columns it needs is shown below.
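The table definition itself is not repeated here. The following DDL is only an assumed minimal sketch covering the two columns these examples read; the actual table from the UDF tutorial may contain more columns:

    -- Assumed minimal layout of the employee table; only name and salary are used below.
    CREATE TABLE IF NOT EXISTS employee (
        name   STRING,
        salary FLOAT
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t';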
Example 1: streaming through a Linux command
First, let Streaming query the table through the Linux cat command. cat.q is a HiveQL file with the following contents:
    SELECT TRANSFORM(e.name, e.salary)
    USING '/bin/cat' AS name, salary
    FROM employee e;

Execution result:
    hive (mydb)> SOURCE cat.q;
    OK
    Time taken: 0.044 seconds
    Query ID = root_20160120000909_2de2d4f9-b50c-4ed1-a876-768c0127f067
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks is set to 0 since there's no reduce operator
    Starting Job = job_1453275977382_0001, Tracking URL = http://master:8088/proxy/application_1453275977382_0001/
    Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job  -kill job_1453275977382_0001
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
    2016-01-20 00:10:16,258 Stage-1 map = 0%,  reduce = 0%
    2016-01-20 00:10:22,942 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.12 sec
    MapReduce Total cumulative CPU time: 1 seconds 120 msec
    Ended Job = job_1453275977382_0001
    MapReduce Jobs Launched: 
    Stage-Stage-1: Map: 1   Cumulative CPU: 1.12 sec   HDFS Read: 1040 HDFS Write: 139 SUCCESS
    Total MapReduce CPU Time Spent: 1 seconds 120 msec
    OK
    John Doe	100000.0
    Mary Smith	80000.0
    Todd Jones	70000.0
    Bill King	60000.0
    Boss Man	200000.0
    Fred Finance	150000.0
    Stacy Accountant	60000.0
    Time taken: 24.758 seconds, Fetched: 7 row(s)
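Any executable that reads tab-separated rows on stdin and writes rows to stdout can take the place of cat. As a variation not part of the original run, the Linux cut command could be used to pass only the first field back to Hive; the input fields arrive tab-separated, which is cut's default delimiter:

    -- Hypothetical variation: keep only the first input field (the name column).
    SELECT TRANSFORM(e.name, e.salary)
    USING '/bin/cut -f1' AS name
    FROM employee e;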

Example 2: streaming through a Python script
Next, let's compare Hive's built-in sum() function with a Python script, sum.py, that does the same job. First, the built-in sum():
    hive (mydb)> SELECT sum(salary) FROM employee;
    Query ID = root_20160120012525_1abf156b-d44b-4f1c-b2c2-3604e4c1bba0
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks determined at compile time: 1
    In order to change the average load for a reducer (in bytes):
      set hive.exec.reducers.bytes.per.reducer=<number>
    In order to limit the maximum number of reducers:
      set hive.exec.reducers.max=<number>
    In order to set a constant number of reducers:
      set mapreduce.job.reduces=<number>
    Starting Job = job_1453281391968_0002, Tracking URL = http://master:8088/proxy/application_1453281391968_0002/
    Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job  -kill job_1453281391968_0002
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
    2016-01-20 01:25:20,364 Stage-1 map = 0%,  reduce = 0%
    2016-01-20 01:25:31,620 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.55 sec
    2016-01-20 01:25:42,394 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.73 sec
    MapReduce Total cumulative CPU time: 2 seconds 730 msec
    Ended Job = job_1453281391968_0002
    MapReduce Jobs Launched: 
    Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.73 sec   HDFS Read: 1040 HDFS Write: 9 SUCCESS
    Total MapReduce CPU Time Spent: 2 seconds 730 msec
    OK
    720000.0
    Time taken: 33.891 seconds, Fetched: 1 row(s)

Now the Streaming version. The sum.py script:
    #!/usr/bin/env python
    # Reads one salary per line from standard input, accumulates the total,
    # and prints it once at the end so Hive can read it back as the total column.

    import sys

    def sum(arg):
        global total
        total += arg

    if __name__ == "__main__":
        total = 0.0
        for arg in sys.stdin:
            sum(float(arg))
        print total
    

The HiveQL script sum.q:
    SELECT TRANSFORM(salary)                     
    USING 'python /root/experiment/hive/sum.py' AS total
    FROM employee; 

Finally, the execution result (the cluster is a fully distributed setup running on virtual machines and the data set is tiny, so the running time relative to the built-in sum() query is for reference only):
    hive> source sum.q;
    OK
    Time taken: 0.022 seconds
    Query ID = root_20160120002626_0ced0b93-e4e8-4f3a-91d0-f2aaa06b5f11
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks is set to 0 since there's no reduce operator
    Starting Job = job_1453278047512_0002, Tracking URL = http://master:8088/proxy/application_1453278047512_0002/
    Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job  -kill job_1453278047512_0002
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
    2016-01-20 00:26:28,341 Stage-1 map = 0%,  reduce = 0%
    2016-01-20 00:26:36,185 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.4 sec
    MapReduce Total cumulative CPU time: 1 seconds 400 msec
    Ended Job = job_1453278047512_0002
    MapReduce Jobs Launched: 
    Stage-Stage-1: Map: 1   Cumulative CPU: 1.4 sec   HDFS Read: 1040 HDFS Write: 9 SUCCESS
    Total MapReduce CPU Time Spent: 1 seconds 400 msec
    OK
    720000.0
    Time taken: 17.048 seconds, Fetched: 1 row(s)
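One practical note: USING 'python /root/experiment/hive/sum.py' assumes the script exists at that absolute path on every node that runs a task. An alternative, sketched here and not part of the original run, is to ship the script with the job via ADD FILE, after which it can be referenced by its base name:

    -- Assumed alternative: distribute sum.py to the tasks instead of using an absolute path.
    ADD FILE /root/experiment/hive/sum.py;

    SELECT TRANSFORM(salary)
    USING 'python sum.py' AS total
    FROM employee;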

Example 3: WordCount with Streaming
To close this section, here is WordCount implemented with Hive Streaming. First, look at the docs table:
    hive (mydb)> SELECT * FROM docs;                                      
    OK
    hello world
    hello hadoop
    hello spark
    Time taken: 0.044 seconds, Fetched: 3 row(s)
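The definition of docs is not shown here; judging from the query further below, it stores one line of text per row in a single column named line. An assumed minimal sketch:

    -- Assumed layout of the docs table: one column holding a whole line of text.
    CREATE TABLE IF NOT EXISTS docs (
        line STRING
    );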

wc_mapper.py handles the mapper-side processing: it splits each line into words and emits every word with a count of 1.
    #!/usr/bin/env python
    # Mapper: split each input line on spaces and emit "word<TAB>1" for every word.

    import sys

    def splitWord(rows):
        words = rows.strip().split(" ")
        for word in words:
            print "%s\t1" % (word)

    if __name__ == "__main__":
        for line in sys.stdin:
            splitWord(line)
    

wc_reducer.py handles the reducer-side processing: it sums the counts per word.
    #!/usr/bin/env python
    # Reducer: the rows arrive grouped by word (CLUSTER BY word in wc.q),
    # so counts can be accumulated per key and flushed whenever the key changes.

    import sys

    (lastKey, lastCount) = (None, 0)
    for line in sys.stdin:
        (key, count) = line.strip().split("\t")
        if (lastKey) and (lastKey != key):
            # The key changed: emit the finished word and start counting the new one.
            print "%s\t%d" % (lastKey, lastCount)
            (lastKey, lastCount) = (key, int(count))
        else:
            lastKey = key
            lastCount += int(count)

    # Emit the last word.
    if lastKey:
        print "%s\t%d" % (lastKey, lastCount)

The HiveQL script wc.q contains the statements to run. I use an intermediate table, wordcount, to hold the result; of course, the output could also be queried directly (a sketch of that variant follows the script):
    CREATE TABLE IF NOT EXISTS wordcount(
        word STRING,
        count INT
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t';
    
    FROM(
        FROM docs
        SELECT TRANSFORM(line) USING 'python /root/experiment/hive/wc_mapper.py'
        AS word, count
        CLUSTER BY word) wc
    INSERT OVERWRITE TABLE wordcount
    SELECT TRANSFORM(wc.word, wc.count) USING 'python /root/experiment/hive/wc_reducer.py'
    AS words, counts;
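As mentioned above, the intermediate table is optional. A sketch of the same pipeline that returns the reducer output directly instead of writing it to wordcount (not part of the original run):

    FROM (
        FROM docs
        SELECT TRANSFORM(line) USING 'python /root/experiment/hive/wc_mapper.py'
        AS word, count
        CLUSTER BY word) wc
    SELECT TRANSFORM(wc.word, wc.count) USING 'python /root/experiment/hive/wc_reducer.py'
    AS word, count;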

Finally, the execution result. Because the intermediate table wordcount is used, the result has to be selected from wordcount after the job finishes:
    hive (mydb)> SOURCE wc.q;
    OK
    Time taken: 0.022 seconds
    OK
    Time taken: 0.066 seconds
    Query ID = root_20160120013535_c6e957a9-1981-475a-b21a-e73576df6a99
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks not specified. Estimated from input data size: 1
    In order to change the average load for a reducer (in bytes):
      set hive.exec.reducers.bytes.per.reducer=<number>
    In order to limit the maximum number of reducers:
      set hive.exec.reducers.max=<number>
    In order to set a constant number of reducers:
      set mapreduce.job.reduces=<number>
    Starting Job = job_1453281391968_0003, Tracking URL = http://master:8088/proxy/application_1453281391968_0003/
    Kill Command = /root/install/hadoop-2.4.1/bin/hadoop job  -kill job_1453281391968_0003
    Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
    2016-01-20 01:35:53,691 Stage-1 map = 0%,  reduce = 0%
    2016-01-20 01:36:00,339 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.15 sec
    2016-01-20 01:36:08,961 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.98 sec
    MapReduce Total cumulative CPU time: 2 seconds 980 msec
    Ended Job = job_1453281391968_0003
    Loading data to table mydb.wordcount
    Table mydb.wordcount stats: [numFiles=1, numRows=4, totalSize=33, rawDataSize=29]
    MapReduce Jobs Launched: 
    Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.98 sec   HDFS Read: 260 HDFS Write: 103 SUCCESS
    Total MapReduce CPU Time Spent: 2 seconds 980 msec
    OK
    Time taken: 25.652 seconds
    
    hive (mydb)> SELECT * FROM wordcount;
    OK
    hadoop	1
    hello	3
    spark	1
    world	1
    Time taken: 0.047 seconds, Fetched: 4 row(s)