Как работает RabbitMQ
RabbitMQ
работает следующим образом:
- Приложение-продюсер отправляет сообщение конкретному обменнику
- Обменник маршрутизирует полученное сообщение в одну или несколько очередей, в зависимости от типа самого обменника, правилам его привязки к очереди
binding
и значению ключа маршрутизации в сообщенииrouting
иkey
- Очередь хранит ссылку на полученное сообщение, которое физически может хранится в оперативной памяти или на диске, в зависимости от свойств самой очереди
- Когда приложение-потребитель готово получить сообщение из очереди, брокер копирует его по ссылке из очереди и отправляет копию сообщения потребителю
- Получив сообщение, приложение-потребитель отправляет брокеру подтверждение об успешном получении данных
- После получения подтверждения брокер удаляет копию сообщения из очереди
- Наконец, брокер удаляет само исходное сообщение из очереди, очищая место в оперативной памяти или на жестком диске
В проектах межсистемного взаимодействия с RabbitMQ
важно определить способ маршрутизации сообщений между интегрируемыми приложениями, что задается типом обменника. Чтобы кратко рассказать, чем они отличаются друг от друга и в каких случаях стоит выбирать тот или другой, я составила таблицу:
Тип обменника | Принцип работы | Сценарий применения |
---|---|---|
Direct
|
Прямая отправка сообщений в одну или несколько очередей с совпадающим значением ключа маршрутизации | Когда есть точно известный ключ, по значению которого отдельные приложения-потребители должны получить подходящие сообщения |
Fanout
|
Все сообщения отправляются во все очереди независимо от ключа маршрутизации | Когда все приложения-потребители должны быстро получать все сообщения |
Headers
|
Маршрутизация по нескольким атрибутам, заданным в заголовке сообщения. Ключ маршрутизации игнорируется | Когда правила маршрутизации сообщения в очереди сложнее, чем просто по ключу, например, формат данных, комбинация полей и пр. |
Topic
|
Сообщение отправляется в конкретные очереди по значению ключа маршрутизации, заданного по шаблону | Когда ключ маршрутизации сложный и поток сообщений надо разделить по разным приложениям-потребителям |
Можно создать собственный обменник, комбинируя несколько разных типов между собой, чтобы еще более повысить гибкость маршрутизации сообщений и задать сложный маршрут их распределения по приложениям-потребителям.
Обменник отправляет сообщение в очередь согласно правилам привязки. Очереди в RabbitMQ
работают по принципу FIFO
и бывают 2-х видов:
autoDelete
временные создаются при подключении первого клиента и удаляются при отсутствии клиентских подключенийdurable
постоянные сохраняют свое состояние и восстанавливаются после перезапуска брокера. Это более надежный вид очередей
Каждая очередь в RabbitMQ
должна иметь название name
, а также ряд других обязательных и опциональных свойств, которые определяют поведение очереди. Например, эксклюзивная очередь используется только одним соединением и удаляется при его закрытии, подобно очереди с автоматическим удалением Auto-Delete
. Эксклюзивная очередь предполагает, что сообщения из нее получает только одно приложение-потребитель. Также в свойствах очереди можно задать ее тип, классическая Classic
, реплицируемая Quorum
, потоковая Stream
и максимальную длину, время жизни TTL
, Time
To Live
, приоритет и правила репликации.
Чтобы понять, как все эти концепции RabbitMQ
работают на самом деле, рассмотрим практический пример.
Практический пример
У нас есть 2 продюсера, которые создают данные и отправляют их в RabbitMQ
. Пусть это будут приложения, которые снимают показания с датчиков температуры Producer 1
и давления Producer 2
на нескольких устройствах. Данные по температуре обрабатывает приложение-потребитель обработчик температуры Consumer1
, а данные по давлению обработчик давления Consumer 2
.
Приложение-продюсер по температуре отправляет в RabbitMQ
данные в следующей JSON
:
{
"measuring": "temperature",
"timestamp": "2023-01-21T02:05:55.000Z ",
"device": [
{
"device_id": 4123,
"value": 69.67,
"status": "OK"
},
{
"device_id": 587,
"value": 19.67,
"status": "ERROR"
},
{
"device_id": 1524,
"value": 123,
"status": "ERROR"
},
{
"device_id": 97,
"value": 70,
"status": "OK"
}
]
}
Аналогичная структура данных приходит от приложения-продюсера по давлению:
{
"measuring": "pressure",
"timestamp": "2023-01-21T02:05:55.000Z ",
"device": [
{
"device_id": 4123,
"value": 9.67,
"status": "OK"
},
{
"device_id": 587,
"value": 1.03,
"status": "ERROR"
},
{
"device_id": 1524,
"value": 6.34,
"status": "OK"
},
{
" device_id": 97,
"value": 0.21,
"status": "ERROR"
}
]
}
Чтобы направлять сообщения разным приложениям-потребителям, проще всего выбрать ключ маршрутизации pressure
или temperature
и маркировать этим ключом каждое отправляемое от соответствующего продюсера сообщение. Такую простую маршрутизацию по точному значению ключа обеспечит прямой тип обменника Direct
.
Сперва выберем подходящий тип обменника на вкладке Exchanges
. В случае рассматриваемого примера это прямой обменник, amq.direct
. Можно создать собственный обменник, используя существующий типы или комбинировав их между собой. Изначально платформа предоставляет 7 обменников:
Чтобы привязать обменник к очереди, надо сначало создать очередь. Для этого перейдем на вкладку очередей Queues
. Создадим 2 очереди:
pressure
для сообщений с данными по давлениюtemperature
для данных по температуре
Свойства каждой очереди опишем в таблице:
Очередь | Смысл | Тип | Приложение-Потребитель | Эксклюзивная | Постоянная | Авто-удаление | Хранение данных | Длина (число сообщений) | Приоритет |
---|---|---|---|---|---|---|---|---|---|
Pressure | Хранит данные по давлению | classic | Обработчик давления | да | Да | Нет | В памяти | 100 | 5 |
Temperature | Хранит данные по температуре | classic | Обработчик температуры | Да | да | нет | В памяти | 100 | 3 |
Весь перечень очередей показан на вкладке Queues
. Сейчас все они находятся в холостом состоянии idle
, т.к. на текущий момент приложения-продюсеры не публикуют в них сообщения. Также каждая очередь развернута в кластере c с высокой доступностью, что отмечено маркером HA
High Availability
. Из-за отсутствия приложений-потребителей нет никаких сведений о подтверждении получения сообщений ack
.
Далее вернемся на вкладку Exchanges
и привяжем обменник к очереди, задав ключ маршрутизации. В случае рассматриваемого примера выбираем обменник amq.direct
и привязываем к очереди pressure
, написав в поле Routing key
значение pressure
и нажав кнопку Bind
. Аналогично проделаем для temperature
:
Затем опубликуем несколько сообщений в этот обменник, задав каждому ключ маршрутизации. Дополнительно сообщению можно задать свойства, например, приоритет, время жизни и прочее:
Посмотреть опубликованные сообщения можно на вкладке Queues
:
В реальных проектах межсистемной интеграции и микросервисного взаимодействия с применением RabbitMQ
можно определить следующие свойства сообщения:
content_type
тип контента, напримерapplication/json
content_encoding
кодировка содержимого, напримерgzip
priority
приоритетexpiration
время жизниmessage_id
идентификатор сообщенияtimestamp
отметка времениtype
тип произвольная строка, которая помогает приложениям-продюсерам сообщать, что это за сообщение. RabbitMQ не проверяет и не использует это поле, оно нужно для использования и интерпретации приложениями и плагинами.user_id
идентификатор пользователяapp_id
идентификатор приложенияcluster_id
идентификатор кластера
После публикации сообщений можно посмотреть, как меняется поведение очереди:
В реальных проектах RabbitMQ
доставляет очереди приложениям-потребителям программным образом, эта тема будет полностью разобрана дальше.