1.1. Историческая эволюция потоков.
Потоки не являются уникальной концепцией Nodejs. Они были представлены несколько десятилетий назад в операционной системе Unix, и программы могут взаимодействовать друг с другом в потоках через оператор канала (|).
Оператор канала (|) можно использовать в MacOS и Linux на базе систем Unix. Он может преобразовывать выходные данные процесса в левой части оператора во входные данные в правой части.
В Node, если мы используем традиционный readFile для чтения файла, файл будет прочитан в память от начала до конца. Когда все содержимое будет прочитано, содержимое файла, загруженного в память, будет обработано единообразно.
У этого есть два недостатка:
память: это занимает много памяти;
время: вам нужно дождаться загрузки всей полезной нагрузки, прежде чем начинать обработку данных.
Чтобы решить вышеуказанные проблемы, Node. .js реализовал концепцию потоков. В Node.js существует четыре типа потоков. Все они являются экземплярами EventEmitter в Node.js:
читаемый поток,
записываемый поток,
читаемый и записываемый полнодуплексный поток (). Duplex Stream)
Transform Stream (Преобразование потока).
Чтобы глубже изучить эту часть и постепенно понять концепцию потоков в Node.js, а также поскольку часть исходного кода относительно сложна, я решил начать изучение этой части с читаемого потока. .
1.2.Что такое поток?
Поток — это абстрактная структура данных, которая представляет собой набор данных. Типы данных, хранящихся в нем, могут быть только следующих типов (только для случая objectMode === false):
We. можно использовать поток. Рассматривая его как набор этих данных, точно так же, как и жидкости, мы сначала сохраняем эти жидкости в контейнере (внутренний буфер BufferList потока), а когда срабатывает соответствующее событие, мы выливаем жидкость внутри в трубу . И сообщите другим, чтобы они поставили свои собственные контейнеры на другой стороне трубы, чтобы собрать жидкость внутри для утилизации.
1.3. Что такое читаемый поток?
Читаемый поток — это тип потока. Он имеет два режима, три состояния
и два режима чтения:
режим потока: данные будут считаны из базовой системы и переданы через EventEmitter как можно скорее. Данные передаются зарегистрированному обработчику событий в
режиме паузы: в этом режиме данные не будут считаны, и для чтения данных из потока необходимо явно вызвать метод Stream.read().
Три состояния:
readableFlowing = =. = null: данные генерироваться не будут. Вызов Stream.pipe() и Stream.resume изменит его статус на true, начнет генерировать данные и активно вызовет событие
readableFlowing === false: поток данных в это время будет приостановлен. , но не будет Генерация данных будет приостановлена, поэтому возникнет отставание в данных
readableFlowing === true: нормально генерировать и использовать данные
2.1 Определение внутреннего состояния (ReadableState)
ReadableState
_readableState { objectMode: false, // Для работы с другими типами данных, кроме строковых, буферных и нулевых, этот режим необходимо включить highWaterMark: 16384, // Предел уровня воды, 1024 * 16, по умолчанию 16 КБ, если этот предел превышен , вызов остановится _read() считывает данные в буферный буфер: BufferList { head: null, Tail: null, length: 0 }, // Связанный список буфера, используемый для сохранения длины данных: 0, // Размер все читаемые данные потока, если objectMode равен buffer.length Pipes: [], // Сохраняем все очереди каналов, которые отслеживают поток читаемого потока: null, // Статус независимого потока — null, false, true закончено: false, // Все данные были израсходованы endEmited: false, // Было ли конечное событие отправлено или не читается: false, // Читаются ли данные сконструировано: true, // Поток не может быть обработан раньше он создается или терпит неудачу. Уничтожить sync: true, // Следует ли запускать событие 'readable'/'data' синхронно или ждать до следующего тика. NeedReadable: false, // Необходимо ли отправлять читаемое событие emulatedReadable: false, // Было отправлено читаемое событие readableListening: false, // Есть ли читаемое событие прослушивания summaryScheduled: false, // Есть ли метод возобновления было вызвано errorEmited: false, // Ошибка Событие было отправлено emmitClose: true, // Когда поток уничтожен, следует ли отправлять событие закрытия autoDestroy: true, // Автоматически уничтожается, оно вызывается после 'конца' срабатывает событие уничтожено: false, // Был ли поток уничтожен errored: null, // Определяет, сообщил ли поток об ошибке close: false, // Был ли поток закрыт closeEmited: false, // Будет ли закрытие событие было отправлено defaultEncoding: 'utf8', // Формат кодировки символов по умолчанию awaitDrainWriters: null, // Указывает на отслеживаемую ссылку 'drain' Writer события, тип - null, Writable, Set<Writable> multiAwaitDrain: false, // Есть ли несколько устройств записи, ожидающих события стока readMore: false, // Можно ли прочитать больше данных dataEmited: false, // Данные были отправлены decoder: null, // Кодировка декодера: null, // Кодировщик[Символ(kPaused)]: ноль },
2.2.Реализация внутреннего хранилища данных (BufferList)
BufferList — это контейнер, используемый для хранения внутренних данных в потоке. Он выполнен в виде связанного списка и имеет три атрибута: заголовок, хвост и длину.
Я представляю каждый узел в BufferList как BufferNode, а тип данных внутри зависит от objectMode.
Эта структура данных получает данные заголовка быстрее, чем Array.prototype.shift().
2.2.1. Тип хранения данных.если objectMode === true:
Тогда данные могут быть любого типа. Какие бы данные ни были отправлены, они будут сохранены.
objectMode=истина
const Stream = require('поток'); const readableStream = новый Stream.Readable({ Режим объекта: правда, читать() {}, }); readableStream.push({ name: 'lisa'}); console.log(readableStream._readableState.buffer.tail); readableStream.push(истина); console.log(readableStream._readableState.buffer.tail); readableStream.push('Лиза'); console.log(readableStream._readableState.buffer.tail); readableStream.push(666); console.log(readableStream._readableState.buffer.tail); readableStream.push(() => {}); console.log(readableStream._readableState.buffer.tail); readableStream.push(Символ(1)); console.log(readableStream._readableState.buffer.tail); readableStream.push(BigInt(123)); console.log(readableStream._readableState.buffer.tail);
Результаты запуска:
если objectMode === false:
Тогда данные могут быть только строкой, буфером или Uint8Array.
objectMode = ложь
const Stream = require('поток'); const readableStream = новый Stream.Readable({ Режим объекта: ложь, читать() {}, }); readableStream.push({ name: 'lisa'});
Результаты запуска:
2.2.2. Структура хранения данных.Создаем читаемый поток в консоли через командную строку узла, чтобы наблюдать за изменениями данных в буфере:
Конечно, прежде чем отправлять данные, нам нужно реализовать их метод _read или реализовать метод чтения в параметрах конструктора:
const Stream = require('поток'); const readableStream = новый Stream.Readable(); RS._read = функция(размер) {}
или
const Stream = require('поток'); const readableStream = новый Stream.Readable({ читать(размер) {} });
После операции readableStream.push('abc') текущий буфер:
Вы можете видеть, что текущие данные сохранены. Данные, хранящиеся в начале и конце, представляют собой коды ASCII строки «abc», а тип — тип «буфер». Длина представляет собой количество сохраненных в данный момент данных, а не размер. содержание данных.
2.2.3. Связанные API.Распечатав все методы BufferList, вы можете получить:
За исключением соединения, которое сериализует BufferList в строку, все остальные являются операциями доступа к данным.
Я не буду здесь объяснять все методы один за другим, а сосредоточусь на потреблении, _getString и _getBuffer.
2.2.3.1.потреблять
Адрес исходного кода: BufferList.consume. https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L80
потреблять
// Потребляет указанное количество байтов или символов из буферизованных данных. потреблять (п, hasStrings) { константные данные = this.head.data; если (n <data.length) { // `slice` одинаков для буферов и строк. const срез = data.slice(0, n); this.head.data = data.slice(n); возвратный фрагмент; } если (n === data.length) { // Первый фрагмент идеально подходит. верните этот.shift(); } // Результат охватывает более одного буфера. вернуть hasStrings ? this._getString(n): this._getBuffer(n); }
В кодексе есть три условия оценки:
Если длина байта потребляемых данных меньше длины данных, хранящихся в головном узле связанного списка, берутся первые n байт данных головного узла и устанавливаются данные текущего головного узла. к данным после нарезки.
Если использованные данные точно равны длине данных, хранящихся в головном узле связанного списка, данные текущего головного узла возвращаются напрямую.
Если длина потребляемых данных превышает длину головного узла связанного списка, последнее решение будет принято на основе второго переданного параметра, чтобы определить, хранит ли нижний уровень текущего BufferList строку или буфер. .
2.2.3.2._getBuffer
Адрес исходного кода: BufferList._getBuffer https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L137
потреблять
// Потребляет указанное количество байтов из буферизованных данных. _getBuffer (п) { const ret = Buffer.allocUnsafe(n); константа retLen = п; пусть p = this.head; пусть с = 0; делать { const buf = p.data; если (n > buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.длина; } еще { if (n === buf.length) { TypedArrayPrototypeSet(ret, buf, retLen - n); ++с; если (стр.следующий) this.head = p.next; еще this.head = this.tail = null; } еще { TypedArrayPrototypeSet(ret, новый Uint8Array(buf.buffer, buf.byteOffset, n), ретЛен - н); this.head = р; p.data = buf.slice(n); } перерыв; } ++с; } while ((p = p.next) !== null); this.length -= c; вернуть возврат; }
В общем, это цикл для управления узлами связанного списка и создания нового буферного массива для хранения возвращаемых данных.
Сначала начните извлекать данные из головного узла связанного списка и продолжайте копировать их во вновь созданный буфер до тех пор, пока данные определенного узла не станут больше или равны длине, подлежащей выборке, минус полученная длина.
Другими словами, после чтения последнего узла связанного списка он не достиг желаемой длины, поэтому возвращается вновь созданный Buffer.
2.2.3.3._getString
Адрес исходного кода: BufferList._getString https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/buffer_list.js#L106
потреблять
// Потребляет указанное количество символов из буферизованных данных. _getString (п) { пусть Рет = ''; пусть p = this.head; пусть с = 0; делать { константа стр = p.data; if (n > длина строки) { Рет += ул; n -= длина строки; } еще { if (n === длина строки) { Рет += ул; ++с; если (стр.следующий) this.head = p.next; еще this.head = this.tail = null; } еще { ret += StringPrototypeSlice(str, 0, n); this.head = р; p.data = StringPrototypeSlice(str, n); } перерыв; } ++с; } while ((p = p.next) !== null); this.length -= c; вернуть возврат; }
Работа со строками аналогична работе с буферами. Также в цикле считываются данные из головы связанного списка. Есть лишь некоторые различия в копировании и хранении данных. Кроме того, тип данных, возвращаемый функцией. Операция _getString имеет строковый тип.
2.3. Почему потоки являются читаемыми экземплярами EventEmitter?
Для ответа на этот вопрос мы должны сначала понять, что такое модель публикации-подписки. Модель публикации-подписки имеет важные приложения в большинстве API. Будь то Promise или Redux, расширенные API, основанные на модели публикации-подписки, можно увидеть повсюду.
Его преимущество заключается в том, что он может хранить функции обратного вызова, связанные с событиями, в очереди, а затем уведомлять другую сторону о необходимости обработки данных в определенное время в будущем, тем самым обеспечивая разделение задач. Производитель только создает данные и уведомляет потребителя. , в то время как потребитель Тогда он обрабатывает только соответствующие события и соответствующие им данные, а модель потоковой передачи Node.js как раз соответствует этой характеристике.
Так как же поток Node.js реализует создание экземпляров на основе EventEmitter?
Исходный код для этого находится здесь: поток/наследие. https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/legacy.js#L10
наследие
функция Stream(opts) { EE.call(это, выбирает); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE);
Затем в исходном коде читаемого потока есть следующие строки кода:
Эта часть исходного кода находится здесь: доступна для чтения https://github.com/nodejs/node/blob/d5e94fa7121c9d424588f0e1a388f8c72c784622/lib/internal/streams/readable.js#L77
наследие
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Чтение, Поток);
Во-первых, унаследуйте объект-прототип Stream от EventEmitter, чтобы все экземпляры Stream могли получить доступ к методам EventEmitter.
При этом статические методы EventEmitter также наследуются через ObjectSetPrototypeOf(Stream, EE), а в конструкторе Stream заимствуется конструктор EE для реализации наследования всех свойств в EventEmitter, а затем в читаемом потоке, используйте тот же метод. Метод реализует прототипное наследование и наследование статических свойств класса Stream, получая таким образом:
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype
поэтому:
Readable.prototype.__proto__.__proto__ === EE.prototype
Таким образом, вы можете найти прототип EventEmitter, проследив цепочку прототипов читаемого потока и реализовать наследование EventEmitter.
2.4. Реализация связанных API.
API-интерфейсы будут отображаться здесь в том порядке, в котором они появляются в документах с исходным кодом, и будут объяснены только основные реализации API.
Примечание. Здесь интерпретируются только функции, объявленные в исходном коде читаемого потока Node.js, а определения внешних функций не включаются. Чтобы уменьшить длину, все коды не будут копироваться.
Читабельный.прототип
Транслировать { уничтожить: [Функция: уничтожить], _undestroy: [Функция: уничтожить], _destroy: [Функция (анонимно)], push: [Функция (анонимно)], unshift: [Функция (анонимно)], isPaused: [Функция (анонимно)], setEncoding: [Функция (анонимная)], читать: [Функция (анонимно)], _read: [Функция (анонимно)], труба: [Функция (анонимно)], unpipe: [Функция (анонимно)], on: [Функция (анонимно)], addListener: [Функция (анонимно)], RemoveListener: [Функция (анонимно)], выкл: [Функция (анонимно)], RemoveAllListeners: [Функция (анонимная)], резюме: [Функция (анонимно)], пауза: [Функция (анонимно)], обертка: [Функция (анонимно)], итератор: [Функция (анонимная)], [Символ(nodejs.rejection)]: [Функция (анонимно)], [Символ(Symbol.asyncIterator)]: [Функция (анонимная)] }2.4.1.
читаемый.push
Readable.prototype.push = функция(кусок, кодировка) { вернуть readableAddChunk(this, chunk,coding, false); };
Основная функция метода push — передать блок данных в нижестоящий конвейер путем запуска события data или сохранить данные в собственном буфере.
Следующий код является соответствующим псевдокодом и показывает только основной процесс:
читаемый.push
функция readableAddChunk(поток, чанк, кодирование, addToFront) { константное состояние = поток._readableState; if (chunk === null) { // выдвигаем нулевой сигнал окончания потока, после этого больше данные не могут быть записаны. state.reading = false; onEofChunk (поток, состояние); } else if (!state.objectMode) { // Если не объектный режим if (typeof chunk === 'string') { кусок = Buffer.from(кусок); } else if (chunk instanceof Buffer) { //Если это Buffer // Обрабатываем кодировку} else if (Stream._isUint8Array(chunk)) { кусок = Stream._uint8ArrayToBuffer(кусок); } else if (кусок != null) { err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } } if (state.objectMode || (chunk && chunk.length > 0)) { // Это объектный режим или чанк является буфером // Оценка нескольких методов вставки данных здесь опущена addChunk(stream, state, chunk, true); } } функция addChunk(поток, состояние, чанк, addToFront) { if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { // В режиме потоковой передачи есть подписчики, прослушивающие данныеstream.emit('data', chunk); } else { // В противном случае сохранить данные в буфере state.length += state.objectMode 1 : chunk.length; если (addToFront) { state.buffer.unshift(кусок); } еще { state.buffer.push(кусок); } } mayReadMore(stream, state); // Попробуйте прочитать немного больше данных}
Операция push в основном делится на оценку objectMode. Разные типы выполняют разные операции с входящими данными:
Первое решение addChunk в основном касается ситуации, когда Readable находится в потоковом режиме, имеет прослушиватель данных, а данные в буфере пусты.
В это время данные в основном передаются другим программам, которые подписываются на событие данных, в противном случае данные сохраняются в буфере.
2.4.2. читатьЗа исключением оценки граничных условий и состояния потока, этот метод в основном состоит из двух операций.
Вызовите реализованный пользователем метод _read для обработки результатов выполнения.
Считайте данные из буферного буфера и вызовите событие «данные».
читаемый.читать
// Если длина чтения больше hwm, hwm будет пересчитан if (n > state.highWaterMark) { state.highWaterMark = вычислятьNewHighWaterMark(n); } // Вызов реализованного пользователем метода _read try { const result = this._read(state.highWaterMark); если (результат!= ноль) { const then = result.then; if (typeof then === 'функция') { тогда.вызов( результат, нет, функция (ошибка) { errorOrDestroy(это, ошибка); }); } } } поймать (ошибиться) { errorOrDestroy(это, ошибка); }
Если метод _read, реализованный пользователем, возвращает обещание, вызовите метод then этого обещания и передайте обратные вызовы успеха и неудачи, чтобы облегчить обработку исключений.
Основной код метода чтения для чтения данных зоны из буфера выглядит следующим образом:
читаемый.читать
функция fromList(n, состояние) { // ничего не буферизовано. если (state.length === 0) вернуть ноль; пусть уходит; если (state.objectMode) ret = state.buffer.shift(); else if (!n || n >= state.length) { // Обработка случая, когда n пусто или превышает длину буфера // Прочитайте все, обрезаем список. if (state.decoder) // Если есть декодер, сериализуем результат в строку ret = state.buffer.join(''); else if (state.buffer.length === 1) // Есть только одни данные, возвращаем данные головного узла ret = state.buffer.first(); else // Сохраняем все данные в буфере ret = state.buffer.concat(state.length); state.buffer.clear(); // Очищаем буфер} else { // Обрабатываем ситуацию, когда длина чтения меньше буфера ret = state.buffer.consume(n, state.decoder); } вернуть возврат; }2.4.3._прочитать
Метод, который должен быть реализован, когда пользователи инициализируют поток, доступный для чтения. Вы можете вызвать метод push в этом методе, чтобы постоянно запускать метод чтения. Когда мы нажимаем значение null, мы можем остановить операцию записи потока.
Пример кода:
читабельно._прочитано
const Stream = require('поток'); const readableStream = новый Stream.Readable({ читать (хм) { this.push(String.fromCharCode(this.currentCharCode++)); если (this.currentCharCode > 122) { this.push(ноль); } }, }); readableStream.currentCharCode = 97; readableStream.pipe(process.stdout); // abcdefghijklmnopqrstuvwxyz%2.4.4. труба (важно)
Привяжите один или несколько потоков, доступных для записи, к текущему потоку, доступному для чтения, и переключите поток, доступный для чтения, в потоковый режим.
В этом методе много дескрипторов прослушивания событий, и я не буду описывать их здесь по одному:
читаемый.pipe
Readable.prototype.pipe = function(dest, PipeOpts) { константный источник = это; const state = this._readableState; state.pipes.push(dest); // Собираем записываемый поток src.on('data', ondata); функция ondata(кусок) { const ret = dest.write(кусок); если (рет === ложь) { пауза(); } } // Сообщаем устройству, куда оно передается. dest.emit('труба', источник); // Запускаем поток, если поток находится в режиме паузы if (dest.writableNeedDrain === true) { если (state.flowing) { пауза(); } } еще если (!state.flowing) { источник.резюме(); } место возврата; }
Операция канала очень похожа на оператор канала Linux «|», изменяющий левый вывод на правый вход. Этот метод собирает записываемый поток для обслуживания и запускает событие «данные», когда доступный для чтения поток доступен.
Когда данные вытекают, срабатывает событие записи записываемого потока, так что данные могут быть переданы и могут быть реализованы такие операции, как конвейер. И автоматически изменит читаемый поток в режиме паузы на текущий режим.
2.4.5. резюмеПереключите поток из режима «паузы» в режим «потока». Если установлен «читаемый» прослушиватель событий, этот метод не имеет никакого эффекта.
читаемое.резюме
Readable.prototype.resume = function() { константное состояние = this._readableState; если (!state.flowing) { state.flowing = !state.readableListening; // Находится ли он в потоковом режиме, зависит от того, установлен ли «читаемый» дескриптор прослушивания возобновить(this, state); } }; функция резюме (поток, состояние) { if (!state.resumeScheduled) { // Переключаемся так, чтобы метод возобновить_ вызывался только один раз в одном и том же тике state.resumeScheduled = true; процесс.nextTick(резюме_, поток, состояние); } } функция резюме_(поток, состояние) { если (!state.reading) { поток.читать(0); } state.resumeScheduled = ложь; поток.emit('резюме'); поток (поток); } function flow(stream) { // Когда поток находится в потоковом режиме, этот метод будет продолжать читать данные из буфера до тех пор, пока буфер не станет пустым. const state =stream._readableState; while (state.flowing &&stream.read() !== null); // Поскольку здесь будет вызван метод чтения и установлен поток «читаемого» прослушивателя событий, метод чтения также может быть вызван. //Это приводит к бессвязным данным (не влияет на данные, влияет только на вызов метода чтения в обратном вызове события "readable" для чтения данных) }2.4.6. пауза
Измените поток из потокового режима в режим паузы, прекратите генерировать событие data и сохраните все данные в буфере.
читаемый.пауза
Readable.prototype.pause = function() { if (this._readableState.flowing !== false) { отладка («пауза»); this._readableState.flowing = ложь; this.emit('пауза'); } верните это; };
2.5 Использование и механизм работы.
Метод использования был упомянут в разделе BufferList. Создайте экземпляр Readable и реализуйте его метод _read() или реализуйте метод чтения в первом параметре объекта конструктора.
2.5.1. Рабочий механизм.Здесь мы рисуем только общий процесс и условия срабатывания преобразования режима Readable потока.
в: