diff --git a/CHANGELOG.md b/CHANGELOG.md index a26f7bc..0bae1d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +2.1.0 +===== + +* (feature) Add `task-manager:run-worker` command as wrapper for Symfony messengers `consume` command. +* (improvement) Default to limit of 5 messages in run worker command, if no other limit is given. + + 2.0.3 ===== diff --git a/src/Command/RunWorkerCommand.php b/src/Command/RunWorkerCommand.php new file mode 100644 index 0000000..6982e60 --- /dev/null +++ b/src/Command/RunWorkerCommand.php @@ -0,0 +1,101 @@ +addOption( + 'limit', + 'l', + InputOption::VALUE_REQUIRED, + 'Limit the number of received messages', + ) + ->addOption( + 'time-limit', + 't', + InputOption::VALUE_REQUIRED, + 'The time limit in seconds the worker can handle new messages', + ) + ->addOption( + 'failure-limit', + 'f', + InputOption::VALUE_REQUIRED, + 'The number of failed messages the worker can consume', + ) + ->addOption( + 'memory-limit', + 'm', + InputOption::VALUE_REQUIRED, + 'The memory limit the worker can consume', + ); + } + + /** + */ + public function execute (InputInterface $input, OutputInterface $output) : int + { + $io = new TorrStyle($input, $output); + $io->title("Task Manager: Run Worker"); + $io->comment("Starting task workers for all registered queues."); + + if ($this->transportsHelper->hasSyncTransport()) + { + $io->caution("The app is using sync transports: that means that registered tasks are directly worked on and not in this worker. These queues are automatically filtered in this worker by Symfony."); + } + + $limits = array_filter([ + '--limit' => $input->getOption("limit"), + '--time-limit' => $input->getOption("time-limit"), + '--failure-limit' => $input->getOption("failure-limit"), + '--memory-limit' => $input->getOption("memory-limit"), + ]); + + // if no limits are set, default to 5 messages + if (empty($limits)) + { + $limits["--limit"] = 5; + } + + $messengerConsumeArguments = new ArrayInput(array_filter([ + 'command' => 'messenger:consume', + 'receivers' => $this->transportsHelper->getOrderedQueueNames(), + ...$limits, + ])); + + // disable interactive behavior for the greet command + $messengerConsumeArguments->setInteractive(false); + + $application = $this->getApplication(); + \assert(null !== $application); + + return $application->doRun($messengerConsumeArguments, $output); + } +}