Hadoop MapReduceステップ分散キャッシュによるreplicated join
コンセプト:
reduce-side joinテクノロジーは柔軟ですが、非常に効率的になる場合があります.joinはreduce()フェーズまで開始するため、shuffleのすべてのデータをネットワークに転送しますが、ほとんどの場合、joinフェーズでほとんどの転送データを破棄します.従ってmap段階でjoin操作全体を完了することを期待する.
主な技術難点:
map段階でjoinを完了する主な困難は、mapperが自分では得られないデータとjoin操作を行う必要がある可能性があることであり、このようなデータがmapperによって得られることを保証できれば、私たちのこの技術は利用できます.例を挙げると、
2つのソースデータが同じサイズのpartitionに分けられ、各partitionがjoin keyに適したkey値でソートされていることを知っていれば、mapper()ごとにすべてのjoin操作に必要なデータを取得することができます.実際、Hadoopのorg.apache.hadoop.mared.joinパッケージには、mapside joinを実現するためのこのようなヘルプクラスが含まれていますが、残念ながら、このような状況は少なすぎます.また、このようなクラスを使用すると、追加のオーバーヘッドが発生します.そのため、私たちはこのパッケージについて議論し続けません.
どんな場合に使いますか.
ケース1:2つのソースデータが
同じ大きさのpartitionで、各partitionはjoin keyとして適しています.
key値ソート
ケース2:joinの大規模なデータの場合、通常は1つのソースデータだけが非常に大きく、もう1つのデータは数桁の減少を示す可能性があります.例えば、電話会社のユーザーデータは千万件のユーザーデータしかないかもしれませんが、彼の取引記録データには10億件以上の具体的な電話記録がある可能性があります.小さなデータソースをmapperのメモリに割り当てることができる場合、小さなデータソースを各mapperマシンにコピーすることによって、mapperをmap段階でjoin操作を行う限り、効果的なパフォーマンス向上を得ることができます.この操作をreplicate joinと言います.
ソリューション:
Hadoopには
分散キャッシュ(distributed cache)のメカニズムは、クラスタ上のすべてのノードにデータを配布する.通常、mapperに必要な「background」データを含むすべてのファイルを配布するために使用されます.たとえば、Hadoopを使用してドキュメントを分類すると、distributed cacheを使用して、すべてのmapperがこれらのkeywords(「background data」)を取得できることを保証するキーワードのリストがあるかもしれません.
操作手順:
1.各ノードにデータを配布します.
私たちのもう一つの制限は、joinのテーブルがメモリに保存できるように十分に小さくなければならないことです.非対称サイズの入力データでは、メモリに入れるほど小さいデータではない可能性があります.
1.データ処理手順を再スケジュールして有効にすることができます.たとえば、すべてのユーザーが415領域でデータをソートする必要がある場合、一定のレコードをフィルタする前にOrdersテーブルとCustomersテーブルを接続するのは正しいが、効率は高くない.CustomersテーブルとOrdersテーブルは、メモリに入れないほど大きい場合があります.このとき,CustomersまたはOrdersテーブルを小さくするためにデータを前処理することができる.
2.データをどのように前処理しても十分に小さくできない場合があります.mapで415 areaに属していないユーザーをフィルタリングする必要があります.詳しくは『Hadoop in Action』Chapter 5.2.3 semijoinを参照
reduce-side joinテクノロジーは柔軟ですが、非常に効率的になる場合があります.joinはreduce()フェーズまで開始するため、shuffleのすべてのデータをネットワークに転送しますが、ほとんどの場合、joinフェーズでほとんどの転送データを破棄します.従ってmap段階でjoin操作全体を完了することを期待する.
主な技術難点:
map段階でjoinを完了する主な困難は、mapperが自分では得られないデータとjoin操作を行う必要がある可能性があることであり、このようなデータがmapperによって得られることを保証できれば、私たちのこの技術は利用できます.例を挙げると、
2つのソースデータが同じサイズのpartitionに分けられ、各partitionがjoin keyに適したkey値でソートされていることを知っていれば、mapper()ごとにすべてのjoin操作に必要なデータを取得することができます.実際、Hadoopのorg.apache.hadoop.mared.joinパッケージには、mapside joinを実現するためのこのようなヘルプクラスが含まれていますが、残念ながら、このような状況は少なすぎます.また、このようなクラスを使用すると、追加のオーバーヘッドが発生します.そのため、私たちはこのパッケージについて議論し続けません.
どんな場合に使いますか.
ケース1:2つのソースデータが
同じ大きさのpartitionで、各partitionはjoin keyとして適しています.
key値ソート
ケース2:joinの大規模なデータの場合、通常は1つのソースデータだけが非常に大きく、もう1つのデータは数桁の減少を示す可能性があります.例えば、電話会社のユーザーデータは千万件のユーザーデータしかないかもしれませんが、彼の取引記録データには10億件以上の具体的な電話記録がある可能性があります.小さなデータソースをmapperのメモリに割り当てることができる場合、小さなデータソースを各mapperマシンにコピーすることによって、mapperをmap段階でjoin操作を行う限り、効果的なパフォーマンス向上を得ることができます.この操作をreplicate joinと言います.
ソリューション:
Hadoopには
分散キャッシュ(distributed cache)のメカニズムは、クラスタ上のすべてのノードにデータを配布する.通常、mapperに必要な「background」データを含むすべてのファイルを配布するために使用されます.たとえば、Hadoopを使用してドキュメントを分類すると、distributed cacheを使用して、すべてのmapperがこれらのkeywords(「background data」)を取得できることを保証するキーワードのリストがあるかもしれません.
操作手順:
1.各ノードにデータを配布します.
DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);
.各mapperでDistributedCache.getLocalCacheFiles()を使用してファイルを取得し、その後、対応する操作を行います.DistributedCache.getLocalCacheFiles();
新しい問題:私たちのもう一つの制限は、joinのテーブルがメモリに保存できるように十分に小さくなければならないことです.非対称サイズの入力データでは、メモリに入れるほど小さいデータではない可能性があります.
1.データ処理手順を再スケジュールして有効にすることができます.たとえば、すべてのユーザーが415領域でデータをソートする必要がある場合、一定のレコードをフィルタする前にOrdersテーブルとCustomersテーブルを接続するのは正しいが、効率は高くない.CustomersテーブルとOrdersテーブルは、メモリに入れないほど大きい場合があります.このとき,CustomersまたはOrdersテーブルを小さくするためにデータを前処理することができる.
2.データをどのように前処理しても十分に小さくできない場合があります.mapで415 areaに属していないユーザーをフィルタリングする必要があります.詳しくは『Hadoop in Action』Chapter 5.2.3 semijoinを参照