Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
cafe6af
Store dependencies
Jul 2, 2019
6d37763
Add wait dependencies state
Jul 2, 2019
e5484a7
Enqueue waiting jobs when parent jobs are done
Jul 3, 2019
da4a6f5
Optimize and make enqueue of waiting jobs more robust
Jul 4, 2019
66a868a
Adapt views for state wait_dependencies
Jul 5, 2019
5284431
Add API for Delayables
Jul 5, 2019
f0335d6
Fix tests failing when test_queue_job is installed
Jul 6, 2019
bd55012
Add widget to show job dependencies on UI
Jul 9, 2019
be53aa6
Show the dependency widget in a tab
Jul 11, 2019
711f309
Add documentation on 'base' model public methods
Oct 4, 2019
ddcb254
Improve loading of dependencies using batch read
Oct 4, 2019
ac22bd5
Use Delayable in DelayableRecordset
Oct 4, 2019
f71749f
Add documentation
guewen May 24, 2021
0dbcf3e
Add a graph UUID
guewen May 26, 2021
8188d80
Hide some technical fields
guewen May 26, 2021
ae057d3
Ignore requeues on dependency jobs waiting on parent jobs
guewen May 26, 2021
b36e5bc
Fix lint
guewen May 27, 2021
ee0ec6a
Fix warnings in tests
guewen May 27, 2021
6e71617
Update vis-network js
guewen May 27, 2021
6f0ab15
Improve display of jobs graph widget
guewen May 27, 2021
b935714
Add powerful context manager for running tests on jobs
guewen May 28, 2021
1808ec4
Set graph_uuid only once in DelayableGraph
guewen May 30, 2021
bb97188
Add docstrings on the new delayable classes
guewen May 30, 2021
45bf662
Add option to generate a graph in create_test_job controller
guewen Jul 1, 2021
4b79962
Improve graph widget
guewen Jul 1, 2021
36df139
Add a smart button to open all the jobs of a graph
guewen Jul 1, 2021
26b7d4f
ix duplicate label
guewen Jul 1, 2021
92fe9fc
Fix graph widget now showing title as HTML
guewen Jul 1, 2021
dbae1bf
Improve graph widget performance
guewen Jul 1, 2021
68d275a
Escape strings passed to the graph js widget
guewen Jul 21, 2021
4ba6f21
Fix lint
guewen Jul 21, 2021
fa284de
Improve mock_jobs documentation
guewen Jan 29, 2022
ca58edd
Run pre-commit -a
guewen Jan 29, 2022
add15a7
Add docstring for create_test_job controller
guewen Jan 29, 2022
e07bd8d
Fix things required by odoo 14.0 or python 3.9
guewen Jan 29, 2022
089f28c
Add documentation about handling of failures in a graph
guewen Jan 29, 2022
e9ac3e3
Fix equality of enqueued jobs
guewen Feb 2, 2022
d73087d
Use a python3.6 compatible data class
guewen Feb 2, 2022
f24820d
Rename mock_jobs() to trap_jobs()
guewen Feb 3, 2022
7702849
Rename dependency method done() to on_done()
guewen Feb 3, 2022
da9133a
Make black happy
guewen May 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Job Queue
:target: https://runbot.odoo-community.org/runbot/230/14.0
:alt: Try me on Runbot

|badge1| |badge2| |badge3| |badge4| |badge5|
|badge1| |badge2| |badge3| |badge4| |badge5|

This addon adds an integrated Job Queue to Odoo.

Expand Down Expand Up @@ -409,7 +409,7 @@ promote its widespread use.

Current `maintainer <https://odoo-community.org/page/maintainer-role>`__:

|maintainer-guewen|
|maintainer-guewen|

This module is part of the `OCA/queue <https://github.com/OCA/queue/tree/14.0/queue_job>`_ project on GitHub.

Expand Down
2 changes: 1 addition & 1 deletion queue_job/__manifest__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)


{
"name": "Job Queue",
"version": "14.0.2.4.0",
Expand All @@ -13,6 +12,7 @@
"data": [
"security/security.xml",
"security/ir.model.access.csv",
"views/queue_job_assets.xml",
"views/queue_job_views.xml",
"views/queue_job_channel_views.xml",
"views/queue_job_function_views.xml",
Expand Down
158 changes: 153 additions & 5 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging
import random
import time
import traceback
from io import StringIO

from psycopg2 import OperationalError
from werkzeug.exceptions import Forbidden
from psycopg2 import OperationalError, errorcodes
from werkzeug.exceptions import BadRequest, Forbidden

import odoo
from odoo import _, http, tools
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from ..delay import chain, group
from ..exception import FailedJobError, NothingToDoJob, RetryableJobError
from ..job import ENQUEUED, Job

_logger = logging.getLogger(__name__)

PG_RETRY = 5 # seconds

DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5


class RunJobController(http.Controller):
def _try_perform_job(self, env, job):
Expand All @@ -36,6 +41,35 @@ def _try_perform_job(self, env, job):
env.cr.commit()
_logger.debug("%s done", job)

def _enqueue_dependent_jobs(self, env, job):
tries = 0
while True:
try:
job.enqueue_waiting()
except OperationalError as err:
# Automatically retry the typical transaction serialization
# errors
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise
if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE:
_logger.info(
"%s, maximum number of tries reached to update dependencies",
errorcodes.lookup(err.pgcode),
)
raise
wait_time = random.uniform(0.0, 2**tries)
tries += 1
_logger.info(
"%s, retry %d/%d in %.04f sec...",
errorcodes.lookup(err.pgcode),
tries,
DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE,
wait_time,
)
time.sleep(wait_time)
else:
break

@http.route("/queue_job/runjob", type="http", auth="none", save_session=False)
def runjob(self, db, job_uuid, **kw):
http.request.session.db = db
Expand Down Expand Up @@ -116,6 +150,10 @@ def retry_postpone(job, message, seconds=None):
buff.close()
raise

_logger.debug("%s enqueue depends started", job)
self._enqueue_dependent_jobs(env, job)
_logger.debug("%s enqueue depends done", job)

return ""

def _get_failure_values(self, job, traceback_txt, orig_exception):
Expand All @@ -132,11 +170,42 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):

@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
self, priority=None, max_retries=None, channel=None, description="Test job"
self,
priority=None,
max_retries=None,
channel=None,
description="Test job",
size=1,
failure_rate=0,
):
"""Create test jobs

Examples of urls:

* http://127.0.0.1:8069/queue_job/create_test_job: single job
* http://127.0.0.1:8069/queue_job/create_test_job?size=10: a graph of 10 jobs
* http://127.0.0.1:8069/queue_job/create_test_job?size=10&failure_rate=0.5:
a graph of 10 jobs, half will fail

"""
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))

if failure_rate is not None:
try:
failure_rate = float(failure_rate)
except (ValueError, TypeError):
failure_rate = 0

if not (0 <= failure_rate <= 1):
raise BadRequest("failure_rate must be between 0 and 1")

if size is not None:
try:
size = int(size)
except (ValueError, TypeError):
size = 1

if priority is not None:
try:
priority = int(priority)
Expand All @@ -149,6 +218,35 @@ def create_test_job(
except ValueError:
max_retries = None

if size == 1:
return self._create_single_test_job(
priority=priority,
max_retries=max_retries,
channel=channel,
description=description,
failure_rate=failure_rate,
)

if size > 1:
return self._create_graph_test_jobs(
size,
priority=priority,
max_retries=max_retries,
channel=channel,
description=description,
failure_rate=failure_rate,
)
return ""

def _create_single_test_job(
self,
priority=None,
max_retries=None,
channel=None,
description="Test job",
size=1,
failure_rate=0,
):
delayed = (
http.request.env["queue.job"]
.with_delay(
Expand All @@ -157,7 +255,57 @@ def create_test_job(
channel=channel,
description=description,
)
._test_job()
._test_job(failure_rate=failure_rate)
)
return "job uuid: %s" % (delayed.db_record().uuid,)

TEST_GRAPH_MAX_PER_GROUP = 5

return delayed.db_record().uuid
# flake8: noqa: C901
def _create_graph_test_jobs(
self,
size,
priority=None,
max_retries=None,
channel=None,
description="Test job",
failure_rate=0,
):
model = http.request.env["queue.job"]
current_count = 0

possible_grouping_methods = (chain, group)

tails = [] # we can connect new graph chains/groups to tails
root_delayable = None
while current_count < size:
jobs_count = min(
size - current_count, random.randint(1, self.TEST_GRAPH_MAX_PER_GROUP)
)

jobs = []
for __ in range(jobs_count):
current_count += 1
jobs.append(
model.delayable(
priority=priority,
max_retries=max_retries,
channel=channel,
description="%s #%d" % (description, current_count),
)._test_job(failure_rate=failure_rate)
)

grouping = random.choice(possible_grouping_methods)
delayable = grouping(*jobs)
if not root_delayable:
root_delayable = delayable
else:
tail_delayable = random.choice(tails)
tail_delayable.on_done(delayable)
tails.append(delayable)

root_delayable.delay()

return "graph uuid: %s" % (
list(root_delayable._head())[0]._generated_job.graph_uuid,
)
Loading