Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 50 additions & 29 deletions src/Queue/Broker/AMQP.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,29 +92,32 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
}
};

$channel = $this->getChannel();

// It's good practice for the consumer to set up exchange and queues.
// This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and
// dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages.

// 1. Declare the exchange and a dead-letter-exchange.
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));

// 2. Declare the working queue and configure the DLX for receiving rejected messages.
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);

// 3. Declare the dead-letter-queue and bind it to the DLX.
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments));
$channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name);

// 4. Instruct to consume on the working queue.
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));

// 5. Consume. This blocks until the connection gets closed.
$channel->consume();
$this->withChannel(function (AMQPChannel $channel) use ($queue, $processMessage) {
// It's good practice for the consumer to set up exchange and queues.
// This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and
// dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages.

// 1. Declare the exchange and a dead-letter-exchange.
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));

// 2. Declare the working queue and configure the DLX for receiving rejected messages.
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);

// 3. Declare the dead-letter-queue and bind it to the DLX.
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments));
$channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name);

// 4. Instruct to consume on the working queue.
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));
});

// Run ->consume in own callback to avoid re-running queue creation flow on error.
$this->withChannel(function (AMQPChannel $channel) {
// 5. Consume. This blocks until the connection gets closed.
$channel->consume();
});
}

public function close(): void
Expand All @@ -133,7 +136,9 @@ public function enqueue(Queue $queue, array $payload): bool
'payload' => $payload
];
$message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$this->getChannel()->basic_publish($message, $queue->namespace, routing_key: $queue->name);
$this->withChannel(function (AMQPChannel $channel) use ($message, $queue) {
$channel->basic_publish($message, $queue->namespace, routing_key: $queue->name);
});
return true;
}

Expand Down Expand Up @@ -165,19 +170,35 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
return $data['messages'];
}

private function getChannel(): AMQPChannel
/**
* @param callable(AMQPChannel $channel): void $callback
* @throws \Exception
*/
private function withChannel(callable $callback): void
{
if ($this->channel == null) {
$createChannel = function (): AMQPChannel {
$connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost);
if (is_callable($this->connectionConfigHook)) {
call_user_func($this->connectionConfigHook, $connection);
}
$this->channel = $connection->channel();
$channel = $connection->channel();
if (is_callable($this->channelConfigHook)) {
call_user_func($this->channelConfigHook, $this->channel);
call_user_func($this->channelConfigHook, $channel);
}
return $channel;
};

if ($this->channel == null) {
$this->channel = $createChannel();
}

return $this->channel;
try {
$callback($this->channel);
} catch (\Throwable $th) {
// try to create a new connection once
unset($this->channel);
$this->channel = $createChannel();
$callback($this->channel);
}
}
}