Il existe 4 types de flux de nœuds : 1. Lisible (flux lisible). La méthode "_read" doit être implémentée pour renvoyer le contenu ; 2. Writable (flux inscriptible), la méthode "_write" doit être implémentée pour accepter le contenu ; 3. Duplex (flux lisible et inscriptible), le "_read" et " Les méthodes _write" doivent être implémentées. Pour accepter et renvoyer le contenu ; 4. Transformation (flux de conversion), vous devez implémenter la méthode "_transform" pour convertir le contenu reçu et renvoyer le contenu.
L'environnement d'exploitation de ce tutoriel : système Windows 7, nodejs version 16, ordinateur DELL G3.
Stream est un concept très basique dans Nodejs. De nombreux modules de base sont implémentés sur la base de flux et jouent un rôle très important. Dans le même temps, le flux est également un concept très difficile à comprendre. Cela est principalement dû au manque de documentation pertinente. Pour les débutants en NodeJ, il faut souvent beaucoup de temps pour comprendre le flux avant de pouvoir réellement maîtriser ce concept. pour la plupart des NodeJ, c'est le cas. Pour les utilisateurs, il n'est utilisé que pour développer des applications Web. Une compréhension insuffisante des flux n'affecte pas leur utilisation. Cependant, comprendre les flux peut conduire à une meilleure compréhension des autres modules de NodeJs et, dans certains cas, l'utilisation de flux pour traiter les données donnera de meilleurs résultats.
Stream est une interface abstraite pour traiter les données en streaming dans Node.js. Stream n'est pas une interface réelle, mais un terme général désignant tous les flux. Les interfaces réelles incluent ReadableStream, WritableStream et ReadWriteStream.
interface ReadableStream extends EventEmitter { readable: boolean; read(size?: number): string | setEncoding(encoding: BufferEncoding): this; T extends WritableStream>(destination : T, options ? : { end ? : boolean | undefined ; }) : T; unpipe(destination ? : WritableStream) : this ; ; wrap(oldStream : ReadableStream) : this ; [Symbol.asyncIterator]() : AsyncIterableIterator<string | Buffer>;}interface WritableStream extends EventEmitter { writable : boolean; Erreur | null) => void): boolean; write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this;}interface ReadWriteStream étend ReadableStream, WritableStream { }On peut voir que ReadableStream et WritableStream sont toutes deux des interfaces qui héritent de la classe EventEmitter (les interfaces dans ts peuvent hériter de classes, car elles ne font que fusionner des types).
Les classes d'implémentation correspondant aux interfaces ci-dessus sont respectivement Readable, Writable et Duplex.
Il existe 4 types de flux dans NodeJs :
Flux Flux lisible (implémente ReadableStream)
Flux inscriptible (implémente WritableStream)
Duplex est un flux lisible et inscriptible (implémentant WritableStream après avoir hérité de Readable)
Flux de conversion de transformation (hérité de Duplex)
Ils ont tous des méthodes à mettre en œuvre :
Readable doit implémenter la méthode _read pour renvoyer le contenu
Writable doit implémenter la méthode _write pour accepter le contenu
Duplex doit implémenter les méthodes _read et _write pour accepter et renvoyer le contenu
Transform doit implémenter la méthode _transform pour convertir le contenu reçu et le renvoyer
Lisible est un type de flux il a deux modes et trois états.
Deux modes de lecture :
Mode flux : les données seront lues et écrites du système sous-jacent dans le tampon. Lorsque le tampon est plein, les données seront automatiquement transmises au gestionnaire d'événements enregistré le plus rapidement possible via EventEmitter.
Mode pause : dans ce mode, EventEmitter ne sera pas activement déclenché pour transmettre des données. La méthode Readable.read() doit être explicitement appelée pour lire les données du tampon. La lecture déclenchera une réponse à l'événement EventEmitter.
Trois états :
readableFlowing === null (état initial)
readableFlowing === false (mode pause)
readableFlowing === true (mode fluide)
Le readable.readableFlowing du flux est initialement nul.
Cela devient vrai après l'ajout de l'événement de données. Lorsque pause(), unpipe() est appelé, ou qu'une contre-pression est reçue ou qu'un événement lisible est ajouté, readableFlowing sera défini sur false. Dans cet état, la liaison d'un écouteur à l'événement de données ne fera pas passer readableFlowing à true.
L’appel de curriculum vitae() peut faire passer le readableFlowing du flux lisible à true.
La suppression de tous les événements lisibles est le seul moyen de rendre readableFlowing nul.
Description du nom de l'événement lisible est déclenché lorsqu'il y a de nouvelles données lisibles dans le tampon (il sera déclenché à chaque fois qu'un nœud est inséré dans le pool de cache) les données seront déclenchées à chaque fois que les données sont consommées. Le paramètre correspond aux données consommées cette fois-ci et. le flux d'erreur est déclenché lorsque le flux de fermeture est fermé. Lorsqu'une erreur se produit, le nom de la méthode de déclenchement indique que read(size) consomme des données d'une longueur de size. Le renvoi de null indique que les données actuelles sont inférieures à size. les données consommées cette fois sont renvoyées. Lorsque la taille n'est pas transmise, cela signifie consommer toutes les données du pool de cache const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// cache pool float value})readStreams. on('readable', () => { console.log('buffer full') readStreams.read()// Consomme toutes les données du pool de mémoire tampon, renvoie le résultat et déclenche l'événement de données}) readStreams.on('data', (data) => { console.log('data')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
Lorsque la taille est 0, l'événement lisible sera déclenché.
Lorsque la longueur des données dans le pool de cache atteint la valeur flottante highWaterMark, il ne demandera pas activement de données de production, mais attendra que les données soient consommées avant de produire des données.
Si le flux en pause n'appelle pas read pour consommer des données, data et readable ne seront pas déclenchés ultérieurement. Lorsque read est appelé pour consommer, il déterminera d'abord si la longueur des données restantes après cette consommation est inférieure au float. Si elle est inférieure à la valeur flottante, les données de production seront demandées avant la consommation. De cette façon, une fois l'exécution logique après la lecture terminée, les nouvelles données auront très probablement été produites, puis lisibles seront à nouveau déclenchées. Ce mécanisme de production à l'avance des données consommées suivantes et de leur stockage dans le pool de cache est également. la raison pour laquelle le flux de cache est rapide.
Il existe deux situations d'écoulement à l'état d'écoulement
Lorsque la vitesse de production est inférieure à la vitesse de consommation : Dans ce cas, il n'y aura généralement plus de données restantes dans le pool de cache après chaque donnée de production, et les données produites cette fois pourront être directement transmises à l'événement data (car ce n'est pas le cas). entrez dans le pool de cache, il n'est donc pas nécessaire d'appeler read pour consommer), puis commencez immédiatement à produire de nouvelles données. Les nouvelles données ne seront pas produites jusqu'à ce que les dernières données soient à nouveau déclenchées jusqu'à la fin du flux. . Lorsque la vitesse de production est plus rapide que la vitesse de consommation : A ce moment, après chaque production de données, il y a généralement des données non consommées dans le pool de cache. Dans ce cas, la prochaine consommation de données commencera généralement lorsque les données seront consommées, et après. les anciennes données sont consommées, de nouvelles données ont été produites et placées dans le pool de cacheLa seule différence entre eux est de savoir si les données existent toujours dans le pool de cache après leur production. Si les données existent, les données produites seront poussées vers le pool de cache pour attendre leur consommation. être transmis directement aux données sans les ajouter au pool de cache.
Il convient de noter que lorsqu'un flux contenant des données dans un pool de cache entre en mode flux à partir du mode pause, read sera appelé en boucle pour consommer les données jusqu'à ce que null soit renvoyé.
En mode pause, lorsqu'un flux lisible est créé, le mode est le mode pause. Après la création, la méthode _read est automatiquement appelée pour transmettre les données de la source de données vers le pool de mémoire tampon jusqu'à ce que les données du pool de mémoire tampon atteignent la valeur flottante. Chaque fois que les données atteignent la valeur flottante, le flux lisible déclenchera un événement « lisible » pour indiquer au consommateur que les données sont prêtes et peuvent continuer à être consommées.
D'une manière générale, l'événement 'readable' indique une nouvelle activité sur le flux : soit il y a de nouvelles données, soit la fin du flux est atteinte. Par conséquent, avant que les données de la source de données ne soient lues, l'événement « lisible » sera également déclenché ;
Dans la fonction de gestionnaire de l'événement consommateur « lisible », les données du pool de mémoire tampon sont activement consommées via stream.read(size).
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // La méthode read du paramètre sera utilisée comme méthode _read du stream pour obtenir les données source read( size) { / / Supposons que nos données source contiennent 1000 1s let chunk = null // Le processus de lecture des données est généralement asynchrone, comme l'opération IO setTimeout(() => { if (count > 0) { let chunkLength = Math .min( count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on(' sera déclenché à chaque fois que les données sont poussé avec succès vers le pool de cache) readable', () => { const chunk = myReadable.read()//Consume toutes les données du pool de cache actuel console.log(chunk.toString())})Il convient de noter que si la taille de read(size) est supérieure à la valeur float, la nouvelle valeur float sera recalculée et la nouvelle valeur float est la seconde puissance suivante de size (size <= 2^n, n prend la valeur minimale)
// hwm ne dépassera pas 1 Go.const MAX_HWM = 0x40000000 ; function calculateNewHighWaterMark(n) { if (n >= MAX_HWM) { // Limite de 1 Go n = MAX_HWM } else { // Supprime la puissance la plus élevée suivante de 2 à empêcher une augmentation excessive hwm n--; n |= n >>> 1;= n >>> 2;= n >>> 4; > 16 ; n++; } renvoie n;}Tous les flux lisibles démarrent en mode pause et peuvent être basculés en mode flux via les méthodes suivantes :
Ajoutez un gestionnaire d'événements "data" ; appelez la méthode "resume" ; utilisez la méthode "pipe" pour envoyer des données au flux inscriptible.En mode flux, les données du pool tampon seront automatiquement envoyées au consommateur pour consommation. Dans le même temps, après chaque sortie de données, la méthode _read sera automatiquement rappelée pour placer les données de la source de données dans le pool tampon. . Si le pool de mémoire tampon est S'il n'y a pas de données, les données seront transmises directement à l'événement de données sans passer par le pool de cache ; jusqu'à ce que le mode de flux passe à d'autres modes de pause ou que les données de la source de données soient lues (push). (nul));
Les flux lisibles peuvent être remis en mode pause via :
S'il n'y a pas de cible de pipeline, stream.pause() est appelé. S’il existe des cibles de pipeline, supprime toutes les cibles de pipeline. Plusieurs cibles de canal peuvent être supprimées en appelant stream.unpipe(). const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark : 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})Comparés aux flux lisibles, les flux inscriptibles sont plus simples.
Lorsque le producteur appelle write(chunk), il choisira en interne s'il doit le mettre en cache dans la file d'attente du tampon ou appeler _write en fonction d'un certain statut (bouché, écriture, etc.). Après chaque écriture des données, il essaiera de les effacer. les données dans la file d'attente du cache. Si la taille des données dans la file d'attente du tampon dépasse la valeur flottante (highWaterMark), le consommateur renverra false après avoir appelé write(chunk). À ce moment, le producteur doit arrêter d'écrire.
Alors, quand puis-je continuer à écrire ? Lorsque toutes les données du tampon ont été écrites avec succès, l'événement de drainage sera déclenché une fois la file d'attente du tampon effacée. À ce moment, le producteur peut continuer à écrire des données.
Lorsque le producteur doit terminer l'écriture des données, il doit appeler la méthode stream.end pour notifier la fin du flux inscriptible.
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// sera utilisé comme méthode _write setTimeout(() = >{ fileContent += chunk callback()// Appelé une fois l'écriture terminée}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable . write('123123')// truemyWritable.write('123123')// falsemyWritable.end()Notez qu'une fois que les données du pool de cache ont atteint la valeur flottante, il peut y avoir plusieurs nœuds dans le pool de cache à ce moment-là. Pendant le processus d'effacement du pool de cache (appel cyclique _read), il ne consommera pas la même longueur que le pool de cache. flux lisible. Les données de la valeur flottante sont consommées un nœud tampon à la fois, même si la longueur du tampon est incohérente avec la valeur flottante.
const { Writable } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark : 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log ('Consumption', chunk.toString()) callback()// Appelé une fois l'écriture terminée}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})let count = 0function productionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}productionData()myWritable.on('drain', productionData)Ce qui précède est un flux inscriptible avec une valeur flottante de 10. Désormais, la source de données est une chaîne numérique continue de 0 à 20 et productionData est utilisée pour écrire des données.
Premièrement, lorsque myWritable.write("0") est appelé pour la première fois, car il n'y a aucune donnée dans le pool de cache, "0" n'entre pas dans le pool de cache, mais est directement donné à _wirte. La valeur de retour de myWritable. .write("0") est vrai
Lorsque myWritable.write("1") est exécuté, car le rappel de _wirte n'a pas encore été appelé, cela indique que les dernières données n'ont pas encore été écrites. La position garantit l'ordre de l'écriture des données. Un seul tampon peut être créé. pour stocker "1". " Ajouter au pool de cache. C'est vrai pour les 2-9 prochains
Lorsque myWritable.write("10") est exécuté, la longueur du tampon est de 9 (1-9) et n'a pas encore atteint la valeur flottante "10" continue d'être ajoutée au pool de cache en tant que tampon, et le pool de cache. length devient 11, donc myWritable.write("1") renvoie false, ce qui signifie que les données dans le tampon sont suffisantes et que nous devons attendre la notification de l'événement de drainage pour produire à nouveau des données.
Après 100 ms, le rappel de _write("0", encoding, callback) est appelé, indiquant que "0" a été écrit. Ensuite, il vérifiera s'il y a des données dans le pool de cache. S'il existe, il appellera d'abord _read pour consommer le nœud principal du pool de cache ("1"), puis continuera à répéter ce processus jusqu'à ce que le pool de cache soit vide. , déclenchez l'événement drain et exécutez à nouveau productionData.
Appelez myWritable.write("11") pour déclencher le processus commençant à l'étape 1 jusqu'à la fin du flux.
Après avoir compris le flux lisible et le flux inscriptible, le flux duplex est facile à comprendre. Le flux duplex hérite en fait du flux lisible puis implémente le flux inscriptible (le code source est écrit comme ceci, mais il faut dire qu'il est implémenté). en même temps, il est préférable d'avoir des flux lisibles et inscriptibles).
Le flux duplex doit implémenter les deux méthodes suivantes en même temps
Implémentez la méthode _read() pour produire des données pour des flux lisibles
Implémentez la méthode _write() pour consommer des données pour les flux inscriptibles
La manière d'implémenter les deux méthodes ci-dessus a été introduite dans les flux inscriptibles et lisibles ci-dessus. Ce qu'il convient de noter ici, c'est qu'il existe respectivement deux pools de tampons indépendants pour les flux duplex et que leurs sources de données ne sont pas non plus les mêmes.
Prenons l'exemple du flux d'entrée et de sortie standard de NodeJs :
Lorsque nous saisissons des données dans la console, son événement data est déclenché, ce qui prouve qu'il a la fonction d'un flux lisible. Chaque fois que l'utilisateur saisit, cela équivaut à appeler la méthode push lisible pour pousser les données produites. Lorsque nous appelons sa méthode write, nous pouvons également afficher du contenu sur la console, mais l'événement data ne sera pas déclenché. Cela montre qu'il a la fonction d'un flux inscriptible et qu'il dispose d'un tampon indépendant. permettre à la console d'afficher du texte. // Chaque fois que l'utilisateur saisit des données sur la console (_read), l'événement data sera déclenché, ce qui est une caractéristique du flux lisible process.stdin.on('data', data=>{ process.stdin.write(data ); })// Produise des données dans le flux d'entrée standard toutes les secondes (il s'agit d'une fonctionnalité d'un flux inscriptible, qui sera émis directement sur la console) et ne déclenchera pas datasetInterval(()=>{ process.stdin.write (« n'est pas une donnée saisie par la console utilisateur »)}, 1000)Un flux Duplex peut être considéré comme un flux lisible avec un flux inscriptible. Les deux sont indépendants, chacun avec des tampons internes indépendants. Les événements de lecture et d’écriture se produisent indépendamment.
Flux duplex ------------------| Lire <----- Source externe Vous ------------------| Écrire -----> Récepteur externe -------------------|Les flux de transformation sont duplex, où les lectures et les écritures se produisent dans une relation de cause à effet. Les points de terminaison d'un flux duplex sont liés via une certaine transformation. Une lecture nécessite une écriture pour avoir lieu.
Transform Stream --------------|-------------- Vous écrivez ----> ----> Vous lisez ----- ----------|--------------Pour créer des flux Transform, le plus important est d'implémenter la méthode _transform au lieu de _write ou _read. Dans _transform, les données écrites par le flux inscriptible sont traitées (consommées), puis les données sont produites pour le flux lisible.
Les flux de conversion implémentent souvent une méthode `_flush`, qui sera appelée avant la fin du flux. Elle est généralement utilisée pour ajouter quelque chose à la fin du flux. Par exemple, certaines informations de compression lors de la compression des fichiers sont ajoutées ici const { write. } = require('fs')const { Transformation, PassThrough } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const transform = new Transform({ highWaterMark : 10, rappel){ // Convertir data, Appelez push pour ajouter le résultat de la conversion au pool de cache this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){//Execute this.push (' avant la fin des déclencheurs <<<') callback() }})// write écrit en continu des données let count = 0transform.write('>>>')function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() }}productionData()transform.on('drain', productionData)let result = '' transform.on( 'data', data=>{ result += data.toString()})transform.on('end', ()=>{ console.log(result) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})