Hay 4 tipos de flujos de nodos: 1. Legible (flujo legible). Es necesario implementar el método "_read" para devolver contenido; 2. Escribible (flujo grabable), es necesario implementar el método "_write" para aceptar contenido; 3. Dúplex (flujo legible y grabable), "_read" y "; Es necesario implementar el método _write" para aceptar y devolver contenido; 4. Transformar (flujo de conversión), debe implementar el método "_transform" para convertir el contenido recibido y devolver el contenido.
El entorno operativo de este tutorial: sistema Windows 7, nodejs versión 16, computadora DELL G3.
Stream es un concepto muy básico en Nodejs. Muchos módulos básicos se implementan en función de streams y juegan un papel muy importante. Al mismo tiempo, el flujo también es un concepto muy difícil de entender. Esto se debe principalmente a la falta de documentación relevante. Para los principiantes de NodeJ, a menudo lleva mucho tiempo comprender el flujo antes de que realmente puedan dominar este concepto. para la mayoría de NodeJ, es Para los usuarios, solo se usa para desarrollar aplicaciones web. La comprensión insuficiente de los flujos no afecta su uso. Sin embargo, comprender los flujos puede conducir a una mejor comprensión de otros módulos en NodeJ y, en algunos casos, usar flujos para procesar datos tendrá mejores resultados.
Stream es una interfaz abstracta para procesar datos de transmisión en Node.js. Stream no es una interfaz real, sino un término general para todas las transmisiones. Las interfaces reales incluyen ReadableStream, WritableStream y ReadWriteStream.
interfaz ReadableStream extiende EventEmitter { legible: booleano; leer (tamaño?: número): cadena | setEncoding (codificación: BufferEncoding): esto; T extiende WritableStream>(destino: T, opciones?: {fin?: boolean | indefinido; }): T; unpipe(destino?: WritableStream): this; ; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;}interface WritableStream extiende EventEmitter { writable: write(buffer: Uint8Array | string, cb?: (¿err?: Error | nulo) => nulo): booleano; escribir(cadena: cadena, codificación?: BufferEncoding, cb?: (err?: Error | nulo) => nulo): final (cb?: () => nulo; ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, codificación?: BufferEncoding, cb?: () => void): this;}interfaz ReadWriteStream extiende ReadableStream, WritableStream { }Se puede ver que ReadableStream y WritableStream son interfaces que heredan la clase EventEmitter (las interfaces en ts pueden heredar clases porque solo están fusionando tipos).
Las clases de implementación correspondientes a las interfaces anteriores son legibles, grabables y dúplex respectivamente.
Hay 4 tipos de transmisiones en NodeJs:
Flujo legible (implementa ReadableStream)
Flujo grabable grabable (implementa WritableStream)
Duplex es una secuencia que se puede leer y escribir (implementa WritableStream después de heredar Readable)
Transformar flujo de conversión (heredado de Duplex)
Todos ellos tienen métodos para implementar:
Readable necesita implementar el método _read para devolver contenido
Writable necesita implementar el método _write para aceptar contenido
Duplex necesita implementar los métodos _read y _write para aceptar y devolver contenido
Transform necesita implementar el método _transform para convertir el contenido recibido y devolverlo.
Legible es un tipo de flujo. Tiene dos modos y tres estados.
Dos modos de lectura:
Modo de flujo: los datos se leerán y escribirán desde el sistema subyacente en el búfer. Cuando el búfer esté lleno, los datos se pasarán automáticamente al controlador de eventos registrado lo más rápido posible a través de EventEmitter.
Modo de pausa: en este modo, EventEmitter no se activará activamente para transmitir datos. El método Readable.read() debe llamarse explícitamente para leer datos del búfer y activará una respuesta al evento EventEmitter.
Tres estados:
readableFlowing === nulo (estado inicial)
readableFlowing === false (modo de pausa)
readableFlowing === verdadero (modo fluido)
El readable.readableFlowing de la secuencia es inicialmente nulo.
Se vuelve verdadero después de agregar el evento de datos. Cuando se llama a pausa(), unpipe(), se recibe contrapresión o se agrega un evento legible, readableFlowing se establecerá en falso. En este estado, vincular un oyente al evento de datos no cambiará readableFlowing a verdadero.
Llamar a resume() puede cambiar el flujo legible de la secuencia legible a verdadero.
Eliminar todos los eventos legibles es la única forma de hacer que readableFlowing sea nulo.
Descripción del nombre del evento legible se activa cuando hay nuevos datos legibles en el búfer (se activará cada vez que se inserte un nodo en el grupo de caché) los datos se activarán cada vez que se consuman datos. El parámetro son los datos consumidos esta vez y. la secuencia de error se activa cuando se cierra la secuencia de cierre. Cuando se produce un error, el nombre del método de activación indica que read(size) consume datos con una longitud de tamaño. Se devuelven los datos consumidos esta vez. Cuando no se pasa el tamaño, significa consumir todos los datos en el grupo de caché const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// pool de caché float value})readStreams. on('readable', () => { console.log('buffer full') readStreams.read()// Consume todos los datos en el grupo de buffer, devuelve el resultado y activa el evento de datos}) readStreams.on('datos ', (datos) => { console.log('datos')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
Cuando el tamaño es 0, se activará el evento legible.
Cuando la longitud de los datos en el grupo de caché alcanza el valor flotante highWaterMark, no solicitará activamente datos de producción, sino que esperará a que se consuman antes de producirlos.
Si la secuencia en estado de pausa no llama a leer para consumir datos, los datos y los legibles no se activarán más tarde. Cuando se llama a leer para consumir, primero determinará si la longitud de los datos restantes después de este consumo es menor que el flotante. Si es inferior al valor flotante, se solicitarán los datos de producción antes del consumo. De esta manera, una vez completada la ejecución lógica después de la lectura, lo más probable es que se hayan producido nuevos datos y luego se activen nuevamente los legibles. Este mecanismo de producir los siguientes datos consumidos por adelantado y almacenarlos en el grupo de caché también lo es. la razón por la cual el flujo de caché es rápido.
Hay dos situaciones de flujo en el estado de flujo.
Cuando la velocidad de producción es más lenta que la velocidad de consumo: en este caso, generalmente no quedarán datos en el grupo de caché después de cada dato de producción, y los datos producidos esta vez se pueden pasar directamente al evento de datos (porque no ingrese al grupo de caché, por lo que tampoco es necesario llamar a lectura para consumir), y luego comience inmediatamente a producir nuevos datos. Los nuevos datos no se producirán hasta que se consuman los últimos datos. . Cuando la velocidad de producción es más rápida que la velocidad de consumo: en este momento, después de cada producción de datos, generalmente hay datos no consumidos en el grupo de caché. En este caso, el siguiente consumo de datos generalmente comenzará cuando se consuman los datos y después. los datos antiguos se consumen, los datos nuevos se producen y se colocan en el grupo de cachéLa única diferencia entre ellos es si los datos todavía existen en el grupo de caché después de que se producen. Si los datos existen, los datos producidos se enviarán al grupo de caché para esperar el consumo. entregarse directamente a los datos sin agregarlos al grupo de caché.
Vale la pena señalar que cuando una secuencia con datos en un grupo de caché ingresa al modo de flujo desde el modo de pausa, se llamará a lectura en un bucle para consumir los datos hasta que se devuelva un valor nulo.
En el modo de pausa, cuando se crea una secuencia legible, el modo es el modo de pausa. Después de la creación, se llama automáticamente al método _read para enviar datos desde la fuente de datos al grupo de búfer hasta que los datos en el grupo de búfer alcancen el valor flotante. Siempre que los datos alcancen el valor flotante, la secuencia legible activará un evento "legible" para indicarle al consumidor que los datos están listos y pueden seguir consumiéndose.
En términos generales, el evento "legible" indica nueva actividad en la transmisión: o hay nuevos datos o se ha llegado al final de la transmisión. Por lo tanto, antes de leer los datos de la fuente de datos, también se activará el evento "legible";
En la función de controlador del evento "legible" del consumidor, los datos en el grupo de búfer se consumen activamente a través de stream.read (tamaño).
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // El método de lectura del parámetro se utilizará como el método _read de la secuencia para obtener los datos de origen leídos( size) { // Supongamos que nuestros datos de origen tienen 1000 1s let chunk = null // El proceso de lectura de datos es generalmente asincrónico, como la operación IO setTimeout(() => { if (count > 0) { let chunkLength = Math .min( count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on(' se activará cada vez que se envíen los datos enviado con éxito al grupo de caché) readable', () => { const chunk = myReadable.read()//Consume todos los datos en el grupo de caché actual console.log(chunk.toString())})Vale la pena señalar que si el tamaño de lectura (tamaño) es mayor que el valor flotante, el nuevo valor flotante se volverá a calcular y el nuevo valor flotante es la siguiente segunda potencia del tamaño (tamaño <= 2 ^ n, n toma el valor mínimo)
// hwm no será mayor que 1GB.const MAX_HWM = 0x40000000; function ComputeNewHighWaterMark(n) { if (n >= MAX_HWM) { // Límite de 1 GB n = MAX_HWM } else { // Elimina la siguiente potencia más alta de 2 a; evitar aumento excesivo hwm n--; n |= n >>> 1;= n >>> 2;= n >>> 4; > 16 ; n++; } devolver n;}Todas las transmisiones legibles comienzan en modo de pausa y se pueden cambiar al modo de flujo mediante los siguientes métodos:
Agregue el controlador de eventos de "datos"; llame al método "reanudar"; use el método "tubería" para enviar datos a una secuencia de escrituraEn el modo de flujo, los datos en el grupo de búfer se enviarán automáticamente al consumidor para su consumo. Al mismo tiempo, después de cada salida de datos, se volverá a llamar automáticamente al método _read para colocar los datos de la fuente de datos en el grupo de búfer. Si el grupo de búfer es Si no hay datos, los datos se pasarán directamente al evento de datos sin pasar por el grupo de caché hasta que el modo de flujo cambie a otros modos de pausa o se lean los datos de la fuente de datos (push). (nulo));
Las transmisiones legibles se pueden volver al modo pausado a través de:
Si no hay un objetivo de canalización, se llama a stream.pause(). Si hay objetivos de canalización, elimina todos los objetivos de canalización. Se pueden eliminar varios objetivos de tubería llamando a stream.unpipe(). const { Legible } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, read(size) { let fragment = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(recuento, tamaño) fragmento = '1'.repeat(chunkLength) recuento -= fragmentoLongitud } this.push(fragmento) }, 500) }})myReadable.on('datos', datos => { consola .log(datos.toString())})En comparación con las transmisiones legibles, las transmisiones grabables son más simples.
Cuando el productor llama a write(chunk), elegirá internamente si almacenarlo en caché en la cola del búfer o llamar a _write en función de algún estado (taponado, escribiendo, etc.) Después de cada vez que se escriban los datos, intentará borrarlos. los datos en la cola de caché. Si el tamaño de los datos en la cola del búfer excede el valor flotante (highWaterMark), el consumidor devolverá falso después de llamar a write (chunk). En este momento, el productor debería dejar de escribir.
Entonces, ¿cuándo podré seguir escribiendo? Cuando todos los datos en el búfer se hayan escrito correctamente, el evento de drenaje se activará después de que se borre la cola del búfer. En este momento, el productor puede continuar escribiendo datos.
Cuando el productor necesita terminar de escribir datos, debe llamar al método stream.end para notificar el final de la secuencia de escritura.
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// se usará como método _write setTimeout(() = >{ fileContent += chunk callback()// Se llama después de completar la escritura}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable escribir('123123')// truemyWritable.write('123123')// falsemyWritable.end().Tenga en cuenta que una vez que los datos en el grupo de caché alcanzan el valor flotante, puede haber varios nodos en el grupo de caché en este momento. Durante el proceso de borrar el grupo de caché (llamada cíclica _read), no consumirá la misma longitud que el. flujo legible. Los datos del valor flotante se consumen un nodo de búfer a la vez, incluso si la longitud del búfer no es consistente con el valor flotante.
const { Writable } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(fragmento, codificación, devolución de llamada) { setTimeout(()=>{ fileContent += fragmento console.log ('Consumo', chunk.toString()) callback()// Se llama después de completar la escritura}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})let count = 0function ProductionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}producciónData()myWritable.on('drenaje', producciónData)Lo anterior es una secuencia de escritura con un valor flotante de 10. Ahora la fuente de datos es una cadena numérica continua de 0 a 20, y ProductionData se usa para escribir datos.
Primero, cuando se llama a myWritable.write ("0") por primera vez, debido a que no hay datos en el grupo de caché, "0" no ingresa al grupo de caché, sino que se entrega directamente a _wirte El valor de retorno de myWritable. .write("0") es verdadero
Cuando se ejecuta myWritable.write ("1"), debido a que la devolución de llamada de _wirte aún no se ha llamado, indica que los últimos datos aún no se han escrito. La posición garantiza el orden de la escritura de datos. para almacenar "1". Agregar al grupo de caché. Esto es cierto para los próximos 2-9
Cuando se ejecuta myWritable.write ("10"), la longitud del búfer es 9 (1-9) y aún no ha alcanzado el valor flotante. "10" continúa agregándose al grupo de caché como búfer, y el grupo de caché. la longitud es 11, por lo que myWritable.write ("1") devuelve falso, lo que significa que los datos en el búfer son suficientes y debemos esperar a que la notificación del evento de drenaje produzca datos nuevamente.
Después de 100 ms, se llama a la devolución de llamada de _write ("0", codificación, devolución de llamada), lo que indica que se ha escrito "0". Luego verificará si hay datos en el grupo de caché. Si existen, primero llamará a _read para consumir el nodo principal del grupo de caché ("1") y luego continuará repitiendo este proceso hasta que el grupo de caché esté vacío. , active el evento de drenaje y ejecute ProductionData nuevamente.
Llame a myWritable.write("11") para activar el proceso que comienza en el paso 1 hasta el final de la secuencia.
Después de comprender la secuencia legible y la secuencia grabable, la secuencia dúplex es fácil de entender. La secuencia dúplex en realidad hereda la secuencia legible y luego implementa la secuencia grabable (el código fuente está escrito así, pero debe decirse que está implementado). al mismo tiempo, es mejor tener flujos legibles y grabables).
El flujo dúplex necesita implementar los dos métodos siguientes al mismo tiempo
Implemente el método _read() para producir datos para transmisiones legibles.
Implemente el método _write() para consumir datos para transmisiones grabables
La forma de implementar los dos métodos anteriores se ha introducido en los flujos de escritura y lectura anteriores. Lo que debe tenerse en cuenta aquí es que hay dos grupos de búfer independientes para los flujos dúplex, respectivamente, y sus fuentes de datos tampoco son las mismas.
Tome el flujo de entrada y salida estándar de NodeJs como ejemplo:
Cuando ingresamos datos en la consola, se activa su evento de datos, lo que demuestra que tiene la función de una secuencia legible. Cada vez que el usuario ingresa, es equivalente a llamar al método push legible para enviar los datos producidos. Cuando llamamos a su método de escritura, también podemos enviar contenido a la consola, pero el evento de datos no se activará. Esto muestra que tiene la función de una secuencia de escritura y tiene un búfer independiente. La implementación del método _write es. permitir que la consola muestre texto. // Cada vez que el usuario ingresa datos en la consola (_read), se activará el evento de datos, que es una característica del flujo legible Process.stdin.on('data', data=>{ process.stdin.write(data })// Produce datos en el flujo de entrada estándar cada segundo (esta es una característica de un flujo grabable, que se enviará directamente a la consola) y no activará datasetInterval(()=>{ process.stdin.write ('no son datos ingresados por la consola del usuario')}, 1000)Una secuencia dúplex se puede considerar como una secuencia legible con una secuencia grabable. Ambos son independientes, cada uno con buffers internos independientes. Los eventos de lectura y escritura ocurren de forma independiente.
Transmisión dúplex ------------------| Lectura <----- Fuente externa Usted ------------------| Escribir -----> Disipador externo ------------------|Los flujos de transformación son dúplex, donde las lecturas y escrituras ocurren en una relación de causa y efecto. Los puntos finales de una secuencia dúplex están vinculados mediante alguna transformación. Una lectura requiere que se produzca una escritura.
Transformar secuencia --------------|-------------- Escribes ----> ----> Lees ----- ----------|--------------Para crear transmisiones Transform, lo más importante es implementar el método _transform en lugar de _write o _read. En _transform, los datos escritos por la secuencia grabable se procesan (consumen) y luego se producen los datos para la secuencia legible.
Las secuencias de conversión a menudo implementan un método `_flush`, que se llamará antes del final de la secuencia. Generalmente se usa para agregar algo al final de la secuencia. Por ejemplo, aquí se agrega cierta información de compresión al comprimir archivos const {escribir. } = require('fs')const { Transform, PassThrough } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const transform = new Transform({ highWaterMark: 10, transform(chunk,encoding, ){ // Convertir datos, llame a push para agregar el resultado de la conversión al grupo de caché this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){//Ejecute this.push ('antes del final activa <<<') callback() }})// escribe continuamente escribe datos let count = 0transform.write('>>>')function productionData() { let flag = true while (count <= 20 && bandera) { bandera = transform.write(count.toString()) count++ } if (count > 20) { transform.end() }}productionData()transform.on('drain', ProductionData)let resultado = '' transform.on( 'datos', datos=>{ resultado += data.toString()})transform.on('end', ()=>{ console.log(resultado) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})