ノード ストリームには 4 つのタイプがあります。 1. 読み取り可能 (読み取り可能ストリーム)。コンテンツを返すには、「_read」メソッドを実装する必要があります。 2. 書き込み可能 (書き込み可能なストリーム)、コンテンツを受け入れるには、「_write」メソッドを実装する必要があります。コンテンツを受け入れて返すには、_write" メソッドを実装する必要があります。 4. Transform (変換ストリーム)、受信したコンテンツを変換して返すには、"_transform" メソッドを実装する必要があります。
このチュートリアルの動作環境: Windows 7 システム、nodejs バージョン 16、DELL G3 コンピューター。
ストリームは Nodejs の非常に基本的な概念であり、多くの基本モジュールがストリームに基づいて実装され、非常に重要な役割を果たします。同時に、フローは理解するのが非常に難しい概念でもあります。これは主に、NodeJ の初心者にとって、この概念を真に理解するまでに多くの時間がかかることが原因です。ほとんどの NodeJ では、これは Web アプリケーションの開発にのみ使用されますが、ストリームの理解が不十分でも使用には影響しません。ただし、ストリームを理解すると、NodeJ の他のモジュールをより深く理解できるようになり、場合によっては、ストリームを使用してデータを処理した方が良い結果が得られることがあります。
Stream は、Node.js でストリーミング データを処理するための抽象インターフェイスです。ストリームは実際のインターフェイスではなく、すべてのストリームの総称です。実際のインターフェイスには、ReadableStream、WritableStream、ReadWriteStream が含まれます。
インターフェイス ReadableStream extends EventEmitter { readable: boolean; read(size?:number): string | setEncoding(encoding: BufferEncoding): this; isPaused(): boolean; T extends WritableStream>(destination: T, options?: { end?: boolean | unknown; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void ; ラップ(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;} インターフェイス WritableStream extends EventEmitter { write(buffer: Uint8Array | string, cb?:エラー | null) => void): ブール値; write(str: 文字列、エンコード?: BufferEncoding、cb?: (エラー | null) => void): end(cb?: () => void ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this;} インターフェイス ReadWriteStream extends ReadableStream、WritableStream { }ReadableStream と WritableStream はどちらも EventEmitter クラスを継承するインターフェイスであることがわかります (ts のインターフェイスは型をマージするだけであるため、クラスを継承できます)。
上記のインターフェースに対応する実装クラスは、それぞれ Readable、Writable、Duplex です。
NodeJ には 4 種類のストリームがあります。
Readable Readable ストリーム (ReadableStream を実装)
書き込み可能な書き込み可能なストリーム (WritableStream を実装)
Duplex は読み取りおよび書き込み可能なストリームです (Readable を継承した後に WritableStream を実装します)。
変換変換ストリーム (Duplex から継承)
それらにはすべて実装するメソッドがあります。
Readable はコンテンツを返すために _read メソッドを実装する必要があります
Writable はコンテンツを受け入れるために _write メソッドを実装する必要があります
Duplex はコンテンツを受け入れて返すために _read メソッドと _write メソッドを実装する必要があります
Transform は、受信したコンテンツを変換して返すために _transform メソッドを実装する必要があります
Readable はストリームの一種で、2 つのモードと 3 つの状態があります。
2 つの読み取りモード:
フロー モード: データは基礎となるシステムからバッファーに読み書きされ、バッファーがいっぱいになると、EventEmitter を通じてできるだけ早く登録されたイベント ハンドラーにデータが自動的に渡されます。
一時停止モード: このモードでは、EventEmitter はデータを送信するためにアクティブにトリガーされません。Readable.read() メソッドを明示的に呼び出して、バッファーからデータを読み取る必要があります。これにより、EventEmitter イベントへの応答がトリガーされます。
3 つの状態:
readableFlowing === null (初期状態)
readableFlowing === false (一時停止モード)
readableFlowing === true (フローモード)
ストリームの readable.readableFlowing は、最初は null です。
データイベントを追加するとtrueになります。 stop()、unpipe() が呼び出されたり、バック プレッシャーが受信されたり、読み取り可能なイベントが追加されたりすると、readableFlowing は false に設定されます。この状態では、リスナーをデータ イベントにバインドしても readableFlowing は true に切り替わりません。
resume() を呼び出すと、読み取り可能なストリームの readableFlowing を true に切り替えることができます。
readableFlowing を null にする唯一の方法は、読み取り可能なイベントをすべて削除することです。
イベント名 説明 readable は、バッファーに新しい読み取り可能なデータがあるときにトリガーされます (ノードがキャッシュ プールに挿入されるたびにトリガーされます) data は、データが消費されるたびにトリガーされます。パラメーターは、今回消費されるデータです。エラーが発生した場合、トリガー メソッド名は、read(size) が size の長さのデータを消費することを示します。それ以外の場合、null が返されると、現在のデータが size より小さいことを示します。今回消費したデータは返却されます。サイズが渡されない場合、キャッシュ プール内のすべてのデータが消費されることを意味します const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// キャッシュ プール フロートvalue})readStreams.on('readable', () => { console.log('buffer full') readStreams.read()// バッファプール内のすべてのデータを消費し、結果を返し、データイベントをトリガーします}) readStreams.on('data ', (data) => { console.log('data')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
サイズが 0 の場合、読み取り可能イベントがトリガーされます。
キャッシュ プール内のデータ長が float 値 highWaterMark に達すると、実稼働データは積極的に要求されませんが、データが消費されるのを待ってからデータが生成されます。
一時停止状態のストリームがデータを消費するために read を呼び出さない場合、後で data と readable がトリガーされず、消費するために read が呼び出されたときに、この消費後の残りのデータの長さが float より小さいかどうかが最初に判断されます。値が float 値より小さい場合は、消費する前に実稼働データが要求されます。このように、読み取り後のロジックの実行が完了すると、新しいデータが生成されている可能性が高く、次に使用されるデータを事前に生成してキャッシュ プールに保存するこのメカニズムもまた読み取り可能になります。キャッシュ ストリームが速い理由。
フロー状態には 2 つのフロー状態があります
生産速度が消費速度よりも遅い場合: この場合、通常、各生産データの後にキャッシュ プールにデータは残りません。今回生成されたデータは、データ イベントに直接渡すことができます (データ イベントに渡されないため)。キャッシュ プールに入ると、消費するために read を呼び出す必要もありません)、その後すぐに新しいデータの生成が開始され、ストリームが終了するまで、最後のデータが再度トリガーされるまで、新しいデータは生成されません。 。生成速度が消費速度よりも速い場合: 現時点では、通常、各データ生成の後、キャッシュ プールに未消費のデータが存在します。この場合、次のデータの消費は、通常、データが消費された後、開始されます。古いデータが消費され、新しいデータが生成されてキャッシュ プールに配置されますそれらの唯一の違いは、データが生成された後もキャッシュ プールにデータが存在するかどうかです。データが存在する場合、生成されたデータは消費されるまでキャッシュ プールにプッシュされます。データが存在しない場合、データはキャッシュ プールにプッシュされます。データをキャッシュ プールに追加せずに直接引き渡すことができます。
キャッシュ プール内のデータを含むストリームが一時停止モードからフロー モードに入ると、ループ内で read が呼び出され、null が返されるまでデータが消費されることに注意してください。
一時停止モードでは、読み取り可能なストリームが作成されると、作成後に _read メソッドが自動的に呼び出され、バッファ プール内のデータが float 値に達するまでデータ ソースからバッファ プールにデータをプッシュします。データが float 値に達すると、読み取り可能ストリームは「読み取り可能」イベントをトリガーして、データの準備ができており、引き続き使用できることをコンシューマーに通知します。
一般に、「読み取り可能」イベントは、ストリーム上の新しいアクティビティ、つまり新しいデータがあるか、ストリームの終わりに到達したかを示します。したがって、データ ソース内のデータが読み取られる前に、「readable」イベントもトリガーされます。
コンシューマの「読み取り可能」イベントのハンドラ関数では、バッファ プール内のデータが stream.read(size) を通じてアクティブに消費されます。
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // パラメータの read メソッドは、ソース データを取得するためにストリームの _read メソッドとして使用されます read( size) { // ソース データには 1000 個の 1 があると仮定します let chunk = null // データの読み取りプロセスは、IO 操作など、通常は非同期です setTimeout(() => { if (count > 0) { let chunkLength = Math .min( count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on(' はデータが読み込まれるたびにトリガーされますキャッシュ プールに正常にプッシュされました) readable', () => { const chunk = myReadable.read()//現在のキャッシュ プール内のすべてのデータを消費します console.log(chunk.toString())})read(size) のサイズが float 値より大きい場合、新しい float 値が再計算され、新しい float 値は size の次の 2 乗になります (size <= 2^n、n は最小値)
// hwm は 1GB より大きくなりません。const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // 1GB の制限 n = MAX_HWM } else { // 2 の次に大きい累乗を削除します。 hwm n--; n |= n >>> 1; n |= n >>> 4; > 16 ; n++ を返します;}すべての読み取り可能なストリームは一時停止モードで開始され、次の方法でフロー モードに切り替えることができます。
「data」イベントハンドラーを追加し、「resume」メソッドを呼び出して、データを書き込み可能なストリームに送信します。フロー モードでは、バッファ プール内のデータは消費のためにコンシューマに自動的に出力されます。同時に、各データ出力後に _read メソッドが自動的にコールバックされ、データ ソースからバッファ プールにデータが入れられます。バッファ プールにデータがない場合、フロー モードが他の一時停止モードに切り替わるまで、またはデータ ソースからのデータが読み取られるまで、データはキャッシュ プールを経由せずにデータ イベントに直接渡されます。 (null));
読み取り可能なストリームは、次の方法で一時停止モードに戻すことができます。
パイプライン ターゲットがない場合は、stream.pause() が呼び出されます。パイプライン ターゲットがある場合は、すべてのパイプライン ターゲットを削除します。 stream.unpipe() を呼び出すことで、複数のパイプ ターゲットを削除できます。 const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) チャンク = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})読み取り可能なストリームと比較して、書き込み可能なストリームはより単純です。
プロデューサが write(chunk) を呼び出すと、データが書き込まれるたびに、バッファ キューにキャッシュするか、何らかのステータス (コルク、書き込み中など) に基づいて _write を呼び出すかを内部的に選択します。キャッシュキュー内のデータ。バッファキュー内のデータサイズが float 値 (highWaterMark) を超える場合、コンシューマは write(chunk) を呼び出した後に false を返します。この時点で、プロデューサは書き込みを停止する必要があります。
それで、いつ書き続けられるでしょうか?バッファ内のすべてのデータが正常に書き込まれると、バッファ キューがクリアされた後にドレイン イベントがトリガーされ、この時点でプロデューサーはデータの書き込みを続けることができます。
プロデューサはデータの書き込みを終了する必要がある場合、stream.end メソッドを呼び出して、書き込み可能なストリームの終了を通知する必要があります。
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// _write メソッド setTimeout(() として使用されます) = >{ fileContent += chunk callback()// 書き込み完了後に呼び出される}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable .write('123123')// truemyWritable.write('123123')// falsemyWritable.end()キャッシュ プール内のデータが float 値に達した後、この時点でキャッシュ プールに複数のノードが存在する可能性があるため、キャッシュ プールのクリア処理 (_read の循環呼び出し) では同じ長さが消費されないことに注意してください。読み取り可能なストリーム。バッファ長が float 値と一致しない場合でも、float 値のデータは一度に 1 つのバッファ ノードごとに消費されます。
const { Writable } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(チャンク, エンコーディング, コールバック) { setTimeout(()=>{ fileContent += チャンク console.log ('Consumption', chunk.toString()) callback()// 書き込み完了後に呼び出されます}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent) )})let count = 0functionproductionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}productionData()myWritable.on('drain',productionData)上記は、float 値が 10 の書き込み可能なストリームです。ここで、データ ソースは 0 から 20 までの連続した数値文字列であり、データの書き込みにはproductionData が使用されます。
まず、初めて myWritable.write("0") を呼び出したときは、キャッシュプールにデータがないため、キャッシュプールには入らず、myWritable の戻り値に直接「0」が渡されます。 .write("0") は true
myWritable.write("1") が実行されると、_wirte のコールバックがまだ呼び出されないため、その位置はデータ書き込みの順序性を保証します。作成できるバッファーは 1 つだけです。 "1" を格納します。 " キャッシュ プールに追加します。これは次の 2-9 にも当てはまります
myWritable.write("10") が実行されると、バッファ長は 9 (1-9) であり、浮動小数点値にまだ達していない "10" がバッファとしてキャッシュ プールに追加され続けます。長さは 11 になるため、myWritable.write("1") は false を返します。これは、バッファー内のデータが十分であることを意味し、データが再び生成されるためにドレイン イベント通知を待つ必要があります。
100ms 後、_write("0",coding, callback) のコールバックが呼び出され、「0」が書き込まれたことが示されます。次に、キャッシュ プールにデータがあるかどうかを確認し、存在する場合は、まず _read を呼び出してキャッシュ プールのヘッド ノード (「1」) を消費し、次にキャッシュ プールが空になるまでこのプロセスを繰り返します。 、ドレイン イベントをトリガーし、productionData を再度実行します。
myWritable.write("11") を呼び出して、ステップ 1 からストリームの終了までのプロセスをトリガーします。
readable ストリームと writable ストリームを理解すると、duplex ストリームは実際に readable ストリームを継承して、writable ストリームを実装することを理解するのが簡単です (ソース コードはこのように書かれていますが、実装されていると言うべきです)。同時に、読み取り可能および書き込み可能なストリームがある方が良いです)。
Duplex フローは次の 2 つのメソッドを同時に実装する必要があります
_read() メソッドを実装して、読み取り可能なストリームのデータを生成します
_write() メソッドを実装して、書き込み可能なストリームのデータを消費します
上記 2 つのメソッドの実装方法は、上記の書き込み可能ストリームと読み取り可能ストリームで紹介されていますが、ここで注意する必要があるのは、双方向ストリームにそれぞれ 2 つの独立したバッファー プールがあり、それらのデータ ソースも同じではないということです。
NodeJ の標準入出力ストリームを例に挙げます。
コンソールにデータを入力すると、そのデータ イベントがトリガーされます。これは、ユーザーが入力するたびに、読み取り可能なプッシュ メソッドを呼び出して、生成されたデータをプッシュするのと同じことになります。 write メソッドを呼び出すと、コンテンツをコンソールに出力することもできますが、データ イベントはトリガーされません。これは、_write メソッドの実装が書き込み可能なストリームの機能を備えていることを示しています。コンソールにテキストを表示できるようにします。 // ユーザーがコンソール (_read) にデータを入力するたびに、データ イベントがトリガーされます。これは、読み取り可能なストリームの特性です process.stdin.on('data', data=>{ process.stdin.write(data) ); })// 標準入力ストリームに毎秒データを生成し (これはコンソールに直接出力される書き込み可能なストリームの機能です)、datasetInterval(()=>{ process.stdin.write) ('ユーザー コンソールによって入力されたデータではありません')}, 1000)Duplex ストリームは、書き込み可能なストリームを備えた読み取り可能なストリームと考えることができます。どちらも独立しており、それぞれに独立した内部バッファがあります。読み取りイベントと書き込みイベントは独立して発生します。
二重ストリーム ------------------| 読み取り <----- 外部ソース ------------------|書き込み --> 外部シンク ------------------|変換ストリームは二重であり、読み取りと書き込みは因果関係で行われます。二重ストリームのエンドポイントは、何らかの変換を通じてリンクされます。読み取りには書き込みが発生する必要があります。
変換ストリーム --------------|-------------- あなたが書き込みます ----> ----> 読み上げます ----- ----------|--------------Transform ストリームを作成する場合、最も重要なことは、_write または _read の代わりに _transform メソッドを実装することです。 _transform では、書き込み可能ストリームによって書き込まれたデータが処理 (消費) され、その後、読み取り可能ストリーム用のデータが生成されます。
変換ストリームは、ストリームの終わりの前に呼び出される `_flush` メソッドを実装することがよくあります。これは通常、ストリームの最後に何かを追加するために使用されます。たとえば、ファイルを圧縮するときの圧縮情報がここに追加されます。 } = require('fs')const { Transform, PassThrough } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const 変換 = new Transform({ highWaterMark: 10, callback){ // 変換data, Push を呼び出して変換結果をキャッシュ プールに追加します this.push(chunk.toString().replace('1', '@')) callback() }, flash(callback){// this.push を実行します(' before end トリガー <<<') callback() }})// write 継続的にデータを書き込み let count = 0transform.write('>>>')functionproductionData() { let flag = true while (count <= 20 && flag) { flag =transform.write(count.toString()) count++ } if (count > 20) {transform.end() }}productionData()transform.on('drain',productionData)let result = '' transform.on( 'data', data=>{ result += data.toString()})transform.on('end', ()=>{ console.log(result) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})