NodeJs流の一つを詳しく説明します.
NodeJsシリーズに興味があれば、WeChat公式アカウントに注目してください.フロントエンドの神盾局またはgithub NodeJsシリーズの記事です.
以前のunixから流れてきました.過去数十年の間に、頼りになるプログラミング方式であることが証明されました.大型システムをいくつかの小さな部分に分解して、これらの部分の間で完璧に協力します.
nodeでは、ストリームの影はほとんどどこにでもあり、ファイルを操作しても、ローカルサーバを作成しても、簡単な
Node.jsには4つの基本的なストリームタイプがあります. Readable−読出し可能データのストリーム(例えば、fs.reate ReadStream()). Writable−データを書き込むことができるストリーム(例えば、fs.create WriteStream()). Duplex−読み書き可能なストリーム(例えば、net.Socket). Trans form-読み書き中にデータのDuplexストリームを修正または変換することができます(例えばzlib.create Deflate() なぜストリームを使うのですか
nodeを使用して簡単な静的ファイルサーバを実現する必要があると仮定します.
読み取り可能なストリーム(Readable Stream)
Readable Streamを二つの段階に分割することができます.push段階とpull段階は、
さらに説明を始める前に、まずいくつかのフィールドを紹介します.これらのフィールドはnodeソースから来ています. に対応する. である. を読んでいるかどうか である.
二つのモード
読み取り可能なストリームは2つのモードがあります.フローモードと一時停止モードがあり、ソースコードには
二つのモードの基本的な流れは上の図のpushとpullの段階に従っています.pullの段階の自主性に違いがあります.フローモードにとっては、キャッシュカードに未消耗のデータがあれば、データは絶えず抽出されます.自動のポンプとして想像できます.電気が通じたら、プールの水を抜かないと止められないです.一時停止モードに対しては、バケツを打つようです.必要な時は池から水を入れてください.
すべての読み取り可能なストリームは一時停止モードで開始され、以下のように流れモードに切り替えることができる.は、 呼び出し 呼び出し 読み取り可能なストリームは、以下のように一時停止モードに切り替えることもできる.追加 配管ターゲットがない場合は パイプラインターゲットがある場合、すべてのパイプラインターゲットが除去される.呼び出し すべては
読み取り可能なストリームに対しては、消費駆動生産は、Pull段階の
これはソースコードに基づいて整理された簡単なフローチャートです.後で一部のリンクについて説明します.の底のデータ(リソース)に読み取り可能なデータがない場合、 である.
呼び出しでバッファには、読み取り可能データ がありません.
本イベントは、
公式文書では、
プッシュする
読み取り可能なストリームのみによって呼び出しが可能であり、readable.u.read()メソッドで呼び出します.
pushはデータ生産の中核であり、消費者は
このプロセスでは、プッシュ方法は、データをバッファ内に格納することができ、
現在のストリームが流れている場合(は、ストリームを一時停止モード に切り替える.バッファが消費されていないデータの場合、トリガ そうでなければ、現在は下のデータを読んでいるかどうかを判断し、そうでなければ、下のデータ の読み取りを開始する.
トリガ条件 にあります.キャッシュ・プールにはまだデータがあります.または、本船の下のデータはすでに読み終わりました. Node.js v 10.15.1ドキュメント Node.js Stream内部メカニズムを深く理解する stream-handbook は、反応式プログラミングにおけるバックプレッシャー機構をどのようにイメージして説明しますか? データストリームにおける滞積問題 Node.js Stream-進級編 Node Stream
以前のunixから流れてきました.過去数十年の間に、頼りになるプログラミング方式であることが証明されました.大型システムをいくつかの小さな部分に分解して、これらの部分の間で完璧に協力します.
nodeでは、ストリームの影はほとんどどこにでもあり、ファイルを操作しても、ローカルサーバを作成しても、簡単な
console
にしても、ストリームに関わる可能性が高い.Node.jsには4つの基本的なストリームタイプがあります.
nodeを使用して簡単な静的ファイルサーバを実現する必要があると仮定します.
const http = require('http');
const fs = require('fs');
http.createServer((req,res)=>{
fs.readFile('./test.html',function(err,data){
if(err){
res.statusCode = 500;
res.end();
}else{
res.end(data);
}
})
}).listen(3000)
上記のコードは、静的ファイルの読み込みと送信を簡単に実現し、論理的には完全に実行可能です.しかし、readFile
は読み出したファイルを一度にメモリに保存するものであるため、test.html
ファイルが非常に大きいか、アクセス量が多くなるとサーバメモリが消耗する可能性が高い.この時、私達は流れの方式を使って改善する必要があります.const http = require('http');
const fs = require('fs');
http.createServer((req,res)=>{
fs.createReadStream('./test.html').pipe(res);
}).listen(3000);
fs.createReadStream
は、読み取り可能なストリームを作成し、ファイルのコンテンツを順次読み込んで下流消費に供給する.このような段階的な読み取りと消費の方式は、メモリの消費を効果的に遅らせる.読み取り可能なストリーム(Readable Stream)
Readable Streamを二つの段階に分割することができます.push段階とpull段階は、
_read
方法を実現することによって、データを下のデータリソースプールからキャッシュ池に押し上げることができます.これはデータの生産段階です.pull段階はキャッシュプールのデータを引き出して、下流のデータの消費段階です.さらに説明を始める前に、まずいくつかのフィールドを紹介します.これらのフィールドはnodeソースから来ています.
state.buffer
:Array
バッファリング、各要素は、プッシュ中のdata state.length
:Number
バッファリング内のデータ量は、objectMode
モードでstate.length === state.buffer.length
、そうでなければ、state.buffer
におけるデータバイト数の合計state.ended
:Boolean
は、最下のデータプールに読み取り可能なデータがないことを示している(this.pull(null)
)state.flowing
:Null|Boolean
は現在のストリームのモードを表し、その値は3つの場合がある.null
(初期状態)、true
(フローモード)、false
(タイムアウトモード)state.needReadable
:Boolean
をトリガする必要があるかどうかreadable
イベントstate.reading
:Boolean
が下のデータstate.sync
:Boolean
は直ちにdata
/readable
イベントをトリガするかどうか、false
は即時トリガ、true
は次のtick再トリガ(process.nextTick
)二つのモード
読み取り可能なストリームは2つのモードがあります.フローモードと一時停止モードがあり、ソースコードには
state.flowing
を使用して識別されます.二つのモードの基本的な流れは上の図のpushとpullの段階に従っています.pullの段階の自主性に違いがあります.フローモードにとっては、キャッシュカードに未消耗のデータがあれば、データは絶えず抽出されます.自動のポンプとして想像できます.電気が通じたら、プールの水を抜かないと止められないです.一時停止モードに対しては、バケツを打つようです.必要な時は池から水を入れてください.
すべての読み取り可能なストリームは一時停止モードで開始され、以下のように流れモードに切り替えることができる.
data
イベントのハンドルを追加します.state.flowing === null
stream.resume()
stream.pipe()
イベントハンドルreadable
を呼び出す.stream.pause()
は、複数のパイプラインターゲットを除去することができる.stream.unpipe()
から始まります.読み取り可能なストリームに対しては、消費駆動生産は、Pull段階の
read
関数を呼び出してこそ、Push段階のデータ生成を開始し、それによって流れ全体の動きを促進することができる.したがって、読み取り可能なストリームにとってread
はすべての開始点である.これはソースコードに基づいて整理された簡単なフローチャートです.後で一部のリンクについて説明します.
read
howMuchToRead
を呼び出している間に、nodeは実際の状況に応じて読み取りの数を調整し、実際の値はread(n)
によって決定される.function howMuchToRead(n,state){
// size <= 0
if (n <= 0 || (state.length === 0 && state.ended))
return 0;
// objectMode
if (state.objectMode)
return 1;
// size
if (Number.isNaN(n)) {
// read() , ,
// ,
if (state.flowing && state.length)
return state.buffer.head.data.length;
else
return state.length;
}
if (n > state.highWaterMark)
// highWaterMark
state.highWaterMark = computeNewHighWaterMark(n);
//
if (n <= state.length)
return n;
// ,
// ,
//
//
if (!state.ended) {
state.needReadable = true;
return 0;
}
return state.length;
}
howMuchRead
イベントend
関数コール中に、nodeは、read
イベントをトリガするかどうかを選択し、判定基準は主に以下の2つの条件である.if (state.length === 0 && state.ended) endReadable(this);
end
はstate.ended
、呼び出しで
true
は、下のデータは現在、読み取り可能なデータがないことを示している.pull(null)
本イベントは、
state.length === 0
を呼び出したときにトリガされる(上記条件を満たす場合).read([size])
doRead
は、下のデータが読み込まれているかどうかを判断するために使用される. // `state.needReadable`
var doRead = state.needReadable;
//
if (state.length === 0 || state.length - n < state.highWaterMark){
doRead = true;
}
if (state.ended || state.reading) {
doRead = false;
} else if (doRead) {
// ...
this._read(state.highWaterMark);
// ...
}
doRead
フラグは、最終的に底からデータを取る動作が完了したかどうか、state.reading
メソッドが起動されるとpush
に設定され、false
が終了したことを示しています._read()
イベント公式文書では、
data
イベントのハンドルを追加すると、Readable Streamのモードをフローモードに切り替えることができますが、公式には言及されていません.例を挙げますconst { Readable } = require('stream');
class ExampleReadable extends Readable{
constructor(opt){
super(opt);
this._time = 0;
}
_read(){
this.push(String(++this._time));
}
}
const exampleReadable = new ExampleReadable();
// state.flowing === false
exampleReadable.pause();
exampleReadable.on('data',(chunk)=>{
console.log(`Received ${chunk.length} bytes of data.`);
});
この例を実行すると、端末には出力がないことが分かりました.なぜですか?ソースから端緒が見えるからです. if (state.flowing !== false)
this.resume();
これにより、公式表現をさらに改善することができる.読み取り可能ストリーム初期化状態(data
)において、state.flowing
イベントのハンドルを追加すると、ストリームがフローモードに入る.プッシュする
読み取り可能なストリームのみによって呼び出しが可能であり、readable.u.read()メソッドで呼び出します.
pushはデータ生産の中核であり、消費者は
null
を呼び出してストリーム出力データを促し、_を通じて流れる.read()は、下の階からプッシュメソッドを呼び出して、データをストリームに転送します.このプロセスでは、プッシュ方法は、データをバッファ内に格納することができ、
state.flowing === null
イベントを介して直接出力することもできる.私たちは一つ一つ分析します.現在のストリームが流れている場合(
data
)、バッファ内に読み取り可能なデータがない場合、データは直接にイベントread(n)
によって出力されるだろう.// node
if (state.flowing && state.length === 0 && !state.sync){
state.awaitDrain = 0;
stream.emit('data', chunk);
}
例を挙げます.const { Readable } = require('stream');
class ExampleReadable extends Readable{
constructor(opt){
super(opt);
this.max = 100;
this.time = 0;
}
_read(){
const seed = setTimeout(()=>{
if(this.time > 100){
this.push(null);
}else{
this.push(String(++this.time));
}
clearTimeout(seed);
},0)
}
}
const exampleReadable = new ExampleReadable({ });
exampleReadable.on('data',(data)=>{
console.log('from data',data);
});
data
イベントexampleReadable.on('readable',()=>{
....
});
state.flowing === true
イベントを登録すると、nodeは次のように処理します.state.flowing = false;
state.needReadable = true;
data
、stream.emit('readable');
readable
トリガ条件
readable
は現在、一時停止モードreadable
return !state.ended &&
(state.length < state.highWaterMark || state.length === 0);
参照