К содержанию
Medusa
Документация

RabbitMQ

Medusa\Queue\RabbitMQ используется для отправки и обработки сообщений через RabbitMQ-сервер.

Инициализация класса

Если в конфигурации включён параметр BOOTSTRAP_RMQ_AUTOCONNECT, объект RabbitMQ уже доступен в глобальной переменной $RMQ.

Для отдельного CLI-скрипта или воркера итоговый флаг автоподключения также можно переопределить служебной PHP-константой BOOTSTRAP_OVERRIDE_RMQ_AUTOCONNECT до подключения www/medusa/index.php. Если такая константа определена, она имеет приоритет над параметром конфигурации BOOTSTRAP_RMQ_AUTOCONNECT.

/** @var Medusa\Queue\RabbitMQ $RMQ */

global $RMQ;

Отправка сообщения

Метод $RMQ->push($queue, $message) публикует сообщение в очередь.

$RMQ->push('mailing', [
    'email' => 'vadim.valeev@mail.ru',
    'subject' => 'Спасибо за регистрацию',
    'content' => 'Ваш аккаунт готов к работе',
]);

Подготовка PHP-скрипта для прослушивания очереди

В примерах ниже используется каталог www/app/ как пример пользовательского каталога. При другой настройке автозагрузчика путь к пользовательским воркерам будет отличаться.

define('BOOTSTRAP_OVERRIDE_PDO_AUTOCONNECT', false);
define('BOOTSTRAP_OVERRIDE_REDIS_AUTOCONNECT', false);
define('BOOTSTRAP_OVERRIDE_RMQ_AUTOCONNECT', true);

require_once __DIR__ . '/../../../medusa/index.php';

/** @var Medusa\Queue\RabbitMQ $RMQ */

global $RMQ;

$RMQ->listen('mailing', function (array $message): bool {
    echo print_r($message, true);

    return true;
});

Для прослушивания очереди используется метод $RMQ->listen($queue, $handler), который запускает обработчик очереди и передаёт ему сообщение.

Добавление задачи для прослушивания очереди

Для запуска воркера через cron можно добавить строку в .docker/etc/crontab и использовать flock -n, например:

*/1 * * * * www-data flock -n /tmp/rmq-worker.lock /usr/local/bin/php /var/www/html/app/cli/queue/rmq_worker.php >> /var/log/cron.log 2>&1

В этом примере flock -n /tmp/rmq-worker.lock не позволит запустить дублирующую копию воркера, если предыдущий процесс всё ещё работает. Если процесс завершится или упадёт, cron запустит его заново при следующем срабатывании расписания.

Подтверждение обработки сообщения

Если callback возвращает true, сообщение подтверждается и удаляется из очереди.

$RMQ->listen('queue', function ($data): bool {
    // ...

    return true;
});

Повторная постановка сообщения в очередь

Если callback возвращает false, сообщение считается необработанным и возвращается в очередь для повторной обработки.

$RMQ->listen('queue', function ($data): bool {
    // ...

    return false;
});

Ошибка обработки

Если во время обработки callback выбрасывает исключение или тело сообщения не удалось декодировать из JSON, ошибка логируется, сообщение отбрасывается из очереди, а текст ошибки выводится в стандартный вывод.

$RMQ->listen('queue', function ($data): bool {
    throw new Exception('Queue is unavailable');
});