Existem 4 tipos de fluxos de nós: 1. Legível (fluxo legível). O método "_read" precisa ser implementado para retornar conteúdo; 2. Gravável (fluxo gravável), o método "_write" precisa ser implementado para aceitar conteúdo; 3. Duplex (fluxo legível e gravável), o "_read" e "; _write" métodos precisam ser implementados Para aceitar e retornar conteúdo; 4. Transform (fluxo de conversão), você precisa implementar o método "_transform" para converter o conteúdo recebido e retornar o conteúdo.
O ambiente operacional deste tutorial: sistema Windows 7, nodejs versão 16, computador DELL G3.
Stream é um conceito muito básico em Nodejs. Muitos módulos básicos são implementados com base em streams e desempenham um papel muito importante. Ao mesmo tempo, o fluxo também é um conceito muito difícil de entender. Isso se deve principalmente à falta de documentação relevante. Felizmente, para iniciantes em NodeJs, leva muito tempo para entender o fluxo. para a maioria dos NodeJs, é usado apenas para desenvolver aplicativos da Web. A compreensão insuficiente dos fluxos não afeta seu uso. No entanto, a compreensão dos fluxos pode levar a uma melhor compreensão de outros módulos no NodeJs e, em alguns casos, o uso de fluxos para processar dados terá melhores resultados.
Stream é uma interface abstrata para processamento de dados de streaming em Node.js. Stream não é uma interface real, mas um termo geral para todos os streams. As interfaces reais incluem ReadableStream, WritableStream e ReadWriteStream.
interface ReadableStream estende EventEmitter { legível: boolean read(size?: number): string | setEncoding(encoding: BufferEncoding): this; T estende WritableStream>(destino: T, opções?: { end?: boolean | indefinido; }): T; unpipe(destino?: WritableStream): this; unshift(chunk: string | Uint8Array, codificação?: BufferEncoding): void ; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;}interface WritableStream estende EventEmitter { gravável: boolean; Erro | null) => void): boolean; write(str: string, codificação?: BufferEncoding, cb?: (err?: Erro | null) => void): boolean end(cb?: () => void; ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this;}interface ReadWriteStream estende ReadableStream, WritableStream { }Pode-se observar que ReadableStream e WritableStream são interfaces que herdam a classe EventEmitter (interfaces em ts podem herdar classes, porque são apenas tipos de mesclagem).
As classes de implementação correspondentes às interfaces acima são Readable, Writable e Duplex respectivamente.
Existem 4 tipos de streams em NodeJs:
Fluxo legível legível (implementa ReadableStream)
Fluxo gravável gravável (implementa WritableStream)
Duplex é um fluxo legível e gravável (implementando WritableStream após herdar Readable)
Fluxo de conversão de transformação (herdado de Duplex)
Todos eles têm métodos para implementar:
Readable precisa implementar o método _read para retornar conteúdo
Gravável precisa implementar o método _write para aceitar conteúdo
Duplex precisa implementar os métodos _read e _write para aceitar e retornar conteúdo
Transform precisa implementar o método _transform para converter o conteúdo recebido e retorná-lo
Legível é um tipo de fluxo. Possui dois modos e três estados.
Dois modos de leitura:
Modo de fluxo: os dados serão lidos e gravados do sistema subjacente no buffer. Quando o buffer estiver cheio, os dados serão passados automaticamente para o manipulador de eventos registrado o mais rápido possível por meio do EventEmitter.
Modo de pausa: neste modo, EventEmitter não será acionado ativamente para transmitir dados. O método Readable.read() deve ser chamado explicitamente para ler dados do buffer. A leitura acionará uma resposta ao evento EventEmitter.
Três estados:
readableFlowing === null (estado inicial)
readableFlowing === false (modo de pausa)
readableFlowing === true (modo de fluxo)
O readable.readableFlowing do fluxo é inicialmente nulo.
Torna-se verdade após adicionar o evento de dados. Quando pause(), unpipe() é chamado, ou a contrapressão é recebida ou um evento legível é adicionado, readableFlowing será definido como false. Nesse estado, vincular um ouvinte ao evento de dados não mudará readableFlowing para true.
Chamar resume() pode mudar o readableFlowing do fluxo legível para verdadeiro.
Remover todos os eventos legíveis é a única maneira de tornar readableFlowing nulo.
A descrição do nome do evento legível é acionada quando há novos dados legíveis no buffer (será acionado toda vez que um nó for inserido no pool de cache). Os dados serão acionados toda vez que os dados forem consumidos. O parâmetro são os dados consumidos desta vez e. o fluxo de erro é acionado quando o fluxo próximo é fechado. Quando ocorre um erro, o nome do método de disparo indica que read(size) consome dados com um comprimento de tamanho. os dados consumidos desta vez são retornados. Quando o tamanho não é passado, significa consumir todos os dados no pool de cache const fs = require('fs'); value})readStreams. on('readable', () => { console.log('buffer full') readStreams.read()// Consome todos os dados no buffer pool, retorna o resultado e aciona o evento de dados}) readStreams.on('dados', (dados) => { console.log('dados')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
Quando o tamanho for 0, o evento legível será acionado.
Quando o comprimento dos dados no pool de cache atingir o valor flutuante highWaterMark, ele não solicitará ativamente os dados de produção, mas aguardará que os dados sejam consumidos antes de produzir dados.
Se o fluxo no estado pausado não chamar read para consumir dados, os dados e legíveis não serão acionados posteriormente. Quando read for chamado para consumir, ele primeiro determinará se o comprimento dos dados restantes após esse consumo é menor que o float. Se for inferior ao valor flutuante, os dados de produção serão solicitados antes do consumo. Desta forma, após a conclusão da execução da lógica após a leitura, os novos dados provavelmente terão sido produzidos e, em seguida, os legíveis serão acionados novamente. Este mecanismo de produção antecipada dos próximos dados consumidos e armazenamento no pool de cache também é. a razão pela qual o fluxo de cache é rápido.
Existem duas situações de fluxo no estado de fluxo
Quando a velocidade de produção é mais lenta que a velocidade de consumo: Nesse caso, geralmente não haverá dados restantes no pool de cache após cada dado de produção, e os dados produzidos desta vez podem ser passados diretamente para o evento de dados (porque não entre no pool de cache, portanto, também não há necessidade de chamar read para consumir) e, em seguida, comece imediatamente a produzir novos dados. Os novos dados não serão produzidos até que os últimos dados sejam consumidos novamente até que o fluxo termine. . Quando a velocidade de produção é mais rápida que a velocidade de consumo: Neste momento, após cada produção de dados, geralmente há dados não consumidos no pool de cache. Nesse caso, o próximo consumo de dados geralmente começará quando os dados forem consumidos e depois. os dados antigos são consumidos, novos dados foram produzidos e colocados no pool de cacheA única diferença entre eles é se os dados ainda existem no pool de cache após a produção dos dados. Se os dados existirem, os dados produzidos serão enviados para o pool de cache para aguardar o consumo. ser entregue diretamente aos dados sem adicioná-los ao pool de cache.
É importante notar que quando um fluxo com dados em um cache pool entra no modo de fluxo a partir do modo de pausa, read será chamado em um loop para consumir os dados até que nulo seja retornado.
No modo de pausa, quando um fluxo legível é criado, o modo é o modo de pausa. Após a criação, o método _read é chamado automaticamente para enviar dados da fonte de dados para o buffer pool até que os dados no buffer pool atinjam o valor flutuante. Sempre que os dados atingirem o valor flutuante, o fluxo legível acionará um evento "legível" para informar ao consumidor que os dados estão prontos e podem continuar a ser consumidos.
De modo geral, o evento 'legível' indica nova atividade no fluxo: ou há novos dados ou o fim do fluxo foi atingido. Portanto, antes de os dados na fonte de dados serem lidos, o evento 'legível' também será acionado;
Na função manipuladora do evento "legível" do consumidor, os dados no buffer pool são consumidos ativamente por meio de stream.read(size).
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // O método read do parâmetro será usado como o método _read do stream para obter os dados de origem read( size) { // Suponha que nossos dados de origem tenham 1000 1s let chunk = null // O processo de leitura de dados é geralmente assíncrono, como a operação IO setTimeout(() => { if (count > 0) { let chunkLength = Math .min( contagem, tamanho) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on(' será acionado toda vez que os dados forem enviado com sucesso para o pool de cache) legível', () => { const chunk = myReadable.read()//Consumir todos os dados no pool de cache atual console.log(chunk.toString())})É importante notar que se o tamanho de read(size) for maior que o valor float, o novo valor float será recalculado, e o novo valor float será a próxima segunda potência do tamanho (tamanho <= 2^n, n leva o valor mínimo)
// hwm não será maior que 1GB.const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // limite de 1GB n = MAX_HWM; evitar aumento excessivo hwm n--;= n >>> 1;= n >>> 2; > 16; n++;} retornar n;}Todos os fluxos legíveis começam no modo de pausa e podem ser alternados para o modo de fluxo através dos seguintes métodos:
Adicione o manipulador de eventos "data"; chame o método "resume"; use o método "pipe" para enviar dados para o fluxo gravável.No modo de fluxo, os dados no buffer pool serão automaticamente enviados ao consumidor para consumo. Ao mesmo tempo, após cada saída de dados, o método _read será automaticamente chamado de volta para colocar os dados da fonte de dados no buffer pool. . Se o buffer pool for Se não houver dados, os dados serão passados diretamente para o evento de dados sem passar pelo cache pool até que o modo de fluxo mude para outros modos de pausa ou os dados da fonte de dados sejam lidos (push); (nulo));
Os fluxos legíveis podem ser retornados ao modo pausado por meio de:
Se não houver destino de pipeline, stream.pause() será chamado. Se houver destinos de pipeline, remove todos os destinos de pipeline. Vários destinos de pipe podem ser removidos chamando stream.unpipe(). const { Legível } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(contagem, tamanho) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})Comparados com fluxos legíveis, os fluxos graváveis são mais simples.
Quando o produtor chama write(chunk), ele escolherá internamente se deseja armazená-lo em cache na fila do buffer ou chamar _write com base em algum status (arrolhado, escrita, etc.). Após cada vez que os dados forem gravados, ele tentará limpar. os dados na fila de cache. Se o tamanho dos dados na fila do buffer exceder o valor flutuante (highWaterMark), o consumidor retornará falso após chamar write(chunk). Neste momento, o produtor deve parar de escrever.
Então, quando posso continuar escrevendo? Quando todos os dados no buffer forem gravados com sucesso, o evento de drenagem será acionado após a limpeza da fila do buffer. Nesse momento, o produtor pode continuar a gravar dados.
Quando o produtor precisa terminar de escrever os dados, ele precisa chamar o método stream.end para notificar o fim do fluxo gravável.
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// será usado como método _write setTimeout(() = >{ fileContent += chunk callback()// Chamado após a conclusão da escrita}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable .write('123123')// truemyWritable.write('123123')// falsemyWritable.end()Observe que depois que os dados no pool de cache atingirem o valor flutuante, pode haver vários nós no pool de cache neste momento. Durante o processo de limpeza do pool de cache (chamada cíclica _read), ele não consumirá o mesmo comprimento que o. fluxo legível Os dados do valor flutuante são consumidos um nó de buffer por vez, mesmo se o comprimento do buffer for inconsistente com o valor flutuante.
const { Writable } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log ('Consumo', chunk.toString()) callback()// Chamado após a conclusão da gravação}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})let count = 0function productionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}productionData()myWritable.on('drain', productionData)O acima é um fluxo gravável com um valor flutuante de 10. Agora, a fonte de dados é uma sequência numérica contínua de 0 a 20 e productionData é usado para gravar dados.
Primeiro, quando myWritable.write("0") é chamado pela primeira vez, porque não há dados no pool de cache, "0" não entra no pool de cache, mas é fornecido diretamente a _wirte O valor de retorno de myWritable. .write("0") é verdadeiro
Quando myWritable.write("1") é executado, porque o retorno de chamada de _wirte ainda não foi chamado, indica que os últimos dados ainda não foram gravados. A posição garante a ordem da gravação de dados. para armazenar "1". Adicione ao pool de cache. Isto é verdade para os próximos 2-9
Quando myWritable.write("10") é executado, o comprimento do buffer é 9 (1-9) e ainda não atingiu o valor flutuante "10" continua a ser adicionado ao pool de cache como um buffer, e o pool de cache. o comprimento passa a ser 11, então myWritable.write("1") retorna falso, o que significa que os dados no buffer são suficientes e precisamos aguardar a notificação do evento de drenagem para produzir dados novamente.
Após 100ms, o retorno de chamada de _write("0", encoding, callback) é chamado, indicando que "0" foi escrito. Em seguida, ele verificará se há dados no pool de cache. Se existir, ele primeiro chamará _read para consumir o nó principal do pool de cache ("1") e, em seguida, continuará a repetir esse processo até que o pool de cache esteja vazio. , acione o evento de drenagem e execute productionData novamente.
Chame myWritable.write("11") para acionar o processo começando na etapa 1 até o final do fluxo.
Depois de entender o fluxo legível e o fluxo gravável, o fluxo duplex é fácil de entender. O fluxo duplex na verdade herda o fluxo legível e então implementa o fluxo gravável (o código-fonte é escrito assim, mas deve-se dizer que é implementado. ao mesmo tempo, é melhor ter fluxos legíveis e graváveis).
O fluxo duplex precisa implementar os dois métodos a seguir ao mesmo tempo
Implemente o método _read() para produzir dados para fluxos legíveis
Implemente o método _write() para consumir dados para fluxos graváveis
Como implementar os dois métodos acima foi introduzido nos fluxos graváveis e legíveis acima. O que precisa ser observado aqui é que existem dois buffer pools independentes para fluxos duplex, respectivamente, e suas fontes de dados também não são iguais.
Pegue o fluxo de entrada e saída padrão de NodeJs como exemplo:
Quando inserimos dados no console, seu evento de dados é acionado, o que prova que ele tem a função de um fluxo legível. Cada vez que o usuário digita enter, é equivalente a chamar o método push legível para enviar os dados produzidos. Quando chamamos seu método write, também podemos enviar conteúdo para o console, mas o evento de dados não será acionado. Isso mostra que ele tem a função de um fluxo gravável e possui um buffer independente. permitir que o console exiba texto. // Sempre que o usuário inserir dados no console (_read), o evento data será acionado, o que é uma característica do stream legível process.stdin.on('data', data=>{ process.stdin.write(data ); })// Produz dados para o fluxo de entrada padrão a cada segundo (este é um recurso de um fluxo gravável, que será enviado diretamente para o console) e não acionará datasetInterval(()=>{ process.stdin.write ('não são dados inseridos pelo console do usuário')}, 1000)Um fluxo Duplex pode ser considerado um fluxo legível com um fluxo gravável. Ambos são independentes, cada um com buffers internos independentes. Os eventos de leitura e gravação ocorrem de forma independente.
Fluxo Duplex ------------------| Leia <----- Fonte Externa Você ------------------| Escreva -----> Coletor Externo ------------------|Os fluxos de transformação são duplex, onde leituras e gravações ocorrem em uma relação de causa e efeito. Os pontos finais de um fluxo duplex são vinculados por meio de alguma transformação. Uma leitura requer uma gravação para ocorrer.
Transform Stream --------------|-------------- Você escreve ----> ----> Leia você ----- ----------|--------------Para criar fluxos Transform, o mais importante é implementar o método _transform em vez de _write ou _read. Em _transform, os dados gravados pelo fluxo gravável são processados (consumidos) e, em seguida, os dados são produzidos para o fluxo legível.
Os fluxos de conversão geralmente implementam um método `_flush`, que será chamado antes do final do fluxo. Geralmente é usado para anexar algo ao final do fluxo. Por exemplo, algumas informações de compactação ao compactar arquivos são adicionadas aqui const {write. } = require('fs')const { Transform, PassThrough } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const transform = new Transform({ highWaterMark: 10, transform(chunk,en codificação, retorno de chamada){ // Converte data, chame push para adicionar o resultado da conversão ao pool de cache this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){//Execute this.push (' before end triggers <<<') callback() }})// write grava dados continuamente let count = 0transform.write('>>>')function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() }}productionData()transform.on('drain', productionData)let result = '' transform.on('dados', dados=>{ resultado += data.toString()})transform.on('end', ()=>{ console.log(resultado) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})