diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 0f78354..6bf1c6c 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -42,6 +42,8 @@ public function __construct( private readonly ?string $password = null, private readonly string $vhost = '/', private readonly int $heartbeat = 0, + private readonly float $connectTimeout = 3.0, + private readonly float $readWriteTimeout = 3.0, ) { } @@ -94,7 +96,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $amqpMessage->nack(requeue: true); $errorCallback($message ?? null, $e); } catch (\Throwable $th) { - $amqpMessage->nack(requeue: false); + $amqpMessage->nack(); $errorCallback($message ?? null, $th); } }; @@ -129,9 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { - if ($this->channel) { - $this->channel->getConnection()?->close(); - } + $this->channel?->getConnection()?->close(); } public function enqueue(Queue $queue, array $payload): bool @@ -184,7 +184,16 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int private function withChannel(callable $callback): void { $createChannel = function (): AMQPChannel { - $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost, heartbeat: $this->heartbeat); + $connection = new AMQPStreamConnection( + $this->host, + $this->port, + $this->user, + $this->password, + $this->vhost, + connection_timeout: $this->connectTimeout, + read_write_timeout: $this->readWriteTimeout, + heartbeat: $this->heartbeat, + ); if (is_callable($this->connectionConfigHook)) { call_user_func($this->connectionConfigHook, $connection); } @@ -201,7 +210,7 @@ private function withChannel(callable $callback): void try { $callback($this->channel); - } catch (\Throwable $th) { + } catch (\Throwable) { // createChannel() might throw, in that case set the channel to `null` first. $this->channel = null; // try creating a new connection once, if this still fails, throw the error