1.1 التطور التاريخي للتدفقات التدفقات
ليست مفهومًا فريدًا لـ Nodejs. لقد تم تقديمها منذ عقود مضت في نظام التشغيل يونكس، ويمكن للبرامج أن تتفاعل مع بعضها البعض على التدفقات من خلال مشغل الأنابيب (|).
يمكن استخدام مشغل الأنابيب (|) في نظامي التشغيل MacOS وLinux استنادًا إلى أنظمة Unix، ويمكنه تحويل مخرجات العملية على الجانب الأيسر من المشغل إلى المدخلات على الجانب الأيمن.
في العقدة، إذا استخدمنا ملف القراءة التقليدي لقراءة ملف، فستتم قراءة الملف في الذاكرة من البداية إلى النهاية. وبعد قراءة جميع المحتويات، ستتم معالجة محتويات الملف المحمل في الذاكرة بشكل موحد.
هناك عيبان للقيام بذلك:
الذاكرة: فهي تستهلك قدرًا كبيرًا من
الوقت: تحتاج إلى الانتظار حتى يتم تحميل حمولة البيانات بالكامل قبل البدء في معالجة البيانات،
ومن أجل حل المشكلات المذكورة أعلاه، تستخدم Node اتبع .js مفهوم التدفقات ونفذه في Node. في تدفق .js، هناك أربعة أنواع من التدفقات، وكلها مثيلات لـ EventEmitter في Node.js:
التدفق القابل للقراءة،
والتدفق القابل
للكتابة، والتدفق المزدوج الكامل القابل للقراءة (Writable). (Duplex Stream)
Transform Stream (Transform Stream)
من أجل دراسة هذا الجزء بعمق وفهم مفهوم التدفقات في Node.js تدريجيًا، ولأن جزء الكود المصدري معقد نسبيًا، قررت أن أبدأ في تعلم هذا الجزء من التدفق القابل للقراءة .
1.2.ما هو الدفق؟
الدفق عبارة عن بنية بيانات مجردة، وهي عبارة عن مجموعة من البيانات يمكن أن تكون الأنواع التالية فقط (فقط في حالة objectMode === false):
We. يمكن استخدام التدفق الذي يتم رؤيته كمجموعة من هذه البيانات، تمامًا مثل السوائل، نقوم أولاً بحفظ هذه السوائل في حاوية (قائمة المخزن المؤقت الداخلي للتيار)، وعندما يتم تشغيل الحدث المقابل، نقوم بصب السائل بالداخل في الأنبوب وإخطار الآخرين بأن يحصلوا على أوعية خاصة بهم على الجانب الآخر من الأنبوب لاحتجاز السائل بداخله للتخلص منه.
1.3 ما هو الدفق القابل للقراءة؟
الدفق القابل للقراءة هو نوع من الدفق وله وضعان، وثلاث حالات،
ووضعان للقراءة:
وضع التدفق: ستتم قراءة البيانات من النظام الأساسي وتمريرها عبر EventEmitter في أقرب وقت ممكن. يتم تمرير البيانات إلى معالج الحدث المسجل في
وضع الإيقاف المؤقت: في هذا الوضع، لن تتم قراءة البيانات، ويجب استدعاء طريقة Stream.read() بشكل صريح لقراءة البيانات من الدفق.
ثلاث حالات:
readableFlowing = = = null: لن يتم إنشاء أية بيانات
. ولكن لن يتم تعليق إنشاء البيانات، لذلك سيحدث تراكم للبيانات
readableFlowing === true: إنشاء البيانات واستهلاكها بشكل طبيعي
2.1. تعريف الحالة الداخلية (ReadableState)
ReadableState
_readableState: ReadableState {. objectMode: false, // لتشغيل أنواع أخرى من البيانات باستثناء السلسلة والمخزن المؤقت والقيمة الخالية، يجب تشغيل هذا الوضع على HighWaterMark: 16384، // حد مستوى الماء، 1024 * 16، الافتراضي 16 كيلو بايت، إذا تم تجاوز هذا الحد ، سيتوقف الاستدعاء _read() يقرأ البيانات في المخزن المؤقت: BufferList { head: null, tail: null, length: 0 }, // القائمة المرتبطة بالمخزن المؤقت، تستخدم لحفظ البيانات length: 0, // حجم بيانات الدفق القابلة للقراءة بالكامل، إذا كان وضع الكائن يساوي المخزن المؤقت. أنابيب الطول: []، // احفظ جميع قوائم انتظار الأنابيب التي تراقب تدفق الدفق القابل للقراءة: فارغة، // حالة التدفق المستقل فارغة، خاطئة، صحيحة انتهى: خطأ، // تم استهلاك جميع البيانات endEmitted: خطأ، // ما إذا كان قد تم إرسال حدث النهاية أم لا للقراءة: خطأ، // ما إذا كانت البيانات قيد القراءة تم إنشاؤها: صحيح، // لا يمكن معالجة الدفق من قبل تم إنشاؤه أو فشل. تدمير المزامنة: صحيح، // ما إذا كان سيتم تشغيل الحدث 'readable'/'data' بشكل متزامن، أو الانتظار حتى علامة التجزئة التالية. needReadable: false, // ما إذا كان من الضروري إرسال الحدث القابل للقراءة المنبعث: false, // تم إرسال الحدث القابل للقراءة readableListening: false, // ما إذا كان هناك حدث استماع قابل للقراءة استئنافالمجدول: خطأ، // ما إذا كانت طريقة الاستئناف تم استدعاء errorEmitted: false، // خطأ تم إرسال الحدث emitClose: true، // عند تدمير الدفق، ما إذا كان سيتم إرسال حدث الإغلاق التدمير التلقائي: صحيح، // تم إتلافه تلقائيًا، يتم استدعاؤه بعد "النهاية" تم تدمير الحدث: خطأ، // ما إذا كان الدفق قد تم تدميره خطأ: فارغ، // يحدد ما إذا كان الدفق قد أبلغ عن خطأ مغلق: خطأ، // ما إذا كان الدفق قد تم إغلاقه CloseEmitted: false، // ما إذا كان الإغلاق تم إرسال الحدث defaultEncoding: 'utf8'، // تنسيق ترميز الأحرف الافتراضي WaitDrainWriters: null، // يشير إلى مرجع كاتب الحدث 'drain' الذي يتم مراقبته، النوع فارغ، قابل للكتابة، Set<Writable> multiAwaitDrain: false، // ما إذا كان هناك العديد من الكتاب ينتظرون قراءة حدث التصريف المزيد: خطأ، // ما إذا كان يمكن قراءة المزيد من البيانات dataEmitted: false، // تم إرسال البيانات وحدة فك الترميز: فارغة، // ترميز وحدة فك الترميز: فارغة، // التشفير[Symbol(kPaused)]: null },
2.2. تنفيذ تخزين البيانات الداخلية (BufferList)
BufferList عبارة عن حاوية تستخدم لتخزين البيانات الداخلية في الدفق، وهي مصممة على شكل قائمة مرتبطة ولها ثلاث سمات: الرأس والذيل والطول.
أمثل كل عقدة في BufferList باعتبارها BufferNode، ويعتمد نوع البيانات الموجودة بداخلها على objectMode.
تحصل بنية البيانات هذه على بيانات الرأس بشكل أسرع من Array.prototype.shift().
2.2.1 نوع تخزين البياناتإذا كان وضع الكائن === صحيحًا:
ثم يمكن أن تكون البيانات من أي نوع، وسيتم تخزين البيانات التي يتم دفعها.
objectMode=true
تيار ثابت = يتطلب('دفق'); const readableStream = new Stream.Readable({ وضع الكائن: صحيح، يقرأ() {}، }); readableStream.push({ الاسم: 'ليزا'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(true); console.log(readableStream._readableState.buffer.tail); readableStream.push('ليزا'); console.log(readableStream._readableState.buffer.tail); readableStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Symbol(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
نتائج التشغيل:
إذا كان وضع الكائن === خطأ:
ثم يمكن أن تكون البيانات عبارة عن سلسلة أو Buffer أو Uint8Array فقط
objectMode=false
تيار ثابت = يتطلب('دفق'); const readableStream = new Stream.Readable({ وضع الكائن: خطأ، يقرأ() {}، }); readableStream.push({ الاسم: 'ليزا'});
نتائج التشغيل:
2.2.2 هيكل تخزين البياناتنقوم بإنشاء دفق قابل للقراءة في وحدة التحكم من خلال سطر أوامر العقدة لمراقبة التغييرات في البيانات الموجودة في المخزن المؤقت:
بالطبع، قبل دفع البيانات، نحتاج إلى تنفيذ طريقة _read الخاصة بها، أو تنفيذ طريقة القراءة في معلمات المُنشئ:
تيار ثابت = يتطلب('دفق'); const readableStream = new Stream.Readable(); RS._read = الوظيفة (الحجم) {}
أو
تيار ثابت = يتطلب('دفق'); const readableStream = new Stream.Readable({ قراءة (الحجم) {} });
بعد عملية readableStream.push('abc')، يكون المخزن المؤقت الحالي هو:
يمكنك أن ترى أن البيانات الحالية مخزنة، والبيانات المخزنة في البداية والنهاية هي رموز ASCII للسلسلة "abc"، والنوع هو نوع المخزن المؤقت، ويمثل الطول عدد البيانات المحفوظة حاليًا وليس حجمها محتوى البيانات.
2.2.3 واجهات برمجة التطبيقات ذات الصلةطباعة جميع طرق BufferList التي يمكنك الحصول عليها:
باستثناء الانضمام، الذي يُجري تسلسلاً لقائمة BufferList في سلسلة، فإن العمليات الأخرى كلها عبارة عن عمليات وصول إلى البيانات.
لن أشرح جميع الطرق واحدة تلو الأخرى هنا، ولكن سأركز على Consumer و_getString و_getBuffer.
2.2.3.1.تستهلك
عنوان كود المصدر: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
com.comsume
// يستهلك كمية محددة من البايتات أو الأحرف من البيانات المخزنة مؤقتًا. تستهلك (ن، لديها سلاسل) { بيانات ثابتة = this.head.data; إذا (ن <طول البيانات) { // `الشريحة` هي نفسها بالنسبة للمخازن المؤقتة والسلاسل. شريحة ثابتة = data.slice(0, n); this.head.data = data.slice(n); شريحة العودة؛ } إذا (ن === طول البيانات) { // القطعة الأولى متطابقة تمامًا. إرجاع هذا.shift(); } // تمتد النتيجة إلى أكثر من مخزن مؤقت واحد. إرجاع hasStrings ؟ this._getString(n) : this._getBuffer(n); }
هناك ثلاثة شروط للحكم في الكود:
إذا كان طول البايت للبيانات المستهلكة أقل من طول البيانات المخزنة في العقدة الرئيسية للقائمة المرتبطة، فسيتم أخذ أول n بايت من بيانات العقدة الرئيسية، وتعيين بيانات العقدة الرئيسية الحالية للبيانات بعد التقطيع.
إذا كانت البيانات المستهلكة تساوي تمامًا طول البيانات المخزنة في العقدة الرئيسية للقائمة المرتبطة، فسيتم إرجاع بيانات العقدة الرئيسية الحالية مباشرةً.
إذا كان طول البيانات المستهلكة أكبر من طول العقدة الرئيسية للقائمة المرتبطة، فسيتم إصدار الحكم الأخير بناءً على المعلمة الثانية التي تم تمريرها لتحديد ما إذا كانت الطبقة السفلية من BufferList الحالية تخزن سلسلة أو مخزنًا مؤقتًا .
2.2.3.2
عنوان كود المصدر: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
com.comsume
// يستهلك كمية محددة من البايتات من البيانات المخزنة مؤقتًا. _getBuffer(ن) { const ret = Buffer.allocUnsafe(n); const retLen = n; دع p = this.head; دع ج = 0؛ يفعل { const buf = p.data; إذا (ن > بوف.طول) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf. length; } آخر { إذا (ن === buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++ج; إذا (ص.التالي) this.head = p.next; آخر this.head = this.tail = null; } آخر { TypedArrayPrototypeSet(ret, جديد Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); this.head = p; p.data = buf.slice(n); } استراحة؛ } ++ج; } while ((p = p.next) !== null); this.length -= c; عودة متقاعد؛ }
بشكل عام، إنها حلقة لتشغيل العقد في القائمة المرتبطة وإنشاء مصفوفة مخزن مؤقت جديدة لتخزين البيانات التي تم إرجاعها.
أولاً، ابدأ في جلب البيانات من العقدة الرئيسية للقائمة المرتبطة، واستمر في نسخها إلى المخزن المؤقت المنشأ حديثًا حتى تصبح بيانات عقدة معينة أكبر من أو تساوي الطول المطلوب جلبه مطروحًا منه الطول الذي تم الحصول عليه.
بمعنى آخر، بعد قراءة العقدة الأخيرة من القائمة المرتبطة، لم تصل إلى الطول المطلوب، لذلك يتم إرجاع المخزن المؤقت الذي تم إنشاؤه حديثًا.
2.2.3.3
عنوان كود المصدر: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
com.comsume
// يستهلك كمية محددة من الأحرف من البيانات المخزنة مؤقتًا. _getString(ن) { دع ريت = ''; دع p = this.head; دع ج = 0؛ يفعل { const str = p.data; إذا (ن > طول السلسلة) { ret += str; n -= str. length; } آخر { إذا (ن === طول) { ret += str; ++ج; إذا (ص.التالي) this.head = p.next; آخر this.head = this.tail = null; } آخر { ret += StringPrototypeSlice(str, 0, n); this.head = p; p.data = StringPrototypeSlice(str, n); } استراحة؛ } ++ج; } while ((p = p.next) !== null); this.length -= c; عودة متقاعد؛ }
عملية السلاسل هي نفس عملية المخازن المؤقتة، كما أنها تقرأ البيانات من رأس القائمة المرتبطة في حلقة. هناك بعض الاختلافات فقط في نسخ البيانات وتخزينها عملية _getString هي نوع السلسلة.
2.3 لماذا توجد مثيلات للتدفقات القابلة للقراءة لـ EventEmitter؟
بالنسبة لهذا السؤال، يجب علينا أولاً أن نفهم ما هو نموذج النشر والاشتراك؟ يحتوي نموذج النشر والاشتراك على تطبيقات مهمة في معظم واجهات برمجة التطبيقات، سواء كانت Promise أو Redux، ويمكن رؤية واجهات برمجة التطبيقات المتقدمة المستندة إلى نموذج النشر والاشتراك في كل مكان.
وتتمثل ميزته في أنه يمكنه تخزين وظائف رد الاتصال المتعلقة بالحدث في قائمة الانتظار، ثم إخطار الطرف الآخر بمعالجة البيانات في وقت معين في المستقبل، وبالتالي تحقيق الفصل بين الاهتمامات، وينتج المنتج البيانات فقط ويخطر المستهلك ، بينما يقوم المستهلك بعد ذلك بمعالجة الأحداث المقابلة والبيانات المقابلة لها فقط، ويتناسب نموذج تدفق Node.js مع هذه الخاصية.
إذًا كيف يقوم تيار Node.js بتنفيذ إنشاء المثيلات بناءً على EventEmitter؟
الكود المصدري لهذا موجود هنا: تيار/تراث https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
إرث
دفق الدالة (الخيارات) { EE.call(this, opts); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE);
ثم هناك هذه الأسطر من التعليمات البرمجية في التعليمات البرمجية المصدر للدفق القابل للقراءة:
هذا الجزء من الكود المصدري موجود هنا: قابل للقراءة https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
إرث
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream);
أولاً، وراثة كائن النموذج الأولي للدفق من EventEmitter، بحيث يمكن لجميع مثيلات Stream الوصول إلى الأساليب الموجودة على EventEmitter.
في الوقت نفسه، يتم أيضًا توريث الأساليب الثابتة في EventEmitter من خلال ObjectSetPrototypeOf(Stream, EE)، وفي مُنشئ Stream، يتم استعارة مُنشئ EE لتحقيق وراثة جميع الخصائص في EventEmitter، ثم في الدفق القابل للقراءة، استخدم نفس الطريقة التي تطبق الميراث النموذجي ووراثة الخصائص الثابتة لفئة الدفق، وبالتالي الحصول على:
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype
لذلك:
Readable.prototype.__proto__.__proto__ === EE.prototype
لذلك، يمكنك العثور على النموذج الأولي لـ EventEmitter من خلال تتبع سلسلة النموذج الأولي للتدفق القابل للقراءة، وتحقيق وراثة EventEmitter.
2.4. تنفيذ واجهات برمجة التطبيقات ذات الصلة
سيتم عرض واجهات برمجة التطبيقات هنا بالترتيب الذي تظهر به في مستندات التعليمات البرمجية المصدر، وسيتم شرح تطبيقات واجهة برمجة التطبيقات الأساسية فقط.
ملاحظة: يتم هنا تفسير الوظائف المعلنة في كود مصدر التدفق القابل للقراءة لـ Node.js فقط، ولم يتم تضمين تعريفات الوظائف المقدمة خارجيًا لتقليل الطول، لن يتم نسخ جميع الرموز.
قابل للقراءة.النموذج الأولي
تدفق { تدمير: [الوظيفة: تدمير]، _undestroy: [الوظيفة: unstroy]، _destroy: [الوظيفة (مجهولة)]، الدفع: [وظيفة (مجهول)]، إلغاء التحول: [الوظيفة (مجهولة)]، متوقف مؤقتًا: [وظيفة (مجهولة)]، setEncoding: [الوظيفة (مجهولة)]، قراءة: [وظيفة (مجهول)]، _اقرأ: [الوظيفة (مجهولة)]، الأنبوب: [وظيفة (مجهول)]، unpipe: [وظيفة (مجهول)]، على: [الوظيفة (مجهولة)]، addListener: [الوظيفة (مجهولة)]، RemoveListener: [الوظيفة (مجهولة)]، إيقاف: [وظيفة (مجهول)]، إزالة AllListeners: [الوظيفة (مجهولة)]، السيرة الذاتية: [الوظيفة (مجهولة)]، وقفة: [وظيفة (مجهول)]، التفاف: [وظيفة (مجهول)]، مكرر: [وظيفة (مجهول)]، [الرمز (nodejs.rejection)]: [الوظيفة (مجهولة)]، [الرمز (Symbol.asyncIterator)]: [الوظيفة (مجهولة)] }2.4.1
readable.push
Readable.prototype.push = function(chunk, encoding) { return readableAddChunk(this, Chunk, encoding, false); };
تتمثل الوظيفة الرئيسية لطريقة الدفع في تمرير كتلة البيانات إلى خط أنابيب المصب عن طريق تشغيل حدث "البيانات"، أو تخزين البيانات في المخزن المؤقت الخاص بها.
الكود التالي هو رمز زائف ذو صلة ويظهر فقط العملية الرئيسية:
readable.push
وظيفة readableAddChunk(stream, Chunk, encoding, addToFront) { حالة const = تيار._readableState; if (chunk === null) { // دفع إشارة نهاية الدفق الفارغة، لا يمكن كتابة المزيد من البيانات بعد تلك الحالة. القراءة = خطأ؛ onEofChunk(stream,state); } else if (!state.objectMode) { // إذا لم يكن وضع الكائن if (typeof Chunk === 'string') { قطعة = Buffer.from(chunk); } else if (chunk exampleof Buffer) { //إذا كان Buffer // معالجة الترميز} else if (Stream._isUint8Array(chunk)) { Chunk = Stream._uint8ArrayToBuffer(chunk); } وإلا إذا (قطعة!= فارغة) { err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], Chunk); } } if (state.objectMode || (chunk && Chunk.length > 0)) { // إنه وضع كائن أو أن القطعة عبارة عن مخزن مؤقت // تم حذف الحكم على العديد من طرق إدراج البيانات هنا addChunk(stream, State, Chunk, true); } } وظيفة addChunk(stream,state,chunk,addToFront) { إذا (state.flowing && State. length === 0 && !state.sync && stream.listenerCount('data') > 0) { // إذا كان هناك مشتركون يستمعون إلى البيانات في وضع البث،stream.emit('data', Chunk); } else { // بخلاف ذلك، احفظ البيانات في المخزن المؤقتstate. length +=state.objectMode 1 : Chunk.length; إذا (أد توفرونت) { State.buffer.unshift(chunk); } آخر { State.buffer.push(chunk); } } mayReadMore(stream,state);// حاول قراءة المزيد من البيانات}
تنقسم عملية الدفع بشكل أساسي إلى الحكم على وضع الكائن، وستقوم الأنواع المختلفة بتنفيذ عمليات مختلفة على البيانات الواردة:
الحكم الأول لـ addChunk هو بشكل أساسي التعامل مع الموقف عندما يكون Readable في وضع التدفق، ولديه مستمع للبيانات، وبيانات المخزن المؤقت فارغة.
في هذا الوقت، يتم تمرير البيانات بشكل أساسي إلى البرامج الأخرى التي تشترك في حدث البيانات، وإلا فسيتم حفظ البيانات في المخزن المؤقت.
2.4.2 القراءةباستثناء الحكم على شروط الحدود وحالة التدفق، تحتوي هذه الطريقة بشكل أساسي على عمليتين.
قم باستدعاء طريقة _read التي ينفذها المستخدم لمعالجة نتائج التنفيذ
اقرأ البيانات من المخزن المؤقت وقم بتشغيل حدث "البيانات".
readable.read
// إذا كان طول القراءة أكبر من hwm، فسيتم إعادة حساب hwm إذا (ن> State.highWaterMark) { state.highWaterMark = computeNewHighWaterMark(n); } // اتصل بالطريقة _read التي ينفذها المستخدم حاول { نتيجة ثابتة = this._read(state.highWaterMark); إذا (النتيجة! = فارغة) { const ثم = result.then; إذا (نوع ثم === 'وظيفة') { ثم اتصل ( نتيجة، كلا، وظيفة (خطأ) { errorOrDestroy(this, err); }); } } } مسك (يخطئ) { errorOrDestroy(this, err); }
إذا أعادت الطريقة _read التي ينفذها المستخدم وعدًا، فاستدعي الطريقةthen لهذا الوعد وقم بتمرير عمليات رد الاتصال الخاصة بالنجاح والفشل لتسهيل معالجة الاستثناءات.
الكود الأساسي لطريقة القراءة لقراءة بيانات المنطقة من المخزن المؤقت هو كما يلي:
readable.read
وظيفة من القائمة (ن، الحالة) { // لا شيء مخزنة. إذا (الحالة. الطول === 0) عودة فارغة؛ دع ريت؛ إذا (state.objectMode) ret =state.buffer.shift(); else if (!n || n >=state.length) { // تعامل مع الحالة التي يكون فيها n فارغًا أو أكبر من طول المخزن المؤقت // اقرأها كلها، واقتطع القائمة. if (state.decoder) // إذا كان هناك وحدة فك ترميز، فقم بإجراء تسلسل للنتيجة في سلسلة ret =state.buffer.join(''); else if (state.buffer.length === 1) // هناك بيانات واحدة فقط، قم بإرجاع بيانات العقدة الرئيسية ret =state.buffer.first(); else // قم بتخزين جميع البيانات في المخزن المؤقت ret =state.buffer.concat(state.length); State.buffer.clear(); // مسح المخزن المؤقت} else { // تعامل مع الموقف الذي يكون فيه طول القراءة أقل من المخزن المؤقت ret =state.buffer.consume(n,state.decoder); } عودة متقاعد؛ }2.4.3_اقرأ
طريقة يجب تنفيذها عندما يقوم المستخدمون بتهيئة مجرى قابل للقراءة. يمكنك استدعاء طريقة الدفع في هذه الطريقة لتشغيل طريقة القراءة بشكل مستمر. عندما نضغط على القيمة null، يمكننا إيقاف عملية الكتابة للتدفق.
رمز العينة:
قابل للقراءة._read
تيار ثابت = يتطلب('دفق'); const readableStream = new Stream.Readable({ قراءة (هوم) { this.push(String.fromCharCode(this.currentCharCode++)); إذا (this.currentCharCode > 122) { this.push(null); } }, }); readableStream.currentCharCode = 97; readableStream.pipe(process.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4 الأنابيب (مهم)
قم بربط واحد أو أكثر من التدفقات القابلة للكتابة بالتدفق الحالي القابل للقراءة، وقم بتبديل التدفق القابل للقراءة إلى الوضع المتدفق.
هناك العديد من مقابض الاستماع للأحداث في هذه الطريقة، ولن أعرضها واحدًا تلو الآخر هنا:
readable.pipe
Readable.prototype.pipe = function(dest, PipeOpts) { const src = this; حالة ثابتة = this._readableState; State.pipes.push(dest); // تجميع الدفق القابل للكتابة src.on('data', ondata); وظيفة ondata(قطعة) { const ret = dest.write(chunk); إذا (إعادة === خطأ) { يوقف()؛ } } // أخبر الوجهة أنه يتم إرسالها إليه. dest.emit('pipe', src); // ابدأ الدفق إذا كان الدفق في وضع الإيقاف المؤقت if (dest.writableNeedDrain === true) { إذا (حالة التدفق) { يوقف()؛ } } وإلا إذا (!state.flowing) { src.resume(); } وجهة العودة؛ }
تشبه عملية توجيه الإخراج إلى حد كبير مشغل توجيه Linux '|'، حيث تقوم بتغيير الإخراج الأيسر إلى الإدخال الأيمن. تجمع هذه الطريقة الدفق القابل للكتابة للصيانة، وتطلق حدث "البيانات" عندما يكون الدفق القابل للقراءة متاحًا.
عندما تتدفق البيانات، سيتم تشغيل حدث الكتابة للتدفق القابل للكتابة، بحيث يمكن نقل البيانات وتنفيذ عمليات مثل خط الأنابيب. وسيتم تلقائيًا تغيير الدفق القابل للقراءة في وضع الإيقاف المؤقت إلى وضع التدفق.
2.4.5 السيرة الذاتيةقم بتحويل الدفق من وضع "الإيقاف المؤقت" إلى وضع "التدفق" إذا تم ضبط مستمع الأحداث "القابل للقراءة"، فلن يكون لهذه الطريقة أي تأثير.
readable.resume
Readable.prototype.resume = function() { حالة ثابتة = this._readableState; إذا (! الحالة. التدفق) { State.flowing = !state.readableListening; // ما إذا كان في الوضع المتدفق يعتمد على ما إذا كان مؤشر الاستماع "القابل للقراءة" قد تم تعيينه استئناف (هذا، الحالة)؛ } }; استئناف الوظيفة (الدفق، الحالة) { if (!state.resumeScheduled) { // قم بالتبديل بحيث يتم استدعاء أسلوب السيرة الذاتية مرة واحدة فقط في نفس العلامةstate.resumeScheduled = true; Process.nextTick(resume_,stream,state); } } وظيفة استئناف_(الدفق، الحالة) { إذا (! الحالة. القراءة) { تيار.قراءة(0); } state.resumeScheduled = false; Stream.emit('استئناف'); تدفق (تيار)؛ } functionflow(stream) { // عندما يكون الدفق في وضع الدفق، ستستمر هذه الطريقة في قراءة البيانات من المخزن المؤقت حتى يصبح المخزن المؤقت فارغًا conststate =stream._readableState; while (state.flowing &&stream.read() !== null); // نظرًا لأنه سيتم استدعاء طريقة القراءة هنا وتعيين دفق مستمع الأحداث "القابل للقراءة"، فقد يتم أيضًا استدعاء طريقة القراءة. // يؤدي هذا إلى بيانات غير متماسكة (لا يؤثر على البيانات، بل يؤثر فقط على استدعاء طريقة القراءة في رد اتصال الحدث "القابل للقراءة" لقراءة البيانات) }2.4.6 وقفة
قم بتغيير الدفق من وضع التدفق إلى وضع الإيقاف المؤقت، وتوقف عن إطلاق حدث "البيانات"، واحفظ جميع البيانات في المخزن المؤقت
readable.pause
Readable.prototype.pause = function() { إذا (this._readableState.flowing !== false) { تصحيح('وقفة'); this._readableState.flowing = false; this.emit('وقفة'); } رد هذا؛ };
2.5 آلية الاستخدام والعمل
تم ذكر طريقة الاستخدام في قسم BufferList. قم بإنشاء نسخة قابلة للقراءة وقم بتنفيذ طريقة _read() الخاصة بها، أو قم بتنفيذ طريقة القراءة في معلمة الكائن الأول للمنشئ.
2.5.1 آلية العملنحن هنا نرسم فقط العملية العامة وتحويل الوضع الذي يؤدي إلى شروط المجرى القابل للقراءة.
في: