ノード.ストリームの



概要


この記事は、学習ノードを簡素化するための記事のシリーズ第4部です.jsこの記事ではストリームをカバーします.

導入


大量のデータを扱うことは通常、ストリームでの作業を意味します.ストリームは過剰な計算リソースを必要とせずに大きなデータ処理を可能にします.ストリームを使用すると、すべてのメモリにそれを維持せずに、その内容を処理して、ピースによってデータを読みます.

ストリームの基礎


ストリームは、一度にすべてを使用できない可能性のあるデータのコレクションです.YouTubeまたはNetflixビデオを見て考える.ビデオが起動すると、それだけで十分なデータは、ビデオを取得し、時間をかけて残りのデータを処理するために開始されます.この種のデータ処理は、他のデータ処理方法よりも2つの大きな利点を提供します.

  • メモリ効率:メモリにデータを大量にロードする必要はありません.

  • 時間効率:データのペイロードが始まるまで待つのではなく、すぐにデータを処理し始めるのに時間がかかる.
  • 効率的なデータ処理により,ノードに大量のデータ処理アプリケーションを提供できる.jsThe stream module すべてのストリーミングAPIがノードに組み込まれている基礎を提供します.jsストリームモジュール内には、読み取り可能な、書き込み可能、重複、および変換の4種類のストリームがあります.これらの各実装pipe 1つのストリームのソースを取り、その出力を別のストリームに接続するメソッドです.つ以上のストリームの接続はpipeline ストリームのデータが最終目的地に到達するまで、パイプラインの各相を通過する.

    読みやすいストリーム


    エーreadable ストリームを使用して、ファイルを読み込み、入力されたHTTPリクエストからデータを読み込み、コマンドプロンプトからユーザー入力を読み込み、いくつかの例を挙げます.読み取り可能なコンストラクターは、EventEmitatorコンストラクターから継承するStreamコンストラクターから継承します.したがって、読み取り可能なストリームはイベントエミッタです.読み込み可能なストリームを作成するには、読み込み可能なコンストラクタがnew キーワードと読み込みメソッドを持つオプションオブジェクトを渡します.

    コードスニペット
    'use strict'
    const { Readable } = require('stream')
    const createReadStream = () => {
      const data = ['some', 'data', 'to', 'read']
      return new Readable({
        encoding: 'utf8',
        read () {
          if (data.length === 0) this.push(null)
          else this.push(data.shift())
        }
      })
    }
    const readable = createReadStream()
    readable.on('data', (data) => { console.log('got data', data) })
    readable.on('end', () => { console.log('finished reading') })
    

    出力
    got data some
    got data data
    got data to
    got data read
    finished reading
    

    書き込み可能ストリーム


    エーwritable ストリームを使用してファイルを書き込み、HTTPレスポンスにデータを書き込むか、端末に書き込むことができます.書き込み可能なコンストラクターは、EventEmitatorコンストラクターから継承するStreamコンストラクターから継承します.したがって、書き込み可能なストリームはイベントエミッタです.書き込み可能なストリームを作成するには、new キーワード.WritableコンストラクターのOptionオブジェクトには、書き込み機能を持つことができますchunk, enc, next . 書き込み可能なストリームにデータを送信するにはwrite メソッド.The end メソッドを使用すると、ストリームに最終ペイロードを書き込みます.流れが終わるとfinish イベントが発行されます.
    'use strict'
    const { Writable } = require('stream')
    const createWriteStream = (data) => {
      return new Writable({
        decodeStrings: false,
        write (chunk, enc, next) {
          data.push(chunk);
          next();
        }
      })
    }
    const data = [];
    const writable = createWriteStream(data);
    writable.on('finish', () => {console.log('finished', data)});
    writable.write('A');
    writable.write('B');
    writable.write('C');
    writable.end('nothing more to write');
    

    出力
    finished [ 'A', 'B', 'C', 'nothing more to write' ]
    

    二相流


    Duplex ストリームは読み取り可能で書き込み可能なインターフェイスを実装するストリームです.二相ストリームの良い例はTCPソケットです.TCPソケットはクライアントへの書き込みと同様にクライアント接続からデータを読むことができます.例を示すために、TCPソケットとクライアント接続をシミュレートする2つのファイルを作成します.

    TCPサーバ
    'use strict'
    const net = require('net')
    net.createServer((socket) => {
      const interval = setInterval(() => {
        socket.write('beat')
      }, 1000)
      socket.on('data', (data) => {
        socket.write(data.toString().toUpperCase())
      })
      socket.on('end', () => { clearInterval(interval) })
    }).listen(3000)
    

    クライアント接続
    'use strict'
    const net = require('net')
    const socket = net.connect(3000)
    
    socket.on('data', (data) => {
      console.log('got data:', data.toString())
    })
    
    setTimeout(() => {
      socket.write('all done')
      setTimeout(() => {
        socket.end()
      }, 250)
    }, 3250)
    
    両方のスクリプトを実行すると、次の出力が生成されます.

    出力
    got data: beat
    got data: beat
    got data: beat
    got data: ALL DONE
    

    変換ストリーム


    Transform ストリームは、追加の制約を持つデュプレックスストリームで、読み取りインターフェイスと書き込みインターフェイスの関係を強制します.読み取りインターフェイスと書き込みインターフェイスの間の制約はtransform 関数.Transform関数は、書き込み可能なストリームオブジェクトからの書き込み関数と同じシグネチャを持ちますchunk , enc , and next パラメータとして.違いはnext 関数は2番目の引数を渡すことができます.これは、いくつかの種類の変換操作を受信チャンクに適用する結果です.簡単な例を見ましょう.

    コードスニペット
    'use strict'
    const { Transform } = require('stream')
    const createTransformStream = () => {
      return new Transform({
        decodeStrings: false,
        encoding: 'utf8',
        transform (chunk, enc, next) {
         next(null, chunk.toUpperCase());
        }
      })
    }
    const transform = createTransformStream()
    transform.on('data', (data) => {
      console.log('got data:', data);
    })
    transform.write('a\n');
    transform.write('b\n');
    transform.write('c\n');
    transform.end('nothing more to write');
    

    出力
    got data: A
    
    got data: B
    
    got data: C
    
    got data: NOTHING MORE TO WRITE
    

    配管流


    前述の通りpipe メソッドは1つのストリームのソースを取り、別のストリームの行先にそれをパイプします.簡単な例を見ましょう.読みやすく、書き込み可能な例を前のセクションからリファクタリングしますpipe メソッド.
    'use strict'
    const { Readable, Writable } = require('stream')
    const createReadStream = () => {
      const readData = ['some', 'data', 'to', 'read'];
      return new Readable({
        encoding: 'utf8',
        read () {
          if (readData.length === 0) this.push(null)
          else this.push(readData.shift())
        }
      })
    }
    
    const createWriteStream = (data) => {
      return new Writable({
        decodeStrings: false,
        write (chunk, enc, next) {
          data.push(chunk);
          next();
        }
      })
    }
    const data = [];
    const readable = createReadStream();
    const writable = createWriteStream(data);
    readable.pipe(writable);
    writable.on('finish', () => {console.log('finished', data)});
    

    出力
    finished [ 'some', 'data', 'to', 'read' ]
    
    上のコードスニペットでreadable.on メソッドからコードを削除しました.これはストリームが一時的に一時停止状態になっているためです.データの流れを得るための唯一の方法は、Recordメソッド、データイベント、またはパイプメソッドを使用することです.あなたは、あなたのユースケースを満たすために必要な多くのストリームとしてパイプすることができますが、それはパイプラインを使用する場合は、より多くの2つのストリームを配管する場合は最善の練習です.
    エーpipeline 一緒にストリームの一連のパイプに使用することができます.例を見ましょう.パイプラインを使って動作するように、読み取り可能な、書き込み可能な、変換セクションからコードをリファクタリングします.

    パイプラインの断片
    'use strict'
    const { Readable, Writable, Transform, pipeline } = require('stream')
    const createReadStream = () => {
      const readData = ['some', 'data', 'to', 'read'];
      return new Readable({
        encoding: 'utf8',
        read() {
          if (readData.length === 0) this.push(null);
          else this.push(readData.shift());
        }
      })
    }
    
    const createTransform = () => {
      return new Transform({
        transform(chunk, enc, next) {
          const changedData = chunk.toString().toUpperCase();
          next(null, changedData);
        }
      })
    }
    
    const createWriteStream = () => {
      const data = [];
      const writable = new Writable({
        decodeStrings: false,
        write(chunk, enc, next) {
          data.push(chunk.toString());
          next();
        }
      });
      writable.data = data;
      return writable;
    }
    
    const readable = createReadStream();
    const writable = createWriteStream();
    const transform = createTransform();
    pipeline(readable, transform, writable, (err) => {
      if (err) console.error('Pipeline failed.', err);
      else console.log('Pipeline succeeded.', writable.data);
    });
    

    出力
    Pipeline succeeded. [ 'SOME', 'DATA', 'TO', 'READ' ]
    
    上記のコードスニペットではpipeline ストリームモジュールからの関数です.次に、参照ストリーミング機能の3つの変数を使用して、パイプラインを流れるデータと対話します.最後に、パイプラインはerr パイプラインが完了すると実行されるパラメータ.エラーが発生した場合、パイプラインは失敗し、そうでなければコンソールは成功メッセージでデータをログ出力します.
    この記事の例は、ストリームにはるかに多くあります.私は常にNodeJS Docs あなたのユースケースの解決策を開発するときのプライマリソースとして.他の開発者によって書かれた良い記事もたくさんあります.ここでは、ノードのストリームの開発に役立つことができます.jsもう一つの良い著者はSamer Buna . Samerはノードについての高度なトピックについて多くの良いコンテンツを持っています.jsいつものように質問がある場合は、ディスカッションで投稿し、返信します.注意してくださいハッピーコーディング.