Skip to content

Commit e8e9c01

Browse files
committed
fix(amqp): use loop in server to avoid disconnected consumers
1 parent f82d2f2 commit e8e9c01

File tree

1 file changed

+50
-48
lines changed

1 file changed

+50
-48
lines changed

src/Queue/Server.php

Lines changed: 50 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -219,19 +219,45 @@ public function start(): self
219219
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
220220
}
221221

222-
$this->adapter->consumer->consume(
223-
$this->adapter->queue,
224-
function (Message $message) {
225-
$receivedAtTimestamp = microtime(true);
226-
Console::info("[Job] Received Job ({$message->getPid()}).");
227-
try {
228-
$waitDuration = microtime(true) - $message->getTimestamp();
229-
$this->jobWaitTime->record($waitDuration);
230-
231-
$this->resources = [];
232-
self::setResource('message', fn () => $message);
222+
while (true) {
223+
$this->adapter->consumer->consume(
224+
$this->adapter->queue,
225+
function (Message $message) {
226+
$receivedAtTimestamp = microtime(true);
227+
Console::info("[Job] Received Job ({$message->getPid()}).");
228+
try {
229+
$waitDuration = microtime(true) - $message->getTimestamp();
230+
$this->jobWaitTime->record($waitDuration);
231+
232+
$this->resources = [];
233+
self::setResource('message', fn () => $message);
234+
if ($this->job->getHook()) {
235+
foreach ($this->initHooks as $hook) { // Global init hooks
236+
if (in_array('*', $hook->getGroups())) {
237+
$arguments = $this->getArguments($hook, $message->getPayload());
238+
\call_user_func_array($hook->getAction(), $arguments);
239+
}
240+
}
241+
}
242+
243+
foreach ($this->job->getGroups() as $group) {
244+
foreach ($this->initHooks as $hook) { // Group init hooks
245+
if (in_array($group, $hook->getGroups())) {
246+
$arguments = $this->getArguments($hook, $message->getPayload());
247+
\call_user_func_array($hook->getAction(), $arguments);
248+
}
249+
}
250+
}
251+
252+
\call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
253+
} finally {
254+
$processDuration = microtime(true) - $receivedAtTimestamp;
255+
$this->processDuration->record($processDuration);
256+
}
257+
},
258+
function (Message $message) {
233259
if ($this->job->getHook()) {
234-
foreach ($this->initHooks as $hook) { // Global init hooks
260+
foreach ($this->shutdownHooks as $hook) { // Global init hooks
235261
if (in_array('*', $hook->getGroups())) {
236262
$arguments = $this->getArguments($hook, $message->getPayload());
237263
\call_user_func_array($hook->getAction(), $arguments);
@@ -240,51 +266,27 @@ function (Message $message) {
240266
}
241267

242268
foreach ($this->job->getGroups() as $group) {
243-
foreach ($this->initHooks as $hook) { // Group init hooks
269+
foreach ($this->shutdownHooks as $hook) { // Group init hooks
244270
if (in_array($group, $hook->getGroups())) {
245271
$arguments = $this->getArguments($hook, $message->getPayload());
246272
\call_user_func_array($hook->getAction(), $arguments);
247273
}
248274
}
249275
}
250276

251-
\call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
252-
} finally {
253-
$processDuration = microtime(true) - $receivedAtTimestamp;
254-
$this->processDuration->record($processDuration);
255-
}
256-
},
257-
function (Message $message) {
258-
if ($this->job->getHook()) {
259-
foreach ($this->shutdownHooks as $hook) { // Global init hooks
260-
if (in_array('*', $hook->getGroups())) {
261-
$arguments = $this->getArguments($hook, $message->getPayload());
262-
\call_user_func_array($hook->getAction(), $arguments);
263-
}
264-
}
265-
}
277+
Console::success("[Job] ({$message->getPid()}) successfully run.");
278+
},
279+
function (Message $message, Throwable $th) {
280+
Console::error("[Job] ({$message->getPid()}) failed to run.");
281+
Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}");
266282

267-
foreach ($this->job->getGroups() as $group) {
268-
foreach ($this->shutdownHooks as $hook) { // Group init hooks
269-
if (in_array($group, $hook->getGroups())) {
270-
$arguments = $this->getArguments($hook, $message->getPayload());
271-
\call_user_func_array($hook->getAction(), $arguments);
272-
}
283+
self::setResource('error', fn () => $th);
284+
foreach ($this->errorHooks as $hook) {
285+
call_user_func_array($hook->getAction(), $this->getArguments($hook));
273286
}
274-
}
275-
276-
Console::success("[Job] ({$message->getPid()}) successfully run.");
277-
},
278-
function (Message $message, Throwable $th) {
279-
Console::error("[Job] ({$message->getPid()}) failed to run.");
280-
Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}");
281-
282-
self::setResource('error', fn () => $th);
283-
foreach ($this->errorHooks as $hook) {
284-
call_user_func_array($hook->getAction(), $this->getArguments($hook));
285-
}
286-
},
287-
);
287+
},
288+
);
289+
}
288290
});
289291

290292
$this->adapter->start();

0 commit comments

Comments
 (0)