NodeJs流の一つを詳しく説明します.


NodeJsシリーズに興味があれば、WeChat公式アカウントに注目してください.フロントエンドの神盾局またはgithub NodeJsシリーズの記事です.
以前のunixから流れてきました.過去数十年の間に、頼りになるプログラミング方式であることが証明されました.大型システムをいくつかの小さな部分に分解して、これらの部分の間で完璧に協力します.
nodeでは、ストリームの影はほとんどどこにでもあり、ファイルを操作しても、ローカルサーバを作成しても、簡単なconsoleにしても、ストリームに関わる可能性が高い.
Node.jsには4つの基本的なストリームタイプがあります.
  • Readable−読出し可能データのストリーム(例えば、fs.reate ReadStream()).
  • Writable−データを書き込むことができるストリーム(例えば、fs.create WriteStream()).
  • Duplex−読み書き可能なストリーム(例えば、net.Socket).
  • Trans form-読み書き中にデータのDuplexストリームを修正または変換することができます(例えばzlib.create Deflate()
  • なぜストリームを使うのですか
    nodeを使用して簡単な静的ファイルサーバを実現する必要があると仮定します.
    const http = require('http');
    const fs = require('fs');
    
    http.createServer((req,res)=>{
        fs.readFile('./test.html',function(err,data){
            if(err){
                res.statusCode = 500;
                res.end();
            }else{
                res.end(data);
            }
        })
    }).listen(3000)
    上記のコードは、静的ファイルの読み込みと送信を簡単に実現し、論理的には完全に実行可能です.しかし、readFileは読み出したファイルを一度にメモリに保存するものであるため、test.htmlファイルが非常に大きいか、アクセス量が多くなるとサーバメモリが消耗する可能性が高い.この時、私達は流れの方式を使って改善する必要があります.
    const http = require('http');
    const fs = require('fs');
    
    http.createServer((req,res)=>{
        fs.createReadStream('./test.html').pipe(res);
    }).listen(3000);
    
    fs.createReadStreamは、読み取り可能なストリームを作成し、ファイルのコンテンツを順次読み込んで下流消費に供給する.このような段階的な読み取りと消費の方式は、メモリの消費を効果的に遅らせる.
    読み取り可能なストリーム(Readable Stream)
    Readable Streamを二つの段階に分割することができます.push段階とpull段階は、_read方法を実現することによって、データを下のデータリソースプールからキャッシュ池に押し上げることができます.これはデータの生産段階です.pull段階はキャッシュプールのデータを引き出して、下流のデータの消費段階です.
    さらに説明を始める前に、まずいくつかのフィールドを紹介します.これらのフィールドはnodeソースから来ています.
  • state.buffer:Arrayバッファリング、各要素は、プッシュ中のdata
  • に対応する.
  • state.length:Numberバッファリング内のデータ量は、objectModeモードでstate.length === state.buffer.length、そうでなければ、state.bufferにおけるデータバイト数の合計
  • である.
  • state.ended:Booleanは、最下のデータプールに読み取り可能なデータがないことを示している(this.pull(null))
  • state.flowing:Null|Booleanは現在のストリームのモードを表し、その値は3つの場合がある.null(初期状態)、true(フローモード)、false(タイムアウトモード)
  • state.needReadable:Booleanをトリガする必要があるかどうかreadableイベント
  • state.reading:Booleanが下のデータ
  • を読んでいるかどうか
  • state.sync:Booleanは直ちにdata/readableイベントをトリガするかどうか、falseは即時トリガ、trueは次のtick再トリガ(process.nextTick)
  • である.
    二つのモード
    読み取り可能なストリームは2つのモードがあります.フローモードと一時停止モードがあり、ソースコードにはstate.flowingを使用して識別されます.
    二つのモードの基本的な流れは上の図のpushとpullの段階に従っています.pullの段階の自主性に違いがあります.フローモードにとっては、キャッシュカードに未消耗のデータがあれば、データは絶えず抽出されます.自動のポンプとして想像できます.電気が通じたら、プールの水を抜かないと止められないです.一時停止モードに対しては、バケツを打つようです.必要な時は池から水を入れてください.
    すべての読み取り可能なストリームは一時停止モードで開始され、以下のように流れモードに切り替えることができる.
  • は、dataイベントのハンドルを追加します.
  • 呼び出しstate.flowing === null
  • 呼び出しstream.resume()
  • 読み取り可能なストリームは、以下のように一時停止モードに切り替えることもできる.
  • 追加stream.pipe()イベントハンドル
  • 配管ターゲットがない場合はreadableを呼び出す.
  • パイプラインターゲットがある場合、すべてのパイプラインターゲットが除去される.呼び出しstream.pause()は、複数のパイプラインターゲットを除去することができる.
  • すべてはstream.unpipe()から始まります.
    読み取り可能なストリームに対しては、消費駆動生産は、Pull段階のread関数を呼び出してこそ、Push段階のデータ生成を開始し、それによって流れ全体の動きを促進することができる.したがって、読み取り可能なストリームにとってreadはすべての開始点である.
    これはソースコードに基づいて整理された簡単なフローチャートです.後で一部のリンクについて説明します.readhowMuchToReadを呼び出している間に、nodeは実際の状況に応じて読み取りの数を調整し、実際の値はread(n)によって決定される.
    function howMuchToRead(n,state){
      //   size <= 0          
      if (n <= 0 || (state.length === 0 && state.ended))
        return 0;
        
      // objectMode                 
      if (state.objectMode)
        return 1;
        
      //   size    
      if (Number.isNaN(n)) {
        //   read() ,              ,
        //                  ,             
        if (state.flowing && state.length)
          return state.buffer.head.data.length;
        else
          return state.length;
      }
      
      if (n > state.highWaterMark)
        //   highWaterMark
        state.highWaterMark = computeNewHighWaterMark(n);
    
      //            
      if (n <= state.length)
        return n;
        
      //            ,
      //             ,             
      //                 
      //        
      if (!state.ended) {
        state.needReadable = true;
        return 0;
      }
      return state.length;
    }
    howMuchReadイベントend関数コール中に、nodeは、readイベントをトリガするかどうかを選択し、判定基準は主に以下の2つの条件である.
    if (state.length === 0 && state.ended) endReadable(this);
  • の底のデータ(リソース)に読み取り可能なデータがない場合、endstate.ended
  • である.
    呼び出しでtrueは、下のデータは現在、読み取り可能なデータがないことを示している.
  • バッファには、読み取り可能データpull(null)
  • がありません.
    本イベントは、state.length === 0を呼び出したときにトリガされる(上記条件を満たす場合).read([size])doReadは、下のデータが読み込まれているかどうかを判断するために使用される.
      //          `state.needReadable`
      var doRead = state.needReadable;
      
      //                    
      if (state.length === 0 || state.length - n < state.highWaterMark){
        doRead = true;
      }
    
      if (state.ended || state.reading) {
        doRead = false;
      } else if (doRead) {
        // ...
        this._read(state.highWaterMark);
        // ...
      }
    doReadフラグは、最終的に底からデータを取る動作が完了したかどうか、state.readingメソッドが起動されるとpushに設定され、falseが終了したことを示しています._read()イベント
    公式文書では、dataイベントのハンドルを追加すると、Readable Streamのモードをフローモードに切り替えることができますが、公式には言及されていません.例を挙げます
    const { Readable } = require('stream');
    
    class ExampleReadable extends Readable{
      constructor(opt){
        super(opt);
        this._time = 0;
      }
      _read(){
        this.push(String(++this._time));
      }
    }
    
    const exampleReadable = new ExampleReadable();
    //    state.flowing === false
    exampleReadable.pause();
    exampleReadable.on('data',(chunk)=>{
      console.log(`Received ${chunk.length} bytes of data.`);
    });
    この例を実行すると、端末には出力がないことが分かりました.なぜですか?ソースから端緒が見えるからです.
     if (state.flowing !== false)
          this.resume();
    これにより、公式表現をさらに改善することができる.読み取り可能ストリーム初期化状態(data)において、state.flowingイベントのハンドルを追加すると、ストリームがフローモードに入る.
    プッシュする
    読み取り可能なストリームのみによって呼び出しが可能であり、readable.u.read()メソッドで呼び出します.
    pushはデータ生産の中核であり、消費者はnullを呼び出してストリーム出力データを促し、_を通じて流れる.read()は、下の階からプッシュメソッドを呼び出して、データをストリームに転送します.
    このプロセスでは、プッシュ方法は、データをバッファ内に格納することができ、state.flowing === nullイベントを介して直接出力することもできる.私たちは一つ一つ分析します.
    現在のストリームが流れている場合(data)、バッファ内に読み取り可能なデータがない場合、データは直接にイベントread(n)によって出力されるだろう.
    // node   
    if (state.flowing && state.length === 0 && !state.sync){
        state.awaitDrain = 0;
        stream.emit('data', chunk);
    } 
    例を挙げます.
    const { Readable } = require('stream');
    
    class ExampleReadable extends Readable{
      constructor(opt){
        super(opt);
        this.max = 100;
        this.time = 0;
      }
      _read(){
        const seed = setTimeout(()=>{
          if(this.time > 100){
            this.push(null);
          }else{
            this.push(String(++this.time));
          }
          clearTimeout(seed);
        },0)
      }
    }
    const exampleReadable = new ExampleReadable({ });
    exampleReadable.on('data',(data)=>{
      console.log('from data',data);
    });
    dataイベント
    exampleReadable.on('readable',()=>{
        ....
    });
    state.flowing === trueイベントを登録すると、nodeは次のように処理します.
  • は、ストリームを一時停止モード
  • に切り替える.
    state.flowing = false; 
    state.needReadable = true;
  • バッファが消費されていないデータの場合、トリガdata
  • stream.emit('readable');
  • そうでなければ、現在は下のデータを読んでいるかどうかを判断し、そうでなければ、下のデータreadable
  • の読み取りを開始する.
    トリガ条件
  • readableは現在、一時停止モード
  • にあります.
  • キャッシュ・プールにはまだデータがあります.または、本船の下のデータはすでに読み終わりました.readable
  • return !state.ended &&
        (state.length < state.highWaterMark || state.length === 0);
    参照
  • Node.js v 10.15.1ドキュメント
  • Node.js Stream内部メカニズムを深く理解する
  • stream-handbook
  • は、反応式プログラミングにおけるバックプレッシャー機構をどのようにイメージして説明しますか?
  • データストリームにおける滞積問題
  • Node.js Stream-進級編
  • Node Stream