diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index a22cf6e..fb35c74 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -3,6 +3,7 @@ namespace Utopia\Queue\Broker; use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; @@ -21,6 +22,16 @@ class AMQP implements Publisher, Consumer private array $queueArguments = []; private array $consumerArguments = []; + /** + * @var callable(AbstractConnection $connection): void + */ + private $connectionConfigHook; + + /** + * @var callable(AMQPChannel $channel): void + */ + private $channelConfigHook; + public function __construct( private readonly string $host, private readonly int $port = 5672, @@ -46,6 +57,16 @@ public function setConsumerArguments(string $key, string $value): void $this->consumerArguments[$key] = $value; } + public function configureConnection(callable $callback): void + { + $this->connectionConfigHook = $callback; + } + + public function configureChannel(callable $callback): void + { + $this->channelConfigHook = $callback; + } + public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void { $processMessage = function (AMQPMessage $amqpMessage) use ($messageCallback, $successCallback, $errorCallback) { @@ -148,7 +169,13 @@ private function getChannel(): AMQPChannel { if ($this->channel == null) { $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost); + if (is_callable($this->connectionConfigHook)) { + call_user_func($this->connectionConfigHook, $connection); + } $this->channel = $connection->channel(); + if (is_callable($this->channelConfigHook)) { + call_user_func($this->channelConfigHook, $this->channel); + } } return $this->channel;