Skip to content

Commit e7607f4

Browse files
Merge pull request #34 from utopia-php/amqp-use-quorum-queues
fix(amqp): use channel callable, reconnect on error
2 parents a6ec26a + cce215b commit e7607f4

File tree

1 file changed

+50
-29
lines changed

1 file changed

+50
-29
lines changed

src/Queue/Broker/AMQP.php

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -92,29 +92,32 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
9292
}
9393
};
9494

95-
$channel = $this->getChannel();
96-
97-
// It's good practice for the consumer to set up exchange and queues.
98-
// This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and
99-
// dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages.
100-
101-
// 1. Declare the exchange and a dead-letter-exchange.
102-
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
103-
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
104-
105-
// 2. Declare the working queue and configure the DLX for receiving rejected messages.
106-
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
107-
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);
108-
109-
// 3. Declare the dead-letter-queue and bind it to the DLX.
110-
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments));
111-
$channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name);
112-
113-
// 4. Instruct to consume on the working queue.
114-
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));
115-
116-
// 5. Consume. This blocks until the connection gets closed.
117-
$channel->consume();
95+
$this->withChannel(function (AMQPChannel $channel) use ($queue, $processMessage) {
96+
// It's good practice for the consumer to set up exchange and queues.
97+
// This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and
98+
// dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages.
99+
100+
// 1. Declare the exchange and a dead-letter-exchange.
101+
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
102+
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
103+
104+
// 2. Declare the working queue and configure the DLX for receiving rejected messages.
105+
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
106+
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);
107+
108+
// 3. Declare the dead-letter-queue and bind it to the DLX.
109+
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments));
110+
$channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name);
111+
112+
// 4. Instruct to consume on the working queue.
113+
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));
114+
});
115+
116+
// Run ->consume in own callback to avoid re-running queue creation flow on error.
117+
$this->withChannel(function (AMQPChannel $channel) {
118+
// 5. Consume. This blocks until the connection gets closed.
119+
$channel->consume();
120+
});
118121
}
119122

120123
public function close(): void
@@ -133,7 +136,9 @@ public function enqueue(Queue $queue, array $payload): bool
133136
'payload' => $payload
134137
];
135138
$message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
136-
$this->getChannel()->basic_publish($message, $queue->namespace, routing_key: $queue->name);
139+
$this->withChannel(function (AMQPChannel $channel) use ($message, $queue) {
140+
$channel->basic_publish($message, $queue->namespace, routing_key: $queue->name);
141+
});
137142
return true;
138143
}
139144

@@ -165,19 +170,35 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
165170
return $data['messages'];
166171
}
167172

168-
private function getChannel(): AMQPChannel
173+
/**
174+
* @param callable(AMQPChannel $channel): void $callback
175+
* @throws \Exception
176+
*/
177+
private function withChannel(callable $callback): void
169178
{
170-
if ($this->channel == null) {
179+
$createChannel = function (): AMQPChannel {
171180
$connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost);
172181
if (is_callable($this->connectionConfigHook)) {
173182
call_user_func($this->connectionConfigHook, $connection);
174183
}
175-
$this->channel = $connection->channel();
184+
$channel = $connection->channel();
176185
if (is_callable($this->channelConfigHook)) {
177-
call_user_func($this->channelConfigHook, $this->channel);
186+
call_user_func($this->channelConfigHook, $channel);
178187
}
188+
return $channel;
189+
};
190+
191+
if ($this->channel == null) {
192+
$this->channel = $createChannel();
179193
}
180194

181-
return $this->channel;
195+
try {
196+
$callback($this->channel);
197+
} catch (\Throwable $th) {
198+
// try to create a new connection once
199+
unset($this->channel);
200+
$this->channel = $createChannel();
201+
$callback($this->channel);
202+
}
182203
}
183204
}

0 commit comments

Comments
 (0)