Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions src/Queue/Broker/AMQP.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
class AMQP implements Publisher, Consumer
{
private ?AMQPChannel $channel = null;
private array $exchangeArguments = [];
private array $queueArguments = [];
private array $consumerArguments = [];

public function __construct(
private readonly string $host,
Expand All @@ -28,6 +31,21 @@ public function __construct(
) {
}

public function setExchangeArgument(string $key, string $value): void
{
$this->exchangeArguments[$key] = $value;
}

public function setQueueArgument(string $key, string $value): void
{
$this->queueArguments[$key] = $value;
}

public function setConsumerArguments(string $key, string $value): void
{
$this->consumerArguments[$key] = $value;
}

public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
{
$processMessage = function (AMQPMessage $amqpMessage) use ($messageCallback, $successCallback, $errorCallback) {
Expand Down Expand Up @@ -60,19 +78,19 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
// dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages.

// 1. Declare the exchange and a dead-letter-exchange.
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false);
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false);
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));

// 2. Declare the working queue and configure the DLX for receiving rejected messages.
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(["x-dead-letter-exchange" => "{$queue->namespace}.failed"]));
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);

// 3. Declare the dead-letter-queue and bind it to the DLX.
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false);
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments));
$channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name);

// 4. Instruct to consume on the working queue.
$channel->basic_consume($queue->name, callback: $processMessage);
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));

// 5. Consume. This blocks until the connection gets closed.
$channel->consume();
Expand Down