node stream有4種類型:1、Readable(可讀流)。需要實作「_read」方法來傳回內容;2、Writable(可寫流),需要實作「_write」方法來接受內容;3、Duplex(可讀可寫流),需要實作「_read」和「_write」方法來接受和返回內容;4、Transform(轉換流),需要實作「_transform」方法來把接受的內容轉換之後返回內容。
本教學操作環境:windows7系統、nodejs16版,DELL G3電腦。
流(Stream)在Nodejs 中是個十分基礎的概念,許多基礎模組都是基於流實現的,扮演著十分重要的角色。同時流也是一個十分難以理解的概念,這主要是相關的文檔比較缺少,對於NodeJs 初學者來說,理解流往往需要花很多時間理解,才能真正掌握這個概念,所幸的是,對於大部分NodeJs使用者來說,只是用來開發Web 應用,對流的不充分認識並不影響使用。但是,理解流能夠對NodeJs 中的其他模組有更好的理解,同時在某些情況下,使用流來處理資料會有更好的效果。
Stream 是在Node.js 中處理流資料的抽象介面。 Stream 並不是實際的接口,而是對所有流的一種統稱。實際的介面有ReadableStream、 WritableStream、ReadWriteStream 這幾個。
interface ReadableStream extends EventEmitter { readable: boolean; read(size?: number): string | Buffer; setEncoding(encoding: BufferEncoding): this; pause(): this; resume(): this; isPaused(): boolean; T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BuffercoEnding ; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;}interface WritableStream extends EventEmitter { writable: boolean; write(buffer: Uint8Array | Error | null) => void): boolean; write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; end(cb?: () => void ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this;}interface ReadWriteStreamStream extends ReadableStream, WritableStream { }可以看出ReadableStream 和WritableStream 都是繼承EventEmitter 類別的介面(ts中介面是可以繼承類別的,因為他們只是在進行類型的合併)。
上面這些介面對應的實作類別分別是Readable、Writable 和Duplex
NodeJs中的流有4種:
Readable 可讀流(實作ReadableStream)
Writable 可寫入流(實作WritableStream)
Duplex 可讀可寫流(繼承Readable後實作WritableStream)
Transform 轉換流(繼承Duplex)
它們都有要實現的方法:
Readable 需要實作_read 方法來傳回內容
Writable 需要實作_write 方法來接受內容
Duplex 需要實作_read 和_write 方法來接受和傳回內容
Transform 需要實作_transform 方法來把接受的內容轉換之後回傳
可讀流(Readable)是流的一種類型,他有兩種模式三種狀態
兩種讀取模式:
流動模式:資料會從底層系統讀取寫入到緩衝區,當緩衝區被寫滿後自動透過EventEmitter 盡快的將資料傳遞給所註冊的事件處理程序中
暫停模式:在這種模式下將不會主動觸發EventEmitter 傳輸數據,必須顯示的呼叫Readable.read() 方法來從緩衝區中讀取數據,read 會觸發回應到EventEmitter 事件。
三種狀態:
readableFlowing === null(初始狀態)
readableFlowing === false(暫停模式)
readableFlowing === true(流動模式)
初始時流的readable.readableFlowing 為null
新增data事件後變為true 。呼叫pause()、unpipe()、或接收到背壓或新增readable 事件,則readableFlowing 會被設為false ,在這個狀態下,為data 事件綁定監聽器不會使readableFlowing 切換到true。
呼叫resume() 可以讓可讀流的readableFlowing 切換到true
移除所有的readable 事件是讓readableFlowing 變成null 的唯一方法。
事件名稱說明readable當緩衝區有新的可讀取資料時觸發(每個想快取池插入節點都會觸發)data每一次消費資料後都會觸發,參數是本次消費的資料close流關閉時觸發error流發生錯誤時觸發方法名稱說明read(size)消費長度為size的數據,傳回null表示目前資料不足size,否則傳回本次消費的資料。 size不傳遞時表示消費緩存池中所有資料const fs = require('fs');const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// 快取池浮標值})readStreams. on('readable', () => { console.log('緩衝區滿了') readStreams.read()// 消費緩存池的所有數據,返回結果並且觸發data事件})readStreams.on('data ', (data) => { console.log('data')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
當size 為0 會觸發readable 事件。
當快取池中的數據長度達到浮標值highWaterMark 後,就不會在主動請求生產數據,而是等待數據被消耗後在生產數據
暫停狀態的流如果不呼叫read 來消費資料時,後續也不會在觸發data 和readable,當呼叫read 消費時會先判斷本次消費後剩餘的資料長度是否低於浮標值,如果低於浮標值就會在消費前請求生產數據。這樣在read 後的邏輯執行完成後新的資料大概率也已經生產完成,然後再次觸發readable,這種提前生產下一次消費的資料存放在快取池的機制也是快取流為什麼快的原因
流動狀態下的流有兩種情況
生產速度慢於消費速度時:這種情況下每一個生產數據後一般緩存池中都不會有剩餘數據,直接將本次生產的數據傳遞給data 事件即可(因為沒有進入緩存池,所以也不用呼叫read 來消費),然後立即開始生產新數據,待上一次數據消費完後新數據才生產好,再次觸發data ,一隻到流結束。生產速度快於消費速度時:此時每一次生產完數據後一般緩存池都還存在未消費的數據,這種情況一般會在消費數據時開始生產下一次消費的數據,待舊數據消費完後新資料已經生產完並且放入快取池他們的差異僅在於資料生產後快取池是否還存在數據,如果存在資料則將生產的資料push 到快取池等待消費,如果不存在則直接將資料交給data 而不加入快取池。
值得注意的是當一個快取池中存在資料的流從暫停模式進入的流動模式時,會先循環呼叫read 來消費資料只到返回null
暫停模式下,一個可讀流讀創建時,模式是暫停模式,創建後會自動呼叫_read 方法,把資料從資料來源push 到緩衝池中,直到緩衝池中的資料達到了浮標值。每當資料到達浮標值時,可讀流會觸發一個" readable " 事件,告訴消費者有資料已經準備好了,可以繼續消費。
一般來說, 'readable' 事件表明流有新的動態:要么有新的數據,要么到達流的盡頭。所以,資料來源的資料讀完前,也會觸發一次'readable' 事件;
消費者" readable " 事件的處理函數中,透過stream.read(size) 主動消費緩衝池中的資料。
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // 參數的read 方法會作為流的_read 方法,用於取得來源資料read(size) { / / 假設我們的來源資料上1000 個1 let chunk = null // 讀取資料的過程一般是異步的,例如IO操作setTimeout(() => { if (count > 0) { let chunkLength = Math.min( count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})// 每一次成功push 資料到快取池後都會觸發readablemyReadable.on(' readable', () => { const chunk = myReadable.read()//消費目前快取池中所有資料console.log(chunk.toString())})值得注意的是, 如果read(size) 的size 大於浮標值,會重新計算新的浮標值,新浮標值是size的下一個二次冪(size <= 2^n,n取最小值)
// hwm 不會大於1GB.const MAX_HWM = 0x40000000;function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // 1GB限制n = MAX_HWM; } else { //取下一個2最高冪,以防止過度增加hwm n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16 ; n++; } return n;}所有可讀流開始的時候都是暫停模式,可以透過以下方法可以切換至流動模式:
新增" data " 事件句柄;呼叫「 resume 」方法;使用" pipe " 方法把資料傳送到可寫流流動模式下,緩衝池裡面的資料會自動輸出到消費端消費,同時,每次輸出資料後,會自動回呼_read 方法,把資料來源的資料放到緩衝池中,如果此時快取池中不存在資料則會直接吧資料傳遞給data 事件,不會經過快取池;直到流動模式切換至其他暫停模式,或資料來源的資料被讀取完了( push(null) );
可讀流可以透過以下方式切換回暫停模式:
如果沒有管道目標,則呼叫stream.pause() 。如果有管道目標,則移除所有管道目標。呼叫stream.unpipe() 可以移除多個管道目標。 const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLthength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})相對可讀流來說,可寫流要簡單一些。
當生產者呼叫write(chunk) 時,內部會根據一些狀態(corked,writing等)選擇是否緩存到緩衝隊列或呼叫_write,每次寫完資料後,都會嘗試清空快取佇列中的資料。如果緩衝佇列中的資料大小超出了浮標值(highWaterMark),消費者調用write(chunk) 後會回傳false,這時候生產者應該停止繼續寫入。
那什麼時候可以繼續寫入呢?當緩衝中的資料都成功_write 之後,清空了緩衝佇列後會觸發drain 事件,這時候生產者可以繼續寫入資料。
當生產者需要結束寫入資料時,需要呼叫stream.end 方法通知可寫入流結束。
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// 會作為_write方法setTimeout(()= >{ fileContent += chunk callback()// 寫入結束後呼叫}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable. write('123123')// truemyWritable.write('123123')// falsemyWritable.end()請注意,在快取池中資料到達浮標值後,此時快取池中可能存在多個節點,在清空快取池的過程中(循環呼叫_read),並不會向可讀流一樣盡量一次消費長度為浮標值的數據,而是每次消費一個緩衝區節點,即使這個緩衝區長度於浮標值不一致也是如此
const { Writable } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log ('消費', chunk.toString()) callback()// 寫入結束後呼叫}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})let count = 0function productionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}productionData()myWritable.on('drain', productionData)上述是一個浮標值為10 的可寫流,現在資料來源是一個0——20 到連續的數字字串,productionData 用來寫入資料。
首先第一次呼叫myWritable.write("0") 時,因為快取池不存在數據,所以"0" 不進入快取池,而是直接交給_wirte,myWritable.write("0") 傳回值為true
當執行myWritable.write("1") 時,因為_wirte 的callback 還未調用,表明上一次資料還未寫入完,位置保證資料寫入的有序性,只能建立一個緩衝區將"1 " 加入快取池中。後面2-9 都是如此
執行myWritable.write("10") 時,此時緩衝區長度為9(1-9),還未到達浮標值, "10" 繼續作為一個緩衝區加入快取池中,此時快取池長度變為11,所以myWritable.write("1") 回傳false,這表示緩衝區的資料已經足夠,我們需要等待drain 事件通知時再生產資料。
100ms過後,_write("0", encoding, callback) 的callback 被調用,表示"0" 已經寫入完成。然後會檢查快取池中是否存在數據,如果存在則會先呼叫_read 消費快取池的頭節點("1"),然後繼續重複這個過程直到快取池為空後觸發drain 事件,再次執行productionData
呼叫myWritable.write("11"),觸發第1步開始的過程,直到流結束。
在理解了可讀流與可寫流後,雙工流就好理解了,雙工流事實上是繼承了可讀流然後實現了可寫流(源碼是這麼寫的,但是應該說是同時實現了可讀流和可寫流更加好)。
Duplex 流需要同時實作下面兩個方法
實作_read() 方法,為可讀流生產數據
實作_write() 方法,為可寫流消費數據
上面兩個方法如何實現在上面可寫流可讀流的部分已經介紹過了,這裡要注意的是,雙工流是存在兩個獨立的快取池分別提供給兩個流,他們的資料來源也不一樣
以NodeJs 的標準輸入輸出流為例:
當我們在控制台輸入資料時會觸發其data 事件,這證明他有可讀流的功能,每個使用者鍵入回車相當於調用可讀的push 方法推送生產的資料。當我們呼叫其write 方法時也可以向控制台輸出內容,但不會觸發data 事件,這表示他有可寫流的功能,而且有獨立的緩衝區,_write 方法的實作內容就是讓控制台展示文字。 // 每當使用者在控制台輸入資料(_read),就會觸發data事件,這是可讀流的特性process.stdin.on('data', data=>{ process.stdin.write(data); })// 每隔一秒向標準輸入流生產資料(這是可寫流的特性,會直接輸出到控制台上),不會觸發datasetInterval(()=>{ process.stdin.write('不是使用者控制台輸入的資料')}, 1000)可以將Duplex 流視為具有可寫流的可讀流。兩者都是獨立的,每個都有獨立的內部緩衝區。讀寫事件獨立發生。
Duplex Stream ------------------| Read <----- External Source You ------------------| Write -----> External Sink ------------------|Transform 流是雙工的,其中讀寫以因果關係進行。雙工流的端點透過某種轉換連結。讀取要求發生寫入。
Transform Stream --------------|-------------- You Write ----> ----> Read You ----- ---------|--------------對於建立Transform 流,最重要的是要實作_transform 方法而不是_write 或_read。 _transform 中對可寫入流寫入的資料做處理(消耗)然後為可讀流生產資料。
轉換流也經常實現一個`_flush` 方法,他會在流結束前被調用,一般用於對流的末尾追加一些東西,例如壓縮文件時的一些壓縮信息就是在這裡加上的const { write } = require('fs')const { Transform, PassThrough } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186491264618914681864991264618914681864991264617(Ft. ,encoding, callback){// 轉換數據,呼叫push將轉換結果加入快取池this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){// end觸發前執行this.push(' <<<') callback() }})// write 不斷寫入資料let count = 0transform.write('>>>')function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() }}productionData()transform.on('drain', productionData)let result = ''transform.on( 'data', data=>{ result += data.toString()})transform.on('end', ()=>{ console.log(result) // >>>0@23456789@0@1@ 2@3@4@5@6@7@8@920<<<})