1.1. Evolución histórica de los streams
Los streams no son un concepto exclusivo de Nodejs. Se introdujeron hace décadas en el sistema operativo Unix y los programas pueden interactuar entre sí en transmisiones a través del operador de tubería (|).
El operador de tubería (|) se puede utilizar en MacOS y Linux basados en sistemas Unix. Puede convertir la salida del proceso en el lado izquierdo del operador en la entrada en el lado derecho.
En Node, si usamos el readFile tradicional para leer un archivo, el archivo se leerá en la memoria de principio a fin. Cuando se haya leído todo el contenido, el contenido del archivo cargado en la memoria se procesará de manera uniforme.
Hay dos desventajas al hacer esto:
memoria: consume mucha memoria
: debe esperar a que se cargue toda la carga útil de los datos antes de comenzar a procesar los datos.
Para resolver los problemas anteriores, Node. .js siguió e implementó el concepto de secuencias. En Node En la secuencia .js, hay cuatro tipos de secuencias. Todas son instancias de EventEmitter en Node.js:
secuencia legible,
secuencia grabable,
secuencia legible y grabable Full-Duplex (. Duplex Stream)
Transform Stream (Transform Stream)
Para estudiar esta parte en profundidad y comprender gradualmente el concepto de flujos en Node.js, y debido a que la parte del código fuente es relativamente complicada, decidí comenzar a aprender esta parte desde el flujo legible. .
1.2 ¿Qué es una secuencia?
Una secuencia es una estructura de datos abstracta, que es una colección de datos. Los tipos de datos almacenados en ella solo pueden ser los siguientes tipos (solo para el caso de objectMode === false):
Nosotros
.puedo usar la corriente Visto como una colección de estos datos, al igual que los líquidos, primero guardamos estos líquidos en un contenedor (el buffer interno BufferList de la corriente), y cuando se activa el evento correspondiente, vertemos el líquido dentro en la tubería Y avise a otros que coloquen sus propios contenedores al otro lado de la tubería para recoger el líquido del interior y desecharlo.
1.3 ¿Qué es una secuencia legible?
Una secuencia legible es un tipo de secuencia que tiene dos modos, tres estados
y dos modos de lectura:
modo de flujo: los datos se leerán del sistema subyacente y se pasarán a través de EventEmitter lo antes posible. Los datos se pasan al controlador de eventos registrado en
modo de pausa: en este modo, los datos no se leerán y se debe llamar explícitamente al método Stream.read() para leer los datos de la secuencia.
Tres estados:
readableFlowing = =. = null: No se generarán datos al llamar a Stream.pipe() y Stream.resume cambiará su estado a verdadero, comenzará a generar datos y activará activamente el evento
readableFlowing === false: el flujo de datos se suspenderá en este momento. , pero no se suspenderá la generación de datos, por lo que se producirá una acumulación de datos
readableFlowing === true: normalmente genera y consume datos
2.1 Definición del estado interno (ReadableState)
ReadableState
_readableState: ReadableState {. objectMode: false, // Para operar otros tipos de datos excepto cadena, Buffer y null, este modo debe estar activado highWaterMark: 16384, // Límite de nivel de agua, 1024 * 16, predeterminado 16 kb, si se excede este límite , la llamada se detendrá _read() lee datos en el búfer búfer: BufferList { head: null, tail: null, length: 0 }, // Lista vinculada al búfer, utilizada para guardar datos longitud: 0, // El tamaño de todos los datos de flujo legibles, si objectMode es igual a buffer.length pipes: [], // Guarda todas las colas de canalización que monitorean el flujo legible que fluye: null, // El estado del flujo independiente es nulo, falso, verdadero end: false, // Se han consumido todos los datos endEmitted: false, // Si el evento final se ha enviado o no leyendo: false, // Si los datos se están leyendo construido: true, // La secuencia no se puede procesar antes está construido o falla. Destroy sync: true, // Ya sea para activar el evento 'legible'/'datos' sincrónicamente o esperar hasta el siguiente tic. needReadable: false, // Si es necesario enviar el evento legible emittedReadable: false, // El evento legible se ha enviado readableListening: false, // Si hay un evento de escucha legible resumeScheduled: false, // Si el método resume se ha llamado errorEmitted: false, // Error El evento se ha enviado emitClose: true, // Cuando se destruye la secuencia, si se debe enviar el evento de cierre autoDestroy: true, // Se destruye automáticamente, se llama después del 'fin' se activa el evento destroy: false, // Si la secuencia se ha destruido errored: null, // Identifica si la secuencia ha informado un error cerrado: false, // Si la secuencia se ha cerrado closeEmitted: false, // Si se cerró se ha enviado el evento defaultEncoding: 'utf8', // El formato de codificación de caracteres predeterminado awaitDrainWriters: null, // Apunta a la referencia del escritor 'drain' monitoreado del evento, el tipo es nulo, Writable, Set<Writable> multiAwaitDrain: false, // Si hay varios escritores esperando la lectura del evento de drenajeMás: false, // Si se pueden leer más datos dataEmitted: false, // Los datos se han enviado decodificador: nulo, // Codificación del decodificador: nulo, // Codificador[Símbolo(kPausado)]: nulo },
2.2. Implementación de almacenamiento de datos internos (BufferList)
BufferList es un contenedor utilizado para almacenar datos internos en una secuencia. Está diseñado en forma de lista vinculada y tiene tres atributos: encabezado, cola y longitud.
Represento cada nodo en BufferList como un BufferNode, y el tipo de datos internos depende del modo de objeto.
Esta estructura de datos obtiene datos de encabezado más rápido que Array.prototype.shift().
2.2.1. Tipo de almacenamiento de datossi modoobjeto === verdadero:
Entonces los datos pueden ser de cualquier tipo. Se almacenarán los datos que se envíen.
modoobjeto=verdadero
const Corriente = requerir('corriente'); const readableStream = nuevo Stream.Readable({ modo de objeto: verdadero, leer() {}, }); readableStream.push({ nombre: 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(verdadero); console.log(readableStream._readableState.buffer.tail); readableStream.push('lisa'); console.log(readableStream._readableState.buffer.tail); legibleStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Símbolo(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
Resultados de ejecución:
si modoobjeto === falso:
Entonces los datos solo pueden ser cadenas, Buffer o Uint8Array.
modoobjeto=falso
const Corriente = requerir('corriente'); const readableStream = nuevo Stream.Readable({ modo de objeto: falso, leer() {}, }); readableStream.push({ nombre: 'lisa'});
Resultados de ejecución:
2.2.2. Estructura de almacenamiento de datosCreamos una secuencia legible en la consola a través de la línea de comando del nodo para observar los cambios en los datos en el búfer:
Por supuesto, antes de enviar datos, debemos implementar su método _read, o implementar el método read en los parámetros del constructor:
const Corriente = requerir('corriente'); const readableStream = nuevo Stream.Readable(); RS._read = función(tamaño) {}
o
const Corriente = requerir('corriente'); const readableStream = nuevo Stream.Readable({ leer (tamaño) {} });
Después de la operación readableStream.push('abc'), el búfer actual es:
Puede ver que los datos actuales están almacenados. Los datos almacenados al principio y al final son los códigos ASCII de la cadena 'abc' y el tipo es el tipo de búfer. La longitud representa la cantidad de datos guardados actualmente en lugar del tamaño de. el contenido de los datos.
2.2.3. API relacionadasAl imprimir todos los métodos de BufferList, puede obtener:
A excepción de la unión, que serializa BufferList en una cadena, las demás son todas operaciones de acceso a datos.
No explicaré todos los métodos uno por uno aquí, sino que me centraré en consumir, _getString y _getBuffer.
2.2.3.1.consumir
Dirección del código fuente: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
consumir
// Consume una cantidad específica de bytes o caracteres de los datos almacenados en el búfer. consumir(n, hasStrings) { datos constantes = this.head.data; si (n <datos.longitud) { // `slice` es lo mismo para buffers y cadenas. segmento constante = datos.slice(0, n); this.head.data = datos.slice(n); rebanada de retorno; } if (n === datos.longitud) { // El primer fragmento es una combinación perfecta. devolver this.shift(); } // El resultado abarca más de un búfer. devolver hasStrings? esto._getString(n): esto._getBuffer(n); }
Hay tres condiciones de juicio en el código:
Si la longitud en bytes de los datos consumidos es menor que la longitud de los datos almacenados en el nodo principal de la lista vinculada, se toman los primeros n bytes de los datos del nodo principal y se configuran los datos del nodo principal actual. a los datos después del corte.
Si los datos consumidos son exactamente iguales a la longitud de los datos almacenados en el nodo principal de la lista vinculada, los datos del nodo principal actual se devuelven directamente.
Si la longitud de los datos consumidos es mayor que la longitud del nodo principal de la lista vinculada, el último juicio se realizará en función del segundo parámetro pasado para determinar si la capa inferior de la BufferList actual almacena una cadena o un Buffer. .
2.2.3.2.
Dirección del código fuente: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
consumir
// Consume una cantidad específica de bytes de los datos almacenados en el búfer. _getBuffer(n) { const ret = Buffer.allocUnsafe(n); constante retLen = n; sea p = this.head; sea c = 0; hacer { const buf = p.datos; si (n > longitud buf) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.longitud; } demás { if (n === buf.longitud) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++c; si (p.siguiente) this.head = p.siguiente; demás this.head = this.tail = null; } demás { TypedArrayPrototypeSet(ret, nuevo Uint8Array(buf.buffer, buf.byteOffset, n), retLen-n); this.head = p; p.datos = buf.slice(n); } romper; } ++c; } mientras ((p = p.siguiente) !== nulo); esta.longitud -= c; volver atrás; }
En general, es un bucle para operar los nodos en la lista vinculada y crear una nueva matriz de búfer para almacenar los datos devueltos.
Primero, comience a recuperar datos del nodo principal de la lista vinculada y continúe copiándolos en el búfer recién creado hasta que los datos de un determinado nodo sean mayores o iguales a la longitud que se va a recuperar menos la longitud que se ha obtenido.
En otras palabras, después de leer el último nodo de la lista vinculada, no ha alcanzado la longitud deseada, por lo que se devuelve el búfer recién creado.
2.2.3.3._obtenercadena
Dirección del código fuente: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
consumir
// Consume una cantidad específica de caracteres de los datos almacenados en el buffer. _getString(n) { dejar ret = ''; sea p = this.head; sea c = 0; hacer { const cadena = p.datos; si (n > longitud de cadena) { ret += cadena; n -= longitud de cadena; } demás { si (n === longitud de cadena) { ret += cadena; ++c; si (p.siguiente) this.head = p.siguiente; demás this.head = this.tail = null; } demás { ret += StringPrototypeSlice(cadena, 0, n); this.head = p; p.data = StringPrototypeSlice(cadena, n); } romper; } ++c; } mientras ((p = p.siguiente) !== nulo); esta.longitud -= c; volver atrás; }
La operación de cadenas es la misma que la operación de Buffers. También lee datos del encabezado de la lista vinculada en un bucle. Solo hay algunas diferencias en la copia y el almacenamiento de datos. La operación _getString es de tipo cadena.
2.3. ¿Por qué son instancias de flujos legibles de EventEmitter?
Para esta pregunta, primero debemos comprender qué es el modelo de publicación-suscripción. El modelo de publicación-suscripción tiene aplicaciones importantes en la mayoría de las API, ya sea Promise o Redux, las API avanzadas basadas en el modelo de publicación-suscripción se pueden ver en todas partes.
Su ventaja es que puede almacenar funciones de devolución de llamada relacionadas con eventos en la cola y luego notificar a la otra parte que procese los datos en un momento determinado en el futuro, logrando así la separación de preocupaciones. El productor solo produce datos y notifica al consumidor. Mientras que el consumidor solo procesa los eventos correspondientes y sus datos correspondientes, y el modelo de transmisión de Node.js simplemente se ajusta a esta característica.
Entonces, ¿cómo implementa la secuencia Node.js la creación de instancias basadas en EventEmitter?
El código fuente para esto está aquí: stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
legado
función Stream(opciones) { EE.call(esto, opta); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE);
Luego están estas líneas de código en el código fuente de la secuencia legible:
Esta parte del código fuente está aquí: legible https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
legado
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Legible, Stream);
Primero, herede el objeto prototipo de Stream de EventEmitter, para que todas las instancias de Stream puedan acceder a los métodos en EventEmitter.
Al mismo tiempo, los métodos estáticos en EventEmitter también se heredan a través de ObjectSetPrototypeOf (Stream, EE), y en el constructor de Stream, el constructor EE se toma prestado para implementar la herencia de todas las propiedades en EventEmitter y luego en la secuencia legible. utilizar el mismo El método implementa la herencia prototípica y la herencia de propiedades estáticas de la clase Stream, obteniendo así:
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototipo
por lo tanto:
Readable.prototype.__proto__.__proto__ === EE.prototipo
Por lo tanto, puede encontrar el prototipo de EventEmitter rastreando la cadena de prototipos del flujo legible y realizar la herencia de EventEmitter.
2.4. Implementación de API relacionadas
Las API se mostrarán aquí en el orden en que aparecen en los documentos del código fuente y solo se explicarán las implementaciones principales de las API.
Nota: Aquí solo se interpretan las funciones declaradas en el código fuente de flujo legible de Node.js y no se incluyen las definiciones de funciones introducidas externamente. Para reducir la longitud, no se copiarán todos los códigos.
Prototipo legible
Arroyo { destruir: [Función: destruir], _unstroy: [Función: deshacer], _destroy: [Función (anónima)], push: [Función (anónima)], unshift: [Función (anónima)], isPaused: [Función (anónima)], setEncoding: [Función (anónima)], leer: [Función (anónimo)], _read: [Función (anónima)], tubería: [Función (anónima)], unpipe: [Función (anónimo)], en: [Función (anónima)], addListener: [Función (anónima)], removeListener: [Función (anónima)], apagado: [Función (anónima)], removeAllListeners: [Función (anónima)], currículum: [Función (anónima)], pausa: [Función (anónima)], envoltura: [Función (anónima)], iterador: [Función (anónima)], [Símbolo (nodejs.rejection)]: [Función (anónima)], [Símbolo (Symbol.asyncIterator)]: [Función (anónima)] }2.4.1.
legible.push
Readable.prototype.push = función (fragmento, codificación) { return readableAddChunk(this, fragment, encoding, false); };
La función principal del método push es pasar el bloque de datos a la tubería descendente activando el evento 'datos' o almacenar los datos en su propio búfer.
El siguiente código es un pseudocódigo relevante y solo muestra el proceso principal:
legible.push
función readableAddChunk(flujo, fragmento, codificación, addToFront) { estado constante = flujo._readableState; if (chunk === null) { // empuja la señal de fin de flujo nulo, no se pueden escribir más datos después de ese state.reading = false; onEofChunk(corriente, estado); } else if (!state.objectMode) { // Si no es modo de objeto if (typeof chunk === 'string') { trozo = Buffer.from(trozo); } else if (instancia de fragmento de Buffer) { //Si es Buffer // Procesa la codificación} else if (Stream._isUint8Array(chunk)) { trozo = Stream._uint8ArrayToBuffer(trozo); } más si (fragmento! = nulo) { err = new ERR_INVALID_ARG_TYPE('fragmento', ['cadena', 'Buffer', 'Uint8Array'], fragmento); } } if (state.objectMode || (chunk && chunk.length > 0)) { // Es modo objeto o fragmento es Buffer // Aquí se omite el juicio de varios métodos de inserción de datos addChunk(stream, state, chunk, true); } } función addChunk(flujo, estado, fragmento, addToFront) { if (estado.fluyendo && estado.longitud === 0 && !estado.sync && stream.listenerCount('data') > 0) { // Si está en modo streaming, hay suscriptores escuchando datos stream.emit('data', chunk); } else { // De lo contrario, guarda los datos en el búfer state.length += state.objectMode 1: chunk.length; si (agregar al frente) { state.buffer.unshift(fragmento); } demás { estado.buffer.push(fragmento); } } mayReadMore(stream, state); // Intenta leer un poco más de datos}
La operación de inserción se divide principalmente en juzgar el modo de objeto. Diferentes tipos realizarán diferentes operaciones en los datos entrantes:
El primer juicio de addChunk es principalmente lidiar con la situación en la que Readable está en modo fluido, tiene un detector de datos y los datos del búfer están vacíos.
En este momento, los datos se pasan principalmente a otros programas que se suscriben al evento de datos; de lo contrario, los datos se guardan en el búfer.
2.4.2.Excepto por el juicio de las condiciones límite y el estado del flujo, este método tiene principalmente dos operaciones.
Llame al método _read implementado por el usuario para procesar los resultados de la ejecución.
Leer datos del búfer y activar el evento 'datos'
legible.leer
// Si la longitud de la lectura es mayor que hwm, se recalculará hwm si (n > estado.highWaterMark) { state.highWaterMark = calcularNewHighWaterMark(n); } // Llama al método _read implementado por el usuario try { resultado constante = this._read(state.highWaterMark); si (resultado! = nulo) { const entonces = resultado.entonces; si (tipo de entonces === 'función') { entonces.llamar( resultado, nop, función (errar) { errorOrDestroy(esto, errar); }); } } } atrapar (errar) { errorOrDestroy(esto, errar); }
Si el método _read implementado por el usuario devuelve una promesa, llame al método then de esta promesa y pase las devoluciones de llamada de éxito y fracaso para facilitar el manejo de excepciones.
El código central del método de lectura para leer datos de zona del búfer es el siguiente:
legible.leer
función de Lista (n, estado) { // nada almacenado en buffer. si (estado.longitud === 0) devolver nulo; dejar retirarse; si (estado.objectMode) ret = estado.buffer.shift(); else if (!n || n >= state.length) { // Maneja el caso en el que n está vacío o es mayor que la longitud del buffer // Lee todo, trunca la lista. if (state.decoder) // Si hay un decodificador, serializa el resultado en una cadena ret = state.buffer.join(''); else if (state.buffer.length === 1) // Solo hay un dato, devuelve los datos del nodo principal ret = state.buffer.first(); else // Almacena todos los datos en un Buffer ret = state.buffer.concat(state.length); state.buffer.clear(); // Limpiar el búfer} else { // Manejar la situación en la que la longitud de lectura es menor que el búfer ret = state.buffer.consume(n, state.decoder); } volver atrás; }2.4.3. _leer
Un método que debe implementarse cuando los usuarios inicializan una secuencia legible. Puede llamar al método push en este método para activar continuamente el método de lectura. Cuando presionamos nulo, podemos detener la operación de escritura de la secuencia.
Código de muestra:
legible._read
const Corriente = requerir('corriente'); const readableStream = nuevo Stream.Readable({ leer(hwm) { this.push(String.fromCharCode(this.currentCharCode++)); si (this.currentCharCode > 122) { this.push(nulo); } }, }); readableStream.currentCharCode = 97; readableStream.pipe(proceso.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4. tubería (importante)
Vincule una o más secuencias grabables a la secuencia legible actual y cambie la secuencia legible al modo fluido.
Hay muchos identificadores de escucha de eventos en este método, y no los presentaré uno por uno aquí:
tubería.legible
Readable.prototype.pipe = función(dest, pipeOpts) { fuente constante = esto; estado constante = this._readableState; state.pipes.push(dest); // Recopilar flujo de escritura src.on('data', ondata); función ondata(fragmento) { const ret = destino.write(fragmento); si (ret === falso) { pausa(); } } // Dígale al destino al que se está canalizando. dest.emit('tubería', src); // Iniciar la transmisión si la transmisión está en modo de pausa if (dest.writableNeedDrain === true) { si (estado.fluyendo) { pausa(); } } más si (!estado.fluyendo) { src.resume(); } destino de regreso; }
La operación de canalización es muy similar al operador de canalización de Linux '|', cambiando la salida izquierda a la entrada derecha. Este método recopila la secuencia grabable para mantenimiento y activa el evento 'datos' cuando la secuencia legible está disponible.
Cuando los datos fluyen, se activará el evento de escritura del flujo de escritura, de modo que los datos se puedan transferir y se puedan realizar operaciones como una canalización. Y cambiará automáticamente la transmisión legible en modo de pausa al modo de flujo.
2.4.5.Cambie la transmisión del modo 'pausa' al modo 'flujo'. Si el detector de eventos 'legible' está configurado, este método no tiene ningún efecto.
legible.resume
Readable.prototype.resume = función() { estado constante = this._readableState; si (!estado.fluyendo) { state.flowing = !state.readableListening; // Si está en modo fluido depende de si el identificador de escucha 'legible' está configurado resume(this, state); } }; función reanudar (flujo, estado) { if (!state.resumeScheduled) { // Cambia para que el método resume_ solo se llame una vez en el mismo Tick state.resumeScheduled = true; proceso.nextTick(resume_, flujo, estado); } } función resume_(flujo, estado) { si (!estado.lectura) { flujo.read(0); } state.resumeScheduled = falso; stream.emit('reanudar'); flujo(corriente); } function flow(stream) { // Cuando la transmisión está en modo de transmisión, este método continuará leyendo datos del búfer hasta que el búfer esté vacío const state = stream._readableState; mientras (estado.fluyendo && flujo.read() !== nulo); // Debido a que aquí se llamará al método de lectura y se establece la secuencia del detector de eventos 'legible', también se puede llamar al método de lectura. //Esto da como resultado datos incoherentes (no afecta los datos, solo afecta llamar al método de lectura en la devolución de llamada del evento 'legible' para leer datos) }2.4.6.
Cambie la transmisión del modo fluido al modo pausado, deje de activar el evento de "datos" y guarde todos los datos en el búfer.
legible.pausa
Readable.prototype.pause = función() { si (this._readableState.flowing! == falso) { depurar('pausa'); this._readableState.flowing = false; this.emit('pausa'); } devolver esto; };
2.5. Uso y mecanismo de funcionamiento.
El método de uso se mencionó en la sección BufferList. Cree una instancia legible e implemente su método _read(), o implemente el método de lectura en el primer parámetro de objeto del constructor.
2.5.1. Mecanismo de trabajoAquí solo dibujamos el proceso general y las condiciones de activación de la conversión de modo de la secuencia Readable.
en: