โหนดสตรีมมี 4 ประเภท: 1. อ่านได้ (สตรีมที่อ่านได้) จำเป็นต้องใช้เมธอด "_read" เพื่อส่งคืนเนื้อหา 2. ต้องใช้เมธอด "_write" เพื่อยอมรับเนื้อหา 3. ดูเพล็กซ์ (สตรีมที่อ่านได้และเขียนได้) "_read" ต้องใช้วิธี _write" เพื่อยอมรับและส่งคืนเนื้อหา 4. การแปลง (สตรีมการแปลง) คุณต้องใช้วิธี "_transform" เพื่อแปลงเนื้อหาที่ได้รับและส่งคืนเนื้อหา
สภาพแวดล้อมการทำงานของบทช่วยสอนนี้: ระบบ Windows 7, nodejs เวอร์ชัน 16, คอมพิวเตอร์ DELL G3
สตรีมเป็นแนวคิดพื้นฐานใน Nodejs โมดูลพื้นฐานจำนวนมากถูกนำไปใช้ตามสตรีมและมีบทบาทสำคัญมาก ในขณะเดียวกัน Flow ก็เป็นแนวคิดที่เข้าใจยากมาก สาเหตุหลักมาจากการขาดเอกสารที่เกี่ยวข้อง สำหรับผู้เริ่มต้น NodeJs มักต้องใช้เวลามากในการทำความเข้าใจ Flow ก่อนจึงจะสามารถเชี่ยวชาญแนวคิดนี้ได้อย่างแท้จริง สำหรับ NodeJs ส่วนใหญ่ สำหรับผู้ใช้ จะใช้ในการพัฒนาเว็บแอปพลิเคชันเท่านั้น ความเข้าใจที่ไม่เพียงพอเกี่ยวกับสตรีมจะไม่ส่งผลกระทบต่อการใช้งาน อย่างไรก็ตาม การทำความเข้าใจสตรีมสามารถนำไปสู่ความเข้าใจที่ดีขึ้นเกี่ยวกับโมดูลอื่นๆ ใน NodeJ และในบางกรณี การใช้สตรีมเพื่อประมวลผลข้อมูลจะมีผลลัพธ์ที่ดีกว่า
Stream เป็นอินเทอร์เฟซแบบนามธรรมสำหรับการประมวลผลข้อมูลสตรีมมิ่งใน Node.js สตรีมไม่ใช่อินเทอร์เฟซที่แท้จริง แต่เป็นคำทั่วไปสำหรับสตรีมทั้งหมด อินเทอร์เฟซที่แท้จริง ได้แก่ ReadableStream, WritableStream และ ReadWriteStream
อินเทอร์เฟซ ReadableStream ขยาย EventEmitter { อ่านได้: boolean; read(size?: number): string | .buffer; setEncoding(encoding: BufferEncoding): this; T ขยาย WritableStream>(ปลายทาง: T, options?: { end?: boolean | undefinition; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void ; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string |. Buffer>;} อินเทอร์เฟซ WritableStream ขยาย EventEmitter { เขียนได้: boolean; ข้อผิดพลาด | .null) => เป็นโมฆะ): บูลีน; เขียน (str: สตริง, การเข้ารหัส?: BufferEncoding, cb?: (ผิดพลาด?: ข้อผิดพลาด | null) => โมฆะ): บูลีน; สิ้นสุด (cb?: () => void ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this;} อินเทอร์เฟซ ReadWriteStream ขยาย ReadableStream, WriteableStream { }จะเห็นได้ว่า ReadableStream และ WritableStream เป็นอินเทอร์เฟซที่สืบทอดคลาส EventEmitter (อินเทอร์เฟซใน ts สามารถสืบทอดคลาสได้ เนื่องจากเป็นเพียงประเภทที่ผสานกัน)
คลาสการใช้งานที่สอดคล้องกับอินเทอร์เฟซข้างต้นคือ Readable, Writable และ Duplex ตามลำดับ
สตรีมใน NodeJs มี 4 ประเภท:
สตรีมที่อ่านได้ (ใช้งาน ReadableStream)
สตรีมที่เขียนได้แบบเขียนได้ (ใช้งาน WritableStream)
Duplex เป็นสตรีมที่สามารถอ่านและเขียนได้ (การนำ WritableStream ไปใช้หลังจากสืบทอด Readable)
แปลงสตรีม Conversion (สืบทอดมาจาก Duplex)
ล้วนมีวิธีปฏิบัติ ดังนี้
Readable จำเป็นต้องใช้เมธอด _read เพื่อส่งคืนเนื้อหา
Writable จำเป็นต้องใช้เมธอด _write เพื่อยอมรับเนื้อหา
Duplex จำเป็นต้องใช้เมธอด _read และ _write เพื่อยอมรับและส่งคืนเนื้อหา
การแปลงจำเป็นต้องใช้วิธี _transform เพื่อแปลงเนื้อหาที่ได้รับและส่งคืน
Readable เป็นสตรีมประเภทหนึ่ง มีสองโหมดและสามสถานะ
สองโหมดการอ่าน:
โหมดโฟลว์: ข้อมูลจะถูกอ่านและเขียนจากระบบพื้นฐานไปยังบัฟเฟอร์ เมื่อบัฟเฟอร์เต็ม ข้อมูลจะถูกส่งผ่านไปยังตัวจัดการเหตุการณ์ที่ลงทะเบียนโดยอัตโนมัติโดยเร็วที่สุดผ่าน EventEmitter
โหมดหยุดชั่วคราว: ในโหมดนี้ EventEmitter จะไม่ถูกทริกเกอร์ให้ส่งข้อมูล จะต้องเรียกเมธอด Readable.read() อย่างชัดเจนเพื่ออ่านข้อมูลจากบัฟเฟอร์ การอ่าน จะทริกเกอร์การตอบสนองต่อเหตุการณ์ EventEmitter
สามรัฐ:
readableFlowing === null (สถานะเริ่มต้น)
readableFlowing === เท็จ (โหมดหยุดชั่วคราว)
readableFlowing === จริง (โหมดโฟลว์)
readable.readableFlowing ของกระแสข้อมูลเป็นโมฆะในขั้นต้น
มันจะกลายเป็นจริงหลังจากเพิ่มเหตุการณ์ข้อมูล เมื่อ Pause(), Unpipe() ถูกเรียก หรือได้รับ Back Pressure หรือมีการเพิ่มเหตุการณ์ที่อ่านได้ ReadableFlowing จะถูกตั้งค่าเป็น False ในสถานะนี้ การผูก Listener เข้ากับเหตุการณ์ข้อมูลจะไม่เปลี่ยน ReadableFlowing เป็นจริง
การเรียก resume() สามารถเปลี่ยน readableFlowing ของสตรีมที่อ่านได้ให้เป็นจริง
การลบเหตุการณ์ที่อ่านได้ทั้งหมดเป็นวิธีเดียวที่จะทำให้ readableFlowing null ได้
คำอธิบายชื่อเหตุการณ์ที่สามารถอ่านได้จะถูกทริกเกอร์เมื่อมีข้อมูลใหม่ที่อ่านได้ในบัฟเฟอร์ (จะถูกทริกเกอร์ทุกครั้งที่มีการแทรกโหนดลงในพูลแคช) ข้อมูลจะถูกทริกเกอร์ทุกครั้งที่มีการใช้ข้อมูล พารามิเตอร์คือข้อมูลที่ใช้ในครั้งนี้ สตรีมข้อผิดพลาดจะถูกทริกเกอร์เมื่อปิดสตรีม เมื่อมีข้อผิดพลาดเกิดขึ้น ชื่อวิธีการทริกเกอร์บ่งชี้ว่าการอ่าน (ขนาด) ใช้ข้อมูลที่มีความยาวขนาดที่ส่งคืน แสดงว่าข้อมูลปัจจุบันมีขนาดเล็กกว่าขนาด ข้อมูลที่ใช้ไปในครั้งนี้จะถูกส่งคืน เมื่อขนาดไม่ผ่าน หมายถึงการใช้ข้อมูลทั้งหมดในพูลแคช const fs = need('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// cache pool float value})readStreams. on('readable', () => { console.log('buffer full') readStreams.read()// ใช้ข้อมูลทั้งหมดในพูลบัฟเฟอร์ ส่งคืนผลลัพธ์และทริกเกอร์เหตุการณ์ข้อมูล}) readStreams.on('data ', (data) => { console.log('data')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
เมื่อขนาดเป็น 0 เหตุการณ์ที่อ่านได้จะถูกทริกเกอร์
เมื่อความยาวของข้อมูลในพูลแคชถึงค่าทศนิยม highWaterMark จะไม่ขอข้อมูลการผลิต แต่จะรอให้ข้อมูลถูกใช้ก่อนที่จะสร้างข้อมูล
หากสตรีมในสถานะหยุดชั่วคราวไม่เรียกให้อ่านเพื่อใช้ข้อมูล ข้อมูลและสิ่งที่อ่านได้จะไม่ถูกทริกเกอร์ในภายหลัง เมื่อมีการเรียกให้อ่าน อันดับแรกจะกำหนดว่าความยาวของข้อมูลที่เหลือหลังจากการบริโภคนี้ต่ำกว่าโฟลตหรือไม่ ค่า หากต่ำกว่าค่าลอยตัว ระบบจะขอข้อมูลการผลิตก่อนการบริโภค ด้วยวิธีนี้ หลังจากการดำเนินการลอจิกหลังจากการอ่านเสร็จสิ้น ข้อมูลใหม่มักจะถูกสร้างขึ้น และจากนั้นสามารถอ่านได้จะถูกทริกเกอร์อีกครั้ง กลไกในการสร้างข้อมูลที่ใช้ถัดไปล่วงหน้าและจัดเก็บไว้ในพูลแคชก็เช่นกัน สาเหตุที่แคชสตรีมเร็ว
มีสองสถานการณ์ของการไหลในสถานะการไหล
เมื่อความเร็วการผลิตช้ากว่าความเร็วการบริโภค: ในกรณีนี้ โดยทั่วไปจะไม่มีข้อมูลเหลืออยู่ในพูลแคชหลังจากข้อมูลการผลิตแต่ละรายการ และข้อมูลที่ผลิตในเวลานี้สามารถส่งผ่านไปยังเหตุการณ์ข้อมูลได้โดยตรง (เนื่องจากไม่ เข้าสู่พูลแคชดังนั้นจึงไม่จำเป็นต้องเรียกการอ่านเพื่อบริโภค) จากนั้นเริ่มสร้างข้อมูลใหม่ทันทีจะไม่ถูกสร้างจนกว่าข้อมูลสุดท้ายจะถูกเรียกใช้อีกครั้งจนกว่าสตรีมจะสิ้นสุด . เมื่อความเร็วในการผลิตเร็วกว่าความเร็วการใช้: ในเวลานี้ หลังจากการผลิตข้อมูลแต่ละครั้ง โดยปกติแล้วจะมีข้อมูลที่ยังไม่ได้ใช้ในพูลแคช ในกรณีนี้ โดยทั่วไปการใช้ข้อมูลครั้งต่อไปจะเริ่มขึ้นเมื่อมีการใช้ข้อมูล และหลังจากนั้น ข้อมูลเก่าถูกใช้ไปแล้ว ข้อมูลใหม่ถูกสร้างขึ้นและวางไว้ในพูลแคชข้อแตกต่างเพียงอย่างเดียวคือข้อมูลยังคงมีอยู่ในพูลแคชหรือไม่ หากมีข้อมูลเกิดขึ้นแล้ว ข้อมูลที่สร้างขึ้นจะถูกส่งไปยังพูลแคชเพื่อรอการบริโภค หากไม่มีข้อมูลก็จะเป็นเช่นนั้น ถูกส่งมอบโดยตรงไปยังข้อมูลโดยไม่ต้องเพิ่มลงในพูลแคช
เป็นที่น่าสังเกตว่าเมื่อสตรีมที่มีข้อมูลในพูลแคชเข้าสู่โหมดโฟลว์จากโหมดหยุดชั่วคราว การอ่านจะถูกเรียกในลูปเพื่อใช้ข้อมูลจนกว่าจะส่งคืนค่าว่าง
ในโหมดหยุดชั่วคราว เมื่อสร้างสตรีมที่อ่านได้ โหมดจะเป็นโหมดหยุดชั่วคราว หลังจากสร้างแล้ว เมธอด _read จะถูกเรียกโดยอัตโนมัติเพื่อส่งข้อมูลจากแหล่งข้อมูลไปยังพูลบัฟเฟอร์จนกว่าข้อมูลในพูลบัฟเฟอร์จะถึงค่าทศนิยม เมื่อใดก็ตามที่ข้อมูลถึงค่าทศนิยม สตรีมที่อ่านได้จะทริกเกอร์เหตุการณ์ "ที่อ่านได้" เพื่อแจ้งให้ผู้บริโภคทราบว่าข้อมูลพร้อมแล้วและสามารถนำไปใช้ต่อไปได้
โดยทั่วไป เหตุการณ์ 'สามารถอ่านได้' จะบ่งบอกถึงกิจกรรมใหม่ในสตรีม: มีข้อมูลใหม่หรือถึงจุดสิ้นสุดของสตรีมแล้ว ดังนั้น ก่อนที่จะอ่านข้อมูลในแหล่งข้อมูล เหตุการณ์ 'ที่สามารถอ่านได้' จะถูกทริกเกอร์เช่นกัน
ในฟังก์ชันตัวจัดการของเหตุการณ์ "ที่อ่านได้" ของผู้บริโภค ข้อมูลในพูลบัฟเฟอร์จะถูกใช้งานผ่าน stream.read(size)
const { Readable } = need('stream')let count = 1,000const myReadable = new Readable({ highWaterMark: 300, // วิธีการอ่านของพารามิเตอร์จะถูกใช้เป็นวิธีการ _read ของสตรีมเพื่อรับข้อมูลต้นฉบับที่อ่าน ( size) { / / สมมติว่าแหล่งข้อมูลของเรามี 1,000 1 วินาที ให้ chunk = null // โดยทั่วไปกระบวนการอ่านข้อมูลเป็นแบบอะซิงโครนัส เช่น การดำเนินการ IO setTimeout(() => { if (count > 0) { la chunkLength = Math .min( count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on(' จะถูกทริกเกอร์ทุกครั้งที่ข้อมูลถูกทริกเกอร์ พุชไปยังพูลแคชสำเร็จแล้ว) อ่านได้', () => { const chunk = myReadable.read()//ใช้ข้อมูลทั้งหมดในพูลแคชปัจจุบัน console.log(chunk.toString())})เป็นที่น่าสังเกตว่าหากขนาดของการอ่าน (ขนาด) มากกว่าค่าทศนิยม ค่าทศนิยมใหม่จะถูกคำนวณใหม่และค่าทศนิยมใหม่คือกำลังขนาดวินาทีถัดไปของขนาด (ขนาด <= 2^n, n รับ ค่าต่ำสุด)
// hwm จะไม่ใหญ่กว่า 1GB.const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // ขีดจำกัด 1GB n = MAX_HWM } else { // ลบกำลังสูงสุดถัดไปที่ 2 ถึง ป้องกันมากเกินไป เพิ่ม hwm n--; n |= n |= n >>> 2; n |= n >>> 8; > 16 ; } ส่งคืน n;}สตรีมที่อ่านได้ทั้งหมดเริ่มต้นในโหมดหยุดชั่วคราว และสามารถเปลี่ยนเป็นโหมดต่อเนื่องได้ด้วยวิธีการต่อไปนี้:
เพิ่มตัวจัดการเหตุการณ์ "data" เรียกวิธี "ดำเนินการต่อ" ใช้วิธี "ไปป์" เพื่อส่งข้อมูลไปยังสตรีมแบบเขียนได้ในโหมดโฟลว์ ข้อมูลในบัฟเฟอร์พูลจะถูกส่งออกไปยังผู้ใช้บริการโดยอัตโนมัติเพื่อการบริโภค ในเวลาเดียวกัน หลังจากเอาต์พุตข้อมูลแต่ละรายการ เมธอด _read จะถูกเรียกกลับโดยอัตโนมัติเพื่อนำข้อมูลจากแหล่งข้อมูลไปไว้ในพูลบัฟเฟอร์ หากพูลบัฟเฟอร์เป็นหากไม่มีข้อมูล ข้อมูลจะถูกส่งโดยตรงไปยังเหตุการณ์ข้อมูลโดยไม่ต้องผ่านพูลแคช จนกว่าโหมดโฟลว์จะเปลี่ยนเป็นโหมดหยุดชั่วคราวอื่น หรือข้อมูลจากแหล่งข้อมูลถูกอ่าน (พุช (โมฆะ));
สตรีมที่อ่านได้สามารถเปลี่ยนกลับไปเป็นโหมดหยุดชั่วคราวได้ทาง:
หากไม่มีเป้าหมายไปป์ไลน์ จะมีการเรียก stream.pause() หากมีเป้าหมายไปป์ไลน์ ให้ลบเป้าหมายไปป์ไลน์ทั้งหมด ไปป์เป้าหมายหลายรายการสามารถลบออกได้โดยการเรียก stream.unpipe() const { อ่านได้ } = ต้องการ ('สตรีม') ให้นับ = 1,000 const myReadable = ใหม่ อ่านได้ ({ highWaterMark: 300, อ่าน (ขนาด) { ให้ก้อน = null setTimeout (() => { ถ้า (นับ > 0) { ให้ chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})เมื่อเทียบกับสตรีมที่อ่านได้ สตรีมที่เขียนได้ง่ายกว่า
เมื่อโปรดิวเซอร์เรียก write(chunk) มันจะเลือกภายในว่าจะแคชมันไว้ในคิวบัฟเฟอร์หรือเรียก _write ตามสถานะบางอย่าง (corked, กำลังเขียน ฯลฯ ) หลังจากแต่ละครั้งที่ข้อมูลถูกเขียน มันจะพยายามล้างข้อมูล ข้อมูลในคิวแคช หากขนาดข้อมูลในคิวบัฟเฟอร์เกินค่าทศนิยม (highWaterMark) คอนซูเมอร์จะส่งกลับค่าเท็จหลังจากเรียก write(chunk) ในขณะนี้ ผู้ผลิตควรหยุดการเขียน
แล้วเมื่อไหร่ฉันจะเขียนต่อได้? เมื่อเขียนข้อมูลทั้งหมดในบัฟเฟอร์สำเร็จแล้ว เหตุการณ์การระบายจะถูกทริกเกอร์หลังจากล้างคิวบัฟเฟอร์แล้ว ในขณะนี้ ผู้ผลิตสามารถเขียนข้อมูลต่อไปได้
เมื่อผู้ผลิตจำเป็นต้องเขียนข้อมูลให้เสร็จสิ้น ก็จะต้องเรียกใช้เมธอด stream.end เพื่อแจ้งการสิ้นสุดของสตรีมที่เขียนได้
const { Writable, Duplex } = need('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// จะถูกใช้เป็น _write method setTimeout(() = >{ fileContent += chunk callback()// ถูกเรียกหลังจากการเขียนเสร็จสิ้น}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable . write('123123')// truemyWritable.write('123123')// falsemyWritable.end()โปรดทราบว่าหลังจากที่ข้อมูลในพูลแคชถึงค่าทศนิยม อาจมีหลายโหนดในพูลแคชในขณะนี้ ในระหว่างกระบวนการล้างพูลแคช (การเรียกแบบวน _read) จะไม่ใช้ความยาวเท่ากันกับ สตรีมที่อ่านได้ ข้อมูลของค่า float จะถูกใช้ทีละโหนดบัฟเฟอร์ แม้ว่าความยาวของบัฟเฟอร์จะไม่สอดคล้องกับค่า float ก็ตาม
const { เขียนได้ } = need('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log ('Consumption', chunk.toString()) callback()// ถูกเรียกหลังจากการเขียนเสร็จสิ้น}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})ให้นับ = 0function data(){ ให้ flag = true ในขณะที่ (นับ <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}productionData()myWritable.on('drain', ข้อมูลการผลิต)ด้านบนนี้เป็นสตรีมแบบเขียนได้ที่มีค่าทศนิยม 10 ขณะนี้แหล่งข้อมูลเป็นสตริงตัวเลขต่อเนื่องตั้งแต่ 0 ถึง 20 และ ProductionData ใช้ในการเขียนข้อมูล
ขั้นแรก เมื่อ myWritable.write("0") ถูกเรียกเป็นครั้งแรก เนื่องจากไม่มีข้อมูลในพูลแคช "0" จะไม่เข้าสู่พูลแคช แต่ถูกกำหนดให้กับ _wirte โดยตรง .write("0") เป็นจริง
เมื่อ myWritable.write("1") ถูกดำเนินการ เนื่องจากยังไม่ได้เรียกการเรียกกลับของ _wirte แสดงว่ายังไม่ได้เขียนข้อมูลล่าสุด ตำแหน่งนี้รับประกันความเป็นระเบียบของการเขียนข้อมูลเพียงบัฟเฟอร์เดียวเท่านั้นที่สามารถสร้างได้ เพื่อเก็บ "1" " เพิ่มลงในพูลแคช นี่เป็นเรื่องจริงสำหรับ 2-9 ถัดไป
เมื่อดำเนินการ myWritable.write("10") ความยาวบัฟเฟอร์คือ 9 (1-9) และยังไม่ถึงค่าทศนิยม "10" ยังคงถูกเพิ่มลงในพูลแคชเป็นบัฟเฟอร์ และพูลแคช ความยาวกลายเป็น 11 ดังนั้น myWritable.write("1") จึงส่งคืน false ซึ่งหมายความว่าข้อมูลในบัฟเฟอร์เพียงพอ และเราต้องรอการแจ้งเตือนเหตุการณ์การระบายเพื่อสร้างข้อมูลอีกครั้ง
หลังจาก 100 มิลลิวินาที ระบบจะเรียกการเรียกกลับของ _write("0", การเข้ารหัส, การเรียกกลับ) เพื่อระบุว่ามีการเขียน "0" ไว้แล้ว จากนั้นจะตรวจสอบว่ามีข้อมูลอยู่ในพูลแคชหรือไม่ หากมี อันดับแรกจะเรียก _read เพื่อใช้โหนดหลักของพูลแคช ("1") จากนั้นจึงทำซ้ำขั้นตอนนี้ต่อไปจนกว่าพูลแคชจะว่างเปล่า ทริกเกอร์เหตุการณ์การระบาย และดำเนินการ ProductionData อีกครั้ง
เรียก myWritable.write("11") เพื่อทริกเกอร์กระบวนการที่เริ่มต้นในขั้นตอนที่ 1 จนกระทั่งสิ้นสุดสตรีม
หลังจากทำความเข้าใจสตรีมที่อ่านได้และสตรีมที่เขียนได้ สตรีมดูเพล็กซ์จะเข้าใจได้ง่ายจริง ๆ แล้วสตรีมที่อ่านได้จะสืบทอดสตรีมที่เขียนได้ จากนั้นจึงนำสตรีมที่เขียนได้ไปใช้ (ซอร์สโค้ดเขียนเช่นนี้ แต่ควรกล่าวว่ามีการใช้งานแล้ว ในขณะเดียวกัน ควรมีสตรีมที่อ่านและเขียนได้จะดีกว่า)
โฟลว์แบบดูเพล็กซ์จำเป็นต้องใช้สองวิธีต่อไปนี้พร้อมกัน
ใช้เมธอด _read() เพื่อสร้างข้อมูลสำหรับสตรีมที่อ่านได้
ใช้เมธอด _write() เพื่อใช้ข้อมูลสำหรับสตรีมที่เขียนได้
วิธีการใช้สองวิธีข้างต้นได้รับการแนะนำในสตรีมแบบเขียนได้และแบบอ่านได้ด้านบน สิ่งที่ต้องสังเกตที่นี่คือมีพูลบัฟเฟอร์อิสระสองตัวสำหรับสตรีมดูเพล็กซ์ตามลำดับ และแหล่งข้อมูลก็ไม่เหมือนกัน
นำสตรีมอินพุตและเอาท์พุตมาตรฐานของ NodeJs เป็นตัวอย่าง:
เมื่อเราป้อนข้อมูลในคอนโซล เหตุการณ์ข้อมูลจะถูกทริกเกอร์ ซึ่งพิสูจน์ว่ามีฟังก์ชันของสตรีมที่อ่านได้ ทุกครั้งที่ผู้ใช้ป้อน จะเทียบเท่ากับการเรียกวิธีพุชที่อ่านได้เพื่อพุชข้อมูลที่สร้างขึ้น เมื่อเราเรียกวิธีการเขียน เราสามารถส่งออกเนื้อหาไปยังคอนโซลได้ แต่เหตุการณ์ข้อมูลจะไม่ถูกทริกเกอร์ นี่แสดงให้เห็นว่ามีฟังก์ชันของสตรีมแบบเขียนได้และมีบัฟเฟอร์อิสระ การใช้วิธี _write คือ อนุญาตให้คอนโซลแสดงข้อความ // เมื่อใดก็ตามที่ผู้ใช้ป้อนข้อมูลบนคอนโซล (_read) เหตุการณ์ข้อมูลจะถูกทริกเกอร์ ซึ่งเป็นลักษณะของสตรีมที่อ่านได้ process.stdin.on('data', data=>{ process.stdin.write(data })// สร้างข้อมูลไปยังสตรีมอินพุตมาตรฐานทุกวินาที (นี่คือคุณลักษณะของสตรีมแบบเขียนได้ ซึ่งจะส่งออกไปยังคอนโซลโดยตรง) และจะไม่ทริกเกอร์ datasetInterval(()=>{ process.stdin.write) ('ไม่ใช่ข้อมูลที่ป้อนโดยคอนโซลผู้ใช้')}, 1,000)สตรีมดูเพล็กซ์ถือได้ว่าเป็นสตรีมที่อ่านได้ด้วยสตรีมที่เขียนได้ ทั้งสองมีความเป็นอิสระ โดยแต่ละอันมีบัฟเฟอร์ภายในที่เป็นอิสระ เหตุการณ์การอ่านและเขียนเกิดขึ้นอย่างเป็นอิสระ
สตรีมแบบดูเพล็กซ์ -------------------| อ่าน <----- แหล่งที่มาภายนอกคุณ ------------------| เขียน -----> อ่างล้างจานภายนอก -------------------|สตรีมการแปลงเป็นแบบดูเพล็กซ์ โดยที่การอ่านและการเขียนเกิดขึ้นในความสัมพันธ์ระหว่างเหตุและผล จุดสิ้นสุดของสตรีมดูเพล็กซ์เชื่อมโยงกันผ่านการเปลี่ยนแปลงบางอย่าง การอ่านต้องมีการเขียนจึงจะเกิดขึ้น
แปลงกระแส ---------------|-------------- คุณเขียน ----> ----> อ่านคุณ ----- ----------|--------------สำหรับการสร้างสตรีม Transform สิ่งที่สำคัญที่สุดคือการใช้เมธอด _transform แทน _write หรือ _read ใน _transform ข้อมูลที่เขียนโดยสตรีมที่เขียนได้จะถูกประมวลผล (ใช้ไป) จากนั้นข้อมูลจะถูกสร้างขึ้นสำหรับสตรีมที่อ่านได้
สตรีม Conversion มักจะใช้เมธอด `_flush` ซึ่งจะถูกเรียกก่อนสิ้นสุดสตรีม โดยทั่วไปจะใช้เพื่อต่อท้ายบางสิ่งที่ส่วนท้ายของสตรีม ตัวอย่างเช่น ข้อมูลการบีบอัดบางอย่างจะถูกเพิ่มที่นี่ const { write } = need('fs')const { Transform, PassThrough } = need('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const การแปลง = การแปลงใหม่ ({ highWaterMark: 10, การแปลง (ก้อน, การเข้ารหัส, โทรกลับ) // แปลง data, Call push เพื่อเพิ่มผลลัพธ์การแปลงลงในพูลแคช this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){//Execute this.push (' ก่อนทริกเกอร์สิ้นสุด <<<') callback() }})// เขียนอย่างต่อเนื่อง เขียนข้อมูล ให้นับ = 0transform.write('>>>')ฟังก์ชัน ProductionData() { ให้แฟล็ก = จริง ในขณะที่ (นับ <= 20 && flag) { flag = เปลี่ยนรูปเขียน(count.toString()) นับ++ } ถ้า (นับ > 20) { แปลง.end() }}productionData()transform.on('drain', ProductionData)let result = '' ()=>{ console.log(ผลลัพธ์) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})