1.1. Historische Entwicklung von Streams
Streams sind kein einzigartiges Konzept für Nodejs. Sie wurden vor Jahrzehnten im Unix-Betriebssystem eingeführt und Programme können über den Pipe-Operator (|) in Streams miteinander interagieren.
Der Pipe-Operator (|) kann unter MacOS und Linux auf Basis von Unix-Systemen verwendet werden. Er kann die Ausgabe des Prozesses auf der linken Seite des Operators in die Eingabe auf der rechten Seite umwandeln.
Wenn wir in Node die herkömmliche readFile zum Lesen einer Datei verwenden, wird die Datei von Anfang bis Ende in den Speicher eingelesen. Wenn der gesamte Inhalt gelesen wurde, wird der Inhalt der in den Speicher geladenen Datei einheitlich verarbeitet.
Dies hat zwei Nachteile:
Speicher: Es nimmt viel Speicher in Anspruch.
Sie müssen warten, bis die gesamte Nutzlast der Daten geladen ist, bevor Sie mit der Verarbeitung der Daten
beginnen .js folgte und implementierte das Konzept von Streams. Im .js-Stream gibt es vier Arten von Streams. Sie sind alle Instanzen von EventEmitter in Node.js:
Lesbarer Stream,
beschreibbarer Stream,
lesbarer und beschreibbarer Vollduplex-Stream ( Duplex Stream)
Transform Stream (Transform Stream)
Um diesen Teil eingehend zu studieren und das Konzept der Streams in Node.js schrittweise zu verstehen, und da der Quellcode-Teil relativ kompliziert ist, habe ich beschlossen, diesen Teil aus dem lesbaren Stream zu lernen .
1.2. Was ist ein Stream?
Ein Stream ist eine abstrakte Datenstruktur, bei der es sich um eine Sammlung von Daten handelt. Die darin gespeicherten Datentypen können nur die folgenden Typen sein (nur für den Fall von objectMode === false):
We Sie können den Stream als Sammlung dieser Daten betrachten. Genau wie bei Flüssigkeiten speichern wir diese Flüssigkeiten zunächst in einem Behälter (dem internen Puffer BufferList des Streams) und gießen die darin enthaltene Flüssigkeit in das Rohr, wenn das entsprechende Ereignis ausgelöst wird . Und weisen Sie andere darauf hin, ihre eigenen Behälter auf die andere Seite des Rohrs zu stellen, um die darin enthaltene Flüssigkeit zur Entsorgung aufzufangen.
1.3. Was ist ein lesbarer Stream?
Ein lesbarer Stream ist eine Art Stream. Er verfügt über zwei Modi, drei Zustände
und zwei Lesemodi:
Daten werden vom zugrunde liegenden System gelesen und so schnell wie möglich weitergeleitet.
Pausenmodus
an den registrierten Ereignishandler übergeben
Drei Zustände:
readableFlowing = = = null: Der Aufruf von Stream.pipe() und Stream.resume ändert seinen Status in „true“, beginnt mit der Datengenerierung und löst aktiv das Ereignis
readableFlowing === false aus: Der Datenfluss wird zu diesem Zeitpunkt angehalten , aber nicht Die Generierung von Daten wird angehalten, sodass ein Datenrückstand auftritt.
readableFlowing === true: Normalerweise Daten generieren und verbrauchen
2.1 Interne Statusdefinition (ReadableState)
ReadableState
: ReadableState { objectMode: false, // Um andere Datentypen außer String, Buffer und Null zu verarbeiten, muss dieser Modus aktiviert werden highWaterMark: 16384, // Wasserstandsgrenze, 1024 * 16, Standard 16 KB, wenn diese Grenze überschritten wird , der Aufruf stoppt _read() liest Daten in den Puffer buffer: BufferList { head: null, tail: null, length: 0 }, // Pufferverknüpfte Liste, wird zum Speichern von Daten verwendet Länge: 0, // Die Größe von die gesamten lesbaren Stream-Daten, wenn objectMode gleich buffer.length ist. Pipes: [], // Alle Pipe-Warteschlangen speichern, die den Fluss des lesbaren Streams überwachen: null, // Der Status des unabhängigen Flusses ist null, false, true endete: false, // Alle Daten wurden verbraucht. endEmitted: false, // Ob das Endereignis gesendet wurde oder nicht. reading: false, // Ob die Daten gelesen werden. construction: true, // Der Stream kann vorher nicht verarbeitet werden es wird erstellt oder schlägt fehl. Destroy sync: true, // Ob das Ereignis „readable“/„data“ synchron ausgelöst werden soll oder bis zum nächsten Tick gewartet werden soll needReadable: false, // Ob es notwendig ist, das lesbare Ereignis zu senden. emittedReadable: false, // Das lesbare Ereignis wurde gesendet. readableListening: false, // Ob es ein lesbares Listening-Ereignis gibt. resumeScheduled: false, // Ob die Resume-Methode wurde aufgerufen errorEmitted: false, // Fehler Das Ereignis wurde gesendet emitClose: true, // Wenn der Stream zerstört wird, ob das Schließereignis gesendet werden soll autoDestroy: true, // Automatisch zerstört, wird nach dem „Ende“ aufgerufen Ereignis wird ausgelöst destroy: false, // Ob der Stream zerstört wurde, error: null, // Identifiziert, ob der Stream einen Fehler gemeldet hat geschlossen: false, // Ob der Stream geschlossen wurde closeEmitted: false, // Ob der Close Ereignis wurde gesendet defaultEncoding: 'utf8', // Das StandardzeichenkodierungsformatawaitDrainWriters: null, // Zeigt auf die überwachte 'drain'-Writer-Referenz des Ereignisses, Typ ist null, Beschreibbar, Set<Writable> multiAwaitDrain: false, // Ob mehrere Writer auf das Drain-Ereignis warten readingMore: false, // Ob weitere Daten gelesen werden können dataEmitted: false, // Die Daten wurden gesendet Decoder: null, // Decoder-Kodierung: null, // Encoder[Symbol(kPaused)]: null },
2.2. Interne Datenspeicherimplementierung (BufferList)
BufferList ist ein Container, der zum Speichern interner Daten in einem Stream verwendet wird. Er ist in Form einer verknüpften Liste konzipiert und verfügt über drei Attribute: Kopf, Ende und Länge.
Ich stelle jeden Knoten in der BufferList als BufferNode dar und der darin enthaltene Datentyp hängt vom Objektmodus ab.
Diese Datenstruktur erhält Header-Daten schneller als Array.prototype.shift().
2.2.1. Art der Datenspeicherungwenn objectMode === true:
Dann können Daten jeglicher Art sein. Alle übertragenen Daten werden gespeichert.
objectMode=true
const Stream = require('stream'); const readableStream = new Stream.Readable({ Objektmodus: wahr, lesen() {}, }); readableStream.push({ name: 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(true); console.log(readableStream._readableState.buffer.tail); readableStream.push('lisa'); console.log(readableStream._readableState.buffer.tail); readableStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Symbol(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
Laufergebnisse:
wenn objectMode === false:
Dann können Daten nur String oder Buffer oder Uint8Array sein
objectMode=false
const Stream = require('stream'); const readableStream = new Stream.Readable({ Objektmodus: falsch, lesen() {}, }); readableStream.push({ name: 'lisa'});
Laufergebnisse:
2.2.2. DatenspeicherstrukturWir erstellen über die Node-Befehlszeile einen lesbaren Stream in der Konsole, um Änderungen in den Daten im Puffer zu beobachten:
Bevor wir Daten übertragen, müssen wir natürlich die _read-Methode implementieren oder die read-Methode in den Parametern des Konstruktors implementieren:
const Stream = require('stream'); const readableStream = new Stream.Readable(); RS._read = Funktion(Größe) {}
oder
const Stream = require('stream'); const readableStream = new Stream.Readable({ read(size) {} });
Nach der Operation readableStream.push('abc') ist der aktuelle Puffer:
Sie können sehen, dass die am Anfang und am Ende gespeicherten Daten die ASCII-Codes der Zeichenfolge „abc“ sind und der Typ Puffertyp ist. Die Länge stellt die Anzahl der aktuell gespeicherten Daten dar und nicht die Größe den Dateninhalt.
2.2.3. Verwandte APIsDrucken Sie alle Methoden von BufferList aus, die Sie erhalten können:
Mit Ausnahme von Join, der die BufferList in eine Zeichenfolge serialisiert, handelt es sich bei allen anderen um Datenzugriffsvorgänge.
Ich werde hier nicht alle Methoden einzeln erklären, sondern mich auf consume, _getString und _getBuffer konzentrieren.
2.2.3.1.verbrauchen
Quellcode-Adresse: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
konsumieren
// Verbraucht eine angegebene Menge an Bytes oder Zeichen aus den gepufferten Daten. verbrauchen(n, hasStrings) { const data = this.head.data; if (n < data.length) { // `slice` ist für Puffer und Strings dasselbe. const Slice = data.slice(0, n); this.head.data = data.slice(n); Return-Slice; } if (n === data.length) { // Der erste Block passt perfekt. return this.shift(); } // Ergebnis erstreckt sich über mehr als einen Puffer. return hasStrings ? this._getString(n) : this._getBuffer(n); }
Der Code enthält drei Beurteilungsbedingungen:
Wenn die Bytelänge der verbrauchten Daten kleiner ist als die Länge der im Kopfknoten der verknüpften Liste gespeicherten Daten, werden die ersten n Bytes der Daten des Kopfknotens verwendet und die Daten des aktuellen Kopfknotens festgelegt zu den Daten nach dem Slicing.
Wenn die verbrauchten Daten genau der Länge der im Kopfknoten der verknüpften Liste gespeicherten Daten entsprechen, werden die Daten des aktuellen Kopfknotens direkt zurückgegeben.
Wenn die Länge der verbrauchten Daten größer ist als die Länge des Kopfknotens der verknüpften Liste, erfolgt die letzte Beurteilung anhand des zweiten übergebenen Parameters, um zu bestimmen, ob die unterste Ebene der aktuellen BufferList eine Zeichenfolge oder einen Puffer speichert .
2.2.3.2. _getBuffer
Quellcode-Adresse: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
konsumieren
// Verbraucht eine angegebene Menge an Bytes aus den gepufferten Daten. _getBuffer(n) { const ret = Buffer.allocUnsafe(n); const retLen = n; let p = this.head; sei c = 0; Tun { const buf = p.data; if (n > buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.length; } anders { if (n === buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++c; if (S.nächste) this.head = p.next; anders this.head = this.tail = null; } anders { TypedArrayPrototypeSet(ret, neues Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); this.head = p; p.data = buf.slice(n); } brechen; } ++c; } while ((p = p.next) !== null); this.length -= c; return ret; }
Im Allgemeinen handelt es sich um eine Schleife zum Betreiben der Knoten in der verknüpften Liste und zum Erstellen eines neuen Pufferarrays zum Speichern der zurückgegebenen Daten.
Beginnen Sie zunächst mit dem Abrufen von Daten vom Kopfknoten der verknüpften Liste und kopieren Sie sie weiter in den neu erstellten Puffer, bis die Daten eines bestimmten Knotens größer oder gleich der abzurufenden Länge minus der erhaltenen Länge sind.
Mit anderen Worten: Nachdem der letzte Knoten der verknüpften Liste gelesen wurde, hat er nicht die gewünschte Länge erreicht, sodass der neu erstellte Puffer zurückgegeben wird.
2.2.3.3. _getString
Quellcode-Adresse: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
konsumieren
// Verbraucht eine angegebene Anzahl von Zeichen aus den gepufferten Daten. _getString(n) { let ret = ''; let p = this.head; sei c = 0; Tun { const str = p.data; if (n > str.length) { ret += str; n -= str.length; } anders { if (n === str.length) { ret += str; ++c; if (S.nächste) this.head = p.next; anders this.head = this.tail = null; } anders { ret += StringPrototypeSlice(str, 0, n); this.head = p; p.data = StringPrototypeSlice(str, n); } brechen; } ++c; } while ((p = p.next) !== null); this.length -= c; return ret; }
Die Operation von Strings ist die gleiche wie die von Buffers. Es werden auch Daten aus dem Kopf der verknüpften Liste in einer Schleife gelesen. Darüber hinaus gibt es einige Unterschiede beim Kopieren und Speichern der Daten Die Operation _getString ist vom Typ String.
2.3. Warum sind Streams Instanzen von EventEmitter?
Für diese Frage müssen wir zunächst verstehen, was das Publish-Subscribe-Modell ist. Es gibt in den meisten APIs wichtige Anwendungen. Unabhängig davon, ob es sich um Promise oder Redux handelt, sind überall erweiterte APIs zu sehen.
Sein Vorteil besteht darin, dass er die ereignisbezogenen Rückruffunktionen in der Warteschlange speichern und dann die andere Partei benachrichtigen kann, die Daten zu einem bestimmten Zeitpunkt in der Zukunft zu verarbeiten, wodurch eine Trennung der Bedenken erreicht wird. Der Produzent produziert nur Daten und benachrichtigt den Verbraucher , während der Verbraucher Dann verarbeitet es nur die entsprechenden Ereignisse und die entsprechenden Daten, und das Streaming-Modell von Node.js entspricht genau dieser Eigenschaft.
Wie implementiert der Node.js-Stream die Erstellung von Instanzen basierend auf EventEmitter?
Der Quellcode hierfür ist hier: stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
Vermächtnis
Funktion Stream(opts) { EE.call(this, opts); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE);
Dann gibt es im Quellcode des lesbaren Streams diese Codezeilen:
Dieser Teil des Quellcodes ist hier: lesbar https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
Vermächtnis
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream);
Erben Sie zunächst das Prototypobjekt von Stream von EventEmitter, damit alle Instanzen von Stream auf die Methoden von EventEmitter zugreifen können.
Gleichzeitig werden die statischen Methoden von EventEmitter auch über ObjectSetPrototypeOf (Stream, EE) geerbt. Im Konstruktor von Stream wird der Konstruktor EE ausgeliehen, um die Vererbung aller Eigenschaften in EventEmitter und dann im lesbaren Stream zu realisieren. Verwenden Sie dasselbe Die Methode implementiert die prototypische Vererbung und die Vererbung statischer Eigenschaften der Stream-Klasse und erhält so:
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype
daher:
Readable.prototype.__proto__.__proto__ === EE.prototype
Daher können Sie den Prototyp von EventEmitter finden, indem Sie die Prototypenkette des lesbaren Streams verfolgen und die Vererbung von EventEmitter realisieren.
2.4. Implementierung verwandter APIs
Die APIs werden hier in der Reihenfolge angezeigt, in der sie in den Quellcodedokumenten erscheinen, und es werden nur die Kern-API-Implementierungen erläutert.
Hinweis: Hier werden nur die im lesbaren Stream-Quellcode von Node.js deklarierten Funktionen interpretiert und extern eingeführte Funktionsdefinitionen werden nicht berücksichtigt. Um die Länge zu reduzieren, werden nicht alle Codes kopiert.
Lesbarer.Prototyp
Streamen { zerstören: [Funktion: zerstören], _undestroy: [Funktion: unestroy], _destroy: [Funktion (anonym)], push: [Funktion (anonym)], unshift: [Funktion (anonym)], isPaused: [Funktion (anonym)], setEncoding: [Funktion (anonym)], lesen: [Funktion (anonym)], _read: [Funktion (anonym)], Pipe: [Funktion (anonym)], unpipe: [Funktion (anonym)], zu: [Funktion (anonym)], addListener: [Funktion (anonym)], removeListener: [Funktion (anonym)], aus: [Funktion (anonym)], removeAllListeners: [Funktion (anonym)], Lebenslauf: [Funktion (anonym)], Pause: [Funktion (anonym)], wrap: [Funktion (anonym)], Iterator: [Funktion (anonym)], [Symbol(nodejs.rejection)]: [Funktion (anonym)], [Symbol(Symbol.asyncIterator)]: [Funktion (anonym)] }2.4.1. schieben
lesbar.push
Readable.prototype.push = Funktion(Chunk, Kodierung) { return readableAddChunk(this, chunk, binding, false); };
Die Hauptfunktion der Push-Methode besteht darin, den Datenblock durch Auslösen des „Daten“-Ereignisses an die Downstream-Pipeline zu übergeben oder die Daten in einem eigenen Puffer zu speichern.
Der folgende Code ist relevanter Pseudocode und zeigt nur den Hauptprozess:
lesbar.push
Funktion readableAddChunk(stream, chunk,kodierung, addToFront) { const state = stream._readableState; if (chunk === null) { // Null-Stream-Endsignal drücken, danach können keine Daten mehr geschrieben werden state.reading = false; onEofChunk(stream, state); } else if (!state.objectMode) { // Wenn nicht Objektmodus if (typeof chunk === 'string') { chunk = Buffer.from(chunk); } else if (chunk instanceof Buffer) { //Wenn es Buffer ist // Verarbeiten Sie die Kodierung} else if (Stream._isUint8Array(chunk)) { chunk = Stream._uint8ArrayToBuffer(chunk); } else if (chunk != null) { err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } } if (state.objectMode || (chunk && chunk.length > 0)) { // Es ist Objektmodus oder Chunk ist Puffer // Die Beurteilung mehrerer Methoden zum Einfügen von Daten wird hier weggelassen addChunk(stream, state, chunk, true); } } Funktion addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { // Im Streaming-Modus gibt es Abonnenten, die Daten hören stream.emit('data', chunk); } else { // Ansonsten die Daten im Puffer speichern state.length += state.objectMode 1 : chunk.length; if (addToFront) { state.buffer.unshift(chunk); } anders { state.buffer.push(chunk); } } MaybeReadMore(stream, state); // Versuchen Sie, etwas mehr Daten zu lesen}
Der Push-Vorgang ist hauptsächlich in die Beurteilung des Objektmodus unterteilt. Verschiedene Typen führen unterschiedliche Vorgänge für die eingehenden Daten aus:
Das erste Urteil von addChunk besteht hauptsächlich darin, die Situation zu behandeln, in der sich Readable im Fließmodus befindet, über einen Daten-Listener verfügt und die Pufferdaten leer sind.
Zu diesem Zeitpunkt werden die Daten hauptsächlich an andere Programme weitergeleitet, die das Datenereignis abonnieren, andernfalls werden die Daten im Puffer gespeichert.
2.4.2. lesenMit Ausnahme der Beurteilung der Randbedingungen und des Strömungsstatus umfasst diese Methode hauptsächlich zwei Vorgänge.
Rufen Sie die vom Benutzer implementierte Methode _read auf, um die Ausführungsergebnisse zu verarbeiten
Daten aus dem Pufferspeicher lesen und das Ereignis „Daten“ auslösen
lesbar.lesbar
// Wenn die Leselänge größer als hwm ist, wird hwm neu berechnet if (n > state.highWaterMark) { state.highWaterMark = computeNewHighWaterMark(n); } // Rufen Sie die vom Benutzer implementierte Methode _read auf try { const result = this._read(state.highWaterMark); if (Ergebnis != null) { const then = result.then; if (typeof then === 'function') { then.call( Ergebnis, Nein, Funktion(err) { errorOrDestroy(this, err); }); } } } fangen (irrt) { errorOrDestroy(this, err); }
Wenn die vom Benutzer implementierte _read-Methode ein Versprechen zurückgibt, rufen Sie die then-Methode dieses Versprechens auf und übergeben Sie die Erfolgs- und Fehlerrückrufe, um die Behandlung von Ausnahmen zu erleichtern.
Der Kerncode der Lesemethode zum Lesen von Zonendaten aus dem Puffer lautet wie folgt:
lesbar.lesbar
Funktion fromList(n, state) { // nichts gepuffert. if (state.length === 0) null zurückgeben; lass ret; if (state.objectMode) ret = state.buffer.shift(); else if (!n || n >= state.length) { // Den Fall behandeln, in dem n leer oder größer als die Länge des Puffers ist // Alles lesen, Liste abschneiden. if (state.decoder) // Wenn es einen Decoder gibt, serialisieren Sie das Ergebnis in einen String ret = state.buffer.join(''); else if (state.buffer.length === 1) // Es gibt nur ein Datenelement, die Daten des Kopfknotens werden zurückgegeben ret = state.buffer.first(); else // Alle Daten in einem Puffer speichern ret = state.buffer.concat(state.length); state.buffer.clear(); // Puffer löschen} else { // Behandeln Sie die Situation, in der die Leselänge kleiner als der Puffer ist ret = state.buffer.consume(n, state.decoder); } return ret; }2.4.3. _lesen
Eine Methode, die implementiert werden muss, wenn Benutzer einen lesbaren Stream initialisieren. Sie können die Push-Methode in dieser Methode aufrufen, um die Lesemethode kontinuierlich auszulösen. Wenn wir Null drücken, können wir den Schreibvorgang des Streams stoppen.
Beispielcode:
lesbar._read
const Stream = require('stream'); const readableStream = new Stream.Readable({ read(hwm) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 122) { this.push(null); } }, }); readableStream.currentCharCode = 97; readableStream.pipe(process.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4. Rohr (wichtig)
Binden Sie einen oder mehrere beschreibbare Streams an den aktuellen lesbaren Stream und schalten Sie den lesbaren Stream in den Fließmodus.
In dieser Methode gibt es viele Event-Listening-Handles, und ich werde sie hier nicht einzeln vorstellen:
readable.pipe
Readable.prototype.pipe = function(dest, pipeOpts) { const src = this; const state = this._readableState; state.pipes.push(dest); // Beschreibbaren Stream sammeln src.on('data', ondata); Funktion ondata(chunk) { const ret = dest.write(chunk); if (ret === false) { Pause(); } } // Dem Ziel mitteilen, an welches es weitergeleitet wird. dest.emit('pipe', src); // Stream starten, wenn sich der Stream im Pausenmodus befindet if (dest.writableNeedDrain === true) { if (state.flowing) { Pause(); } } else if (!state.flowing) { src.resume(); } Rückkehrziel; }
Der Pipe-Vorgang ist dem Linux-Pipe-Operator „|“ sehr ähnlich und ändert die linke Ausgabe in die rechte Eingabe. Diese Methode sammelt den beschreibbaren Stream zur Wartung und löst das Ereignis „data“ aus, wenn der lesbare Stream verfügbar ist.
Wenn Daten ausfließen, wird das Schreibereignis des beschreibbaren Streams ausgelöst, sodass Daten übertragen und Operationen wie eine Pipeline realisiert werden können. Und ändert automatisch den lesbaren Stream im Pausenmodus in den Fließmodus.
2.4.5. LebenslaufSchalten Sie den Stream vom „Pause“-Modus in den „Flow“-Modus um. Wenn der „lesbare“ Ereignis-Listener festgelegt ist, hat diese Methode keine Auswirkung.
lesbarer Lebenslauf
Readable.prototype.resume = function() { const state = this._readableState; if (!state.flowing) { state.flowing = !state.readableListening; // Ob es sich im Fließmodus befindet, hängt davon ab, ob das „lesbare“ Listening-Handle gesetzt ist resume(this, state); } }; Funktion resume(stream, state) { if (!state.resumeScheduled) { // Wechseln Sie so, dass die resume_-Methode nur einmal im selben Tick aufgerufen wird state.resumeScheduled = true; process.nextTick(resume_, stream, state); } } Funktion resume_(stream, state) { if (!state.reading) { stream.read(0); } state.resumeScheduled = false; stream.emit('resume'); fließen(Strom); } function flow(stream) { // Wenn sich der Stream im Streaming-Modus befindet, liest diese Methode weiterhin Daten aus dem Puffer, bis der Puffer leer ist const state = stream._readableState; while (state.flowing && stream.read() !== null); // Da hier die Lesemethode aufgerufen wird und der Stream des „lesbaren“ Ereignis-Listeners festgelegt ist, kann auch die Lesemethode aufgerufen werden. //Dies führt zu inkohärenten Daten (hat keine Auswirkungen auf die Daten, sondern nur auf den Aufruf der Lesemethode im „readable“-Ereignisrückruf zum Lesen von Daten) }2.4.6. Pause
Ändern Sie den Stream vom Fließmodus in den Pausenmodus, beenden Sie das Auslösen des „Daten“-Ereignisses und speichern Sie alle Daten im Puffer
lesbar.Pause
Readable.prototype.pause = function() { if (this._readableState.flowing !== false) { debug('pause'); this._readableState.flowing = false; this.emit('pause'); } gib dies zurück; };
2.5. Verwendung und Arbeitsmechanismus
Die Verwendungsmethode wurde im Abschnitt BufferList erwähnt. Erstellen Sie eine Readable-Instanz und implementieren Sie ihre _read()-Methode oder implementieren Sie die Lesemethode im ersten Objektparameter des Konstruktors.
2.5.1. ArbeitsmechanismusHier zeichnen wir nur den allgemeinen Prozess und die Auslösebedingungen für die Moduskonvertierung des Readable-Streams.
In: