Полный цикл в digital

Как работает RabbitMQ

RabbitMQ работает следующим образом:

  1. Приложение-продюсер отправляет сообщение конкретному обменнику
  2. Обменник маршрутизирует полученное сообщение в одну или несколько очередей, в зависимости от типа самого обменника, правилам его привязки к очереди binding и значению ключа маршрутизации в сообщении routing и key
  3. Очередь хранит ссылку на полученное сообщение, которое физически может хранится в оперативной памяти или на диске, в зависимости от свойств самой очереди
  4. Когда приложение-потребитель готово получить сообщение из очереди, брокер копирует его по ссылке из очереди и отправляет копию сообщения потребителю
  5. Получив сообщение, приложение-потребитель отправляет брокеру подтверждение об успешном получении данных
  6. После получения подтверждения брокер удаляет копию сообщения из очереди
  7. Наконец, брокер удаляет само исходное сообщение из очереди, очищая место в оперативной памяти или на жестком диске

В проектах межсистемного взаимодействия с RabbitMQ важно определить способ маршрутизации сообщений между интегрируемыми приложениями, что задается типом обменника. Чтобы кратко рассказать, чем они отличаются друг от друга и в каких случаях стоит выбирать тот или другой, я составила таблицу:

Тип обменника Принцип работы Сценарий применения
Direct Прямая отправка сообщений в одну или несколько очередей с совпадающим значением ключа маршрутизации Когда есть точно известный ключ, по значению которого отдельные приложения-потребители должны получить подходящие сообщения
Fanout Все сообщения отправляются во все очереди независимо от ключа маршрутизации Когда все приложения-потребители должны быстро получать все сообщения
Headers Маршрутизация по нескольким атрибутам, заданным в заголовке сообщения. Ключ маршрутизации игнорируется Когда правила маршрутизации сообщения в очереди сложнее, чем просто по ключу, например, формат данных, комбинация полей и пр.
Topic Сообщение отправляется в конкретные очереди по значению ключа маршрутизации, заданного по шаблону Когда ключ маршрутизации сложный и поток сообщений надо разделить по разным приложениям-потребителям

Можно создать собственный обменник, комбинируя несколько разных типов между собой, чтобы еще более повысить гибкость маршрутизации сообщений и задать сложный маршрут их распределения по приложениям-потребителям.

Обменник отправляет сообщение в очередь согласно правилам привязки. Очереди в RabbitMQ работают по принципу FIFO и бывают 2-х видов:

  • autoDelete временные создаются при подключении первого клиента и удаляются при отсутствии клиентских подключений
  • durable постоянные сохраняют свое состояние и восстанавливаются после перезапуска брокера. Это более надежный вид очередей

Каждая очередь в RabbitMQ должна иметь название name, а также ряд других обязательных и опциональных свойств, которые определяют поведение очереди. Например, эксклюзивная очередь используется только одним соединением и удаляется при его закрытии, подобно очереди с автоматическим удалением Auto-Delete. Эксклюзивная очередь предполагает, что сообщения из нее получает только одно приложение-потребитель. Также в свойствах очереди можно задать ее тип, классическая Classic, реплицируемая Quorum, потоковая Stream и максимальную длину, время жизни TTL, Time To Live, приоритет и правила репликации.

Чтобы понять, как все эти концепции RabbitMQ работают на самом деле, рассмотрим практический пример.

Практический пример

У нас есть 2 продюсера, которые создают данные и отправляют их в RabbitMQ. Пусть это будут приложения, которые снимают показания с датчиков температуры Producer 1 и давления Producer 2 на нескольких устройствах. Данные по температуре обрабатывает приложение-потребитель обработчик температуры Consumer1, а данные по давлению обработчик давления Consumer 2.

Приложение-продюсер по температуре отправляет в RabbitMQ данные в следующей JSON:

{
   "measuring": "temperature",
   "timestamp": "2023-01-21T02:05:55.000Z ",
   "device": [
 {
     "device_id": 4123,
     "value": 69.67,
     "status": "OK"
 },
 {
     "device_id": 587,
     "value": 19.67,
     "status": "ERROR"
 },
 {
     "device_id": 1524,
     "value": 123,
     "status": "ERROR"
 },
 {
     "device_id": 97,
     "value": 70,
     "status": "OK"
 }
 ]
 }

Аналогичная структура данных приходит от приложения-продюсера по давлению:

{
   "measuring": "pressure",
   "timestamp": "2023-01-21T02:05:55.000Z ",
   "device": [
 {
     "device_id": 4123,
     "value": 9.67,
     "status": "OK"
 },
 {
     "device_id": 587,
     "value": 1.03,
     "status": "ERROR"
 },
 {
     "device_id": 1524,
     "value": 6.34,
     "status": "OK"
 },
 {
     " device_id": 97,
     "value": 0.21,
     "status": "ERROR"
 }
 ]
 }

Чтобы направлять сообщения разным приложениям-потребителям, проще всего выбрать ключ маршрутизации pressure или temperature и маркировать этим ключом каждое отправляемое от соответствующего продюсера сообщение. Такую простую маршрутизацию по точному значению ключа обеспечит прямой тип обменника Direct.

Сперва выберем подходящий тип обменника на вкладке Exchanges. В случае рассматриваемого примера это прямой обменник, amq.direct. Можно создать собственный обменник, используя существующий типы или комбинировав их между собой. Изначально платформа предоставляет 7 обменников:

Чтобы привязать обменник к очереди, надо сначало создать очередь. Для этого перейдем на вкладку очередей Queues. Создадим 2 очереди:

  • pressure для сообщений с данными по давлению
  • temperature для данных по температуре

Свойства каждой очереди опишем в таблице:

Очередь Смысл Тип Приложение-Потребитель Эксклюзивная Постоянная Авто-удаление Хранение данных Длина (число сообщений) Приоритет
Pressure Хранит данные по давлению classic Обработчик давления да Да Нет В памяти 100 5
Temperature Хранит данные по температуре classic Обработчик температуры Да да нет В памяти 100 3

Весь перечень очередей показан на вкладке Queues. Сейчас все они находятся в холостом состоянии idle, т.к. на текущий момент приложения-продюсеры не публикуют в них сообщения. Также каждая очередь развернута в кластере c с высокой доступностью, что отмечено маркером HA High Availability. Из-за отсутствия приложений-потребителей нет никаких сведений о подтверждении получения сообщений ack.

Далее вернемся на вкладку Exchanges и привяжем обменник к очереди, задав ключ маршрутизации. В случае рассматриваемого примера выбираем обменник amq.direct и привязываем к очереди pressure, написав в поле Routing key значение pressure и нажав кнопку Bind. Аналогично проделаем для temperature:

Затем опубликуем несколько сообщений в этот обменник, задав каждому ключ маршрутизации. Дополнительно сообщению можно задать свойства, например, приоритет, время жизни и прочее:

Посмотреть опубликованные сообщения можно на вкладке Queues:

В реальных проектах межсистемной интеграции и микросервисного взаимодействия с применением RabbitMQ можно определить следующие свойства сообщения:

  • content_type тип контента, например application/json
  • content_encoding кодировка содержимого, например gzip
  • priority приоритет
  • expiration время жизни
  • message_id идентификатор сообщения
  • timestamp отметка времени
  • type тип произвольная строка, которая помогает приложениям-продюсерам сообщать, что это за сообщение. RabbitMQ не проверяет и не использует это поле, оно нужно для использования и интерпретации приложениями и плагинами.
  • user_id идентификатор пользователя
  • app_id идентификатор приложения
  • cluster_id идентификатор кластера

После публикации сообщений можно посмотреть, как меняется поведение очереди:

В реальных проектах RabbitMQ доставляет очереди приложениям-потребителям программным образом, эта тема будет полностью разобрана дальше.

Заполните форму уже сегодня!
Для начала сотрудничества необходимо заполнить заявку или заказать обратный звонок. В ответ получите коммерческое предложение, которое будет содержать индивидуальную стратегию с учетом требований и поставленных задач
Работаем по будням с 9:00 до 18:00. Заявки, отправленные в выходные, обрабатываем в первый рабочий день до 12:00.
Спасибо, ваш запрос принят и будет обработан!
Эйч Маркетинг