1.1. ストリームの歴史的進化
ストリームは Nodejs に固有の概念ではありません。 これらは数十年前に Unix オペレーティング システムに導入され、プログラムはパイプ演算子 (|) を介してストリーム上で相互に対話できます。
パイプ演算子 (|) は、MacOS および Unix システムに基づく Linux で使用でき、演算子の左側のプロセスの出力を右側の入力に変換できます。
Nodeでは、従来のreadFileを使用してファイルを読み込むと、ファイルは最初から最後までメモリに読み込まれ、すべての内容が読み取られた時点で、メモリにロードされたファイルの内容が均一に処理されます。
これには 2 つの欠点があります:
メモリ: 大量のメモリを消費します; 時間: データの処理を開始する前
に
データのペイロード全体がロードされるまで待つ必要があります。
読み取り可能なストリーム、
書き込み可能なストリーム、
読み取り可能な全二重ストリーム ()
の 4 種類のストリームがあります。
Duplex Stream)
Transform Stream (Transform Stream)
この部分を詳しく学習して、Node.js のストリームの概念を徐々に理解するために、ソース コード部分が比較的複雑であるため、この部分を読みやすいストリームから学習し始めることにしました。 。
1.2. ストリームとは何ですか?
ストリーム
のコレクションである抽象データ構造であり、次のタイプのみを使用できます (objectMode === false の場合のみ)。
ストリームを使用できます。液体と同様に、これらのデータのコレクションとして見られます。最初にこれらの液体をコンテナー (ストリームの内部バッファー BufferList) に保存し、対応するイベントがトリガーされると、中の液体をパイプに注ぎます。そして、パイプの反対側に自分の容器を用意して、中の液体を回収して廃棄するように他の人に通知してください。
1.3. 読み取り可能なストリームとは何ですか?
読み取り可能なストリームは、2 つのモード、3 つの状態、
および 2 つの読み取りモードがあります:
フロー モード: データは基礎となるシステムから読み取られ、できるだけ早く EventEmitter を通過します。データは、一時停止モードで登録されたイベント ハンドラーに渡されます
。このモードでは、データは読み取られません。ストリームからデータを読み取るには、Stream.read() メソッドを明示的に呼び出す必要があります
。readableFlowing = =
の 3 つの状態。
= null: データは生成されません。Stream.pipe() と Stream.resume を呼び出すとステータスが true に変更され、データの生成が開始され、イベントがアクティブにトリガーされます。
readableFlowing === false: データ フローはこの時点で一時停止されます。
readableFlowing === true: 通常はデータ
を生成および消費します。
2.1. 内部状態の定義 (ReadableState)
ReadableState
_readableState: ReadableState { objectMode: false, // 文字列、バッファー、null 以外の他のタイプのデータを操作するには、このモードをオンにする必要があります highWaterMark: 16384, // 水位制限、1024 * 16、この制限を超えた場合のデフォルトは 16kb 、呼び出しは停止します _read() はバッファーにデータを読み取ります。 BufferList { head: null, tail: null, length: 0 }, // データの保存に使用されるバッファーのリンクされたリスト length: 0, // のサイズobjectMode がbuffer.length に等しい場合、読み取り可能なストリーム データ全体。 Pipes: [], // 流れている読み取り可能なストリームを監視するすべてのパイプ キューを保存します: null, // 独立したフローのステータスは null、false、true completed: false, // すべてのデータが消費されました endEmitted: false, // 終了イベントが送信されたかどうか reading: false, // データが読み取られているかどうか construction: true, // ストリームは前に処理できませんDestroy sync: true, // 'readable'/'data' イベントを同期的にトリガーするか、次のティックまで待機するか。 needReadable:false、//読み取り可能なイベントを送信する必要があるかどうかはemitedReadable:false、//読み取り可能なイベントが送信されましたreadablelistening:false、//読み取り可能なリスニングイベントがあるかどうかresumescheduled:false、//履歴書方法が呼び出されました errorEmitted: false, // エラー イベントが送信されました EmitClose: true, // ストリームが破棄されたときに、close イベントを送信するかどうか autoDestroy: true, // 自動的に破棄され、「end」の後に呼び出されますイベントがトリガーされる destroy: false, // ストリームが破棄されたかどうか errored: null, // ストリームがエラーを報告したかどうかを識別します close: false, // ストリームが閉じられたかどうか closeEmitted: false, // ストリームが閉じられたかどうかイベントが送信されました。defaultEncoding: 'utf8', // デフォルトの文字エンコーディング形式 awaitDrainWriters: null, // 監視対象の 'drain' イベントのライター参照を指します。タイプは null、Writable、Set<Writable> MultiAwaitDrain:false、// Drainイベントを待っている複数の作家がいるかどうかReadingmore:false、//より多くのデータを読み取ることができるかどうか:false、//データはdecoder:null、// decoderエンコーディング:null、 // エンコーダ[Symbol(kPaused)]: null }、
2.2内部データストレージの実装(BufferList)
バッファーリストは、リンクリストの形で設計されているために使用されるコンテナです。
BufferList 内の各ノードを BufferNode として表し、内部のデータのタイプは objectMode によって異なります。
このデータ構造は、array.prototype.shift()よりも速くヘッダーデータを取得します。
2.2.1. データストレージの種類objectMode === trueの場合:
その後、データはどんなタイプでも保存されます。
オブジェクトモード=true
const ストリーム = require('ストリーム'); const readableStream = new Stream.Readable({ オブジェクトモード: true、 読む() {}、 }); readableStream.push({名前: 'lisa'}); console.log(readablestream._readablestate.buffer.tail); readableStream.push(true); console.log(readablestream._readablestate.buffer.tail); readableStream.push('lisa'); console.log(readablestream._readablestate.buffer.tail); readableStream.push(666); console.log(readablestream._readablestate.buffer.tail); readableStream.push(() => {}); console.log(readablestream._readablestate.buffer.tail); readableStream.push(シンボル(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
実行結果:
objectMode === falseの場合:
その場合、データは文字列、バッファ、または Uint8Array のみにすることができます
オブジェクトモード=false
const ストリーム = require('ストリーム'); const readableStream = new Stream.Readable({ オブジェクトモード: false、 読む() {}、 }); readableStream.push({名前: 'lisa'});
実行結果:
2.2.2. データストレージ構造バッファ内のデータの変更を観察するために、ノードコマンドラインを介してコンソールに読み取り可能なストリームを作成します。
もちろん、データをプッシュする前に、その_readメソッドを実装するか、コンストラクターのパラメーターに読み取り方法を実装する必要があります。
const ストリーム = require('ストリーム'); const readableStream = 新しい Stream.Readable(); RS._read = 関数(サイズ) {}
または
const ストリーム = require('ストリーム'); const readableStream = new Stream.Readable({ 読み取り(サイズ) {} });
readableStream.push('abc') 操作後の現在のバッファーは次のようになります。
現在のデータが保存されていることがわかります。データの内容。
2.2.3. 関連する APIBufferList のすべてのメソッドを出力すると、次のようになります。
BufferList を文字列にシリアル化する join を除き、その他はすべてデータ アクセス操作です。
ここですべての方法を1つずつ説明するわけではありませんが、_getStringと_getBufferに焦点を当てます。
2.2.3.1.consume
ソースコードアドレス: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#l80
comsume
//バッファーされたデータから指定された量のバイトまたは文字を消費します。 消費(n、ハストリング){ const データ = this.head.data; if(n <data.length){ // `slice` はバッファと文字列で同じです。 const スライス = データ.スライス(0, n); this.head.data = データ.スライス(n); スライスを返します。 } if (n === data.length) { // 最初のチャンクは完全に一致します。 this.shift() を返します。 } // 結果は複数のバッファにまたがります。 hasstringを返しますか? }
コードには以下の 3 つの判定条件があります。
消費されたデータのバイト長がリンクリストのヘッドノードに格納されているデータの長さよりも短い場合、ヘッドノードのデータの最初のnバイトが取得され、現在のヘッドノードのデータが設定されますスライス後のデータに。
消費されたデータが、リンクリストのヘッドノードに保存されているデータの長さと正確に等しい場合、現在のヘッドノードのデータは直接返されます。
消費されるデータの長さがリンク リストのヘッド ノードの長さよりも長い場合は、渡された 2 番目のパラメータに基づいて最後の判断が行われ、現在の BufferList の最下層に文字列が格納されているか、バッファが格納されているかが決定されます。 。
2.2.3.2. _getBuffer
ソースコードアドレス: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/streams/buffer_list.js#l137
消費する
//バッファーデータから指定された量のバイトを消費します。 _getBuffer(n) { const ret = Buffer.allocUnsafe(n); const retLen = n; p = this.headとします。 c = 0とします。 する { const buf = p.data; if (n > buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.length; } それ以外 { if (n === buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++ c; if(p.next) this.head = p.next; それ以外 this.head = this.tail = null; } それ以外 { TypedArrayPrototypeSet(ret, new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); this.head = p; p.data = buf.slice(n); } 壊す; } ++c; while ((p = p.next) !== null); this.length -= c; retを返します。 }
一般に、リンク リスト内のノードを操作し、返されたデータを格納する新しいバッファ配列を作成するループです。
最初に、リンクリストのヘッドノードからデータの取得を開始し、特定のノードのデータが取得された長さを差し引いて取得する長さ以上になるまで、新しく作成されたバッファーにコピーし続けます。
言い換えれば、リンクリストの最後のノードを読んだ後、目的の長さに達していないため、新しく作成されたバッファーが返されます。
2.2.3.3. _getString
ソースコードアドレス:bufferlist._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
消費する
//バッファーデータから指定された量の文字を消費します。 _getString(n) { ret = ''を p = this.headとします。 c = 0とします。 する { const str = p.data; if(n> str.length){ ret += str; n - = str.length; } それ以外 { if(n === str.length){ ret += str; ++ c; if(p.next) this.head = p.next; それ以外 this.head = this.tail = null; } それ以外 { ret += stringprototypeslice(str、0、n); this.head = p; p.data = stringprototypeslice(str、n); } 壊す; } ++ c; } while((p = p.next)!== null); this.length -= c; retを返します。 }
文字列の動作は、バッファーの操作と同じです_getString オペレーションは文字列型です。
2.3。
この質問では、まず、パブリッシュサブスクライブモデルがほとんどのAPIで重要なアプリケーションを持っていることを理解する必要があります。
その利点は、イベント関連のコールバック関数をキューに保存し、その後、将来の特定の時間にデータを処理するために相手に通知できることです、消費者は対応するイベントと対応するデータのみを処理し、node.jsストリーミングモデルはこの特性に適合します。
では、node.jsストリームは、EventeMitterに基づいてインスタンスの作成をどのように実装していますか?
このソースコードはここにあります: stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/treams/legacy.js#l10
遺産
関数ストリーム(opts) { ee.call(this、opts); } objectSetPrototypeof(Stream.Prototype、EE.Prototype); ObjectSetPrototypeOf(ストリーム、EE);
次に、読み取り可能なストリームのソースコードにこれらのコード行があります。
ソースコードのこの部分はこちら:読み取り可能です https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/streams/readable.js#l77
遺産
objectSetPrototypeof(Readable.Prototype、Stream.Prototype); ObjectSetPrototypeof(Readable、Stream);
まず、EventeMitterからストリームのプロトタイプオブジェクトを継承して、ストリームのすべてのインスタンスがEventeMitterのメソッドにアクセスできるようにします。
同時に、EventeMitterの静的メソッドは、ObjectSetPrototypeof(Stream、EE)を介して継承され、ストリームのコンストラクターでも、コンストラクターEEが借用され、EventeMitterのすべてのプロパティの継承を実現し、読みやすいストリームで、同じ方法を使用して、メソッドはストリームクラスのプロトタイプの継承と静的プロパティ継承を実装し、次のことを取得します。
readable.prototype .__ proto__ === stream.prototype;
stream.prototype .__ proto__ === ee.prototype
したがって:
readable.prototype .__ proto __.__ proto__ === ee.prototype
したがって、読み取り可能なストリームのプロトタイプチェーンをたどることで EventEmitter のプロトタイプを見つけ、EventEmitter の継承を実現できます。
2.4関連APIの実装
APIは、ソースコードドキュメントに表示される順序でここに表示され、コアAPI実装のみが説明されます。
注: ここでは、Node.js 読み取り可能なストリーム ソース コードで宣言された関数のみが解釈され、長さを減らすために、外部から導入された関数定義は含まれません。
読み取り可能なプロトタイプ
ストリーム { destroy: [関数: 破壊], _undestroy: [関数: アンデストロイ], _destroy:[function(anonymous)]、 プッシュ: [関数 (匿名)]、 unshift: [関数 (匿名)]、 ispaused:[function(anonymous)]、 SetEncoding:[function(anonymous)]、 読む:[function(匿名)]、 _read:[function(anonymous)]、 パイプ:[function(匿名)]、 疑いのない:[function(匿名)]、 on:[function(anonymous)]、 AddListener:[function(anonymous)]、 removelistener:[function(anonymous)]、 off: [関数(匿名)]、 RemoveAllListeners: [関数 (匿名)], 履歴書:[function(匿名)]、 一時停止:[function(匿名)]、 ラップ:[function(匿名)]、 イテレータ: [関数 (匿名)], [symbol(nodejs.rejection)]:[function(anonymous)]、 [symbol(symbol.asynciterator)]:[function(anonymous)] }2.4.1
読み取り可能.プッシュ
readable.prototype.push = function(chunk、encoding){ ReadAbleaddChunk(これ、チャンク、エンコード、false)を返します。 };
プッシュメソッドの主な機能は、「データ」イベントをトリガーすることにより、データブロックを下流のパイプラインに渡すか、データを独自のバッファーに保存することです。
次のコードは関連する擬似コードであり、主なプロセスのみを示しています。
readable.push
関数readableaddchunk(ストリーム、チャンク、エンコード、addtofront){ const state = stream._readableState; if(chunk === null){// nullストリーム終了信号をプッシュすると、そのstate.reading = false; onEofChunk(ストリーム、状態); } else if(!state.objectMode){//オブジェクトモードではない場合if(typeof chunk === 'string'){ chunk = buffer.from(chunk); } else if(chunk instanceof buffer){//バッファーの場合 //エンコードを処理} else if(stream。 _ isuint8array(chunk)){ chunk = stream。 _ uint8arraytobuffer(chunk); } else if(chunk!= null){ err = new err _invalid _arg _type( 'chunk'、['string'、 'buffer'、 'uint8array']、chunk); } } if(state.objectmode ||(chunk && chunk.length> 0)){//オブジェクトモードまたはチャンクはバッファーです //いくつかのデータ挿入方法の判断は、ここではAddChunk(Stream、State、Chunk、True)を省略しています。 } } 関数addchunk(stream、state、chunk、addtofront){ if(state.flowing && state.length === 0 &&!state.sync && stream.listenercount( 'data')> 0){//ストリーミングモードの場合、data stream.emit( 'data'、chunk)を聞くサブスクライバーがあります。 } else {// buffer state.length += state.objectModeにデータを保存しますか? if(addtofront){ state.buffer.unshift(chunk); } それ以外 { state.buffer.push(chunk); } } Maybereadmore(Stream、State);
プッシュ操作は、主にオブジェクトモードの判断に分割されます。
AddChunkの最初の判断は、主に読みやすい場合の状況に対処することであり、Flowingモードで、データリスナーがあり、バッファデータが空です。
現時点では、データは主にデータイベントを購読する他のプログラムに渡されます。そうしないと、データはバッファに保存されます。
2.4.2読みます境界条件とフローステータスの判断を除き、この方法には主に2つの操作があります。
ユーザーが実装した_Readメソッドを呼び出して、実行結果を処理する
バッファバッファーからデータを読み、「データ」イベントをトリガーします
readable.read
//読み取りの長さがHWMよりも大きい場合、HWMは再計算されます if(n> state.highwatermark){ state.highwatermark = computeNewhighwatermark(n); } // ユーザー実装の _read メソッドを呼び出します try { const result = this。 _ read(state.highwatermark); if(result!= null){ const then = result.then; if(typeof then === 'function'){ then.call( 結果、 nop、 関数(エラー) { errorordestroy(これ、err); }); } } } catch(err){ errorordestroy(これ、err); }
ユーザーによって実装された_readメソッドが約束を返した場合、この約束のその後の方法を呼び出し、例外の処理を促進するために成功と失敗のコールバックを渡します。
バッファからゾーンデータを読むための読み取りメソッドのコアコードは次のとおりです。
readable.read
fruct fromList(n、state){ //バッファリングされていません。 if(state.length === 0) null を返します。 ret; if(state.objectMode) ret = state.buffer.shift(); else if(!n || n> = state.length){// nがバッファの長さよりも空または大きい場合//すべて読み取り、リストを切り捨てます。 if (state.decoder) // デコーダがある場合、結果を文字列にシリアル化します ret = state.buffer.join(''); else if(state.buffer.length === 1)//データは1つだけです。ヘッドノードデータを返します= state.buffer.first(); else //すべてのデータをバッファーに保存しますret = state.buffer.concat(state.length); state.buffer.clear(); //読み取り長がバッファーよりも小さい状況を処理します= state.buffer.consume(n、state.decoder); } retを返します。 }2.4.3 _Read
ユーザーが読み取り可能なストリームを初期化するときに実装する必要があります。
サンプルコード:
readable._read
const stream = require( 'stream'); const readableStream = new Stream.Readable({ read(hwm){ this.push(string.fromCharcode(this.currentCharcode ++)); if(this.currentCharcode> 122){ this.push(null); } }、 }); ReadableStream.CurrentCharcode = 97; readableStream.pipe(process.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4(重要)
1つ以上の書き込み可能なストリームを現在の読み取り可能なストリームにバインドし、読み取り可能なストリームを流れるモードに切り替えます。
この方法には多くのイベントリスニングハンドルがありますが、ここでは1つずつ紹介しません。
readable.pipe
readable.prototype.pipe = function(dest、pipeopts){ const src = this; const state = this。 _ readablestate; state.pipes.push(dest); function ondata(chunk){ const ret = dest.write(chunk); if(ret === false){ 一時停止(); } } //それがパイプされていることを運命に伝えます。 dest.emit( 'pipe'、src); //ストリームが一時停止モードの場合、ストリームを開始する場合(dest.writableneeddrain === true){ if(state.flowing){ 一時停止(); } } else if(!state.flowing){ src.resume(); } 戻り先; }
パイプ操作は、Linuxパイプ演算子 '|'に非常に似ており、左の出力を適切な入力に変更し、メンテナンスのために書き込み可能なストリームを収集し、読み取り可能なストリームが利用可能になったときに「データ」イベントをトリガーします。
データが流れると、書き込み可能なストリームの書き込みイベントがトリガーされ、データを転送し、パイプラインのような操作を実現できます。また、Pauseモードで読み取り可能なストリームをフローモードに自動的に変更します。
2.4.5ストリームを「一時停止」モードから「フロー」モードに切り替えます。
Readable.resume
Readable.prototype.resume = function() { const state = this._readablestate; if(!state.flowing){ state.flowing =!readableLelistening; } }; 関数履歴書(Stream、State){ if(!state.resumescheduled){// resume_メソッドが同じティック状態で1回のみ呼び出されるように切り替えます。 process.nexttick(resume_、stream、state); } } function resume_(stream、state){ if(!state.reading){ stream.read(0); } state.resumescheduled = false; stream.emit( 'resume'); フロー(ストリーム); } 関数フロー(ストリーム){//ストリームがストリーミングモードである場合、このメソッドはバッファが空になるまでバッファーからデータを読み続けますconst state = stream._readablestate; while(state.flowing && stream.read()!== null); //ここでは読み取りメソッドが呼び出され、「読み取り可能な」イベントリスナーのストリームが設定されるため、読み取り方法も呼び出されます。 //これにより、一貫性のないデータが得られます(データには影響しません。「読み取り可能な」イベントコールバックの読み取りメソッドの呼び出しのみに影響します。 }2.4.6
流れモードから一時停止モードにストリームを変更し、「データ」イベントの発射を停止し、すべてのデータをバッファーに保存する
readable.pause
readable.prototype.pause = function(){ if(this._readablestate.flowing!== false){ debug( 'Pause'); this._readablestate.flowing = false; this.emit( 'Pause'); } これを返します。 };
2.5使用および作業メカニズム
使用方法は、BufferListセクションで言及されています。
2.5.1ここでは、Readable ストリームの一般的なプロセスとモード変換のトリガー条件のみを示します。
で: