MongoDB MapReduceのパフォーマンスを20倍に向上させる最適化の宝典

11862 ワード

MongoDBがますます多くの大規模なキープロジェクトに採用されてから、データ分析もますます重要な話題になっています.これらの方法は大規模なデータ伝送を必要とし、これらのコストはかなり高いため、分析に異なるソフトウェアを使用することに飽きたようだ. 
     MongoDBは、データを分析する2つの方法を提供しています.Map Reduce(以下、MRと略称します)とAggregation Framework(Aggregation Framework)です.MRは非常に柔軟で使いやすく,パッチとよく組み合わせて使用でき,大規模な出力を可能にする.MongoDB v 2.4バージョンでは、JavaScriptエンジンがSpiderからV 8に切り替わったため、MRの性能は大幅に向上したが、Agg Framework(C+++を使用)に比べてMRの速度は遅い.本稿では,MRの速度を向上させる方法を見てみよう. テスト 
     まず、0~100万の単一の整数値を含む1000万のドキュメントを挿入するテストを行います.これは、平均10ドキュメント当たり同じ値を持つことを意味します.
           
> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
> db.uniques.findOne()
{ "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
> db.uniques.ensureIndex({dim0: 1})
> db.uniques.stats()
{
        "ns" : "test.uniques",
        "count" : 10000000,
        "size" : 360000052,
        "avgObjSize" : 36.0000052,
        "storageSize" : 582864896,
        "numExtents" : 18,
        "nindexes" : 2,
        "lastExtentSize" : 153874432,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 576040080,
        "indexSizes" : {
                "_id_" : 324456384,
                "dim0_1" : 251583696
        },
        "ok" : 1
}

 ここでは、ドキュメント内の一意の値のカウントを取得するには、次のMRタスクを使用します. 
    
> db.runCommand(
{ mapreduce: "uniques",
map: function () { emit(this.dim0, 1); },
reduce: function (key, values) { return Array.sum(values); },
out: "mrout" })
{
        "result" : "mrout",
        "timeMillis" : 1161960,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1059138,
                "output" : 999961
        },
        "ok" : 1
}

ご覧のように、出力結果は約1200秒(EC 2 M 3インスタンスでテスト)かかり、1千万maps、100万reduces、99999996のドキュメントが出力されます.結果は次のようになります.
     
> db.mrout.find()
{ "_id" : 1, "value" : 10 }
{ "_id" : 2, "value" : 5 }
{ "_id" : 3, "value" : 6 }
{ "_id" : 4, "value" : 10 }
{ "_id" : 5, "value" : 9 }
{ "_id" : 6, "value" : 12 }
{ "_id" : 7, "value" : 5 }
{ "_id" : 8, "value" : 16 }
{ "_id" : 9, "value" : 10 }
{ "_id" : 10, "value" : 13 }
...
  
    最適化の方法を見てみましょう.    ソートの使用      以前のこの記事では,MRに対するソートの利点を簡単に説明したが,これはあまり知られていない特性である.この場合、ソートされていない入力を処理すると、MRエンジンはランダムにソートされた値を得ることを意味し、RAMでreduceを行う機会はほとんどありません.逆に、一時的なcollectionでデータをディスクに書き込み、順番に読み取り、reduceを行わなければなりません.    次に、ソートを使用するとどのように役立つかを見てみましょう. 
    
> db.runCommand(
{ mapreduce: "uniques",
map: function () { emit(this.dim0, 1); },
reduce: function (key, values) { return Array.sum(values); },
out: "mrout",
sort: {dim0: 1} })
{
        "result" : "mrout",
        "timeMillis" : 192589,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1000372,
                "output" : 999961
        },
        "ok" : 1
}

   現在、時間は192秒に下がり、速度は6倍に向上した.実はreducesの数はそれほど差がありませんが、ディスクに書き込まれる前にRAMで完了しています. マルチスレッドの使用       MongoDBでは、単一のMRタスクではマルチスレッドは使用できません.マルチスレッドは、複数のタスクでのみ使用できます.しかし、現在のマルチコアCPUは、Hadoopのような単一サーバ上での並列化作業に非常に有利である.入力データをいくつかのブロックに分割し,各ブロックにMRタスクを割り当てる必要がある.splitVectorコマンドは、より簡単な分割方法があれば、分割点を迅速に見つけるのに役立ちます. 
> db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000})
{
    "timeMillis" : 6006,
    "splitKeys" : [
        {
            "dim0" : 18171
        },
        {
            "dim0" : 36378
        },
        {
            "dim0" : 54528
        },
        {
            "dim0" : 72717
        },
…
        {
            "dim0" : 963598
        },
        {
            "dim0" : 981805
        }
    ],
    "ok" : 1
}

     
1千万のドキュメントから分割点を見つけ、splitVectorコマンドを使用するには約5秒しかかかりません.これはかなり速いです.したがって、以下では、複数のMRタスクを作成する方法を見つける必要があります.アプリケーションサーバの面では、マルチスレッドと$gt/$ltクエリーコマンドを使用すると便利です.shellの場合、ScopedThreadオブジェクトは次のように動作します. 
     
> var t = new ScopedThread(mapred, 963598, 981805)
> t.start()
> t.join()

   いくつかのJSコードを入れることができます.これらのコードは4つのスレッドを生成することができます.次は結果の表示を待つことができます. 
> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
> var keys = res.splitKeys
> keys.length
39
> var mapred = function(min, max) {
return db.runCommand({ mapreduce: "uniques",
map: function () { emit(this.dim0, 1); },
reduce: function (key, values) { return Array.sum(values); },
out: "mrout" + min,
sort: {dim0: 1},
query: { dim0: { $gte: min, $lt: max } } }) }
> var numThreads = 4
> var inc = Math.floor(keys.length / numThreads) + 1
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{
        "result" : "mrout0",
        "timeMillis" : 205790,
        "counts" : {
                "input" : 2750002,
                "emit" : 2750002,
                "reduce" : 274828,
                "output" : 274723
        },
        "ok" : 1
}
{
        "result" : "mrout274736",
        "timeMillis" : 189868,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250364,
                "output" : 250255
        },
        "ok" : 1
}
{
        "result" : "mrout524997",
        "timeMillis" : 191449,
        "counts" : {
                "input" : 2500014,
                "emit" : 2500014,
                "reduce" : 250120,
                "output" : 250019
        },
        "ok" : 1
}
{
        "result" : "mrout775025",
        "timeMillis" : 184945,
        "counts" : {
                "input" : 2249971,
                "emit" : 2249971,
                "reduce" : 225057,
                "output" : 224964
        },
        "ok" : 1
}

   
1番目のスレッドでは、他のスレッドよりも多くの作業が行われていますが、時間は190秒に達しています.これは、マルチスレッドが単一のスレッドよりも速くないことを意味します. 
複数のデータベースの使用 
      ここでの問題は,スレッド間にロック競合が多すぎることである.ロックされるとMRは非常に無私ではない(1000回の読み取りごとにyieldが行われる).MRタスクは大量の書き込み操作を行っているため,スレッド間の終了時には互いに待機する.MongoDBの各データベースには独立したロックがあるので、スレッドごとに異なる出力データベースを使用してみましょう. 
   
> var mapred = function(min, max) {
return db.runCommand({ mapreduce: "uniques",
map: function () { emit(this.dim0, 1); },
reduce: function (key, values) { return Array.sum(values); },
out: { replace: "mrout" + min, db: "mrdb" + min },
sort: {dim0: 1},
query: { dim0: { $gte: min, $lt: max } } }) }
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
...
{
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 105821,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250364,
                "output" : 250255
        },
        "ok" : 1
}
...

      所要時間は100秒に短縮され,これは単一のスレッドに比べて約2倍の速度向上を意味する.予想に及ばなかったが、もうよかった.ここでは、4つのコアを使用して、2倍しかアップしていません.8コアCPUを使用すると、約4倍アップします. 純JavaScriptモードの使用      スレッド間で入力データを分割する場合、100万ではなく約25万のプライマリ・キーのみを出力する興味深いものがあります.これは、jsMode:trueで有効にする「純JSモード」を使用できることを意味します.オンにすると、MongoDBはJSとBSONの間で繰り返し変換されません.逆に、内部の50万キーのJS辞書からすべてのオブジェクトをreducesします.次に、この操作が速度の向上に役立つかどうかを見てみましょう. 
> var mapred = function(min, max) {
return db.runCommand({ mapreduce: "uniques",
map: function () { emit(this.dim0, 1); },
reduce: function (key, values) { return Array.sum(values); },
out: { replace: "mrout" + min, db: "mrdb" + min },
sort: {dim0: 1},
query: { dim0: { $gte: min, $lt: max } },
jsMode: true }) }
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
...
{
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 70507,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250156,
                "output" : 250255
        },
        "ok" : 1
}
...

   
現在、時間は70秒に低下しています.jsModeは、特にオブジェクトに多くのフィールドがある場合に役立ちます.この例では、単一の数値フィールドですが、30%上昇しました. 
MongoDB v 2.6の改良点 
      MongoDB v 2.6の開発では、JS関数呼び出し時のオプションのargsパラメータに関するコードが削除されました.このパラメータは標準的ではなく、使用も推奨されていません.歴史的な理由で残っています(SERVER-4654参照).
).Gitライブラリから最新のMongoDBをpullしてコンパイルし、テスト例を再実行します. 
       
...
{
        "result" : {
                "db" : "mrdb274736",
                "collection" : "mrout274736"
        },
        "timeMillis" : 62785,
        "counts" : {
                "input" : 2500013,
                "emit" : 2500013,
                "reduce" : 250156,
                "output" : 250255
        },
        "ok" : 1
}
...

    その結果,時間は60秒に低下し,速度は約10〜15%向上した.また,この変更によりJSエンジン全体のスタック消費量も改善された.
結論 
     振り返ってみると,同じMRタスクに対しては,最初の1200秒に比べて20倍の速度が向上している.この最適化は、いくつかのテクニック効果がそれほど理想的でなくても(例えば、複数の出力dbs/セットを使用する)多くの場合に適用されるべきである.しかし、これらのテクニックはMRタスクの速度を向上させるのに役立ち、将来的にはこれらの特性がより使いやすいかもしれません.例えば、このticket
 splitVectorコマンドをより使用可能にします.このticket
同じデータベース内の複数のMRタスクが改善されます.
     からhttp://www.iteye.com/news/28013