bagpipeソースコード解析
4152 ワード
これも朴大の非同期同時制御を解決するパッケージである.素朴な一節を写して、適用シーンを説明します.Nodeでは,パラレル呼び出しを非同期で容易に開始できる.次のコードを使用すると、100回の非同期呼び出しを簡単に開始できます.
しかし、コンカレント量が大きすぎると、私たちの下層サーバは耐えられません.ファイルシステムを大量に同時呼び出しすると、オペレーティングシステムのファイル記述子の数が瞬時に光を放ち、次のようなエラーが発生します.
同期I/Oと非同期I/Oの著しい差が分かる:同期I/OはそれぞれのI/Oが互いにブロックされているため、循環体の中で、いつも次から次へと呼び出され、ファイル記述子を消費することが多すぎる場合はなく、同時に性能も同様で、非同期I/Oに対しては同時で実現しやすいが、実現しやすいため、依然として制御が必要である.言い換えれば、下層システムの性能を圧搾するが、過不足を防止するために一定の過負荷保護が必要である.bagpipeの応用は、同時制御のためにキューを維持することです.具体的なアプリケーションコードは以下の通りです.
次にbagpipeのソースコードを見てみましょう.ここで朴大のbagpipeはnodejsを引き継いだEventEmitter
実はbagpipeで非同期同時タスクのキューを維持し、最大の同時数もlimitの数より小さいようにします.
for(var i = 0;i < 100; i++){
async();
}
しかし、コンカレント量が大きすぎると、私たちの下層サーバは耐えられません.ファイルシステムを大量に同時呼び出しすると、オペレーティングシステムのファイル記述子の数が瞬時に光を放ち、次のようなエラーが発生します.
Error: EMFILE, too many open files
同期I/Oと非同期I/Oの著しい差が分かる:同期I/OはそれぞれのI/Oが互いにブロックされているため、循環体の中で、いつも次から次へと呼び出され、ファイル記述子を消費することが多すぎる場合はなく、同時に性能も同様で、非同期I/Oに対しては同時で実現しやすいが、実現しやすいため、依然として制御が必要である.言い換えれば、下層システムの性能を圧搾するが、過不足を防止するために一定の過負荷保護が必要である.bagpipeの応用は、同時制御のためにキューを維持することです.具体的なアプリケーションコードは以下の通りです.
var Bagpipe = require('bagpipe');
var bagpipe = new Bagpipe(10);
for(var i = 1; i < 10; i++){
bagpipe.push(async, function(){
});
}
bagpipe.on('full', function(length){
console.log(' , , ' + length);
});
次にbagpipeのソースコードを見てみましょう.ここで朴大のbagpipeはnodejsを引き継いだEventEmitter
コンストラクタ
var Bagpipe = function (limit, options) {
events.EventEmitter.call(this);
// ( )
this.limit = limit;
//
this.active = 0;
//
this.queue = [];
//
this.options = {
//
disabled: false,
// queueLength
refuse: false,
// limit , refuse
ratio: 1,
// ,
timeout: null
};
if (typeof options === 'boolean') {
options = {
disabled: options
};
}
options = options || {};
for (var key in this.options) {
if (options.hasOwnProperty(key)) {
this.options[key] = options[key];
}
}
// queue length
this.queueLength = Math.round(this.limit * (this.options.ratio || 1));
};
util.inherits(Bagpipe, events.EventEmitter);
Pushメソッド(非同期タスクの追加)
Bagpipe.prototype.push = function (method) {
// method
var args = [].slice.call(arguments, 1);
//
var callback = args[args.length - 1];
if (typeof callback !== 'function') {
args.push(function () {});
}
// , limit 1,
if (this.options.disabled || this.limit < 1) {
method.apply(null, args);
return this;
}
//
if (this.queue.length < this.queueLength || !this.options.refuse) {
this.queue.push({
method: method,
args: args
});
} else {
var err = new Error('Too much async call in queue');
err.name = 'TooMuchAsyncCallError';
callback(err);
}
if (this.queue.length > 1) {
this.emit('full', this.queue.length);
}
// next ,
this.next();
return this;
};
nextメソッドは、非同期タスクが実行可能かどうかを検出するために使用されます。
Bagpipe.prototype.next = function () {
var that = this;
//
if (that.active < that.limit && that.queue.length) {
var req = that.queue.shift();
//
that.run(req.method, req.args);
}
};
// ,
Bagpipe.prototype._next = function () {
//
this.active--;
this.next();
};
非同期タスクrunの実行
Bagpipe.prototype.run = function (method, args) {
var that = this;
// ,
that.active++;
var callback = args[args.length - 1];
var timer = null;
var called = false;
//
args[args.length - 1] = function (err) {
// anyway, clear the timer
// timer, clear timer
if (timer) {
clearTimeout(timer);
timer = null;
}
//
if (!called) {
that._next();
callback.apply(null, arguments);
// ,
} else {
// pass the outdated error
if (err) {
that.emit('outdated', err);
}
}
};
// timer,
var timeout = that.options.timeout;
if (timeout) {
timer = setTimeout(function () {
// set called as true
called = true;
that._next();
// pass the exception
var err = new Error(timeout + 'ms timeout');
err.name = 'BagpipeTimeoutError';
err.data = {
name: method.name,
method: method.toString(),
args: args.slice(0, -1)
};
callback(err);
}, timeout);
}
//
method.apply(null, args);
};
実はbagpipeで非同期同時タスクのキューを維持し、最大の同時数もlimitの数より小さいようにします.