Es gibt 4 Arten von Knotenströmen: 1. Lesbar (lesbarer Stream). Die Methode „_read“ muss implementiert werden, um Inhalte zurückzugeben. 2. Beschreibbar (beschreibbarer Stream); die Methode „_write“ muss implementiert werden, um Inhalte zu akzeptieren; 3. Duplex (lesbarer und beschreibbarer Stream); _write“-Methoden müssen implementiert werden, um Inhalte zu akzeptieren und zurückzugeben; 4. Transformieren (Konvertierungsstream), Sie müssen die „_transform“-Methode implementieren, um den empfangenen Inhalt zu konvertieren und den Inhalt zurückzugeben.
Die Betriebsumgebung dieses Tutorials: Windows 7-System, NodeJS Version 16, DELL G3-Computer.
Stream ist ein sehr grundlegendes Konzept in Nodejs. Viele Grundmodule werden auf Basis von Streams implementiert und spielen eine sehr wichtige Rolle. Gleichzeitig ist Flow auch ein sehr schwer zu verstehendes Konzept. Dies liegt vor allem an der fehlenden relevanten Dokumentation. Für NodeJs-Anfänger dauert es glücklicherweise oft lange, den Flow zu verstehen. Für die meisten NodeJs wird es nur zum Entwickeln von Webanwendungen verwendet. Ein unzureichendes Verständnis von Streams hat keinen Einfluss auf deren Verwendung. Das Verständnis von Streams kann jedoch zu einem besseren Verständnis anderer Module in NodeJs führen, und in einigen Fällen führt die Verwendung von Streams zur Datenverarbeitung zu besseren Ergebnissen.
Stream ist eine abstrakte Schnittstelle zur Verarbeitung von Streaming-Daten in Node.js. Stream ist keine eigentliche Schnittstelle, sondern ein allgemeiner Begriff für alle Streams. Zu den eigentlichen Schnittstellen gehören ReadableStream, WritableStream und ReadWriteStream.
Schnittstelle ReadableStream erweitert EventEmitter { readable: boolean; setEncoding(encoding: BufferEncoding): this; resume(): T erweitert WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, binding?: BufferEncoding): void ; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Fehler |. null) => void): boolean; write(str: string, binding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; end(cb?: () => void ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, binding?: BufferEncoding, cb?: () => void): this;}interface ReadWriteStream erweitert ReadableStream, WritableStream { }Es ist ersichtlich, dass ReadableStream und WritableStream beide Schnittstellen sind, die die EventEmitter-Klasse erben (Schnittstellen in ts können Klassen erben, da es sich nur um zusammengeführte Typen handelt).
Die den oben genannten Schnittstellen entsprechenden Implementierungsklassen sind Readable, Writable und Duplex.
Es gibt 4 Arten von Streams in NodeJs:
Readable Readable Stream (implementiert ReadableStream)
Beschreibbarer beschreibbarer Stream (implementiert WritableStream)
Duplex ist ein lesbarer und beschreibbarer Stream (implementiert WritableStream nach der Erbung von Readable).
Konvertierungsstream transformieren (von Duplex geerbt)
Sie alle verfügen über Methoden zur Implementierung:
Readable muss die _read-Methode implementieren, um Inhalte zurückzugeben
Writable muss die _write-Methode implementieren, um Inhalte zu akzeptieren
Duplex muss die Methoden _read und _write implementieren, um Inhalte zu akzeptieren und zurückzugeben
Transform muss die Methode _transform implementieren, um den empfangenen Inhalt zu konvertieren und zurückzugeben
Lesbar ist eine Art Stream. Er verfügt über zwei Modi und drei Zustände.
Zwei Lesemodi:
Flussmodus: Daten werden vom zugrunde liegenden System gelesen und in den Puffer geschrieben. Wenn der Puffer voll ist, werden die Daten so schnell wie möglich automatisch über EventEmitter an den registrierten Ereignishandler übergeben.
Pausenmodus: In diesem Modus wird EventEmitter nicht aktiv zum Übertragen von Daten ausgelöst. Die Readable.read()-Methode muss explizit aufgerufen werden, um Daten aus dem Puffer zu lesen, um eine Antwort auf das EventEmitter-Ereignis auszulösen.
Drei Staaten:
readableFlowing === null (Ausgangszustand)
readableFlowing === false (Pausemodus)
readableFlowing === true (fließender Modus)
Der readable.readableFlowing des Streams ist zunächst null.
Es wird wahr, nachdem das Datenereignis hinzugefügt wurde. Wenn pause(), unpipe() aufgerufen wird oder Gegendruck empfangen wird oder ein lesbares Ereignis hinzugefügt wird, wird readableFlowing auf „false“ gesetzt. In diesem Zustand wird readableFlowing durch die Bindung eines Listeners an das Datenereignis nicht auf „true“ gesetzt.
Durch Aufrufen von resume() kann das readableFlowing des lesbaren Streams auf true gesetzt werden.
Das Entfernen aller lesbaren Ereignisse ist die einzige Möglichkeit, readableFlowing auf null zu setzen.
Ereignisname Beschreibung readable wird ausgelöst, wenn neue lesbare Daten im Puffer vorhanden sind (es wird jedes Mal ausgelöst, wenn ein Knoten in den Cache-Pool eingefügt wird). Daten werden jedes Mal ausgelöst, wenn Daten verbraucht werden Der Fehlerstrom wird ausgelöst, wenn ein Fehler auftritt. Der Name der Triggermethode gibt an, dass read(size) Daten mit einer Länge von „size“ verbraucht Die diesmal verbrauchten Daten werden zurückgegeben. Wenn die Größe nicht übergeben wird, bedeutet dies, dass alle Daten im Cache-Pool verbraucht werden. const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// Cache-Pool-Float value})readStreams. on('readable', () => { console.log('buffer full') readStreams.read()// Alle Daten im Pufferpool verbrauchen, das Ergebnis zurückgeben und das Datenereignis auslösen}) readStreams.on('data ', (data) => { console.log('data')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
Wenn die Größe 0 ist, wird das lesbare Ereignis ausgelöst.
Wenn die Datenlänge im Cache-Pool den Float-Wert highWaterMark erreicht, fordert er nicht aktiv Produktionsdaten an, sondern wartet, bis die Daten verbraucht sind, bevor er Daten produziert.
Wenn der Stream im angehaltenen Zustand nicht „read“ aufruft, um Daten zu verbrauchen, werden „data“ und „readable“ später nicht ausgelöst. Wenn „read“ zum Konsumieren aufgerufen wird, wird zunächst ermittelt, ob die Länge der verbleibenden Daten nach diesem Verbrauch kleiner als der Float ist Wenn er niedriger als der Float-Wert ist, werden Produktionsdaten vor dem Verbrauch angefordert. Auf diese Weise werden nach Abschluss der Logikausführung höchstwahrscheinlich neue Daten erzeugt und dann wieder lesbar. Dieser Mechanismus zum Erzeugen der nächsten verbrauchten Daten im Voraus und zum Speichern im Cache-Pool ist ebenfalls vorhanden der Grund, warum der Cache-Stream schnell ist.
Im fließenden Zustand gibt es zwei Strömungssituationen
Wenn die Produktionsgeschwindigkeit langsamer ist als die Verbrauchsgeschwindigkeit: In diesem Fall verbleiben nach jeder Produktionsdaten im Allgemeinen keine Daten mehr im Cache-Pool, und die dieses Mal erzeugten Daten können direkt an das Datenereignis übergeben werden (da dies nicht der Fall ist). Geben Sie den Cache-Pool ein, sodass kein Leseaufruf erforderlich ist, und beginnen Sie dann sofort mit der Produktion neuer Daten. Die neuen Daten werden erst dann generiert, wenn die letzten Daten verbraucht sind . Wenn die Produktionsgeschwindigkeit höher ist als die Verbrauchsgeschwindigkeit: Zu diesem Zeitpunkt befinden sich nach jeder Datenproduktion normalerweise nicht verbrauchte Daten im Cache-Pool. In diesem Fall beginnt der nächste Datenverbrauch im Allgemeinen, wenn die Daten verbraucht sind, und danach Die alten Daten werden verbraucht. Neue Daten wurden erstellt und im Cache-Pool abgelegtDer einzige Unterschied besteht darin, ob die Daten nach der Datenerstellung noch im Cache-Pool vorhanden sind. Wenn die Daten vorhanden sind, werden die erzeugten Daten in den Cache-Pool verschoben, um auf den Verbrauch zu warten direkt an Daten übergeben werden, ohne sie dem Cache-Pool hinzuzufügen.
Es ist erwähnenswert, dass, wenn ein Stream mit Daten in einem Cache-Pool aus dem Pausenmodus in den Flussmodus wechselt, read in einer Schleife aufgerufen wird, um die Daten zu verbrauchen, bis null zurückgegeben wird.
Wenn im Pausenmodus ein lesbarer Stream erstellt wird, ist der Modus der Pausenmodus. Nach der Erstellung wird die _read-Methode automatisch aufgerufen, um Daten von der Datenquelle in den Pufferpool zu übertragen, bis die Daten im Pufferpool den Float-Wert erreichen. Immer wenn Daten den Float-Wert erreichen, löst der lesbare Stream ein „lesbares“ Ereignis aus, um dem Verbraucher mitzuteilen, dass die Daten bereit sind und weiterhin konsumiert werden können.
Im Allgemeinen weist das „lesbare“ Ereignis auf eine neue Aktivität im Stream hin: Entweder liegen neue Daten vor oder das Ende des Streams wurde erreicht. Bevor die Daten in der Datenquelle gelesen werden, wird daher auch das Ereignis „lesbar“ ausgelöst;
In der Handlerfunktion des Consumer-Ereignisses „readable“ werden die Daten im Pufferpool aktiv über stream.read(size) verbraucht.
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // Die Lesemethode des Parameters wird als _read-Methode des Streams verwendet, um die gelesenen Quelldaten zu erhalten( size) { // Angenommen, unsere Quelldaten haben 1000 1s let chunk = null // Der Prozess des Lesens von Daten ist im Allgemeinen asynchron, wie zum Beispiel die E/A-Operation setTimeout(() => { if (count > 0) { let chunkLength = Math .min( count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on(' wird jedes Mal ausgelöst, wenn die Daten vorliegen erfolgreich in den Cache-Pool verschoben) readable', () => { const chunk = myReadable.read()//Alle Daten im aktuellen Cache-Pool verbrauchen console.log(chunk.toString())})Es ist zu beachten, dass, wenn die Größe von read(size) größer als der Float-Wert ist, der neue Float-Wert neu berechnet wird und der neue Float-Wert die nächste zweite Potenz der Größe ist (Größe <= 2^n, n dauert). der Mindestwert)
// hwm wird nicht größer als 1 GB sein übermäßige Erhöhung verhindern hwm n--; n |= n >>> 2; n |= n >>> > 16 ; n++; } return n;}Alle lesbaren Streams starten im Pausenmodus und können mit den folgenden Methoden in den Fließmodus geschaltet werden:
Fügen Sie den Ereignishandler „data“ hinzu; rufen Sie die Methode „resume“ auf; verwenden Sie die Methode „pipe“, um Daten an den beschreibbaren Stream zu sendenIm Flow-Modus werden die Daten im Pufferpool automatisch zum Verbrauch an den Verbraucher ausgegeben. Gleichzeitig wird nach jeder Datenausgabe die _read-Methode automatisch zurückgerufen, um die Daten aus der Datenquelle in den Pufferpool zu stellen Wenn im Pufferpool keine Daten vorhanden sind, werden die Daten direkt an das Datenereignis weitergeleitet, ohne den Cache-Pool zu durchlaufen, bis der Flussmodus in einen anderen Pausenmodus wechselt oder die Daten aus der Datenquelle gelesen werden (Push). (null));
Lesbare Streams können über Folgendes wieder in den Pausenmodus geschaltet werden:
Wenn kein Pipeline-Ziel vorhanden ist, wird stream.pause() aufgerufen. Wenn Pipeline-Ziele vorhanden sind, werden alle Pipeline-Ziele entfernt. Mehrere Pipe-Ziele können durch Aufruf von stream.unpipe() entfernt werden. const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})Im Vergleich zu lesbaren Streams sind beschreibbare Streams einfacher.
Wenn der Produzent write(chunk) aufruft, entscheidet er intern, ob er sie in der Pufferwarteschlange zwischenspeichert oder _write aufruft, basierend auf einem bestimmten Status (verkorkt, geschrieben usw.). Nach jedem Schreiben der Daten wird versucht, sie zu löschen die Daten in der Cache-Warteschlange. Wenn die Datengröße in der Pufferwarteschlange den Float-Wert (highWaterMark) überschreitet, gibt der Verbraucher nach dem Aufruf von write(chunk) den Wert false zurück. Zu diesem Zeitpunkt sollte der Produzent mit dem Schreiben aufhören.
Wann kann ich also weiterschreiben? Wenn alle Daten im Puffer erfolgreich geschrieben wurden, wird das Drain-Ereignis ausgelöst, nachdem die Pufferwarteschlange gelöscht wurde. Zu diesem Zeitpunkt kann der Produzent mit dem Schreiben von Daten fortfahren.
Wenn der Produzent das Schreiben der Daten beenden muss, muss er die Methode stream.end aufrufen, um das Ende des beschreibbaren Streams zu benachrichtigen.
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, binding, callback) {// wird als _write-Methode setTimeout(() verwendet = >{ fileContent += chunk callback()// Wird aufgerufen, nachdem der Schreibvorgang abgeschlossen ist}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable . write('123123')// truemyWritable.write('123123')// falsemyWritable.end()Beachten Sie, dass sich zu diesem Zeitpunkt möglicherweise mehrere Knoten im Cache-Pool befinden, nachdem die Daten im Cache-Pool den Float-Wert erreicht haben. Während des Prozesses des Löschens des Cache-Pools (zyklischer Aufruf von _read) wird nicht die gleiche Länge verbraucht lesbarer Stream. Die Daten des Float-Werts werden jeweils für einen Pufferknoten verbraucht, auch wenn die Pufferlänge nicht mit dem Float-Wert übereinstimmt.
const { Writable } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, binding, callback) { setTimeout(()=>{ fileContent += chunk console.log ('Consumption', chunk.toString()) callback()// Wird aufgerufen, nachdem der Schreibvorgang abgeschlossen ist}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})let count = 0function ProductionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}ProductionData()myWritable.on('drain', ProductionData)Das Obige ist ein beschreibbarer Stream mit einem Float-Wert von 10. Die Datenquelle ist nun eine fortlaufende Zahlenzeichenfolge von 0 bis 20, und ProductionData wird zum Schreiben von Daten verwendet.
Wenn myWritable.write("0") zum ersten Mal aufgerufen wird, wird „0“ nicht in den Cache-Pool eingegeben, sondern direkt an _wirte übergeben. Der Rückgabewert von myWritable .write("0") ist wahr
Wenn myWritable.write("1") ausgeführt wird, bedeutet dies, dass die letzten Daten noch nicht geschrieben wurden, da der Rückruf von _wirte noch nicht aufgerufen wurde. Die Position garantiert die Ordnungsmäßigkeit des Datenschreibens um „1“ zu speichern. „ Zum Cache-Pool hinzufügen. Dies gilt für die nächsten 2-9
Wenn myWritable.write("10") ausgeführt wird, beträgt die Pufferlänge 9 (1-9) und hat den Float-Wert "10" noch nicht erreicht. Er wird weiterhin als Puffer zum Cache-Pool hinzugefügt Die Länge beträgt 11, daher gibt myWritable.write("1") false zurück, was bedeutet, dass die Daten im Puffer ausreichen und wir auf die Drain-Ereignisbenachrichtigung warten müssen, um wieder Daten zu erzeugen.
Nach 100 ms wird der Rückruf von _write („0“, Kodierung, Rückruf) aufgerufen, der angibt, dass „0“ geschrieben wurde. Anschließend wird geprüft, ob Daten im Cache-Pool vorhanden sind. Wenn Daten vorhanden sind, wird zunächst _read aufgerufen, um den Hauptknoten des Cache-Pools („1“) zu verbrauchen, und dieser Vorgang wird dann wiederholt, bis der Cache-Pool leer ist , lösen Sie das Drain-Ereignis aus und führen Sie ProductionData erneut aus.
Rufen Sie myWritable.write("11") auf, um den Prozess von Schritt 1 bis zum Ende des Streams auszulösen.
Nach dem Verständnis des lesbaren Streams und des beschreibbaren Streams ist es leicht zu verstehen, dass der Duplex-Stream tatsächlich den lesbaren Stream erbt und dann den beschreibbaren Stream implementiert (der Quellcode ist so geschrieben, aber es sollte gesagt werden, dass er implementiert ist). Gleichzeitig ist es besser, lesbare und beschreibbare Streams zu haben.
Der Duplexfluss muss die folgenden zwei Methoden gleichzeitig implementieren
Implementieren Sie die Methode _read(), um Daten für lesbare Streams zu erzeugen
Implementieren Sie die Methode _write(), um Daten für beschreibbare Streams zu verbrauchen
Wie die beiden oben genannten Methoden implementiert werden, wurde in den oben genannten beschreibbaren und lesbaren Streams erläutert. Hierbei ist zu beachten, dass es jeweils zwei unabhängige Pufferpools für Duplex-Streams gibt und auch deren Datenquellen nicht identisch sind
Nehmen Sie als Beispiel den Standard-Eingabe- und Ausgabestream von NodeJs:
Wenn wir Daten in die Konsole eingeben, wird das Datenereignis ausgelöst, das beweist, dass es die Funktion eines lesbaren Streams hat. Jedes Mal, wenn der Benutzer eine Eingabe macht, entspricht dies dem Aufruf der lesbaren Push-Methode, um die erzeugten Daten zu pushen. Wenn wir seine Schreibmethode aufrufen, können wir auch Inhalte an die Konsole ausgeben, aber das Datenereignis wird nicht ausgelöst. Dies zeigt, dass es die Funktion eines beschreibbaren Streams hat und über einen unabhängigen Puffer verfügt Erlauben Sie der Konsole, Text anzuzeigen. // Immer wenn der Benutzer Daten auf der Konsole eingibt (_read), wird das Datenereignis ausgelöst, das ein Merkmal des lesbaren Streams ist ); })// Erzeugt jede Sekunde Daten im Standardeingabestream (dies ist eine Funktion eines beschreibbaren Streams, der direkt an die Konsole ausgegeben wird) und löst kein datasetInterval(()=>{process.stdin.write aus ('sind keine von der Benutzerkonsole eingegebenen Daten')}, 1000)Ein Duplex-Stream kann als lesbarer Stream mit beschreibbarem Stream betrachtet werden. Beide sind unabhängig und verfügen jeweils über unabhängige interne Puffer. Lese- und Schreibereignisse erfolgen unabhängig voneinander.
Duplex-Stream ----|. Lesen Sie <----- Externe Quelle ----| Schreiben Sie -----> Externe Senke ----|Transformationsströme sind duplex, wobei Lese- und Schreibvorgänge in einer Ursache-Wirkungs-Beziehung erfolgen. Die Endpunkte eines Duplex-Streams werden durch eine Transformation verbunden. Für einen Lesevorgang ist ein Schreibvorgang erforderlich.
Stream transformieren --------------|-------------- Sie schreiben ----> ----> Lesen Sie ----- ----------|--------------Beim Erstellen von Transform-Streams ist es am wichtigsten, die Methode _transform anstelle von _write oder _read zu implementieren. In _transform werden die vom beschreibbaren Stream geschriebenen Daten verarbeitet (konsumiert) und dann werden die Daten für den lesbaren Stream erzeugt.
Konvertierungsstreams implementieren häufig eine „_flush“-Methode, die vor dem Ende des Streams aufgerufen wird. Sie wird im Allgemeinen verwendet, um etwas an das Ende des Streams anzuhängen. Beim Komprimieren von Dateien werden hier beispielsweise einige Komprimierungsinformationen hinzugefügt } = require('fs')const { Transform, PassThrough } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const transform = new Transform({ highWaterMark: 10, transform(chunk,encoding , Rückruf){ // Konvertieren data, Rufen Sie Push auf, um das Konvertierungsergebnis zum Cache-Pool hinzuzufügen this.push(chunk.toString().replace('1', '@')) callback() }, Flush(callback){//Führen Sie this.push aus (' before end triggers <<<') callback() }})// write schreibt kontinuierlich Daten let count = 0transform.write('>>>')function ProductionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() }}ProductionData()transform.on('drain', ProductionData)let result = '' transform.on( 'data', data=>{ result += data.toString()})transform.on('end', ()=>{ console.log(result) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})