diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index fb35c74..8a5bc62 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -92,29 +92,32 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe } }; - $channel = $this->getChannel(); - - // It's good practice for the consumer to set up exchange and queues. - // This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and - // dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages. - - // 1. Declare the exchange and a dead-letter-exchange. - $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); - $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); - - // 2. Declare the working queue and configure the DLX for receiving rejected messages. - $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); - $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); - - // 3. Declare the dead-letter-queue and bind it to the DLX. - $channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments)); - $channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name); - - // 4. Instruct to consume on the working queue. - $channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments)); - - // 5. Consume. This blocks until the connection gets closed. - $channel->consume(); + $this->withChannel(function (AMQPChannel $channel) use ($queue, $processMessage) { + // It's good practice for the consumer to set up exchange and queues. + // This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and + // dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages. + + // 1. Declare the exchange and a dead-letter-exchange. + $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + + // 2. Declare the working queue and configure the DLX for receiving rejected messages. + $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); + $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); + + // 3. Declare the dead-letter-queue and bind it to the DLX. + $channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments)); + $channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name); + + // 4. Instruct to consume on the working queue. + $channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments)); + }); + + // Run ->consume in own callback to avoid re-running queue creation flow on error. + $this->withChannel(function (AMQPChannel $channel) { + // 5. Consume. This blocks until the connection gets closed. + $channel->consume(); + }); } public function close(): void @@ -133,7 +136,9 @@ public function enqueue(Queue $queue, array $payload): bool 'payload' => $payload ]; $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); - $this->getChannel()->basic_publish($message, $queue->namespace, routing_key: $queue->name); + $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { + $channel->basic_publish($message, $queue->namespace, routing_key: $queue->name); + }); return true; } @@ -165,19 +170,35 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int return $data['messages']; } - private function getChannel(): AMQPChannel + /** + * @param callable(AMQPChannel $channel): void $callback + * @throws \Exception + */ + private function withChannel(callable $callback): void { - if ($this->channel == null) { + $createChannel = function (): AMQPChannel { $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost); if (is_callable($this->connectionConfigHook)) { call_user_func($this->connectionConfigHook, $connection); } - $this->channel = $connection->channel(); + $channel = $connection->channel(); if (is_callable($this->channelConfigHook)) { - call_user_func($this->channelConfigHook, $this->channel); + call_user_func($this->channelConfigHook, $channel); } + return $channel; + }; + + if ($this->channel == null) { + $this->channel = $createChannel(); } - return $this->channel; + try { + $callback($this->channel); + } catch (\Throwable $th) { + // try to create a new connection once + unset($this->channel); + $this->channel = $createChannel(); + $callback($this->channel); + } } }