Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
a372b90
Store dependencies
Jul 2, 2019
ccc0361
Add wait dependencies state
Jul 2, 2019
8b35d3e
Enqueue waiting jobs when parent jobs are done
Jul 3, 2019
554b139
Optimize and make enqueue of waiting jobs more robust
Jul 4, 2019
960690e
Adapt views for state wait_dependencies
Jul 5, 2019
6f03076
Add API for Delayables
Jul 5, 2019
1e965cb
Fix tests failing when test_queue_job is installed
Jul 6, 2019
47c50fc
Add widget to show job dependencies on UI
Jul 9, 2019
da4b1b6
Show the dependency widget in a tab
Jul 11, 2019
9d355fe
Add documentation on 'base' model public methods
Oct 4, 2019
26a34e1
Improve loading of dependencies using batch read
Oct 4, 2019
6dc7fbb
Use Delayable in DelayableRecordset
Oct 4, 2019
f864ec5
Add documentation
guewen May 24, 2021
2b336a2
Add a graph UUID
guewen May 26, 2021
10c2d0c
Hide some technical fields
guewen May 26, 2021
11eda8e
Ignore requeues on dependency jobs waiting on parent jobs
guewen May 26, 2021
9e3aba7
Fix lint
guewen May 27, 2021
0b71ef1
Fix warnings in tests
guewen May 27, 2021
8b904b1
Update vis-network js
guewen May 27, 2021
80da8df
Improve display of jobs graph widget
guewen May 27, 2021
4e3d9cc
Add powerful context manager for running tests on jobs
guewen May 28, 2021
2466aac
Set graph_uuid only once in DelayableGraph
guewen May 30, 2021
7b8c876
Add docstrings on the new delayable classes
guewen May 30, 2021
97a2677
Add option to generate a graph in create_test_job controller
guewen Jul 1, 2021
68cf0c0
Improve graph widget
guewen Jul 1, 2021
5329ce7
Add a smart button to open all the jobs of a graph
guewen Jul 1, 2021
4036a4b
ix duplicate label
guewen Jul 1, 2021
574e2d3
Fix graph widget now showing title as HTML
guewen Jul 1, 2021
801054b
Improve graph widget performance
guewen Jul 1, 2021
1d9a32c
Escape strings passed to the graph js widget
guewen Jul 21, 2021
c769330
Fix things required by odoo 14.0 or python 3.9
guewen Oct 26, 2022
be68c63
Add documentation about handling of failures in a graph
guewen Jan 29, 2022
1082ac2
Fix equality of enqueued jobs
guewen Feb 2, 2022
18a99cc
Use a python3.6 compatible data class
guewen Feb 2, 2022
96a874a
Rename mock_jobs() to trap_jobs()
guewen Feb 3, 2022
3191890
Rename dependency method done() to on_done()
guewen Feb 3, 2022
99970d2
Migrate queue job graph/dependencies to 15.0
guewen Oct 24, 2022
4e22318
Apply pre-commit on migration of jobs graph
guewen Oct 24, 2022
10253b4
Use new TransactionCase instead of deprecated SavepointCase
guewen Oct 24, 2022
2bf26cf
Add enqueue of graph dependencies in queue_job_cron_jobrunner
guewen Nov 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Job Queue
:target: https://runbot.odoo-community.org/runbot/230/15.0
:alt: Try me on Runbot

|badge1| |badge2| |badge3| |badge4| |badge5|
|badge1| |badge2| |badge3| |badge4| |badge5|

This addon adds an integrated Job Queue to Odoo.

Expand Down Expand Up @@ -385,7 +385,7 @@ promote its widespread use.

Current `maintainer <https://odoo-community.org/page/maintainer-role>`__:

|maintainer-guewen|
|maintainer-guewen|

This module is part of the `OCA/queue <https://github.com/OCA/queue/tree/15.0/queue_job>`_ project on GitHub.

Expand Down
9 changes: 8 additions & 1 deletion queue_job/__manifest__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)


{
"name": "Job Queue",
"version": "15.0.1.1.0",
Expand All @@ -23,6 +22,14 @@
"data/queue_data.xml",
"data/queue_job_function_data.xml",
],
"assets": {
"web.assets_backend": [
"/queue_job/static/lib/vis/vis-network.min.css",
"/queue_job/static/src/scss/queue_job_fields.scss",
"/queue_job/static/lib/vis/vis-network.min.js",
"/queue_job/static/src/js/queue_job_fields.js",
],
},
"installable": True,
"development_status": "Mature",
"maintainers": ["guewen"],
Expand Down
148 changes: 143 additions & 5 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,27 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging
import random
import time
import traceback
from io import StringIO

from psycopg2 import OperationalError
from werkzeug.exceptions import Forbidden
from psycopg2 import OperationalError, errorcodes
from werkzeug.exceptions import BadRequest, Forbidden

from odoo import SUPERUSER_ID, _, api, http, registry, tools
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from ..delay import chain, group
from ..exception import FailedJobError, NothingToDoJob, RetryableJobError
from ..job import ENQUEUED, Job

_logger = logging.getLogger(__name__)

PG_RETRY = 5 # seconds

DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5


class RunJobController(http.Controller):
def _try_perform_job(self, env, job):
Expand All @@ -35,6 +40,35 @@ def _try_perform_job(self, env, job):
env.cr.commit()
_logger.debug("%s done", job)

def _enqueue_dependent_jobs(self, env, job):
tries = 0
while True:
try:
job.enqueue_waiting()
except OperationalError as err:
# Automatically retry the typical transaction serialization
# errors
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise
if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE:
_logger.info(
"%s, maximum number of tries reached to update dependencies",
errorcodes.lookup(err.pgcode),
)
raise
wait_time = random.uniform(0.0, 2**tries)
tries += 1
_logger.info(
"%s, retry %d/%d in %.04f sec...",
errorcodes.lookup(err.pgcode),
tries,
DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE,
wait_time,
)
time.sleep(wait_time)
else:
break

@http.route("/queue_job/runjob", type="http", auth="none", save_session=False)
def runjob(self, db, job_uuid, **kw):
http.request.session.db = db
Expand Down Expand Up @@ -111,6 +145,10 @@ def retry_postpone(job, message, seconds=None):
buff.close()
raise

_logger.debug("%s enqueue depends started", job)
self._enqueue_dependent_jobs(env, job)
_logger.debug("%s enqueue depends done", job)

return ""

def _get_failure_values(self, job, traceback_txt, orig_exception):
Expand All @@ -125,13 +163,35 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):
"exc_message": exc_message,
}

# flake8: noqa: C901
@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
self, priority=None, max_retries=None, channel=None, description="Test job"
self,
priority=None,
max_retries=None,
channel=None,
description="Test job",
size=1,
failure_rate=0,
):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))

if failure_rate is not None:
try:
failure_rate = float(failure_rate)
except (ValueError, TypeError):
failure_rate = 0

if not (0 <= failure_rate <= 1):
raise BadRequest("failure_rate must be between 0 and 1")

if size is not None:
try:
size = int(size)
except (ValueError, TypeError):
size = 1

if priority is not None:
try:
priority = int(priority)
Expand All @@ -144,6 +204,35 @@ def create_test_job(
except ValueError:
max_retries = None

if size == 1:
return self._create_single_test_job(
priority=priority,
max_retries=max_retries,
channel=channel,
description=description,
failure_rate=failure_rate,
)

if size > 1:
return self._create_graph_test_jobs(
size,
priority=priority,
max_retries=max_retries,
channel=channel,
description=description,
failure_rate=failure_rate,
)
return ""

def _create_single_test_job(
self,
priority=None,
max_retries=None,
channel=None,
description="Test job",
size=1,
failure_rate=0,
):
delayed = (
http.request.env["queue.job"]
.with_delay(
Expand All @@ -152,7 +241,56 @@ def create_test_job(
channel=channel,
description=description,
)
._test_job()
._test_job(failure_rate=failure_rate)
)
return "job uuid: %s" % (delayed.db_record().uuid,)

TEST_GRAPH_MAX_PER_GROUP = 5

return delayed.db_record().uuid
def _create_graph_test_jobs(
self,
size,
priority=None,
max_retries=None,
channel=None,
description="Test job",
failure_rate=0,
):
model = http.request.env["queue.job"]
current_count = 0

possible_grouping_methods = (chain, group)

tails = [] # we can connect new graph chains/groups to tails
root_delayable = None
while current_count < size:
jobs_count = min(
size - current_count, random.randint(1, self.TEST_GRAPH_MAX_PER_GROUP)
)

jobs = []
for __ in range(jobs_count):
current_count += 1
jobs.append(
model.delayable(
priority=priority,
max_retries=max_retries,
channel=channel,
description="%s #%d" % (description, current_count),
)._test_job(failure_rate=failure_rate)
)

grouping = random.choice(possible_grouping_methods)
delayable = grouping(*jobs)
if not root_delayable:
root_delayable = delayable
else:
tail_delayable = random.choice(tails)
tail_delayable.on_done(delayable)
tails.append(delayable)

root_delayable.delay()

return "graph uuid: %s" % (
list(root_delayable._head())[0]._generated_job.graph_uuid,
)
Loading