Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
298429e
Store dependencies
Jul 2, 2019
2b3ab77
Add wait dependencies state
Jul 2, 2019
0ab5933
Enqueue waiting jobs when parent jobs are done
Jul 3, 2019
27f10a2
Optimize and make enqueue of waiting jobs more robust
Jul 4, 2019
c02c2e8
Adapt views for state wait_dependencies
Jul 5, 2019
d582118
Add API for Delayables
Jul 5, 2019
49c65ed
Fix tests failing when test_queue_job is installed
Jul 6, 2019
3264dcd
Add widget to show job dependencies on UI
Jul 9, 2019
2c64751
Show the dependency widget in a tab
Jul 11, 2019
4b9f0ba
Add documentation on 'base' model public methods
Oct 4, 2019
84ae3c5
Improve loading of dependencies using batch read
Oct 4, 2019
d6525a5
Use Delayable in DelayableRecordset
Oct 4, 2019
0acf5ae
Add documentation
guewen May 24, 2021
aa38952
Add a graph UUID
guewen May 26, 2021
124594e
Hide some technical fields
guewen May 26, 2021
ac0cb21
Ignore requeues on dependency jobs waiting on parent jobs
guewen May 26, 2021
780b80e
Fix lint
guewen May 27, 2021
e3b7a14
Fix warnings in tests
guewen May 27, 2021
e1a387e
Update vis-network js
guewen May 27, 2021
2c64b65
Improve display of jobs graph widget
guewen May 27, 2021
9876e66
Add powerful context manager for running tests on jobs
guewen May 28, 2021
00ff197
Set graph_uuid only once in DelayableGraph
guewen May 30, 2021
3d947db
Add docstrings on the new delayable classes
guewen May 30, 2021
c0a5af2
Add option to generate a graph in create_test_job controller
guewen Jul 1, 2021
4cab30f
Improve graph widget
guewen Jul 1, 2021
f595d2d
Add a smart button to open all the jobs of a graph
guewen Jul 1, 2021
cb3e7d2
Fix duplicate label
guewen Jul 1, 2021
8c3a964
Fix graph widget now showing title as HTML
guewen Jul 1, 2021
1468a8c
Improve graph widget performance
guewen Jul 1, 2021
5ffe5ee
Escape strings passed to the graph js widget
guewen Jul 21, 2021
26a59cc
Fix lint
guewen Jul 21, 2021
5474a25
Add documentation about handling of failures in a graph
guewen Jan 29, 2022
e00b4e0
Rename mock_jobs() to trap_jobs()
guewen Feb 3, 2022
41a1fe6
Rename dependency method done() to on_done()
guewen Feb 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Job Queue
:target: https://runbot.odoo-community.org/runbot/230/12.0
:alt: Try me on Runbot

|badge1| |badge2| |badge3| |badge4| |badge5|
|badge1| |badge2| |badge3| |badge4| |badge5|

This addon adds an integrated Job Queue to Odoo.

Expand Down Expand Up @@ -90,7 +90,7 @@ Configuration

* Adjust environment variables (optional):

- ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels configuration.
- ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels configuration.
The default is ``root:1``

- if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069``
Expand Down Expand Up @@ -324,7 +324,7 @@ Changelog
understand changes between version. The primary audience is
end users and integrators. Purely technical changes such as
code refactoring must not be mentioned here.

This file may contain ONE level of section titles, underlined
with the ~ (tilde) character. Other section markers are
forbidden and will likely break the structure of the README.rst
Expand Down Expand Up @@ -416,7 +416,7 @@ promote its widespread use.

Current `maintainer <https://odoo-community.org/page/maintainer-role>`__:

|maintainer-guewen|
|maintainer-guewen|

This module is part of the `OCA/queue <https://github.com/OCA/queue/tree/12.0/queue_job>`_ project on GitHub.

Expand Down
1 change: 1 addition & 0 deletions queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
'data': ['security/security.xml',
'security/ir.model.access.csv',
'views/queue_job_views.xml',
'views/queue_job_assets.xml',
'data/queue_data.xml',
"data/queue_job_function_data.xml"],
'installable': True,
Expand Down
136 changes: 131 additions & 5 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,29 @@
# Copyright 2013-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import random
import logging
import time
import traceback
from io import StringIO

from psycopg2 import OperationalError
from werkzeug.exceptions import Forbidden
from psycopg2 import OperationalError, errorcodes
from werkzeug.exceptions import BadRequest, Forbidden

import odoo
from odoo import _, http, tools
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from ..delay import chain, group
from ..job import Job, ENQUEUED
from ..exception import (RetryableJobError, FailedJobError, NothingToDoJob)

_logger = logging.getLogger(__name__)

PG_RETRY = 5 # seconds

DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5


class RunJobController(http.Controller):

Expand All @@ -36,6 +41,36 @@ def _try_perform_job(self, env, job):
env.cr.commit()
_logger.debug('%s done', job)

def _enqueue_dependent_jobs(self, env, job):
tries = 0
while True:
try:
job.enqueue_waiting()
except OperationalError as err:
# Automatically retry the typical transaction serialization
# errors
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise
if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE:
_logger.info(
"%s, maximum number of tries reached to"
" update dependencies",
errorcodes.lookup(err.pgcode)
)
raise
wait_time = random.uniform(0.0, 2 ** tries)
tries += 1
_logger.info(
"%s, retry %d/%d in %.04f sec...",
errorcodes.lookup(err.pgcode),
tries,
DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE,
wait_time
)
time.sleep(wait_time)
else:
break

@http.route('/queue_job/session', type='http', auth="none")
def session(self):
"""Used by the jobrunner to spawn a session
Expand Down Expand Up @@ -130,6 +165,10 @@ def retry_postpone(job, message, seconds=None):
buff.close()
raise

_logger.debug('%s enqueue depends started', job)
self._enqueue_dependent_jobs(env, job)
_logger.debug('%s enqueue depends done', job)

return ""

def _get_failure_values(self, job, traceback_txt, orig_exception):
Expand All @@ -146,11 +185,27 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):

@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
self, priority=None, max_retries=None, channel=None, description="Test job"
self, priority=None, max_retries=None, channel=None,
description="Test job", size=1, failure_rate=0
):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))

if failure_rate is not None:
try:
failure_rate = float(failure_rate)
except (ValueError, TypeError):
failure_rate = 0

if not (0 <= failure_rate <= 1):
raise BadRequest("failure_rate must be between 0 and 1")

if size is not None:
try:
size = int(size)
except (ValueError, TypeError):
size = 1

if priority is not None:
try:
priority = int(priority)
Expand All @@ -163,6 +218,31 @@ def create_test_job(
except ValueError:
max_retries = None

if size == 1:
return self._create_single_test_job(
priority=priority,
max_retries=max_retries,
channel=channel,
description=description,
failure_rate=failure_rate
)

if size > 1:
return self._create_graph_test_jobs(
size,
priority=priority,
max_retries=max_retries,
channel=channel,
description=description,
failure_rate=failure_rate
)
return ''

def _create_single_test_job(
self, priority=None, max_retries=None,
channel=None, description="Test job", size=1,
failure_rate=0
):
delayed = (
http.request.env["queue.job"]
.with_delay(
Expand All @@ -171,7 +251,53 @@ def create_test_job(
channel=channel,
description=description,
)
._test_job()
._test_job(failure_rate=failure_rate)
)
return 'job uuid: %s' % (delayed.db_record().uuid,)

return delayed.db_record().uuid
TEST_GRAPH_MAX_PER_GROUP = 5

def _create_graph_test_jobs(
self, size, priority=None, max_retries=None,
channel=None, description="Test job",
failure_rate=0
):
model = http.request.env["queue.job"]
current_count = 0

possible_grouping_methods = (chain, group)

tails = [] # we can connect new graph chains/groups to tails
root_delayable = None
while current_count < size:
jobs_count = min(
size - current_count,
random.randint(1, self.TEST_GRAPH_MAX_PER_GROUP)
)

jobs = []
for __ in range(jobs_count):
current_count += 1
jobs.append(
model.delayable(
priority=priority,
max_retries=max_retries,
channel=channel,
description="%s #%d" % (description, current_count),
)._test_job(failure_rate=failure_rate)
)

grouping = random.choice(possible_grouping_methods)
delayable = grouping(*jobs)
if not root_delayable:
root_delayable = delayable
else:
tail_delayable = random.choice(tails)
tail_delayable.on_done(delayable)
tails.append(delayable)

root_delayable.delay()

return 'graph uuid: %s' % (
list(root_delayable._head())[0]._generated_job.graph_uuid,
)
Loading