Skip to content

Commit 39b85b9

Browse files
committed
fix(amqp): use channel callable, set heartbeat
1 parent a6ec26a commit 39b85b9

File tree

2 files changed

+61
-42
lines changed

2 files changed

+61
-42
lines changed

composer.lock

Lines changed: 16 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Queue/Broker/AMQP.php

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ public function __construct(
3838
private readonly int $httpPort = 15672,
3939
private readonly ?string $user = null,
4040
private readonly ?string $password = null,
41-
private readonly string $vhost = '/'
41+
private readonly string $vhost = '/',
42+
private readonly int $heartbeat = 0,
4243
) {
4344
}
4445

@@ -92,29 +93,29 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
9293
}
9394
};
9495

95-
$channel = $this->getChannel();
96+
$this->withChannel(function (AMQPChannel $channel) use ($queue, $processMessage) {
97+
// It's good practice for the consumer to set up exchange and queues.
98+
// This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and
99+
// dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages.
96100

97-
// It's good practice for the consumer to set up exchange and queues.
98-
// This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and
99-
// dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages.
101+
// 1. Declare the exchange and a dead-letter-exchange.
102+
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
103+
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
100104

101-
// 1. Declare the exchange and a dead-letter-exchange.
102-
$channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
103-
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
105+
// 2. Declare the working queue and configure the DLX for receiving rejected messages.
106+
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
107+
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);
104108

105-
// 2. Declare the working queue and configure the DLX for receiving rejected messages.
106-
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
107-
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);
109+
// 3. Declare the dead-letter-queue and bind it to the DLX.
110+
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments));
111+
$channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name);
108112

109-
// 3. Declare the dead-letter-queue and bind it to the DLX.
110-
$channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments));
111-
$channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name);
113+
// 4. Instruct to consume on the working queue.
114+
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));
112115

113-
// 4. Instruct to consume on the working queue.
114-
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));
115-
116-
// 5. Consume. This blocks until the connection gets closed.
117-
$channel->consume();
116+
// 5. Consume. This blocks until the connection gets closed.
117+
$channel->consume();
118+
});
118119
}
119120

120121
public function close(): void
@@ -133,7 +134,9 @@ public function enqueue(Queue $queue, array $payload): bool
133134
'payload' => $payload
134135
];
135136
$message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
136-
$this->getChannel()->basic_publish($message, $queue->namespace, routing_key: $queue->name);
137+
$this->withChannel(function (AMQPChannel $channel) use ($message, $queue) {
138+
$channel->basic_publish($message, $queue->namespace, routing_key: $queue->name);
139+
});
137140
return true;
138141
}
139142

@@ -165,19 +168,35 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
165168
return $data['messages'];
166169
}
167170

168-
private function getChannel(): AMQPChannel
171+
/**
172+
* @param callable(AMQPChannel $channel): void $callback
173+
* @throws \Exception
174+
*/
175+
private function withChannel(callable $callback): void
169176
{
170-
if ($this->channel == null) {
171-
$connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost);
177+
$createChannel = function(): AMQPChannel {
178+
$connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost, heartbeat: $this->heartbeat);
172179
if (is_callable($this->connectionConfigHook)) {
173180
call_user_func($this->connectionConfigHook, $connection);
174181
}
175-
$this->channel = $connection->channel();
182+
$channel = $connection->channel();
176183
if (is_callable($this->channelConfigHook)) {
177-
call_user_func($this->channelConfigHook, $this->channel);
184+
call_user_func($this->channelConfigHook, $channel);
178185
}
186+
return $channel;
187+
};
188+
189+
if ($this->channel == null) {
190+
$this->channel = $createChannel();
179191
}
180192

181-
return $this->channel;
193+
try {
194+
$callback($this->channel);
195+
} catch (\Throwable $th) {
196+
// try to create a new connection once
197+
unset($this->channel);
198+
$this->channel = $createChannel();
199+
$callback($this->channel);
200+
}
182201
}
183202
}

0 commit comments

Comments
 (0)