1.1. Historical evolution of streams
Streams are not a concept unique to Nodejs. They were introduced decades ago in the Unix operating system, and programs can interact with each other on streams through the pipe operator (|).
The pipe operator (|) can be used in MacOS and Linux based on Unix systems. It can convert the output of the process on the left side of the operator into the input on the right side.
In Node, if we use the traditional readFile to read a file, the file will be read into the memory from beginning to end. When all the contents have been read, the contents of the file loaded into the memory will be processed uniformly.
There are two disadvantages to doing this:
memory: it takes up a lot of memory;
time: you need to wait for the entire payload of the data to be loaded before starting to process the data.
In order to solve the above problems, Node.js followed and implemented the concept of streams. In Node In the .js stream, there are four types of streams. They are all instances of EventEmitter in Node.js:
Readable Stream,
Writable Stream,
Readable and Writable Full-Duplex Stream (Duplex Stream)
Transform Stream (Transform Stream)
In order to study this part in depth and gradually understand the concept of streams in Node.js, and because the source code part is relatively complicated, I decided to start learning this part from the readable stream.
1.2. What is a stream?
A stream is an abstract data structure, which is a collection of data. The data types stored in it can only be the following types (only for the case of objectMode === false):
We can use the stream Seen as a collection of these data, just like liquids, we first save these liquids in a container (the internal buffer BufferList of the stream), and when the corresponding event is triggered, we pour the liquid inside into the pipe. And notify others to get their own containers on the other side of the pipe to catch the liquid inside for disposal.
1.3. What is a readable stream?
A readable stream is a type of stream. It has two modes, three states,
and two reading modes:
flow mode: data will be read from the underlying system and passed through EventEmitter as soon as possible. The data is passed to the registered event handler in
pause mode: In this mode, the data will not be read, and the Stream.read() method must be explicitly called to read the data from the stream.
Three states:
readableFlowing = == null: No data will be generated. Calling Stream.pipe() and Stream.resume will change its status to true, start generating data and actively trigger the event
readableFlowing === false: The flow of data will be suspended at this time, but will not The generation of data will be suspended, so a data backlog will occur.
readableFlowing === true: Normally generate and consume data
2.1. Internal state definition (ReadableState)
ReadableState
_readableState: ReadableState { objectMode: false, // To operate other types of data except string, Buffer, and null, this mode needs to be turned on highWaterMark: 16384, // Water level limit, 1024 * 16, default 16kb, if this limit is exceeded, the call will stop _read() reads data into the buffer buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer linked list, used to save data length: 0, // The size of the entire readable stream data, if objectMode is equal to buffer.length pipes: [], // Save all pipe queues that monitor the readable stream flowing: null, // The status of independent flow is null, false, true ended: false, // All data has been consumed endEmitted: false, // Whether the end event has been sent or not reading: false, // Whether the data is being read constructed: true, // The stream cannot be processed before it is constructed or fails. Destroy sync: true, // Whether to trigger the 'readable'/'data' event synchronously, or wait until the next tick needReadable: false, // Whether it is necessary to send the readable event emittedReadable: false, // The readable event has been sent readableListening: false, // Whether there is a readable listening event resumeScheduled: false, // Whether the resume method has been called errorEmitted: false, // Error The event has been sent emitClose: true, // When the stream is destroyed, whether to send the close event autoDestroy: true, // Automatically destroyed, it is called after the 'end' event is triggered destroyed: false, // Whether the stream has been destroyed errored: null, // Identifies whether the stream has reported an error closed: false, // Whether the stream has been closed closeEmitted: false, // Whether the close event has been sent defaultEncoding: 'utf8', // The default character encoding format awaitDrainWriters: null, // Points to the monitored 'drain 'Writer reference of the event, type is null, Writable, Set<Writable> multiAwaitDrain: false, // Whether there are multiple writers waiting for the drain event readingMore: false, // Whether more data can be read dataEmitted: false, // The data has been sent decoder: null, // Decoder encoding: null, // Encoder[Symbol(kPaused)]: null },
2.2. Internal data storage implementation (BufferList)
BufferList is a container used to store internal data in a stream. It is designed in the form of a linked list and has three attributes: head, tail and length.
I represent each node in the BufferList as a BufferNode, and the type of Data inside depends on the objectMode.
This data structure obtains header data faster than Array.prototype.shift().
2.2.1. Data storage typeif objectMode === true:
Then data can be of any type. Whatever data is pushed will be stored.
objectMode=true
const Stream = require('stream'); const readableStream = new Stream.Readable({ objectMode: true, read() {}, }); readableStream.push({ name: 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(true); console.log(readableStream._readableState.buffer.tail); readableStream.push('lisa'); console.log(readableStream._readableState.buffer.tail); readableStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Symbol(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
Running results:
if objectMode === false:
Then data can only be string or Buffer or Uint8Array
objectMode=false
const Stream = require('stream'); const readableStream = new Stream.Readable({ objectMode: false, read() {}, }); readableStream.push({ name: 'lisa'});
Running results:
2.2.2. Data storage structureWe create a readable stream in the console through the node command line to observe changes in the data in the buffer:
Of course, before pushing data, we need to implement its _read method, or implement the read method in the parameters of the constructor:
const Stream = require('stream'); const readableStream = new Stream.Readable(); RS._read = function(size) {}
or
const Stream = require('stream'); const readableStream = new Stream.Readable({ read(size) {} });
After the readableStream.push('abc') operation, the current buffer is:
You can see that the current data is stored. The data stored at the beginning and end are the ASCII codes of the string 'abc', and the type is Buffer type. The length represents the number of currently saved data rather than the size of the data content.
2.2.3. Related APIsPrinting all the methods of BufferList you can get:
Except for join, which serializes the BufferList into a string, the others are all data access operations.
I won’t explain all the methods one by one here, but focus on consume, _getString and _getBuffer.
2.2.3.1.consume
Source code address: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
comsume
// Consumes a specified amount of bytes or characters from the buffered data. consume(n, hasStrings) { const data = this.head.data; if (n < data.length) { // `slice` is the same for buffers and strings. const slice = data.slice(0, n); this.head.data = data.slice(n); return slice; } if (n === data.length) { // First chunk is a perfect match. return this.shift(); } // Result spans more than one buffer. return hasStrings ? this._getString(n) : this._getBuffer(n); }
There are three judgment conditions in the code:
If the byte length of the consumed data is less than the length of the data stored in the head node of the linked list, the first n bytes of the data of the head node are taken, and the data of the current head node is set to the data after slicing.
If the data consumed is exactly equal to the length of the data stored in the head node of the linked list, the data of the current head node is returned directly.
If the length of the data consumed is greater than the length of the head node of the linked list, the last judgment will be made based on the second parameter passed in to determine whether the bottom layer of the current BufferList stores a string or a Buffer.
2.2.3.2. _getBuffer
Source code address: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
comsume
// Consumes a specified amount of bytes from the buffered data. _getBuffer(n) { const ret = Buffer.allocUnsafe(n); const retLen = n; let p = this.head; let c = 0; do { const buf = p.data; if (n > buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.length; } else { if (n === buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++c; if (p.next) this.head = p.next; else this.head = this.tail = null; } else { TypedArrayPrototypeSet(ret, new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); this.head = p; p.data = buf.slice(n); } break; } ++c; } while ((p = p.next) !== null); this.length -= c; return ret; }
In general, it is a loop to operate the nodes in the linked list, and create a new Buffer array to store the returned data.
First, start fetching data from the head node of the linked list, and continue to copy it to the newly created Buffer until the data of a certain node is greater than or equal to the length to be fetched minus the length that has been obtained.
In other words, after reading the last node of the linked list, it has not reached the desired length, so the newly created Buffer is returned.
2.2.3.3. _getString
Source code address: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
comsume
// Consumes a specified amount of characters from the buffered data. _getString(n) { let ret = ''; let p = this.head; let c = 0; do { const str = p.data; if (n > str.length) { ret += str; n -= str.length; } else { if (n === str.length) { ret += str; ++c; if (p.next) this.head = p.next; else this.head = this.tail = null; } else { ret += StringPrototypeSlice(str, 0, n); this.head = p; p.data = StringPrototypeSlice(str, n); } break; } ++c; } while ((p = p.next) !== null); this.length -= c; return ret; }
The operation of strings is the same as the operation of Buffers. It also reads data from the head of the linked list in a loop. There are only some differences in the copy and storage of data. In addition, the data type returned by the _getString operation is string type.
2.3. Why are readable streams instances of EventEmitter?
For this question, we must first understand what the publish-subscribe model is. The publish-subscribe model has important applications in most APIs. Whether it is Promise or Redux, advanced APIs based on the publish-subscribe model can be seen everywhere.
Its advantage is that it can store the event-related callback functions in the queue, and then notify the other party to process the data at a certain time in the future, thereby achieving separation of concerns. The producer only produces data and notifies the consumer, while the consumer Then it only processes the corresponding events and their corresponding data, and the Node.js streaming model just fits this characteristic.
So how does the Node.js stream implement the creation of instances based on EventEmitter?
The source code for this is here: stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
legacy
function Stream(opts) { EE.call(this, opts); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE);
Then there are these lines of code in the source code of the readable stream:
This part of the source code is here: readable https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
legacy
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream);
First, inherit the prototype object of Stream from EventEmitter, so that all instances of Stream can access the methods on EventEmitter.
At the same time, the static methods on EventEmitter are also inherited through ObjectSetPrototypeOf(Stream, EE), and in the constructor of Stream, the constructor EE is borrowed to realize the inheritance of all properties in EventEmitter, and then in the readable stream, use the same The method implements prototypal inheritance and static property inheritance of the Stream class, thus obtaining:
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype
therefore:
Readable.prototype.__proto__.__proto__ === EE.prototype
Therefore, you can find the prototype of EventEmitter by tracing the prototype chain of the readable stream, and realize the inheritance of EventEmitter.
2.4. Implementation of related APIs
The APIs will be displayed here in the order they appear in the source code documents, and only the core API implementations will be explained.
Note: Only the functions declared in the Node.js readable stream source code are interpreted here, and externally introduced function definitions are not included. In order to reduce the length, all the codes will not be copied.
Readable.prototype
Stream { destroy: [Function: destroy], _undestroy: [Function: unestroy], _destroy: [Function (anonymous)], push: [Function (anonymous)], unshift: [Function (anonymous)], isPaused: [Function (anonymous)], setEncoding: [Function (anonymous)], read: [Function (anonymous)], _read: [Function (anonymous)], pipe: [Function (anonymous)], unpipe: [Function (anonymous)], on: [Function (anonymous)], addListener: [Function (anonymous)], removeListener: [Function (anonymous)], off: [Function (anonymous)], removeAllListeners: [Function (anonymous)], resume: [Function (anonymous)], pause: [Function (anonymous)], wrap: [Function (anonymous)], iterator: [Function (anonymous)], [Symbol(nodejs.rejection)]: [Function (anonymous)], [Symbol(Symbol.asyncIterator)]: [Function (anonymous)] }2.4.1. push
readable.push
Readable.prototype.push = function(chunk, encoding) { return readableAddChunk(this, chunk, encoding, false); };
The main function of the push method is to pass the data block to the downstream pipeline by triggering the 'data' event, or to store the data in its own buffer.
The following code is relevant pseudocode and only shows the main process:
readable.push
function readableAddChunk(stream, chunk, encoding, addToFront) { const state = stream._readableState; if (chunk === null) { // push null stream end signal, no more data can be written after that state.reading = false; onEofChunk(stream, state); } else if (!state.objectMode) { // If not object mode if (typeof chunk === 'string') { chunk = Buffer.from(chunk); } else if (chunk instanceof Buffer) { //If it is Buffer // Process the encoding} else if (Stream._isUint8Array(chunk)) { chunk = Stream._uint8ArrayToBuffer(chunk); } else if (chunk != null) { err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } } if (state.objectMode || (chunk && chunk.length > 0)) { // It is object mode or chunk is Buffer // The judgment of several data insertion methods is omitted here addChunk(stream, state, chunk, true); } } function addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { // If in streaming mode, there are subscribers listening to data stream.emit('data', chunk); } else { // Otherwise, save the data to the buffer state.length += state.objectMode ? 1 : chunk.length; if (addToFront) { state.buffer.unshift(chunk); } else { state.buffer.push(chunk); } } maybeReadMore(stream, state); // Try reading a little more data}
The push operation is mainly divided into judging the objectMode. Different types will perform different operations on the incoming data:
The first judgment of addChunk is mainly to deal with the situation when Readable is in flowing mode, has a data listener, and the buffer data is empty.
At this time, the data is mainly passed through to other programs that subscribe to the data event, otherwise the data is saved in the buffer.
2.4.2. readExcept for the judgment of boundary conditions and flow status, this method mainly has two operations.
Call the user-implemented _read method to process the execution results
Read data from the buffer buffer and trigger the 'data' event
readable.read
// If the length of read is greater than hwm, hwm will be recalculated if (n > state.highWaterMark) { state.highWaterMark = computeNewHighWaterMark(n); } // Call the user-implemented _read method try { const result = this._read(state.highWaterMark); if (result != null) { const then = result.then; if (typeof then === 'function') { then.call( result, nop, function(err) { errorOrDestroy(this, err); }); } } } catch (err) { errorOrDestroy(this, err); }
If the _read method implemented by the user returns a promise, call the then method of this promise and pass in the success and failure callbacks to facilitate handling of exceptions.
The core code of the read method to read zone data from the buffer is as follows:
readable.read
function fromList(n, state) { // nothing buffered. if (state.length === 0) return null; let ret; if (state.objectMode) ret = state.buffer.shift(); else if (!n || n >= state.length) { // Handle the case where n is empty or greater than the length of the buffer // Read it all, truncate the list. if (state.decoder) // If there is a decoder, serialize the result into a string ret = state.buffer.join(''); else if (state.buffer.length === 1) // There is only one data, return the head node data ret = state.buffer.first(); else // Store all data into a Buffer ret = state.buffer.concat(state.length); state.buffer.clear(); // Clear the buffer} else { // Handle the situation where the read length is less than the buffer ret = state.buffer.consume(n, state.decoder); } return ret; }2.4.3. _read
A method that must be implemented when users initialize a Readable stream. You can call the push method in this method to continuously trigger the read method. When we push null, we can stop the writing operation of the stream.
Sample code:
readable._read
const Stream = require('stream'); const readableStream = new Stream.Readable({ read(hwm) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 122) { this.push(null); } }, }); readableStream.currentCharCode = 97; readableStream.pipe(process.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4. pipe (important)
Bind one or more writable streams to the current Readable stream, and switch the Readable stream to flowing mode.
There are many event listening handles in this method, and I will not introduce them one by one here:
readable.pipe
Readable.prototype.pipe = function(dest, pipeOpts) { const src = this; const state = this._readableState; state.pipes.push(dest); // Collect Writable stream src.on('data', ondata); function ondata(chunk) { const ret = dest.write(chunk); if (ret === false) { pause(); } } // Tell the dest that it's being piped to. dest.emit('pipe', src); // Start the stream if the stream is in pause mode if (dest.writableNeedDrain === true) { if (state.flowing) { pause(); } } else if (!state.flowing) { src.resume(); } return dest; }
The pipe operation is very similar to the Linux pipe operator '|', changing the left output to the right input. This method collects the writable stream for maintenance, and triggers the 'data' event when the readable stream is available.
When data flows out, the write event of the writable stream will be triggered, so that data can be transferred and operations like a pipeline can be realized. And will automatically change the readable stream in pause mode to flowing mode.
2.4.5. resumeSwitch the stream from 'pause' mode to 'flow' mode. If the 'readable' event listener is set, then this method has no effect.
readable.resume
Readable.prototype.resume = function() { const state = this._readableState; if (!state.flowing) { state.flowing = !state.readableListening; // Whether it is in flowing mode depends on whether the 'readable' listening handle is set resume(this, state); } }; function resume(stream, state) { if (!state.resumeScheduled) { // Switch so that the resume_ method is only called once in the same Tick state.resumeScheduled = true; process.nextTick(resume_, stream, state); } } function resume_(stream, state) { if (!state.reading) { stream.read(0); } state.resumeScheduled = false; stream.emit('resume'); flow(stream); } function flow(stream) { // When the stream is in streaming mode, this method will continue to read data from the buffer until the buffer is empty const state = stream._readableState; while (state.flowing && stream.read() !== null); // Because the read method will be called here and the stream of the 'readable' event listener is set, the read method may also be called. //This results in incoherent data (does not affect data, only affects calling the read method in the 'readable' event callback to read data) }2.4.6. pause
Change the stream from flowing mode to paused mode, stop firing the 'data' event, and save all data to the buffer
readable.pause
Readable.prototype.pause = function() { if (this._readableState.flowing !== false) { debug('pause'); this._readableState.flowing = false; this.emit('pause'); } return this; };
2.5. Usage and working mechanism
The usage method has been mentioned in the BufferList section. Create a Readable instance and implement its _read() method, or implement the read method in the first object parameter of the constructor.
2.5.1. Working mechanismHere we only draw the general process and the mode conversion triggering conditions of the Readable stream.
in: