Полный цикл в digital

Боевая очередь

Я использую Docker, поэтому привожу ссылку на готовый образ в GitFlic, образ содержит все необходимое для удобной работы.

Мы сможем настроить настоящую продакшн-топологию очередей, использующую всю мощь и возможности RabbitMQ. На самом деле от топологии зависит многое:

  • Надежность
  • Своевременность получения сообщений
  • Гарантии доставки
  • Эффективность RabbitMQ

Кроме того, топология является иммутабельной и не позволяет изменять свойства очередей после создания, поэтому, создав топологию один раз, вам придется с ней жить постоянно либо тяжело и долго (зависит от объема сообщений и сложности топологии) мигрировать на другую.

На эффективности работы с RabbitMQ сказывается то, как вы с ним работаете. Например, кролик не любит когда вы часто и много к нему подключаетесь, поэтому все общение с кроликом реализовано посредством каналов. Однако и их частое создание определенно не идет кролику на пользу, поэтому если у вас нет возможности использовать постоянное подключение к кролику, то есть если вы используете PHP, тогда ставьте перед кроликом amqpproxy, который за вас будет держать пул соединений к кролику.

Требования проекта

Сформулируем некоторые функциональные требования к реализации бойвой очереди, с помощью RabbitMQ без привлечения сторонних инструментов вроде базы данных или кафки:

  1. Если консьюмер не может обработать сообщение в данный момент, он должен отложить его на определенное время, приступив к следующему
  2. Если после N ретраев консьюмер все же не смог обработать сообщение, оно должно переместиться в специальную очередь
  3. Поскольку событий будет очень много, консьюмеры должны быть достаточно производительными, чтобы справляться с нагрузками, однако все же необходимо зафиксировать размер очереди, чтобы не упираться в диск
  4. Мы должны уметь обрабатывать батчи сообщений

В нашей системе будет существовать три сервиса:

  • сервис аналитики в нем допустимо потерять часть сообщений, применяем ограничение очереди
  • сервис подписок должен обработать все события без исключения, события которые не удалось обработать перемещаються в специальную очередь для ручного разбора
  • сервис нотификаций обрабатывает сообщения батчами по 100 штук

Каждый сервис должен получать свою копию сообщений и реализовывать свои стратегии повторов и хранения сообщений индивидуально в случае ошибок.

Сервис аналитики

Наш обменник будет иметь тип direct, это значит что сообщения будут попадать только в те очереди, которые имеют биндинг к этому обменнику по ключу роутинга, совпадающему с ключом роутинга в сообщении. На этом работа паблишера заканчивается. В распределенной системе, где в событиях могут быть заинтересованы разные сервисы, паблишеру и подписчикам не надо знать друг о друге и тем более паблишеру не надо знать о том, какие очереди существуют. Его дело, публиковать события в обменник, где ключом роутинга будет название события. Зная названия событий, подписчики могут создать очереди и начинать получать сообщения из них.

Создаем файл, который создаст наш обменник:

html/site.loc/public_html/obmennik.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$bunny = new Client([
   // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
   'host' => 'rabbitmq',
   // хост корень проекта
   'vhost' => '/',
   // имя пользователя
   'user' => 'guest',
   // пароль
   'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// создаем обменник analitika_obmennik, с типом direct
$channel->exchangeDeclare('analitika_obmennik', exchangeType: 'direct', durable: true);

Типы обменников

Тип обменника Принцип работы Сценарий применения
Direct Прямая отправка сообщений в одну или несколько очередей с совпадающим значением ключа маршрутизации Когда есть точно известный ключ, по значению которого отдельные приложения-потребители должны получить подходящие сообщения
Fanout Все сообщения отправляются во все очереди независимо от ключа маршрутизации Когда все приложения-потребители должны быстро получать все сообщения
Headers Маршрутизация по нескольким атрибутам, заданным в заголовке сообщения. Ключ маршрутизации игнорируется Когда правила маршрутизации сообщения в очереди сложнее, чем просто по ключу, например, формат данных, комбинация полей и пр.
Topic Сообщение отправляется в конкретные очереди по значению ключа маршрутизации, заданного по шаблону Когда ключ маршрутизации сложный и поток сообщений надо разделить по разным приложениям-потребителям

Сервис аналитики будет очень нагруженным, нам допустимо потерять часть сообщений, поэтому мы можем ограничить размер очереди. Мы создаем очередь analitika_ochered и с помощью специальных аргументов x-max-length и x-overflow говорим RabbitMQ, что наша очередь ограничена миллионом сообщений, а все остальные сообщения RabbitMQ должен откидывать. И после чего привязали нашу очередь к обменнику analitika_obmennik по роутинг ключу analitikaZapros:

html/site.loc/analitika.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$bunny = new Client([
   // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
   'host' => 'rabbitmq',
   // хост корень проекта
   'vhost' => '/',
   // имя пользователя
   'user' => 'guest',
   // пароль
   'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// создаем очередь analitika_ochered
$channel->queueDeclare('analitika_ochered', durable: true, arguments: [
   // ограничение очереди 
   'x-max-length' => 100_000_000,
   // все остальные сообщения откидываем
   'x-overflow' => 'reject-publish',
]);
// привязываем нашу очередь analitika_ochered к обменнику analitika_obmennik по роутинг ключу analitikaZapros
$channel->queueBind('analitika_ochered', 'analitika_obmennik', 'analitikaZapros');
// опубликовываем сообщение в очередь
$channel->publish('{"paymentId": 1}', exchange: 'analitika_obmennik', routingKey: 'analitikaZapros');

Идем смотреть на результат в админку. На вкладке Queues у очереди events.analytics-service можно заметить два новых лейбла, которые как раз говорят об использовании аргументов x-max-length и x-overflow:

Сервис подписок

К сервису предъявляются серьезные требования, мы должны обработать все события без исключения, события которые не удалось обработать даже спустя определенное количество ретраев, должны перемещаться в специальную очередь для полуавтоматического разбора. Опишем топологию, которая будет соответствовать этим требованиям:

html/site.loc/podpiska.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$bunny = new Client([
   // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
   'host' => 'rabbitmq',
   // хост корень проекта
   'vhost' => '/',
   // имя пользователя
   'user' => 'guest',
   // пароль
   'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();

// создаем наш основной обменник osnovnoj_obmennik
$channel->exchangeDeclare('osnovnoj_obmennik', durable: true);
// создаем основную очередь osnovnaya_ochered для новых сообщений
$channel->queueDeclare('osnovnaya_ochered', durable: true);
// привяжем к обменнику osnovnoj_obmennik очередь osnovnoj_ochered для всех новых сообщений, с роутинг ключом pervyjZapros
$channel->queueBind('osnovnaya_ochered', 'osnovnoj_obmennik', 'pervyjZapros');
// привяжем к обменнику osnovnoj_obmennik очередь osnovnoj_ochered для всех возвращенных сообщений из очереди dopolnitelnaya_ochered, с роутинг ключом vtorojZapros
$channel->queueBind('osnovnaya_ochered', 'osnovnoj_obmennik', 'vtorojZapros');

// создаем наш дополнительный обменник dopolnitelnyj_obmennik
$channel->exchangeDeclare('dopolnitelnyj_obmennik', durable: true);
// создаем дополнительную очередь dopolnitelnaya_ochered для не обработанных сообщений
$channel->queueDeclare('dopolnitelnaya_ochered', durable: true, arguments: [
   // привязываем обменник
   'x-dead-letter-exchange' => 'osnovnoj_obmennik',
   // роутинг ключ failed для очереди 
   'x-dead-letter-routing-key' => 'vtorojZapros',
   // 6 секунд до отправки в основной обменник osnovnoj_obmennik
   'x-message-ttl' => 6000,
]);
// привяжем к обменнику dopolnitelnyj_obmennik очередь dopolnitelnaya_ochered для всех новых сообщений, с роутинг ключом vtorojZapros
$channel->queueBind('dopolnitelnaya_ochered', 'dopolnitelnyj_obmennik', 'vtorojZapros');

// создаем наш проблемный обменник warinning_obmennik
$channel->exchangeDeclare('warinning_obmennik', durable: true);
// создаем проблемную очередь warinning_ochered для всех сообщений которые пройдут через получателя два раза и на них в блоке try...catch будет ощибка
$channel->queueDeclare('warinning_ochered', durable: true);
// привяжем к обменнику warinning_obmennik очередь warinning_ochered для всех сообщений которые пройдут через получателя два раза и на них в блоке try...catch будет ощибка, с роутинг ключом warinningZapros
$channel->queueBind('warinning_ochered', 'warinning_obmennik', 'warinningZapros');

// опубликовываем сообщение в очередь, через обменник osnovnoj_obmennik, с роутинг ключом pervyjZapros, указываем заголовок
$channel->publish('{"paymentId": 1}', exchange: 'osnovnoj_obmennik', routingKey: 'pervyjZapros', headers: ["Приложение" => "hmarketing.ru"]);

Рассмотрим, как может выглядеть консьюмер для такой очереди:

html/site.loc/consumer-podpiska.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
// создаем объект класса для подключения
$bunny = new Client([
   // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
   'host' => 'rabbitmq',
   // хост корень проекта
   'vhost' => '/',
   // имя пользователя
   'user' => 'guest',
   // пароль
   'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// отдаем только одно сообщение
$channel->qos(prefetchCount: 1);
// подписываемся на очередь
$channel->consume(function (Message $message, Channel $channel, Client $bunny) {
   // если никаких проблем при обработке сообщения не произошло, ack'аем (читаем) его и переходим к следующему
   try {
       // подтверждаем сообщение вызвав команду ack
       $channel->ack($message);
       // выводим сообщение на экран
       var_dump($message->content);
   } catch (\Throwable) {
       // если произошла ошибка, нам необходимо посмотреть сколько заголовков было передано, если больше одного заголовка, значит сообщение уже прошло через dopolnitelnaya_ochered и на этом этапе добавились дополнительные заголовки, сообщение которое имеет больше одного заголовка, помещаем в warinning_obmennik
       $retryCount = count($message->headers);
       // проверка на количество заголовков
       if ($retryCount > 1) {
           // подтверждаем сообщение вызвав команду ack
           $channel->ack($message);
           // опубликовываем сообщение в обменник warinning_obmennik, где сообщение будет ждать ручного разбора
           $channel->publish($message->content, $message->headers, exchange: 'warinning_obmennik', routingKey: 'warinningZapros');
       } else {
           // подтверждаем сообщение вызвав команду ack
           $channel->ack($message);
           // опубликовываем сообщение в обменник dopolnitelnyj_obmennik, откуда сообщение попадет в osnovnoj_obmennik и пойдет на второй круг
           $channel->publish($message->content, $message->headers, exchange: 'dopolnitelnyj_obmennik', routingKey: 'vtorojZapros');
       }
   }
}, 'osnovnaya_ochered');
// свойством для run, можно передать сколько клиент будет работать, например в течение 10 секунд, а затем останавливается
$bunny->run(1);

Сервис нотификаций

Последний сервис из нашего списка, к которому предъявляется такое требование, как возможность получения сообщений батчами, сервис отправки уведомлений не любит частые обращения к своему API и просит отправлять уведомлениями батчами по 100 штук:

html/site.loc/notifikacii.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$bunny = new Client([
    // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
    'host' => 'rabbitmq',
    // хост корень проекта
    'vhost' => '/',
    // имя пользователя
    'user' => 'guest',
    // пароль
    'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// создаем основной обменник notifikacii_obmennik, с типом direct
$channel->exchangeDeclare('notifikacii_obmennik', exchangeType: 'direct', durable: true);
// создадим основную очередь notifikacii_ochered, куда будем отправлять сообщения
$channel->queueDeclare('notifikacii_ochered', durable: true);
// привяжем к обменнику notifikacii_obmennik очередь notifikacii_ochered с роутинг ключом notifikaciiZapros
$channel->queueBind('notifikacii_ochered', 'notifikacii_obmennik', 'notifikaciiZapros');
// в цикле генерируем 150 сообщений
for ($i = 0; $i < 150; $i++) {
    // опубликовываем сообщение в очередь
    $channel->publish('{"paymentId": 1}', exchange: 'notifikacii_obmennik', routingKey: 'notifikaciiZapros');
}

В RabbitMQ нет нативного способа получить 100 батч сообщений за раз. Для этого вам необходимо собирать сообщения на стороне приложения, после этого акнуть их все, когда достаточного размера батч соберется и вы его обработаете:

html/site.loc/consumer-notifikacii.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
// создаем объект класса для подключения
$bunny = new Client([
    // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
    'host' => 'rabbitmq',
    // хост корень проекта
    'vhost' => '/',
    // имя пользователя
    'user' => 'guest',
    // пароль
    'password' => 'guest',
]);
// создаем соединение
$bunny->connect();
// подключаемся к серверу
$channel = $bunny->channel();
// отдаем только 100 сообщение
$channel->qos(prefetchCount: 100);
// пустой массив 
$messages = [];
// подписываемся на очередь
$channel->consume(function (Message $message, Channel $channel, Client $bunny) use (&$messages) {
    // записываем в массив 100 сообщений
    $messages[] = $message;
    // проверяем что в массиве 100 сообщений
    if (count($messages) >= 100) {
        try {
            // подтверждаем сразу 100 сообщение вызвав команду ack
            $channel->ack($message, multiple: true);
            // выводим сообщение на экран
            var_dump($message->content);
        } catch (\Throwable) {
            // если произошла ощибка, отвергаем все 100 сообщений и отправляем их обратно в очередь
            $channel->nack($message, multiple: true, requeue: true);
        } finally {
            // очищаем массив
            $messages = [];
        }
    }
}, 'notifikacii_ochered');
// свойством для run, можно передать сколько клиент будет работать, например в течение 10 секунд, а затем останавливается
$bunny->run(10);

Если вдруг ваш консьюмер упадет и не сможет отправить ack, сообщения просто вернутся обратно в очередь, что нас вполне устраивает.

Заполните форму уже сегодня!
Для начала сотрудничества необходимо заполнить заявку или заказать обратный звонок. В ответ получите коммерческое предложение, которое будет содержать индивидуальную стратегию с учетом требований и поставленных задач
Работаем по будням с 9:00 до 18:00. Заявки, отправленные в выходные, обрабатываем в первый рабочий день до 12:00.
Спасибо, ваш запрос принят и будет обработан!
Эйч Маркетинг