diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index fef30da..64ddf2f 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -2,6 +2,7 @@ namespace Utopia\Queue\Adapter; +use Swoole\Constant; use Swoole\Process\Pool; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; @@ -10,6 +11,9 @@ class Swoole extends Adapter { protected Pool $pool; + /** @var callable */ + private $onStop; + public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') { parent::__construct($workerNum, $queue, $namespace); @@ -27,13 +31,16 @@ public function start(): self public function stop(): self { + if ($this->onStop) { + call_user_func($this->onStop); + } $this->pool->shutdown(); return $this; } public function workerStart(callable $callback): self { - $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { + $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { call_user_func($callback, $workerId); }); @@ -42,7 +49,8 @@ public function workerStart(callable $callback): self public function workerStop(callable $callback): self { - $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { + $this->onStop = $callback; + $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { call_user_func($callback, $workerId); }); diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 6bf1c6c..91afd58 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { + $this->channel?->stopConsume(); $this->channel?->getConnection()?->close(); } diff --git a/src/Queue/Server.php b/src/Queue/Server.php index a465676..8c74fa1 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -217,45 +217,19 @@ public function start(): self call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); } - while (true) { - $this->adapter->consumer->consume( - $this->adapter->queue, - function (Message $message) { - $receivedAtTimestamp = microtime(true); - Console::info("[Job] Received Job ({$message->getPid()})."); - try { - $waitDuration = microtime(true) - $message->getTimestamp(); - $this->jobWaitTime->record($waitDuration); - - $this->resources = []; - self::setResource('message', fn () => $message); - if ($this->job->getHook()) { - foreach ($this->initHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } - } - } - - foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } - } - } - - return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); - } finally { - $processDuration = microtime(true) - $receivedAtTimestamp; - $this->processDuration->record($processDuration); - } - }, - function (Message $message) { + $this->adapter->consumer->consume( + $this->adapter->queue, + function (Message $message) { + $receivedAtTimestamp = microtime(true); + Console::info("[Job] Received Job ({$message->getPid()})."); + try { + $waitDuration = microtime(true) - $message->getTimestamp(); + $this->jobWaitTime->record($waitDuration); + + $this->resources = []; + self::setResource('message', fn () => $message); if ($this->job->getHook()) { - foreach ($this->shutdownHooks as $hook) { // Global init hooks + foreach ($this->initHooks as $hook) { // Global init hooks if (in_array('*', $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); @@ -264,29 +238,55 @@ function (Message $message) { } foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { // Group init hooks + foreach ($this->initHooks as $hook) { // Group init hooks if (in_array($group, $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); } } } - Console::success("[Job] ({$message->getPid()}) successfully run."); - }, - function (?Message $message, Throwable $th) { - Console::error("[Job] ({$message?->getPid()}) failed to run."); - Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); - self::setResource('error', fn () => $th); + return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); + } finally { + $processDuration = microtime(true) - $receivedAtTimestamp; + $this->processDuration->record($processDuration); + } + }, + function (Message $message) { + if ($this->job->getHook()) { + foreach ($this->shutdownHooks as $hook) { // Global init hooks + if (in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } + } + } - foreach ($this->errorHooks as $hook) { - ($hook->getAction())(...$this->getArguments($hook)); + foreach ($this->job->getGroups() as $group) { + foreach ($this->shutdownHooks as $hook) { // Group init hooks + if (in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } } - }, - ); - } + } + Console::success("[Job] ({$message->getPid()}) successfully run."); + }, + function (?Message $message, Throwable $th) { + Console::error("[Job] ({$message?->getPid()}) failed to run."); + Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); + + self::setResource('error', fn () => $th); + + foreach ($this->errorHooks as $hook) { + ($hook->getAction())(...$this->getArguments($hook)); + } + }, + ); }); + $this->adapter->workerStop(fn () => $this->adapter->consumer->close()); + $this->adapter->start(); } catch (Throwable $error) { self::setResource('error', fn () => $error); @@ -318,31 +318,6 @@ public function getWorkerStart(): Hook return $this->workerStartHook; } - /** - * Is called when a Worker stops. - * @param callable|null $callback - * @return self - * @throws Exception - */ - public function workerStop(?callable $callback = null): self - { - try { - $this->adapter->workerStop(function (string $workerId) use ($callback) { - Console::success("[Worker] Worker {$workerId} is ready!"); - if (!is_null($callback)) { - call_user_func($callback); - } - }); - } catch (Throwable $error) { - self::setResource('error', fn () => $error); - foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); - } - } - - return $this; - } - /** * Get Arguments *