Как работает RabbitMQ
RabbitMQ работает следующим образом:
- Приложение-продюсер отправляет сообщение конкретному обменнику
- Обменник маршрутизирует полученное сообщение в одну или несколько очередей, в зависимости от типа самого обменника, правилам его привязки к очереди
bindingи значению ключа маршрутизации в сообщенииroutingиkey - Очередь хранит ссылку на полученное сообщение, которое физически может хранится в оперативной памяти или на диске, в зависимости от свойств самой очереди
- Когда приложение-потребитель готово получить сообщение из очереди, брокер копирует его по ссылке из очереди и отправляет копию сообщения потребителю
- Получив сообщение, приложение-потребитель отправляет брокеру подтверждение об успешном получении данных
- После получения подтверждения брокер удаляет копию сообщения из очереди
- Наконец, брокер удаляет само исходное сообщение из очереди, очищая место в оперативной памяти или на жестком диске
В проектах межсистемного взаимодействия с RabbitMQ важно определить способ маршрутизации сообщений между интегрируемыми приложениями, что задается типом обменника. Чтобы кратко рассказать, чем они отличаются друг от друга и в каких случаях стоит выбирать тот или другой, я составила таблицу:
| Тип обменника | Принцип работы | Сценарий применения |
|---|---|---|
Direct
|
Прямая отправка сообщений в одну или несколько очередей с совпадающим значением ключа маршрутизации | Когда есть точно известный ключ, по значению которого отдельные приложения-потребители должны получить подходящие сообщения |
Fanout
|
Все сообщения отправляются во все очереди независимо от ключа маршрутизации | Когда все приложения-потребители должны быстро получать все сообщения |
Headers
|
Маршрутизация по нескольким атрибутам, заданным в заголовке сообщения. Ключ маршрутизации игнорируется | Когда правила маршрутизации сообщения в очереди сложнее, чем просто по ключу, например, формат данных, комбинация полей и пр. |
Topic
|
Сообщение отправляется в конкретные очереди по значению ключа маршрутизации, заданного по шаблону | Когда ключ маршрутизации сложный и поток сообщений надо разделить по разным приложениям-потребителям |
Можно создать собственный обменник, комбинируя несколько разных типов между собой, чтобы еще более повысить гибкость маршрутизации сообщений и задать сложный маршрут их распределения по приложениям-потребителям.
Обменник отправляет сообщение в очередь согласно правилам привязки. Очереди в RabbitMQ работают по принципу FIFO и бывают 2-х видов:
autoDeleteвременные создаются при подключении первого клиента и удаляются при отсутствии клиентских подключенийdurableпостоянные сохраняют свое состояние и восстанавливаются после перезапуска брокера. Это более надежный вид очередей
Каждая очередь в RabbitMQ должна иметь название name, а также ряд других обязательных и опциональных свойств, которые определяют поведение очереди. Например, эксклюзивная очередь используется только одним соединением и удаляется при его закрытии, подобно очереди с автоматическим удалением Auto-Delete. Эксклюзивная очередь предполагает, что сообщения из нее получает только одно приложение-потребитель. Также в свойствах очереди можно задать ее тип, классическая Classic, реплицируемая Quorum, потоковая Stream и максимальную длину, время жизни TTL, Time To Live, приоритет и правила репликации.
Чтобы понять, как все эти концепции RabbitMQ работают на самом деле, рассмотрим практический пример.
Практический пример
У нас есть 2 продюсера, которые создают данные и отправляют их в RabbitMQ. Пусть это будут приложения, которые снимают показания с датчиков температуры Producer 1 и давления Producer 2 на нескольких устройствах. Данные по температуре обрабатывает приложение-потребитель обработчик температуры Consumer1, а данные по давлению обработчик давления Consumer 2.
Приложение-продюсер по температуре отправляет в RabbitMQ данные в следующей JSON:
{
"measuring": "temperature",
"timestamp": "2023-01-21T02:05:55.000Z ",
"device": [
{
"device_id": 4123,
"value": 69.67,
"status": "OK"
},
{
"device_id": 587,
"value": 19.67,
"status": "ERROR"
},
{
"device_id": 1524,
"value": 123,
"status": "ERROR"
},
{
"device_id": 97,
"value": 70,
"status": "OK"
}
]
}
Аналогичная структура данных приходит от приложения-продюсера по давлению:
{
"measuring": "pressure",
"timestamp": "2023-01-21T02:05:55.000Z ",
"device": [
{
"device_id": 4123,
"value": 9.67,
"status": "OK"
},
{
"device_id": 587,
"value": 1.03,
"status": "ERROR"
},
{
"device_id": 1524,
"value": 6.34,
"status": "OK"
},
{
" device_id": 97,
"value": 0.21,
"status": "ERROR"
}
]
}
Чтобы направлять сообщения разным приложениям-потребителям, проще всего выбрать ключ маршрутизации pressure или temperature и маркировать этим ключом каждое отправляемое от соответствующего продюсера сообщение. Такую простую маршрутизацию по точному значению ключа обеспечит прямой тип обменника Direct.
Сперва выберем подходящий тип обменника на вкладке Exchanges. В случае рассматриваемого примера это прямой обменник, amq.direct. Можно создать собственный обменник, используя существующий типы или комбинировав их между собой. Изначально платформа предоставляет 7 обменников:
Чтобы привязать обменник к очереди, надо сначало создать очередь. Для этого перейдем на вкладку очередей Queues. Создадим 2 очереди:
pressureдля сообщений с данными по давлениюtemperatureдля данных по температуре
Свойства каждой очереди опишем в таблице:
| Очередь | Смысл | Тип | Приложение-Потребитель | Эксклюзивная | Постоянная | Авто-удаление | Хранение данных | Длина (число сообщений) | Приоритет |
|---|---|---|---|---|---|---|---|---|---|
| Pressure | Хранит данные по давлению | classic | Обработчик давления | да | Да | Нет | В памяти | 100 | 5 |
| Temperature | Хранит данные по температуре | classic | Обработчик температуры | Да | да | нет | В памяти | 100 | 3 |
Весь перечень очередей показан на вкладке Queues. Сейчас все они находятся в холостом состоянии idle, т.к. на текущий момент приложения-продюсеры не публикуют в них сообщения. Также каждая очередь развернута в кластере c с высокой доступностью, что отмечено маркером HA High Availability. Из-за отсутствия приложений-потребителей нет никаких сведений о подтверждении получения сообщений ack.
Далее вернемся на вкладку Exchanges и привяжем обменник к очереди, задав ключ маршрутизации. В случае рассматриваемого примера выбираем обменник amq.direct и привязываем к очереди pressure, написав в поле Routing key значение pressure и нажав кнопку Bind. Аналогично проделаем для temperature:
Затем опубликуем несколько сообщений в этот обменник, задав каждому ключ маршрутизации. Дополнительно сообщению можно задать свойства, например, приоритет, время жизни и прочее:
Посмотреть опубликованные сообщения можно на вкладке Queues:
В реальных проектах межсистемной интеграции и микросервисного взаимодействия с применением RabbitMQ можно определить следующие свойства сообщения:
content_typeтип контента, напримерapplication/jsoncontent_encodingкодировка содержимого, напримерgzippriorityприоритетexpirationвремя жизниmessage_idидентификатор сообщенияtimestampотметка времениtypeтип произвольная строка, которая помогает приложениям-продюсерам сообщать, что это за сообщение. RabbitMQ не проверяет и не использует это поле, оно нужно для использования и интерпретации приложениями и плагинами.user_idидентификатор пользователяapp_idидентификатор приложенияcluster_idидентификатор кластера
После публикации сообщений можно посмотреть, как меняется поведение очереди:
В реальных проектах RabbitMQ доставляет очереди приложениям-потребителям программным образом, эта тема будет полностью разобрана дальше.













