diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 0bd745e..6613f1c 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -128,6 +128,11 @@ public function close(): void } } + public function ping(): bool + { + return $this->withChannel(fn (AMQPChannel $channel) => $channel->is_open()); + } + public function enqueue(Queue $queue, array $payload): bool { $payload = [ @@ -172,10 +177,10 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int } /** - * @param callable(AMQPChannel $channel): void $callback + * @param callable(AMQPChannel $channel): mixed $callback * @throws \Exception */ - private function withChannel(callable $callback): void + private function withChannel(callable $callback): mixed { $createChannel = function (): AMQPChannel { $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost, heartbeat: $this->heartbeat); @@ -194,13 +199,13 @@ private function withChannel(callable $callback): void } try { - $callback($this->channel); + return $callback($this->channel); } catch (\Throwable $th) { // createChannel() might throw, in that case set the channel to `null` first. $this->channel = null; // try creating a new connection once, if this still fails, throw the error $this->channel = $createChannel(); - $callback($this->channel); + return $callback($this->channel); } } } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 34ee22e..e255ff4 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -94,6 +94,11 @@ public function close(): void $this->closed = true; } + public function ping(): bool + { + return $this->connection->ping(); + } + public function enqueue(Queue $queue, array $payload): bool { $payload = [ diff --git a/src/Queue/Publisher.php b/src/Queue/Publisher.php index da07481..3f9c321 100644 --- a/src/Queue/Publisher.php +++ b/src/Queue/Publisher.php @@ -4,6 +4,13 @@ interface Publisher { + /** + * Checks if the publisher can reach the queue. + * + * @return bool + */ + public function ping(): bool; + /** * Publishes a new message onto the queue. *