Полный цикл в digital

Простая очередь

Я использую Docker, поэтому привожу ссылку на готовый образ в GitFlic, образ содержит все необходимое для удобной работы.

В качестве клиента для обработки протокола AMQP я буду использовать bunny/bunny, устанавливаем с помощью Сomposer в корень проекта библиотеку, выполнив команду из консоли:

composer req bunny/bunny

Все готово для дальнейших работ, админка RabbitMQ доступна по адресу http://site.loc:15673:

Логин - guest
Пароль - guest

Настройка клиента Producer

В продюсер Producer который отправляет сообщения, добавляем следующий код:

html/site.loc/public_html/producer.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$client = new Client([
    // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
    'host' => 'rabbitmq',
    // хост корень проекта
    'vhost' => '/',
    // имя пользователя
    'user' => 'guest',
    // пароль
    'password' => 'guest',
]);
// создаем соединение
$client->connect();
// подключаемся к серверу
$channel = $client->channel();
// создаем очередь simple
$channel->queueDeclare('simple', durable: true);
// опубликовываем сообщение в очередь
$channel->publish('{"hello": "world"}', routingKey: 'simple');

Если сейчас перейти по адресу и выполнить действие продюсера, будет записана в обменник информация:

http://site.loc/producer.php

Когда публикуете сообщения, вы публикуете их в обменник, а не в очередь. Даже если явно не создали обменник, вы будете использовать специальный встроенный обменник с названием AMQP default:

Если вы нажмете на него, то в расширенном обзоре можете увидеть следующее описание:

Дословный перевод: Обмен по умолчанию неявно привязан к каждой очереди, причем ключ маршрутизации равен имени очереди. Невозможно явно привязаться к exchange по умолчанию или отменить привязку от него. Он также не может быть удален.

Простыми словами, для обычных ситуаций вы можете не создавать обменник и не привязывать обменник к очереди, достаточно создать очередь и начинать публиковать в нее сообщения.

После вызова продюсера Producer, вернитесь в админку RabbitMQ на вкладку с очередями, вы должны увидеть следующее:

Настройка получателя Consumer

Получать сообщения будем уже напрямую из очереди, а не из обменника. Зачем тогда нужен обменник? Обменники позволяют организовать мощный и гибкий роутинг между очередями, но к этому мы вернемся чуть позже. Итак, чтобы получать сообщения, есть два варианта по командам с примерам которых можно познакомится тут:

  1. basic.get
  2. basic.consume

basic.get

В получатель Consumer который получает сообщения, добавляем следующий код:

html/site.loc/public_html/consumer.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для получения
$client = new Client([
    // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
    'host' => 'rabbitmq',
    // хост корень проекта
    'vhost' => '/',
    // имя пользователя
    'user' => 'guest',
    // пароль
    'password' => 'guest',
]);
// создаем соединение
$client->connect();
// подключаемся к серверу
$channel = $client->channel();
// имя очереди simple
$msg = $channel->get('simple');
// если ответ не пустой
if (null !== $msg) {
    // подтверждаем сообщение вызвав команду ack
    $channel->ack($msg);
    // выводим сообщение на экран
    var_dump($msg->content);
}

Попробуем получить сообщение первым способом basic.get, если сейчас перейти по адресу и выполнить действие получателя:

http://site.loc/consumer.php

Будет выведен на экран результат:

Получив сообщение, вы должны его либо подтвердить, либо отвергнуть вызвав соответствующию команду:

  • ack() подтверждает получение сообщения, удаляет сообщение из очереди
  • reject()отвергнуть получение, можно только одно сообщение
  • nack() отвергнуть получение, можно сразу несколько сообщений

Если вызвали nack(), RabbitMQ либо его вернет обратно в очередь, если вместе с nack() передали опцию requeue=true, либо будет действовать в зависимости от логики, заложенной в вашей очереди. Например, RabbitMQ переместит сообщение в другую очередь, о чем подробнее будет рассказано дальше, либо так же удалит.

basic.consume

html/site.loc/public_html/consumer.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
// создаем объект класса для подключения
$client = new Client([
    // имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
    'host' => 'rabbitmq',
    // хост корень проекта
    'vhost' => '/',
    // имя пользователя
    'user' => 'guest',
    // пароль
    'password' => 'guest',
]);
// создаем соединение
$client->connect();
// подключаемся к серверу
$channel = $client->channel();
// отдаем только одно сообщение
$channel->qos(prefetchCount: 1);
// подписываемся на очередь
$channel->consume(function (Message $message, Channel $channel): void {
    // выводим сообщение на экран
    var_dump($message->content);
    // подтверждаем сообщение вызвав команду ack
    $channel->ack($message);
}, 'simple');
// свойством для run, можно передать сколько получатель будет работать, например в течение 10 секунд, а затем останавливается
$client->run(10);

Попробуем получить сообщение вторым способом basic.consume, если сейчас перейти по адресу и выполнить действие получателя:

http://site.loc/consumer.php

Будет выведен на экран результат:

Используя второй способ, мы регистрируете консьюмер, который также отражается в информации об очереди в пункте Consumers:

Обратите внимание на поле $channel->qos(prefetchCount: 1);, оно говорит RabbitMQ, что консьюмеру надо отдать только одно сообщение до его подтверждения консьюмером. Таким образом вы можете:

  1. Гарантировать что ваши сообщения никогда не пропадут, так как кролик будет давать следующее сообщение только после подтверждения обработки предыдущего командами ack() или nack()
  2. Балансировать нагрузку между консьюмерами, чтобы одному консьюмеру не выпало больше сообщений, чем другому

Поэтому настоятельно советую следить за этим числом и никогда не указывать 0, иначе вашему консьюмеру будет передаваться неограниченное количество сообщений, в случае нагрузок это может привести к деградации производительности консьюмера и его падению, в результате чего вы либо потеряете сообщение, либо обработаете его повторно.

В конфигурации консьюмера есть строка $channel->ack($message);, она говорит RabbitMQ, что необходимо ожидать подтверждения обработки сообщений от консьюмера. Отключение этой опции приведет к увеличению производительности, поскольку тогда RabbitMQ не будет ждать подтверждения, но в то же время вы рискуете потерять сообщение, если ваш консьюмер упадет на его обработке, так как тогда оно не вернется обратно в очередь. Поэтому всегда используйте режим подтверждения сообщений консьюмером, только если не уверены, что вам это не нужно.

Заполните форму уже сегодня!
Для начала сотрудничества необходимо заполнить заявку или заказать обратный звонок. В ответ получите коммерческое предложение, которое будет содержать индивидуальную стратегию с учетом требований и поставленных задач
Работаем по будням с 9:00 до 18:00. Заявки, отправленные в выходные, обрабатываем в первый рабочий день до 12:00.
Спасибо, ваш запрос принят и будет обработан!
Эйч Маркетинг