Hiveでテーブルのパーティショニングや重複削除をするクエリをカラム名非依存に書く


はじめに

この記事はMicroAd Advent Calendar 2017の13日目の記事です。

Hive(hadoop)は大量データを効率的に捌くため、データの非正規化上等な世界です。
必然的に扱うデータのカラム数は肥大化する傾向があり、如何にして必要の無いカラムの名前を意識しないでデータを操作するか、というクエリが必要になってきます。

この記事ではETLの初期の段階で必要となってくる

  • パーティショニング
  • 重複削除

において、具体的なカラムを意識しないで操作する方法について紹介します。

結論だけ先に書いておくと、正規表現マッチングによるカラムの指定とrow_nuber関数使いましょうという話です。

Hiveによるデータ変換の必要性と問題

生のログをHadoopに効率的に使うためにはETLの初期の段階で以下のような変換が必要になります。

  • HDFS向けのデータへの変換
    • 列指向フォーマット(ORC|Parquent)への変換
    • 圧縮
    • 細分化されたファイルの結合
  • パーティショニング
  • 重複削除

これらの変換はデータ全体に対して行うため、基本的にはSELECT *でガツッと書けそうですがパーティションニング重複削除においては以下のような問題があり、具体的なカラム名について記述の必要が出てきます。

パーティショニング

HDFSに突っ込むデータは大概はdateとかhourとか時系列にパーティショニングする事が多いかと思います。
fluentdやflumeならtimestampに応じてHDFSの格納先を変更できるので、データを突っ込む際に時系列でのパーティショニングが可能です。
が、場合によってはあるキーをベースに追加でパーティションの階層を増やしたくるケースがあります。
ここでdynamic partition insertを使ってパーティションを増やしたテーブルにデータを突っ込み直そうとすると、ある問題に直面します。
それはdynamic partition insertでパーティションとして指定したいカラムはSELECTの末尾に順番を合わせて書くという制約があることです。
このままではSELECT句で末尾の順番を気にしながら全てのカラムを羅列しなければならなくなります。

重複削除を行う

もう一方の問題は重複削除のクエリをどう書くか?という点です。
データ収集のパイプラインをExactly onceに構築するのはかなり難しく、大概はデータパイプラインはAt least onceを保証して、データの終点(Hadoopなど)でidなどのユニークなカラムを使って重複削除を行います。
重複削除のクエリで最もポピュラーな方法はidGROUP BYしてその他カラムはMAXで集約してSELETするというものがありますが、やはりカラム数が多いとクエリを書くのが辛くなります。

解決法

Hiveカラムの正規表現マッチングを使用してパーティションキーのみ順番を指定する

HiveではSELECTするカラムを正規表現によりマッチングさせる便利機能があります。
この機能を使って、パーティションに使うカラムのみを除く正規表現を書くことで、パーティションキーの順番のみを意識したクエリがかけます。

具体的には以下のようなクエリです。以下の例ではdate,hour,countryをパーティションキーとして使おうとしています。

SELECT `(date|hour|country)?+.+`, country, date, hour FROM json_log;

このクエリでは`(date|hour|country)?+.+`の正規表現マッチングの箇所でcountrydatehour以外のカラムがマッチさせ、除外されたパーティションキーとして使うカラムは別途、明示的に順番を指定しています。

ちなみにこのカラム名の正規表現マッチングの機能を使おうとすると、hive.support.quoted.identifiersnoneにする必要があります。

idの重複削除にはrow_numbe関数を使う

HiveではWindow関数が使えるので、row_number関数が使えます。
以下のようなクエリでidカラムの重複に対して連番が振れるので、その1番目だけを使い重複を削除できます。

WITH with_order_num AS (
    SELECT 
        *, row_number()
            over(
                partition by id
            ) AS order_num
    FROM 
        json_log
)
SELECT
    *
FROM
    with_order_num
WHERE
    order_num = 1
;

まとめ

上記のテクニックをまとめると以下のようなクエリになります。

WITH with_order_num AS (
    SELECT 
        *, row_number()
            over(
                partition by id
            ) AS order_num
    FROM 
        json_log
    WHERE
         WHERE date = '20171213'
         AND hour = '15'
)
FROM with_order_num
INSERT OVERWRITE TABLE orc_log partition(country, dt, hour)
SELECT 
    `(date|hour|country)?+.+`, 
    country, 
    date, 
    hour 
WHERE 
    order_num = 1
;

このように必要なカラム名以外を意識することなく、変換処理が書けました。
また、カラム名に非依存な変換クエリにしておくと、データ構造の変更にも強いですし、扱うログのカラム名の規則がちゃんと揃っていればほぼコピペのクエリで済みます。
(実際にはクエリをジョブで投げる際はtemplate化して都度レンダリングするケースが殆どだと思いますのでtemplateが使いまわせます。)

参考