Ada 4 jenis aliran node: 1. Dapat dibaca (readable stream). Metode "_read" perlu diterapkan untuk mengembalikan konten; 2. Dapat ditulis (aliran yang dapat ditulisi), metode "_write" perlu diterapkan untuk menerima konten; 3. Dupleks (aliran yang dapat dibaca dan ditulis), "_read" dan " Metode _write" perlu diterapkan Untuk menerima dan mengembalikan konten; 4. Transformasi (aliran konversi), Anda perlu menerapkan metode "_transform" untuk mengonversi konten yang diterima dan mengembalikan konten.
Lingkungan operasi tutorial ini: sistem Windows 7, nodejs versi 16, komputer DELL G3.
Stream adalah konsep yang sangat mendasar di Nodejs. Banyak modul dasar diimplementasikan berdasarkan stream dan memainkan peran yang sangat penting. Pada saat yang sama, aliran juga merupakan konsep yang sangat sulit untuk dipahami. Hal ini terutama disebabkan oleh kurangnya dokumentasi yang relevan. Bagi pemula NodeJ, seringkali dibutuhkan banyak waktu untuk memahami aliran sebelum mereka benar-benar dapat menguasai konsep ini. untuk sebagian besar NodeJ, ini hanya digunakan untuk mengembangkan aplikasi Web. Pemahaman yang tidak memadai tentang aliran tidak memengaruhi penggunaannya. Namun, memahami aliran dapat menghasilkan pemahaman yang lebih baik tentang modul lain di NodeJs, dan dalam beberapa kasus, menggunakan aliran untuk memproses data akan memberikan hasil yang lebih baik.
Stream adalah antarmuka abstrak untuk memproses data streaming di Node.js. Streaming bukanlah antarmuka sebenarnya, tetapi istilah umum untuk semua aliran. Antarmuka sebenarnya mencakup ReadableStream, WritableStream, dan ReadWriteStream.
antarmuka ReadableStream extends EventEmitter { dapat dibaca: boolean; baca(ukuran?: nomor): string |.Buffer; setEncoding(pengkodean: BufferEncoding): ini; T extends WritableStream>(tujuan: T, opsi?: { akhir?: boolean | tidak terdefinisi; }): T; unpipe(tujuan?: WritableStream): ini; ; bungkus(oldStream: ReadableStream): ini; [Symbol.asyncIterator](): AsyncIterableIterator<string |.Buffer>;}antarmuka WritableStream extends EventEmitter { dapat ditulis: boolean; Kesalahan |.null) => batal): boolean; tulis(str: string, pengkodean?: BufferEncoding, cb?: (err?: Kesalahan | null) => batal): boolean; ): ini; akhir(data: string | Uint8Array, cb?: () => batal): ini; akhir(str: string, pengkodean?: BufferEncoding, cb?: () => batal): ini;}antarmuka ReadWriteStream memperluas ReadableStream, WritableStream {}Terlihat bahwa ReadableStream dan WritableStream keduanya merupakan antarmuka yang mewarisi kelas EventEmitter (antarmuka di ts dapat mewarisi kelas, karena keduanya hanya menggabungkan tipe).
Kelas implementasi yang sesuai dengan antarmuka di atas masing-masing adalah Readable, Writable, dan Duplex.
Ada 4 jenis aliran di NodeJs:
Aliran yang Dapat Dibaca (mengimplementasikan ReadableStream)
Aliran yang dapat ditulisi (mengimplementasikan WritableStream)
Duplex adalah aliran yang dapat dibaca dan ditulis (mengimplementasikan WritableStream setelah mewarisi Readable)
Transformasi aliran konversi (diwarisi dari Duplex)
Semuanya memiliki metode untuk diterapkan:
Readable perlu menerapkan metode _read untuk mengembalikan konten
Writable perlu mengimplementasikan metode _write untuk menerima konten
Duplex perlu mengimplementasikan metode _read dan _write untuk menerima dan mengembalikan konten
Transform perlu menerapkan metode _transform untuk mengonversi konten yang diterima dan mengembalikannya
Dapat dibaca adalah jenis aliran. Ia memiliki dua mode dan tiga status.
Dua mode membaca:
Mode aliran: Data akan dibaca dan ditulis dari sistem yang mendasarinya ke buffer. Ketika buffer penuh, data akan secara otomatis diteruskan ke event handler yang terdaftar secepat mungkin melalui EventEmitter.
Mode jeda: Dalam mode ini, EventEmitter tidak akan dipicu secara aktif untuk mengirimkan data. Metode Readable.read() harus dipanggil secara eksplisit untuk membaca data dari buffer read akan memicu respons terhadap peristiwa EventEmitter.
Tiga negara bagian:
readableFlowing === null (keadaan awal)
readableFlowing === false (mode jeda)
readableFlowing === true (mode mengalir)
Readable.readableFlowing aliran awalnya nol.
Itu menjadi kenyataan setelah menambahkan data acara. Saat jeda(), unpipe() dipanggil, atau tekanan balik diterima atau peristiwa yang dapat dibaca ditambahkan, readableFlowing akan disetel ke false. Dalam keadaan ini, mengikat pendengar ke peristiwa data tidak akan mengalihkan readableFlowing ke true.
Memanggil resume() dapat mengubah aliran readableFlowing menjadi true.
Menghapus semua peristiwa yang dapat dibaca adalah satu-satunya cara untuk membuat readableFlowing menjadi nol.
Deskripsi nama peristiwa yang dapat dibaca dipicu ketika ada data baru yang dapat dibaca di buffer (akan dipicu setiap kali node dimasukkan ke dalam kumpulan cache) data akan dipicu setiap kali data dikonsumsi aliran kesalahan dipicu ketika aliran dekat ditutup. Ketika kesalahan terjadi, nama metode pemicu menunjukkan bahwa read(size) menggunakan data dengan ukuran panjang. Mengembalikan null menunjukkan bahwa data saat ini lebih kecil dari ukuran data yang dikonsumsi kali ini dikembalikan. Ketika ukuran tidak dilewati, itu berarti menghabiskan semua data di kumpulan cache const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// cache pool float value})readStreams.on('readable', () => { console.log('buffer full') readStreams.read()// Konsumsi semua data di kumpulan buffer, kembalikan hasilnya dan picu peristiwa data}) readStreams.on('data ', (data) => { console.log('data')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
Ketika ukurannya 0, acara yang dapat dibaca akan dipicu.
Ketika panjang data di kumpulan cache mencapai nilai float highWaterMark, ia tidak akan secara aktif meminta data produksi, namun akan menunggu data tersebut dikonsumsi sebelum menghasilkan data.
Jika aliran dalam keadaan dijeda tidak memanggil read untuk mengkonsumsi data, data dan readable tidak akan terpicu nanti. Ketika read dipanggil untuk mengkonsumsi, pertama-tama akan ditentukan apakah panjang data yang tersisa setelah konsumsi ini lebih rendah dari float Jika lebih rendah dari nilai float, data Produksi akan diminta sebelum konsumsi. Dengan cara ini, setelah eksekusi logika setelah pembacaan selesai, kemungkinan besar data baru akan dihasilkan, dan kemudian dapat dibaca akan dipicu lagi. Mekanisme ini juga menghasilkan data yang dikonsumsi berikutnya terlebih dahulu dan menyimpannya di kumpulan cache alasan mengapa aliran cache cepat.
Ada dua situasi aliran dalam keadaan mengalir
Ketika kecepatan produksi lebih lambat dari kecepatan konsumsi: Dalam hal ini, biasanya tidak ada data yang tersisa di kumpulan cache setelah setiap data produksi, dan data yang dihasilkan kali ini dapat langsung diteruskan ke peristiwa data (karena tidak masuk ke kumpulan cache, jadi tidak perlu memanggil read untuk mengkonsumsi), dan kemudian segera mulai menghasilkan data baru tidak akan diproduksi sampai data terakhir dikonsumsi . Ketika kecepatan produksi lebih cepat dari kecepatan konsumsi: Pada saat ini, setelah setiap produksi data, biasanya ada data yang tidak terpakai di kumpulan cache. Dalam hal ini, konsumsi data berikutnya biasanya akan dimulai saat data dikonsumsi, dan setelahnya data lama dikonsumsi, Data baru telah diproduksi dan ditempatkan di kumpulan cacheSatu-satunya perbedaan di antara keduanya adalah apakah data masih ada di kumpulan cache setelah data dibuat. Jika data ada, data yang dihasilkan akan dikirim ke kumpulan cache untuk menunggu konsumsi diserahkan langsung ke data tanpa menambahkannya ke kumpulan cache.
Perlu dicatat bahwa ketika aliran dengan data dalam kumpulan cache memasuki mode aliran dari mode jeda, pembacaan akan dipanggil dalam satu lingkaran untuk menggunakan data hingga nol dikembalikan.
Dalam mode jeda, ketika aliran yang dapat dibaca dibuat, modenya adalah mode jeda. Setelah pembuatan, metode _read secara otomatis dipanggil untuk mendorong data dari sumber data ke kumpulan buffer hingga data di kumpulan buffer mencapai nilai float. Setiap kali data mencapai nilai float, aliran yang dapat dibaca akan memicu peristiwa "dapat dibaca" untuk memberi tahu konsumen bahwa data sudah siap dan dapat terus digunakan.
Secara umum, peristiwa yang 'dapat dibaca' menunjukkan aktivitas baru pada aliran: apakah ada data baru, atau akhir aliran telah tercapai. Oleh karena itu, sebelum data dalam sumber data dibaca, peristiwa 'dapat dibaca' juga akan dipicu;
Dalam fungsi pengendali peristiwa "dapat dibaca" konsumen, data dalam kumpulan buffer dikonsumsi secara aktif melalui stream.read(size).
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // Metode read dari parameter akan digunakan sebagai metode _read dari stream untuk memperoleh data sumber read( size) { / / Asumsikan bahwa data sumber kita mempunyai 1000 1s let chunk = null // Proses pembacaan data umumnya asynchronous, seperti operasi IO setTimeout(() => { if (count > 0) { let chunkLength = Math .min( count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on(' akan dipicu setiap kali datanya berhasil didorong ke kumpulan cache) dapat dibaca', () => { const chunk = myReadable.read()//Mengkonsumsi semua data di kumpulan cache saat ini console.log(chunk.toString())})Perlu dicatat bahwa jika ukuran read(size) lebih besar dari nilai float, nilai float baru akan dihitung ulang, dan nilai float baru adalah pangkat kedua berikutnya dari ukuran (ukuran <= 2^n, n membutuhkan nilai minimum)
// hwm tidak akan lebih besar dari 1GB.const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // batas 1GB n = MAX_HWM; } else { // Hapus pangkat tertinggi berikutnya dari 2 hingga mencegah Peningkatan hwm n--; n |= n >>> 1;= n >>> 2; > 16 ; n++; } kembali n;}Semua streaming yang dapat dibaca dimulai dalam mode jeda dan dapat dialihkan ke mode mengalir melalui metode berikut:
Tambahkan event handler "data"; panggil metode "resume"; gunakan metode "pipa" untuk mengirim data ke aliran yang dapat ditulisDalam mode aliran, data di kumpulan buffer akan secara otomatis dikeluarkan ke konsumen untuk dikonsumsi. Pada saat yang sama, setelah setiap keluaran data, metode _read akan secara otomatis dipanggil kembali untuk memasukkan data dari sumber data ke dalam kumpulan buffer. . Jika kumpulan buffer tidak ada, data akan diteruskan langsung ke peristiwa data tanpa melalui kumpulan cache; hingga mode aliran beralih ke mode jeda lainnya, atau data dari sumber data dibaca (push (batal));
Streaming yang dapat dibaca dapat dialihkan kembali ke mode dijeda melalui:
Jika tidak ada target alur, stream.pause() dipanggil. Jika ada target saluran pipa, hapus semua target saluran pipa. Beberapa target pipa dapat dihapus dengan memanggil stream.unpipe(). const { Dapat dibaca } = memerlukan('stream')biarkan hitungan = 1000const myReadable = baru Dapat dibaca({ highWaterMark: 300, baca(ukuran) { biarkan potongan = null setTimeout(() => { if (hitung > 0) { biarkan potonganPanjangnya = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})Dibandingkan dengan aliran yang dapat dibaca, aliran yang dapat ditulis lebih sederhana.
Ketika produser memanggil write(chunk), secara internal ia akan memilih apakah akan menyimpannya dalam cache di antrian buffer atau memanggil _write berdasarkan beberapa status (tersumbat, menulis, dll.). data dalam antrian cache. Jika ukuran data dalam antrian buffer melebihi nilai float (highWaterMark), konsumen akan mengembalikan false setelah memanggil write(chunk).
Jadi kapan saya bisa terus menulis? Ketika semua data dalam buffer telah berhasil ditulis, peristiwa pengurasan akan dipicu setelah antrian buffer dihapus. Pada saat ini, produsen dapat terus menulis data.
Ketika produser harus menyelesaikan penulisan data, ia perlu memanggil metode stream.end untuk memberitahukan akhir dari aliran yang dapat ditulis.
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, coding, callback) {// akan digunakan sebagai metode _write setTimeout(() = >{ fileContent += chunk callback()// Dipanggil setelah penulisan selesai}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable .tulis('123123')// truemyWritable.write('123123')// falsemyWritable.end()Perhatikan bahwa setelah data di kumpulan cache mencapai nilai float, mungkin ada beberapa node di kumpulan cache saat ini Selama proses pembersihan kumpulan cache (panggilan siklik _read), data tersebut tidak akan menggunakan panjang yang sama dengan aliran yang dapat dibaca. Data dengan nilai float dikonsumsi satu node buffer pada satu waktu, meskipun panjang buffer tidak konsisten dengan nilai float.
const { Dapat ditulis } = memerlukan('stream')biarkan fileContent = ''const myWritable = baru yang Dapat Ditulis({ highWaterMark: 10, write(chunk, coding, callback) { setTimeout(()=>{ fileContent += chunk console.log ('Consumption', chunk.toString()) callback()// Dipanggil setelah penulisan selesai}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})biarkan hitung = 0fungsi produksiData(){ biarkan flag = true while (hitungan <= 20 && bendera){ flag = myWritable.write(count.toString()) hitung++ } if(hitungan > 20){ myWritable.end( ) }}productionData()myWritable.on('drain', data produksi)Di atas adalah aliran yang dapat ditulis dengan nilai float 10. Sekarang sumber datanya adalah string angka kontinu dari 0 hingga 20, dan ProductionData digunakan untuk menulis data.
Pertama, ketika myWritable.write("0") dipanggil untuk pertama kalinya, karena tidak ada data di kumpulan cache, "0" tidak masuk ke kumpulan cache, tetapi langsung diberikan ke _wirte .write("0") benar
Ketika myWritable.write("1") dijalankan, karena callback _wirte belum dipanggil, ini menunjukkan bahwa data terakhir belum ditulis. Posisi tersebut menjamin keteraturan penulisan data untuk menyimpan "1". " Tambahkan ke kumpulan cache. Hal ini berlaku untuk 2-9 berikutnya
Ketika myWritable.write("10") dijalankan, panjang buffer adalah 9 (1-9) dan belum mencapai nilai float. "10" terus ditambahkan ke kumpulan cache sebagai buffer, dan kumpulan cache panjangnya menjadi 11, jadi myWritable.write("1") mengembalikan false, yang berarti data di buffer cukup, dan kita perlu menunggu notifikasi acara drain untuk menghasilkan data lagi.
Setelah 100 ms, panggilan balik _write("0", pengkodean, panggilan balik) dipanggil, menunjukkan bahwa "0" telah ditulis. Kemudian akan memeriksa apakah ada data di kumpulan cache. Jika ada, pertama-tama ia akan memanggil _read untuk menggunakan node kepala dari kumpulan cache ("1"), dan kemudian terus mengulangi proses ini hingga kumpulan cache kosong. , picu peristiwa pengurasan, dan jalankan productionData lagi.
Panggil myWritable.write("11") untuk memicu proses yang dimulai pada langkah 1 hingga akhir streaming.
Setelah memahami aliran yang dapat dibaca dan aliran yang dapat ditulis, aliran dupleks mudah dipahami. Aliran dupleks sebenarnya mewarisi aliran yang dapat dibaca dan kemudian mengimplementasikan aliran yang dapat ditulis (kode sumbernya ditulis seperti ini, tetapi harus dikatakan bahwa itu diimplementasikan) pada saat yang sama Lebih baik memiliki aliran yang dapat dibaca dan ditulis).
Aliran dupleks perlu menerapkan dua metode berikut secara bersamaan
Terapkan metode _read() untuk menghasilkan data untuk aliran yang dapat dibaca
Terapkan metode _write() untuk menggunakan data untuk aliran yang dapat ditulis
Cara mengimplementasikan kedua metode di atas telah diperkenalkan pada aliran yang dapat ditulis dan dibaca di atas. Yang perlu diperhatikan di sini adalah bahwa ada dua kumpulan buffer independen untuk aliran dupleks, dan sumber datanya juga tidak sama.
Ambil aliran input dan output standar NodeJs sebagai contoh:
Saat kita memasukkan data di konsol, kejadian datanya dipicu, yang membuktikan bahwa data tersebut memiliki fungsi aliran yang dapat dibaca. Setiap kali pengguna mengetik, itu sama dengan memanggil metode push yang dapat dibaca untuk mendorong data yang dihasilkan. Ketika kita memanggil metode tulisnya, kita juga dapat mengeluarkan konten ke konsol, tetapi peristiwa data tidak akan terpicu. Hal ini menunjukkan bahwa ia memiliki fungsi aliran yang dapat ditulis dan memiliki buffer independen izinkan konsol untuk menampilkan teks. // Setiap kali pengguna memasukkan data di konsol (_read), peristiwa data akan dipicu, yang merupakan karakteristik dari aliran yang dapat dibaca process.stdin.on('data', data=>{ process.stdin.write(data ); })// Menghasilkan data ke aliran masukan standar setiap detik (ini adalah fitur aliran yang dapat ditulis, yang akan dikeluarkan langsung ke konsol) dan tidak akan memicu datasetInterval(()=>{ process.stdin.write ('bukan Data yang dimasukkan oleh konsol pengguna')}, 1000)Aliran Dupleks dapat dianggap sebagai aliran yang dapat dibaca dengan aliran yang dapat ditulis. Keduanya independen, masing-masing dengan buffer internal independen. Peristiwa membaca dan menulis terjadi secara independen.
Aliran Dupleks ----|. Baca <----- Sumber Eksternal Anda ----| Tulis -----> Wastafel Eksternal ----|Aliran transformasi bersifat dupleks, di mana pembacaan dan penulisan terjadi dalam hubungan sebab-akibat. Titik akhir aliran dupleks dihubungkan melalui beberapa transformasi. Pembacaan membutuhkan penulisan untuk terjadi.
Transform Stream --------------|-------------- Anda Menulis ----> ----> Membaca Anda ----- ----------|--------------Untuk membuat aliran Transform, hal terpenting adalah menerapkan metode _transform daripada _write atau _read. Dalam _transform, data yang ditulis oleh aliran yang dapat ditulis diproses (dikonsumsi) dan kemudian data dihasilkan untuk aliran yang dapat dibaca.
Aliran konversi sering kali menerapkan metode `_flush`, yang akan dipanggil sebelum akhir aliran. Biasanya digunakan untuk menambahkan sesuatu ke akhir aliran. Misalnya, beberapa informasi kompresi saat mengompresi file ditambahkan di sini const { write } = require('fs')const { Transform, PassThrough } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const transform = new Transform({ highWaterMark: 10, transform(chunk,encoding, panggilan kembali){ // Konversi data, Panggil push untuk menambahkan hasil konversi ke kumpulan cache this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){//Jalankan this.push (' sebelum pemicu akhir <<<') callback() }})// tulis terus menerus tulis data let count = 0transform.write('>>>')function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() }}productionData()transform.on('drain', productionData)biarkan hasil = '' transform.on( 'data', data=>{ hasil += data.toString()})transform.on('end', ()=>{ console.log(hasil) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})