XplentyでJOIN処理を高速化する方法


正しいJoinタイプを選択することでデータ統合のパフォーマンスを向上させることができることをご存知でしたか?Pig はいくつかの異なるアルゴリズムを持っており、データセットのもつ特性に応じて最適な方法で結合処理を行うことができ、ジョブの実行時間を大幅に節約することができます。

XplentyのJoinコンポーネントでは、Pigで利用可能なのJoinタイプのうち、以下のタイプを指定することが可能です。

Replicated Join

どういう場合に使う?
データセットがメモリに収まるほど小さい場合。

どう動作するか?
Replicated joinは、小さなデータセットを分散されたキャッシュ(すべてのクラスタマシンで利用可能なスペース)にコピーし、メモリにロードします。各マッパーは大きなデータセットが分割されたデータを処理し、より小さい方のデータセットの中から一致するレコードを探します。データはメモリ上で扱われ、MapReduceのマップ側で処理されるため、この操作はデフォルトの結合よりもはるかに高速に動作します。内部および外部結合の両方が利用可能で、小さい方のデータセットはjoinの右側にセットする必要があります。

制約
Replicated joinを使用するためには、どのくらい小さいサイズのデータセットまで可能かは明確ではありません。Pigのドキュメントによると、プロセス全体で1GBのメモリが必要な場合、最大100MBまでのリレーション(ジョイン先のデータ)が利用可能です。データをロードするのに十分なメモリがない場合、ランタイムエラーが発生します。
XplentyでReplicated joinを指定する場合、InnerもしくはLeft joinのみ利用可能です。

Skewed Join

どういう場合に使う?
あるキーの値が他の値よりもはるかに多くの部分を占めており、データが大きすぎてメモリに収まりきらない場合。(キーの値が均等に分散されておらず、特定のキーの値に偏っている場合)

どう動作するか?
Standard joinsは、プロセス間でキー値を分割することで、異なるreducer間で並列に実行されます。特定のキーに対して多くのデータがある場合、データは各reducerに均等に分散されず、いずれかのreducerが大部分のデータを処理して「スタック」してしまいます。Skewed joinはこうした場合に対処するのに利用します。ヒストグラムを計算して、どのキーが最も多くを占めているかをチェックし、最適なパフォーマンスを得るためにそれぞれのreducerへデータを分割します。

制約
Skewed joinは内部と外部の両方の結合をサポートしていますが、2つの入力データのみになります。 2つ以上のテーブル間の結合は、さらに別の結合に分ける必要がります。また、pig.skwedjoin.reduce.memusageというJavaパラメータがあり、これはこの結合を実行するためにReducerが利用できるヒープ率を指定します。低い値を設定すると、より多くのReducerが使用されることを意味しますが、それらの間でデータをコピーするためのコストが増加します。Pigの開発者はこの値を0.1-0.4の間に設定すると良いパフォーマンスが得られると主張していますが、理想的な値を見つけるためには検証する必要があります。

Default

どういう場合に使う?
上記のいずれかのケースに当てはまらない場合。

どう動作するか?
Joinは、特定のキーで2つの入力データセットからのレコードを一緒に一致させます。例えば、CRMからのレコードとERPからのレコードをメールアドレスなどのキーで結合することで、複数の別々のデータセットではなく、すべての顧客情報を同じデータセットにまとめることができます。外部結合も可能で、その場合、相手側で一致していないレコードも、欠落しているフィールドにNull値がセットされ含まれることになります。

Pigはマップフェーズですべての結合対象の入力データセット(2つ以上)を読み込み、各レコードのソースに注釈を付けます。結合キーをシャッフルキーとして使用し、同じキーを持つ行をグループ化します。各キー値に対して、左側の入力データセット(または複数の入力データセット)からのすべてのレコードがキャッシュされます(1つのキー値に対して複数のレコードが存在する可能性があります)。そして、右側の入力データセットがキャッシュされたレコードと掛け合わされ、出力レコードが生成されます。つまり、1対多の結合では、結合のパフォーマンスを向上させるために、キー値あたりのレコード数が多い入力データセットを常に右側に配置する必要があります。

制約
すべてのPigのjoinは等結合であり、join演算子には等号比較(equals/not equals)のみが使用できることを意味します。Pigはシータ(非等号)結合をサポートしておらず、MapReduceでの実装は困難です。シータ結合が必要な場合は、Cross-joinを実行してからフィルタを使用してください。

join演算子はNULLに関してはSQL標準に従って機能します - NULLキーを持つすべての行は内部結合で無視されます。したがって、無駄なレコードの処理を減らしパフォーマンスを向上させるために、内部結合処理の前にNULLキーをフィルタリングし除外してください。Null値は外部結合では保持されますが、ジョイン先のレコードに結合されることはありません。

JOIN時のフィールド名に関する注意点

結合対象の入力データセットに同じフィールド名が含まれている場合、フィールド名の前に自動的にデータセット名が不可されます。(例:sample_input_data1::customer_id)。このような場合、ジョブ実行時にエラーを引き起こす可能性があるため、後続処理でセレクトコンポーネントを使用し、フィールドの列名を修正したり、結合処理の前に、同じ値を含むフィールドを削除するようにしてください。

まとめ

データ統合のパフォーマンスは、正しい結合タイプを選択し使用することで向上します。片方のデータセットのサイズが小さい場合はReplicated join、特定のキーが非常に一般的な場合はSkewed Joinをお使いください。