노드 스트림에는 4가지 유형이 있습니다. 1. 읽기 가능(읽기 가능한 스트림). 콘텐츠를 반환하려면 "_read" 메서드를 구현해야 합니다. 2. 쓰기 가능(쓰기 가능 스트림), 콘텐츠를 허용하려면 "_write" 메서드를 구현해야 합니다. 3. 이중(읽기 및 쓰기 가능 스트림), "_read" 및 " 콘텐츠를 수락하고 반환하려면 _write" 메서드를 구현해야 합니다. 4. 변환(변환 스트림), 수신된 콘텐츠를 변환하고 콘텐츠를 반환하려면 "_transform" 메서드를 구현해야 합니다.
이 튜토리얼의 운영 환경: Windows 7 시스템, nodejs 버전 16, DELL G3 컴퓨터.
스트림은 Nodejs의 매우 기본적인 개념입니다. 많은 기본 모듈이 스트림을 기반으로 구현되며 매우 중요한 역할을 합니다. 동시에 흐름은 이해하기 매우 어려운 개념이기도 합니다. 이는 주로 관련 문서가 부족하기 때문입니다. 다행히도 NodeJ 초보자의 경우 이 개념을 완전히 익히기 전에 흐름을 이해하는 데 많은 시간이 걸립니다. 대부분의 NodeJ에서는 사용자의 경우 웹 애플리케이션을 개발하는 데만 사용됩니다. 스트림에 대한 이해가 부족해도 사용에 영향을 미치지 않습니다. 그러나 스트림을 이해하면 NodeJ의 다른 모듈을 더 잘 이해할 수 있으며 경우에 따라 스트림을 사용하여 데이터를 처리하면 더 나은 결과를 얻을 수 있습니다.
Stream은 Node.js에서 스트리밍 데이터를 처리하기 위한 추상 인터페이스입니다. 스트림은 실제 인터페이스가 아니라 모든 스트림을 가리키는 일반적인 용어입니다. 실제 인터페이스에는 ReadableStream, WritableStream 및 ReadWriteStream이 포함됩니다.
인터페이스 ReadableStream은 EventEmitter를 확장합니다. read(size?: number): string | setEncoding(encoding: BufferEncoding): this; T는 WritableStream>(destination: T, options?: { end?: boolean | undefine; }): T; unpipe(destination?: WritableStream): unshift(chunk: string | Uint8Array, 인코딩?: BufferEncoding): void ; Wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;} 인터페이스 WritableStream은 EventEmitter를 확장합니다. { writable: boolean; string, cb?: 오류 | null) => void): boolean; write(str: 문자열, 인코딩?: BufferEncoding, cb?: (err?: 오류 | null) => void): boolean; ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, 인코딩?: BufferEncoding, cb?: () => void): this;}인터페이스 ReadWriteStream ReadableStream, WritableStream { }을 확장합니다.ReadableStream과 WritableStream은 모두 EventEmitter 클래스를 상속하는 인터페이스임을 알 수 있습니다(ts의 인터페이스는 병합 유형이므로 클래스를 상속할 수 있습니다).
위의 인터페이스에 해당하는 구현 클래스는 각각 Readable, Writable 및 Duplex입니다.
NodeJ에는 4가지 유형의 스트림이 있습니다.
읽기 가능 읽기 가능 스트림(ReadableStream 구현)
쓰기 가능 쓰기 가능 스트림(WritableStream 구현)
Duplex는 읽기 및 쓰기 가능한 스트림입니다(Readable을 상속한 후 WritableStream 구현).
변환 변환 스트림(Duplex에서 상속됨)
모두 구현할 메서드가 있습니다.
Readable은 콘텐츠를 반환하기 위해 _read 메소드를 구현해야 합니다.
쓰기 가능 항목은 콘텐츠를 허용하기 위해 _write 메서드를 구현해야 합니다.
Duplex는 콘텐츠를 수락하고 반환하기 위해 _read 및 _write 메서드를 구현해야 합니다.
Transform은 수신된 콘텐츠를 변환하고 반환하기 위해 _transform 메서드를 구현해야 합니다.
Readable은 두 가지 모드와 세 가지 상태를 갖는 스트림 유형입니다.
두 가지 읽기 모드:
흐름 모드: 데이터는 기본 시스템에서 버퍼로 읽고 쓰여집니다. 버퍼가 가득 차면 데이터는 EventEmitter를 통해 가능한 한 빨리 등록된 이벤트 핸들러에 자동으로 전달됩니다.
일시 중지 모드: 이 모드에서는 EventEmitter가 데이터 전송을 위해 적극적으로 트리거되지 않습니다. 버퍼에서 데이터를 읽으려면 Readable.read() 메서드를 명시적으로 호출해야 합니다. 그러면 EventEmitter 이벤트에 대한 응답이 트리거됩니다.
세 가지 상태:
readableFlowing === null(초기 상태)
readableFlowing === false(일시 중지 모드)
readableFlowing === true(플로우 모드)
스트림의 readable.readableFlowing은 처음에는 null입니다.
데이터 이벤트를 추가하면 true가 됩니다. Pause(), unpipe()가 호출되거나 back Pressure가 수신되거나 readable 이벤트가 추가되면 readableFlowing이 false로 설정됩니다. 이 상태에서 리스너를 데이터 이벤트에 바인딩하면 readableFlowing이 true로 전환되지 않습니다.
이력서()를 호출하면 읽기 가능한 스트림의 readableFlowing을 true로 전환할 수 있습니다.
모든 읽기 가능한 이벤트를 제거하는 것이 readableFlowing을 null로 만드는 유일한 방법입니다.
이벤트 이름 설명 readable은 버퍼에 새로운 읽을 수 있는 데이터가 있을 때 발생합니다. (노드가 캐시 풀에 삽입될 때마다 발생합니다.) data는 데이터가 소비될 때마다 발생하며, 매개변수는 이번에 소비되는 데이터입니다. 닫기 스트림이 닫힐 때 오류 스트림이 트리거됩니다. 오류가 발생하면 트리거 메소드 이름은 read(size)가 크기 길이의 데이터를 소비함을 나타냅니다. null을 반환하면 현재 데이터가 크기보다 작음을 나타냅니다. 이번에 소비된 데이터가 반환됩니다. size가 전달되지 않으면 캐시 풀의 모든 데이터를 소비한다는 의미입니다. const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// 캐시 풀 float value})readStreams.on('reading', () => { console.log('buffer full') readStreams.read()// 버퍼 풀의 모든 데이터를 소비하고 결과를 반환하고 데이터 이벤트를 트리거합니다.}) readStreams.on('data ', (data) => { console.log('data')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/reader.js#L527
크기가 0이면 읽기 가능한 이벤트가 트리거됩니다.
캐시 풀의 데이터 길이가 float 값인 highWaterMark에 도달하면 생산 데이터를 적극적으로 요청하지 않고 데이터가 소비될 때까지 기다린 후 데이터를 생산합니다.
일시 중지된 상태의 스트림이 데이터를 소비하기 위해 read를 호출하지 않으면 data 및 reader는 나중에 트리거되지 않습니다. read가 소비하기 위해 호출되면 먼저 이 소비 후 남은 데이터의 길이가 float보다 작은지 확인합니다. float 값보다 낮을 경우 소비 전 생산 데이터가 요청됩니다. 이런 식으로 읽기가 완료된 후 논리 실행이 완료되면 새 데이터가 생성될 가능성이 높으며, 다음으로 소비되는 데이터를 미리 생성하여 캐시 풀에 저장하는 메커니즘도 있습니다. 캐시 스트림이 빠른 이유.
흐르는 상태에는 두 가지 흐름 상황이 있습니다.
생산 속도가 소비 속도보다 느린 경우: 이 경우 일반적으로 각 생산 데이터 이후 캐시 풀에 남은 데이터가 없으며, 이번에 생성된 데이터는 데이터 이벤트로 직접 전달될 수 있습니다(왜냐하면 캐시 풀에 들어가므로 읽기를 호출할 필요도 없습니다. 그런 다음 즉시 새 데이터 생성을 시작합니다. 마지막 데이터가 소비될 때까지 데이터가 다시 생성됩니다. . 생산 속도가 소비 속도보다 빠른 경우: 이때, 각 데이터 생산 이후 일반적으로 캐시 풀에 소비되지 않은 데이터가 있는 경우가 많습니다. 이 경우 일반적으로 다음 데이터 소비는 데이터가 소비된 시점부터 시작됩니다. 기존 데이터가 소비되고 새 데이터가 생성되어 캐시 풀에 배치됩니다.이들 사이의 유일한 차이점은 데이터가 생성된 후에도 캐시 풀에 데이터가 여전히 존재하는지 여부입니다. 데이터가 존재하는 경우 생성된 데이터는 소비를 기다리기 위해 캐시 풀에 푸시됩니다. 캐시 풀에 추가하지 않고 데이터에 직접 전달됩니다.
캐시 풀에 데이터가 있는 스트림이 일시 중지 모드에서 흐름 모드로 전환되면 null이 반환될 때까지 데이터를 소비하기 위해 루프에서 read가 호출된다는 점은 주목할 가치가 있습니다.
일시정지 모드에서 읽기 가능한 스트림이 생성되면 모드는 일시정지 모드가 됩니다. 생성 후 버퍼 풀의 데이터가 float 값에 도달할 때까지 데이터 소스에서 버퍼 풀로 데이터를 푸시하기 위해 _read 메서드가 자동으로 호출됩니다. 데이터가 float 값에 도달할 때마다 읽기 가능한 스트림은 "읽기 가능한" 이벤트를 트리거하여 데이터가 준비되었으며 계속 사용할 수 있음을 소비자에게 알립니다.
일반적으로 '읽기 가능' 이벤트는 스트림의 새로운 활동을 나타냅니다. 즉, 새 데이터가 있거나 스트림 끝에 도달했습니다. 따라서 데이터 소스의 데이터를 읽기 전에 '읽기 가능' 이벤트도 트리거됩니다.
소비자 "읽기 가능" 이벤트의 핸들러 함수에서는 stream.read(size)를 통해 버퍼 풀의 데이터를 적극적으로 소비합니다.
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // 매개변수의 read 메소드는 소스 데이터를 얻기 위해 스트림의 _read 메소드로 사용됩니다. read( size) { / / 소스 데이터에 1000개의 1이 있다고 가정합니다. let Chunk = null // 데이터를 읽는 프로세스는 일반적으로 IO 작업과 같이 비동기식입니다. setTimeout(() => { if (count > 0) { let ChunkLength = Math .min( count, size) Chunk = '1'.repeat(chunkLength) count -= ChunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on('은 데이터가 실행될 때마다 트리거됩니다. 성공적으로 캐시 풀에 푸시됨) readable', () => { const Chunk = myReadable.read()//현재 캐시 풀의 모든 데이터 사용 console.log(chunk.toString())})read(size)의 크기가 float 값보다 크면 새 float 값이 다시 계산되고 새 float 값은 크기의 다음 2제곱입니다(size <= 2^n, n은 최소값)
// hwm은 1GB보다 크지 않습니다.const MAX_HWM = 0x40000000; function ComputeNewHighWaterMark(n) { if (n >= MAX_HWM) { // 1GB 제한 n = MAX_HWM } else { // 다음으로 가장 높은 2의 거듭제곱을 제거합니다. 과도한 증가 방지 n--; n |= n >>> n |= n >>> n |= n >> > 16 ; n++; n을 반환;}읽을 수 있는 모든 스트림은 일시 중지 모드에서 시작하며 다음 방법을 통해 흐름 모드로 전환할 수 있습니다.
"데이터" 이벤트 핸들러를 추가합니다. "재개" 메소드를 호출하여 쓰기 가능한 스트림에 데이터를 보냅니다.흐름 모드에서는 버퍼 풀의 데이터가 소비자에게 자동으로 출력되어 소비됩니다. 동시에 각 데이터 출력 후에 _read 메서드가 자동으로 호출되어 데이터 소스의 데이터를 버퍼 풀에 넣습니다. 버퍼 풀이 데이터가 없으면 흐름 모드가 다른 일시 중지 모드로 전환되거나 데이터 소스의 데이터를 읽을 때까지 데이터가 캐시 풀을 거치지 않고 데이터 이벤트로 직접 전달됩니다. (널));
읽기 가능한 스트림은 다음을 통해 일시 중지 모드로 다시 전환할 수 있습니다.
파이프라인 대상이 없으면 stream.pause()가 호출됩니다. 파이프라인 대상이 있는 경우 모든 파이프라인 대상을 제거합니다. stream.unpipe()를 호출하여 여러 파이프 대상을 제거할 수 있습니다. const { 읽기 가능 } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, read(size) { let Chunk = null setTimeout(() => { if (count > 0) { let ChunkLength = Math.min(count, size) Chunk = '1'.repeat(chunkLength) count -= ChunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})읽기 가능한 스트림에 비해 쓰기 가능한 스트림은 더 간단합니다.
생산자가 write(chunk)를 호출하면 내부적으로 이를 버퍼 큐에 캐시할지 아니면 일부 상태(corked, writing 등)에 따라 _write를 호출할지 선택합니다. 데이터가 기록될 때마다 삭제를 시도합니다. 캐시 큐의 데이터. 버퍼 큐의 데이터 크기가 float 값(highWaterMark)을 초과하면 소비자는 write(chunk) 호출 후 false를 반환합니다. 이때 생산자는 쓰기를 중지해야 합니다.
그럼 언제쯤 계속 쓸 수 있나요? 버퍼의 모든 데이터가 성공적으로 기록되면 버퍼 큐가 지워진 후 배출 이벤트가 트리거됩니다. 이때 생산자는 계속해서 데이터를 쓸 수 있습니다.
생산자가 데이터 쓰기를 완료해야 하는 경우 stream.end 메서드를 호출하여 쓰기 가능한 스트림의 끝을 알려야 합니다.
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, 인코딩, 콜백) {//는 _write 메소드 setTimeout(()으로 사용됩니다. = >{ fileContent += 청크 콜백()// 쓰기가 완료된 후 호출됨}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable .write('123123')// truemyWritable.write('123123')// falsemyWritable.end()캐시 풀의 데이터가 부동 소수점 값에 도달한 후에는 캐시 풀을 지우는 과정(순환 호출 _read) 동안 캐시 풀에 여러 노드가 있을 수 있습니다. 읽기 가능한 스트림입니다. 버퍼 길이가 float 값과 일치하지 않더라도 float 값의 데이터는 한 번에 하나의 버퍼 노드를 사용합니다.
const { 쓰기 가능 } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, 인코딩, 콜백) { setTimeout(()=>{ fileContent += 청크 console.log ('Consumption', Chunk.toString()) callback()// 쓰기가 완료된 후 호출됨}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})let count = 0function ProductionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}productionData()myWritable.on('drain', ProductionData)위는 float 값이 10인 쓰기 가능한 스트림입니다. 이제 데이터 소스는 0부터 20까지의 연속 숫자 문자열이고, ProductionData는 데이터를 쓰는 데 사용됩니다.
먼저 myWritable.write("0")이 처음 호출되면 캐시 풀에 데이터가 없기 때문에 "0"이 캐시 풀에 들어가지 않고 _wirte에 직접 반환됩니다. .write("0")은 참입니다
myWritable.write("1")이 실행되면 _wirte의 콜백이 아직 호출되지 않았으므로 마지막 데이터가 아직 작성되지 않았음을 나타냅니다. 해당 위치는 데이터 쓰기의 순서를 보장합니다. "1"을 저장합니다. " 캐시 풀에 추가합니다. 이는 다음 2~9에도 해당됩니다.
myWritable.write("10")이 실행되면 버퍼 길이는 9(1-9)이고 아직 float 값에 도달하지 않았습니다. "10"은 계속해서 캐시 풀에 버퍼로 추가되고 캐시 풀은 길이가 11이 되므로 myWritable.write("1")는 false를 반환합니다. 이는 버퍼의 데이터가 충분하고 데이터를 다시 생성하기 위해 배수 이벤트 알림을 기다려야 함을 의미합니다.
100ms 후에 _write("0", 인코딩, 콜백)의 콜백이 호출되어 "0"이 기록되었음을 나타냅니다. 그런 다음 캐시 풀에 데이터가 있는지 확인합니다. 데이터가 있으면 먼저 _read를 호출하여 캐시 풀의 헤드 노드("1")를 소비한 다음 캐시 풀이 빌 때까지 이 프로세스를 계속 반복합니다. , 배수 이벤트를 트리거하고 ProductionData를 다시 실행하십시오.
myWritable.write("11")를 호출하여 1단계부터 시작하여 스트림이 끝날 때까지 프로세스를 트리거합니다.
readable 스트림과 writable 스트림을 이해하고 나면 duplex 스트림은 실제로 readable 스트림을 상속받아서 writable 스트림을 구현한다는 것을 이해하기 쉽습니다. 동시에 읽기 및 쓰기 가능한 스트림을 갖는 것이 더 좋습니다).
이중 흐름은 다음 두 가지 방법을 동시에 구현해야 합니다.
읽을 수 있는 스트림을 위한 데이터를 생성하는 _read() 메서드를 구현합니다.
쓰기 가능한 스트림에 대한 데이터를 소비하는 _write() 메서드 구현
위의 두 가지 방법을 구현하는 방법은 위의 쓰기 가능 스트림과 읽기 가능 스트림에 소개되었습니다. 여기서 주목해야 할 점은 이중 스트림에 대해 각각 두 개의 독립적인 버퍼 풀이 있으며 해당 데이터 소스도 동일하지 않다는 것입니다.
NodeJ의 표준 입력 및 출력 스트림을 예로 들어 보겠습니다.
콘솔에 데이터를 입력하면 데이터 이벤트가 트리거되어 읽기 가능한 스트림 기능이 있음을 증명합니다. 이는 사용자가 Enter를 입력할 때마다 읽기 가능한 푸시 메서드를 호출하여 생성된 데이터를 푸시하는 것과 같습니다. write 메소드를 호출하면 콘텐츠를 콘솔에 출력할 수도 있지만 데이터 이벤트는 트리거되지 않습니다. 이는 쓰기 가능한 스트림 기능이 있고 _write 메소드의 구현이 있음을 보여줍니다. 콘솔이 텍스트를 표시하도록 허용합니다. // 사용자가 콘솔(_read)에 데이터를 입력할 때마다 읽기 가능한 스트림의 특성인 데이터 이벤트가 트리거됩니다. process.stdin.on('data', data=>{ process.stdin.write(data ); })// 매초마다 표준 입력 스트림에 데이터를 생성하며(이것은 콘솔에 직접 출력되는 쓰기 가능한 스트림의 기능입니다) ('사용자 콘솔에서 입력한 데이터가 아닙니다')}, 1000)이중 스트림은 쓰기 가능한 스트림이 있는 읽기 가능한 스트림으로 생각할 수 있습니다. 둘 다 독립적이며 각각 독립적인 내부 버퍼를 가지고 있습니다. 읽기 및 쓰기 이벤트는 독립적으로 발생합니다.
이중 스트림 -----| 읽기 <------ 외부 소스 -----| 쓰기 -----> 외부 싱크 -----|변환 스트림은 양방향이며, 읽기 및 쓰기가 인과관계로 발생합니다. 이중 스트림의 끝점은 일부 변환을 통해 연결됩니다. 읽기를 수행하려면 쓰기가 필요합니다.
스트림 변환 --------------|--------------- 당신이 쓰세요 ----> ----> 당신을 읽으세요 ----- --------|---------------Transform 스트림을 생성할 때 가장 중요한 것은 _write 또는 _read 대신 _transform 메서드를 구현하는 것입니다. _transform에서는 쓰기 가능한 스트림에 의해 작성된 데이터가 처리(소비)된 다음 읽기 가능한 스트림에 대한 데이터가 생성됩니다.
변환 스트림은 종종 스트림이 끝나기 전에 호출되는 `_flush` 메서드를 구현합니다. 이는 일반적으로 스트림 끝에 무언가를 추가하는 데 사용됩니다. 예를 들어 파일을 압축할 때 일부 압축 정보가 여기에 추가됩니다. } = require('fs')const { 변환, 통과 } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const 변환 = new Transform({ highWaterMark: 10, 변환(청크,인코딩, 뒤로){ // 변환 data, push를 호출하여 변환 결과를 캐시 풀에 추가 this.push(chunk.toString().replace('1', '@')) callback() }, flash(callback){//this.push 실행 (' 종료 전 트리거 <<<') callback() }})// 쓰기는 계속해서 데이터를 씁니다 let count = 0transform.write('>>>')function ProductionData() { let flag = true while (count <= 20 && 플래그) { 플래그 = 변환.write(count.toString()) count++ } if (count > 20) { 변환.end() }}productionData()transform.on('drain', ProductionData)let result = '' 변환.on( '데이터', 데이터=>{ 결과 += data.toString()})transform.on('end', ()=>{ console.log(result) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})