1.
1.1
พวกเขาได้รับการแนะนำเมื่อหลายสิบปีก่อนในระบบปฏิบัติการ UNIX และโปรแกรมสามารถโต้ตอบกันบนสตรีมผ่านตัวดำเนินการท่อ (|)
ตัวดำเนินการไปป์ (|) สามารถใช้ใน MacOS และ Linux ที่ใช้ระบบ Unix โดยสามารถแปลงเอาต์พุตของกระบวนการทางด้านซ้ายของตัวดำเนินการให้เป็นอินพุตทางด้านขวา
ใน Node หากเราใช้ readFile แบบเดิมเพื่ออ่านไฟล์ ไฟล์จะถูกอ่านเข้าสู่หน่วยความจำตั้งแต่ต้นจนจบ เมื่อเนื้อหาทั้งหมดถูกอ่าน เนื้อหาของไฟล์ที่โหลดลงในหน่วยความจำจะถูกประมวลผลอย่างสม่ำเสมอ
มีข้อเสียสองประการในการทำสิ่งนี้:
หน่วย
ความจำ: ใช้เวลามาก
ขึ้น
.JS
ติดตาม
และ
นำไปใช้แนวคิดของสตรีม
สตรีมเพล็กซ์)
แปลงสตรีม (แปลงสตรีม)
เพื่อศึกษาส่วนนี้ในเชิงลึกและค่อยๆเข้าใจแนวคิดของสตรีมใน node.js และเนื่องจากส่วนซอร์สโค้ดค่อนข้างซับซ้อนฉันจึงตัดสินใจเริ่มเรียนรู้ส่วนนี้จากสตรีมที่อ่านได้ .
1.2. สตรี
อะไร
? สามารถใช้สตรีม มองว่าเป็นการรวบรวมข้อมูลเหล่านี้ เช่นเดียวกับของเหลว เราจะบันทึกของเหลวเหล่านี้ไว้ในคอนเทนเนอร์ก่อน (รายการบัฟเฟอร์บัฟเฟอร์ภายในของสตรีม) และเมื่อเหตุการณ์ที่เกี่ยวข้องเกิดขึ้น เราจะเทของเหลวภายในลงในท่อ และแจ้งให้ผู้อื่นนำภาชนะของตนเองไปอีกด้านหนึ่งของท่อเพื่อจับของเหลวภายในเพื่อนำไปกำจัด
1.3. สตรีมที่อ่านได้คืออะไร
สตรีมที่อ่านได้นั้นเป็นสตรีมประเภทหนึ่ง โดยมีสองโหมด สามสถานะ
และโหมดการอ่านสองโหมด:
โหมดโฟลว์: ข้อมูลจะถูกอ่านจากระบบพื้นฐานและส่งผ่าน EventEmitter โดยเร็วที่สุด ข้อมูลจะถูกส่งผ่านไปยังตัวจัดการเหตุการณ์ที่ลงทะเบียนไว้ใน
โหมดหยุดชั่วคราว: ในโหมดนี้ ข้อมูลจะไม่ถูกอ่าน และจะต้องเรียกเมธอด Stream.read() อย่างชัดเจนเพื่ออ่านข้อมูลจากสตรีม
สามสถานะ:
readableFlowing = = = null: จะไม่มีการสร้างข้อมูล การเรียก Stream.pipe() และ Stream.resume จะเปลี่ยนสถานะเป็น true เริ่มสร้างข้อมูลและทริกเกอร์เหตุการณ์
readableFlowing === false: การไหลของข้อมูลจะถูกระงับในเวลานี้ แต่จะไม่ การสร้างข้อมูลจะถูกระงับ ดังนั้น data backlog จะเกิดขึ้น
readableFlowing === true: โดยปกติแล้วจะสร้างและใช้ข้อมูล
2.1. คำจำกัดความสถานะภายใน (ReadableState)
ReadableState
_readableState: ReadableState { objectMode: false, // หากต้องการดำเนินการกับข้อมูลประเภทอื่นยกเว้นสตริง, บัฟเฟอร์ และ null โหมดนี้จะต้องเปิดไว้ highWaterMark: 16384, // ขีดจำกัดระดับน้ำ, 1024 * 16, ค่าเริ่มต้น 16kb หากเกินขีดจำกัดนี้ การโทรจะหยุด _read() อ่านข้อมูลลงในบัฟเฟอร์บัฟเฟอร์: BufferList { head: null, tail: null, length: 0 }, // Buffer linked list, ใช้เพื่อบันทึกความยาวข้อมูล: 0, // ขนาดของ ข้อมูลสตรีมที่อ่านได้ทั้งหมดถ้า ObjectMode เท่ากับ buffer.length pipes: [], // บันทึกคิวท่อทั้งหมดที่ตรวจสอบกระแสที่อ่านได้: null, // สถานะของการไหลอิสระเป็นค่าเท็จ สิ้นสุด: FALSE, // ข้อมูลทั้งหมดถูกใช้ endemitted: FALSE, // ไม่ว่าเหตุการณ์สุดท้ายจะถูกส่งหรือไม่อ่าน: false, // ไม่ว่าข้อมูลจะถูกสร้างขึ้นมา มันถูกสร้างขึ้นหรือล้มเหลว NeedReadable: FALSE, // ไม่ว่าจะเป็นสิ่งจำเป็นในการส่งเหตุการณ์ที่สามารถอ่านได้ emittedReadable: false, // เหตุการณ์ที่อ่านได้ถูกส่ง ReadableListening: FALSE, // ไม่ว่าจะมีเหตุการณ์การฟังที่อ่านได้ ถูกเรียกว่า errorEmission: false, // Error เหตุการณ์ถูกส่งแล้ว emitClose: true, // เมื่อกระแสข้อมูลถูกทำลายไม่ว่าจะส่งเหตุการณ์ปิด autoDestroy: true, // ถูกทำลายโดยอัตโนมัติ, จะถูกเรียกหลัง 'end' เหตุการณ์ถูกทริกเกอร์ ทำลาย: เท็จ // ไม่ว่ากระแสข้อมูลจะถูกทำลายหรือไม่ ข้อผิดพลาด: null, // ระบุว่ากระแสข้อมูลได้รายงานข้อผิดพลาดที่ปิด: เท็จ // ไม่ว่ากระแสข้อมูลจะถูกปิด ปิดที่ปล่อยออกมา: เท็จ // ไม่ว่าจะปิด เหตุการณ์ถูกส่งไป defaultEncoding: 'utf8', // รูปแบบการเข้ารหัสอักขระเริ่มต้น awaitDrainWriters: null, // ชี้ไปที่ 'drain ' การอ้างอิงผู้เขียนของเหตุการณ์ที่ได้รับการตรวจสอบ ประเภทเป็น null เขียนได้ ชุด <เขียนได้> multiAwaitDrain: false, // ไม่ว่าจะมีนักเขียนหลายคนกำลังรอการอ่านเหตุการณ์การระบายหรือไม่เพิ่มเติม: false, // สามารถอ่านข้อมูลเพิ่มเติมได้หรือไม่ dataEmission: false, // ข้อมูลถูกส่งไปแล้ว ตัวถอดรหัส: null, // การเข้ารหัสตัวถอดรหัส: null, // encoder [สัญลักษณ์ (kpaused)]: null },
2.2. การใช้งานที่เก็บข้อมูลภายใน (BufferList)
BufferList เป็นคอนเทนเนอร์ที่ใช้ในการจัดเก็บข้อมูลภายในในสตรีม ซึ่งได้รับการออกแบบในรูปแบบของรายการที่เชื่อมโยงและมีคุณลักษณะสามประการ: head, tail และ length
ฉันแสดงแต่ละโหนดในบัฟเฟอร์เป็นบัฟเฟอร์และประเภทของข้อมูลภายในขึ้นอยู่กับ ObjectMode
โครงสร้างข้อมูลนี้รับข้อมูลส่วนหัวได้เร็วกว่า Array.prototype.shift()
2.2.1. ประเภทการจัดเก็บข้อมูลถ้า objectMode === จริง:
จากนั้นข้อมูลจะเป็นประเภทใดก็ได้
ObjectMode = true
สตรีม const = ต้องการ ('สตรีม'); const readableStream = stream ใหม่อ่านได้ ({ objectMode: จริง อ่าน() {}, - readableStream.push ({ชื่อ: 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push (จริง); console.log(readableStream._readableState.buffer.tail); readableStream.push('ลิซ่า'); console.log(readableStream._readableState.buffer.tail); readableStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push (สัญลักษณ์ (1)); console.log (readableStream._readablestate.buffer.tail); ReadableStream.push (Bigint (123)); console.log(readableStream._readableState.buffer.tail);
ผลการวิ่ง:
ถ้า ObjectMode === False:
จากนั้นข้อมูลสามารถเป็นสตริงหรือบัฟเฟอร์หรือ uint8Array เท่านั้น
objectMode=เท็จ
สตรีม const = ต้องการ ('สตรีม'); const readableStream = stream ใหม่อ่านได้ ({ ObjectMode: เท็จ อ่าน() {}, - readableStream.push ({ชื่อ: 'lisa'});
ผลการวิ่ง:
2.2.2. โครงสร้างการจัดเก็บข้อมูลเราสร้างสตรีมที่อ่านได้ในคอนโซลผ่านบรรทัดคำสั่งโหนดเพื่อสังเกตการเปลี่ยนแปลงในข้อมูลในบัฟเฟอร์:
แน่นอนก่อนที่จะผลักดันข้อมูลเราจำเป็นต้องใช้วิธี _read หรือใช้วิธีการอ่านในพารามิเตอร์ของตัวสร้าง:
สตรีม const = ต้องการ ('สตรีม'); const readableStream = stream ใหม่ readable (); rs._read = function (ขนาด) {}
หรือ
สตรีม const = ต้องการ ('สตรีม'); const readableStream = stream ใหม่อ่านได้ ({ อ่าน (ขนาด) {} -
หลังจากการดำเนินการ readableStream.push ('ABC') บัฟเฟอร์ปัจจุบันคือ:
คุณจะเห็นว่าข้อมูลปัจจุบันถูกเก็บไว้ เนื้อหาข้อมูล
2.2.3การพิมพ์วิธีการบัฟเฟอร์ทั้งหมดที่คุณได้รับ:
ยกเว้นการเข้าร่วมซึ่งเป็นอนุกรมบัฟเฟอร์เป็นสตริงอื่น ๆ คือการดำเนินการเข้าถึงข้อมูลทั้งหมด
ฉันจะไม่อธิบายวิธีการทั้งหมดทีละวิธี แต่มุ่งเน้นไปที่การบริโภค _getString และ _getBuffer
2.2.3.1.Consume
ที่อยู่ซอร์สโค้ด: bufferlist.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#l80
ผู้เข้าร่วมประชุม
// ใช้จำนวนไบต์หรืออักขระที่ระบุจากข้อมูลบัฟเฟอร์ กิน (n, hasstrings) { const data = this.head.data; if (n <data.length) { // `Slice` เหมือนกันสำหรับบัฟเฟอร์และสตริง SLICE SLICE = DATA.SLICE (0, N); this.head.data = data.slice (n); คืนกลับ; - if (n === data.length) { // First Chunk เป็นการจับคู่ที่สมบูรณ์แบบ คืนสิ่งนี้ shift (); - // ผลลัพธ์ครอบคลุมมากกว่าหนึ่งบัฟเฟอร์ ส่งคืน hasstrings? -
มีสามเงื่อนไขการตัดสินในรหัส:
หากความยาวไบต์ของข้อมูลที่ใช้น้อยกว่าความยาวของข้อมูลที่เก็บไว้ในโหนดหัวของรายการที่เชื่อมโยงจะมีการใช้ N แรกของข้อมูลของโหนดหัวและข้อมูลของโหนดหัวปัจจุบันถูกตั้งค่า ไปยังข้อมูลหลังจากหั่น
หากข้อมูลที่ใช้จะเท่ากับความยาวของข้อมูลที่เก็บไว้ในโหนดหัวของรายการที่เชื่อมโยงข้อมูลของโหนดหัวปัจจุบันจะถูกส่งกลับโดยตรง
หากความยาวของข้อมูลที่ใช้มากกว่าความยาวของโหนดหัวของรายการที่เชื่อมโยงการตัดสินครั้งสุดท้ายจะทำตามพารามิเตอร์ที่สองที่ส่งผ่านเพื่อตรวจสอบว่าชั้นล่างของบัฟเฟอร์ปัจจุบันเก็บสตริงหรือบัฟเฟอร์ .
2.2.3.2
ที่อยู่ซอร์สโค้ด: bufferlist._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#l137
ผู้เข้าร่วมประชุม
// ใช้จำนวนไบต์ที่ระบุจากข้อมูลบัฟเฟอร์ _getBuffer (n) { const ret = buffer.allocunsafe (n); const retlen = n; ให้ p = this.head; ให้ c = 0; ทำ { const buf = p.data; if (n> buf.length) { TypedArrayPrototypeset (ret, buf, retlen - n); n -= buf.length; } อื่น { if (n === buf.length) { TypedArrayPrototypeset (ret, buf, retlen - n); ++ C; ถ้า (p.next) this.head = p.next; อื่น this.head = this.tail = null; } อื่น { TypedArrayPrototypeset (ret, ใหม่ uint8array (buf.buffer, buf.byteoffset, n), retlen - n); this.head = p; p.data = buf.slice (n); - หยุดพัก; - ++ C; } ในขณะที่ ((p = p.next)! == null); this.length -= c; กลับ; -
โดยทั่วไปมันเป็นลูปที่จะใช้งานโหนดในรายการที่เชื่อมโยงและสร้างอาร์เรย์บัฟเฟอร์ใหม่เพื่อจัดเก็บข้อมูลที่ส่งคืน
ขั้นแรกให้เริ่มดึงข้อมูลจากโหนดหัวของรายการที่เชื่อมโยงและดำเนินการคัดลอกต่อไปยังบัฟเฟอร์ที่สร้างขึ้นใหม่จนกว่าข้อมูลของโหนดหนึ่งจะมากกว่าหรือเท่ากับความยาวที่จะดึงความยาวที่ได้รับ
กล่าวอีกนัยหนึ่งหลังจากอ่านโหนดสุดท้ายของรายการที่เชื่อมโยงมันยังไม่ถึงความยาวที่ต้องการดังนั้นบัฟเฟอร์ที่สร้างขึ้นใหม่จะถูกส่งคืน
2.2.3.3
ที่อยู่ซอร์สโค้ด: bufferlist._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#l106
ผู้เข้าร่วมประชุม
// ใช้จำนวนอักขระที่ระบุจากข้อมูลบัฟเฟอร์ _getString (n) { ให้ ret = ''; ให้ p = this.head; ให้ c = 0; ทำ { const str = p.data; if (n> str.length) { ret += str; n -= str.length; } อื่น { if (n === str.length) { ret += str; ++ C; ถ้า (p.next) this.head = p.next; อื่น this.head = this.tail = null; } อื่น { ret += stringprototypesLice (str, 0, n); this.head = p; p.data = stringprototypeslice (str, n); - หยุดพัก; - ++ C; } ในขณะที่ ((p = p.next)! == null); this.length -= c; กลับ; -
การทำงานของสตริงนั้นเหมือนกับการทำงานของบัฟเฟอร์ การดำเนินการ _getString เป็นประเภทสตริง
2.3. ทำไมอินสแตนซ์ของ Eventemitter
สำหรับคำถามนี้เราต้องเข้าใจก่อนว่าโมเดล Publish-Subscribe คืออะไร
ข้อได้เปรียบของมันคือมันสามารถจัดเก็บฟังก์ชั่นการโทรกลับที่เกี่ยวข้องกับเหตุการณ์ในคิวและจากนั้นแจ้งให้อีกฝ่ายทราบเพื่อประมวลผลข้อมูลในช่วงเวลาหนึ่งในอนาคตจึงสามารถแยกข้อกังวลได้ ในขณะที่ผู้บริโภคจะประมวลผลเฉพาะเหตุการณ์ที่เกี่ยวข้องและข้อมูลที่สอดคล้องกันและโมเดลสตรีมมิ่ง Node.js นั้นเหมาะกับคุณลักษณะนี้
ดังนั้นสตรีม Node.js จะใช้การสร้างอินสแตนซ์ตาม Eventemitter อย่างไร
ซอร์สโค้ดสำหรับสิ่งนี้อยู่ที่นี่: สตรีม/มรดก https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#l10
มรดก
สตรีมฟังก์ชั่น (opts) { EE.CALL (นี่, opts); - ObjectSetPrototypeof (stream.prototype, ee.prototype); ObjectSetPrototypeof (สตรีม, EE);
จากนั้นมีบรรทัดของรหัสเหล่านี้ในซอร์สโค้ดของสตรีมที่อ่านได้:
ส่วนนี้ของซอร์สโค้ดอยู่ที่นี่: อ่านได้ https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#l777777777777777
มรดก
ObjectSetPrototypeof (Readable.prototype, Stream.prototype); ObjectSetPrototypeof (อ่านได้, สตรีม);
ขั้นแรกให้สืบทอดวัตถุต้นแบบของสตรีมจาก Eventemitter เพื่อให้ทุกอินสแตนซ์ของสตรีมสามารถเข้าถึงวิธีการบน Eventemitter
ในเวลาเดียวกันวิธีการคงที่ของ Eventemitter นั้นได้รับการสืบทอดผ่าน ObjectSetPrototypeof (Stream, EE) และในตัวสร้างของสตรีมตัวสร้าง EE จะถูกยืมมาเพื่อตระหนักถึงการสืบทอดของคุณสมบัติทั้งหมดใน Eventemitter และในลำธารที่อ่านได้ ใช้วิธีการเดียวกันนี้ใช้การสืบทอดการสืบทอดต้นแบบและการสืบทอดคุณสมบัติแบบคงที่ของคลาสสตรีมดังนั้นจึงได้รับ:
Readable.prototype .__ Proto__ === Stream.prototype;
stream.prototype .__ proto__ === ee.prototype
ดังนั้น:
อ่านได้.
ดังนั้นคุณสามารถค้นหาต้นแบบของ Eventemitter ได้โดยการติดตามห่วงโซ่ต้นแบบของกระแสที่อ่านได้และตระหนักถึงการสืบทอดของ Eventemitter
2.4.
APIs จะแสดงที่นี่ตามลำดับที่ปรากฏในเอกสารซอร์สโค้ดและจะมีการอธิบายเฉพาะการใช้งาน API หลักเท่านั้น
หมายเหตุ: เฉพาะฟังก์ชั่นที่ประกาศใน Node.js ซอร์สสตรีมที่อ่านได้ถูกตีความที่นี่และคำจำกัดความของฟังก์ชั่นที่แนะนำจากภายนอกจะไม่รวมอยู่
Readable.prototype
ลำธาร { ทำลาย: [ฟังก์ชั่น: ทำลาย],, _undestroy: [ฟังก์ชั่น: unestroy], _destroy: [ฟังก์ชั่น (ไม่ระบุชื่อ)], กด: [ฟังก์ชั่น (ไม่ระบุชื่อ)], Unshift: [ฟังก์ชั่น (ไม่ระบุชื่อ)], ispaused: [ฟังก์ชั่น (ไม่ระบุชื่อ)], SetEncoding: [ฟังก์ชั่น (ไม่ระบุชื่อ)], อ่าน: [ฟังก์ชั่น (ไม่ระบุชื่อ)], _read: [ฟังก์ชั่น (ไม่ระบุชื่อ)], ท่อ: [ฟังก์ชั่น (ไม่ระบุชื่อ)], unpipe: [ฟังก์ชั่น (ไม่ระบุชื่อ)], บน: [ฟังก์ชั่น (ไม่ระบุชื่อ)], AddListener: [ฟังก์ชั่น (ไม่ระบุชื่อ)], RemoveListener: [ฟังก์ชั่น (ไม่ระบุชื่อ)], ปิด: [ฟังก์ชั่น (ไม่ระบุชื่อ)], RemoveAlLlisteners: [ฟังก์ชั่น (ไม่ระบุชื่อ)], ประวัติย่อ: [ฟังก์ชั่น (ไม่ระบุชื่อ)], หยุดชั่วคราว: [ฟังก์ชั่น (ไม่ระบุชื่อ)], Wrap: [ฟังก์ชั่น (ไม่ระบุชื่อ)], ตัววนซ้ำ: [ฟังก์ชั่น (ไม่ระบุชื่อ)], [สัญลักษณ์ (nodejs.rejection)]: [ฟังก์ชั่น (ไม่ระบุชื่อ)], [สัญลักษณ์ (symbol.asynciterator)]: [ฟังก์ชั่น (ไม่ระบุชื่อ)] -2.4.1
อ่านได้พัช
Readable.prototype.push = function (chunk, encoding) { ส่งคืน ReadableAddChunk (นี่, ก้อน, การเข้ารหัส, เท็จ); -
ฟังก์ชั่นหลักของวิธีการพุชคือการส่งผ่านบล็อกข้อมูลไปยังไปป์ไลน์ดาวน์สตรีมโดยเรียกเหตุการณ์ 'ข้อมูล' หรือเก็บข้อมูลไว้ในบัฟเฟอร์ของตัวเอง
รหัสต่อไปนี้เป็น pseudocode ที่เกี่ยวข้องและแสดงเฉพาะกระบวนการหลัก:
อ่านได้พัช
ฟังก์ชั่น readableaddchunk (สตรีม, ก้อน, การเข้ารหัส, addTofront) { สถานะ const = สตรีม _ readablestate; if (chunk === null) {// push สัญญาณ null stream สิ้นสุดไม่สามารถเขียนข้อมูลเพิ่มเติมได้หลังจาก state.reading = false; Oneofchunk (Stream, State); } อื่นถ้า (! state.ObjectMode) {// ถ้าไม่ใช่โหมดวัตถุถ้า (typeof chunk === 'สตริง') { chunk = buffer. จาก (ก้อน); } อื่นถ้า (chunk instanceof buffer) {// ถ้าเป็นบัฟเฟอร์ // ประมวลผลการเข้ารหัส} อื่นถ้า (สตรีม. _ isuint8array (chunk)) { chunk = stream. _ uint8arraytobuffer (chunk); } อื่นถ้า (chunk! = null) { err = new err _invalid _arg _type ('chunk', ['String', 'buffer', 'uint8array'], chunk); - - if (state.ObjectMode || (chunk && chunk.length> 0)) {// เป็นโหมดวัตถุหรือก้อนเป็นบัฟเฟอร์ // คำพิพากษาของวิธีการแทรกข้อมูลหลายวิธีถูกละไว้ที่นี่ addchunk (สตรีม, รัฐ, ก้อน, จริง); - - ฟังก์ชั่น addchunk (สตรีม, สถานะ, ก้อน, addtofront) { if (state.flowing && state.length === 0 &&! state.sync && stream.listenercount ('data')> 0) {// ถ้าอยู่ในโหมดสตรีมมิ่งมีสมาชิกที่ฟัง Data Stream.emit ('data', chunk); } else {// มิฉะนั้นให้บันทึกข้อมูลไปยังสถานะบัฟเฟอร์ความยาว += state.objectMode? if (addTofront) { state.buffer.unshift (ก้อน); } อื่น { state.buffer.push (ก้อน); - - Maybereadmore (สตรีม, รัฐ);
การดำเนินการผลักดันส่วนใหญ่จะแบ่งออกเป็น ObjectMode
การตัดสินครั้งแรกของ AddChunk ส่วนใหญ่จะจัดการกับสถานการณ์เมื่ออ่านได้อยู่ในโหมดการไหลมีตัวฟังข้อมูลและข้อมูลบัฟเฟอร์ว่างเปล่า
ในเวลานี้ข้อมูลส่วนใหญ่จะถูกส่งผ่านไปยังโปรแกรมอื่น ๆ ที่สมัครสมาชิกเหตุการณ์ข้อมูลไม่เช่นนั้นข้อมูลจะถูกบันทึกไว้ในบัฟเฟอร์
2.4.2ยกเว้นการตัดสินเงื่อนไขขอบเขตและสถานะการไหลวิธีนี้ส่วนใหญ่มีการดำเนินการสองครั้ง
เรียกใช้เมธอด _read ที่ผู้ใช้ดำเนินการเพื่อประมวลผลผลลัพธ์การดำเนินการ
อ่านข้อมูลจากบัฟเฟอร์บัฟเฟอร์และทริกเกอร์เหตุการณ์ 'ข้อมูล'
อ่านได้อ่าน
// หากความยาวของการอ่านมากกว่า HWM HWM จะถูกคำนวณใหม่ if (n> state.highwatermark) { state.highwatermark = computenewhighwatermark (N); - // เรียกเมธอด _read ที่ใช้งานผู้ใช้ลอง { const result = this. _ อ่าน (state.highwatermark); ถ้า (ผลลัพธ์! = null) { const แล้ว = ผลลัพธ์จากนั้น; if (typeof แล้ว === 'function') { แล้วโทร ( ผลลัพธ์, ไม่ ฟังก์ชัน (err) { Errorordestroy (นี่, err); - - - } catch (err) { Errorordestroy (นี่, err); -
หากวิธีการ _read ดำเนินการโดยผู้ใช้ส่งคืนสัญญาให้โทรหาวิธีการของสัญญานี้และส่งผ่านในความสำเร็จและการโทรกลับที่ล้มเหลวเพื่ออำนวยความสะดวกในการจัดการข้อยกเว้น
รหัสหลักของวิธีการอ่านเพื่ออ่านข้อมูลโซนจากบัฟเฟอร์มีดังนี้:
อ่านได้อ่าน
ฟังก์ชั่นจากรายการ (n, state) { // ไม่มีอะไรบัฟเฟอร์ if (state.length === 0) กลับเป็นโมฆะ; ให้ ret; if (state.ObjectMode) ret = state.buffer.shift (); อื่นถ้า (! n || n> = state.length) {// จัดการกรณีที่ n ว่างเปล่าหรือมากกว่าความยาวของบัฟเฟอร์ // อ่านทั้งหมดตัดรายการ if (state.decoder) // หากมีตัวถอดรหัสให้จัดลำดับผลลัพธ์ลงในสตริง ret = state.buffer.join (''); อื่นถ้า (state.buffer.length === 1) // มีเพียงข้อมูลเดียวเท่านั้นให้ส่งคืนข้อมูลโหนดหัว ret = state.buffer.first (); อื่น // จัดเก็บข้อมูลทั้งหมดลงในบัฟเฟอร์ ret = state.buffer.concat (state.length); state.buffer.clear (); // จัดการกับสถานการณ์ที่ความยาวการอ่านน้อยกว่าบัฟเฟอร์ ret = state.buffer.consume (n, state.decoder); - กลับ; -2.4.3
วิธีการที่ต้องใช้เมื่อผู้ใช้เริ่มต้นสตรีมที่อ่านได้
รหัสตัวอย่าง:
อ่านได้ _Read
สตรีม const = ต้องการ ('สตรีม'); const readableStream = stream ใหม่อ่านได้ ({ อ่าน (hwm) { this.push (String.FromCharcode (this.currentCharcode ++)); if (this.currentcharcode> 122) { this.push (null); - - - readableStream.currentCharcode = 97; readableStream.pipe (process.stdout); // ABCDEFGHIJKLMNOPQRSTUVWXYZ%2.4.4
ผูกสตรีมที่เขียนได้อย่างน้อยหนึ่งรายการเข้ากับสตรีมที่อ่านได้ปัจจุบันและสลับสตรีมที่อ่านได้เป็นโหมดการไหล
มีการฟังเหตุการณ์มากมายในวิธีนี้และฉันจะไม่แนะนำพวกเขาทีละคนที่นี่:
อ่านได้.
Readable.prototype.pipe = function (dest, pipeopts) { const src = this; สถานะ const = this. _ readablestate; state.pipes.push (dest); ฟังก์ชั่น ondata (ก้อน) { const ret = dest.write (chunk); if (ret === false) { หยุดชั่วคราว(); - - // บอกโชคชะตาว่ากำลังถูกส่งไป dest.emit ('pipe', src); // เริ่มสตรีมถ้าสตรีมอยู่ในโหมดหยุดชั่วคราวถ้า (dest.writableneedDrain === true) { if (state.flowing) { หยุดชั่วคราว(); - } อื่นถ้า (! state.flowing) { src.resume (); - ปลายทางขากลับ; -
การทำงานของท่อนั้นคล้ายคลึงกับผู้ดำเนินการท่อ Linux '|' การเปลี่ยนเอาต์พุตซ้ายเป็นอินพุตขวา
เมื่อข้อมูลไหลออกมาเหตุการณ์การเขียนของสตรีมที่เขียนได้จะถูกเรียกใช้เพื่อให้สามารถถ่ายโอนข้อมูลและการดำเนินการเช่นท่อส่งน้ำได้ และจะเปลี่ยนสตรีมที่อ่านได้โดยอัตโนมัติในโหมดหยุดชั่วคราวเป็นโหมดการไหล
2.4.5สลับสตรีมจากโหมด 'หยุดชั่วคราว' เป็นโหมด 'flow'
Readable.resume
Readable.prototype.resume = function () { สถานะ const = this._readablestate; if (! state.flowing) { state.flowing =! state.readableListening; - - ฟังก์ชั่นเรซูเม่ (สตรีมสถานะ) { if (! state.resumescheduled) {// สวิตช์เพื่อให้วิธีการ resume_ ถูกเรียกเพียงครั้งเดียวใน tick state เดียวกัน Resumescheduled = true; process.nexttick (resume_, สตรีม, สถานะ); - - ฟังก์ชั่น resume_ (สตรีมสถานะ) { if (! state.reading) { stream.read (0); - state.resumescheduled = false; stream.emit ('resume'); ไหล (สตรีม); - ฟังก์ชั่นโฟลว์ (สตรีม) {// เมื่อสตรีมอยู่ในโหมดสตรีมวิธีการนี้จะอ่านข้อมูลจากบัฟเฟอร์ต่อไปจนกว่าบัฟเฟอร์จะเป็นสถานะ const ที่ว่างเปล่า = สตรีม _readablestate; ในขณะที่ (state.flowing && stream.read ()! == null); // เนื่องจากวิธีการอ่านจะถูกเรียกที่นี่และสตรีมของผู้ฟังเหตุการณ์ 'อ่านได้' ได้รับการตั้งค่าวิธีการอ่านอาจเรียกได้ // สิ่งนี้ส่งผลให้ข้อมูลที่ไม่ต่อเนื่องกัน (ไม่ส่งผลกระทบต่อข้อมูลมีผลต่อการเรียกใช้วิธีการอ่านในการโทรกลับเหตุการณ์ 'อ่านได้' เพื่ออ่านข้อมูล) -2.4.6
เปลี่ยนสตรีมจากโหมดการไหลเป็นโหมดหยุดชั่วคราวหยุดยิงเหตุการณ์ 'ข้อมูล' และบันทึกข้อมูลทั้งหมดลงในบัฟเฟอร์
อ่านได้
Readable.prototype.pause = function () { if (this._readablestate.flowing! == False) { การดีบัก ('หยุดชั่วคราว'); this._readablestate.flowing = false; this.emit ('หยุดชั่วคราว'); - คืนสิ่งนี้; -
2.5.
วิธีการใช้งานได้รับการกล่าวถึงในส่วน bufferlist
2.5.1ที่นี่เราวาดกระบวนการทั่วไปและเงื่อนไขการแปลงโหมดที่เรียกใช้ของสตรีมที่อ่านได้
ใน: