diff --git a/queue_job_cron_jobrunner/README.rst b/queue_job_cron_jobrunner/README.rst new file mode 100644 index 0000000000..d0cc046d68 --- /dev/null +++ b/queue_job_cron_jobrunner/README.rst @@ -0,0 +1 @@ +TO BE GENERATED AUTOMATICALLY diff --git a/queue_job_cron_jobrunner/__init__.py b/queue_job_cron_jobrunner/__init__.py new file mode 100644 index 0000000000..0650744f6b --- /dev/null +++ b/queue_job_cron_jobrunner/__init__.py @@ -0,0 +1 @@ +from . import models diff --git a/queue_job_cron_jobrunner/__manifest__.py b/queue_job_cron_jobrunner/__manifest__.py new file mode 100644 index 0000000000..6c0caa52f9 --- /dev/null +++ b/queue_job_cron_jobrunner/__manifest__.py @@ -0,0 +1,16 @@ +{ + "name": "Queue Job Cron Jobrunner", + "summary": "Run jobs without a dedicated JobRunner", + "version": "15.0.1.0.0", + "development_status": "Alpha", + "author": "Camptocamp SA, Odoo Community Association (OCA)", + "maintainers": ["ivantodorovich"], + "website": "https://github.com/OCA/queue", + "license": "AGPL-3", + "category": "Others", + "depends": ["queue_job"], + "data": [ + "data/ir_cron.xml", + "views/ir_cron.xml", + ], +} diff --git a/queue_job_cron_jobrunner/data/ir_cron.xml b/queue_job_cron_jobrunner/data/ir_cron.xml new file mode 100644 index 0000000000..5cb7436dcc --- /dev/null +++ b/queue_job_cron_jobrunner/data/ir_cron.xml @@ -0,0 +1,16 @@ + + + + + Queue Job Runner + + code + model._job_runner() + + + 1 + days + -1 + + + diff --git a/queue_job_cron_jobrunner/models/__init__.py b/queue_job_cron_jobrunner/models/__init__.py new file mode 100644 index 0000000000..4ba9dd1a16 --- /dev/null +++ b/queue_job_cron_jobrunner/models/__init__.py @@ -0,0 +1,2 @@ +from . import ir_cron +from . import queue_job diff --git a/queue_job_cron_jobrunner/models/ir_cron.py b/queue_job_cron_jobrunner/models/ir_cron.py new file mode 100644 index 0000000000..61235dc45a --- /dev/null +++ b/queue_job_cron_jobrunner/models/ir_cron.py @@ -0,0 +1,13 @@ +# Copyright 2022 Camptocamp SA (https://www.camptocamp.com). +# @author Iván Todorovich +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from odoo import fields, models + + +class IrCron(models.Model): + _inherit = "ir.cron" + + queue_job_runner = fields.Boolean( + help="If checked, the cron is considered to be a queue.job runner.", + ) diff --git a/queue_job_cron_jobrunner/models/queue_job.py b/queue_job_cron_jobrunner/models/queue_job.py new file mode 100644 index 0000000000..6a374fcb22 --- /dev/null +++ b/queue_job_cron_jobrunner/models/queue_job.py @@ -0,0 +1,164 @@ +# Copyright 2022 Camptocamp SA (https://www.camptocamp.com). +# @author Iván Todorovich +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +import logging +import traceback +from io import StringIO + +from psycopg2 import OperationalError + +from odoo import _, api, models, tools +from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY + +from odoo.addons.queue_job.controllers.main import PG_RETRY +from odoo.addons.queue_job.exception import ( + FailedJobError, + NothingToDoJob, + RetryableJobError, +) +from odoo.addons.queue_job.job import Job + +_logger = logging.getLogger(__name__) + + +class QueueJob(models.Model): + _inherit = "queue.job" + + @api.model + def _acquire_one_job(self): + """Acquire the next job to be run. + + :returns: queue.job record (locked for update) + """ + # TODO: This method should respect channel priority and capacity, + # rather than just fetching them by creation date. + self.flush() + self.env.cr.execute( + """ + SELECT id + FROM queue_job + WHERE state = 'pending' + AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC')) + ORDER BY date_created DESC + LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED + """ + ) + row = self.env.cr.fetchone() + return self.browse(row and row[0]) + + def _process(self, commit=False): + """Process the job""" + self.ensure_one() + job = Job._load_from_db_record(self) + # Set it as started + job.set_started() + job.store() + _logger.debug("%s started", job.uuid) + # TODO: Commit the state change so that the state can be read from the UI + # while the job is processing. However, doing this will release the + # lock on the db, so we need to find another way. + # if commit: + # self.flush() + # self.env.cr.commit() + + # Actual processing + try: + try: + with self.env.cr.savepoint(): + job.perform() + job.set_done() + job.store() + except OperationalError as err: + # Automatically retry the typical transaction serialization errors + if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: + raise + message = tools.ustr(err.pgerror, errors="replace") + job.postpone(result=message, seconds=PG_RETRY) + job.set_pending(reset_retry=False) + job.store() + _logger.debug("%s OperationalError, postponed", job) + + except NothingToDoJob as err: + if str(err): + msg = str(err) + else: + msg = _("Job interrupted and set to Done: nothing to do.") + job.set_done(msg) + job.store() + + except RetryableJobError as err: + # delay the job later, requeue + job.postpone(result=str(err), seconds=5) + job.set_pending(reset_retry=False) + job.store() + _logger.debug("%s postponed", job) + + except (FailedJobError, Exception): + with StringIO() as buff: + traceback.print_exc(file=buff) + _logger.error(buff.getvalue()) + job.set_failed(exc_info=buff.getvalue()) + job.store() + + if commit: # pragma: no cover + self.env["base"].flush() + self.env.cr.commit() # pylint: disable=invalid-commit + + @api.model + def _job_runner(self, commit=True): + """Short-lived job runner, triggered by async crons""" + job = self._acquire_one_job() + while job: + job._process(commit=commit) + job = self._acquire_one_job() + # TODO: If limit_time_real_cron is reached before all the jobs are done, + # the worker will be killed abruptly. + # Ideally, find a way to know if we're close to reaching this limit, + # stop processing, and trigger a new execution to continue. + # + # if job and limit_time_real_cron_reached_or_about_to_reach: + # self._cron_trigger() + # break + + @api.model + def _cron_trigger(self, at=None): + """Trigger the cron job runners + + Odoo will prevent concurrent cron jobs from running. + So, to support parallel execution, we'd need to have (at least) the + same number of ir.crons records as cron workers. + + All crons should be triggered at the same time. + """ + crons = self.env["ir.cron"].sudo().search([("queue_job_runner", "=", True)]) + for cron in crons: + cron._trigger(at=at) + + def _ensure_cron_trigger(self): + """Create cron triggers for these jobs""" + records = self.filtered(lambda r: r.state == "pending") + if not records: + return + # Trigger immediate runs + immediate = any(not rec.eta for rec in records) + if immediate: + self._cron_trigger() + # Trigger delayed eta runs + delayed_etas = {rec.eta for rec in records if rec.eta} + if delayed_etas: + self._cron_trigger(at=list(delayed_etas)) + + @api.model_create_multi + def create(self, vals_list): + # When jobs are created, also create the cron trigger + records = super().create(vals_list) + records._ensure_cron_trigger() + return records + + def write(self, vals): + # When a job state or eta changes, make sure a cron trigger is created + res = super().write(vals) + if "state" in vals or "eta" in vals: + self._ensure_cron_trigger() + return res diff --git a/queue_job_cron_jobrunner/readme/CONFIGURE.rst b/queue_job_cron_jobrunner/readme/CONFIGURE.rst new file mode 100644 index 0000000000..0de15596d2 --- /dev/null +++ b/queue_job_cron_jobrunner/readme/CONFIGURE.rst @@ -0,0 +1,21 @@ +.. warning:: + + Don't use this module if you're already running the regular ``queue_job`` runner. + + +For the easiest case, no configuration is required besides installing the module. + +To avoid CronWorker CPU timeout from abruptly stopping the job processing cron, it's +recommended to launch Odoo with ``--limit-time-real-cron=0``, to disable the CronWorker +timeout altogether. + +.. note:: + + In Odoo.sh, this is done by default. + + +Parallel execution of jobs can be achieved by leveraging multiple ``ir.cron`` records: + +* Make sure you have enough CronWorkers available (Odoo CLI ``--max-cron-threads``) +* Duplicate the ``queue_job_cron`` cron record as many times as needed, until you have + as much records as cron workers. diff --git a/queue_job_cron_jobrunner/readme/CONTRIBUTORS.rst b/queue_job_cron_jobrunner/readme/CONTRIBUTORS.rst new file mode 100644 index 0000000000..59b447f28a --- /dev/null +++ b/queue_job_cron_jobrunner/readme/CONTRIBUTORS.rst @@ -0,0 +1,3 @@ +* `Camptocamp `_ + + * Iván Todorovich diff --git a/queue_job_cron_jobrunner/readme/DESCRIPTION.rst b/queue_job_cron_jobrunner/readme/DESCRIPTION.rst new file mode 100644 index 0000000000..e713aa89a9 --- /dev/null +++ b/queue_job_cron_jobrunner/readme/DESCRIPTION.rst @@ -0,0 +1,14 @@ +This module implements a simple ``queue.job`` runner using ``ir.cron`` triggers. + +It's meant to be used on environments where the regular job runner can't be run, like +on Odoo.sh. + +Unlike the regular job runner, where jobs are dispatched to the HttpWorkers, jobs are +processed on the CronWorker threads by the job runner crons. This is a design decision +because: + +* Odoo.sh puts HttpWorkers to sleep when there's no network activity +* HttpWorkers are meant for traffic. Users shouldn't pay the price of background tasks. + +For now, it only implements the most basic features of the ``queue_job`` runner, notably +no channel capacity nor priorities. Please check the ROADMAP for further details. diff --git a/queue_job_cron_jobrunner/readme/ROADMAP.rst b/queue_job_cron_jobrunner/readme/ROADMAP.rst new file mode 100644 index 0000000000..b82e199202 --- /dev/null +++ b/queue_job_cron_jobrunner/readme/ROADMAP.rst @@ -0,0 +1,3 @@ +* Support channel capacity and priority. (See ``_acquire_one_job``) +* Gracefully handle CronWorker CPU timeouts. (See ``_job_runner``) +* Commit transaction after job state updated to started. (See ``_process``) diff --git a/queue_job_cron_jobrunner/tests/__init__.py b/queue_job_cron_jobrunner/tests/__init__.py new file mode 100644 index 0000000000..42bd479ed0 --- /dev/null +++ b/queue_job_cron_jobrunner/tests/__init__.py @@ -0,0 +1 @@ +from . import test_queue_job diff --git a/queue_job_cron_jobrunner/tests/test_queue_job.py b/queue_job_cron_jobrunner/tests/test_queue_job.py new file mode 100644 index 0000000000..3798e63778 --- /dev/null +++ b/queue_job_cron_jobrunner/tests/test_queue_job.py @@ -0,0 +1,54 @@ +# Copyright 2022 Camptocamp SA (https://www.camptocamp.com). +# @author Iván Todorovich +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from datetime import timedelta + +from freezegun import freeze_time + +from odoo import fields +from odoo.tests.common import TransactionCase +from odoo.tools import mute_logger + + +class TestQueueJob(TransactionCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.env = cls.env(context=dict(cls.env.context, tracking_disable=True)) + cls.cron = cls.env.ref("queue_job_cron_jobrunner.queue_job_cron") + # Cleanup triggers just in case + cls.env["ir.cron.trigger"].search([]).unlink() + + def assertTriggerAt(self, at, message=None): + """Ensures a cron trigger is created at the given time""" + return self.assertTrue( + self.env["ir.cron.trigger"].search([("call_at", "=", at)]), + message, + ) + + @freeze_time("2022-02-22 22:22:22") + def test_queue_job_cron_trigger(self): + """Test that ir.cron triggers are created for every queue.job""" + job = self.env["res.partner"].with_delay().create({"name": "test"}) + job_record = job.db_record() + self.assertTriggerAt(fields.Datetime.now(), "Trigger should've been created") + job_record.eta = fields.Datetime.now() + timedelta(hours=1) + self.assertTriggerAt(job_record.eta, "A new trigger should've been created") + + @mute_logger("odoo.addons.queue_job_cron_jobrunner.models.queue_job") + def test_queue_job_process(self): + """Test that jobs are processed by the queue job cron""" + # Create some jobs + job1 = self.env["res.partner"].with_delay().create({"name": "test"}) + job1_record = job1.db_record() + job2 = self.env["res.partner"].with_delay().create(False) + job2_record = job2.db_record() + job3 = self.env["res.partner"].with_delay(eta=3600).create({"name": "Test"}) + job3_record = job3.db_record() + # Run the job processing cron + self.env["queue.job"]._job_runner(commit=False) + # Check that the jobs were processed + self.assertEqual(job1_record.state, "done", "Processed OK") + self.assertEqual(job2_record.state, "failed", "Has errors") + self.assertEqual(job3_record.state, "pending", "Still pending, because of eta") diff --git a/queue_job_cron_jobrunner/views/ir_cron.xml b/queue_job_cron_jobrunner/views/ir_cron.xml new file mode 100644 index 0000000000..f1fe6a7455 --- /dev/null +++ b/queue_job_cron_jobrunner/views/ir_cron.xml @@ -0,0 +1,22 @@ + + + + + + ir.cron + + + + + + + + + diff --git a/setup/queue_job_cron_jobrunner/odoo/addons/queue_job_cron_jobrunner b/setup/queue_job_cron_jobrunner/odoo/addons/queue_job_cron_jobrunner new file mode 120000 index 0000000000..9ec9686861 --- /dev/null +++ b/setup/queue_job_cron_jobrunner/odoo/addons/queue_job_cron_jobrunner @@ -0,0 +1 @@ +../../../../queue_job_cron_jobrunner \ No newline at end of file diff --git a/setup/queue_job_cron_jobrunner/setup.py b/setup/queue_job_cron_jobrunner/setup.py new file mode 100644 index 0000000000..28c57bb640 --- /dev/null +++ b/setup/queue_job_cron_jobrunner/setup.py @@ -0,0 +1,6 @@ +import setuptools + +setuptools.setup( + setup_requires=['setuptools-odoo'], + odoo_addon=True, +)