From a372b901db42ac02b60f0a04c435edc7941c4de5 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 2 Jul 2019 21:41:24 +0200 Subject: [PATCH 01/40] Store dependencies --- queue_job/job.py | 76 +++++++++++++++ queue_job/models/queue_job.py | 4 +- test_queue_job/tests/__init__.py | 1 + test_queue_job/tests/test_dependencies.py | 111 ++++++++++++++++++++++ 4 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 test_queue_job/tests/test_dependencies.py diff --git a/queue_job/job.py b/queue_job/job.py index 794c7fb030..e03e25c73b 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -7,8 +7,11 @@ import os import sys import uuid +import weakref + from datetime import datetime, timedelta from random import randint +from functools import total_ordering import odoo @@ -147,6 +150,7 @@ def identity_example(job_): return hasher.hexdigest() +@total_ordering class Job(object): """A Job is a task to execute. It is the in-memory representation of a job. @@ -315,6 +319,13 @@ def _load_from_db_record(cls, job_db_record): job_.company_id = stored.company_id.id job_.identity_key = stored.identity_key job_.worker_pid = stored.worker_pid + + job_.__depends_on_uuids.update( + stored.dependencies.get('depends_on', []) + ) + job_.__reverse_depends_on_uuids.update( + stored.dependencies.get('reverse_depends_on', []) + ) return job_ def job_record_with_same_identity_key(self): @@ -470,6 +481,11 @@ def __init__( self.args = args self.kwargs = kwargs + self.__depends_on_uuids = set() + self.__reverse_depends_on_uuids = set() + self._depends_on = set() + self._reverse_depends_on = weakref.WeakSet() + self.priority = priority if self.priority is None: self.priority = DEFAULT_PRIORITY @@ -506,6 +522,20 @@ def __init__( self.channel = channel self.worker_pid = None + def add_depends(self, jobs): + self.__depends_on_uuids |= {j.uuid for j in jobs} + self._depends_on.update(jobs) + for parent in jobs: + parent.__reverse_depends_on_uuids.add(self.uuid) + parent._reverse_depends_on.add(self) + + def add_reverse_depends(self, jobs): + self.__reverse_depends_on_uuids |= {j.uuid for j in jobs} + self._reverse_depends_on.update(jobs) + for child in jobs: + child.__depends_on_uuids.add(self.uuid) + child._depends_on.add(self) + def perform(self): """Execute the job. @@ -585,6 +615,16 @@ def _store_values(self, create=False): if self.identity_key: vals["identity_key"] = self.identity_key + dependencies = { + 'depends_on': [ + parent.uuid for parent in self.depends_on + ], + 'reverse_depends_on': [ + children.uuid for children in self.reverse_depends_on + ], + } + vals['dependencies'] = dependencies + if create: vals.update( { @@ -632,6 +672,22 @@ def func_string(self): all_args = ", ".join(args + kwargs) return "{}.{}({})".format(model, self.method_name, all_args) + def __eq__(self, other): + return self.uuid == other.uuid + + def __hash__(self): + return self.uuid.__hash__() + + def sorting_key(self): + return self.eta, self.priority, self.date_created, self.seq + + def __lt__(self, other): + if self.eta and not other.eta: + return True + elif not self.eta and other.eta: + return False + return self.sorting_key() < other.sorting_key() + def db_record(self): return self.db_record_from_uuid(self.env, self.uuid) @@ -663,6 +719,26 @@ def identity_key(self, value): self._identity_key = None self._identity_key_func = value + @property + def depends_on(self): + if not self._depends_on: + # TODO batch load instead of loop + self._depends_on = { + Job.load(self.env, parent_uuid) for parent_uuid + in self.__depends_on_uuids + } + return self._depends_on + + @property + def reverse_depends_on(self): + if not self._reverse_depends_on: + # TODO batch load instead of loop + self._reverse_depends_on = { + Job.load(self.env, child_uuid) for child_uuid + in self.__reverse_depends_on_uuids + } + return set(self._reverse_depends_on) + @property def description(self): if self._description: diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index b6f8b52a6e..693aea0a78 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -4,7 +4,8 @@ import logging from datetime import datetime, timedelta -from odoo import _, api, exceptions, fields, models +from odoo import _, api, exceptions, fields, models, tools +from odoo.addons.base_sparse_field.models.fields import Serialized from odoo.osv import expression from ..fields import JobSerialized @@ -62,6 +63,7 @@ class QueueJob(models.Model): readonly=True, base_type=models.BaseModel, ) + dependencies = Serialized(readonly=True) args = JobSerialized(readonly=True, base_type=tuple) kwargs = JobSerialized(readonly=True, base_type=dict) func_string = fields.Char(string="Task", readonly=True) diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 502a0752fd..0984a41db0 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,4 +1,5 @@ from . import test_autovacuum +from . import test_dependencies from . import test_job from . import test_job_auto_delay from . import test_job_channels diff --git a/test_queue_job/tests/test_dependencies.py b/test_queue_job/tests/test_dependencies.py new file mode 100644 index 0000000000..159e949d28 --- /dev/null +++ b/test_queue_job/tests/test_dependencies.py @@ -0,0 +1,111 @@ +# Copyright 2019 Camptocamp SA +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html) + +import odoo.tests.common as common + +from odoo.addons.queue_job.job import ( + Job, +) + + +class TestJobDependencies(common.TransactionCase): + + def setUp(self): + super().setUp() + self.queue_job = self.env['queue.job'] + self.method = self.env['test.queue.job'].testing_method + + def test_depends_store(self): + job_root = Job(self.method) + job_lvl1_a = Job(self.method) + job_lvl1_a.add_depends({job_root}) + job_lvl1_b = Job(self.method) + job_lvl1_b.add_depends({job_root}) + job_lvl2_a = Job(self.method) + job_lvl2_a.add_depends({job_lvl1_a}) + + # Jobs must be stored after the dependencies are set up. + # (Or if not, a new store must be called on the parent) + job_root.store() + job_lvl1_a.store() + job_lvl1_b.store() + job_lvl2_a.store() + + # test properties + self.assertFalse(job_root.depends_on) + + self.assertEqual(job_lvl1_a.depends_on, {job_root}) + self.assertEqual(job_lvl1_b.depends_on, {job_root}) + + self.assertEqual(job_lvl2_a.depends_on, {job_lvl1_a}) + + self.assertEqual(job_root.reverse_depends_on, {job_lvl1_a, job_lvl1_b}) + + self.assertEqual(job_lvl1_a.reverse_depends_on, {job_lvl2_a}) + self.assertFalse(job_lvl1_b.reverse_depends_on) + + self.assertFalse(job_lvl2_a.reverse_depends_on) + + # test DB state + self.assertEqual(job_root.db_record().dependencies['depends_on'], []) + self.assertEqual( + sorted(job_root.db_record().dependencies['reverse_depends_on']), + sorted([job_lvl1_a.uuid, job_lvl1_b.uuid]) + ) + + self.assertEqual( + job_lvl1_a.db_record().dependencies['depends_on'], [job_root.uuid] + ) + self.assertEqual( + job_lvl1_a.db_record().dependencies['reverse_depends_on'], + [job_lvl2_a.uuid] + ) + + self.assertEqual( + job_lvl1_b.db_record().dependencies['depends_on'], [job_root.uuid] + ) + self.assertEqual( + job_lvl1_b.db_record().dependencies['reverse_depends_on'], [] + ) + + self.assertEqual( + job_lvl2_a.db_record().dependencies['depends_on'], + [job_lvl1_a.uuid] + ) + self.assertEqual( + job_lvl2_a.db_record().dependencies['reverse_depends_on'], [] + ) + + def test_depends_store_after(self): + job_root = Job(self.method) + job_root.store() + job_a = Job(self.method) + job_a.add_depends({job_root}) + job_a.store() + + # as the reverse dependency has been added after the root job has been + # stored, it is not reflected in DB + self.assertEqual( + job_root.db_record().dependencies['reverse_depends_on'], [] + ) + + # a new store will write it + job_root.store() + self.assertEqual( + job_root.db_record().dependencies['reverse_depends_on'], + [job_a.uuid] + ) + + def test_depends_load(self): + job_root = Job(self.method) + job_a = Job(self.method) + job_a.add_depends({job_root}) + + job_root.store() + job_a.store() + + read_job_root = Job.load(self.env, job_root.uuid) + self.assertEqual(read_job_root.reverse_depends_on, {job_a}) + + read_job_a = Job.load(self.env, job_a.uuid) + self.assertEqual(read_job_a.depends_on, {job_root}) From ccc03615381486134d157e56ea99428979b6da18 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 2 Jul 2019 22:39:26 +0200 Subject: [PATCH 02/40] Add wait dependencies state --- queue_job/job.py | 16 ++++++++++++++++ test_queue_job/tests/test_dependencies.py | 22 ++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/queue_job/job.py b/queue_job/job.py index e03e25c73b..b8b0646a25 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -17,6 +17,7 @@ from .exception import FailedJobError, NoSuchJobError, RetryableJobError +WAIT_DEPENDENCIES = 'wait_dependencies' PENDING = "pending" ENQUEUED = "enqueued" CANCELLED = "cancelled" @@ -25,6 +26,7 @@ FAILED = "failed" STATES = [ + (WAIT_DEPENDENCIES, 'Wait Dependencies'), (PENDING, "Pending"), (ENQUEUED, "Enqueued"), (STARTED, "Started"), @@ -528,6 +530,8 @@ def add_depends(self, jobs): for parent in jobs: parent.__reverse_depends_on_uuids.add(self.uuid) parent._reverse_depends_on.add(self) + if any(j.state != DONE for j in jobs): + self.state = WAIT_DEPENDENCIES def add_reverse_depends(self, jobs): self.__reverse_depends_on_uuids |= {j.uuid for j in jobs} @@ -560,8 +564,20 @@ def perform(self): ) raise new_exc from err raise + return self.result + # TODO call in an isolated transaction with retries (in RunJobController) + def enqueue_waiting(self): + children = self.reverse_depends_on + for child in children: + if child.state != WAIT_DEPENDENCIES: + continue + parents = child.depends_on + if all(parent.state == 'done' for parent in parents): + child.state = PENDING + child.store() + def store(self): """Store the Job""" job_model = self.env["queue.job"] diff --git a/test_queue_job/tests/test_dependencies.py b/test_queue_job/tests/test_dependencies.py index 159e949d28..c334839e9a 100644 --- a/test_queue_job/tests/test_dependencies.py +++ b/test_queue_job/tests/test_dependencies.py @@ -5,6 +5,8 @@ from odoo.addons.queue_job.job import ( Job, + WAIT_DEPENDENCIES, + PENDING, ) @@ -109,3 +111,23 @@ def test_depends_load(self): read_job_a = Job.load(self.env, job_a.uuid) self.assertEqual(read_job_a.depends_on, {job_root}) + + def test_depends_enqueue_waiting_single(self): + job_root = Job(self.method) + job_a = Job(self.method) + job_a.add_depends({job_root}) + + job_root.store() + job_a.store() + + self.assertEqual(job_a.state, WAIT_DEPENDENCIES) + + # these are the steps run by RunJobController + job_root.perform() + job_root.set_done() + job_root.store() + + job_root.enqueue_waiting() + + # will be picked up by the jobrunner + self.assertEqual(job_a.state, PENDING) From 8b35d3eb0b515e09120fda7bf470acb8e1f11da2 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 3 Jul 2019 21:30:29 +0200 Subject: [PATCH 03/40] Enqueue waiting jobs when parent jobs are done --- queue_job/controllers/main.py | 5 +++++ queue_job/job.py | 6 ++++-- queue_job/models/queue_job.py | 5 ++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 33b5778476..85a8534f17 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -35,6 +35,9 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug("%s done", job) + def _enqueue_dependent_jobs(self, env, job): + job.enqueue_waiting() + @http.route("/queue_job/runjob", type="http", auth="none", save_session=False) def runjob(self, db, job_uuid, **kw): http.request.session.db = db @@ -111,6 +114,8 @@ def retry_postpone(job, message, seconds=None): buff.close() raise + self._enqueue_dependent_jobs(env, job) + return "" def _get_failure_values(self, job, traceback_txt, orig_exception): diff --git a/queue_job/job.py b/queue_job/job.py index b8b0646a25..96a610be04 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -567,7 +567,6 @@ def perform(self): return self.result - # TODO call in an isolated transaction with retries (in RunJobController) def enqueue_waiting(self): children = self.reverse_depends_on for child in children: @@ -809,7 +808,10 @@ def exec_time(self): return None def set_pending(self, result=None, reset_retry=True): - self.state = PENDING + if any(j.state != DONE for j in self.depends_on): + self.state = WAIT_DEPENDENCIES + else: + self.state = PENDING self.date_enqueued = None self.date_started = None self.date_done = None diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 693aea0a78..2937de85a6 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -188,13 +188,16 @@ def _change_job_state(self, state, result=None): job_ = Job.load(record.env, record.uuid) if state == DONE: job_.set_done(result=result) + job_.store() + job_.enqueue_waiting() elif state == PENDING: job_.set_pending(result=result) + job_.store() elif state == CANCELLED: job_.set_cancelled(result=result) + job_.store() else: raise ValueError("State not supported: %s" % state) - job_.store() def button_done(self): result = _("Manually set to done by %s") % self.env.user.name From 554b1391569673d47f1edfa04f7816d0f24f9ecd Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Thu, 4 Jul 2019 08:37:07 +0200 Subject: [PATCH 04/40] Optimize and make enqueue of waiting jobs more robust --- queue_job/controllers/main.py | 39 ++++++++++++++++++++++++++++++-- queue_job/job.py | 40 ++++++++++++++++++++++++++------- queue_job/jobrunner/channels.py | 10 ++++++--- queue_job/models/queue_job.py | 5 +++++ 4 files changed, 81 insertions(+), 13 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 85a8534f17..eb35ed63ad 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -2,11 +2,13 @@ # Copyright 2013-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +import random import logging +import time import traceback from io import StringIO -from psycopg2 import OperationalError +from psycopg2 import OperationalError, errorcodes from werkzeug.exceptions import Forbidden from odoo import SUPERUSER_ID, _, api, http, registry, tools @@ -19,6 +21,8 @@ PG_RETRY = 5 # seconds +DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5 + class RunJobController(http.Controller): def _try_perform_job(self, env, job): @@ -36,7 +40,36 @@ def _try_perform_job(self, env, job): _logger.debug("%s done", job) def _enqueue_dependent_jobs(self, env, job): - job.enqueue_waiting() + tries = 0 + # FIXME: timing condition, when 2 "parent" jobs are done + # at the same time neither will see the other as done (I think) + while True: + try: + job.enqueue_waiting() + except OperationalError as err: + # Automatically retry the typical transaction serialization + # errors + if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: + raise + if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE: + _logger.info( + "%s, maximum number of tries reached to" + " update dependencies", + errorcodes.lookup(err.pgcode) + ) + raise + wait_time = random.uniform(0.0, 2 ** tries) + tries += 1 + _logger.info( + "%s, retry %d/%d in %.04f sec...", + errorcodes.lookup(err.pgcode), + tries, + DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE, + wait_time + ) + time.sleep(wait_time) + else: + break @http.route("/queue_job/runjob", type="http", auth="none", save_session=False) def runjob(self, db, job_uuid, **kw): @@ -114,7 +147,9 @@ def retry_postpone(job, message, seconds=None): buff.close() raise + _logger.debug('%s enqueue depends started', job) self._enqueue_dependent_jobs(env, job) + _logger.debug('%s enqueue depends done', job) return "" diff --git a/queue_job/job.py b/queue_job/job.py index 96a610be04..8339a3c668 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -525,6 +525,8 @@ def __init__( self.worker_pid = None def add_depends(self, jobs): + if self in jobs: + raise ValueError('job cannot depend on itself') self.__depends_on_uuids |= {j.uuid for j in jobs} self._depends_on.update(jobs) for parent in jobs: @@ -534,6 +536,8 @@ def add_depends(self, jobs): self.state = WAIT_DEPENDENCIES def add_reverse_depends(self, jobs): + if self in jobs: + raise ValueError('job cannot depend on itself') self.__reverse_depends_on_uuids |= {j.uuid for j in jobs} self._reverse_depends_on.update(jobs) for child in jobs: @@ -568,14 +572,34 @@ def perform(self): return self.result def enqueue_waiting(self): - children = self.reverse_depends_on - for child in children: - if child.state != WAIT_DEPENDENCIES: - continue - parents = child.depends_on - if all(parent.state == 'done' for parent in parents): - child.state = PENDING - child.store() + # TODO replace states by constants + sql = """ + UPDATE queue_job + SET state = 'pending' + FROM ( + SELECT child.id, array_agg(parent.state) as parent_states + FROM queue_job job + JOIN LATERAL + json_array_elements_text( + job.dependencies::json->'reverse_depends_on' + ) child_deps ON true + JOIN queue_job child + ON child.uuid = child_deps + JOIN LATERAL + json_array_elements_text( + child.dependencies::json->'depends_on' + ) parent_deps ON true + JOIN queue_job parent + ON parent.uuid = parent_deps + WHERE job.uuid = %s + GROUP BY child.id + ) jobs + WHERE + queue_job.id = jobs.id + AND 'done' = ALL(jobs.parent_states) + AND state = 'wait_dependencies'; + """ + self.env.cr.execute(sql, (self.uuid,)) def store(self): """Store the Job""" diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 2d7e0a8be0..9108ffdad6 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -7,9 +7,10 @@ from weakref import WeakValueDictionary from ..exception import ChannelNotFound -from ..job import DONE, ENQUEUED, FAILED, PENDING, STARTED - -NOT_DONE = (PENDING, ENQUEUED, STARTED, FAILED) +from ..job import ( + PENDING, ENQUEUED, STARTED, FAILED, DONE, WAIT_DEPENDENCIES +) +NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) _logger = logging.getLogger(__name__) @@ -1054,6 +1055,9 @@ def notify( job.channel.set_running(job) elif state == FAILED: job.channel.set_failed(job) + elif state == WAIT_DEPENDENCIES: + # wait until all parent jobs are done + pass else: _logger.error("unexpected state %s for job %s", state, job) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 2937de85a6..ce41e5d8a4 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -210,6 +210,11 @@ def button_cancelled(self): return True def requeue(self): + # FIXME if leaves are requeued before their done parents + # they will be pending instead of wait_dependencies + # (in a scenario where we select all the jobs and requeue them) + # Requeue them in reverse order of the graph? Or recheck the state + # after they are all updated. self._change_job_state(PENDING) return True From 960690ea3b79e61a9172ad0ab8b376c787909906 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Fri, 5 Jul 2019 23:47:57 +0200 Subject: [PATCH 05/40] Adapt views for state wait_dependencies --- queue_job/views/queue_job_views.xml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index cf049fdfeb..c51b1c34fc 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -17,7 +17,7 @@ />

@@ -73,7 +78,7 @@ - + @@ -92,22 +97,23 @@ - + + name="exc_info" + string="Exception Information" + attrs="{'invisible': [('exc_info', '=', False)]}" + colspan="4" + >