Простая очередь
Я использую Docker, поэтому привожу ссылку на готовый образ в GitFlic, образ содержит все необходимое для удобной работы.
В качестве клиента для обработки протокола AMQP
я буду использовать bunny/bunny
, устанавливаем с помощью Сomposer
в корень проекта библиотеку, выполнив команду из консоли:
composer req bunny/bunny
Все готово для дальнейших работ, админка RabbitMQ
доступна по адресу http://site.loc:15673:
Логин - guest
Пароль - guest
Настройка клиента Producer
В продюсер Producer
который отправляет сообщения, добавляем следующий код:
html/site.loc/public_html/producer.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для подключения
$client = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$client->connect();
// подключаемся к серверу
$channel = $client->channel();
// создаем очередь simple
$channel->queueDeclare('simple', durable: true);
// опубликовываем сообщение в очередь
$channel->publish('{"hello": "world"}', routingKey: 'simple');
Если сейчас перейти по адресу и выполнить действие продюсера, будет записана в обменник информация:
http://site.loc/producer.php
Когда публикуете сообщения, вы публикуете их в обменник, а не в очередь. Даже если явно не создали обменник, вы будете использовать специальный встроенный обменник с названием AMQP default
:
Если вы нажмете на него, то в расширенном обзоре можете увидеть следующее описание:
Дословный перевод: Обмен по умолчанию неявно привязан к каждой очереди, причем ключ маршрутизации равен имени очереди. Невозможно явно привязаться к exchange
по умолчанию или отменить привязку от него. Он также не может быть удален.
Простыми словами, для обычных ситуаций вы можете не создавать обменник и не привязывать обменник к очереди, достаточно создать очередь и начинать публиковать в нее сообщения.
После вызова продюсера Producer
, вернитесь в админку RabbitMQ
на вкладку с очередями, вы должны увидеть следующее:
Настройка получателя Consumer
Получать сообщения будем уже напрямую из очереди, а не из обменника. Зачем тогда нужен обменник? Обменники позволяют организовать мощный и гибкий роутинг между очередями, но к этому мы вернемся чуть позже. Итак, чтобы получать сообщения, есть два варианта по командам с примерам которых можно познакомится тут:
basic.get
basic.consume
basic.get
В получатель Consumer
который получает сообщения, добавляем следующий код:
html/site.loc/public_html/consumer.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Client;
// создаем объект класса для получения
$client = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$client->connect();
// подключаемся к серверу
$channel = $client->channel();
// имя очереди simple
$msg = $channel->get('simple');
// если ответ не пустой
if (null !== $msg) {
// подтверждаем сообщение вызвав команду ack
$channel->ack($msg);
// выводим сообщение на экран
var_dump($msg->content);
}
Попробуем получить сообщение первым способом basic.get
, если сейчас перейти по адресу и выполнить действие получателя:
http://site.loc/consumer.php
Будет выведен на экран результат:
Получив сообщение, вы должны его либо подтвердить, либо отвергнуть вызвав соответствующию команду:
ack()
подтверждает получение сообщения, удаляет сообщение из очередиreject()
отвергнуть получение, можно только одно сообщениеnack()
отвергнуть получение, можно сразу несколько сообщений
Если вызвали nack()
, RabbitMQ
либо его вернет обратно в очередь, если вместе с nack()
передали опцию requeue=true
, либо будет действовать в зависимости от логики, заложенной в вашей очереди. Например, RabbitMQ
переместит сообщение в другую очередь, о чем подробнее будет рассказано дальше, либо так же удалит.
basic.consume
html/site.loc/public_html/consumer.php<?
// включаем строгую типизацию
declare(strict_types=1);
// подключаем загрузку bunny/bunny из Сomposer
require_once __DIR__ . '/vendor/autoload.php';
// пространства имен
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
// создаем объект класса для подключения
$client = new Client([
// имя контейнера RabbitMQ в Docker, на продакшене будет ip адрес сервера
'host' => 'rabbitmq',
// хост корень проекта
'vhost' => '/',
// имя пользователя
'user' => 'guest',
// пароль
'password' => 'guest',
]);
// создаем соединение
$client->connect();
// подключаемся к серверу
$channel = $client->channel();
// отдаем только одно сообщение
$channel->qos(prefetchCount: 1);
// подписываемся на очередь
$channel->consume(function (Message $message, Channel $channel): void {
// выводим сообщение на экран
var_dump($message->content);
// подтверждаем сообщение вызвав команду ack
$channel->ack($message);
}, 'simple');
// свойством для run, можно передать сколько получатель будет работать, например в течение 10 секунд, а затем останавливается
$client->run(10);
Попробуем получить сообщение вторым способом basic.consume
, если сейчас перейти по адресу и выполнить действие получателя:
http://site.loc/consumer.php
Будет выведен на экран результат:
Используя второй способ, мы регистрируете консьюмер, который также отражается в информации об очереди в пункте Consumers
:
Обратите внимание на поле $channel->qos(prefetchCount: 1);
, оно говорит RabbitMQ
, что консьюмеру надо отдать только одно сообщение до его подтверждения консьюмером. Таким образом вы можете:
- Гарантировать что ваши сообщения никогда не пропадут, так как кролик будет давать следующее сообщение только после подтверждения обработки предыдущего командами
ack()
илиnack()
- Балансировать нагрузку между консьюмерами, чтобы одному консьюмеру не выпало больше сообщений, чем другому
Поэтому настоятельно советую следить за этим числом и никогда не указывать 0
, иначе вашему консьюмеру будет передаваться неограниченное количество сообщений, в случае нагрузок это может привести к деградации производительности консьюмера и его падению, в результате чего вы либо потеряете сообщение, либо обработаете его повторно.
В конфигурации консьюмера есть строка $channel->ack($message);
, она говорит RabbitMQ
, что необходимо ожидать подтверждения обработки сообщений от консьюмера. Отключение этой опции приведет к увеличению производительности, поскольку тогда RabbitMQ
не будет ждать подтверждения, но в то же время вы рискуете потерять сообщение, если ваш консьюмер упадет на его обработке, так как тогда оно не вернется обратно в очередь. Поэтому всегда используйте режим подтверждения сообщений консьюмером, только если не уверены, что вам это не нужно.