NodeJSのClusterモジュール使用

15509 ワード

はじめに、nodejsは単一プロセスの単一スレッドのサーバーエンジンであることをみんなは知っています.どんなに強力なハードウェアがあっても、単一のCPUで計算するしかないです.そこで、第三者のclusterを開発し、nodeがマルチコアCPUを利用して並行して実現できるようにしました.
nodejsの発展に従って、nodejsに環境を生産させて、多プロセスの多核処理を支持しなければなりません.V 0.0.6バージョンでは、Nodejsにclusterの特性が内蔵されています.これにより、Nodejsはついに独立したアプリケーション開発ソリューションとして目に映るようになりました.
最も簡単な例:
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log("master start...");

    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    cluster.on('listening',function(worker,address){
        console.log('listening: worker ' + worker.process.pid +', Address: '+address.address+":"+address.port);
    });

    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    });
} else {
    http.createServer(function(req, res) {
        res.writeHead(200);
        res.end("hello world
"); }).listen(0); }
 
二.clusterの仕事原理
各ワーカープロセスはchild_を使うことによってprocess.fork関数は、IPC(Inter-Parocess Communication、プロセス間通信)に基づいて、masterプロセスとの間の通信を実現します.
workerがserver.listen(...)関数を使用するとパラメータシーケンスをmasterプロセスに伝えます.マスタープロセスがworkersにマッチしたら、最後の文を労働者に伝えます.マスターがワーカーにマッチしていない場合は、ワーカーを作成して、文を伝えてワーカーに伝えます.
境界条件では、3つの面白い行動があります.注:下のserver.listenは、下の「http.Server-」net.Server類への呼び出しです.
  • 1.server.listen({fd:7}:masterとworkerの通信過程で、ファイルを転送することによって、masterは「ファイル記述は7」であって、「ファイル記述は7」の参照を転送するのではなく、「ファイル記述は7」を傍受します.
  • .server.listen(handle):masterとworkerの通信プロセスは、handle関数を介して通信し、プロセス連絡なしに
  • に通信する.
  • .server.listen(0):masterとworkerの通信プロセスでは、クラスタ内のworkerはランダムポートを開いて共有し、socket通信によって、上記の例の57132
  • のように通信する.
    複数のプロセスが同じリソースにある場合、オペレーティングシステムの負荷バランスは非常に効率的である.Node.jsにはルーティングロジックがありません.worker間に共有状態がありません.したがって、プログラムはメモリベースのsessionなど、簡単に設計する必要があります.
    ワーカーは独力で運行していますので、プログラムの必要に応じて独立して削除または再起動できます.ワーカーは互いに影響しません.ウォーカーが生きている限り、マスタは接続を受信し続けます.Nodeは自動的にworkersの数を維持しません.私たちは自分の接続池を作ることができます.
     
    三. clusterのAPI
    公式サイトのアドレス:http://nodejs.org/api/cluster.html#cluster_cluster
    clusterオブジェクトの各種属性と関数
    cluster.settings:クラスタパラメータオブジェクトを設定するcluster.isMaster:masterノードcluster.isWorkerであるかどうかを判断する:workノードEvent:'fork':workプロセスイベントを作成するEvent:online':傍受worker作成成功イベントEvent:'listening'イベントイベントを終了します.Event:'setup Masterイベントcluster.setupMaster([settings]):クラスタパラメータcluster.fork([env]):workerプロセスcluster.disconnectを作成します.:worketプロセスcluster.workersを閉じる:現在のworkerオブジェクトcluster.workersを取得する:クラスタ内で生存している全てのworkerオブジェクトのworkersの各種属性と関数を取得する:cluster.worketを通じて取得することができます.
    worket.id:プロセスID番号worker.process:ChildProcessオブジェクトworker.suicide:disconnect()の後、workerが自殺したかどうかを判断するworker.send:masterがworkerにメッセージを送る.注:workerはmasterにメッセージを送ります.process.send worker.kill([signal='SIGTRERM]):指定されたworkerを殺します.別名destory()worker.disconnect():workerを切断します.workerを自殺させるEvent:'message':mastersとworkerのmessageイベントEvent:'online':指定されたworker作成成功イベントを傍受するEvent:'listening':masteningを傍受します.masteraがworker状態イベントイベントEvent:'disconnect'を傍受します.
    四.マスターとウォーカーの通信
    var cluster = require('cluster');
    var http = require('http');
    var numCPUs = require('os').cpus().length;
    
    if (cluster.isMaster) {
        console.log('[master] ' + "start master...");
    
        for (var i = 0; i < numCPUs; i++) {
            var wk = cluster.fork();
            wk.send('[master] ' + 'hi worker' + wk.id);
        }
    
        cluster.on('fork', function (worker) {
            console.log('[master] ' + 'fork: worker' + worker.id);
        });
    
        cluster.on('online', function (worker) {
            console.log('[master] ' + 'online: worker' + worker.id);
        });
    
        cluster.on('listening', function (worker, address) {
            console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
        });
    
        cluster.on('disconnect', function (worker) {
            console.log('[master] ' + 'disconnect: worker' + worker.id);
        });
    
        cluster.on('exit', function (worker, code, signal) {
            console.log('[master] ' + 'exit worker' + worker.id + ' died');
        });
    
        function eachWorker(callback) {
            for (var id in cluster.workers) {
                callback(cluster.workers[id]);
            }
        }
    
        setTimeout(function () {
            eachWorker(function (worker) {
                worker.send('[master] ' + 'send message to worker' + worker.id);
            });
        }, 3000);
    
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].on('message', function(msg){
                console.log('[master] ' + 'message ' + msg);
            });
        });
    
    } else if (cluster.isWorker) {
        console.log('[worker] ' + "start worker ..." + cluster.worker.id);
    
        process.on('message', function(msg) {
            console.log('[worker] '+msg);
            process.send('[worker] worker'+cluster.worker.id+' received!');
        });
    
        http.createServer(function (req, res) {
                res.writeHead(200, {"content-type": "text/html"});
                res.end('worker'+cluster.worker.id+',PID:'+process.pid);
        }).listen(3000);
    
    }
     
    五.Redisキューからデータを取り、Mysqlに書き込んでください.
     
    var cluster = require('cluster');
    var http = require('http');
    var numCPUs = require('os').cpus().length;
    
    var queueHost = '192.168.235.60';
    var queuePort = 6379;
    
    var dataConnection = {};
    dataConnection.host = 'localhost';
    dataConnection.user = 'root';
    dataConnection.password = '';
    dataConnection.database = 'livedb';
    
    if (cluster.isMaster) {
        for (var i = 0; i < numCPUs; i++) {
            cluster.fork();
        }
    
        cluster.on('fork', function (worker) {
            console.log('[master] Create worker. pid: '+worker.process.pid);
        });
    
        cluster.on('online', function (worker) {
            console.log('[master] Worker online. pid: ' +worker.process.pid);
        });
    
        cluster.on('disconnect', function (worker) {
            console.log('[master] ' + 'disconnect: worker: ' + worker.process.pid);
        });
    
        cluster.on('exit', function (worker, code, signal) {
            console.log('[master] ' + 'exit worker: ' + worker.process.pid + ' died');
        });
    
        function eachWorker(callback) {
            for (var id in cluster.workers) {
                callback(cluster.workers[id]);
            }
        }
    
        /*
         //send to child
         setTimeout(function () {
         eachWorker(function (worker) {
         worker.send('[master] ' + 'send message to worker' + worker.id);
         });
         }, 3000);
         */
    
    
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].on('message', function(message){
                if(message.act === 'suicide'){
                    //cluster.fork();
                }
            });
        });
    
    }
    else if (cluster.isWorker) {
        process.on('message', function(msg) {
            //send to parent
            //process.send('[worker] worker'+cluster.worker.id+' received!');
        });
    
    
    
        var redis = require("redis"),
        client = redis.createClient(queuePort, queueHost);
        client.on("error", function (err){
            console.log("[process] Error " + err);
        });
    
         var mysql      = require('mysql');
         var connection = mysql.createConnection(dataConnection);
         connection.connect();
    
        process.on('uncaughtException', function(err){
            console.log("[process] UncaughtException: " + err);
            connection.end();
            process.send({act:'suicide'});
            process.exit(1);
        });
    
    
        setInterval(function(){
            client.rpop("message",function (err,value){
                if(value!=null){
                    //console.log("[process] Handler" + process.pid + ": " + value);
    
                    var obj = JSON.parse(value);
                    var account  = {accountid: obj.time, openid: obj.stamp};
                    console.log(account);
    
    
                    var query = connection.query('INSERT INTO pre_qqaccount SET ?', account, function(err, result) {
                        if(err){
                            console.log('[process] insert database error: '+err);
                        }
                    });
                }
            });
        },1000);
    }
     
    転載先:https://www.cnblogs.com/fuland/p/4143160.html