Hive---Join最適化
8913 ワード
1、接続順序の最適化
マルチテーブル接続は、複数のMRジョブに変換され、各MRジョブはHiveでJOINフェーズ(Stage)と呼ばれます.各ステージにおいて、JOIN順の最後のテーブルはできるだけ大きなテーブルであるべきである.JOINの前段階で生成されたデータはReducerのbufferに存在するため、streamの一番後ろのテーブルを通して、Reducerのbufferからバッファリングされた中間結果データ(この中間結果データはJOIN順である可能性があり、前のテーブルが接続された結果のKeyであり、データ量が相対的に小さく、メモリオーバーヘッドが小さい)を直接読み出すことで、後ろの大きなテーブルに接続する場合は、bufferからキャッシュされたKeyを読み込むだけで、大きなテーブルの指定されたKeyに接続すると、より高速になり、メモリバッファのオーバーフローを回避できます.例:
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
このJOIN文は、MR Jobを生成し、JOIN順を選択した場合、データ量はb
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
2、条件に基づくLEFTOUTER JOIN最適化
左接続の場合、左テーブルに表示されるJOINフィールドはすべて保持され、右テーブルに接続されていないものはすべて空です.WHERE条件付きJOIN文の場合は、次のようになります.
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)
WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'
実行手順は,まず2表JOINを完成させてからWHERE条件でフィルタリングすることで,JOIN中に大量の結果が出力される可能性があり,これらの結果をフィルタリングするのに時間がかかる.最適化は、WHERE条件をONにして、例えば:
SELECT a.val, b.val FROM a LEFT OUTER JOIN b
ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07')
このようにJOINの過程で,条件を満たさない記録をあらかじめフィルタリングしておくと,よりよい表現が得られる可能性がある.
3、左半接続(LEFTSEMI JOIN)
LEFTSEMI JOINはIN/EXISTSサブクエリのより効率的な実装である.Hiveは現在IN/EXISTSサブクエリを実装していないので、LEFTSEMI JOINでサブクエリ文を書き換えることができます.LEFTSEMI JOINの制限は、JOIN句の右側の表はON句にのみフィルタ条件が設定されており、WHERE句、SELECT句または他の場所ではフィルタリングができません.
SQL :
select a.key,a.value from a where a.key in (select b.key from b)
Hive :
select a.key, a.val from a left semi join b on (a.key = b.key)
4、Map Side JOIN
接続された2つのテーブルが比較的小さなテーブルと特に大きなテーブルである場合、比較的小さなtableをメモリに直接入れてから、比較的大きなテーブルに対してmap操作を行います.joinはmap操作の時に発生し、大きなtableのデータをスキャンするたびに、小さなテーブルのデータ、どれが一致しているかを見て接続します.ここのjoinはreduce操作には関与しません.map端joinの利点はshuffleがないことです.複数のテーブルが接続されています.1つのテーブルだけが大きい場合、他のテーブルが小さい場合、JOIN操作はMapのみを含むJobに変換されます.たとえば、次のようになります.
b
SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a JOIN b ON a.key = b.key
5、BUCKET Map Side JOIN
まず、2つのテーブルaとbのDDLを見てみましょう.テーブルaは:
CREATE TABLE a(key INT, othera STRING)
CLUSTERED BY(key) INTO 4 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;
表bは、
CREATE TABLE b(key INT, otherb STRING)
CLUSTERED BY(key) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;
今、a.keyとb.keyに基づいてJOIN操作を行います.この場合、JOIN列はBUCKET列でもあります.JOIN文は以下の通りです.
SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key = b.key
また、表aには4個のBUCKETがあり、表bには32個のBUCKETがあり、デフォルトでは、表aの各BUCKETについて、表bの各BUCKETを取得してJOINを行うが、今回は一定のオーバーヘッドが発生する.表bの中でJOIN条件を満たすBUCKETだけが表aのBUCKETに本当に接続されるからである.このデフォルト動作は最適化でき、デフォルトJOIN動作を変更することで、変数を設定するだけです.
set hive.optimize.bucketmapjoin = true
このように、JOINのプロセスは、テーブルaのBUCKET 1がテーブルbのBUCKET 1とJOINするだけであり、テーブルbの他のBUCKET 2~32は考慮されない.上記のテーブルに同じBUCKETがある場合、32個で並べ替えられている場合、すなわち、テーブル定義においてCLUSTERDBY(key)の後に以下の制約が追加される.
SORTED BY(key)
上記JOIN文では、Sort-Marge-Bucket(SMB)JOINを実行します.同様に、デフォルトの動作を変更するパラメータを設定する必要があります.JOINを最適化するには、関連するBUCKETのみを巡回すればいいです.
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;