Существует 4 типа узловых потоков: 1. Readable (читаемый поток). Метод «_read» должен быть реализован для возврата контента; 2. Доступен для записи (поток с возможностью записи), метод «_write» должен быть реализован для приема контента. 3. Дуплекс (поток, доступный для чтения и записи), «_read» и « Необходимо реализовать методы «_write» Для приема и возврата контента; 4. Преобразование (поток преобразования) необходимо реализовать метод «_transform» для преобразования полученного контента и возврата контента.
Операционная среда этого руководства: система Windows 7, nodejs версии 16, компьютер DELL G3.
Поток — это очень простая концепция в Nodejs. Многие базовые модули реализованы на основе потоков и играют очень важную роль. В то же время поток — это очень сложная для понимания концепция. В основном это связано с отсутствием соответствующей документации. Новичкам NodeJ часто требуется много времени, чтобы понять поток, прежде чем они смогут по-настоящему освоить эту концепцию. для большинства NodeJ это так. Для пользователей он используется только для разработки веб-приложений. Недостаточное понимание потоков не влияет на их использование. Однако понимание потоков может привести к лучшему пониманию других модулей NodeJ, а в некоторых случаях использование потоков для обработки данных даст лучшие результаты.
Stream — это абстрактный интерфейс для обработки потоковых данных в Node.js. Поток — это не реальный интерфейс, а общий термин для всех потоков. Фактические интерфейсы включают ReadableStream, WritableStream и ReadWriteStream.
интерфейс ReadableStream расширяет EventEmitter {readable: boolean; read(size?: Number): строка | setEncoding (кодирование: BufferEncoding): это; резюме(): isPaused(): boolean; T расширяет WritableStream> (назначение: T, параметры?: {конец?: логическое | неопределенное; }): T; unpipe (назначение?: WritableStream): это; unshift (кусок: строка | Uint8Array, кодирование?: BufferEncoding): void ; Wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;}interface WritableStream расширяет EventEmitter { writable: boolean; write(buffer: Uint8Array | string, cb?: (err?: Ошибка | ноль) => void): boolean; write(str: string,coding?: BufferEncoding, cb?: (err?: Ошибка | null) => void): boolean; end(cb?: () => void ): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string,coding?: BufferEncoding, cb?: () => void): this;}interface ReadWriteStream расширяет ReadableStream, WritableStream {}Видно, что ReadableStream и WritableStream являются интерфейсами, наследующими класс EventEmitter (интерфейсы в ts могут наследовать классы, поскольку они представляют собой только типы слияния).
Классы реализации, соответствующие вышеуказанным интерфейсам, — Readable, Writable и Duplex соответственно.
В NodeJs есть 4 типа потоков:
Читаемый Читаемый поток (реализует ReadableStream)
Доступный для записи поток (реализует WritableStream)
Duplex — это поток, доступный для чтения и записи (реализация WritableStream после наследования Readable).
Преобразование потока преобразования (унаследовано от Duplex)
У всех есть методы реализации:
Readable необходимо реализовать метод _read для возврата содержимого.
Writable необходимо реализовать метод _write для приема контента.
Duplex должен реализовать методы _read и _write для приема и возврата содержимого.
Transform необходимо реализовать метод _transform для преобразования полученного контента и его возврата.
Readable — это тип потока. Он имеет два режима и три состояния.
Два режима чтения:
Режим потока: данные будут считываться и записываться из базовой системы в буфер. Когда буфер заполнен, данные будут автоматически переданы зарегистрированному обработчику событий как можно быстрее через EventEmitter.
Режим паузы: в этом режиме EventEmitter не будет активно запускаться для передачи данных. Метод Readable.read() должен быть явно вызван для чтения данных из буфера, чтение вызовет ответ на событие EventEmitter.
Три государства:
readableFlowing === null (исходное состояние)
readableFlowing === false (режим паузы)
readableFlowing === true (поточный режим)
readable.readableFlowing потока изначально имеет значение null.
Это становится истинным после добавления события данных. Когда вызывается функция паузы(), unpipe(), принимается противодавление или добавляется читаемое событие, для readableFlowing будет установлено значение false. В этом состоянии привязка прослушивателя к событию данных не переключит readableFlowing на true.
Вызов возобновить() может переключить readableFlowing читаемого потока на true.
Удаление всех читаемых событий — единственный способ сделать readableFlowing нулевым.
Описание имени события, доступное для чтения, запускается, когда в буфере появляются новые доступные для чтения данные (оно будет срабатывать каждый раз, когда узел вставляется в пул кэша), данные будут запускаться каждый раз, когда данные потребляются, и на этот раз используются данные. поток ошибок запускается при закрытии потока закрытия. При возникновении ошибки имя метода триггера указывает, что read(size) потребляет данные с длиной size. Возврат значения null указывает на то, что текущие данные меньше размера. данные, использованные на этот раз, возвращаются. Если размер не передан, это означает использование всех данных в пуле кэша const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// пул кэша float value})readStreams. on('readable', () => { console.log('buffer full') readStreams.read()// Потребляем все данные в пуле буферов, возвращаем результат и запускаем событие данных}) readStreams.on('данные', (данные) => { console.log('данные')})https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
Когда размер равен 0, будет запущено событие чтения.
Когда длина данных в пуле кэша достигает значения с плавающей запятой highWaterMark, он не будет активно запрашивать производственные данные, а будет ждать, пока данные будут использованы, прежде чем создавать данные.
Если поток в состоянии паузы не вызывает чтение для потребления данных, data и readable не будут активированы позже. Когда read вызывается для потребления, он сначала определяет, меньше ли длина оставшихся данных после этого потребления, чем число с плавающей запятой. Если оно меньше плавающего значения, данные о производстве будут запрошены перед потреблением. Таким образом, после завершения выполнения логики после завершения чтения новые данные, скорее всего, будут созданы, а затем снова будут запущены доступные для чтения. Этот механизм предварительного создания следующих потребляемых данных и сохранения их в пуле кэша также работает. причина, почему поток кэша быстрый.
В текущем состоянии возможны две ситуации течения.
Когда скорость производства ниже скорости потребления: в этом случае в пуле кэша обычно не остается данных после каждых производственных данных, и данные, созданные на этот раз, могут быть напрямую переданы в событие данных (поскольку это не войти в пул кэша, так что нет необходимости вызывать read для потребления), а затем сразу начать создавать новые данные. Новые данные не будут создаваться до тех пор, пока не будут использованы последние данные. Данные запускаются снова, пока поток не завершится. . Когда скорость производства выше скорости потребления: в это время после каждого создания данных в пуле кэша обычно остаются неизрасходованные данные. В этом случае следующее потребление данных обычно начинается, когда данные потребляются, и после. старые данные потребляются. Новые данные были созданы и помещены в пул кэша.Единственная разница между ними заключается в том, существуют ли данные в пуле кэша после создания данных. Если данные существуют, созданные данные будут отправлены в пул кэша для ожидания использования. Если он не существует, данные будут. передаваться непосредственно данным без добавления их в пул кэша.
Стоит отметить, что когда поток с данными в пуле кэша переходит в режим потока из режима паузы, чтение будет вызываться в цикле для потребления данных до тех пор, пока не будет возвращено значение null.
В режиме паузы, когда создается читаемый поток, это режим паузы. После создания автоматически вызывается метод _read для передачи данных из источника данных в пул буферов до тех пор, пока данные в пуле буферов не достигнут значения с плавающей запятой. Всякий раз, когда данные достигают значения с плавающей запятой, читаемый поток запускает «читаемое» событие, сообщающее потребителю, что данные готовы и могут продолжать использоваться.
Вообще говоря, «читаемое» событие указывает на новую активность в потоке: либо появились новые данные, либо достигнут конец потока. Таким образом, прежде чем данные в источнике данных будут прочитаны, также будет инициировано событие «readable»;
В функции-обработчике потребительского «читаемого» события данные в пуле буферов активно используются через поток.read(size).
const { Readable } = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, // Метод чтения параметра будет использоваться как метод _read потока для получения исходных данных read( size) { // Предположим, что наши исходные данные имеют 1000 единиц let chunk = null // Процесс чтения данных обычно асинхронный, например, операция ввода-вывода setTimeout(() => { if (count > 0) { let chunkLength = Math .min( count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})//readablemyReadable.on(' будет запускаться каждый раз, когда данные успешно отправлено в пул кэша) readable', () => { const chunk = myReadable.read()//Использовать все данные в текущем пуле кэша console.log(chunk.toString())})Стоит отметить, что если размер read(size) больше значения с плавающей запятой, новое значение с плавающей запятой будет пересчитано, а новое значение с плавающей запятой будет следующей второй степенью размера (размер <= 2^n, n принимает минимальное значение)
// hwm не будет больше 1 ГБ.const MAX_HWM = 0x40000000; function ComputeNewHighWaterMark(n) { if (n >= MAX_HWM) { // предел 1 ГБ n = MAX_HWM } else { // Удаляем следующую по величине степень 2 до предотвратить чрезмерное увеличение hwm n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 8; > 16 ; n++ } вернуть n;}Все читаемые потоки запускаются в режиме паузы и могут быть переключены в потоковый режим следующими способами:
Добавить обработчик событий «data»; вызвать метод «resume»; использовать метод «pipe» для отправки данных в записываемый поток;В потоковом режиме данные в буферном пуле будут автоматически выводиться потребителю для потребления. В то же время после каждого вывода данных метод _read будет автоматически вызываться для помещения данных из источника данных в буферный пул. . Если пул буферов отсутствует, данные будут передаваться непосредственно в событие данных, минуя пул кэша, пока режим потока не переключится на другие режимы паузы или не будут прочитаны данные из источника данных (push); (нулевой));
Читаемые потоки можно переключить обратно в режим паузы с помощью:
Если цель конвейера отсутствует, вызываетсяstream.pause(). Если есть цели конвейера, удаляет все цели конвейера. Несколько целей канала можно удалить, вызвав функциюstream.unpipe(). const {Readable} = require('stream')let count = 1000const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) }})myReadable.on('data', data => { console .log(data.toString())})По сравнению с читаемыми потоками, записываемые потоки проще.
Когда производитель вызывает write(chunk), он внутренне выбирает, кэшировать ли его в очереди буфера или вызывать _write на основе некоторого статуса (закупорено, запись и т. д.) После каждой записи данных он будет пытаться очистить их. данные в очереди кэша. Если размер данных в буферной очереди превышает значение с плавающей запятой (highWaterMark), потребитель вернет false после вызова write(chunk). В это время производитель должен прекратить запись.
Так когда же я смогу продолжить писать? Когда все данные в буфере успешно записаны, после очистки очереди буфера сработает событие стока. В это время производитель может продолжить запись данных.
Когда производителю необходимо завершить запись данных, ему необходимо вызвать методstream.end, чтобы уведомить об окончании записываемого потока.
const { Writable, Duplex } = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk,coding, callback) {// будет использоваться как метод _write setTimeout(() = >{ fileContent += обратный вызов фрагмента()// Вызывается после завершения записи}, 500) }})myWritable.on('close', ()=>{ console.log('close', fileContent)})myWritable . write('123123')// truemyWritable.write('123123')// falsemyWritable.end()Обратите внимание, что после того, как данные в пуле кэша достигнут значения с плавающей запятой, в пуле кэша в это время может быть несколько узлов. Во время процесса очистки пула кэша (циклический вызов _read) он не будет использовать ту же длину, что и пул кэша. читаемый поток. Данные значения с плавающей запятой потребляются по одному узлу буфера за раз, даже если длина буфера не соответствует значению с плавающей запятой.
const {Writable} = require('stream')let fileContent = ''const myWritable = new Writable({ highWaterMark: 10, write(chunk,coding, callback) { setTimeout(()=>{ fileContent += chunk console.log ('Consumption', chunk.toString()) callback()// Вызывается после завершения записи}, 100) }})myWritable.on('close', ()=>{ console.log('close', fileContent )})let count = 0function ProductionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end( ) }}productionData()myWritable.on('drain', ProductionData)Вышеупомянутый поток представляет собой записываемый поток со значением с плавающей запятой, равным 10. Теперь источником данных является непрерывная числовая строка от 0 до 20, а для записи данных используется ProductionData.
Во-первых, когда myWritable.write("0") вызывается в первый раз, поскольку в пуле кэша нет данных, "0" не попадает в пул кэша, а передается непосредственно в _wirte. Возвращаемое значение myWritable. .write("0") верно
Когда выполняется myWritable.write("1"), поскольку обратный вызов _wirte еще не был вызван, это указывает на то, что последние данные еще не записаны. Такая позиция гарантирует упорядоченность записи данных. Можно создать только один буфер. для хранения «1». Добавить в пул кеша. Это верно для следующих 2-9
Когда выполняется myWritable.write("10"), длина буфера равна 9 (1-9) и еще не достигла значения с плавающей запятой. "10" продолжает добавляться в пул кеша в качестве буфера, и пул кеша продолжает добавляться в пул кеша в качестве буфера. длина становится равной 11, поэтому myWritable.write("1") возвращает false, что означает, что данных в буфере достаточно, и нам нужно дождаться уведомления о событии стока, чтобы снова создать данные.
Через 100 мс вызывается обратный вызов _write("0",coding,callback), указывающий, что записано "0". Затем он проверит, есть ли данные в пуле кэша. Если он существует, он сначала вызовет _read для использования головного узла пула кэша («1»), а затем продолжит повторять этот процесс, пока пул кэша не станет пустым. , вызовите событие дренажа и снова выполните ProductionData.
Вызовите myWritable.write("11"), чтобы запустить процесс, начиная с шага 1 и до конца потока.
После понимания читаемого потока и записываемого потока легко понять дуплексный поток. Дуплексный поток фактически наследует читаемый поток, а затем реализует записываемый поток (исходный код написан так, но следует сказать, что он реализован). в то же время лучше иметь читаемые и записываемые потоки).
Дуплексный поток должен одновременно реализовать следующие два метода:
Реализуйте метод _read() для создания данных для читаемых потоков.
Реализуйте метод _write() для использования данных для записываемых потоков.
Как реализовать два вышеупомянутых метода, было описано выше в потоках, доступных для записи и чтения. Здесь необходимо отметить, что существует два независимых пула буферов для дуплексных потоков соответственно, и их источники данных также не одинаковы.
В качестве примера возьмем стандартный поток ввода и вывода NodeJ:
Когда мы вводим данные в консоль, запускается событие данных, что доказывает, что он выполняет функцию читаемого потока. Каждый раз, когда пользователь вводит данные, это эквивалентно вызову метода readable push для отправки созданных данных. Когда мы вызываем его метод записи, мы также можем выводить контент на консоль, но событие данных не будет запущено. Это показывает, что он имеет функцию записываемого потока и имеет независимый буфер. разрешить консоли отображать текст. // Всякий раз, когда пользователь вводит данные в консоль (_read), событие данных будет вызвано, что является характеристикой читаемого потока ); })// Создаём данные в стандартный поток ввода каждую секунду (это функция записываемого потока, который будет выводиться непосредственно на консоль) и не будет запускать datasetInterval(()=>{process.stdin.write). ('не являются данными, введенными с пользовательской консоли')}, 1000)Дуплексный поток можно рассматривать как поток, доступный для чтения, с потоком, доступным для записи. Оба независимы, каждый с независимыми внутренними буферами. События чтения и записи происходят независимо.
Дуплексный поток ------------------| Чтение <----- Внешний источник Вы ------------------| Запись -----> Внешний приемник ------------------|Потоки преобразования являются дуплексными, в которых операции чтения и записи происходят в причинно-следственной связи. Конечные точки дуплексного потока связываются посредством некоторого преобразования. Для чтения требуется запись.
Transform Stream --------------|--------------- Вы пишете ----> ----> Читаете ----- ----------|--------------Для создания потоков Transform наиболее важно реализовать метод _transform вместо _write или _read. В _transform данные, записанные записываемым потоком, обрабатываются (потребляются), а затем создаются данные для читаемого потока.
Потоки преобразования часто реализуют метод `_flush`, который вызывается перед концом потока. Обычно он используется для добавления чего-либо в конец потока. Например, сюда добавляется некоторая информация о сжатии файлов. } = require('fs')const { Transform, PassThrough } = require('stream')const reurce = '1312123213124341234213423428354816273513461891468186499126412'const Transform = new Transform({ highWaterMark: 10, Transform(chunk,encoding, call назад){ // Конвертируем data, вызовите push, чтобы добавить результат преобразования в пул кеша this.push(chunk.toString().replace('1', '@')) callback() },lush(callback){//Execute this.push (' before end запускает <<<') callback() }})// write непрерывно записывает данные let count = 0transform.write('>>>>')function ProductionData() { let flag = true while (count <= 20 && flag) { flag = Transform.write(count.toString()) count++ } if (count > 20) { Transform.end() }}productionData()transform.on('drain', ProductionData)let result = '' Transform.on( 'data', data=>{ result += data.toString()})transform.on('end', ()=>{ console.log(result) // >>>0@23456789@ 0@1@ 2@3@4@5@6@7@8@920<<<})