Nodeのストリーム(stream)

5827 ワード

公式の定義
ストリーム(stream)は、ノード.jsでストリームデータを処理する抽象インタフェースである.streamモジュールは、ストリームインタフェースを実装したオブジェクトを構築するために使用されます.
サーバリクエスト、ファイルモジュールなど、nodeが提供するストリームオブジェクトを直接使用するのが一般的です.
ストリームの分類
  • Writable−データを書き込むことができるストリーム(例えばfs.createWriteStream()).
  • Readable−データを読み取ることができるストリーム(例えばfs.createReadStream()).
  • Duplex−読み書き可能で書き込み可能なストリーム(例えば、net.Socket).
  • Transform-読み書き中にデータを修正または変換することができるDuplexストリーム(例えばzlib.createDeflate())
  • .
    ストリームの重要なイベントと方法
    次は具体的な例を結びつけてストリームのよく使われるイベントと方法を整理し,対流の理解を深める.
    1、dataおよびendイベント
    ストリームタイプ:読み取り可能なストリームdataイベントは、読み取り可能なストリームが消費者にデータを送信した後にトリガーされます.特に、イベントが追加されたストリームは、ストリームモードに自動的に切り替わり、endイベントは、ストリームに消費可能なデータがない場合にトリガーされます.
    const stream = fs.createReadStream('./file.txt') //      
    let chunks = []
    stream.on("data", (chunk) => { //      
        chunks.push(chunk)
    })
    stream.on("end", () => {
        constcontent = Buffer.concat(chunks).toString()
        console.log(content)
    })

    ここでchunkはbufferタイプ
    2、readableイベントとread()方法
    ストリームタイプ:読み取り可能なストリームreadableイベントは、ストリームに新しいダイナミックがあることを示します.新しいデータがあるか、ストリームの最後に到達します.次に、ファイルを読み込む例を示します.
    const stream = fs.createReadStream('./file.txt')
    let chunks = []
    // stream        readable,   read    null.
    //            ,   end    
    stream.on("readable", () => {
        console.log('  readable');
        let data;
        while (data = stream.read(1024)) {
            chunks.push(data)
            console.log('    ', data);
        }
    })
    stream.on("end", () => {
        const content = Buffer.concat(chunks).toString();
        console.log(content)
    })

    readableを使用すると、dataイベントをリスニングしてもストリームの状態が静止状態になります.readメソッドが呼び出され、データが返されるとdataイベントがトリガーされます.上のコードではreadメソッドで内部バッファのデータを読み出し、sizeパラメータを指定しないと内部バッファのすべてのデータを読み出します.ストリームのすべてのデータではないことに注意してください.sizeを指定しないとwhileループを使用する必要はありません.直接一度に読み出すと、whileループコードブロックは次のようになります.
    data = stream.read()
    data && chunks.push(data)
    console.log('    ', data);

    実行結果に基づいて、readメソッドはバッファデータを読み終えるとreadableイベント、すなわちread()がnullを返すとトリガーします.
    3、pipe()とunpipe()
    ストリームタイプ:読み取り可能なストリーム定義は公式ドキュメントを参照し、次の例ではpipeを使用してhttpリクエストに応答します.
    const http = require('http')
    const fs = require('fs')
    const server = http.createServer()
    server.on('request', (request, response) => {
        const stream = fs.createReadStream('./file.txt')
        stream.pipe(response)
    })
    server.listen(8888)

    pipeを使用するとデータストリームが自動的に管理されるので、読み取り可能なストリームが速くても、ターゲット書き込み可能なストリームは過負荷になりません.またpipe()はターゲットストリームの参照を返し、bが変換ストリームであると仮定したチェーン操作をサポートします.a.pipe(b).pipe(c)`unpipe()は、バインドを解く前にバインドされた書き込み可能なストリームです.上記の例はdataイベントで書き換えることができます.
    //....
    server.on('request', (request, response) => {
        const stream = fs.createReadStream('./file.txt')
        stream.on("data", (chunk) => {
            response.write(chunk)
        })
        stream.on("end", () => {
            response.end() //   pipe           end     end()    
        })
    })
    //...

    しかし、このように書くと書き込み可能なストリームが過負荷になる可能性があります.drainの概念を導入します.
    4、drainおよびfinishイベント
    ストリームタイプ:書き込み可能なストリーム書き込み可能なストリーム呼び出しwrite()がfalseに戻ると、書き込みが速すぎて、これ以上書けないことを示します.ストリームへのデータの書き込みを続行できると、「drain」イベントがトリガーされます.上記の例では、drainイベントがトリガーされるかどうかをテストします.
    //....
    server.on('request', (request, response) => {
        const stream = fs.createReadStream('./file.txt')
        stream.on("data", (chunk) => {
            response.write(chunk)
        })
        stream.on("end", () => {
            response.end()
        })
        response.on("drain", () => {
            console.log('    ')
      })
    })
    //...

    ファイルfile.txtサイズは100 kb程度で、実行後にdrainイベントが4回トリガーされます.書くのは早すぎますが、http応答の結果、データは失われていません.ドキュメントを調べてみると、次のような説明があります.
    ストリームが空になっていない場合に呼び出されますwrite()バッファリングchunk、およびfalse . 現在バッファリングされているすべてのデータブロックが空になると(オペレーティングシステムによって受信され、伝送される)、トリガーされる'drain'事件.推奨write() falseを返すと、ブロックは書き込まれません.'drain'イベントがトリガーされます.ストリームが空になっていない場合でも呼び出すことができますwrite()、Node.jsは、最大メモリ消費量に達するまで、書き込まれたすべてのデータブロックをバッファリングします.この場合、無条件に中止されます.
    だからwrite() falseに戻るときは、もうデータを書かないでください.上記の例では、次のように最適化できます.
    server.on('request', (request, response) => {
        conststream = fs.createReadStream('./file.txt')
        let ok = true
        stream.on("data", (chunk) => {
            ok = response.write(chunk)
            if(!ok) {
                stream.pause()
                console.log('   ')
                ok = true
            }
        })
        stream.on("end", () => {
            response.end()
        })
        response.on("drain", () => {
            console.log('    ')
            stream.resume()
        })
    })

    このように書くのはちょっと面倒ですが、pipeを直接使うほうが便利です.
    finishイベントはend()を呼び出し、バッファデータが下位システムに渡された後にトリガーされる.
    5、pause()とresume()
    ストリームタイプ:読み取り可能なストリームのダイナミックと静止状態の切り替え、dataイベントがトリガーされるかどうかを変更
    6、write()とend()
    ≪ストリーム・タイプ|Streams Type|emdw≫:書き込み可能なストリームの上記の例に関連しています.writeは書き込み可能なストリームにデータを書き込み、endは書き込みが完了したことを示し、writeを呼び出すことができなくなった.