From f1fc42a9127b6f368581da8f763b4ac8100e6ef1 Mon Sep 17 00:00:00 2001 From: Jannik Zschiesche Date: Mon, 2 Jun 2025 10:12:39 +0200 Subject: [PATCH 1/2] Add `task-manager:run-worker` command as wrapper for Symfony messengers `consume` command --- CHANGELOG.md | 6 +++ src/Command/RunWorkerCommand.php | 91 ++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/Command/RunWorkerCommand.php diff --git a/CHANGELOG.md b/CHANGELOG.md index a26f7bc..d8377e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +2.1.0 +===== + +* (feature) Add `task-manager:run-worker` command as wrapper for Symfony messengers `consume` command. + + 2.0.3 ===== diff --git a/src/Command/RunWorkerCommand.php b/src/Command/RunWorkerCommand.php new file mode 100644 index 0000000..8ae586b --- /dev/null +++ b/src/Command/RunWorkerCommand.php @@ -0,0 +1,91 @@ +addOption( + 'limit', + 'l', + InputOption::VALUE_REQUIRED, + 'Limit the number of received messages', + ) + ->addOption( + 'time-limit', + 't', + InputOption::VALUE_REQUIRED, + 'The time limit in seconds the worker can handle new messages', + ) + ->addOption( + 'failure-limit', + 'f', + InputOption::VALUE_REQUIRED, + 'The number of failed messages the worker can consume', + ) + ->addOption( + 'memory-limit', + 'm', + InputOption::VALUE_REQUIRED, + 'The memory limit the worker can consume', + ); + } + + /** + */ + public function execute (InputInterface $input, OutputInterface $output) : int + { + $io = new TorrStyle($input, $output); + $io->title("Task Manager: Run Worker"); + $io->comment("Starting task workers for all registered queues."); + + if ($this->transportsHelper->hasSyncTransport()) + { + $io->caution("The app is using sync transports: that means that registered tasks are directly worked on and not in this worker. These queues are automatically filtered in this worker by Symfony."); + } + + $messengerConsumeArguments = new ArrayInput(array_filter([ + 'command' => 'messenger:consume', + 'receivers' => $this->transportsHelper->getOrderedQueueNames(), + '--limit' => $input->getOption("limit"), + '--time-limit' => $input->getOption("time-limit"), + '--failure-limit' => $input->getOption("failure-limit"), + '--memory-limit' => $input->getOption("memory-limit"), + ])); + + // disable interactive behavior for the greet command + $messengerConsumeArguments->setInteractive(false); + + $application = $this->getApplication(); + \assert(null !== $application); + + return $application->doRun($messengerConsumeArguments, $output); + } +} From da8e3c596a595033e12c0b7d686ca7232f161baa Mon Sep 17 00:00:00 2001 From: Jannik Zschiesche Date: Mon, 2 Jun 2025 10:20:10 +0200 Subject: [PATCH 2/2] Add default for limit --- CHANGELOG.md | 1 + src/Command/RunWorkerCommand.php | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8377e4..0bae1d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ===== * (feature) Add `task-manager:run-worker` command as wrapper for Symfony messengers `consume` command. +* (improvement) Default to limit of 5 messages in run worker command, if no other limit is given. 2.0.3 diff --git a/src/Command/RunWorkerCommand.php b/src/Command/RunWorkerCommand.php index 8ae586b..6982e60 100644 --- a/src/Command/RunWorkerCommand.php +++ b/src/Command/RunWorkerCommand.php @@ -71,13 +71,23 @@ public function execute (InputInterface $input, OutputInterface $output) : int $io->caution("The app is using sync transports: that means that registered tasks are directly worked on and not in this worker. These queues are automatically filtered in this worker by Symfony."); } - $messengerConsumeArguments = new ArrayInput(array_filter([ - 'command' => 'messenger:consume', - 'receivers' => $this->transportsHelper->getOrderedQueueNames(), + $limits = array_filter([ '--limit' => $input->getOption("limit"), '--time-limit' => $input->getOption("time-limit"), '--failure-limit' => $input->getOption("failure-limit"), '--memory-limit' => $input->getOption("memory-limit"), + ]); + + // if no limits are set, default to 5 messages + if (empty($limits)) + { + $limits["--limit"] = 5; + } + + $messengerConsumeArguments = new ArrayInput(array_filter([ + 'command' => 'messenger:consume', + 'receivers' => $this->transportsHelper->getOrderedQueueNames(), + ...$limits, ])); // disable interactive behavior for the greet command