bagpipeソースコード解析

4152 ワード

これも朴大の非同期同時制御を解決するパッケージである.素朴な一節を写して、適用シーンを説明します.Nodeでは,パラレル呼び出しを非同期で容易に開始できる.次のコードを使用すると、100回の非同期呼び出しを簡単に開始できます.
for(var i = 0;i < 100; i++){
  async();
}

しかし、コンカレント量が大きすぎると、私たちの下層サーバは耐えられません.ファイルシステムを大量に同時呼び出しすると、オペレーティングシステムのファイル記述子の数が瞬時に光を放ち、次のようなエラーが発生します.
Error: EMFILE, too many open files

同期I/Oと非同期I/Oの著しい差が分かる:同期I/OはそれぞれのI/Oが互いにブロックされているため、循環体の中で、いつも次から次へと呼び出され、ファイル記述子を消費することが多すぎる場合はなく、同時に性能も同様で、非同期I/Oに対しては同時で実現しやすいが、実現しやすいため、依然として制御が必要である.言い換えれば、下層システムの性能を圧搾するが、過不足を防止するために一定の過負荷保護が必要である.bagpipeの応用は、同時制御のためにキューを維持することです.具体的なアプリケーションコードは以下の通りです.
var Bagpipe = require('bagpipe');
var bagpipe = new Bagpipe(10);
for(var i = 1; i < 10; i++){
  bagpipe.push(async,  function(){
  });
}
bagpipe.on('full', function(length){
  console.log(' , , ' + length);
});

次にbagpipeのソースコードを見てみましょう.ここで朴大のbagpipeはnodejsを引き継いだEventEmitter

コンストラクタ

var Bagpipe = function (limit, options) {
  events.EventEmitter.call(this);
  //  ( )
  this.limit = limit;
  //  
  this.active = 0;
  //  
  this.queue = [];
  //  
  this.options = {
    //  
    disabled: false,
    //  queueLength 
    refuse: false,
    //  limit , refuse
    ratio: 1,
    //  , 
    timeout: null
  };
  if (typeof options === 'boolean') {
    options = {
      disabled: options
    };
  }
  options = options || {};
  for (var key in this.options) {
    if (options.hasOwnProperty(key)) {
      this.options[key] = options[key];
    }
  }
  // queue length
  this.queueLength = Math.round(this.limit * (this.options.ratio || 1));
};
util.inherits(Bagpipe, events.EventEmitter);

Pushメソッド(非同期タスクの追加)

Bagpipe.prototype.push = function (method) {
  //  method 
  var args = [].slice.call(arguments, 1);
  //  
  var callback = args[args.length - 1];
  if (typeof callback !== 'function') {
    args.push(function () {});
  }
  //  , limit 1, 
  if (this.options.disabled || this.limit < 1) {
    method.apply(null, args);
    return this;
  }

  //  
  if (this.queue.length < this.queueLength || !this.options.refuse) {
    this.queue.push({
      method: method,
      args: args
    });
  } else {
    var err = new Error('Too much async call in queue');
    err.name = 'TooMuchAsyncCallError';
    callback(err);
  }

  if (this.queue.length > 1) {
    this.emit('full', this.queue.length);
  }
  //  next , 
  this.next();
  return this;
};

nextメソッドは、非同期タスクが実行可能かどうかを検出するために使用されます。

Bagpipe.prototype.next = function () {
  var that = this;
  //  
  if (that.active < that.limit && that.queue.length) {
    var req = that.queue.shift();
    // 
    that.run(req.method, req.args);
  }
};

//  , 
Bagpipe.prototype._next = function () {
  // 
  this.active--;
  this.next();
};

非同期タスクrunの実行

Bagpipe.prototype.run = function (method, args) {
  var that = this;
  //  , 
  that.active++;
  var callback = args[args.length - 1];
  var timer = null;
  var called = false;

  //  
  args[args.length - 1] = function (err) {
    // anyway, clear the timer
    //  timer, clear timer
    if (timer) {
      clearTimeout(timer);
      timer = null;
    }
    //  
    if (!called) {
      that._next();
      callback.apply(null, arguments);
    //  , 
    } else {
      // pass the outdated error
      if (err) {
        that.emit('outdated', err);
      }
    }
  };

  //  timer, 
  var timeout = that.options.timeout;
  if (timeout) {
    timer = setTimeout(function () {
      // set called as true
      called = true;
      that._next();
      // pass the exception
      var err = new Error(timeout + 'ms timeout');
      err.name = 'BagpipeTimeoutError';
      err.data = {
        name: method.name,
        method: method.toString(),
        args: args.slice(0, -1)
      };
      callback(err);
    }, timeout);
  }
  //  
  method.apply(null, args);
};

実はbagpipeで非同期同時タスクのキューを維持し、最大の同時数もlimitの数より小さいようにします.