1.1 Sejarah evolusi
aliran Aliran bukanlah sebuah konsep yang unik untuk Nodejs. Mereka diperkenalkan beberapa dekade yang lalu di sistem operasi Unix, dan program dapat berinteraksi satu sama lain melalui aliran melalui operator pipa (|).
Operator pipa (|) dapat digunakan di MacOS dan Linux berbasis sistem Unix. Operator ini dapat mengubah output proses di sisi kiri operator menjadi input di sisi kanan.
Di Node, jika kita menggunakan readFile tradisional untuk membaca file, file tersebut akan dibaca ke dalam memori dari awal hingga akhir. Ketika semua konten telah dibaca, konten file yang dimuat ke dalam memori akan diproses secara seragam.
Ada dua kelemahan dalam melakukan hal ini:
memori
: memakan banyak waktu memori
: Anda harus menunggu seluruh muatan data dimuat sebelum mulai memproses data
.js mengikuti dan mengimplementasikan konsep aliran. Di Node Dalam aliran .js, ada empat jenis aliran. Semuanya merupakan instance EventEmitter di Node.js:
Aliran yang Dapat Dibaca,
Aliran yang Dapat Ditulis,
Aliran Dupleks Penuh yang Dapat Dibaca dan Dapat Ditulis ( Duplex Stream)
Transform Stream (Transform Stream)
Untuk mempelajari bagian ini secara mendalam dan secara bertahap memahami konsep aliran di Node.js, dan karena bagian kode sumbernya relatif rumit, saya memutuskan untuk mulai mempelajari bagian ini dari aliran yang dapat dibaca .
1.2. Apa yang dimaksud dengan aliran?
Aliran adalah struktur data abstrak, yang merupakan kumpulan data. Tipe data yang disimpan di dalamnya hanya dapat berupa tipe berikut (hanya untuk kasus objectMode === false):
We dapat menggunakan aliran Terlihat sebagai kumpulan data ini, sama seperti cairan, pertama-tama kita menyimpan cairan ini dalam wadah (buffer internal BufferList aliran), dan ketika peristiwa terkait dipicu, kita menuangkan cairan di dalamnya ke dalam pipa . Dan beri tahu orang lain untuk mengambil wadahnya sendiri di sisi lain pipa untuk menampung cairan di dalamnya untuk dibuang.
1.3. Apa yang dimaksud dengan aliran yang dapat dibaca?
Aliran yang dapat dibaca adalah jenis aliran yang memiliki dua mode, tiga status,
dan dua mode pembacaan:
mode aliran: data akan dibaca dari sistem yang mendasarinya dan diteruskan melalui EventEmitter sesegera mungkin.
data
tidak akandibaca
,
dan metode Stream.read() harus dipanggil secara eksplisit untuk membaca data dari aliran
= null: Tidak ada data yang akan dihasilkan. Memanggil Stream.pipe() dan Stream.resume akan mengubah statusnya menjadi true, mulai menghasilkan data dan secara aktif memicu acara
readableFlowing === false: Aliran data akan ditangguhkan saat ini , tetapi tidak akan Pembuatan data akan ditangguhkan, sehingga simpanan data akan terjadi.
readableFlowing === true: Biasanya menghasilkan dan menggunakan data
2.1. Definisi status internal (ReadableState)
ReadableState
_readableState: ReadableState { objectMode: false, // Untuk mengoperasikan tipe data lain kecuali string, Buffer, dan null, mode ini perlu diaktifkan highWaterMark: 16384, // Batas ketinggian air, 1024 * 16, default 16kb, jika batas ini terlampaui , panggilan akan berhenti _read() membaca data ke dalam buffer buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer linked list, digunakan untuk menyimpan data panjang: 0, // Ukuran seluruh data aliran yang dapat dibaca, jika objectMode sama dengan buffer.panjang pipa: [], // Simpan semua antrian pipa yang memantau aliran aliran yang dapat dibaca: null, // Status aliran independen adalah null, false, true berakhir: salah, // Semua data telah digunakan endEmit: salah, // Apakah acara akhir telah dikirim atau tidak terbaca: salah, // Apakah data sedang dibaca dibuat: benar, // Aliran tidak dapat diproses sebelumnya itu dibuat atau gagal. Hancurkan sinkronisasi: benar, // Apakah akan memicu peristiwa 'dapat dibaca'/'data' secara sinkron, atau menunggu hingga centang berikutnya needReadable: false, // Apakah perlu mengirim acara yang dapat dibaca yang dipancarkanReadable: false, // Acara yang dapat dibaca telah dikirim readableListening: false, // Apakah ada acara mendengarkan yang dapat dibaca resumeScheduled: false, // Apakah metode resume telah dipanggil errorEmited: false, // Error Acara telah dikirim emitClose: true, // Ketika aliran dihancurkan, apakah akan mengirim acara penutupan autoDestroy: true, // Dihancurkan secara otomatis, itu dipanggil setelah 'akhir' peristiwa dipicu musnah: salah, // Apakah aliran telah dimusnahkan error: null, // Mengidentifikasi apakah aliran telah melaporkan kesalahan ditutup: salah, // Apakah aliran telah ditutup closeEmited: false, // Apakah penutupan acara telah dikirim defaultEncoding: 'utf8', // Format pengkodean karakter default menungguDrainWriters: null, // Menunjuk ke 'drain' Referensi penulis acara yang dipantau, tipenya adalah null, Dapat ditulis, Set<Writable> multiAwaitDrain: false, // Apakah ada beberapa penulis yang menunggu acara drain readMore: false, // Apakah lebih banyak data yang dapat dibaca dataEmit: false, // Data telah dikirim decoder: null, // Pengkodean decoder: null, // Encoder[Simbol(kDijeda)]: null },
2.2. Implementasi penyimpanan data internal (BufferList)
BufferList adalah wadah yang digunakan untuk menyimpan data internal dalam suatu aliran. Ini dirancang dalam bentuk daftar tertaut dan memiliki tiga atribut: kepala, ekor dan panjang.
Saya mewakili setiap node di BufferList sebagai BufferNode, dan tipe Data di dalamnya bergantung pada objectMode.
Struktur data ini memperoleh data header lebih cepat daripada Array.prototype.shift().
2.2.1.Jenis penyimpanan datajika objectMode === benar:
Kemudian data bisa jenis apa pun. Data apa pun yang didorong akan disimpan.
mode objek=benar
const Aliran = memerlukan('aliran'); const readableStream = Aliran baru.Dapat Dibaca({ mode objek: benar, membaca() {}, }); readableStream.push({ nama: 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(benar); console.log(readableStream._readableState.buffer.tail); readableStream.push('lisa'); console.log(readableStream._readableState.buffer.tail); readableStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Simbol(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
Hasil berjalan:
jika objectMode === salah:
Maka datanya hanya bisa berupa string atau Buffer atau Uint8Array
mode objek=salah
const Aliran = memerlukan('aliran'); const readableStream = Aliran baru.Dapat Dibaca({ Mode Objek: salah, membaca() {}, }); readableStream.push({ nama: 'lisa'});
Hasil berjalan:
2.2.2.Struktur penyimpanan dataKami membuat aliran yang dapat dibaca di konsol melalui baris perintah node untuk mengamati perubahan data di buffer:
Tentu saja, sebelum memasukkan data, kita perlu mengimplementasikan metode _read-nya, atau mengimplementasikan metode read di parameter konstruktor:
const Aliran = memerlukan('aliran'); const readableStream = Aliran baru.Readable(); RS._read = fungsi(ukuran) {}
atau
const Aliran = memerlukan('aliran'); const readableStream = Aliran baru.Dapat Dibaca({ baca(ukuran) {} });
Setelah operasi readableStream.push('abc'), buffer saat ini adalah:
Anda dapat melihat bahwa data saat ini disimpan. Data yang disimpan di awal dan akhir adalah kode ASCII dari string 'abc', dan tipenya adalah tipe Buffer. Panjangnya mewakili jumlah data yang disimpan saat ini, bukan ukurannya isi datanya.
2.2.3.API TerkaitMencetak semua metode BufferList yang bisa Anda dapatkan:
Kecuali untuk join, yang membuat serial BufferList menjadi string, yang lainnya adalah operasi akses data.
Saya tidak akan menjelaskan semua metode satu per satu di sini, tetapi fokus pada konsumsi, _getString dan _getBuffer.
2.2.3.1.mengkonsumsi
Alamat kode sumber: BufferList.consume https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
konsumsi
// Mengkonsumsi sejumlah byte atau karakter tertentu dari data yang di-buffer. konsumsi(n, hasStrings) { const data = ini.kepala.data; if (n < data.panjang) { // `slice` sama untuk buffer dan string. const irisan = data.irisan(0, n); this.head.data = data.slice(n); kembalikan irisan; } if (n === data.panjang) { // Potongan pertama sangat cocok. kembalikan ini.shift(); } // Hasil mencakup lebih dari satu buffer. kembalikan hasStrings ? ini._getString(n) : ini._getBuffer(n); }
Ada tiga kondisi penilaian dalam kode:
Jika panjang byte dari data yang dikonsumsi kurang dari panjang data yang disimpan di node kepala dari daftar tertaut, n byte pertama dari data dari node kepala diambil, dan data dari node kepala saat ini diatur. ke data setelah diiris.
Jika data yang dikonsumsi sama persis dengan panjang data yang disimpan di node kepala dari daftar tertaut, data dari node kepala saat ini dikembalikan secara langsung.
Jika panjang data yang dikonsumsi lebih besar dari panjang node kepala daftar tertaut, penilaian terakhir akan dibuat berdasarkan parameter kedua yang diteruskan untuk menentukan apakah lapisan bawah BufferList saat ini menyimpan string atau Buffer .
2.2.3.2._dapatkanBuffer
Alamat kode sumber: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
konsumsi
// Mengkonsumsi sejumlah byte tertentu dari data yang di-buffer. _getBuffer(n) { const ret = Buffer.allocUnsafe(n); const retLen = n; misalkan p = ini.kepala; misalkan c = 0; Mengerjakan { const buf = p.data; if (n > buf.panjang) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.panjang; } kalau tidak { if (n === buf.panjang) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++c; jika (hal.berikutnya) this.head = p.selanjutnya; kalau tidak ini.kepala = ini.ekor = null; } kalau tidak { TypedArrayPrototypeSet(ret, Uint8Array baru(buf.buffer, buf.byteOffset, n), retLen - n); ini.kepala = p; p.data = buf.slice(n); } merusak; } ++c; } while ((p = p.berikutnya) !== null); ini.panjang -= c; kembali mundur; }
Secara umum, ini adalah perulangan untuk mengoperasikan node dalam daftar tertaut, dan membuat array Buffer baru untuk menyimpan data yang dikembalikan.
Pertama, mulailah mengambil data dari node kepala dari daftar tertaut, dan terus menyalinnya ke Buffer yang baru dibuat hingga data dari node tertentu lebih besar atau sama dengan panjang yang akan diambil dikurangi panjang yang telah diperoleh.
Dengan kata lain, setelah membaca node terakhir dari linked list, belum mencapai panjang yang diinginkan, sehingga Buffer yang baru dibuat dikembalikan.
2.2.3.3._getString
Alamat kode sumber: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
konsumsi
// Menggunakan sejumlah karakter tertentu dari data yang di-buffer. _getString(n) { biarkan ret = ''; misalkan p = ini.kepala; misalkan c = 0; Mengerjakan { const str = p.data; if (n > str.panjang) { kembali += str; n -= str.panjang; } kalau tidak { if (n === str.panjang) { kembali += str; ++c; jika (hal.berikutnya) this.head = p.selanjutnya; kalau tidak ini.kepala = ini.ekor = null; } kalau tidak { ret += StringPrototypeSlice(str, 0, n); ini.kepala = p; p.data = StringPrototypeSlice(str, n); } merusak; } ++c; } while ((p = p.berikutnya) !== null); ini.panjang -= c; kembali mundur; }
Pengoperasian string sama dengan pengoperasian Buffer. Ia juga membaca data dari kepala daftar tertaut dalam satu lingkaran. Hanya ada beberapa perbedaan dalam penyalinan dan penyimpanan data Operasi _getString adalah tipe string.
2.3. Mengapa aliran yang dapat dibaca merupakan contoh EventEmitter?
Untuk pertanyaan ini, pertama-tama kita harus memahami apa itu model terbitkan-langganan. Model terbitkan-langganan memiliki aplikasi penting di sebagian besar API. Baik itu Promise atau Redux, API tingkat lanjut yang didasarkan pada model terbitkan-langganan dapat dilihat di mana saja.
Keuntungannya adalah dapat menyimpan fungsi panggilan balik terkait peristiwa dalam antrian, dan kemudian memberi tahu pihak lain untuk memproses data pada waktu tertentu di masa mendatang, sehingga mencapai pemisahan kekhawatiran. Produsen hanya memproduksi data dan memberi tahu konsumen , sedangkan konsumen Kemudian hanya memproses peristiwa terkait dan data terkait, dan model streaming Node.js sesuai dengan karakteristik ini.
Jadi bagaimana aliran Node.js mengimplementasikan pembuatan instance berdasarkan EventEmitter?
Kode sumber untuk ini ada di sini: stream/legacy https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
warisan
fungsi Aliran(memilih) { EE.call(ini, pilih); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Aliran, EE);
Lalu ada baris kode berikut di kode sumber aliran yang dapat dibaca:
Bagian kode sumber ini ada di sini: dapat dibaca https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
warisan
ObjectSetPrototypeOf(Dapat dibaca.prototipe, Stream.prototipe); ObjectSetPrototypeOf(Dapat Dibaca, Aliran);
Pertama, warisi objek prototipe Stream dari EventEmitter, sehingga semua instance Stream dapat mengakses metode di EventEmitter.
Pada saat yang sama, metode statis di EventEmitter juga diwarisi melalui ObjectSetPrototypeOf(Stream, EE), dan di konstruktor Stream, konstruktor EE dipinjam untuk mewujudkan pewarisan semua properti di EventEmitter, dan kemudian di aliran yang dapat dibaca, gunakan metode yang sama mengimplementasikan pewarisan prototipe dan pewarisan properti statis dari kelas Stream, sehingga memperoleh:
Dapat dibaca.prototype.__proto__ === Stream.prototype;
Aliran.prototipe.__proto__ === EE.prototipe
Karena itu:
Dapat dibaca.prototipe.__proto__.__proto__ === EE.prototipe
Oleh karena itu, Anda dapat menemukan prototipe EventEmitter dengan menelusuri rantai prototipe aliran yang dapat dibaca, dan mewujudkan warisan EventEmitter.
2.4. Implementasi API terkait
API akan ditampilkan di sini sesuai urutan kemunculannya di dokumen kode sumber, dan hanya implementasi API inti yang akan dijelaskan.
Catatan: Hanya fungsi yang dideklarasikan dalam kode sumber aliran Node.js yang dapat dibaca yang ditafsirkan di sini, dan definisi fungsi yang diperkenalkan secara eksternal tidak disertakan. Untuk mengurangi panjangnya, semua kode tidak akan disalin.
Dapat dibaca.prototipe
Sungai kecil { menghancurkan: [Fungsi: menghancurkan], _unstroy: [Fungsi: unestroy], _destroy: [Fungsi (anonim)], dorong: [Fungsi (anonim)], unshift: [Fungsi (anonim)], isPaused: [Fungsi (anonim)], setEncoding: [Fungsi (anonim)], baca: [Fungsi (anonim)], _baca: [Fungsi (anonim)], pipa: [Fungsi (anonim)], buka pipa: [Fungsi (anonim)], pada: [Fungsi (anonim)], addListener: [Fungsi (anonim)], hapusListener: [Fungsi (anonim)], mati: [Fungsi (anonim)], hapusAllListeners: [Fungsi (anonim)], melanjutkan: [Fungsi (anonim)], jeda: [Fungsi (anonim)], bungkus: [Fungsi (anonim)], iterator: [Fungsi (anonim)], [Simbol(nodejs.penolakan)]: [Fungsi (anonim)], [Simbol(Simbol.asyncIterator)]: [Fungsi (anonim)] }2.4.1
dapat dibaca.push
Dapat dibaca.prototipe.push = function(potongan, pengkodean) { return readableAddChunk(ini, potongan, pengkodean, false); };
Fungsi utama dari metode push adalah untuk meneruskan blok data ke pipa hilir dengan memicu peristiwa 'data', atau untuk menyimpan data dalam buffernya sendiri.
Kode berikut adalah kodesemu yang relevan dan hanya menampilkan proses utama:
dapat dibaca.push
fungsi dapat dibacaAddChunk(aliran, potongan, pengkodean, addToFront) { const state = aliran._readableState; if (chunk === null) { // push sinyal akhir aliran null, tidak ada lagi data yang dapat ditulis setelah status tersebut.reading = false; onEofChunk(aliran, negara bagian); } else if (!state.objectMode) { // Jika bukan mode objek if (typeof chunk === 'string') { potongan = Buffer.dari(potongan); } else if (potongan instanceof Buffer) {//Jika itu adalah Buffer // Memproses pengkodean} else if (Stream._isUint8Array(chunk)) { potongan = Aliran._uint8ArrayToBuffer(potongan); } else if (potongan != null) { err = new ERR_INVALID_ARG_TYPE('potongan', ['string', 'Buffer', 'Uint8Array'], potongan); } } if (state.objectMode || (chunk && chunk.length > 0)) { // Ini adalah mode objek atau chunk adalah Buffer // Penilaian beberapa metode penyisipan data dihilangkan di sini addChunk(stream, state, chunk, true); } } fungsi addChunk(aliran, status, potongan, addToFront) { if (status.mengalir && status.panjang === 0 && !status.sinkronisasi && stream.listenerCount('data') > 0) { // Jika dalam mode streaming, ada pelanggan yang mendengarkan data stream.emit('data', chunk); } else { // Jika tidak, simpan data ke buffer state.length += state.objectMode 1 : chunk.length; jika (tambahkan Ke Depan) { state.buffer.unshift(potongan); } kalau tidak { state.buffer.push(potongan); } } mayReadMore(stream, state); // Coba baca lebih banyak data}
Operasi push terutama dibagi menjadi menilai objectMode. Tipe yang berbeda akan melakukan operasi yang berbeda pada data yang masuk:
Penilaian pertama addChunk terutama untuk menangani situasi ketika Readable berada dalam mode mengalir, memiliki pendengar data, dan data buffer kosong.
Pada saat ini, sebagian besar data diteruskan ke program lain yang berlangganan peristiwa data, jika tidak, data akan disimpan dalam buffer.
2.4.2.bacaKecuali untuk penilaian kondisi batas dan status aliran, metode ini mempunyai dua operasi utama.
Panggil metode _read yang diterapkan pengguna untuk memproses hasil eksekusi
Membaca data dari buffer buffer dan memicu peristiwa 'data'
dapat dibaca.dibaca
// Jika panjang baca lebih besar dari hwm, hwm akan dihitung ulang jika (n > status.highWaterMark) { state.highWaterMark = komputasiNewHighWaterMark(n); } // Panggil metode _read yang diterapkan pengguna, coba { hasil const = ini._read(state.highWaterMark); jika (hasil != nol) { const lalu = hasil.lalu; if (typeof maka === 'fungsi') { lalu.panggil( hasil, tidak, fungsi(salah) { errorOrDestroy(ini, err); }); } } } menangkap (salah) { errorOrDestroy(ini, err); }
Jika metode _read yang diterapkan oleh pengguna mengembalikan sebuah janji, panggil metode kemudian dari janji ini dan teruskan callback keberhasilan dan kegagalan untuk memfasilitasi penanganan pengecualian.
Kode inti dari metode baca untuk membaca data zona dari buffer adalah sebagai berikut:
dapat dibaca.dibaca
fungsi dari Daftar(n, negara bagian) { // tidak ada yang di-buffer. if (status.panjang === 0) kembalikan nol; biarkan mundur; if (status.objectMode) ret = status.buffer.shift(); else if (!n || n >= state.length) { // Tangani kasus dimana n kosong atau lebih besar dari panjang buffer // Baca semuanya, potong daftarnya. if (state.decoder) // Jika ada decoder, buat serial hasilnya menjadi string ret = state.buffer.join(''); else if (state.buffer.length === 1) // Hanya ada satu data, kembalikan data simpul kepala ret = state.buffer.first(); else // Simpan semua data ke dalam Buffer ret = state.buffer.concat(state.length); state.buffer.clear(); // Hapus buffer} else { // Tangani situasi dimana panjang baca kurang dari buffer ret = state.buffer.consume(n, state.decoder); } kembali mundur; }2.4.3._membaca
Sebuah metode yang harus diterapkan ketika pengguna menginisialisasi aliran yang Dapat Dibaca. Anda dapat memanggil metode push dalam metode ini untuk terus memicu metode baca. Saat kami menekan null, kami dapat menghentikan operasi penulisan aliran.
Contoh kode:
dapat dibaca._read
const Aliran = memerlukan('aliran'); const readableStream = Aliran baru.Dapat Dibaca({ baca(hwm) { this.push(String.fromCharCode(ini.currentCharCode++)); if (ini.currentCharCode > 122) { ini.push(null); } }, }); readableStream.currentCharCode = 97; readableStream.pipe(proses.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4. pipa (penting)
Ikat satu atau lebih aliran yang dapat ditulis ke aliran yang Dapat Dibaca saat ini, dan alihkan aliran yang Dapat Dibaca ke mode mengalir.
Ada banyak event listening handle dalam metode ini, dan saya tidak akan memperkenalkannya satu per satu di sini:
dapat dibaca.pipa
Dapat dibaca.prototipe.pipe = function(tujuan, pipeOpts) { const src = ini; const state = ini._readableState; state.pipes.push(dest); // Kumpulkan aliran yang dapat ditulis src.on('data', ondata); fungsi pada data(potongan) { const ret = tujuan.tulis(potongan); jika (ret === salah) { berhenti sebentar(); } } // Beritahu tujuan penyalurannya. dest.emit('pipa', src); // Memulai streaming jika streaming berada dalam mode jeda if (dest.writableNeedDrain === true) { if (status.mengalir) { berhenti sebentar(); } } else if (!state.flowing) { src.resume(); } tujuan kembali; }
Operasi pipa sangat mirip dengan operator pipa Linux '|', mengubah output kiri ke input kanan. Metode ini mengumpulkan aliran yang dapat ditulis untuk pemeliharaan, dan memicu peristiwa 'data' ketika aliran yang dapat dibaca tersedia.
Ketika data mengalir keluar, peristiwa penulisan dari aliran yang dapat ditulis akan dipicu, sehingga data dapat ditransfer dan operasi seperti saluran pipa dapat direalisasikan. Dan secara otomatis akan mengubah aliran yang dapat dibaca dalam mode jeda menjadi mode mengalir.
2.4.5.melanjutkanAlihkan streaming dari mode 'jeda' ke mode 'aliran'. Jika pendengar peristiwa 'dapat dibaca' disetel, maka metode ini tidak berpengaruh.
dapat dibaca.resume
Dapat dibaca.prototipe.resume = function() { const state = ini._readableState; if (!state.mengalir) { state.flowing = !state.readableListening; // Apakah ia berada dalam mode mengalir tergantung pada apakah pengendali mendengarkan 'yang dapat dibaca' disetel resume(this, state); } }; fungsi resume(aliran, status) { if (!state.resumeScheduled) { // Beralih sehingga metode resume_ hanya dipanggil sekali dalam keadaan yang sama Centang state.resumeScheduled = true; process.nextTick(resume_, streaming, status); } } fungsi resume_(aliran, status) { if (!state.reading) { aliran.baca(0); } status.resumeScheduled = false; stream.emit('melanjutkan'); aliran(aliran); } function flow(stream) { // Saat streaming dalam mode streaming, metode ini akan terus membaca data dari buffer hingga buffer kosong const state = stream._readableState; while (status.mengalir && stream.read() !== null); // Karena metode baca akan dipanggil di sini dan aliran pendengar peristiwa 'yang dapat dibaca' diatur, metode baca juga dapat dipanggil. //Ini menghasilkan data yang tidak koheren (tidak mempengaruhi data, hanya mempengaruhi pemanggilan metode baca dalam panggilan balik acara 'yang dapat dibaca' untuk membaca data) }2.4.6.jeda
Ubah streaming dari mode mengalir ke mode jeda, hentikan pengaktifan peristiwa 'data', dan simpan semua data ke buffer
dapat dibaca.jeda
Dapat dibaca.prototipe.jeda = function() { if (ini._readableState.mengalir !== salah) { debug('jeda'); this._readableState.flowing = false; this.emit('jeda'); } kembalikan ini; };
2.5. Cara penggunaan dan mekanisme kerja
Metode penggunaan telah disebutkan di bagian BufferList. Buat instance Readable dan implementasikan metode _read()-nya, atau implementasikan metode read di parameter objek pertama konstruktor.
2.5.1.Mekanisme kerjaDi sini kami hanya menggambarkan proses umum dan kondisi pemicu konversi mode dari aliran Readable.
di dalam: