diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index f105136..a22cf6e 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -17,6 +17,9 @@ class AMQP implements Publisher, Consumer { private ?AMQPChannel $channel = null; + private array $exchangeArguments = []; + private array $queueArguments = []; + private array $consumerArguments = []; public function __construct( private readonly string $host, @@ -28,6 +31,21 @@ public function __construct( ) { } + public function setExchangeArgument(string $key, string $value): void + { + $this->exchangeArguments[$key] = $value; + } + + public function setQueueArgument(string $key, string $value): void + { + $this->queueArguments[$key] = $value; + } + + public function setConsumerArguments(string $key, string $value): void + { + $this->consumerArguments[$key] = $value; + } + public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void { $processMessage = function (AMQPMessage $amqpMessage) use ($messageCallback, $successCallback, $errorCallback) { @@ -60,19 +78,19 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe // dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages. // 1. Declare the exchange and a dead-letter-exchange. - $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false); - $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false); + $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); // 2. Declare the working queue and configure the DLX for receiving rejected messages. - $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(["x-dead-letter-exchange" => "{$queue->namespace}.failed"])); + $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); // 3. Declare the dead-letter-queue and bind it to the DLX. - $channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false); + $channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments)); $channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name); // 4. Instruct to consume on the working queue. - $channel->basic_consume($queue->name, callback: $processMessage); + $channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments)); // 5. Consume. This blocks until the connection gets closed. $channel->consume();