[Часть 2.1] RabbitMQ примеры кода Node.js

Будем следовать сценарию, использованному в предыдущей статье  «Что такое RabbitMQ?». Веб-сайт будет обрабатывать информацию и генерировать PDF-файл и отправлять его обратно пользователю. Создание PDF-файла и отправка электронной почты в этом сценарии займет несколько секунд. Если вы не знакомы с RabbitMQ и очередью сообщений, я бы рекомендовал вам прочитать RabbitMQ для новичков — что такое RabbitMQ? перед началом работы с этим руководством.

Начало работы с RabbitMQ и Node.js

Начните с загрузки клиентской библиотеки для Node.js. У разработчиков есть несколько вариантов клиентских библиотек AMQP. В этом примере будет использоваться amqplib. Начните с добавления amqplib в качестве зависимости в файле package.json.

Для начала вам понадобится установить RabbitMQ.

При запуске кода будет установлено соединение между RabbiMQ и вашим приложением. Будут объявлены и созданы очереди, если они еще не существуют, и, наконец, сообщение будет опубликовано. Метод публикации будет помещать сообщения в очередь, и если соединение отключено, повторно отправит сообщение позже. Пользователь подписывается на очередь. Сообщения обрабатываются один за другим и отправляются методу обработки PDF.

Новое сообщение будет публиковаться каждую секунду. Будет использоваться обмен по умолчанию, идентифицирующий пустую строку («»). Обмен по умолчанию означает, что сообщения направляются в очередь с именем, указанным routing_key, если оно существует. (Обмен по умолчанию — это прямой обмен без имени)

Полный код можно скачать с GitHub.

Загружаем AMQPLIB

Настраиваем подключение

Функция  start установит соединение с RabbitMQ. Если подключение закрыто или оборвано попробует его переподключить.

amqpConn будет удерживать соединение с каналами.

whenConnected метод будет вызван когда соединение будет установлено.

Функция whenConnected вызывает две другие, первая запускает паблишера, а вторая воркер (consumer / слушатель).

Запуск паблишера

createConfirmChannel открывает канал в режиме подтверждения. Канал в режиме подтверждения требует, чтобы каждое опубликованное сообщение было подписано (‘acked’ or ‘nacked’) сервером, тем самым указывая на то, что оно было обработано.

offlinePubQueue является внутренней очередью сообщений, которые не были отправлены, когда приложение не работало. Приложение проверяет эту очередь и отправляет сообщения в очередь, если сообщение добавляется в очередь.

Публикация сообщения

Функция publish опубликует сообщение для обмена с заданным ключом маршрутизации. При возникновении ошибки сообщение будет добавлено во внутреннюю очередь offlinePubQueue

CONSUMER

amqpConn.createChannel создает канал в соединении. ch.assertQueue ожидает существование очереди. ch.consume устанавливает consumer с callback, который должен вызываться с каждым полученным им сообщением. Функция, вызываемая для каждого сообщения, называется processMsg

processMsg обрабатывает сообщение из очереди. Он вызовет функцию work и дождется ее завершения.

Функция work обрабатывает информацию из сообщения и созданет PDF. Это todo функция так как материал нацелен на ознакомление с RabbitMQ, а не генерацией PDF.

Закрываем подключение при возникновении ошибки

Публикация сообщений

Новое сообщение будет публиковаться каждую секунду. Будет использоваться обмен по умолчанию, идентифицирующий пустую строку («»). Обмен по умолчанию означает, что сообщения направляются в очередь с именем, указанным routing_key, если оно существует. (Обмен по умолчанию — это прямой обмен без имени)

Leave a Reply

Добавить комментарий

Этот сайт использует Akismet для борьбы со спамом. Узнайте как обрабатываются ваши данные комментариев.