Skip to content

Commit dc26542

Browse files
Merge pull request #32 from utopia-php/amqp-use-quorum-queues
feat(amqp): allow configuration of queue arguments
2 parents a100316 + 2adb42b commit dc26542

File tree

1 file changed

+23
-5
lines changed

1 file changed

+23
-5
lines changed

src/Queue/Broker/AMQP.php

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
class AMQP implements Publisher, Consumer
1818
{
1919
private ?AMQPChannel $channel = null;
20+
private array $exchangeArguments = [];
21+
private array $queueArguments = [];
22+
private array $consumerArguments = [];
2023

2124
public function __construct(
2225
private readonly string $host,
@@ -28,6 +31,21 @@ public function __construct(
2831
) {
2932
}
3033

34+
public function setExchangeArgument(string $key, string $value): void
35+
{
36+
$this->exchangeArguments[$key] = $value;
37+
}
38+
39+
public function setQueueArgument(string $key, string $value): void
40+
{
41+
$this->queueArguments[$key] = $value;
42+
}
43+
44+
public function setConsumerArguments(string $key, string $value): void
45+
{
46+
$this->consumerArguments[$key] = $value;
47+
}
48+
3149
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
3250
{
3351
$processMessage = function (AMQPMessage $amqpMessage) use ($messageCallback, $successCallback, $errorCallback) {
@@ -60,19 +78,19 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
6078
// dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages.
6179

6280
// 1. Declare the exchange and a dead-letter-exchange.
63-
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false);
64-
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false);
81+
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
82+
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
6583

6684
// 2. Declare the working queue and configure the DLX for receiving rejected messages.
67-
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(["x-dead-letter-exchange" => "{$queue->namespace}.failed"]));
85+
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
6886
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);
6987

7088
// 3. Declare the dead-letter-queue and bind it to the DLX.
71-
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false);
89+
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments));
7290
$channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name);
7391

7492
// 4. Instruct to consume on the working queue.
75-
$channel->basic_consume($queue->name, callback: $processMessage);
93+
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));
7694

7795
// 5. Consume. This blocks until the connection gets closed.
7896
$channel->consume();

0 commit comments

Comments
 (0)