Skip to content

Commit 61675db

Browse files
committed
fix(amqp): properly close connection on errors and open clean, new connection
1 parent b8cd955 commit 61675db

File tree

2 files changed

+61
-58
lines changed

2 files changed

+61
-58
lines changed

src/Queue/Broker/AMQP.php

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
class AMQP implements Publisher, Consumer
1919
{
20+
private bool $closed = false;
2021
private ?AMQPChannel $channel = null;
2122
private array $exchangeArguments = [];
2223
private array $queueArguments = [];
@@ -114,15 +115,18 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
114115
$channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments));
115116
});
116117

117-
// Run ->consume in own callback to avoid re-running queue creation flow on error.
118-
$this->withChannel(function (AMQPChannel $channel) {
119-
// 5. Consume. This blocks until the connection gets closed.
120-
$channel->consume();
121-
});
118+
while (!$this->closed) {
119+
// Run ->consume in own callback to avoid re-running queue creation flow on error.
120+
$this->withChannel(function (AMQPChannel $channel) {
121+
// 5. Consume. This blocks until the connection gets closed.
122+
$channel->consume();
123+
});
124+
}
122125
}
123126

124127
public function close(): void
125128
{
129+
$this->closed = true;
126130
if ($this->channel) {
127131
$this->channel->getConnection()?->close();
128132
}
@@ -189,15 +193,16 @@ private function withChannel(callable $callback): void
189193
return $channel;
190194
};
191195

192-
if ($this->channel == null) {
196+
if (!$this->channel) {
193197
$this->channel = $createChannel();
194198
}
195199

196200
try {
197201
$callback($this->channel);
198202
} catch (\Throwable $th) {
199-
// try to create a new connection once
200-
unset($this->channel);
203+
// createChannel() might throw, in that case set the channel to `null` first.
204+
$this->channel = null;
205+
// try creating a new connection once, if this still fails, throw the error
201206
$this->channel = $createChannel();
202207
$callback($this->channel);
203208
}

src/Queue/Server.php

Lines changed: 48 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -219,45 +219,19 @@ public function start(): self
219219
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
220220
}
221221

222-
while (true) {
223-
$this->adapter->consumer->consume(
224-
$this->adapter->queue,
225-
function (Message $message) {
226-
$receivedAtTimestamp = microtime(true);
227-
Console::info("[Job] Received Job ({$message->getPid()}).");
228-
try {
229-
$waitDuration = microtime(true) - $message->getTimestamp();
230-
$this->jobWaitTime->record($waitDuration);
231-
232-
$this->resources = [];
233-
self::setResource('message', fn () => $message);
234-
if ($this->job->getHook()) {
235-
foreach ($this->initHooks as $hook) { // Global init hooks
236-
if (in_array('*', $hook->getGroups())) {
237-
$arguments = $this->getArguments($hook, $message->getPayload());
238-
\call_user_func_array($hook->getAction(), $arguments);
239-
}
240-
}
241-
}
242-
243-
foreach ($this->job->getGroups() as $group) {
244-
foreach ($this->initHooks as $hook) { // Group init hooks
245-
if (in_array($group, $hook->getGroups())) {
246-
$arguments = $this->getArguments($hook, $message->getPayload());
247-
\call_user_func_array($hook->getAction(), $arguments);
248-
}
249-
}
250-
}
251-
252-
\call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
253-
} finally {
254-
$processDuration = microtime(true) - $receivedAtTimestamp;
255-
$this->processDuration->record($processDuration);
256-
}
257-
},
258-
function (Message $message) {
222+
$this->adapter->consumer->consume(
223+
$this->adapter->queue,
224+
function (Message $message) {
225+
$receivedAtTimestamp = microtime(true);
226+
Console::info("[Job] Received Job ({$message->getPid()}).");
227+
try {
228+
$waitDuration = microtime(true) - $message->getTimestamp();
229+
$this->jobWaitTime->record($waitDuration);
230+
231+
$this->resources = [];
232+
self::setResource('message', fn () => $message);
259233
if ($this->job->getHook()) {
260-
foreach ($this->shutdownHooks as $hook) { // Global init hooks
234+
foreach ($this->initHooks as $hook) { // Global init hooks
261235
if (in_array('*', $hook->getGroups())) {
262236
$arguments = $this->getArguments($hook, $message->getPayload());
263237
\call_user_func_array($hook->getAction(), $arguments);
@@ -266,27 +240,51 @@ function (Message $message) {
266240
}
267241

268242
foreach ($this->job->getGroups() as $group) {
269-
foreach ($this->shutdownHooks as $hook) { // Group init hooks
243+
foreach ($this->initHooks as $hook) { // Group init hooks
270244
if (in_array($group, $hook->getGroups())) {
271245
$arguments = $this->getArguments($hook, $message->getPayload());
272246
\call_user_func_array($hook->getAction(), $arguments);
273247
}
274248
}
275249
}
276250

277-
Console::success("[Job] ({$message->getPid()}) successfully run.");
278-
},
279-
function (Message $message, Throwable $th) {
280-
Console::error("[Job] ({$message->getPid()}) failed to run.");
281-
Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}");
251+
\call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
252+
} finally {
253+
$processDuration = microtime(true) - $receivedAtTimestamp;
254+
$this->processDuration->record($processDuration);
255+
}
256+
},
257+
function (Message $message) {
258+
if ($this->job->getHook()) {
259+
foreach ($this->shutdownHooks as $hook) { // Global init hooks
260+
if (in_array('*', $hook->getGroups())) {
261+
$arguments = $this->getArguments($hook, $message->getPayload());
262+
\call_user_func_array($hook->getAction(), $arguments);
263+
}
264+
}
265+
}
282266

283-
self::setResource('error', fn () => $th);
284-
foreach ($this->errorHooks as $hook) {
285-
call_user_func_array($hook->getAction(), $this->getArguments($hook));
267+
foreach ($this->job->getGroups() as $group) {
268+
foreach ($this->shutdownHooks as $hook) { // Group init hooks
269+
if (in_array($group, $hook->getGroups())) {
270+
$arguments = $this->getArguments($hook, $message->getPayload());
271+
\call_user_func_array($hook->getAction(), $arguments);
272+
}
286273
}
287-
},
288-
);
289-
}
274+
}
275+
276+
Console::success("[Job] ({$message->getPid()}) successfully run.");
277+
},
278+
function (Message $message, Throwable $th) {
279+
Console::error("[Job] ({$message->getPid()}) failed to run.");
280+
Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}");
281+
282+
self::setResource('error', fn () => $th);
283+
foreach ($this->errorHooks as $hook) {
284+
call_user_func_array($hook->getAction(), $this->getArguments($hook));
285+
}
286+
},
287+
);
290288
});
291289

292290
$this->adapter->start();

0 commit comments

Comments
 (0)