Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions queue_job_cron_jobrunner/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TO BE GENERATED AUTOMATICALLY
1 change: 1 addition & 0 deletions queue_job_cron_jobrunner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import models
16 changes: 16 additions & 0 deletions queue_job_cron_jobrunner/__manifest__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "Queue Job Cron Jobrunner",
"summary": "Run jobs without a dedicated JobRunner",
"version": "15.0.1.0.0",
"development_status": "Alpha",
"author": "Camptocamp SA, Odoo Community Association (OCA)",
"maintainers": ["ivantodorovich"],
"website": "https://github.com/OCA/queue",
"license": "AGPL-3",
"category": "Others",
"depends": ["queue_job"],
"data": [
"data/ir_cron.xml",
"views/ir_cron.xml",
],
}
16 changes: 16 additions & 0 deletions queue_job_cron_jobrunner/data/ir_cron.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="utf-8" ?>
<odoo noupdate="1">

<record id="queue_job_cron" model="ir.cron">
<field name="name">Queue Job Runner</field>
<field name="model_id" ref="queue_job.model_queue_job" />
<field name="state">code</field>
<field name="code">model._job_runner()</field>
<field name="queue_job_runner" eval="True" />
<field name="user_id" ref="base.user_root" />
<field name="interval_number">1</field>
<field name="interval_type">days</field>
<field name="numbercall">-1</field>
</record>

</odoo>
2 changes: 2 additions & 0 deletions queue_job_cron_jobrunner/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from . import ir_cron
from . import queue_job
13 changes: 13 additions & 0 deletions queue_job_cron_jobrunner/models/ir_cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2022 Camptocamp SA (https://www.camptocamp.com).
# @author Iván Todorovich <ivan.todorovich@camptocamp.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from odoo import fields, models


class IrCron(models.Model):
_inherit = "ir.cron"

queue_job_runner = fields.Boolean(
help="If checked, the cron is considered to be a queue.job runner.",
)
164 changes: 164 additions & 0 deletions queue_job_cron_jobrunner/models/queue_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Copyright 2022 Camptocamp SA (https://www.camptocamp.com).
# @author Iván Todorovich <ivan.todorovich@camptocamp.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import logging
import traceback
from io import StringIO

from psycopg2 import OperationalError

from odoo import _, api, models, tools
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from odoo.addons.queue_job.controllers.main import PG_RETRY
from odoo.addons.queue_job.exception import (
FailedJobError,
NothingToDoJob,
RetryableJobError,
)
from odoo.addons.queue_job.job import Job

_logger = logging.getLogger(__name__)


class QueueJob(models.Model):
_inherit = "queue.job"

@api.model
def _acquire_one_job(self):
"""Acquire the next job to be run.

:returns: queue.job record (locked for update)
"""
# TODO: This method should respect channel priority and capacity,
# rather than just fetching them by creation date.
self.flush()
self.env.cr.execute(
"""
SELECT id
FROM queue_job
WHERE state = 'pending'
AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
ORDER BY date_created DESC
LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
"""
)
row = self.env.cr.fetchone()
return self.browse(row and row[0])

def _process(self, commit=False):
"""Process the job"""
self.ensure_one()
job = Job._load_from_db_record(self)
# Set it as started
job.set_started()
job.store()
_logger.debug("%s started", job.uuid)
# TODO: Commit the state change so that the state can be read from the UI
# while the job is processing. However, doing this will release the
# lock on the db, so we need to find another way.
# if commit:
# self.flush()
# self.env.cr.commit()

# Actual processing
try:
try:
with self.env.cr.savepoint():
job.perform()
job.set_done()
job.store()
except OperationalError as err:
# Automatically retry the typical transaction serialization errors
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise
message = tools.ustr(err.pgerror, errors="replace")
job.postpone(result=message, seconds=PG_RETRY)
job.set_pending(reset_retry=False)
job.store()
_logger.debug("%s OperationalError, postponed", job)

except NothingToDoJob as err:
if str(err):
msg = str(err)
else:
msg = _("Job interrupted and set to Done: nothing to do.")
job.set_done(msg)
job.store()

except RetryableJobError as err:
# delay the job later, requeue
job.postpone(result=str(err), seconds=5)
job.set_pending(reset_retry=False)
job.store()
_logger.debug("%s postponed", job)

except (FailedJobError, Exception):
with StringIO() as buff:
traceback.print_exc(file=buff)
_logger.error(buff.getvalue())
job.set_failed(exc_info=buff.getvalue())
job.store()

if commit: # pragma: no cover
self.env["base"].flush()
self.env.cr.commit() # pylint: disable=invalid-commit

@api.model
def _job_runner(self, commit=True):
"""Short-lived job runner, triggered by async crons"""
job = self._acquire_one_job()
while job:
job._process(commit=commit)
job = self._acquire_one_job()
# TODO: If limit_time_real_cron is reached before all the jobs are done,
# the worker will be killed abruptly.
# Ideally, find a way to know if we're close to reaching this limit,
# stop processing, and trigger a new execution to continue.
#
# if job and limit_time_real_cron_reached_or_about_to_reach:
# self._cron_trigger()
# break

@api.model
def _cron_trigger(self, at=None):
"""Trigger the cron job runners

Odoo will prevent concurrent cron jobs from running.
So, to support parallel execution, we'd need to have (at least) the
same number of ir.crons records as cron workers.

All crons should be triggered at the same time.
"""
crons = self.env["ir.cron"].sudo().search([("queue_job_runner", "=", True)])
for cron in crons:
cron._trigger(at=at)

def _ensure_cron_trigger(self):
"""Create cron triggers for these jobs"""
records = self.filtered(lambda r: r.state == "pending")
if not records:
return
# Trigger immediate runs
immediate = any(not rec.eta for rec in records)
if immediate:
self._cron_trigger()
# Trigger delayed eta runs
delayed_etas = {rec.eta for rec in records if rec.eta}
if delayed_etas:
self._cron_trigger(at=list(delayed_etas))

@api.model_create_multi
def create(self, vals_list):
# When jobs are created, also create the cron trigger
records = super().create(vals_list)
records._ensure_cron_trigger()
return records

def write(self, vals):
# When a job state or eta changes, make sure a cron trigger is created
res = super().write(vals)
if "state" in vals or "eta" in vals:
self._ensure_cron_trigger()
return res
21 changes: 21 additions & 0 deletions queue_job_cron_jobrunner/readme/CONFIGURE.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
.. warning::

Don't use this module if you're already running the regular ``queue_job`` runner.


For the easiest case, no configuration is required besides installing the module.

To avoid CronWorker CPU timeout from abruptly stopping the job processing cron, it's
recommended to launch Odoo with ``--limit-time-real-cron=0``, to disable the CronWorker
timeout altogether.

.. note::

In Odoo.sh, this is done by default.


Parallel execution of jobs can be achieved by leveraging multiple ``ir.cron`` records:

* Make sure you have enough CronWorkers available (Odoo CLI ``--max-cron-threads``)
* Duplicate the ``queue_job_cron`` cron record as many times as needed, until you have
as much records as cron workers.
3 changes: 3 additions & 0 deletions queue_job_cron_jobrunner/readme/CONTRIBUTORS.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* `Camptocamp <https://www.camptocamp.com>`_

* Iván Todorovich <ivan.todorovich@camptocamp.com>
14 changes: 14 additions & 0 deletions queue_job_cron_jobrunner/readme/DESCRIPTION.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
This module implements a simple ``queue.job`` runner using ``ir.cron`` triggers.

It's meant to be used on environments where the regular job runner can't be run, like
on Odoo.sh.

Unlike the regular job runner, where jobs are dispatched to the HttpWorkers, jobs are
processed on the CronWorker threads by the job runner crons. This is a design decision
because:

* Odoo.sh puts HttpWorkers to sleep when there's no network activity
* HttpWorkers are meant for traffic. Users shouldn't pay the price of background tasks.

For now, it only implements the most basic features of the ``queue_job`` runner, notably
no channel capacity nor priorities. Please check the ROADMAP for further details.
3 changes: 3 additions & 0 deletions queue_job_cron_jobrunner/readme/ROADMAP.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* Support channel capacity and priority. (See ``_acquire_one_job``)
* Gracefully handle CronWorker CPU timeouts. (See ``_job_runner``)
* Commit transaction after job state updated to started. (See ``_process``)
1 change: 1 addition & 0 deletions queue_job_cron_jobrunner/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import test_queue_job
54 changes: 54 additions & 0 deletions queue_job_cron_jobrunner/tests/test_queue_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2022 Camptocamp SA (https://www.camptocamp.com).
# @author Iván Todorovich <ivan.todorovich@camptocamp.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from datetime import timedelta

from freezegun import freeze_time

from odoo import fields
from odoo.tests.common import TransactionCase
from odoo.tools import mute_logger


class TestQueueJob(TransactionCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.env = cls.env(context=dict(cls.env.context, tracking_disable=True))
cls.cron = cls.env.ref("queue_job_cron_jobrunner.queue_job_cron")
# Cleanup triggers just in case
cls.env["ir.cron.trigger"].search([]).unlink()

def assertTriggerAt(self, at, message=None):
"""Ensures a cron trigger is created at the given time"""
return self.assertTrue(
self.env["ir.cron.trigger"].search([("call_at", "=", at)]),
message,
)

@freeze_time("2022-02-22 22:22:22")
def test_queue_job_cron_trigger(self):
"""Test that ir.cron triggers are created for every queue.job"""
job = self.env["res.partner"].with_delay().create({"name": "test"})
job_record = job.db_record()
self.assertTriggerAt(fields.Datetime.now(), "Trigger should've been created")
job_record.eta = fields.Datetime.now() + timedelta(hours=1)
self.assertTriggerAt(job_record.eta, "A new trigger should've been created")

@mute_logger("odoo.addons.queue_job_cron_jobrunner.models.queue_job")
def test_queue_job_process(self):
"""Test that jobs are processed by the queue job cron"""
# Create some jobs
job1 = self.env["res.partner"].with_delay().create({"name": "test"})
job1_record = job1.db_record()
job2 = self.env["res.partner"].with_delay().create(False)
job2_record = job2.db_record()
job3 = self.env["res.partner"].with_delay(eta=3600).create({"name": "Test"})
job3_record = job3.db_record()
# Run the job processing cron
self.env["queue.job"]._job_runner(commit=False)
# Check that the jobs were processed
self.assertEqual(job1_record.state, "done", "Processed OK")
self.assertEqual(job2_record.state, "failed", "Has errors")
self.assertEqual(job3_record.state, "pending", "Still pending, because of eta")
22 changes: 22 additions & 0 deletions queue_job_cron_jobrunner/views/ir_cron.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="utf-8" ?>
<!--
Copyright 2022 Camptocamp SA (https://www.camptocamp.com).
@author Iván Todorovich <ivan.todorovich@camptocamp.com>
License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
-->
<odoo>

<record id="ir_cron_view_form" model="ir.ui.view">
<field name="model">ir.cron</field>
<field name="inherit_id" ref="base.ir_cron_view_form" />
<field name="arch" type="xml">
<field name="doall" position="after">
<field
name="queue_job_runner"
attrs="{'invisible': [('model_name', '!=', 'queue.job')]}"
/>
</field>
</field>
</record>

</odoo>
6 changes: 6 additions & 0 deletions setup/queue_job_cron_jobrunner/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import setuptools

setuptools.setup(
setup_requires=['setuptools-odoo'],
odoo_addon=True,
)