Работа с потоками в node.js
Данный документ является вольным переводом stream-handbook и охватывает основы создания node.js приложений с использованием потоков. По сравнению с источником - обновлены некоторые главы с учетом 2016 года, добавлено объяснение различий между разными версиями API, убраны устаревшие модули и добавлены новые, изменена структура повествования.
Таким образом, надеюсь, в результате получился актуальный современный учебник по потоковому API в node.js. Жду ваших комментариев и замечаний.
Оглавление
- Вступление
- Почему мы должны использовать потоки
- Основы
- Встроенные потоки
- Сторонние потоки
- Заключение
Вступление
Нам нужен способ взаимодействия между программами, наподобие того как садовый шланг можно подключать к разным сегментам и изменять направление воды. То же самое можно сделать с вводом-выводом данных
Дуглас Макилрой. 11 октября 1964
Потоки пришли к нам из первых дней эпохи Unix и зарекомендовали себя в течении многих десятилетий как надежный способ создания сложных систем из маленьких компонентов, которые делают что-то одно, но делают это хорошо. В Unix потоки реализуются в оболочке с помощью знака |
(pipe). В node встроенный модуль потоков используется в базовых библиотеках, кроме этого его можно подключать в свой код. Подобно Unix, в node основной метод модуля потоков называется .pipe()
. Он позволяет соединять потоки с разной скоростью передачи данных таким образом что данные не будут потеряны.
Потоки помогают разделять ответственность, поскольку позволяют вынести все взаимодействие в отдельный интерфейс, который может быть использован повторно. Вы сможете подключить вывод одного потока на ввод другого, и использовать библиотеки которые будут работать с подобными интерфейсами на более высоком уровне.
Потоки - важный элемент микроархитектурного дизайна и философии UNIX, но кроме этого есть еще достаточное количество важных абстракций для рассмотрения. Всегда помните своего врага (технический долг) и ищите наиболее подходящие для решения задач абстракции.
Почему мы должны использовать потоки
Ввод-вывод в node асинхронен, поэтому взаимодействие с диском и сетью происходит через различные способы управления асинхронным кодом (обещания, генераторы, функции обратного вызова и т.п.). Следующий код отдает файл браузеру через функцию обратного вызова (callback):
|
|
Этот код работает, но он буферизирует весь data.txt
в память при каждом запросе. Если data.txt
достаточно большой, ваша программа начнет потреблять слишком много оперативной памяти, особенно при большом количестве подключений пользователей с медленными каналами связи.
При этом пользователи останутся недовольными, ведь им придется ждать пока весь файл не будет считан в память на сервере перед отправкой.
К счастью, оба аргумента (req, res)
являются потоками, а это значит что мы можем переписать код с использованием fs.createReadStream()
вместо fs.readFile()
:
|
|
Теперь .pipe()
самостоятельно слушает события 'data'
и'end'
потока созданного через fs.createReadStream()
. Этот код не только чище, но теперь и data.txt
доставляется по частям по мере чтения его с диска.
Использование .pipe()
имеет ряд других преимуществ, например автоматическая обработка скорости ввода-вывода - node.js не будет буферизировать лишние части файла в память пока предыдущие части не отправлены клиенту с медленным соединением.
А если мы хотим еще больше ускорить отправку файла? Добавим сжатие:
|
|
Теперь наш файл cжимается для браузеров, которые поддерживают gzip или deflate! Мы просто отдаем модулю opressor всю логику обработки content-encoding и забываем про нее.
После того как вы ознакомитесь с API потоков, вы сможете писать потоковые модули и соединять их как кусочки лего, вместо того чтобы изобретать свои велосипеды и пытаться запомнить все способы взаимодействия между компонентами системы.
Потоки делают программирование в node.js простым, элегантным и компонуемым.
Основы
Существует 4 вида потоков:
- на чтение (readable)
- на запись (writeable)
- трансформирующие (transform)
- дуплексные (duplex)
Начиная с версии node.js v0.12 в стабильном состоянии заморожена версия APIv3 (streams3) - именно его описывает официальная документация. Все виды потоков, и различия в реализации API между ними будут рассмотрены ниже.
pipe()
Любой поток может использовать.pipe()
для соединения входов с выходами.
.pipe()
это просто функция, которая берет поток на чтение src
и соединяет его вывод с вводом потока на запись dst
:
|
|
.pipe(dst)
возвращает dst
, так что вы можете связывать сразу несколько потоков:
|
|
или то же самое:
|
|
Аналогично в Unix вы можете связать утилиты вместе:
|
|
Потоки на чтение (readable)
Поток на чтение производит данные, которые с помощью .pipe()
могут быть переданы в поток на запись, трансформирующий или дуплексный поток:
|
|
Создание потока на чтение
Давайте создадим считываемый поток!
|
|
|
|
Тут rs.push(null)
сообщает потребителю, что rs
закончил вывод данных.
Заметьте, мы отправили содержимое в поток на чтение rs
ДО привязывания его к process.stdout
, но сообщение все равно появилось в консоли. Когда вы посылаете с помощью .push()
данные в поток на чтение, они буферизируются до тех пор пока потребитель не будет готов их прочитать.
Тем не менее, в большинстве случаев будет лучше если мы не будем их буферизировать совсем, вместо этого будем генерировать их только когда данные запрашиваются потребителем.
Мы можем посылать данные кусками, определив функцию ._read
:
|
|
|
|
Теперь мы помещаем буквы от 'a'
до 'z'
включительно, но только тогда когда потребитель будет готов их прочитать.
Метод _read
также получает в первом аргументе параметр size
, который указывает сколько байт потребитель хочет прочитать - он необязательный, так что ваша реализация потока может его игнорировать.
Обратите внимание, вы также можете использовать util.inherits()
для наследования от базового потока, но такой подход может быть непонятен тому кто будет читать ваш код.
Чтобы продемонстрировать, что наш метод _read
вызовется только когда потребитель запросит данные, добавим задержку в наш поток:
|
|
Запустив программу, мы увидим, что если мы запросим 5 байт - _read ()
вызовется 5 раз:
|
|
Задержка через setTimeout необходима, так как операционной системе требуется определенное время чтобы послать сигнал о закрытии конвейера.
Обработчик process.stdout.on('error', fn)
также необходим, поскольку операционная система пошлет SIGPIPE нашему процессу когда утилите head
больше не будет нужен результат нашей программы (в этом случае будет вызвано событие EPIPE в потоке process.stdout
).
Эти усложнения необходимы при взаимодействии с конвейером в операционной системе, но в случае реализации потоков чисто в коде они будут обработаны автоматически.
Если вы хотите создать читаемый поток, который выдает произвольные форматы данных вместо строк и буферов - убедитесь что вы его инициализировали с соответствующей опцией: Readable ({ objectMode: true })
.
Использование потока на чтение
В большинстве случаев мы будем подключать такой поток к другому потоку, созданному нами или модулями наподобие through, concat-stream. Но иногда может потребоваться использовать его напрямую.
|
|
|
|
Когда данные становятся доступными, возникает событие 'readable'
, и вы можете вызвать .read()
чтобы получить следующую порцию данных из буффера.
Когда поток завершится, .read()
вернет null
, потому что не останется доступных для чтения байтов.
Вы можете запросить определенное количество байтов: .read(n)
. Указание необходимого размера носит рекомендательный характер, и не сработает для потоков возвращающих объекты. Однако, все базовые потоки обязаны поддерживать данную опцию.
Пример чтения в буффер порциями по 3 байта:
|
|
Но, при запуске этого примера мы получим не все данные:
|
|
Это произошло потому что последняя порция данных осталась во внутреннем буфере, и нам надо “подопнуть” их. Сделаем мы это сообщив с помощью .read(0)
что нам надо больше чем только что полученные 3 байта данных:
|
|
Теперь наш код работает как и ожидалось:
|
|
В случае, если вы получили больше данных чем вам требуется - можно использовать .unshift()
чтобы вернуть их назад. Использование .unshift()
помогает нам предотвратить получение ненужных частей.
К примеру, создадим парсер который разделяет абзац на строки с делителем - переносом строки:
|
|
|
|
Код выше приведен только для примера, если вам действительно нужно будет разбить строку - лучше будет воспользоваться специализированным модулем split и не изобретать велосипед.
Потоки на запись (writeable)
В поток на запись можно послать данные используя .pipe()
, но прочитать их уже не получится:
|
|
Создание потока на запись
Просто определяем методом ._write(chunk, enc, next)
, и теперь в наш поток можно передавать данные:
|
|
|
|
Первый аргумент, chunk
, это данные которые посылает отправитель.
Второй аргумент, enc
, это строка с названием кодировки. Она используется только в случае когда опция opts.decodeString
установлена в false
, и вы отправляете строку.
Третий аргумент, next(err)
, является функцией обратного вызова (callback), сообщающей отправителю что можно послать еще данные. Если вы вызовите ее с параметром err
, в потоке будет создано событие 'error'
.
В случае если поток из которого вы читаете передает строки, они будут преобразовываться в Buffer
. Чтобы отключить преобразование - создайте поток на запись с соответствующим параметром: Writable({ decodeStrings: false })
.
Если поток на чтение передает объекты - явно укажите это в параметрах: Writable({ objectMode: true })
.
Отправка данных в поток на запись
Чтобы передать данные в поток на запись - вызовите .write(data)
, где data
это набор данных которые вы хотите записать.
|
|
Если вы хотите сообщить что вы закончили запись - вызовите .end()
(или .end(data)
чтобы отправить еще немного данных перед завершением):
|
|
|
|
Не беспокойтесь о синхронизации данных и буферизации, .write()
вернет false
если в буфере скопилось данных больше чем указывалось в параметре opts.highWaterMark
при создании потока. В этом случае следует подождать события 'drain'
, которое сигнализирует о том что данные можно снова писать.
Дуплексные потоки (duplex)
Дуплексные потоки наследуют методы как от потоков на чтение, так и от потоков на запись. Это позволяет им действовать в обоих направлениях - читать данные, и записывать их в обе стороны. В качестве аналогии можно привести телефон. Если вам требуется сделать что-нибудь типа такого:
|
|
значит вам нужен дуплексный поток.
Трансформирующие потоки (transform)
Трансформирующие потоки это частный случай дуплексных потоков (в обоих случаях они могут использоваться как для записи, так и чтения). Разница в том, что в случае трансформации отдаваемые данные так или иначе зависят от того что подается на вход.
Возможно, вы также встречали второе название таких потоков - “сквозные” (“through streams”). В любом случае, это просто фильтры которые преобразовывают входящие данные и отдают их.
Различия в реализации потоков
streams1: устаревшее API
В первых версиях node.js существовал классический (streams1) интерфейс потоков. Интерфейс поддерживал добавление данных в поток (push-режим), однако потребитель мог только слушать события data
и end
, буферизация не поддерживалась и данные легко было потерять. Разработчики вручную контролировали поток вызывая .pause()
и .resume()
. На текущий момент его практически нигде не используют. Если вы все таки работаете с подобным потоком - вам пригодится несколько практик.
К примеру, чтобы избежать установки слушателей "data"
и "end"
подойдет модуль through:
|
|
|
|
а для буферизации всего содержимого потока сойдет concat-stream:
|
|
|
|
У классических потоков на чтение для остановки и продолжения есть методы .pause()
и .resume()
, но их использования следует избегать. Если вам необходим этот функционал - рекомендуется не создавать логику самостоятельно, а использовать модуль through.
streams2: второе поколение
В node.js v0.10 появилось второе поколение потоков (streams2). Эти потоки всегда запускаются в режиме паузы, и у потребителей уже есть возможность запросить данные вызвав .read(numBytes)
(pull-режим), присутствует буферизация. Ключевая особенность данного API - поток автоматически переключается в классический режим в целях совместимости если назначить обработчики на data
и end
. При этом поток снимается с режима паузы и отключается возможность использовать pull-режим. На момент написания статьи (11.07.2016) многие неактуальные модули работают в данном режиме, однако активно развивающиеся модули перешли на третье поколение.
streams3: стабильная реализация
Начиная с node.js v0.11, концепция потоков переработана и признана стабильной - в официальной документации описывается именно поведение streams3. По умолчанию потоки все еще запускаются в режиме паузы а назначение обработчиков снимает их с паузы. Однако, если использовать .pause()
и вызвать метод .read()
- соответствующие данные будут возвращены. Таким образом, потоки поддерживают как pull режим, так и push. При этом, можно смело использовать модули с streams2 так как они совместимы.
Дополнительно
Вы прочитали про базовые понятия касающиеся потоков, если вы хотите узнать больше - обратитесь к актуальной документация по потокам. В случае если вам понадобится сделать API streams2 потоков совместимым с “классическим” API streams1 (например, при использовании устаревших версий node.js)- используйте модуль readable-stream. Просто подключите его в свой проект: require('readable-stream')
вместо require('stream')
.
Встроенные потоки
Эти потоки поставляются с node.js и могут быть использованы без подключения дополнительных библиотек.
process
process.stdin
Поток на чтение содержит стандартный системный поток ввода для вашей программы.
По умолчанию он находится в режиме паузы, но после первого вызова .resume()
он начнет исполняться в
следующем системном тике.
Если process.stdin указывает на терминал (проверяется вызовомtty.isatty()
), тогда входящие данные будут буферизироваться построчно. Вы можете выключить построчную буферизацию вызвав process.stdin.setRawMode(true)
. Однако, имейте ввиду что в этом случае обработчики системных нажатий (таких как^C
и ^D
) будут удалены.
process.stdout
Поток на запись, содержащий стандартный системный вывод для вашей программы. Посылайте туда данные, если вам нужно передать их в stdout.
process.stderr
Поток на запись, содержащий стандартный системный вывод ошибок для вашей программы. Посылайте туда данные, если вам нужно передать их в stderr.
child_process.spawn()
Данная функция запускает процесс, и возвращает объект содержащий stderr/stdin/stdout потоки данного процесса.
fs.createReadStream()
Поток на чтение, содержащий указанный файл. Используйте, если вам надо прочесть большой файл без больших затрат ресурсов.
fs.createWriteStream()
Поток на запись, позволяющий сохранить переданные данные в файл.
net
net.connect()
Данная функция вернет дуплексный поток, который позволяет подключиться к удаленному хосту по протоколу tcp.
Все данные которые вы будете в него записывать будут буферизироваться до тех пор, пока не возникнет событие 'connect'
.
net.createServer()
Создает сервер для обработки входящих соединений. Параметром передается функция обратного вызова (callback), которая вызывается при создании соединения, и содержит поток на запись.
|
|
http.request()
Создает поток на чтение, позволяющий сделать запрос к веб-серверу и вернуть результат.
http.createServer()
Создает сервер для обработки входящих веб-запросов. Параметром передается функция обратного вызова (callback), которая вызывается при создании соединения, и содержит поток на запись.
zlib.createGzip()
Трансформирующий поток, который отдает на выходе запакованный gzip.
zlib.createGunzip()
Трансформирующий поток, распаковывает gzip-поток.
zlib.createDeflate()
zlib.createInflate()
Сторонние потоки
Список модулей
Ниже приведен список npm-модулей, работающих с потоками. Список является далеко не полным, постоянно появляются новые модули и их нет возможности отслеживать. Цель данной таблицы - дать представление о “кирпичиках”, из которых вы можете собрать свое приложение. Не стесняйтесь проходить по ссылкам и изучать документацию, там есть более подробное описание и примеры использования.
through | Простой способ создания дуплексного потока или конвертации “классического” в современный | ||
from | Аналог through, только для создания потока для чтения | ||
pause-stream | Позволяет буферизировать поток и получать результат буфера в произвольный момент | ||
concat-stream | Буферизирует поток в один общий буфер. concat(cb) принимает параметром только один аргумент - функцию cb(body) , которая вернет body когда поток завершится |
||
duplex, duplexer | Создание дуплексного потока | ||
emit-stream | Конвертирует события (event-emitter) в поток, и обратно | ||
invert-stream | Создает из двух потоков один, “соединяя” вход первого потока с выходом второго и наоборот | ||
map-stream | Создает трансформирующий поток для заданной асинхронной функции | ||
remote-events | Позволяет объединять несколько эмиттеров событий в единый поток | ||
buffer-stream | Дуплексный поток, буферизирующий проходящие через него данные | ||
highland | Управление асинхронным кодом с использованием потоков | ||
auth-stream | Добавление слоя авторизации для доступа к потокам | ||
mux-demux | Создание мультифункциональных потоков на основе любых текстовых. | ||
stream-router | Роутер для потоков, созданных с помощью mux-demux |
||
multi-channel-mdm | Создание постоянных потоков (каналов) из потоков mux-demux |
||
crdt, delta-stream, scuttlebutt | Данная коллекция потоков предполагает, что операции над данными всегда возвращают один и тот же результат вне зависимости от порядка этих операций | ||
request | Создание http-запросов | ||
reconnect-core | Базовый настраиваемый интерфейс для переподключения потоков при возникновении проблем в сети | ||
kv | Абстрактный поток, предоставляющий враппер для доступа к различным key-value хранилищам | ||
trumpet | Трансформация html-текста с использованием css-селекторов | ||
JSONStream | Преобразование JSON.parse и JSON.stringify . Примеры использования - обработка большого объема JSON-данных при недостаточном количестве оперативной памяти, обработка json “на лету” при получении его через медленные каналы, и т.п. |
||
shoe | Трансляция вебсокет событий. | ||
dnode | Данный модуль дает вам возможность вызывать удаленные функции (RPC) через любой поток | ||
tap | Фреймворк для тестирования node.js на основе потоков. | ||
stream-spec | Способ описания спецификации потоков, для автоматизации их тестирования. |
Примеры использования
pause-stream
позволяет буферизировать поток и получать результат буфера в произвольный момент:
|
|
В данном примере concat-stream
вернет строку "beep boop"
только после того как вызовется cs.end()
. Результат работы программы - перевод строки в верхний регистр:
|
|
|
|
Следующий пример с concat-stream
обработает строку с параметрами, и вернет их уже в JSON:
|
|
|
|
В данном примере используются JSONStream
и emit-stream
и net
. Будет создан сервер который автоматически отправит все события клиенту:
|
|
Клиент, со своей стороны, может автоматически конвертировать приходящие данные обратно в события:
|
|
Данная программа создаст из stdin и stdout дуплексный поток с помощью invert-stream
:
|
|
|
|
Для создания потока, работающего с датами, тут мы используем mux-demux
:
|
|
Мощные комбинации
Статья была бы не полной без рассказа о той магии, которую можно совершать используя комбинации различных потоков. Давайте рассмотрим некоторые из них.
Создание распределенной сети
Модуль scuttlebutt
может быть использован для синхронизации состояния между узлами mesh-сети, где узлы непосредственно не связаны между собой и нет единого мастера (аналог торрент-клиента).
Под капотом у scuttlebutt
используется широко известный в узких кругах протокол gossip, который гарантирует что все узлы будут возвращать последнее актуальное значение.
Используя интерфейс scuttlebutt/model
, мы можем создавать клиентов и связывать их между собой:
|
|
Мы создали сеть в форме ненаправленного графа, которая выглядит так:
|
|
Узлы a
и e
напрямую не соединены, но если мы выполним команду:
|
|
то увидим что узел a
будет доступен узлу e
через узлы b
и d
. Учитывая то, что scuttlebutt
использует простой потоковый интерфейс, и все узлы гарантированно получат данные - мы можем соединить любой процесс, сервер или транспорт которые поддерживают обработку строк.
Давайте создадим более реалистичный пример. В нем мы будем соединяться через сеть, и увеличивать счетчик каждые 320 миллисекунд на всех узлах:
|
|
Теперь создадим клиента, который подключается к серверу, получает обновления и выводит их на экран:
|
|
Клиент получился чуть-чуть сложнее, так как ему приходится ждать обновления от остальных участников прежде чем убедиться что он может увеличить счетчик.
После того как мы запустим сервер и несколько клиентов - мы увидим изменения счетчика наподобие такого:
|
|
Время от времени на некоторых узлах мы будем замечать что значения повторяются:
|
|
Это происходит потому, что мы не предоставили достаточно данных алгоритму для разрешения временных конфликтов, и ему сложнее поддерживать синхронизацию всех узлов. К сожалению, дальнейшее развитие примера выходит за пределы данной статьи, поэтому рекомендуем самостоятельно изучить scuttlebutt
.
Обратите внимание, что в вышеприведенных примерах сервер это всего лишь еще один узел с теми же привилегиями что и остальные клиенты. Понятия “клиент” и “сервер” не затрагивают способы синхронизации данных, в данном сервер это “тот кто первым создал соединение”. Подобные протоколы называют “симметричными”, еще один пример подобного протокола можно посмотреть в реализации модуля dnode
.
Клиент-серверный RPC
Для примера, создадим простой сервер dnode
:
|
|
потом напишем клиента, который вызывает метод сервера .transform()
:
|
|
После запуска, клиент выведет следующий текст:
|
|
Клиент послал 'beep'
на сервер, запросив выполнение метода .transform()
, сервер вернул результат.
Интерфейс, который предоставляет dnode
, является дуплексным потоком. Таким образом, так как и клиент и сервер подключены друг к другу (c.pipe(d).pipe(c)
), запросы можно выполнять в обе стороны.
dnode
раскрывает себя во всей красе когда вы начинаете передавать аргументы к предоставленным методам. Посмотрим на обновленную версию предыдущего сервера:
|
|
Вот обновленный клиент:
|
|
После запуска клиента, мы увидим:
|
|
Сервер увидел аргумент, и выполнил функцию с ним!
Основная идея такая: вы просто кладете функцию в объект, и на другой стороне земного шара вызываете идентичную функцию с нужными вам аргументами. Вместо того чтобы выполниться локально, данные передаются на сервер и функция возвращает результат удаленного выполнения. Это просто работает.
dnode
работает через потоки как в node.js, так и в браузере. Удобно комбинировать потоки через mux-demux
для создания мультиплексного потока, работающего в обе стороны.
Собственная реализация socket.io
Мы можем создать собственное API для генерации событий через websocket с использованием потоков.
Сперва, используем shoe
для создания серверного обработчика вебсокетов, и emit-stream
чтобы превратить эмиттер событий в поток, который генерирует объекты.
Далее, поток с объектами мы подключаем к JSONStream
, с целью преобразовать объект в строку готовую для передачи в сеть.
|
|
Теперь мы можем прозрачно генерировать события используя метод эмиттера ev
. К примеру, несколько событий через разные промежутки времени:
|
|
Наконец, экземпляр shoe
привяжем к http-серверу:
|
|
Между тем, на стороне браузера поток от shoe
содержащий json обрабатывается и получившиеся объекты передаются в eventStream()
. Таким образом, eventStream()
возвращает эмиттер который генерирует переданные сервером события:
|
|
Используем browserify для генерации кода в браузере, чтобы мы могли делать require()
прямо в файле:
|
|
Подключаем <script src="/bundle.js"></script>
в html-страницу, открываем ее в браузере и наслаждаемся серверными событиями которые отображаются в браузере.
Заключение
Начав использовать потоки и планировать с их помощью процесс разработки программ, вы заметите что стали больше полагаться на маленькие переиспользуемые компоненты которым не нужно ничего кроме общего интерфейса потоков. Вместо маршрутизации сообщений через глобальную систему событий и настройки обработчиков вы сфокусируетесь на разбиении приложения на мелкие компоненты, хорошо выполняющими какую-то одну задачу.
В примерах вы можете легко заменить JSONStream
на stream-serializer
чтобы получить немного другой способ преобразования в строку. Вы можете добавить дополнительный слой чтобы обрабатывать потери связи с помощью reconnect-core
. Если вы захотите использовать события с областью видимости - вы вставите дополнительный поток с поддержкой eventemitter2. В случае если вам потребуется изменить поведение некоторых частей потока вы сможете пропустить его через mux-demux
и разделить на отдельные каналы каждый со своей логикой.
С течением времени, при изменении требований к приложению, вы легко сможете заменять устаревшие компоненты новыми, с гораздо меньшим риском получить в результате неработающую систему.