1.1 Evolução histórica dos streams
Streams não são um conceito exclusivo do Nodejs. Eles foram introduzidos há décadas no sistema operacional Unix e os programas podem interagir entre si em fluxos por meio do operador pipe (|).
O operador pipe (|) pode ser usado em MacOS e Linux baseados em sistemas Unix. Ele pode converter a saída do processo no lado esquerdo do operador na entrada no lado direito.
No Node, se usarmos o readFile tradicional para ler um arquivo, o arquivo será lido na memória do início ao fim. Quando todo o conteúdo for lido, o conteúdo do arquivo carregado na memória será processado uniformemente.
Há duas desvantagens em fazer isso:
memória
: ocupa muito
tempo: você precisa esperar que toda a carga útil dos dados seja carregada antes de começar a processar os dados.
.js seguiu e implementou o conceito de streams no Node. No stream .js, existem quatro tipos de streams. Eles são todos instâncias de EventEmitter em Node.js:
Readable Stream,
Writable Stream,
Readable e Writable Full-Duplex Stream (. Duplex Stream)
Transform Stream (Transform Stream)
Para estudar esta parte em profundidade e compreender gradualmente o conceito de streams em Node.js, e porque a parte do código fonte é relativamente complicada, decidi começar a aprender esta parte a partir do stream legível .
1.2. O que é um fluxo?
Um fluxo é uma estrutura de dados abstrata, que é uma coleção de dados. Os tipos de dados armazenados nele só podem ser os seguintes tipos (apenas para o caso de objectMode === false):
We. podemos usar o fluxo Visto como uma coleção desses dados, assim como os líquidos, primeiro salvamos esses líquidos em um contêiner (o buffer interno BufferList do fluxo) e quando o evento correspondente é acionado, despejamos o líquido dentro do tubo . E notifique outras pessoas para colocarem seus próprios recipientes no outro lado do tubo para coletar o líquido de dentro para descarte.
1.3. O que é um fluxo legível?
Um fluxo legível é um tipo de fluxo que possui dois modos, três estados
e dois modos de leitura:
modo de fluxo: os dados serão lidos do sistema subjacente e passados pelo EventEmitter o mais rápido possível. Os dados são passados para o manipulador de eventos registrado no
modo de pausa: Neste modo, os dados não serão lidos e o método Stream.read() deve ser chamado explicitamente para ler os dados do fluxo.
Três estados:
readableFlowing = =. = null: Nenhum dado será gerado. Chamar Stream.pipe() e Stream.resume mudará seu status para verdadeiro, começará a gerar dados e acionará ativamente o evento
readableFlowing === false: O fluxo de dados será suspenso neste momento.
não
Ageração
de dados será suspensa, portanto ocorrerá um backlog de dados
readableFlowing === true: Gera e consome dados normalmente
2.1.
objectMode: false, // Para operar outros tipos de dados exceto string, Buffer e null, este modo precisa ser ativado highWaterMark: 16384, // Limite de nível de água, 1024 * 16, padrão 16kb, se este limite for excedido , a chamada irá parar _read() lê dados no buffer buffer: BufferList { head: null, tail: null, length: 0 }, // Lista vinculada de buffer, usada para salvar dados length: 0, // O tamanho de todos os dados do fluxo legível, se objectMode for igual a buffer.length pipes: [], // Salva todas as filas de tubos que monitoram o fluxo do fluxo legível: null, // O status do fluxo independente é nulo, falso, verdadeiro terminou: falso, // Todos os dados foram consumidos endEMITIDO: falso, // Se o evento final foi enviado ou não foi lido: falso, // Se os dados estão sendo lidos construído: verdadeiro, // O fluxo não pode ser processado antes ele é construído ou falha. Destroy sync: true, // Se deve acionar o evento 'legível'/'dados' de forma síncrona ou esperar até o próximo tick needReadable: false, // Se é necessário enviar o evento legível eidedReadable: false, // O evento legível foi enviado readableListening: false, // Se existe um evento de escuta legível resumeScheduled: false, // Se o método resume foi chamado errorEMITIDO: false, // Erro O evento foi enviado emitClose: true, // Quando o fluxo é destruído, se deve enviar o evento de fechamento autoDestroy: true, // Destruído automaticamente, é chamado após o 'fim' evento é acionado destroy: false, // Se o fluxo foi destruído errored: null, // Identifica se o fluxo reportou um erro closed: false, // Se o fluxo foi fechado closeEMITIDO: false, // Se o fechamento o evento foi enviado defaultEncoding: 'utf8', // O formato de codificação de caracteres padrão awaitDrainWriters: null, // Aponta para a referência do gravador 'drain' monitorada do evento, o tipo é nulo, Writable, Set<Writable> multiAwaitDrain: false, // Se há vários gravadores aguardando o evento de drenagem readingMore: false, // Se mais dados podem ser lidos dataEMITIDO: false, // Os dados foram enviados decoder: null, // Codificação do decodificador: null, // Encoder[Symbol(kPaused)]: null },
2.2. Implementação de armazenamento interno de dados (BufferList)
BufferList é um contêiner usado para armazenar dados internos em um fluxo. Ele é projetado na forma de uma lista vinculada e possui três atributos: head, tail e length.
Eu represento cada nó no BufferList como um BufferNode, e o tipo de dados dentro dele depende do objectMode.
Esta estrutura de dados obtém dados de cabeçalho mais rapidamente do que Array.prototype.shift().
2.2.1 Tipo de armazenamento de dadosse objectMode === verdadeiro:
Então os dados podem ser de qualquer tipo. Quaisquer dados enviados serão armazenados.
objectMode = verdadeiro
const Fluxo = require('fluxo'); const readableStream = novo Stream.Readable({ objectMode: verdadeiro, ler() {}, }); readableStream.push({ nome: 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(true); console.log(readableStream._readableState.buffer.tail); readableStream.push('lisa'); console.log(readableStream._readableState.buffer.tail); legívelStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Symbol(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
Resultados em execução:
se objectMode === falso:
Então os dados só podem ser string ou Buffer ou Uint8Array
objectMode=falso
const Fluxo = require('fluxo'); const readableStream = novo Stream.Readable({ objectMode: falso, ler() {}, }); readableStream.push({ nome: 'lisa'});
Resultados em execução:
2.2.2. Estrutura de armazenamento de dadosCriamos um fluxo legível no console por meio da linha de comando do nó para observar alterações nos dados no buffer:
Claro, antes de enviar dados, precisamos implementar seu método _read ou implementar o método read nos parâmetros do construtor:
const Fluxo = require('fluxo'); const readableStream = new Stream.Readable(); RS._read = função(tamanho) {}
ou
const Fluxo = require('fluxo'); const readableStream = novo Stream.Readable({ ler(tamanho) {} });
Após a operação readableStream.push('abc'), o buffer atual é:
Você pode ver que os dados atuais são armazenados. Os dados armazenados no início e no final são os códigos ASCII da string 'abc' e o tipo é tipo Buffer. O comprimento representa o número de dados salvos atualmente, em vez do tamanho dos dados. o conteúdo dos dados.
2.2.3 APIs relacionadas.Imprimindo todos os métodos do BufferList você pode obter:
Exceto join, que serializa o BufferList em uma string, as outras são todas operações de acesso a dados.
Não vou explicar todos os métodos um por um aqui, mas focarei em consumir, _getString e _getBuffer.
2.2.3.1.consumir
Endereço do código-fonte: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
consumir
// Consome uma quantidade especificada de bytes ou caracteres dos dados armazenados em buffer. consumir(n, hasStrings) { dados const = this.head.data; if (n <data.length) { // `slice` é o mesmo para buffers e strings. fatia const = data.slice(0, n); this.head.data = dados.slice (n); fatia de retorno; } if (n === dados.comprimento) { // O primeiro pedaço é uma combinação perfeita. retorne isto.shift(); } // O resultado abrange mais de um buffer. retornar hasStrings? isto._getString(n): isto._getBuffer(n); }
Existem três condições de julgamento no código:
Se o comprimento de bytes dos dados consumidos for menor que o comprimento dos dados armazenados no nó principal da lista vinculada, os primeiros n bytes dos dados do nó principal serão obtidos e os dados do nó principal atual serão definidos aos dados após o fatiamento.
Se os dados consumidos forem exatamente iguais ao comprimento dos dados armazenados no nó principal da lista vinculada, os dados do nó principal atual serão retornados diretamente.
Se o comprimento dos dados consumidos for maior que o comprimento do nó principal da lista vinculada, o último julgamento será feito com base no segundo parâmetro passado para determinar se a camada inferior do BufferList atual armazena uma string ou um Buffer .
2.2.3.2.
Endereço do código-fonte: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
consumir
// Consome uma quantidade especificada de bytes dos dados armazenados em buffer. _getBuffer(n){ const ret = Buffer.allocUnsafe(n); const retLen = n; deixe p = this.head; seja c = 0; fazer { const buf = p.dados; if (n > buf.comprimento) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.comprimento; } outro { if (n === buf.comprimento) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++c; se (p.próximo) este.head = p.próximo; outro this.head = this.tail = null; } outro { TypedArrayPrototypeSet(ret, novo Uint8Array (buf.buffer, buf.byteOffset, n), retLen - n); isto.head = p; p.dados = buf.slice(n); } quebrar; } ++c; } while ((p = p.next) !== nulo); este.comprimento -= c; retornar ret; }
Em geral, é um loop para operar os nós da lista vinculada e criar um novo array Buffer para armazenar os dados retornados.
Primeiro, comece a buscar dados do nó principal da lista vinculada e continue a copiá-los para o buffer recém-criado até que os dados de um determinado nó sejam maiores ou iguais ao comprimento a ser buscado menos o comprimento que foi obtido.
Ou seja, após a leitura do último nó da lista vinculada, ele não atingiu o comprimento desejado, então o Buffer recém-criado é retornado.
2.2.3.3.
Endereço do código-fonte: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
consumir
// Consome uma quantidade especificada de caracteres dos dados armazenados em buffer. _getString(n){ deixe ret = ''; deixe p = this.head; seja c = 0; fazer { const str = p.dados; if (n > str.comprimento) { ret +=str; n -= str.comprimento; } outro { if (n === str.comprimento) { ret +=str; ++c; se (p.próximo) este.head = p.próximo; outro this.head = this.tail = null; } outro { ret += StringPrototypeSlice(str, 0, n); isto.head = p; p.data = StringPrototypeSlice(str, n); } quebrar; } ++c; } while ((p = p.next) !== nulo); este.comprimento -= c; retornar ret; }
A operação de strings é igual à operação de Buffers. Ele também lê dados do topo da lista vinculada em um loop. Além disso, o tipo de dados retornado. A operação _getString é do tipo string.
2.3. Por que os streams legíveis são instâncias do EventEmitter?
Para esta questão, devemos primeiro entender o que é o modelo de publicação-assinatura. O modelo de publicação-assinatura tem aplicações importantes na maioria das APIs, seja Promise ou Redux, APIs avançadas baseadas no modelo de publicação-assinatura podem ser vistas em todos os lugares.
Sua vantagem é que ele pode armazenar as funções de retorno de chamada relacionadas ao evento na fila e, em seguida, notificar a outra parte para processar os dados em um determinado momento no futuro, conseguindo assim a separação de interesses. O produtor apenas produz dados e notifica o consumidor. , enquanto o consumidor então processa apenas os eventos correspondentes e seus dados correspondentes, e o modelo de streaming do Node.js apenas se ajusta a essa característica.
Então, como o stream Node.js implementa a criação de instâncias baseadas em EventEmitter?
O código fonte para isso está aqui: stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
legado
function Stream(opções) { EE.call(este, opta); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE);
Depois, há estas linhas de código no código-fonte do fluxo legível:
Esta parte do código fonte está aqui: legível https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
legado
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Legível, Fluxo);
Primeiro, herde o objeto protótipo de Stream de EventEmitter, para que todas as instâncias de Stream possam acessar os métodos em EventEmitter.
Ao mesmo tempo, os métodos estáticos em EventEmitter também são herdados por meio de ObjectSetPrototypeOf (Stream, EE), e no construtor de Stream, o construtor EE é emprestado para realizar a herança de todas as propriedades em EventEmitter e, em seguida, no fluxo legível, use o mesmo O método implementa herança prototípica e herança de propriedade estática da classe Stream, obtendo assim:
Legível.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype
portanto:
Legível.prototype.__proto__.__proto__ === EE.prototype
Portanto, você pode encontrar o protótipo do EventEmitter rastreando a cadeia de protótipos do fluxo legível e realizar a herança do EventEmitter.
2.4. Implementação de APIs relacionadas
As APIs serão exibidas aqui na ordem em que aparecem nos documentos de código-fonte e apenas as principais implementações da API serão explicadas.
Observação: apenas as funções declaradas no código-fonte do fluxo legível do Node.js são interpretadas aqui e as definições de funções introduzidas externamente não são incluídas. Para reduzir o comprimento, todos os códigos não serão copiados.
Protótipo legível
Fluxo { destruir: [Função: destruir], _undestroy: [Função: unestroy], _destroy: [Função (anônima)], push: [Função (anônima)], unshift: [Função (anônima)], isPaused: [Função (anônima)], setEncoding: [Função (anônima)], leia: [Função (anônima)], _ler: [Função (anônima)], pipe: [Função (anônima)], unpipe: [Função (anônima)], em: [Função (anônima)], addListener: [Função (anônima)], removeListener: [Função (anônima)], desligado: [Função (anônima)], removeAllListeners: [Função (anônima)], currículo: [Função (anônima)], pausa: [Função (anônima)], wrap: [Função (anônima)], iterador: [Função (anônima)], [Símbolo(nodejs.rejection)]: [Função (anônima)], [Symbol(Symbol.asyncIterator)]: [Função (anônimo)] }2.4.1.
legível.push
Readable.prototype.push = function(pedaço, codificação) { return readableAddChunk(este, pedaço, codificação, falso); };
A principal função do método push é passar o bloco de dados para o pipeline downstream, acionando o evento 'data', ou armazenar os dados em seu próprio buffer.
O código a seguir é um pseudocódigo relevante e mostra apenas o processo principal:
legível.push
function readableAddChunk(stream, chunk, encoding, addToFront) { estado const = stream._readableState; if (chunk === null) { // envia o sinal de fim do fluxo nulo, nenhum outro dado pode ser gravado depois disso state.reading = false; onEofChunk(stream, estado); } else if (!state.objectMode) { // Se não for modo de objeto if (typeof chunk === 'string') { pedaço = Buffer.from(pedaço); } else if (chunk instanceof Buffer) { //Se for Buffer // Processa a codificação} else if (Stream._isUint8Array(chunk)) { pedaço = Stream._uint8ArrayToBuffer(pedaço); } else if (pedaço! = nulo) { err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], pedaço); } } if (state.objectMode || (chunk && chunk.length > 0)) { //É modo de objeto ou chunk é Buffer // O julgamento de vários métodos de inserção de dados é omitido aqui addChunk(stream, state, chunk, true); } } function addChunk(fluxo, estado, pedaço, addToFront) { if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { // Se estiver no modo streaming, há assinantes ouvindo os dados stream.emit('data', chunk); } else { // Caso contrário, salve os dados no buffer state.length += state.objectMode 1 : chunk.length; if (addToFront) { estado.buffer.unshift(pedaço); } outro { estado.buffer.push(pedaço); } } MaybeReadMore(stream, state); // Tente ler um pouco mais de dados}
A operação push é dividida principalmente em julgar o objectMode. Diferentes tipos realizarão diferentes operações nos dados recebidos:
O primeiro julgamento do addChunk é principalmente para lidar com a situação quando Readable está no modo de fluxo, possui um ouvinte de dados e os dados do buffer estão vazios.
Neste momento, os dados são transmitidos principalmente para outros programas que assinam o evento de dados, caso contrário, os dados são salvos no buffer.
2.4.2.Exceto pelo julgamento das condições de contorno e do estado do fluxo, este método possui principalmente duas operações.
Chame o método _read implementado pelo usuário para processar os resultados da execução
Leia os dados do buffer e acione o evento 'data'
legível.ler
// Se o comprimento da leitura for maior que hwm, o hwm será recalculado if (n > estado.highWaterMark) { state.highWaterMark = computaNewHighWaterMark(n); } // Chama o método _read implementado pelo usuário try { resultado const = this._read(state.highWaterMark); if (resultado! = nulo) { const então = resultado.então; if (typeof then === 'função') { então.call( resultado, não, função(erro) { errorOrDestroy(este, err); }); } } } pegar (errar) { errorOrDestroy(este, err); }
Se o método _read implementado pelo usuário retornar uma promessa, chame o método then dessa promessa e passe os retornos de chamada de sucesso e falha para facilitar o tratamento de exceções.
O código principal do método read para ler os dados da zona do buffer é o seguinte:
legível.ler
function fromList(n, estado) { // nada armazenado em buffer. if (estado.comprimento === 0) retornar nulo; deixe ret; if (estado.objectMode) ret = estado.buffer.shift(); else if (!n || n >= state.length) { // Trata o caso em que n está vazio ou é maior que o comprimento do buffer // Leia tudo, trunque a lista. if (state.decoder) // Se houver um decodificador, serialize o resultado em uma string ret = state.buffer.join(''); else if (state.buffer.length === 1) // Existe apenas um dado, retorne os dados do nó principal ret = state.buffer.first(); else // Armazena todos os dados em um Buffer ret = state.buffer.concat(state.length); state.buffer.clear(); // Limpa o buffer} else { // Lida com a situação em que o comprimento de leitura é menor que o buffer ret = state.buffer.consume(n, state.decoder); } retornar ret; }2.4.3 _ler.
Um método que deve ser implementado quando os usuários inicializam um fluxo legível. Você pode chamar o método push neste método para acionar continuamente o método de leitura. Quando enviamos null, podemos interromper a operação de escrita do fluxo.
Código de exemplo:
legível._read
const Fluxo = require('fluxo'); const readableStream = novo Stream.Readable({ leia(hmm) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 122) { isto.push(nulo); } }, }); readableStream.currentCharCode = 97; readableStream.pipe(process.stdout); //abcdefghijklmnopqrstuvwxyz%2.4.4. tubo (importante)
Vincule um ou mais fluxos graváveis ao fluxo Legível atual e alterne o fluxo Legível para o modo de fluxo.
Existem muitos identificadores de escuta de eventos neste método e não irei apresentá-los um por um aqui:
legível.pipe
Legível.prototype.pipe = function(dest, pipeOpts) { const src = isto; estado const = this._readableState; state.pipes.push(dest); // Coleta fluxo gravável src.on('data', ondata); function ondata(pedaço) { const ret = destino.write(pedaço); if (ret === falso) { pausa(); } } // Diga ao destino para onde está sendo canalizado. dest.emit('pipe',src); // Inicia o stream se o stream estiver em modo de pausa if (dest.writableNeedDrain === true) { if (estado.fluindo) { pausa(); } } senão if (!state.flowing) { src.resume(); } destino de retorno; }
A operação de pipe é muito semelhante ao operador de pipe '|' do Linux, alterando a saída esquerda para a entrada direita. Este método coleta o fluxo gravável para manutenção e aciona o evento 'dados' quando o fluxo legível está disponível.
Quando os dados fluem, o evento de gravação do fluxo gravável será acionado, para que os dados possam ser transferidos e operações como um pipeline possam ser realizadas. E mudará automaticamente o fluxo legível no modo de pausa para o modo de fluxo.
2.4.5.Mude o fluxo do modo 'pausa' para o modo 'fluxo' Se o ouvinte de evento 'legível' estiver definido, esse método não terá efeito.
legível.currículo
Legível.prototype.resume = function() { estado const = this._readableState; if (!state.flowing) { state.flowing = !state.readableListening; // Se está no modo de fluxo depende se o identificador de escuta 'legível' está definido resume(this, state); } }; function currículo(fluxo, estado) { if (!state.resumeScheduled) { // Alterna para que o método resume_ seja chamado apenas uma vez no mesmo Tick state.resumeScheduled = true; process.nextTick(resume_, fluxo, estado); } } função currículo_(fluxo, estado) { if (!estado.leitura){ fluxo.leitura(0); } estado.resumeScheduled = falso; stream.emit('currículo'); fluxo(fluxo); } function flow(stream) { // Quando o stream está no modo de streaming, este método continuará a ler dados do buffer até que o buffer esteja vazio const state = stream._readableState; while (state.flowing && stream.read() !== null); // Como o método read será chamado aqui e o fluxo do ouvinte de evento 'legível' será definido, o método read também poderá ser chamado. //Isso resulta em dados incoerentes (não afeta os dados, afeta apenas a chamada do método read no retorno de chamada do evento 'legível' para ler os dados) }2.4.6.
Mude o fluxo do modo de fluxo para o modo pausado, pare de disparar o evento 'dados' e salve todos os dados no buffer
legível.pause
Legível.prototype.pause = function() { if (this._readableState.flowing! == falso) { depurar('pausa'); this._readableState.flowing = falso; this.emit('pausa'); } devolva isso; };
2.5. Utilização e mecanismo de funcionamento
O método de uso foi mencionado na seção BufferList Crie uma instância Readable e implemente seu método _read() ou implemente o método read no primeiro parâmetro de objeto do construtor.
2.5.1. Mecanismo de funcionamentoAqui desenhamos apenas o processo geral e as condições de acionamento da conversão de modo do fluxo Legível.
em: