Node.js-アリEggのマルチプロセスモデルとプロセス間通信
16511 ワード
前言
最近Eggをベースフレームワーク開発プロジェクトとして用いており,そのマルチプロセスモデルの管理実現に興味を持ち,学習していくつかのことを理解し,ついでに記録した.文章に誤りがあれば,軽く噴き出してください
なぜマルチプロセスが必要なのか
テクノロジーの発展に伴い、現在のサーバは基本的に
cpu
です.しかしながら、ノードは
言語である(開発者にとっては単一スレッドであり、実際にはそうではない).cpu
であることはよく知られていますが、ノードの特性に基づいて、私たちは毎回1つのcpuしか利用できません.これにより,利用率が極めて低いだけでなく,許容誤差も受け入れられない(エラー時にプログラム全体がクラッシュする).そのため、ノードはclusterを持っていて、サーバのリソースを十分に利用することができます.cluster
clusterの動作原理については、この文章を読むことをお勧めします.ここで簡単にまとめます.hack
ではなく、マスターの TCP
に統一されるので、複数のサブプロセスが同じポートをリスニングしてエラーを報告することはありません. master TCP
、TCPの要求処理ロジックでは、ワークフロープロセスが newconn ,
に選択されます.(ここでは、Windows以外のすべてのプラットフォームのデフォルトのメソッドループ法として、プライマリ・プロセスがポートをリスニングし、新しい接続を受信してからワーク・プロセスに接続ループを配布する方法が2つあります.配布では、ワーク・プロセス・タスクのオーバーロードを防止するための組み込みテクニックが使用されています.2つ目は、プライマリ・プロセスがリスニングsocketを作成して興味のあるワーク・プロセスに送信し、ワーク・プロセスが直接接続を受信する方法です.) (net.socket)
に戻ります.図:図参照元
マルチプロセスモデル
まずEgg公式ドキュメントのプロセスモデルを見てみましょう
+--------+ +-------+
| Master || Agent |
+--------+ +-------+
^ ^ ^
/ | \
/ | \
/ | \
v v v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
を選択します.
プロセス数
さぎょう
あんていせい
ビジネスコードを実行するかどうか
Master
1
プロセス管理、プロセス間メッセージ転送
非常に高い
いいえ
Agent
1
バックグラウンド動作(長接続クライアント)
高い
ごくわずか
Worker
一般的にcpuコア数
ビジネス・コードの実行
一般
はい
大体
Master
をプライマリスレッドとして利用し、Agent
を秘書プロセスとして起動し、Worker
に協力していくつかの公共事務(ログなど)を処理し、Worker
プロセスを起動して本当の業務コードを実行する.マルチプロセスの実装
プロセス関連コード
まず
Master
から着手し、ここでは一時的にMasterが最上位のプロセスであると考えられる(実際にはparent
プロセスもあるので、後で説明する)./**
* start egg app
* @method Egg#startCluster
* @param {Object} options {@link Master}
* @param {Function} callback start success callback
*/
exports.startCluster = function(options, callback) {
new Master(options).ready(callback);
};
まず
Master
からconstructor(options) {
super();
//
this.options = parseOptions(options);
// worker Manager Messenger
this.workerManager = new Manager();
// messenger , Manager Messenger
this.messenger = new Messenger(this);
// ready get-ready npm
ready.mixin(this);
//
this.isProduction = isProduction();
this.agentWorkerIndex = 0;
//
this.closed = false;
...
ready :
this.ready(() => {
// true
this.isStarted = true;
const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
this.logger.info('[master] %s started on %s (%sms)%s',
frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);
// egg-ready
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
// start check agent and worker status
this.workerManager.startCheck();
});
//
this.on('agent-exit', this.onAgentExit.bind(this));
this.on('agent-start', this.onAgentStart.bind(this));
...
// Fork Agent
detectPort((err, port) => {
...
this.forkAgentWorker();
}
});
}
以上のことから、Masterのコンストラクション関数は主に
であり、最後に実行されるのはforkAgentWorker
関数であり、この関数のキーコードは以下のことを見ることができる.const agentWorkerFile = path.join(__dirname, 'agent_worker.js');
// child_process Agent
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
agent_worker.js
の上に進むと、agent_worker
はagent
オブジェクトをインスタンス化し、agent_worker.js
にはキーコードがあります.agent.ready(() => {
agent.removeListener('error', startErrorHandler); //
process.send({ action: 'agent-start', to: 'master' }); // master agent-start
});
agent_worker.js
のコードがmaster
にメッセージを送信し、動作がagent-start
であり、Master
に戻り、once forkAppWorkers on onAgentStart
に登録された2つのイベントが表示される.this.on('agent-start', this.onAgentStart.bind(this));
this.once('agent-start', this.forkAppWorkers.bind(this));
まず
onAgentStart
関数を見てみましょう.この関数は比較的簡単で、いくつかの情報の伝達です.onAgentStart() {
this.agentWorker.status = 'started';
// Send egg-ready when agent is started after launched
if (this.isAllAppWorkerStarted) {
this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options });
}
this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
// should send current worker pids when agent restart
if (this.isStarted) {
this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() });
}
this.messenger.send({ action: 'agent-start', to: 'app' });
this.logger.info('[master] agent_worker#%s:%s started (%sms)',
this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
}
その後、
forkAppWorkers
関数が実行され、この関数は主にcforkパケットfork
に対応するワークプロセスを介して、関連する一連のリスニングイベントを登録する....
cfork({
exec: this.getAppWorkerFile(),
args,
silent: false,
count: this.options.workers,
// don't refork in local env
refork: this.isProduction,
});
...
// app-start
cluster.on('listening', (worker, address) => {
this.messenger.send({
action: 'app-start',
data: { workerPid: worker.process.pid, address },
to: 'master',
from: 'app',
});
});
forkAppWorkers
関数は、Listening
イベントをリスニングすると、master
上のapp-start
イベントをトリガーすることがわかる.this.on('app-start', this.onAppStart.bind(this));
...
// master ready
if (this.options.sticky) {
this.startMasterSocketServer(err => {
if (err) return this.ready(err);
this.ready(true);
});
} else {
this.ready(true);
}
// ready egg-ready
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
// start check agent and worker status
if (this.isProduction) {
this.workerManager.startCheck();
}
:
+---------+ +---------+ +---------+
| Master | | Agent | | Worker |
+---------+ +----+----+ +----+----+
| fork agent | |
+-------------------->| |
| agent ready | |
||
| worker ready | |
|| |
| Egg ready | |
+----------------------------------------->|
プロセスデーモン
公式ドキュメントによると、プロセス・デーモンは主にgracefulとegg-clusterの2つのライブラリに依存します.
+---------+ +---------+
| Worker | | Master |
+---------+ +----+----+
| uncaughtException |
+------------+ |
| | | +---------+
| + ---------------------> |
| wait... | |
| exit | |
+-------------------------> | |
| | |
die | |
| |
| |
実行されたappファイルから分かるように、
app
は実際には、graceful()
が呼び出されるApplicationクラスに継承されている.onServer(server) {
......
graceful({
server: [ server ],
error: (err, throwErrorCount) => {
......
},
});
......
}
graceful
を引き続き見ると、process.on('uncaughtException')
イベントがキャプチャされ、コールバック関数の中でTCP
接続が閉じられ、自身のプロセスが閉じられ、master
とのIPC
チャネルが切断されていることがわかります.process.on('uncaughtException', function (err) {
......
// http Connection: close
servers.forEach(function (server) {
if (server instanceof http.Server) {
server.on('request', function (req, res) {
// Let http server set `Connection: close` header, and close the current request socket.
req.shouldKeepAlive = false;
res.shouldKeepAlive = false;
if (!res._header) {
res.setHeader('Connection', 'close');
}
});
}
});
// ,
// make sure we close down within `killTimeout` seconds
var killtimer = setTimeout(function () {
console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid);
if (process.env.NODE_ENV !== 'test') {
// kill children by SIGKILL before exit
killChildren(function() {
//
process.exit(1);
});
}
}, killTimeout);
// But don't keep the process open just for that!
// If there is no more io waitting, just let process exit normally.
if (typeof killtimer.unref === 'function') {
// only worked on node 0.10+
killtimer.unref();
}
var worker = options.worker || cluster.worker;
// cluster mode
if (worker) {
try {
// TCP
for (var i = 0; i < servers.length; i++) {
var server = servers[i];
server.close();
}
} catch (er1) {
......
}
try {
// ICP
worker.disconnect();
} catch (er2) {
......
}
}
});
OK、
IPC
チャネルを閉じた後、cfork
ファイル、すなわち上記のfork worker
のパッケージを見続け、サブプロセスのdisconnect
イベントをリスニングし、条件に基づいてfork
の新しいサブプロセスを再開するかどうかを判断します.cluster.on('disconnect', function (worker) {
......
// pid
disconnects[worker.process.pid] = utility.logDate();
if (allow()) {
// fork
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
......
}
});
一般的には、この時点でしばらく待ってから、上記のタイミング関数、すなわち
が実行されます.OOM、
このような
については、サブプロセスでは
である場合があり、masterでしか処理できません.つまり、cfork
パケットです.cluster.on('exit', function (worker, code, signal) {
// , uncatughException fork ,
var isExpected = !!disconnects[worker.process.pid];
if (isExpected) {
delete disconnects[worker.process.pid];
// worker disconnect first, exit expected
return;
}
// master , fork
if (worker.disableRefork) {
// worker is killed by master
return;
}
if (allow()) {
newWorker = forkWorker(worker._clusterSettings);
newWorker._clusterSettings = worker._clusterSettings;
} else {
......
}
cluster.emit('unexpectedExit', worker, code, signal);
});
プロセス間通信(IPC)
様々なプロセス間通信について説明していますが、clusterのIPCチャネルはMasterとWorker/Agentの間にしか存在しません.WorkerとAgentのプロセスは互いに存在しません.では、Worker間で通信したい場合はどうすればいいのでしょうか.はい、Masterで転送します.
: agent => all workers
+--------+ +-------+
| Master | another worker
+--------+ +-------+
| Master |----------| Agent |
+--------+ +-------+
^ |
send to / |
worker 2 / |
/ |
/ v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
master
において、agent app fork
は、情報を傍受しながら、情報を1つのオブジェクトに変換することができる.agentWorker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'agent';
this.messenger.send(msg);
});
worker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'app';
this.messenger.send(msg);
});
最後に呼び出されたのは
messenger.send
で、messengeer.sendは from to
ですsend(data) {
if (!data.from) {
data.from = 'master';
}
......
// app -> master
// agent -> master
if (data.to === 'master') {
debug('%s -> master, data: %j', data.from, data);
// app/agent to master
this.sendToMaster(data);
return;
}
// master -> parent
// app -> parent
// agent -> parent
if (data.to === 'parent') {
debug('%s -> parent, data: %j', data.from, data);
this.sendToParent(data);
return;
}
// parent -> master -> app
// agent -> master -> app
if (data.to === 'app') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAppWorker(data);
return;
}
// parent -> master -> agent
// app -> master -> agent, to
if (data.to === 'agent') {
debug('%s -> %s, data: %j', data.from, data.to, data);
this.sendToAgentWorker(data);
return;
}
}
master
は、action
情報emit
に直接従って対応する登録イベントであるsendToMaster(data) {
this.master.emit(data.action, data.data);
}
agentとworkerは
sendmessage
パケットを介して、実際には次のような方法を呼び出します. //
agent.send(data)
worker.send(data)
最後にagentとappの両方が継承するベースクラス
EggApplication
において、Messenger
クラスが呼び出され、このクラス内部の構造関数は以下の通りである.constructor() {
super();
......
this._onMessage = this._onMessage.bind(this);
process.on('message', this._onMessage);
}
_onMessage(message) {
if (message && is.string(message.action)) {
// master action emit
this.emit(message.action, message.data);
}
}
:
の構想は、イベントメカニズムとIPCチャネルを利用して、各プロセス間の通信を達成することである.その他
学習中にtimeoutに出会った.unref()の関数については、この質問を参考にした6階の回答をお勧めします
まとめ
フロントエンドの思考からバックエンドの思考に移るのは実はとても骨が折れるので、Eggのプロセス管理の実現は確かにとてもすごいので、いろいろなapiと思考に多くの時間を費やしました.
参照と参照
マルチプロセスモデルとプロセス間通信Eggソース解析のegg-cluster