Боевая очередь
Я использую Docker, поэтому привожу ссылку на готовый образ в GitFlic, образ содержит все необходимое для удобной работы.
Мы сможем настроить настоящую продакшн-топологию очередей, использующую всю мощь и возможности RabbitMQ
. На самом деле от топологии зависит многое:
- Надежность
- Своевременность получения сообщений
- Гарантии доставки
- Эффективность
RabbitMQ
Кроме того, топология является иммутабельной и не позволяет изменять свойства очередей после создания, поэтому, создав топологию один раз, вам придется с ней жить постоянно либо тяжело и долго (зависит от объема сообщений и сложности топологии) мигрировать на другую.
На эффективности работы с RabbitMQ
сказывается то, как вы с ним работаете. Например, кролик не любит когда вы часто и много к нему подключаетесь, поэтому все общение с кроликом реализовано посредством каналов. Однако и их частое создание определенно не идет кролику на пользу, поэтому если у вас нет возможности использовать постоянное подключение к кролику, то есть если вы используете PHP, тогда ставьте перед кроликом amqpproxy,
который за вас будет держать пул соединений к кролику.
Требования проекта
Сформулируем некоторые функциональные требования к реализации бойвой очереди, с помощью RabbitMQ
без привлечения сторонних инструментов вроде базы данных или кафки:
- Если консьюмер не может обработать сообщение в данный момент, он должен отложить его на определенное время, приступив к следующему
- Если после
N
ретраев консьюмер все же не смог обработать сообщение, оно должно переместиться в специальную очередь - Поскольку событий будет очень много, консьюмеры должны быть достаточно производительными, чтобы справляться с нагрузками, однако все же необходимо зафиксировать размер очереди, чтобы не упираться в диск
- Мы должны уметь обрабатывать батчи сообщений
В нашей системе будет существовать три сервиса:
сервис аналитики
в нем допустимо потерять часть сообщений, применяем ограничение очередисервис подписок
должен обработать все события без исключения, события которые не удалось обработать перемещаються в специальную очередь для ручного разборасервис нотификаций
обрабатывает сообщения батчами по 100 штук
Каждый сервис должен получать свою копию сообщений и реализовывать свои стратегии повторов и хранения сообщений индивидуально в случае ошибок.
Сервис аналитики
Наш обменник будет иметь тип direct
, это значит что сообщения будут попадать только в те очереди, которые имеют биндинг к этому обменнику по ключу роутинга, совпадающему с ключом роутинга в сообщении. На этом работа паблишера заканчивается. В распределенной системе, где в событиях могут быть заинтересованы разные сервисы, паблишеру и подписчикам не надо знать друг о друге и тем более паблишеру не надо знать о том, какие очереди существуют. Его дело, публиковать события в обменник,
где ключом роутинга будет название события. Зная названия событий, подписчики могут создать очереди и начинать получать сообщения из них.
Создаем файл, который создаст наш обменник:
html/site.loc/public_html/obmennik.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$bunny = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// создаем обменник analitika_obmennik, с типом direct
$channel->exchangeDeclare('analitika_obmennik', exchangeType: 'direct', durable: true);
Типы обменников
Тип обменника | Принцип работы | Сценарий применения |
---|---|---|
Direct
|
Прямая отправка сообщений в одну или несколько очередей с совпадающим значением ключа маршрутизации | Когда есть точно известный ключ, по значению которого отдельные приложения-потребители должны получить подходящие сообщения |
Fanout
|
Все сообщения отправляются во все очереди независимо от ключа маршрутизации | Когда все приложения-потребители должны быстро получать все сообщения |
Headers
|
Маршрутизация по нескольким атрибутам, заданным в заголовке сообщения. Ключ маршрутизации игнорируется | Когда правила маршрутизации сообщения в очереди сложнее, чем просто по ключу, например, формат данных, комбинация полей и пр. |
Topic
|
Сообщение отправляется в конкретные очереди по значению ключа маршрутизации, заданного по шаблону | Когда ключ маршрутизации сложный и поток сообщений надо разделить по разным приложениям-потребителям |
Сервис аналитики будет очень нагруженным, нам допустимо потерять часть сообщений, поэтому мы можем ограничить размер очереди. Мы создаем очередь analitika_ochered
и с помощью специальных аргументов x-max-length
и x-overflow
говорим RabbitMQ
, что наша очередь ограничена миллионом сообщений, а все остальные сообщения RabbitMQ
должен откидывать. И после чего привязали нашу очередь к обменнику analitika_obmennik
по
роутинг ключу analitikaZapros
:
html/site.loc/analitika.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$bunny = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// создаем очередь analitika_ochered
$channel->queueDeclare('analitika_ochered', durable: true, arguments: [
// ограничение очереди
'x-max-length' => 100_000_000,
// все остальные сообщения откидываем
'x-overflow' => 'reject-publish',
]);
// привязываем нашу очередь analitika_ochered к обменнику analitika_obmennik по роутинг ключу analitikaZapros
$channel->queueBind('analitika_ochered', 'analitika_obmennik', 'analitikaZapros');
// опубликовываем сообщение в очередь
$channel->publish('{"paymentId": 1}', exchange: 'analitika_obmennik', routingKey: 'analitikaZapros');
Идем смотреть на результат в админку. На вкладке Queues
у очереди events.analytics-service
можно заметить два новых лейбла, которые как раз говорят об использовании аргументов x-max-length
и x-overflow
:
Сервис подписок
К сервису предъявляются серьезные требования, мы должны обработать все события без исключения, события которые не удалось обработать даже спустя определенное количество ретраев, должны перемещаться в специальную очередь для полуавтоматического разбора. Опишем топологию, которая будет соответствовать этим требованиям:
html/site.loc/podpiska.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$bunny = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// создаем наш основной обменник osnovnoj_obmennik
$channel->exchangeDeclare('osnovnoj_obmennik', durable: true);
// создаем основную очередь osnovnaya_ochered для новых сообщений
$channel->queueDeclare('osnovnaya_ochered', durable: true);
// привяжем к обменнику osnovnoj_obmennik очередь osnovnoj_ochered для всех новых сообщений, с роутинг ключом pervyjZapros
$channel->queueBind('osnovnaya_ochered', 'osnovnoj_obmennik', 'pervyjZapros');
// привяжем к обменнику osnovnoj_obmennik очередь osnovnoj_ochered для всех возвращенных сообщений из очереди dopolnitelnaya_ochered, с роутинг ключом vtorojZapros
$channel->queueBind('osnovnaya_ochered', 'osnovnoj_obmennik', 'vtorojZapros');
// создаем наш дополнительный обменник dopolnitelnyj_obmennik
$channel->exchangeDeclare('dopolnitelnyj_obmennik', durable: true);
// создаем дополнительную очередь dopolnitelnaya_ochered для не обработанных сообщений
$channel->queueDeclare('dopolnitelnaya_ochered', durable: true, arguments: [
// привязываем обменник
'x-dead-letter-exchange' => 'osnovnoj_obmennik',
// роутинг ключ failed для очереди
'x-dead-letter-routing-key' => 'vtorojZapros',
// 6 секунд до отправки в основной обменник osnovnoj_obmennik
'x-message-ttl' => 6000,
]);
// привяжем к обменнику dopolnitelnyj_obmennik очередь dopolnitelnaya_ochered для всех новых сообщений, с роутинг ключом vtorojZapros
$channel->queueBind('dopolnitelnaya_ochered', 'dopolnitelnyj_obmennik', 'vtorojZapros');
// создаем наш проблемный обменник warinning_obmennik
$channel->exchangeDeclare('warinning_obmennik', durable: true);
// создаем проблемную очередь warinning_ochered для всех сообщений которые пройдут через получателя два раза и на них в блоке try...catch будет ощибка
$channel->queueDeclare('warinning_ochered', durable: true);
// привяжем к обменнику warinning_obmennik очередь warinning_ochered для всех сообщений которые пройдут через получателя два раза и на них в блоке try...catch будет ощибка, с роутинг ключом warinningZapros
$channel->queueBind('warinning_ochered', 'warinning_obmennik', 'warinningZapros');
// опубликовываем сообщение в очередь, через обменник osnovnoj_obmennik, с роутинг ключом pervyjZapros, указываем заголовок
$channel->publish('{"paymentId": 1}', exchange: 'osnovnoj_obmennik', routingKey: 'pervyjZapros', headers: ["Приложение" => "hmarketing.ru"]);
Рассмотрим, как может выглядеть консьюмер для такой очереди:
html/site.loc/consumer-podpiska.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
// создаем объект класса для подключения
$bunny = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// отдаем только одно сообщение
$channel->qos(prefetchCount: 1);
// подписываемся на очередь
$channel->consume(function (Message $message, Channel $channel, Client $bunny) {
// если никаких проблем при обработке сообщения не произошло, ack'аем (читаем) его и переходим к следующему
try {
// подтверждаем сообщение вызвав команду ack
$channel->ack($message);
// выводим сообщение на экран
var_dump($message->content);
} catch (\Throwable) {
// если произошла ошибка, нам необходимо посмотреть сколько заголовков было передано, если больше одного заголовка, значит сообщение уже прошло через dopolnitelnaya_ochered и на этом этапе добавились дополнительные заголовки, сообщение которое имеет больше одного заголовка, помещаем в warinning_obmennik
$retryCount = count($message->headers);
// проверка на количество заголовков
if ($retryCount > 1) {
// подтверждаем сообщение вызвав команду ack
$channel->ack($message);
// опубликовываем сообщение в обменник warinning_obmennik, где сообщение будет ждать ручного разбора
$channel->publish($message->content, $message->headers, exchange: 'warinning_obmennik', routingKey: 'warinningZapros');
} else {
// подтверждаем сообщение вызвав команду ack
$channel->ack($message);
// опубликовываем сообщение в обменник dopolnitelnyj_obmennik, откуда сообщение попадет в osnovnoj_obmennik и пойдет на второй круг
$channel->publish($message->content, $message->headers, exchange: 'dopolnitelnyj_obmennik', routingKey: 'vtorojZapros');
}
}
}, 'osnovnaya_ochered');
// свойством для run, можно передать сколько клиент будет работать, например в течение 10 секунд, а затем останавливается
$bunny->run(1);
Сервис нотификаций
Последний сервис из нашего списка, к которому предъявляется такое требование, как возможность получения сообщений батчами, сервис отправки уведомлений не любит частые обращения к своему API и просит отправлять уведомлениями батчами по 100
штук:
html/site.loc/notifikacii.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$bunny = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// создаем основной обменник notifikacii_obmennik, с типом direct
$channel->exchangeDeclare('notifikacii_obmennik', exchangeType: 'direct', durable: true);
// создадим основную очередь notifikacii_ochered, куда будем отправлять сообщения
$channel->queueDeclare('notifikacii_ochered', durable: true);
// привяжем к обменнику notifikacii_obmennik очередь notifikacii_ochered с роутинг ключом notifikaciiZapros
$channel->queueBind('notifikacii_ochered', 'notifikacii_obmennik', 'notifikaciiZapros');
// в цикле генерируем 150 сообщений
for ($i = 0; $i < 150; $i++) {
// опубликовываем сообщение в очередь
$channel->publish('{"paymentId": 1}', exchange: 'notifikacii_obmennik', routingKey: 'notifikaciiZapros');
}
В RabbitMQ
нет нативного способа получить 100
батч сообщений за раз. Для этого вам необходимо собирать сообщения на стороне приложения, после этого акнуть их все, когда достаточного размера батч соберется и вы его обработаете:
html/site.loc/consumer-notifikacii.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
// создаем объект класса для подключения
$bunny = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// отдаем только 100 сообщение
$channel->qos(prefetchCount: 100);
// пустой массив
$messages = [];
// подписываемся на очередь
$channel->consume(function (Message $message, Channel $channel, Client $bunny) use (&$messages) {
// записываем в массив 100 сообщений
$messages[] = $message;
// проверяем что в массиве 100 сообщений
if (count($messages) >= 100) {
try {
// подтверждаем сразу 100 сообщение вызвав команду ack
$channel->ack($message, multiple: true);
// выводим сообщение на экран
var_dump($message->content);
} catch (\Throwable) {
// если произошла ощибка, отвергаем все 100 сообщений и отправляем их обратно в очередь
$channel->nack($message, multiple: true, requeue: true);
} finally {
// очищаем массив
$messages = [];
}
}
}, 'notifikacii_ochered');
// свойством для run, можно передать сколько клиент будет работать, например в течение 10 секунд, а затем останавливается
$bunny->run(10);
Если вдруг ваш консьюмер упадет и не сможет отправить ack
, сообщения просто вернутся обратно в очередь, что нас вполне устраивает.