From 298429ebd1d4502ffff00849f492453106f9b8f3 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 2 Jul 2019 21:41:24 +0200 Subject: [PATCH 01/34] Store dependencies --- queue_job/job.py | 76 +++++++++++++++ queue_job/models/queue_job.py | 2 + test_queue_job/tests/__init__.py | 1 + test_queue_job/tests/test_dependencies.py | 111 ++++++++++++++++++++++ 4 files changed, 190 insertions(+) create mode 100644 test_queue_job/tests/test_dependencies.py diff --git a/queue_job/job.py b/queue_job/job.py index 2045bf8362..d52bf0badd 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -8,7 +8,10 @@ import uuid import os import sys +import weakref + from datetime import datetime, timedelta +from functools import total_ordering import odoo @@ -138,6 +141,7 @@ def identity_example(job_): return hasher.hexdigest() +@total_ordering class Job(object): """A Job is a task to execute. It is the in-memory representation of a job. @@ -297,6 +301,13 @@ def _load_from_db_record(cls, job_db_record): job_.company_id = stored.company_id.id job_.identity_key = stored.identity_key job_.worker_pid = stored.worker_pid + + job_.__depends_on_uuids.update( + stored.dependencies.get('depends_on', []) + ) + job_.__reverse_depends_on_uuids.update( + stored.dependencies.get('reverse_depends_on', []) + ) return job_ def job_record_with_same_identity_key(self): @@ -421,6 +432,11 @@ def __init__(self, func, self.args = args self.kwargs = kwargs + self.__depends_on_uuids = set() + self.__reverse_depends_on_uuids = set() + self._depends_on = set() + self._reverse_depends_on = weakref.WeakSet() + self.priority = priority if self.priority is None: self.priority = DEFAULT_PRIORITY @@ -462,6 +478,20 @@ def __init__(self, func, self.channel = channel self.worker_pid = None + def add_depends(self, jobs): + self.__depends_on_uuids |= {j.uuid for j in jobs} + self._depends_on.update(jobs) + for parent in jobs: + parent.__reverse_depends_on_uuids.add(self.uuid) + parent._reverse_depends_on.add(self) + + def add_reverse_depends(self, jobs): + self.__reverse_depends_on_uuids |= {j.uuid for j in jobs} + self._reverse_depends_on.update(jobs) + for child in jobs: + child.__depends_on_uuids.add(self.uuid) + child._depends_on.add(self) + def perform(self): """Execute the job. @@ -541,6 +571,16 @@ def _store_values(self, create=False): if self.identity_key: vals['identity_key'] = self.identity_key + dependencies = { + 'depends_on': [ + parent.uuid for parent in self.depends_on + ], + 'reverse_depends_on': [ + children.uuid for children in self.reverse_depends_on + ], + } + vals['dependencies'] = dependencies + if create: vals.update( { @@ -588,6 +628,22 @@ def func_string(self): all_args = ", ".join(args + kwargs) return "{}.{}({})".format(model, self.method_name, all_args) + def __eq__(self, other): + return self.uuid == other.uuid + + def __hash__(self): + return self.uuid.__hash__() + + def sorting_key(self): + return self.eta, self.priority, self.date_created, self.seq + + def __lt__(self, other): + if self.eta and not other.eta: + return True + elif not self.eta and other.eta: + return False + return self.sorting_key() < other.sorting_key() + def db_record(self): return self.db_record_from_uuid(self.env, self.uuid) @@ -620,6 +676,26 @@ def identity_key(self, value): self._identity_key = None self._identity_key_func = value + @property + def depends_on(self): + if not self._depends_on: + # TODO batch load instead of loop + self._depends_on = { + Job.load(self.env, parent_uuid) for parent_uuid + in self.__depends_on_uuids + } + return self._depends_on + + @property + def reverse_depends_on(self): + if not self._reverse_depends_on: + # TODO batch load instead of loop + self._reverse_depends_on = { + Job.load(self.env, child_uuid) for child_uuid + in self.__reverse_depends_on_uuids + } + return set(self._reverse_depends_on) + @property def description(self): if self._description: diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 6dfedb4467..8556979f83 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -8,6 +8,7 @@ from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models, tools +from odoo.addons.base_sparse_field.models.fields import Serialized from odoo.osv import expression from ..fields import JobSerialized @@ -68,6 +69,7 @@ class QueueJob(models.Model): records = JobSerialized( string="Record(s)", readonly=True, base_type=models.BaseModel, ) + dependencies = Serialized(readonly=True) args = JobSerialized(readonly=True, base_type=tuple) kwargs = JobSerialized(readonly=True, base_type=dict) func_string = fields.Char(string="Task", readonly=True) diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 502a0752fd..0984a41db0 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,4 +1,5 @@ from . import test_autovacuum +from . import test_dependencies from . import test_job from . import test_job_auto_delay from . import test_job_channels diff --git a/test_queue_job/tests/test_dependencies.py b/test_queue_job/tests/test_dependencies.py new file mode 100644 index 0000000000..159e949d28 --- /dev/null +++ b/test_queue_job/tests/test_dependencies.py @@ -0,0 +1,111 @@ +# Copyright 2019 Camptocamp SA +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html) + +import odoo.tests.common as common + +from odoo.addons.queue_job.job import ( + Job, +) + + +class TestJobDependencies(common.TransactionCase): + + def setUp(self): + super().setUp() + self.queue_job = self.env['queue.job'] + self.method = self.env['test.queue.job'].testing_method + + def test_depends_store(self): + job_root = Job(self.method) + job_lvl1_a = Job(self.method) + job_lvl1_a.add_depends({job_root}) + job_lvl1_b = Job(self.method) + job_lvl1_b.add_depends({job_root}) + job_lvl2_a = Job(self.method) + job_lvl2_a.add_depends({job_lvl1_a}) + + # Jobs must be stored after the dependencies are set up. + # (Or if not, a new store must be called on the parent) + job_root.store() + job_lvl1_a.store() + job_lvl1_b.store() + job_lvl2_a.store() + + # test properties + self.assertFalse(job_root.depends_on) + + self.assertEqual(job_lvl1_a.depends_on, {job_root}) + self.assertEqual(job_lvl1_b.depends_on, {job_root}) + + self.assertEqual(job_lvl2_a.depends_on, {job_lvl1_a}) + + self.assertEqual(job_root.reverse_depends_on, {job_lvl1_a, job_lvl1_b}) + + self.assertEqual(job_lvl1_a.reverse_depends_on, {job_lvl2_a}) + self.assertFalse(job_lvl1_b.reverse_depends_on) + + self.assertFalse(job_lvl2_a.reverse_depends_on) + + # test DB state + self.assertEqual(job_root.db_record().dependencies['depends_on'], []) + self.assertEqual( + sorted(job_root.db_record().dependencies['reverse_depends_on']), + sorted([job_lvl1_a.uuid, job_lvl1_b.uuid]) + ) + + self.assertEqual( + job_lvl1_a.db_record().dependencies['depends_on'], [job_root.uuid] + ) + self.assertEqual( + job_lvl1_a.db_record().dependencies['reverse_depends_on'], + [job_lvl2_a.uuid] + ) + + self.assertEqual( + job_lvl1_b.db_record().dependencies['depends_on'], [job_root.uuid] + ) + self.assertEqual( + job_lvl1_b.db_record().dependencies['reverse_depends_on'], [] + ) + + self.assertEqual( + job_lvl2_a.db_record().dependencies['depends_on'], + [job_lvl1_a.uuid] + ) + self.assertEqual( + job_lvl2_a.db_record().dependencies['reverse_depends_on'], [] + ) + + def test_depends_store_after(self): + job_root = Job(self.method) + job_root.store() + job_a = Job(self.method) + job_a.add_depends({job_root}) + job_a.store() + + # as the reverse dependency has been added after the root job has been + # stored, it is not reflected in DB + self.assertEqual( + job_root.db_record().dependencies['reverse_depends_on'], [] + ) + + # a new store will write it + job_root.store() + self.assertEqual( + job_root.db_record().dependencies['reverse_depends_on'], + [job_a.uuid] + ) + + def test_depends_load(self): + job_root = Job(self.method) + job_a = Job(self.method) + job_a.add_depends({job_root}) + + job_root.store() + job_a.store() + + read_job_root = Job.load(self.env, job_root.uuid) + self.assertEqual(read_job_root.reverse_depends_on, {job_a}) + + read_job_a = Job.load(self.env, job_a.uuid) + self.assertEqual(read_job_a.depends_on, {job_root}) From 2b3ab779de5f6907a00b965802fb2dade15724bb Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 2 Jul 2019 22:39:26 +0200 Subject: [PATCH 02/34] Add wait dependencies state --- queue_job/job.py | 18 +++++++++++++++++- test_queue_job/tests/test_dependencies.py | 22 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/queue_job/job.py b/queue_job/job.py index d52bf0badd..d31e54c8e7 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -20,6 +20,7 @@ RetryableJobError) +WAIT_DEPENDENCIES = 'wait_dependencies' PENDING = 'pending' ENQUEUED = 'enqueued' CANCELLED = 'cancelled' @@ -27,7 +28,8 @@ STARTED = 'started' FAILED = 'failed' -STATES = [(PENDING, 'Pending'), +STATES = [(WAIT_DEPENDENCIES, 'Wait Dependencies'), + (PENDING, 'Pending'), (ENQUEUED, 'Enqueued'), (STARTED, 'Started'), (DONE, 'Done'), @@ -484,6 +486,8 @@ def add_depends(self, jobs): for parent in jobs: parent.__reverse_depends_on_uuids.add(self.uuid) parent._reverse_depends_on.add(self) + if any(j.state != DONE for j in jobs): + self.state = WAIT_DEPENDENCIES def add_reverse_depends(self, jobs): self.__reverse_depends_on_uuids |= {j.uuid for j in jobs} @@ -516,8 +520,20 @@ def perform(self): ) raise new_exc from err raise + return self.result + # TODO call in an isolated transaction with retries (in RunJobController) + def enqueue_waiting(self): + children = self.reverse_depends_on + for child in children: + if child.state != WAIT_DEPENDENCIES: + continue + parents = child.depends_on + if all(parent.state == 'done' for parent in parents): + child.state = PENDING + child.store() + def store(self): """Store the Job""" job_model = self.env["queue.job"] diff --git a/test_queue_job/tests/test_dependencies.py b/test_queue_job/tests/test_dependencies.py index 159e949d28..c334839e9a 100644 --- a/test_queue_job/tests/test_dependencies.py +++ b/test_queue_job/tests/test_dependencies.py @@ -5,6 +5,8 @@ from odoo.addons.queue_job.job import ( Job, + WAIT_DEPENDENCIES, + PENDING, ) @@ -109,3 +111,23 @@ def test_depends_load(self): read_job_a = Job.load(self.env, job_a.uuid) self.assertEqual(read_job_a.depends_on, {job_root}) + + def test_depends_enqueue_waiting_single(self): + job_root = Job(self.method) + job_a = Job(self.method) + job_a.add_depends({job_root}) + + job_root.store() + job_a.store() + + self.assertEqual(job_a.state, WAIT_DEPENDENCIES) + + # these are the steps run by RunJobController + job_root.perform() + job_root.set_done() + job_root.store() + + job_root.enqueue_waiting() + + # will be picked up by the jobrunner + self.assertEqual(job_a.state, PENDING) From 0ab5933ab5379ae70ca957266f7b6f0b6970043a Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 3 Jul 2019 21:30:29 +0200 Subject: [PATCH 03/34] Enqueue waiting jobs when parent jobs are done --- queue_job/controllers/main.py | 5 +++++ queue_job/job.py | 6 ++++-- queue_job/models/queue_job.py | 5 ++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index f7f7da66de..30eb23ff5e 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -36,6 +36,9 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug('%s done', job) + def _enqueue_dependent_jobs(self, env, job): + job.enqueue_waiting() + @http.route('/queue_job/session', type='http', auth="none") def session(self): """Used by the jobrunner to spawn a session @@ -130,6 +133,8 @@ def retry_postpone(job, message, seconds=None): buff.close() raise + self._enqueue_dependent_jobs(env, job) + return "" def _get_failure_values(self, job, traceback_txt, orig_exception): diff --git a/queue_job/job.py b/queue_job/job.py index d31e54c8e7..5193112671 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -523,7 +523,6 @@ def perform(self): return self.result - # TODO call in an isolated transaction with retries (in RunJobController) def enqueue_waiting(self): children = self.reverse_depends_on for child in children: @@ -766,7 +765,10 @@ def exec_time(self): return None def set_pending(self, result=None, reset_retry=True): - self.state = PENDING + if any(j.state != DONE for j in self.depends_on): + self.state = WAIT_DEPENDENCIES + else: + self.state = PENDING self.date_enqueued = None self.date_started = None self.date_done = None diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 8556979f83..7e5ad9f90d 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -199,13 +199,16 @@ def _change_job_state(self, state, result=None): job_ = Job.load(record.env, record.uuid) if state == DONE: job_.set_done(result=result) + job_.store() + job_.enqueue_waiting() elif state == PENDING: job_.set_pending(result=result) + job_.store() elif state == CANCELLED: job_.set_cancelled(result=result) + job_.store() else: raise ValueError('State not supported: %s' % state) - job_.store() @api.multi def button_done(self): From 27f10a2734fcb9f1e13679225482a556c08014ee Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Thu, 4 Jul 2019 08:37:07 +0200 Subject: [PATCH 04/34] Optimize and make enqueue of waiting jobs more robust --- queue_job/controllers/main.py | 39 ++++++++++++++++++++++++++++++-- queue_job/job.py | 40 ++++++++++++++++++++++++++------- queue_job/jobrunner/channels.py | 9 ++++++-- queue_job/models/queue_job.py | 5 +++++ 4 files changed, 81 insertions(+), 12 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 30eb23ff5e..748570f86a 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -2,11 +2,13 @@ # Copyright 2013-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +import random import logging +import time import traceback from io import StringIO -from psycopg2 import OperationalError +from psycopg2 import OperationalError, errorcodes from werkzeug.exceptions import Forbidden import odoo @@ -20,6 +22,8 @@ PG_RETRY = 5 # seconds +DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5 + class RunJobController(http.Controller): @@ -37,7 +41,36 @@ def _try_perform_job(self, env, job): _logger.debug('%s done', job) def _enqueue_dependent_jobs(self, env, job): - job.enqueue_waiting() + tries = 0 + # FIXME: timing condition, when 2 "parent" jobs are done + # at the same time neither will see the other as done (I think) + while True: + try: + job.enqueue_waiting() + except OperationalError as err: + # Automatically retry the typical transaction serialization + # errors + if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: + raise + if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE: + _logger.info( + "%s, maximum number of tries reached to" + " update dependencies", + errorcodes.lookup(err.pgcode) + ) + raise + wait_time = random.uniform(0.0, 2 ** tries) + tries += 1 + _logger.info( + "%s, retry %d/%d in %.04f sec...", + errorcodes.lookup(err.pgcode), + tries, + DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE, + wait_time + ) + time.sleep(wait_time) + else: + break @http.route('/queue_job/session', type='http', auth="none") def session(self): @@ -133,7 +166,9 @@ def retry_postpone(job, message, seconds=None): buff.close() raise + _logger.debug('%s enqueue depends started', job) self._enqueue_dependent_jobs(env, job) + _logger.debug('%s enqueue depends done', job) return "" diff --git a/queue_job/job.py b/queue_job/job.py index 5193112671..66da05e37c 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -481,6 +481,8 @@ def __init__(self, func, self.worker_pid = None def add_depends(self, jobs): + if self in jobs: + raise ValueError('job cannot depend on itself') self.__depends_on_uuids |= {j.uuid for j in jobs} self._depends_on.update(jobs) for parent in jobs: @@ -490,6 +492,8 @@ def add_depends(self, jobs): self.state = WAIT_DEPENDENCIES def add_reverse_depends(self, jobs): + if self in jobs: + raise ValueError('job cannot depend on itself') self.__reverse_depends_on_uuids |= {j.uuid for j in jobs} self._reverse_depends_on.update(jobs) for child in jobs: @@ -524,14 +528,34 @@ def perform(self): return self.result def enqueue_waiting(self): - children = self.reverse_depends_on - for child in children: - if child.state != WAIT_DEPENDENCIES: - continue - parents = child.depends_on - if all(parent.state == 'done' for parent in parents): - child.state = PENDING - child.store() + # TODO replace states by constants + sql = """ + UPDATE queue_job + SET state = 'pending' + FROM ( + SELECT child.id, array_agg(parent.state) as parent_states + FROM queue_job job + JOIN LATERAL + json_array_elements_text( + job.dependencies::json->'reverse_depends_on' + ) child_deps ON true + JOIN queue_job child + ON child.uuid = child_deps + JOIN LATERAL + json_array_elements_text( + child.dependencies::json->'depends_on' + ) parent_deps ON true + JOIN queue_job parent + ON parent.uuid = parent_deps + WHERE job.uuid = %s + GROUP BY child.id + ) jobs + WHERE + queue_job.id = jobs.id + AND 'done' = ALL(jobs.parent_states) + AND state = 'wait_dependencies'; + """ + self.env.cr.execute(sql, (self.uuid,)) def store(self): """Store the Job""" diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index ec3d9b9228..ca9b9fc494 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -7,8 +7,10 @@ from weakref import WeakValueDictionary from ..exception import ChannelNotFound -from ..job import PENDING, ENQUEUED, STARTED, FAILED, DONE -NOT_DONE = (PENDING, ENQUEUED, STARTED, FAILED) +from ..job import ( + PENDING, ENQUEUED, STARTED, FAILED, DONE, WAIT_DEPENDENCIES +) +NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) _logger = logging.getLogger(__name__) @@ -1035,6 +1037,9 @@ def notify(self, db_name, channel_name, uuid, job.channel.set_running(job) elif state == FAILED: job.channel.set_failed(job) + elif state == WAIT_DEPENDENCIES: + # wait until all parent jobs are done + pass else: _logger.error("unexpected state %s for job %s", state, job) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 7e5ad9f90d..b471b9cad4 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -224,6 +224,11 @@ def button_cancelled(self): @api.multi def requeue(self): + # FIXME if leaves are requeued before their done parents + # they will be pending instead of wait_dependencies + # (in a scenario where we select all the jobs and requeue them) + # Requeue them in reverse order of the graph? Or recheck the state + # after they are all updated. self._change_job_state(PENDING) return True From c02c2e80b002beb882beb33753587b49cae09879 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Fri, 5 Jul 2019 23:47:57 +0200 Subject: [PATCH 05/34] Adapt views for state wait_dependencies --- queue_job/views/queue_job_views.xml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index c598c992eb..79f9dfa2d8 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -15,7 +15,7 @@ type="object" groups="queue_job.group_queue_job_manager"/>