diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 1e389a0..41e9537 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -219,19 +219,45 @@ public function start(): self call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); } - $this->adapter->consumer->consume( - $this->adapter->queue, - function (Message $message) { - $receivedAtTimestamp = microtime(true); - Console::info("[Job] Received Job ({$message->getPid()})."); - try { - $waitDuration = microtime(true) - $message->getTimestamp(); - $this->jobWaitTime->record($waitDuration); - - $this->resources = []; - self::setResource('message', fn () => $message); + while (true) { + $this->adapter->consumer->consume( + $this->adapter->queue, + function (Message $message) { + $receivedAtTimestamp = microtime(true); + Console::info("[Job] Received Job ({$message->getPid()})."); + try { + $waitDuration = microtime(true) - $message->getTimestamp(); + $this->jobWaitTime->record($waitDuration); + + $this->resources = []; + self::setResource('message', fn () => $message); + if ($this->job->getHook()) { + foreach ($this->initHooks as $hook) { // Global init hooks + if (in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } + } + } + + foreach ($this->job->getGroups() as $group) { + foreach ($this->initHooks as $hook) { // Group init hooks + if (in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } + } + } + + \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); + } finally { + $processDuration = microtime(true) - $receivedAtTimestamp; + $this->processDuration->record($processDuration); + } + }, + function (Message $message) { if ($this->job->getHook()) { - foreach ($this->initHooks as $hook) { // Global init hooks + foreach ($this->shutdownHooks as $hook) { // Global init hooks if (in_array('*', $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); @@ -240,7 +266,7 @@ function (Message $message) { } foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { // Group init hooks + foreach ($this->shutdownHooks as $hook) { // Group init hooks if (in_array($group, $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); @@ -248,43 +274,19 @@ function (Message $message) { } } - \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); - } finally { - $processDuration = microtime(true) - $receivedAtTimestamp; - $this->processDuration->record($processDuration); - } - }, - function (Message $message) { - if ($this->job->getHook()) { - foreach ($this->shutdownHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } - } - } + Console::success("[Job] ({$message->getPid()}) successfully run."); + }, + function (Message $message, Throwable $th) { + Console::error("[Job] ({$message->getPid()}) failed to run."); + Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}"); - foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } + self::setResource('error', fn () => $th); + foreach ($this->errorHooks as $hook) { + call_user_func_array($hook->getAction(), $this->getArguments($hook)); } - } - - Console::success("[Job] ({$message->getPid()}) successfully run."); - }, - function (Message $message, Throwable $th) { - Console::error("[Job] ({$message->getPid()}) failed to run."); - Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}"); - - self::setResource('error', fn () => $th); - foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); - } - }, - ); + }, + ); + } }); $this->adapter->start();