Node.js-アリEggのマルチプロセスモデルとプロセス間通信


前言


最近Eggをベースフレームワーク開発プロジェクトとして用いており,そのマルチプロセスモデルの管理実現に興味を持ち,学習していくつかのことを理解し,ついでに記録した.文章に誤りがあれば,軽く噴き出してください

なぜマルチプロセスが必要なのか


テクノロジーの発展に伴い、現在のサーバは基本的に cpuです.しかしながら、ノードは 言語である(開発者にとっては単一スレッドであり、実際にはそうではない).cpu であることはよく知られていますが、ノードの特性に基づいて、私たちは毎回1つのcpuしか利用できません.これにより,利用率が極めて低いだけでなく,許容誤差も受け入れられない(エラー時にプログラム全体がクラッシュする).そのため、ノードはclusterを持っていて、サーバのリソースを十分に利用することができます.cluster clusterの動作原理については、この文章を読むことをお勧めします.ここで簡単にまとめます.
  • サブプロセスのポートリスニングはhackではなく、マスターの TCP に統一されるので、複数のサブプロセスが同じポートをリスニングしてエラーを報告することはありません.
  • master TCP、TCPの要求処理ロジックでは、ワークフロープロセスが newconn , に選択されます.(ここでは、Windows以外のすべてのプラットフォームのデフォルトのメソッドループ法として、プライマリ・プロセスがポートをリスニングし、新しい接続を受信してからワーク・プロセスに接続ループを配布する方法が2つあります.配布では、ワーク・プロセス・タスクのオーバーロードを防止するための組み込みテクニックが使用されています.2つ目は、プライマリ・プロセスがリスニングsocketを作成して興味のあるワーク・プロセスに送信し、ワーク・プロセスが直接接続を受信する方法です.)
  • workerプロセスはハンドルを受信した後、 (net.socket) に戻ります.

  • 図:図参照元

    マルチプロセスモデル


    まずEgg公式ドキュメントのプロセスモデルを見てみましょう
                    +--------+          +-------+
                    | Master || Agent |
                    +--------+          +-------+
                    ^   ^    ^
                   /    |     \
                 /      |       \
               /        |         \
             v          v          v
    +----------+   +----------+   +----------+
    | Worker 1 |   | Worker 2 |   | Worker 3 |
    +----------+   +----------+   +----------+

    を選択します.
    プロセス数
    さぎょう
    あんていせい
    ビジネスコードを実行するかどうか
    Master
    1
    プロセス管理、プロセス間メッセージ転送
    非常に高い
    いいえ
    Agent
    1
    バックグラウンド動作(長接続クライアント)
    高い
    ごくわずか
    Worker
    一般的にcpuコア数
    ビジネス・コードの実行
    一般
    はい
    大体Masterをプライマリスレッドとして利用し、Agentを秘書プロセスとして起動し、Workerに協力していくつかの公共事務(ログなど)を処理し、Workerプロセスを起動して本当の業務コードを実行する.

    マルチプロセスの実装


    プロセス関連コード


    まずMasterから着手し、ここでは一時的にMasterが最上位のプロセスであると考えられる(実際にはparentプロセスもあるので、後で説明する).
    /**
     * start egg app
     * @method Egg#startCluster
     * @param {Object} options {@link Master}
     * @param {Function} callback start success callback
     */
    exports.startCluster = function(options, callback) {
      new Master(options).ready(callback);
    };

    まずMaster から
    constructor(options) {
      super();
      //      
      this.options = parseOptions(options);
      // worker           Manager Messenger 
      this.workerManager = new Manager();
      // messenger ,     Manager Messenger 
      this.messenger = new Messenger(this);
      //     ready      get-ready npm 
      ready.mixin(this);
      //        
      this.isProduction = isProduction();
      this.agentWorkerIndex = 0;
      //     
      this.closed = false;
      ...
    
            ready             :
      this.ready(() => {
        //         true
        this.isStarted = true;
        const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
        this.logger.info('[master] %s started on %s (%sms)%s',
        frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);
    
        //   egg-ready            
        const action = 'egg-ready';
        this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
        this.messenger.send({ action, to: 'app', data: this.options });
        this.messenger.send({ action, to: 'agent', data: this.options });
        // start check agent and worker status
        this.workerManager.startCheck();
        });
        //       
        this.on('agent-exit', this.onAgentExit.bind(this));
        this.on('agent-start', this.onAgentStart.bind(this));
        ...
        //       Fork  Agent
        detectPort((err, port) => {
          ... 
          this.forkAgentWorker();
        }
      });
    }

    以上のことから、Masterのコンストラクション関数は主に であり、最後に実行されるのはforkAgentWorker関数であり、この関数のキーコードは以下のことを見ることができる.
    const agentWorkerFile = path.join(__dirname, 'agent_worker.js');
    //   child_process    Agent
    const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
    agent_worker.jsの上に進むと、agent_workeragentオブジェクトをインスタンス化し、agent_worker.jsにはキーコードがあります.
    agent.ready(() => {
      agent.removeListener('error', startErrorHandler); //          
      process.send({ action: 'agent-start', to: 'master' }); //  master    agent-start   
    });
    agent_worker.jsのコードがmasterにメッセージを送信し、動作がagent-startであり、Masterに戻り、once forkAppWorkers on onAgentStartに登録された2つのイベントが表示される.
    this.on('agent-start', this.onAgentStart.bind(this));
    this.once('agent-start', this.forkAppWorkers.bind(this));

    まずonAgentStart関数を見てみましょう.この関数は比較的簡単で、いくつかの情報の伝達です.
    onAgentStart() {
        this.agentWorker.status = 'started';
    
        // Send egg-ready when agent is started after launched
        if (this.isAllAppWorkerStarted) {
          this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options });
        }
    
        this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
        // should send current worker pids when agent restart
        if (this.isStarted) {
          this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() });
        }
    
        this.messenger.send({ action: 'agent-start', to: 'app' });
        this.logger.info('[master] agent_worker#%s:%s started (%sms)',
          this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
      }

    その後、forkAppWorkers関数が実行され、この関数は主にcforkパケットforkに対応するワークプロセスを介して、関連する一連のリスニングイベントを登録する.
    ...
    cfork({
      exec: this.getAppWorkerFile(),
      args,
      silent: false,
      count: this.options.workers,
      // don't refork in local env
      refork: this.isProduction,
    });
    ...
    //   app-start  
    cluster.on('listening', (worker, address) => {
      this.messenger.send({
        action: 'app-start',
        data: { workerPid: worker.process.pid, address },
        to: 'master',
        from: 'app',
      });
    });
    forkAppWorkers関数は、Listeningイベントをリスニングすると、master上のapp-startイベントをトリガーすることがわかる.
    this.on('app-start', this.onAppStart.bind(this));
    
    ...
    // master ready    
    if (this.options.sticky) {
      this.startMasterSocketServer(err => {
        if (err) return this.ready(err);
          this.ready(true);
      });
    } else {
      this.ready(true);
    }
    
    // ready     egg-ready       
    const action = 'egg-ready';
    this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
    this.messenger.send({ action, to: 'app', data: this.options });
    this.messenger.send({ action, to: 'agent', data: this.options });
    
    // start check agent and worker status
    if (this.isProduction) {
      this.workerManager.startCheck();
    }
  • Master.constructor:Masterのコンストラクション関数を先に実行し、detect関数が
  • に実行されます.
  • Detect: Detect => forkAgentWorker()
  • forkAgentWorker:Agentプロセスを取得し、agent-startイベント
  • をmasterにトリガーします.
  • onAgentStart関数、forkAppWorker関数(once)
  • を実行
  • onAgentStart=>各種情報を送信し、forkAppWorker=>masterにapp-startイベント
  • をトリガーする
  • App-startイベントトリガonAppStart()メソッド
  • onAppStart=>設定ready=>実行readyのコールバック関数
  • Ready()=>egg-readyを各プロセスに送信し、関連イベントをトリガしてstartCheck()関数
  • を実行する.
    +---------+           +---------+          +---------+
    |  Master |           |  Agent  |          |  Worker |
    +---------+           +----+----+          +----+----+
         |      fork agent     |                    |
         +-------------------->|                    |
         |      agent ready    |                    |
         ||
         |     worker ready    |                    |
         ||                    |
         |      Egg ready      |                    |
         +----------------------------------------->|

    プロセスデーモン


    公式ドキュメントによると、プロセス・デーモンは主にgracefulとegg-clusterの2つのライブラリに依存します.
  • 異常WorkerプロセスのすべてのTCPサーバをシャットダウンし(既存の接続を迅速に切断し、新しい接続を受信しない)、MasterとのIPCチャネルを切断し、新しいユーザー要求を受け入れない.
  • Masterはすぐに新しいWorkerプロセスをforkし、オンラインの「労働者」の総数が変わらないことを保証します.
  • 異常Workerはしばらく待機し、受信したリクエストを処理して終了します.
  • +---------+                 +---------+
    |  Worker |                 |  Master |
    +---------+                 +----+----+
         | uncaughtException         |
         +------------+              |
         |            |              |                   +---------+
         |  + ---------------------> |
         |         wait...           |                        |
         |          exit             |                        |
         +-------------------------> |                        |
         |                           |                        |
        die                          |                        |
                                     |                        |
                                     |                        |

    実行されたappファイルから分かるように、appは実際には、graceful()が呼び出されるApplicationクラスに継承されている.
    onServer(server) {
        ......
        graceful({
          server: [ server ],
          error: (err, throwErrorCount) => {
            ......
          },
        });
        ......
      }
    gracefulを引き続き見ると、process.on('uncaughtException')イベントがキャプチャされ、コールバック関数の中でTCP接続が閉じられ、自身のプロセスが閉じられ、masterとのIPCチャネルが切断されていることがわかります.
    process.on('uncaughtException', function (err) {
        ......
        //  http     Connection: close   
        servers.forEach(function (server) {
          if (server instanceof http.Server) {
            server.on('request', function (req, res) {
              // Let http server set `Connection: close` header, and close the current request socket.
              req.shouldKeepAlive = false;
              res.shouldKeepAlive = false;
              if (!res._header) {
                res.setHeader('Connection', 'close');
              }
            });
          }
        });
    
        //              ,        
        // make sure we close down within `killTimeout` seconds
        var killtimer = setTimeout(function () {
          console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid);
          if (process.env.NODE_ENV !== 'test') {
            // kill children by SIGKILL before exit
            killChildren(function() {
              //       
              process.exit(1);
            });
          }
        }, killTimeout);
    
        // But don't keep the process open just for that!
        // If there is no more io waitting, just let process exit normally.
        if (typeof killtimer.unref === 'function') {
          // only worked on node 0.10+
          killtimer.unref();
        }
    
        var worker = options.worker || cluster.worker;
    
        // cluster mode
        if (worker) {
          try {
            //   TCP  
            for (var i = 0; i < servers.length; i++) {
              var server = servers[i];
              server.close();
            }
          } catch (er1) {
            ......
          }
    
          try {
            //   ICP  
            worker.disconnect();
          } catch (er2) {
            ......
          }
        }
      });

    OK、IPCチャネルを閉じた後、cforkファイル、すなわち上記のfork workerのパッケージを見続け、サブプロセスのdisconnectイベントをリスニングし、条件に基づいてforkの新しいサブプロセスを再開するかどうかを判断します.
    cluster.on('disconnect', function (worker) {
        ......
        //    pid
        disconnects[worker.process.pid] = utility.logDate();
        if (allow()) {
          // fork       
          newWorker = forkWorker(worker._clusterSettings);
          newWorker._clusterSettings = worker._clusterSettings;
        } else {
          ......
        }
      });

    一般的には、この時点でしばらく待ってから、上記のタイミング関数、すなわち が実行されます.OOM、 このような については、サブプロセスでは である場合があり、masterでしか処理できません.つまり、cforkパケットです.
    cluster.on('exit', function (worker, code, signal) {
        //        ,         uncatughException  fork     ,          
        var isExpected = !!disconnects[worker.process.pid];
        if (isExpected) {
          delete disconnects[worker.process.pid];
          // worker disconnect first, exit expected
          return;
        }
        //  master      ,   fork
        if (worker.disableRefork) {
          // worker is killed by master
          return;
        }
    
        if (allow()) {
          newWorker = forkWorker(worker._clusterSettings);
          newWorker._clusterSettings = worker._clusterSettings;
        } else {
          ......
        }
        cluster.emit('unexpectedExit', worker, code, signal);
      });

    プロセス間通信(IPC)


    様々なプロセス間通信について説明していますが、clusterのIPCチャネルはMasterとWorker/Agentの間にしか存在しません.WorkerとAgentのプロセスは互いに存在しません.では、Worker間で通信したい場合はどうすればいいのでしょうか.はい、Masterで転送します.
        : agent => all workers
                      +--------+          +-------+
                      | Master | another worker
                      +--------+          +-------+
                      | Master |----------| Agent |
                      +--------+          +-------+
                     ^    |
         send to    /     |
        worker 2   /      |
                  /       |
                 /        v
      +----------+   +----------+   +----------+
      | Worker 1 |   | Worker 2 |   | Worker 3 |
      +----------+   +----------+   +----------+
    masterにおいて、agent app fork は、情報を傍受しながら、情報を1つのオブジェクトに変換することができる.
    agentWorker.on('message', msg => {
      if (typeof msg === 'string') msg = { action: msg, data: msg };
      msg.from = 'agent';
      this.messenger.send(msg);
    });
    
    worker.on('message', msg => {
      if (typeof msg === 'string') msg = { action: msg, data: msg };
      msg.from = 'app';
      this.messenger.send(msg);
    });

    最後に呼び出されたのはmessenger.sendで、messengeer.sendは from to です
    send(data) {
        if (!data.from) {
          data.from = 'master';
        }
        ......
    
        // app -> master
        // agent -> master
        if (data.to === 'master') {
          debug('%s -> master, data: %j', data.from, data);
          // app/agent to master
          this.sendToMaster(data);
          return;
        }
    
        // master -> parent
        // app -> parent
        // agent -> parent
        if (data.to === 'parent') {
          debug('%s -> parent, data: %j', data.from, data);
          this.sendToParent(data);
          return;
        }
    
        // parent -> master -> app
        // agent -> master -> app
        if (data.to === 'app') {
          debug('%s -> %s, data: %j', data.from, data.to, data);
          this.sendToAppWorker(data);
          return;
        }
    
        // parent -> master -> agent
        // app -> master -> agent,      to
        if (data.to === 'agent') {
          debug('%s -> %s, data: %j', data.from, data.to, data);
          this.sendToAgentWorker(data);
          return;
        }
      }
    masterは、action情報emitに直接従って対応する登録イベントである
    sendToMaster(data) {
      this.master.emit(data.action, data.data);
    }

    agentとworkerはsendmessageパケットを介して、実際には次のような方法を呼び出します.
     //         
     agent.send(data)
     worker.send(data)

    最後にagentとappの両方が継承するベースクラスEggApplicationにおいて、Messengerクラスが呼び出され、このクラス内部の構造関数は以下の通りである.
    constructor() {
        super();
        ......
        this._onMessage = this._onMessage.bind(this);
        process.on('message', this._onMessage);
      }
    
    _onMessage(message) {
        if (message && is.string(message.action)) {
          //  master    action  emit         
          this.emit(message.action, message.data);
        }
      }
    の構想は、イベントメカニズムとIPCチャネルを利用して、各プロセス間の通信を達成することである.

    その他


    学習中にtimeoutに出会った.unref()の関数については、この質問を参考にした6階の回答をお勧めします

    まとめ


    フロントエンドの思考からバックエンドの思考に移るのは実はとても骨が折れるので、Eggのプロセス管理の実現は確かにとてもすごいので、いろいろなapiと思考に多くの時間を費やしました.

    参照と参照


    マルチプロセスモデルとプロセス間通信Eggソース解析のegg-cluster