1.1. Evolution historique des flux
Les flux ne sont pas un concept propre à Nodejs. Ils ont été introduits il y a des décennies dans le système d'exploitation Unix, et les programmes peuvent interagir les uns avec les autres sur les flux via l'opérateur de canal (|).
L'opérateur pipe (|) peut être utilisé sous MacOS et Linux basés sur les systèmes Unix. Il peut convertir la sortie du processus sur le côté gauche de l'opérateur en entrée sur le côté droit.
Dans Node, si nous utilisons le readFile traditionnel pour lire un fichier, le fichier sera lu dans la mémoire du début à la fin. Lorsque tout le contenu aura été lu, le contenu du fichier chargé dans la mémoire sera traité uniformément.
Il y a deux inconvénients à faire cela :
la mémoire : cela prend beaucoup de mémoire ;
le temps : vous devez attendre que la totalité de la charge utile des données soit chargée avant de commencer à traiter les données.
Afin de résoudre les problèmes ci-dessus, Node. .js a suivi et implémenté le concept de flux. Dans Node Dans le flux .js, il existe quatre types de flux. Ce sont tous des instances d'EventEmitter dans Node.js :
flux lisible,
flux inscriptible,
flux duplex intégral lisible et inscriptible (). Duplex Stream)
Transform Stream (Transform Stream)
Afin d'étudier cette partie en profondeur et de comprendre progressivement le concept de flux dans Node.js, et parce que la partie code source est relativement compliquée, j'ai décidé de commencer à apprendre cette partie à partir du flux lisible .
1.2. Qu'est-ce qu'un flux ?
Un flux est une structure de données abstraite, qui est une collection de données. Les types de données qui y sont stockés ne peuvent être que les types suivants (uniquement dans le cas de objectMode === false) :
We
.peut utiliser le flux Vu comme une collection de ces données, tout comme les liquides, on sauvegarde d'abord ces liquides dans un conteneur (le tampon interne BufferList du flux), et lorsque l'événement correspondant est déclenché, on verse le liquide à l'intérieur dans le tuyau . Et informez les autres de placer leurs propres conteneurs de l’autre côté du tuyau pour récupérer le liquide à l’intérieur et l’éliminer.
1.3. Qu'est-ce qu'un flux lisible ?
Un flux lisible est un type de flux. Il a deux modes, trois états
et deux modes de lecture :
mode flux : les données seront lues à partir du système sous-jacent et transmises via EventEmitter dès que possible. Les données sont transmises au gestionnaire d'événements enregistré en
mode pause : dans ce mode, les données ne seront pas lues et la méthode Stream.read() doit être explicitement appelée pour lire les données du flux.
Trois états :
readableFlowing = =. = null : Aucune donnée ne sera générée. L'appel de Stream.pipe() et Stream.resume changera son statut en true, commencera à générer des données et déclenchera activement l'événement
readableFlowing === false : Le flux de données sera suspendu à ce moment. , mais pas La génération de données sera suspendue, donc un retard de données se produira.
readableFlowing === true : Générer et consommer normalement des données
2.1 Définition de l'état interne (ReadableState)
ReadableState
_readableState : ReadableState {. objectMode : false, // Pour utiliser d'autres types de données à l'exception des chaînes, des tampons et des valeurs nulles, ce mode doit être activé. highWaterMark : 16384, // Limite du niveau d'eau, 1024 * 16, par défaut 16 Ko, si cette limite est dépassée , l'appel s'arrêtera _read() lit les données dans le tampon : BufferList { head: null, tail: null, length: 0 }, // Liste chaînée du tampon, utilisée pour enregistrer les données length: 0, // La taille de l'intégralité des données du flux lisible, si objectMode est égal à buffer.length pipes : [], // Enregistre toutes les files d'attente de tuyaux qui surveillent le flux lisible : null, // L'état du flux indépendant est null, false, true terminé : faux, // Toutes les données ont été consommées endEmis : faux, // Si l'événement de fin a été envoyé ou non lu : faux, // Si les données sont en cours de lecture construit : vrai, // Le flux ne peut pas être traité avant il est construit ou échoue. Destroy sync: true, // S'il faut déclencher l'événement 'readable'/'data' de manière synchrone, ou attendre le prochain tick. needReadable: false, // S'il est nécessaire d'envoyer l'événement lisible émisReadable: false, // L'événement lisible a été envoyé readableListening: false, // S'il existe un événement d'écoute lisible curriculum vitae: false, // Si la méthode de reprise a été appelé errorEmit: false, // Erreur L'événement a été envoyé submitClose: true, // Lorsque le flux est détruit, s'il faut envoyer l'événement de fermeture autoDestroy: true, // Automatiquement détruit, il est appelé après la 'fin' l'événement est déclenché destroy: false, // Si le flux a été détruit en erreur: null, // Identifie si le flux a signalé une erreur close: false, // Si le flux a été fermé closeEmit: false, // Si la fermeture l'événement a été envoyé defaultEncoding: 'utf8', // Le format d'encodage de caractères par défaut waitDrainWriters: null, // Pointe vers la référence 'drain' Writer surveillée de l'événement, le type est null, Writable, Set<Writable> multiAwaitDrain: false, // S'il y a plusieurs écrivains en attente de l'événement de drain readingMore: false, // Si plus de données peuvent être lues dataEmit: false, // Les données ont été envoyées au décodeur: null, // Encodage du décodeur: null, // Encodeur[Symbole(kPause)] : null },
2.2. Implémentation du stockage de données internes (BufferList)
BufferList est un conteneur utilisé pour stocker des données internes dans un flux. Il est conçu sous la forme d'une liste chaînée et possède trois attributs : head, tail et length.
Je représente chaque nœud de la BufferList en tant que BufferNode, et le type de données à l'intérieur dépend du objectMode.
Cette structure de données obtient les données d'en-tête plus rapidement que Array.prototype.shift().
2.2.1. Type de stockage des donnéessi objectMode === vrai :
Les données peuvent alors être de n’importe quel type. Quelles que soient les données transmises, elles seront stockées.
modeobjet=true
const Stream = require('stream'); const readableStream = nouveau Stream.Readable({ modeobjet : vrai, lire() {}, }); readableStream.push({ nom : 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(true); console.log(readableStream._readableState.buffer.tail); readableStream.push('lisa'); console.log(readableStream._readableState.buffer.tail); readableStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Symbol(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
Résultats en cours d'exécution :
si objectMode === false :
Ensuite, les données ne peuvent être qu'une chaîne ou un Buffer ou un Uint8Array
modeobjet=faux
const Stream = require('stream'); const readableStream = nouveau Stream.Readable({ ModeObjet : faux, lire() {}, }); readableStream.push({ nom : 'lisa'});
Résultats en cours d'exécution :
2.2.2. Structure de stockage des donnéesNous créons un flux lisible dans la console via la ligne de commande du nœud pour observer les changements dans les données dans le tampon :
Bien sûr, avant de pousser des données, nous devons implémenter sa méthode _read, ou implémenter la méthode read dans les paramètres du constructeur :
const Stream = require('stream'); const readableStream = new Stream.Readable(); RS._read = fonction (taille) {}
ou
const Stream = require('stream'); const readableStream = nouveau Stream.Readable({ lire (taille) {} });
Après l'opération readableStream.push('abc'), le tampon actuel est :
Vous pouvez voir que les données actuelles sont stockées. Les données stockées au début et à la fin sont les codes ASCII de la chaîne « abc », et le type est de type Buffer. La longueur représente le nombre de données actuellement enregistrées plutôt que la taille de. le contenu des données.
2.2.3. API associéesEn imprimant toutes les méthodes de BufferList, vous pouvez obtenir :
À l'exception de la jointure, qui sérialise la BufferList en une chaîne, les autres sont toutes des opérations d'accès aux données.
Je n'expliquerai pas toutes les méthodes une par une ici, mais me concentrerai sur consumer, _getString et _getBuffer.
2.2.3.1.consommer
Adresse du code source : BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
consommer
// Consomme une quantité spécifiée d'octets ou de caractères des données mises en mémoire tampon. consommer(n, hasStrings) { const data = this.head.data; si (n < data.length) { // `slice` est le même pour les tampons et les chaînes. const tranche = data.slice(0, n); this.head.data = data.slice(n); tranche de retour ; } si (n === data.length) { // Le premier morceau correspond parfaitement. renvoie this.shift(); } // Le résultat s'étend sur plusieurs tampons. return hasStrings ? this._getString(n) : this._getBuffer(n); }
Il y a trois conditions de jugement dans le code :
Si la longueur en octets des données consommées est inférieure à la longueur des données stockées dans le nœud principal de la liste chaînée, les n premiers octets des données du nœud principal sont pris et les données du nœud principal actuel sont définies. aux données après le découpage.
Si les données consommées sont exactement égales à la longueur des données stockées dans le nœud principal de la liste chaînée, les données du nœud principal actuel sont renvoyées directement.
Si la longueur des données consommées est supérieure à la longueur du nœud principal de la liste chaînée, le dernier jugement sera effectué sur la base du deuxième paramètre transmis pour déterminer si la couche inférieure de la BufferList actuelle stocke une chaîne ou un Buffer. .
2.2.3.2. _getBuffer
Adresse du code source : BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
consommer
// Consomme une quantité spécifiée d'octets à partir des données mises en mémoire tampon. _getBuffer(n) { const ret = Buffer.allocUnsafe(n); const retLen = n; soit p = this.head; soit c = 0 ; faire { const buf = p.data; si (n > buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.longueur; } autre { si (n === buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++c; si (p.suivant) this.head = p.next; autre this.head = this.tail = null ; } autre { TypedArrayPrototypeSet(ret, nouveau Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); this.head = p; p.data = buf.slice(n); } casser; } ++c; } while ((p = p.next) !== null); this.length -= c; retour à la retraite; }
En général, il s'agit d'une boucle pour exploiter les nœuds de la liste chaînée et créer un nouveau tableau Buffer pour stocker les données renvoyées.
Tout d'abord, commencez à récupérer les données du nœud principal de la liste chaînée et continuez à les copier dans le tampon nouvellement créé jusqu'à ce que les données d'un certain nœud soient supérieures ou égales à la longueur à récupérer moins la longueur qui a été obtenue.
En d'autres termes, après avoir lu le dernier nœud de la liste chaînée, il n'a pas atteint la longueur souhaitée, donc le Buffer nouvellement créé est renvoyé.
2.2.3.3. _getString
Adresse du code source : BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
consommer
// Consomme un nombre spécifié de caractères à partir des données mises en mémoire tampon. _getString(n) { soit ret = ''; soit p = this.head; soit c = 0 ; faire { const str = p.data; si (n > str.length) { ret += chaîne ; n -= str.length; } autre { if (n === str.length) { ret += chaîne ; ++c; si (p.suivant) this.head = p.next; autre this.head = this.tail = null ; } autre { ret += StringPrototypeSlice(str, 0, n); this.head = p; p.data = StringPrototypeSlice(str, n); } casser; } ++c; } while ((p = p.next) !== null); this.length -= c; retour à la retraite; }
Le fonctionnement des chaînes est le même que celui des Buffers. Il lit également les données de la tête de la liste chaînée en boucle. Il n'y a que quelques différences dans la copie et le stockage des données. L’opération _getString est de type chaîne.
2.3. Pourquoi les instances de flux lisibles d'EventEmitter ?
Pour cette question, nous devons d'abord comprendre ce qu'est le modèle de publication-abonnement. Le modèle de publication-abonnement a des applications importantes dans la plupart des API. Qu'il s'agisse de Promise ou de Redux, les API avancées basées sur le modèle de publication-abonnement peuvent être vues partout.
Son avantage est qu'il peut stocker les fonctions de rappel liées aux événements dans la file d'attente, puis demander à l'autre partie de traiter les données à un certain moment dans le futur, réalisant ainsi une séparation des préoccupations. Le producteur produit uniquement des données et informe le consommateur. , tandis que le consommateur traite ensuite uniquement les événements correspondants et leurs données correspondantes, et le modèle de streaming Node.js correspond parfaitement à cette caractéristique.
Alors, comment le flux Node.js implémente-t-il la création d'instances basées sur EventEmitter ?
Le code source est ici : stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
héritage
fonction Flux (opt) { EE.call (ceci, opte); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Flux, EE);
Ensuite, il y a ces lignes de code dans le code source du flux lisible :
Cette partie du code source est ici : lisible https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
héritage
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf (Lisible, Flux);
Tout d’abord, héritez de l’objet prototype de Stream d’EventEmitter, afin que toutes les instances de Stream puissent accéder aux méthodes sur EventEmitter.
Dans le même temps, les méthodes statiques sur EventEmitter sont également héritées via ObjectSetPrototypeOf(Stream, EE), et dans le constructeur de Stream, le constructeur EE est emprunté pour réaliser l'héritage de toutes les propriétés dans EventEmitter, puis dans le flux lisible, utiliser la même méthode La méthode implémente l'héritage prototypique et l'héritage des propriétés statiques de la classe Stream, obtenant ainsi :
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype
donc:
Lisible.prototype.__proto__.__proto__ === EE.prototype
Par conséquent, vous pouvez trouver le prototype d'EventEmitter en traçant la chaîne de prototypes du flux lisible et réaliser l'héritage d'EventEmitter.
2.4. Implémentation des API associées
Les API seront affichées ici dans l'ordre dans lequel elles apparaissent dans les documents de code source, et seules les implémentations principales des API seront expliquées.
Remarque : Seules les fonctions déclarées dans le code source du flux lisible Node.js sont interprétées ici, et les définitions de fonctions introduites en externe ne sont pas incluses. Afin de réduire la longueur, tous les codes ne seront pas copiés.
Prototype lisible
Flux { détruire : [Fonction : détruire], _undestroy : [Fonction : unestroy], _destroy : [Fonction (anonyme)], push : [Fonction (anonyme)], unshift : [Fonction (anonyme)], isPaused : [Fonction (anonyme)], setEncoding : [Fonction (anonyme)], lire : [Fonction (anonyme)], _read : [Fonction (anonyme)], pipe : [Fonction (anonyme)], unpipe : [Fonction (anonyme)], sur : [Fonction (anonyme)], addListener : [Fonction (anonyme)], removeListener : [Fonction (anonyme)], off : [Fonction (anonyme)], removeAllListeners : [Fonction (anonyme)], CV : [Fonction (anonyme)], pause : [Fonction (anonyme)], wrap: [Fonction (anonyme)], itérateur : [Fonction (anonyme)], [Symbole(nodejs.rejection)] : [Fonction (anonyme)], [Symbol(Symbol.asyncIterator)] : [Fonction (anonyme)] }2.4.1. pousser
lisible.push
Readable.prototype.push = fonction (morceau, encodage) { return readableAddChunk(this, chunk, encoding, false); } ;
La fonction principale de la méthode push est de transmettre le bloc de données au pipeline en aval en déclenchant l'événement « data », ou de stocker les données dans son propre tampon.
Le code suivant est un pseudocode pertinent et montre uniquement le processus principal :
lisible.push
function readableAddChunk (stream, chunk, encoding, addToFront) { état const = flux._readableState ; if (chunk === null) { // pousse le signal de fin de flux nul, plus aucune donnée ne peut être écrite après cet état.reading = false; onEofChunk (flux, état); } else if (!state.objectMode) { // Si ce n'est pas le mode objet if (typeof chunk === 'string') { morceau = Buffer.from(morceau); } else if (chunk instanceof Buffer) { //S'il s'agit de Buffer // Traitez l'encodage} else if (Stream._isUint8Array(chunk)) { chunk = Stream._uint8ArrayToBuffer(chunk); } sinon if (morceau != null) { err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } } if (state.objectMode || (chunk && chunk.length > 0)) { // C'est le mode objet ou le chunk est Buffer // Le jugement de plusieurs méthodes d'insertion de données est omis ici addChunk(stream, state, chunk, true); } } fonction addChunk (flux, état, morceau, addToFront) { if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { // En mode streaming, des abonnés écoutent des données stream.emit('data', chunk); } else { // Sinon, enregistrez les données dans le tampon state.length += state.objectMode 1 : chunk.length; si (addToFront) { state.buffer.unshift(morceau); } autre { state.buffer.push(morceau); } } MaybeReadMore(stream, state); // Essayez de lire un peu plus de données}
L'opération push est principalement divisée en jugeant le mode objet. Différents types effectueront différentes opérations sur les données entrantes :
Le premier jugement de addChunk est principalement de gérer la situation où Readable est en mode fluide, dispose d'un écouteur de données et les données du tampon sont vides.
À ce stade, les données sont principalement transmises à d'autres programmes abonnés à l'événement de données, sinon les données sont enregistrées dans la mémoire tampon.
2.4.2. lireÀ l'exception du jugement des conditions aux limites et de l'état de l'écoulement, cette méthode comporte principalement deux opérations.
Appelez la méthode _read implémentée par l'utilisateur pour traiter les résultats de l'exécution
Lire les données du tampon et déclencher l'événement « données »
lisible.lu
// Si la longueur de lecture est supérieure à hwm, hwm sera recalculé si (n > state.highWaterMark) { state.highWaterMark = calculateNewHighWaterMark(n); } // Appel de la méthode _read implémentée par l'utilisateur try { résultat const = this._read(state.highWaterMark); si (résultat != null) { const then = résultat.then; if (typeof then === 'fonction') { alors.appel( résultat, non, fonction (erreur) { errorOrDestroy(this, err); }); } } } attraper (erreur) { errorOrDestroy(this, err); }
Si la méthode _read implémentée par l'utilisateur renvoie une promesse, appelez la méthode then de cette promesse et transmettez les rappels de réussite et d'échec pour faciliter la gestion des exceptions.
Le code principal de la méthode read pour lire les données de zone à partir du tampon est le suivant :
lisible.lu
fonction fromList(n, état) { // rien n'est mis en mémoire tampon. si (état.longueur === 0) renvoie null ; laisser ret; si (état.objectMode) ret = state.buffer.shift(); else if (!n || n >= state.length) { // Gère le cas où n est vide ou supérieur à la longueur du tampon // Lit tout, tronque la liste. if (state.decoder) // S'il existe un décodeur, sérialisez le résultat dans une chaîne ret = state.buffer.join(''); else if (state.buffer.length === 1) // Il n'y a qu'une seule donnée, renvoie les données du nœud principal ret = state.buffer.first(); else // Stocke toutes les données dans un Buffer ret = state.buffer.concat(state.length); state.buffer.clear(); // Effacer le tampon} else { // Gère la situation où la longueur de lecture est inférieure à la taille du tampon ret = state.buffer.consume(n, state.decoder); } retour à la retraite; }2.4.3. _lire
Une méthode qui doit être implémentée lorsque les utilisateurs initialisent un flux Readable. Vous pouvez appeler la méthode push dans cette méthode pour déclencher en continu la méthode read. Lorsque nous poussons null, nous pouvons arrêter l'opération d'écriture du flux.
Exemple de code :
lisible._read
const Stream = require('stream'); const readableStream = nouveau Stream.Readable({ lire (hwm) { this.push(String.fromCharCode(this.currentCharCode++)); si (this.currentCharCode > 122) { this.push(null); } }, }); readableStream.currentCharCode = 97 ; readableStream.pipe(process.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4. tuyau (important)
Liez un ou plusieurs flux inscriptibles au flux lisible actuel et faites passer le flux lisible en mode fluide.
Il existe de nombreux handles d’écoute d’événements dans cette méthode, et je ne les présenterai pas un par un ici :
lisible.pipe
Readable.prototype.pipe = fonction (dest, pipeOpts) { const src = ceci ; const state = this._readableState ; state.pipes.push(dest); // Collecte le flux inscriptible src.on('data', ondata); fonction ondata (morceau) { const ret = dest.write(morceau); si (ret === faux) { pause(); } } // Dites à la destination vers laquelle il est redirigé. dest.emit('pipe', src); // Démarre le flux si le flux est en mode pause if (dest.writableNeedDrain === true) { si (état.flux) { pause(); } } sinon si (!state.flowing) { src.resume(); } retourner la destination ; }
L'opération de canal est très similaire à l'opérateur de canal Linux '|', changeant la sortie gauche en entrée droite. Cette méthode collecte le flux inscriptible pour la maintenance et déclenche l'événement 'data' lorsque le flux lisible est disponible.
Lorsque les données sortent, l'événement d'écriture du flux inscriptible sera déclenché, afin que les données puissent être transférées et que des opérations telles qu'un pipeline puissent être réalisées. Et changera automatiquement le flux lisible en mode pause en mode fluide.
2.4.5. reprendreBasculez le flux du mode « pause » au mode « flux ». Si l'écouteur d'événements « lisible » est défini, cette méthode n'a aucun effet.
lisible.resume
Readable.prototype.resume = fonction() { const state = this._readableState ; si (!state.flowing) { state.flowing = !state.readableListening; // Le fait qu'il soit en mode fluide dépend de la définition ou non du handle d'écoute « lisible » curriculum vitae (this, state); } } ; fonction reprendre (flux, état) { if (!state.resumeScheduled) { // Basculez pour que la méthode curriculum vitae_ ne soit appelée qu'une seule fois dans le même Tick state.resumeScheduled = true; process.nextTick(resume_, stream, state); } } fonction curriculum vitae_(flux, état) { si (!state.reading) { flux.read(0); } state.resumeScheduled = faux ; stream.emit('resume'); flux(flux); } function flow(stream) { // Lorsque le flux est en mode streaming, cette méthode continuera à lire les données du tampon jusqu'à ce que le tampon soit vide const state = stream._readableState; while (state.flowing && stream.read() !== null); // Étant donné que la méthode read sera appelée ici et que le flux de l'écouteur d'événement 'readable' est défini, la méthode read peut également être appelée. // Cela entraîne des données incohérentes (n'affecte pas les données, affecte uniquement l'appel de la méthode read dans le rappel d'événement 'readable' pour lire les données) }2.4.6. pause
Changez le flux du mode fluide au mode pause, arrêtez de déclencher l'événement « données » et enregistrez toutes les données dans le tampon.
lisible.pause
Readable.prototype.pause = fonction() { if (this._readableState.flowing !== false) { debug('pause'); this._readableState.flowing = false; this.emit('pause'); } rends ceci ; } ;
2.5. Utilisation et mécanisme de travail
La méthode d'utilisation a été mentionnée dans la section BufferList. Créez une instance Readable et implémentez sa méthode _read(), ou implémentez la méthode read dans le premier paramètre objet du constructeur.
2.5.1. Mécanisme de travailIci, nous dessinons uniquement le processus général et les conditions de déclenchement de la conversion de mode du flux Readable.
dans: