1.1. 스트림의 역사적 진화
스트림은 Nodejs에만 있는 개념이 아닙니다. 이는 수십 년 전에 Unix 운영 체제에 도입되었으며 프로그램은 파이프 연산자(|)를 통해 스트림에서 서로 상호 작용할 수 있습니다.
파이프 연산자(|)는 Unix 시스템 기반의 MacOS 및 Linux에서 사용할 수 있으며 연산자 왼쪽의 프로세스 출력을 오른쪽의 입력으로 변환할 수 있습니다.
Node에서 파일을 읽기 위해 전통적인 readFile을 사용하면 파일은 처음부터 끝까지 메모리로 읽혀집니다. 모든 내용을 읽었을 때 메모리에 로드된 파일의 내용은 균일하게 처리됩니다.
이 작업에는 두 가지 단점이 있습니다.
메모리: 많은 메모리를 차지합니다.
시간: 데이터 처리를 시작하기 전에 데이터의 전체 페이로드가 로드될 때까지 기다려야 합니다
. .js는 스트림 개념을 따르고 구현했습니다. .js 스트림에는
읽기 가능한 스트림,
쓰기 가능한 스트림,
읽기 가능 및 쓰기 가능한 전이중 스트림()
Duplex Stream)
Transform Stream(Transform Stream)
이 부분을 심도 있게 연구하고 Node.js에서 스트림의 개념을 점차적으로 이해하기 위해, 그리고 소스코드 부분이 상대적으로 복잡하기 때문에 이 부분을 읽을 수 있는 스트림부터 배우기로 했습니다. .
1.2 스트림이란?
스트림은 데이터의 집합인 추상 데이터 구조입니다. 여기에 저장되는 데이터 유형은 다음과 같습니다(objectMode === false인 경우
. 액체와 마찬가지로 이러한 데이터의 집합으로 간주되는 스트림을 사용할 수 있습니다. 먼저 이러한 액체를 컨테이너(스트림의 내부 버퍼 BufferList)에 저장하고 해당 이벤트가 트리거되면 내부의 액체를 파이프에 붓습니다. 그리고 다른 사람들에게 파이프 반대편에 액체를 담아 폐기할 수 있는 용기를 마련하라고 알려주세요.
1.3 읽기 가능한 스트림이란 무엇입니까?
읽기 가능한 스트림은 스트림 유형입니다. 여기에는 두 가지 모드, 세 가지 상태
및 두 가지 읽기 모드가 있습니다:
흐름 모드: 데이터는 기본 시스템에서 읽혀지고 가능한 한 빨리 EventEmitter를 통해 전달됩니다.
일시
에서 등록된 이벤트 핸들러로 전달됩니다
. 이 모드에서는 데이터를 읽을 수 없으며 스트림에서 데이터를 읽으려면 Stream.read() 메서드를 명시적으로 호출해야 합니다.
readableFlowing = = = null: 데이터가 생성되지 않습니다. Stream.pipe() 및 Stream.resume을 호출하면 상태가 true로 변경되고 데이터 생성이 시작되며 이벤트가 적극적으로 트리거됩니다
readableFlowing === false: 이 시점에서 데이터 흐름이 일시 중지됩니다. , 그러나 그렇지 않습니다. 데이터 생성이 중단되므로 데이터 백로그가 발생합니다.
readableFlowing === true: 일반적으로 데이터를 생성하고 소비합니다
2.1 내부 상태 정의(ReadableState)
ReadableState
: ReadableState objectMode: false, // string, Buffer, null을 제외한 다른 유형의 데이터를 작동하려면 이 모드를 켜야 합니다. highWaterMark: 16384, // 수위 제한, 1024 * 16, 기본값 16kb, 이 제한을 초과하는 경우 , 호출이 중지됩니다. _read()는 데이터를 버퍼 버퍼로 읽습니다. BufferList { head: null, tail: null, length: 0 }, // 데이터를 저장하는 데 사용되는 버퍼 연결 목록 길이: 0, // 크기 전체 읽기 가능한 스트림 데이터(objectMode가 buffer.length와 같은 경우)pipe: [], // 읽기 가능한 스트림 흐름을 모니터링하는 모든 파이프 큐 저장: null, // 독립 흐름의 상태는 null, false, true입니다. ended: false, // 모든 데이터가 소비되었습니다. endEmitted: false, // 종료 이벤트가 전송되었는지 여부: false, // 데이터를 읽는 중인지 여부 constructor: true, // 스트림을 처리하기 전에는 처리할 수 없습니다. 생성되거나 실패합니다. Destroy sync: true, // '읽기 가능'/'데이터' 이벤트를 동기적으로 트리거할지 아니면 다음 틱까지 기다릴지 여부입니다. needReadable: false, // 읽을 수 있는 이벤트를 전송해야 하는지 여부가 방출됩니다. has been Called errorEmitted: false, // 오류 이벤트가 전송되었습니다. EmitClose: true, // 스트림이 소멸되면 닫기 이벤트를 보낼지 여부 autoDestroy: true, // 자동으로 소멸되며, 'end' 이후에 호출됩니다. event is Triggered destroy: false, // 스트림이 파괴되었는지 여부 errored: null, // 스트림이 오류를 보고했는지 식별합니다. close: false, // 스트림이 닫혔는지 여부 closeEmitted: false, // 닫혔는지 여부 이벤트가 전송되었습니다. defaultEncoding: 'utf8', // 기본 문자 인코딩 형식 waitDrainWriters: null, // 이벤트의 모니터링되는 'drain' 작성자 참조를 가리킵니다. 유형은 null, Writable, Set<Writable> multiAwaitDrain: false, // 배수 이벤트를 기다리는 작성자가 여러 명 있는지 여부 readingMore: false, // 더 많은 데이터를 읽을 수 있는지 여부 dataEmitted: false, // 데이터가 전송되었습니다 decoder: null, // 디코더 인코딩: null, // 인코더[Symbol(kPaused)]: null },
2.2 내부 데이터 저장 구현(BufferList)
BufferList는 내부 데이터를 스트림에 저장하는 데 사용되는 컨테이너로 연결된 목록 형태로 설계되었으며 head, tail 및 length의 세 가지 속성을 갖습니다.
BufferList의 각 노드를 BufferNode로 표현하고 내부의 Data 유형은 objectMode에 따라 다릅니다.
이 데이터 구조는 Array.prototype.shift()보다 더 빠르게 헤더 데이터를 얻습니다.
2.2.1. 데이터 저장 유형objectMode === true인 경우:
그러면 데이터가 푸시되는 모든 유형이 저장됩니다.
객체 모드=true
const Stream = require('스트림'); const readableStream = 새로운 Stream.Readable({ 객체 모드: true, 읽다() {}, }); readableStream.push({ 이름: 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(true); console.log(readableStream._readableState.buffer.tail); readableStream.push('lisa'); console.log(readableStream._readableState.buffer.tail); readableStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Symbol(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
실행 결과:
objectMode === false인 경우:
그러면 데이터는 문자열, Buffer 또는 Uint8Array만 될 수 있습니다.
객체 모드=거짓
const Stream = require('스트림'); const readableStream = 새로운 Stream.Readable({ 객체 모드: 거짓, 읽다() {}, }); readableStream.push({ 이름: 'lisa'});
실행 결과:
2.2.2. 데이터 저장 구조노드 명령줄을 통해 콘솔에서 읽기 가능한 스트림을 생성하여 버퍼의 데이터 변경 사항을 관찰합니다.
물론, 데이터를 푸시하기 전에 _read 메소드를 구현하거나 생성자의 매개변수에 read 메소드를 구현해야 합니다.
const Stream = require('스트림'); const readableStream = 새로운 Stream.Readable(); RS._read = 함수(크기) {}
또는
const Stream = require('스트림'); const readableStream = 새로운 Stream.Readable({ 읽기(크기) {} });
readableStream.push('abc') 작업 후 현재 버퍼는 다음과 같습니다.
현재 데이터가 저장되어 있는 것을 볼 수 있는데, 처음과 끝 부분에 저장된 데이터는 'abc'라는 문자열의 ASCII 코드이고, 종류는 Buffer 형태이다. 데이터 내용.
2.2.3. 관련 APIBufferList의 모든 메소드를 인쇄하면 다음을 얻을 수 있습니다.
BufferList를 문자열로 직렬화하는 조인을 제외하고 나머지는 모두 데이터 액세스 작업입니다.
여기서는 모든 메소드를 하나씩 설명하지는 않고, Consumer, _getString, _getBuffer에 집중하겠습니다.
2.2.3.1.소비
소스 코드 주소: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
소비하다
// 버퍼링된 데이터에서 지정된 양의 바이트 또는 문자를 소비합니다. 소비(n, hasStrings) { const 데이터 = this.head.data; if (n < data.length) { // `slice`은 버퍼와 문자열에서 동일합니다. const 슬라이스 = data.slice(0, n); this.head.data = data.slice(n); 슬라이스 반환; } if (n === 데이터.길이) { // 첫 번째 청크는 완벽하게 일치합니다. return this.shift(); } // 결과는 두 개 이상의 버퍼에 걸쳐 있습니다. hasStrings 반환 ? this._getString(n) : this._getBuffer(n); }
코드에는 세 가지 판단 조건이 있습니다.
소비되는 데이터의 바이트 길이가 연결리스트의 헤드 노드에 저장된 데이터의 길이보다 작을 경우, 헤드 노드의 데이터 중 처음 n바이트를 취하고, 현재 헤드 노드의 데이터를 설정한다. 슬라이싱 후 데이터에.
소모된 데이터가 연결리스트의 헤드 노드에 저장된 데이터의 길이와 정확히 일치하면 현재 헤드 노드의 데이터를 직접 반환한다.
소비되는 데이터의 길이가 링크드 리스트의 헤드 노드의 길이보다 큰 경우, 현재 BufferList의 최하위 레이어에 문자열이 저장되어 있는지 아니면 버퍼가 저장되어 있는지를 결정하기 위해 전달된 두 번째 매개변수에 따라 최종 판단이 내려집니다. .
2.2.3.2._getBuffer
소스 코드 주소: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
소비하다
// 버퍼링된 데이터에서 지정된 양의 바이트를 소비합니다. _getBuffer(n) { const ret = Buffer.allocUnsafe(n); const retLen = n; p = this.head; c = 0이라고 하자; 하다 { const buf = p.data; if (n > buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.길이; } 또 다른 { if (n === buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++c; 만약 (p.다음) this.head = p.next; 또 다른 this.head = this.tail = null; } 또 다른 { TypedArrayPrototypeSet(ret, 새로운 Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); this.head = p; p.data = buf.slice(n); } 부서지다; } ++c; } while ((p = p.next) !== null); this.length -= c; 반환 ret; }
일반적으로 연결된 목록의 노드를 조작하고, 반환된 데이터를 저장하기 위해 새로운 Buffer 배열을 생성하는 루프입니다.
먼저 연결리스트의 헤드 노드에서 데이터를 가져오기 시작하고, 특정 노드의 데이터가 가져오려는 길이에서 가져온 길이를 뺀 값보다 크거나 같을 때까지 새로 생성된 Buffer에 계속 복사합니다.
즉, 연결리스트의 마지막 노드를 읽어도 원하는 길이에 도달하지 못하여 새로 생성된 Buffer를 반환한다.
2.2.3.3._getString
소스 코드 주소: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
소비하다
// 버퍼링된 데이터에서 지정된 양의 문자를 소비합니다. _getString(n) { ret = ''; p = this.head; c = 0이라고 하자; 하다 { const str = p.data; if (n > str.length) { ret += str; n -= 문자열.길이; } 또 다른 { if (n === str.length) { ret += str; ++c; 만약 (p.다음) this.head = p.next; 또 다른 this.head = this.tail = null; } 또 다른 { ret += StringPrototypeSlice(str, 0, n); this.head = p; p.data = StringPrototypeSlice(str, n); } 부서지다; } ++c; } while ((p = p.next) !== null); this.length -= c; 반환 ret; }
문자열의 연산은 버퍼의 연산과 동일하며, 루프에서 연결된 목록의 선두에서 데이터를 읽습니다. 또한 데이터의 복사 및 저장에 약간의 차이가 있습니다. _getString 작업은 문자열 유형입니다.
2.3. EventEmitter의 읽기 가능한 스트림 인스턴스는 무엇입니까?
이 질문에 대해 먼저 게시-구독 모델이 무엇인지 이해해야 합니다. 게시-구독 모델은 Promise이든 Redux이든 상관없이 게시-구독 모델을 기반으로 하는 고급 API를 어디에서나 볼 수 있습니다.
이벤트 관련 콜백 함수를 큐에 저장한 후, 향후 특정 시간에 데이터를 처리하도록 상대방에게 알릴 수 있어 생산자는 데이터만 생산하고 소비자에게 알릴 수 있다는 장점이 있습니다. , 소비자는 해당 이벤트와 해당 데이터만 처리하며 Node.js 스트리밍 모델은 이러한 특성에 딱 맞습니다.
그렇다면 Node.js 스트림은 EventEmitter를 기반으로 인스턴스 생성을 어떻게 구현합니까?
이에 대한 소스 코드는 여기에 있습니다: stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
유산
함수 스트림(옵션) { EE.call(this, opts); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(스트림, EE);
읽을 수 있는 스트림의 소스 코드에는 다음과 같은 코드 줄이 있습니다.
소스 코드의 이 부분은 다음과 같습니다. 읽기 가능 https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
유산
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(읽기 가능, 스트림);
먼저 Stream의 모든 인스턴스가 EventEmitter의 메서드에 액세스할 수 있도록 EventEmitter에서 Stream의 프로토타입 개체를 상속합니다.
동시에 EventEmitter의 정적 메서드도 ObjectSetPrototypeOf(Stream, EE)를 통해 상속되며 Stream의 생성자에서 생성자 EE를 빌려 EventEmitter의 모든 속성 상속을 구현한 다음 읽기 가능한 스트림에서 동일한 방법을 사용합니다. 이 메서드는 Stream 클래스의 프로토타입 상속과 정적 속성 상속을 구현하여 다음을 얻습니다.
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype
그러므로:
Readable.prototype.__proto__.__proto__ === EE.prototype
따라서 읽기 가능한 스트림의 프로토타입 체인을 추적하여 EventEmitter의 프로토타입을 찾고 EventEmitter의 상속을 실현할 수 있습니다.
2.4. 관련 API 구현
API는 소스 코드 문서에 나타나는 순서대로 여기에 표시되며 핵심 API 구현만 설명됩니다.
참고: 여기에서는 Node.js 판독 가능 스트림 소스 코드에 선언된 함수만 해석되며, 길이를 줄이기 위해 외부에서 도입된 함수 정의는 포함되지 않습니다.
읽을 수 있는 프로토타입
개울 { destroy: [기능: 파괴], _undestroy: [기능: 파괴되지 않음], _destroy: [함수(익명)], push: [함수(익명)], unshift: [함수(익명)], isPaused: [함수(익명)], setEncoding: [함수(익명)], 읽기: [함수(익명)], _read: [함수(익명)], 파이프: [함수(익명)], unpipe: [함수(익명)], on: [함수(익명)], addListener: [함수(익명)], RemoveListener: [함수(익명)], off: [함수(익명)], RemoveAllListeners: [함수(익명)], 이력서: [함수(익명)], 일시 중지: [함수(익명)], Wrap: [함수(익명)], 반복자: [함수(익명)], [Symbol(nodejs.rejection)]: [함수(익명)], [Symbol(Symbol.asyncIterator)]: [함수(익명)] }2.4.1.푸시
읽기 가능.푸시
Readable.prototype.push = 함수(청크, 인코딩) { 읽을 수 있는AddChunk(this, Chunk, Encoding, false)를 반환합니다. };
푸시 메서드의 주요 기능은 'data' 이벤트를 트리거하여 데이터 블록을 다운스트림 파이프라인에 전달하거나 데이터를 자체 버퍼에 저장하는 것입니다.
다음 코드는 관련 의사코드이며 주요 프로세스만 보여줍니다.
읽기 가능.푸시
함수 읽기 가능AddChunk(스트림, 청크, 인코딩, addToFront) { const 상태 = stream._readableState; if (chunk === null) { // null 스트림 종료 신호를 푸시하면 해당 이후에는 더 이상 데이터를 쓸 수 없습니다. state.reading = false; onEofChunk(스트림, 상태); } else if (!state.objectMode) { // 객체 모드가 아닌 경우 if (typeof Chunk === 'string') { 청크 = Buffer.from(청크); } else if (버퍼 인스턴스 청크) { //버퍼인 경우 // 인코딩 처리} else if (Stream._isUint8Array(chunk)) { 청크 = Stream._uint8ArrayToBuffer(청크); } else if (청크 != null) { err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'],chunk); } } if (state.objectMode || (chunk && Chunk.length > 0)) { // 객체 모드이거나 청크가 버퍼입니다. // 여기에서는 여러 데이터 삽입 방법에 대한 판단을 생략합니다. addChunk(stream, state, Chunk, true); } } 함수 addChunk(스트림, 상태, 청크, addToFront) { if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { // 스트리밍 모드인 경우 데이터를 수신하는 구독자가 있습니다. stream.emit('data', Chunk); } else { // 그렇지 않으면 버퍼에 데이터를 저장합니다. state.length += state.objectMode 1 : Chunk.length; if (addToFront) { state.buffer.unshift(청크); } 또 다른 { state.buffer.push(청크); } } MaybeReadMore(stream, state); // 좀 더 많은 데이터를 읽어보세요.}
푸시 작업은 주로 objectMode를 판단하는 것으로 구분됩니다. 다양한 유형은 수신 데이터에 대해 다양한 작업을 수행합니다.
addChunk의 첫 번째 판단은 주로 Readable이 흐름 모드에 있고, 데이터 리스너가 있고, 버퍼 데이터가 비어 있는 상황을 처리하는 것입니다.
이때 데이터는 주로 데이터 이벤트를 구독하는 다른 프로그램을 통해 전달되고, 그렇지 않으면 데이터가 버퍼에 저장됩니다.
2.4.2.읽기경계조건과 흐름상태의 판단을 제외하고 이 방법은 주로 두 가지 작업을 수행한다.
실행 결과를 처리하기 위해 사용자가 구현한 _read 메서드를 호출합니다.
버퍼 버퍼에서 데이터를 읽고 'data' 이벤트를 트리거합니다.
읽을 수 있습니다.읽기
// 읽은 길이가 hwm보다 길면 hwm이 다시 계산됩니다. if (n > state.highWaterMark) { state.highWaterMark = 계산NewHighWaterMark(n); } // 사용자가 구현한 _read 메소드를 호출합니다. const 결과 = this._read(state.highWaterMark); if (결과 != null) { const then = 결과.then; if (typeof then === '함수') { then.call( 결과, 아니, 함수(오류) { errorOrDestroy(this, err); }); } } } 잡기 (오류) { errorOrDestroy(this, err); }
사용자가 구현한 _read 메서드가 Promise를 반환하는 경우 이 Promise의 then 메서드를 호출하고 성공 및 실패 콜백을 전달하여 예외 처리를 용이하게 합니다.
버퍼에서 존 데이터를 읽기 위한 read 메소드의 핵심 코드는 다음과 같습니다.
읽을 수 있습니다.읽기
함수 fromList(n, 상태) { // 버퍼링된 것이 없습니다. if (state.length === 0) null을 반환; 리트하자; if (state.objectMode) ret = state.buffer.shift(); else if (!n || n >= state.length) { // n이 비어 있거나 버퍼 길이보다 큰 경우를 처리합니다. // 모두 읽고 목록을 자릅니다. if (state.decoder) // 디코더가 있으면 결과를 문자열로 직렬화합니다. ret = state.buffer.join(''); else if (state.buffer.length === 1) // 데이터가 하나뿐이면 헤드 노드 데이터를 반환합니다. ret = state.buffer.first(); else // 모든 데이터를 버퍼에 저장합니다. ret = state.buffer.concat(state.length); state.buffer.clear(); // 버퍼 지우기} else { // 읽기 길이가 버퍼보다 작은 상황을 처리합니다. ret = state.buffer.consume(n, state.decoder); } 반환 ret; }2.4.3.
사용자가 Readable 스트림을 초기화할 때 구현해야 하는 메서드입니다. 이 메서드에서 push 메서드를 호출하면 읽기 메서드를 계속해서 트리거할 수 있습니다. null을 푸시하면 스트림 쓰기 작업을 중지할 수 있습니다.
샘플 코드:
읽을 수 있습니다._read
const Stream = require('스트림'); const readableStream = 새로운 Stream.Readable({ 읽기(hwm) { this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 122) { this.push(null); } }, }); readableStream.currentCharCode = 97; readableStream.pipe(process.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4. 파이프(중요)
하나 이상의 쓰기 가능 스트림을 현재 읽기 가능 스트림에 바인딩하고 읽기 가능 스트림을 흐름 모드로 전환합니다.
이 메서드에는 많은 이벤트 수신 핸들이 있으므로 여기서는 하나씩 소개하지 않겠습니다.
읽을 수 있는 파이프
Readable.prototype.pipe = function(dest, PipeOpts) { const src = 이것; const state = this._readerState; state.pipes.push(dest); // 쓰기 가능한 스트림 수집 src.on('data', ondata); 함수 온데이터(청크) { const ret = dest.write(청크); if (ret === false) { 정지시키다(); } } // 파이프로 전달되고 있음을 대상에게 알립니다. dest.emit('파이프', src); // 스트림이 일시 정지 모드인 경우 스트림을 시작합니다. if (dest.writableNeedDrain === true) { if (state.flowing) { 정지시키다(); } } else if (!state.flowing) { src.resume(); } 목적지를 반환; }
파이프 작업은 Linux 파이프 연산자 '|'와 매우 유사하며 왼쪽 출력을 오른쪽 입력으로 변경합니다. 이 방법은 유지 관리를 위해 쓰기 가능한 스트림을 수집하고 읽기 가능한 스트림을 사용할 수 있을 때 '데이터' 이벤트를 트리거합니다.
데이터가 유출되면 쓰기 가능한 스트림의 쓰기 이벤트가 트리거되어 데이터가 전송되고 파이프라인과 같은 작업이 실현될 수 있습니다. 그리고 일시 정지 모드에서 읽기 가능한 스트림을 흐름 모드로 자동 변경합니다.
2.4.5.스트림을 '일시 중지' 모드에서 '흐름' 모드로 전환합니다. '읽기 가능' 이벤트 리스너가 설정된 경우 이 메서드는 효과가 없습니다.
읽기 가능.이력서
Readable.prototype.resume = function() { const 상태 = this._readerState; if (!state.flowing) { state.flowing = !state.readableListening; // 유동 모드인지 여부는 '읽기 가능' 청취 핸들이 설정되어 있는지 여부에 따라 다릅니다.resume(this, state); } }; 함수 재개(스트림, 상태) { if (!state.resumeScheduled) { // 동일한 Tick에서 이력서 메소드가 한 번만 호출되도록 전환합니다. state.resumeScheduled = true; process.nextTick(resume_, 스트림, 상태); } } 함수 이력서_(스트림, 상태) { if (!state.reading) { 스트림.읽기(0); } state.resumeScheduled = 거짓; stream.emit('이력서'); 흐름(스트림); } function flow(stream) { // 스트림이 스트리밍 모드일 때 이 메서드는 버퍼가 빌 때까지 버퍼에서 데이터를 계속 읽습니다. const state = stream._readerState; while (state.flowing && stream.read() !== null); // 여기에서 read 메소드가 호출되고 '읽기 가능' 이벤트 리스너의 스트림이 설정되므로 read 메소드도 호출될 수 있습니다. //이로 인해 일관성 없는 데이터가 발생합니다(데이터에는 영향을 주지 않으며, 데이터를 읽기 위해 '읽기 가능' 이벤트 콜백에서 읽기 메서드를 호출하는 데에만 영향을 미칩니다). }2.4.6.
스트림을 흐름 모드에서 일시 중지 모드로 변경하고, 'data' 이벤트 발생을 중지하고, 모든 데이터를 버퍼에 저장합니다.
읽을 수 있음.일시 중지
Readable.prototype.pause = function() { if (this._readableState.flowing !== false) { 디버그('일시 중지'); this._readableState.flowing = false; this.emit('pause'); } 이거 돌려줘; };
2.5. 사용법 및 작동 메커니즘
사용 방법은 BufferList 섹션에서 언급되었습니다. Readable 인스턴스를 만들고 해당 _read() 메서드를 구현하거나 생성자의 첫 번째 개체 매개 변수에 read 메서드를 구현합니다.
2.5.1. 작동 메커니즘여기서는 Readable 스트림의 일반적인 프로세스와 모드 변환 트리거 조건만 그립니다.
안에: