There are 4 types of Node streams: 1. Readable (readable stream), which needs to implement the _read method to return content; 2. Writable (writable stream), which needs to implement the _write method to accept content; 3. Duplex (readable and writable stream), which needs to implement both the _read and _write methods to accept and return content; 4. Transform (transform stream), which needs to implement the _transform method to transform the received content and return it.
The operating environment of this tutorial: Windows 7 system, Node.js version 16, Dell G3 computer.
Stream is a very basic concept in Node.js. Many core modules are implemented on top of streams, and streams play a very important role. At the same time, streams are a difficult concept to grasp, mainly because of the lack of good documentation; for Node.js beginners it often takes a lot of time to truly master them. Fortunately, most Node.js users only develop web applications, and an incomplete understanding of streams does not prevent them from using Node.js. However, understanding streams leads to a better understanding of other Node.js modules, and in some cases processing data with streams gives better results.
Stream is an abstract interface for processing streaming data in Node.js. Stream is not an actual interface, but a general term for all streams; the actual interfaces are ReadableStream, WritableStream, and ReadWriteStream.
interface ReadableStream extends EventEmitter {
  readable: boolean;
  read(size?: number): string | Buffer;
  setEncoding(encoding: BufferEncoding): this;
  pause(): this;
  resume(): this;
  isPaused(): boolean;
  pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
  unpipe(destination?: WritableStream): this;
  unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
  wrap(oldStream: ReadableStream): this;
  [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
  writable: boolean;
  write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
  write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
  end(cb?: () => void): this;
  end(data: string | Uint8Array, cb?: () => void): this;
  end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }

It can be seen that ReadableStream and WritableStream are both interfaces that extend the EventEmitter class (an interface in TypeScript can extend a class, because this only merges the types).
The implementation classes corresponding to the above interfaces are Readable, Writable and Duplex respectively.
There are 4 types of streams in NodeJs:
Readable readable stream (implements ReadableStream)
Writable writable stream (implements WritableStream)
Duplex duplex (readable and writable) stream (inherits Readable and implements WritableStream)
Transform transform stream (inherits from Duplex)
Each of them has a method that must be implemented (see the sketch after this list):
Readable needs to implement the _read method to return content
Writable needs to implement the _write method to accept content
Duplex needs to implement the _read and _write methods to accept and return content
Transform needs to implement the _transform method to convert the received content and return it
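A minimal sketch of what implementing each of the four types can look like when the corresponding method is passed as a constructor option (the data used here is made up for illustration); the concrete examples later in this article flesh these out:

const { Readable, Writable, Duplex, Transform } = require('stream')

// Readable: _read produces data and pushes it into the stream
const r = new Readable({
  read(size) {
    this.push('some data') // made-up data
    this.push(null)        // signal the end of the stream
  }
})

// Writable: _write consumes the chunks that are written
const w = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  }
})

// Duplex: both of the above, with independent read and write sides
const d = new Duplex({
  read(size) { this.push(null) },
  write(chunk, encoding, callback) { callback() }
})

// Transform: consume written chunks, transform them, and push the result
const t = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})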
Readable is the readable stream type. It has two modes and three states.
Two reading modes:
Flowing mode: data is read from the underlying system and written into the buffer; when the buffer is full, the data is automatically passed to the registered event handlers as quickly as possible via EventEmitter.
Paused mode: in this mode, EventEmitter is not actively triggered to deliver data. The Readable.read() method must be called explicitly to read data from the buffer; read() triggers the corresponding EventEmitter events.
Three states:
readableFlowing === null (initial state)
readableFlowing === false (paused mode)
readableFlowing === true (flowing mode)
The stream's readable.readableFlowing is null initially.
It becomes true after a data event handler is added. When pause() or unpipe() is called, or back pressure is received, or a readable event handler is added, readableFlowing is set to false; in this state, binding a listener to the data event will not switch readableFlowing back to true.
Calling resume() switches the readableFlowing of the readable stream back to true.
Removing all readable event handlers is the only way to make readableFlowing become null again.
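A small sketch of these state transitions, checking readableFlowing at each step (Readable.from and the sample data are just assumptions for illustration):

const { Readable } = require('stream')

const r = Readable.from(['a', 'b', 'c']) // sample readable stream, assumed for illustration
console.log(r.readableFlowing) // null (initial state)

r.on('data', chunk => console.log('data:', chunk))
console.log(r.readableFlowing) // true (adding a data handler switches to flowing mode)

r.pause()
console.log(r.readableFlowing) // false (paused mode)

r.resume()
console.log(r.readableFlowing) // true (flowing mode again)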
Event name: description
readable: triggered when there is new readable data in the buffer (triggered every time a node is inserted into the cache pool)
data: triggered every time data is consumed; the parameter is the data consumed this time
close: triggered when the stream is closed
error: triggered when an error occurs on the stream

Method name: description
read(size): consumes data of length size; returns null if the data currently available is less than size, otherwise returns the data consumed this time. When size is not passed, all the data in the cache pool is consumed.

const fs = require('fs');
const readStreams = fs.createReadStream('./EventEmitter.js', {
  highWaterMark: 100 // high-water mark of the cache pool
})
readStreams.on('readable', () => {
  console.log('buffer full')
  readStreams.read() // Consume all data in the cache pool, return the result and trigger the data event
})
readStreams.on('data', (data) => {
  console.log('data')
})

https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
When size is 0, the readable event will be triggered.
When the amount of data in the cache pool reaches the high-water mark (highWaterMark), the stream will not actively request more data from the source; it waits until data has been consumed before producing more.
If a stream in the paused state never calls read() to consume data, data and readable will not be triggered again. When read() is called, it first checks whether the data remaining after this consumption would fall below the high-water mark; if so, it requests new data from the source before consuming. This way, by the time the logic following read() has finished executing, the new data has most likely already been produced, and readable is triggered again. This mechanism of producing the next batch of data in advance and storing it in the cache pool is also the reason why cached streams are fast.
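A small sketch of how read(size) behaves against the cache pool, reusing the file stream setup from the earlier example (the file name is just an assumption):

const fs = require('fs')

const rs = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100 })

rs.on('readable', () => {
  // read(size) returns null when the cache pool holds less than size bytes
  console.log('1000 bytes:', rs.read(1000)) // likely null: the pool is capped near the 100-byte high-water mark
  console.log('10 bytes:', rs.read(10))     // likely a 10-byte chunk, leaving the rest in the pool for later reads
})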
There are two cases of data flow in the flowing state:
When the production speed is slower than the consumption speed: in this case, after each chunk is produced there is generally no leftover data in the cache pool, so the chunk can be handed directly to the data event (it never enters the cache pool, so there is no need to call read() to consume it), and production of the next chunk starts immediately. The next chunk is not produced until the previous one has been consumed, and data is triggered again, until the stream ends.
When the production speed is faster than the consumption speed: in this case, after each chunk is produced there is usually still unconsumed data in the cache pool, so consumption of the next chunk generally starts as soon as the previous one is done, and by the time the old data has been consumed, new data has already been produced and placed in the cache pool.
The only difference between the two is whether there is still data in the cache pool after a chunk is produced: if there is, the produced chunk is pushed into the cache pool to wait for consumption; if not, the chunk is handed directly to data without entering the cache pool.
It is worth noting that when a stream whose cache pool contains data switches from paused mode to flowing mode, read() is called in a loop to consume the data until null is returned.
Paused mode: when a readable stream is created, it is in paused mode. After creation, the _read method is automatically called to push data from the data source into the cache pool, until the data in the cache pool reaches the high-water mark. Whenever the data reaches the high-water mark, the readable stream emits a "readable" event to tell the consumer that data is ready and can be consumed.
Generally speaking, the 'readable' event indicates new activity on the stream: either there is new data, or the end of the stream has been reached. Therefore, before the data in the data source is read, the 'readable' event will also be triggered;
In the consumer's "readable" event handler, the data in the cache pool is actively consumed via stream.read(size).
const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
  highWaterMark: 300,
  // The read option will be used as the stream's _read method to obtain the source data
  read(size) {
    // Assume our source data is 1000 '1' characters
    let chunk = null
    // Reading data is generally an asynchronous process, such as an IO operation
    setTimeout(() => {
      if (count > 0) {
        let chunkLength = Math.min(count, size)
        chunk = '1'.repeat(chunkLength)
        count -= chunkLength
      }
      this.push(chunk)
    }, 500)
  }
})

// readable is triggered every time data is successfully pushed into the cache pool
myReadable.on('readable', () => {
  const chunk = myReadable.read() // Consume all the data currently in the cache pool (null once the stream has ended)
  if (chunk) console.log(chunk.toString())
})

It is worth noting that if the size passed to read(size) is greater than the high-water mark, a new high-water mark is recalculated: the new high-water mark is the smallest power of two that is greater than or equal to size (size <= 2^n, with n taking its minimum value).
// hwm will not be larger than 1GB.
const MAX_HWM = 0x40000000;

function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB limit
    n = MAX_HWM;
  } else {
    // Take the next highest power of 2 to prevent increasing hwm excessively
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

All readable streams start in paused mode and can be switched to flowing mode in the following ways:
Add "data" event handler; call "resume" method; use "pipe" method to send data to writable streamIn flow mode, the data in the buffer pool will be automatically output to the consumer for consumption. At the same time, after each data output, the _read method will be automatically called back to put the data from the data source into the buffer pool. If the buffer pool is If there is no data, the data will be passed directly to the data event without going through the cache pool; until the flow mode switches to other pause modes, or the data from the data source is read (push(null));
Readable streams can be switched back to paused mode via:
If there is no pipe destination, by calling stream.pause().
If there are pipe destinations, by removing all of them; multiple pipe destinations can be removed by calling stream.unpipe().

const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
  highWaterMark: 300,
  read(size) {
    let chunk = null
    setTimeout(() => {
      if (count > 0) {
        let chunkLength = Math.min(count, size)
        chunk = '1'.repeat(chunkLength)
        count -= chunkLength
      }
      this.push(chunk)
    }, 500)
  }
})

myReadable.on('data', data => {
  console.log(data.toString())
})

Compared with readable streams, writable streams are simpler.
When the producer calls write(chunk), the stream internally decides, based on its current state (corked, writing, etc.), whether to cache the chunk in the buffer queue or to call _write. After each write, it tries to flush the data in the buffer queue. If the amount of data in the buffer queue exceeds the high-water mark (highWaterMark), write(chunk) returns false, and the producer should stop writing.
So when can writing continue? When all the data in the buffer has been successfully written and the buffer queue is empty, the drain event is triggered; at that point the producer can continue writing data.
When the producer has finished writing data, it needs to call the stream.end method to signal the end of the writable stream.
const { Writable, Duplex } = require('stream')

let fileContent = ''
const myWritable = new Writable({
  highWaterMark: 10,
  // The write option will be used as the _write method
  write(chunk, encoding, callback) {
    setTimeout(() => {
      fileContent += chunk
      callback() // Called after the write is complete
    }, 500)
  }
})

myWritable.on('close', () => {
  console.log('close', fileContent)
})
myWritable.write('123123') // true
myWritable.write('123123') // false
myWritable.end()

Note that after the data in the cache pool reaches the high-water mark, there may be multiple buffer nodes in the cache pool. While the cache pool is being flushed (by calling _write in a loop), the stream does not consume chunks of high-water-mark length the way a readable stream does; it consumes one buffer node at a time, even if that node's length differs from the high-water mark.
const { Writable } = require('stream')

let fileContent = ''
const myWritable = new Writable({
  highWaterMark: 10,
  write(chunk, encoding, callback) {
    setTimeout(() => {
      fileContent += chunk
      console.log('Consumed', chunk.toString())
      callback() // Called after the write is complete
    }, 100)
  }
})

myWritable.on('close', () => {
  console.log('close', fileContent)
})

let count = 0
function productionData() {
  let flag = true
  while (count <= 20 && flag) {
    flag = myWritable.write(count.toString())
    count++
  }
  if (count > 20) {
    myWritable.end()
  }
}
productionData()
myWritable.on('drain', productionData)

The above is a writable stream with a high-water mark of 10. The data source is a string of consecutive numbers from 0 to 20, and productionData is used to write the data.
First, when myWritable.write("0") is called for the first time, because there is no data in the cache pool, "0" does not enter the cache pool but is handed directly to _write. The return value of myWritable.write("0") is true.
When myWritable.write("1") is executed, because the callback of _wirte has not been called yet, it indicates that the last data has not been written yet. The position guarantees the orderliness of data writing. Only one buffer can be created to store "1". " Add to the cache pool. This is true for the next 2-9
When myWritable.write("10") is executed, the buffer length is 9 (1-9) and has not yet reached the float value. "10" continues to be added to the cache pool as a buffer, and the cache pool length becomes is 11, so myWritable.write("1") returns false, which means that the data in the buffer is enough, and we need to wait for the drain event notification to produce data again.
After 100ms, the callback of _write("0", encoding, callback) is called, indicating that "0" has been written. It then checks whether there is data in the cache pool; if so, it first calls _write to consume the head node of the cache pool ("1"), and keeps repeating this process until the cache pool is empty, at which point the drain event is triggered and productionData is executed again.
Call myWritable.write("11") to trigger the process starting in step 1 until the end of the stream.
After understanding readable and writable streams, duplex streams are easy to understand. A duplex stream actually inherits from Readable and then implements the WritableStream interface (that is how the source code is written, although it would be more accurate to say that it implements both a readable and a writable stream at the same time).
Duplex streams need to implement the following two methods at the same time:
Implement the _read() method to produce data for the readable side
Implement the _write() method to consume data on the writable side
How to implement these two methods has already been covered above for readable and writable streams. What needs to be noted here is that a duplex stream has two independent cache pools, one for each side, and their data sources are not the same either.
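A minimal sketch of a custom Duplex whose two sides use unrelated data (the in-memory source array and sink array are assumptions for illustration):

const { Duplex } = require('stream')

const source = ['a', 'b', 'c'] // data source for the readable side
const sink = []                // destination for the writable side

const duplex = new Duplex({
  // Readable side: produce data from its own source
  read(size) {
    this.push(source.length ? source.shift() : null)
  },
  // Writable side: consume data into its own, unrelated sink
  write(chunk, encoding, callback) {
    sink.push(chunk.toString())
    callback()
  }
})

duplex.on('data', data => console.log('read side:', data.toString()))
duplex.write('x')
duplex.write('y')
duplex.end(() => console.log('write side received:', sink)) // [ 'x', 'y' ]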
Take the standard input and output stream of NodeJs as an example:
When we enter data in the console, its data event is triggered, which proves that it has the capabilities of a readable stream; every time the user presses Enter, it is equivalent to calling push on the readable side to push the produced data. When we call its write method, we can also output content to the console, but the data event is not triggered. This shows that it also has the capabilities of a writable stream, with an independent buffer; the implementation of the _write method is what makes the console display the text.

// Whenever the user enters data in the console (_read), the data event is triggered, which is a characteristic of the readable stream
process.stdin.on('data', data => {
  process.stdin.write(data);
})

// Write data to the standard input stream every second (this is a feature of the writable stream, which outputs directly to the console); it does not trigger the data event
setInterval(() => {
  process.stdin.write('is not data entered by the user console')
}, 1000)

A Duplex stream can be thought of as a readable stream plus a writable stream. The two are independent, each with its own internal buffer. Read and write events occur independently.
                 Duplex Stream
              ------------------|
      Read  <-----  External Source
You           ------------------|
      Write ----->  External Sink
              ------------------|

Transform streams are duplex streams in which reads and writes occur in a cause-and-effect relationship: the two ends of the duplex stream are linked through some transformation, and a read requires a write to have occurred first.
              Transform Stream
      --------------|--------------
You  Write  ---->                ---->  Read  You
      --------------|--------------

To create a Transform stream, the most important thing is to implement the _transform method rather than _write or _read. In _transform, the data written by the writable side is processed (consumed) and then data is produced for the readable side.
Transform streams often also implement a _flush method, which is called before the stream ends. It is generally used to append something to the end of the stream; for example, when compressing a file, some compression metadata is appended here.

const { Transform, PassThrough } = require('stream')

const reurce = '1312123213124341234213423428354816273513461891468186499126412'
const transform = new Transform({
  highWaterMark: 10,
  transform(chunk, encoding, callback) {
    // Transform the data and call push to add the result to the cache pool
    this.push(chunk.toString().replace('1', '@'))
    callback()
  },
  flush(callback) {
    // Executed before end is triggered
    this.push('<<<')
    callback()
  }
})

// write continuously writes data
let count = 0
transform.write('>>>')
function productionData() {
  let flag = true
  while (count <= 20 && flag) {
    flag = transform.write(count.toString())
    count++
  }
  if (count > 20) {
    transform.end()
  }
}
productionData()
transform.on('drain', productionData)

let result = ''
transform.on('data', data => {
  result += data.toString()
})
transform.on('end', () => {
  console.log(result)
  // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
})
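In practice, instead of driving the Transform by hand with write/end and the drain event as above, it usually sits in the middle of a pipe chain; a minimal sketch (the surrounding streams here are assumptions for illustration):

const { Readable, Transform } = require('stream')

const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

Readable.from(['hello ', 'stream']) // build a readable stream from an iterable
  .pipe(upper)                      // the writable side consumes, the readable side produces
  .pipe(process.stdout)             // prints: HELLO STREAM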