Node.jsで処理をマルチプロセスで分担して結果を回収し処理を継続する


重たい処理をNode.jsくんがCPU使用率25%でがんばって処理してくれている姿をみて、もうちょっと人間さんもがんばらないといけないなと思いました。

コード

いいからまずは完成形のコードだ

const cluster = require('cluster')

// CPUの数が上限である必要はある? これ以上なら動かないというわけでもないけど
const numCPUs = require('os').cpus().length

const masterFnc = () => {

    // 初期準備 処理対象
    const target = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    // 各コアの処理数が均等になるように渡す数を分割する (ただしこのやり方は最後が楽になる)
    const slicedTarget = []
    const bundle = Math.ceil(target.length / numCPUs)
    for (i = 0; i < target.length; i += bundle) {
        slicedTarget.push(target.slice(i, i + bundle))
    }
    console.log(slicedTarget)

    // fork実行
    for (let i = 0; i < numCPUs; i++) {
        let worker = cluster.fork()
        worker.on('online', () => {
            worker.send(slicedTarget[i])
        })
    }

    // workerから値を受け取り、全部のworkerが仕事を終えたら結果を統合して終了する
    // ここ、workerごとにPromiseつくってPromiseAllで待つのもアリかな。
    const promise = new Promise(resolve =>{
        let finishedCount = 0
        let result = []
        cluster.on('message', (worker ,message, handle) => {
            console.log('message', message)
            // ここは実装されたらpushで追加して最後にflatかけたい
            // ここではもともとの順番が崩れても気にしないこととする。気にするならworkerのid見て順番管理かな?
            result = result.concat(message)
            finishedCount++
            // CPUの数を使わないならワーカー数 Object.keys(cluster.workers).length を使ってあげればいい
            if (finishedCount == numCPUs) {
                // 全部終わったら終了。
                resolve(result)
            }
        })
    })

    // なんとなく呼び出し側で確認しよう
    return promise
}

const workerFnc = () => {
    // Masterから値をもらったらお仕事スタート
    process.on("message", (arr) => {
        console.log('worker on message', arr)
        // 仕事はなんでもいいです
        arr = arr.map(value => value + 'です')
        // 値を返してお仕事終了
        process.send(arr)
        cluster.worker.disconnect()
    });
}

(async () => {
    if (cluster.isMaster) {
        console.log('master')
        console.log('最終結果', await masterFnc())
        // 受け取った結果を使い処理を継続できる
    } else {
        console.log('worker')
        workerFnc()
    }
})()

実行結果

$ node test.js
master
[ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8, 9 ], [ 10 ] ]
worker
worker on message [ 1, 2, 3 ]
message [ '1です', '2です', '3です' ]
worker
worker
worker on message [ 4, 5, 6 ]
worker on message [ 10 ]
message [ '4です', '5です', '6です' ]
message [ '10です' ]
worker
worker on message [ 7, 8, 9 ]
message [ '7です', '8です', '9です' ]
最終結果 [ '1です',
  '2です',
  '3です',
  '4です',
  '5です',
  '6です',
  '10です',
  '7です',
  '8です',
  '9です' ]

$ node test.js
master
[ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8, 9 ], [ 10 ] ]
worker
worker on message [ 4, 5, 6 ]
message [ '4です', '5です', '6です' ]
worker
worker on message [ 1, 2, 3 ]
message [ '1です', '2です', '3です' ]
worker
worker on message [ 7, 8, 9 ]
message [ '7です', '8です', '9です' ]
worker
worker on message [ 10 ]
message [ '10です' ]
最終結果 [ '4です',
  '5です',
  '6です',
  '1です',
  '2です',
  '3です',
  '7です',
  '8です',
  '9です',
  '10です' ]

並列実行なので、当然終了順は保証されない。
順番を保ちたい場合はindexを渡し・受け取り順番にマージしてやる必要があります。

やること!

配列に格納された各要素に順次適用する処理をマルチプロセス化し、
1プロセスが担当する配列の要素数を少なくする
また、すべてのマルチプロセスが処理した要素を結合し、
もともとのメインプロセスで処理を継続する

=> 既存のループ処理をマルチプロセス化して高速化するようなシーンを想定

つまり、やらないこと

親プロセスは接続を待ち、接続があれば子プロセスを生成し手放す。(後のことは知らん! or 親-1つの子 間で関係が完結している)
といったTCPサーバー的な動き。

動作検証環境

  • Windows7 64bit
  • Node.js v9.3.0

Node.jsのマルチプロセス

方法は2つ3つあるらしい。

  1. Cluster | Node.js v10.10.0 Documentation
  2. Child Process | Node.js v10.10.0 Documentation

に加え、
Node.jsにworkerが入った - 技術探し

[email protected]でworkerが追加されました。
Worker Threads | Node.js v10.10.0 Documentation
workerはまだちょっと早いかな?っと複数の日本語資料があるClusterを使いましたが、一つ覚えておけば他のものには入りやすくなると思います。

参考資料

…元記事はtsで描かれているからサ…もっと初心者にもわかりやすいようにとネ…


使い所

const target = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

for(let i = 0; i < target.length; i++ ) {
    とても重たい処理(target[i])}
}

のようなコードが書かれており、重い処理がネックになっているならば、forkで並行して走らせればCPUの有効活用ができて高速化できるのではないかと考えている。

比較用のforkしないシンプルなコード

let target = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
target = target.map(value => value + 'です')
console.log('最終結果', target)

サンプルすぎて肝心の「仕事」が短いのでこの有様ですが、
一応コードの増加量や複雑さの増加とも相談しておきたいですね。


また、clusterはスクリプトを最初から読み込み直すそうなので、その時点でforkするらしいChild Processのほうが既存コードに組み込みやすいかもしれません。