From 4c2024dec72c548828409733d0b40eef14e7afbe Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Mon, 28 Nov 2022 16:42:20 +0100 Subject: [PATCH 01/10] forward port channels.py --- queue_job/jobrunner/channels.py | 44 +++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index ec3d9b9228..eef6e26f34 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -192,7 +192,7 @@ class ChannelJob(object): """ def __init__(self, db_name, channel, uuid, - seq, date_created, priority, eta): + seq, date_created, priority, eta, sequence_group=None): self.db_name = db_name self.channel = channel self.uuid = uuid @@ -200,6 +200,7 @@ def __init__(self, db_name, channel, uuid, self.date_created = date_created self.priority = priority self.eta = eta + self.sequence_group = sequence_group def __repr__(self): return "" % self.uuid @@ -560,18 +561,47 @@ def get_jobs_to_run(self, now): _logger.debug("channel %s unpaused at %s", self, now) # yield jobs that are ready to run, while we have capacity while self.has_capacity(): + _deferred = SafeSet() job = self._queue.pop(now) if not job: + for job in _deferred: + self._queue.add(job) return + # Maintain sequence for jobs with same sequence group + running_sequence_jobs = filter( + lambda j: j.sequence_group == job.sequence_group, + self._running) + _logger.debug("running sequence %s" % running_sequence_jobs) + _logger.debug("running %s" % self._running) + deferred_sequence_jobs = filter( + lambda j: j.sequence_group == job.sequence_group, + _deferred) + _logger.debug("defer sequence %s" % deferred_sequence_jobs) + if job.sequence_group and (running_sequence_jobs or deferred_sequence_jobs): + _deferred.add(job) + _logger.debug("job %s re-queued because job %s with same " + "sequence group %s is already running " + "in channel %s", + job.uuid, + map(lambda j: j.uuid, running_sequence_jobs) or + map(lambda j: j.uuid, deferred_sequence_jobs), + job.sequence_group, + self) + break self._running.add(job) _logger.debug("job %s marked running in channel %s", job.uuid, self) yield job if self.throttle: - self._pause_until = now + self.throttle - _logger.debug("pausing channel %s until %s", - self, self._pause_until) - return + if _deferred: + _logger.debug("Throttling deferred jobs is not implemented") + else: + self._pause_until = now + self.throttle + _logger.debug("pausing channel %s until %s", + self, self._pause_until) + return + for job in _deferred: + self._queue.add(job) def get_wakeup_time(self, wakeup_time=0): if not self.has_capacity(): @@ -997,7 +1027,7 @@ def get_channel_by_name(self, channel_name, autocreate=False): return parent def notify(self, db_name, channel_name, uuid, - seq, date_created, priority, eta, state): + seq, date_created, priority, eta, state, sequence_group=None): try: channel = self.get_channel_by_name(channel_name) except ChannelNotFound: @@ -1024,7 +1054,7 @@ def notify(self, db_name, channel_name, uuid, job = None if not job: job = ChannelJob(db_name, channel, uuid, - seq, date_created, priority, eta) + seq, date_created, priority, eta, sequence_group) self._jobs_by_uuid[uuid] = job # state transitions if not state or state == DONE: From 8b643bc2aea40c691392964919156b3cb9df9dff Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Mon, 28 Nov 2022 16:46:06 +0100 Subject: [PATCH 02/10] forward port runner.py --- queue_job/jobrunner/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 11d5df15d1..2eeab86f84 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -305,7 +305,7 @@ def select_jobs(self, where, args): # adding the where conditions, values are added later properly with # parameters query = ("SELECT channel, uuid, id as seq, date_created, " - "priority, EXTRACT(EPOCH FROM eta), state " + "priority, EXTRACT(EPOCH FROM eta), state, sequence_group " "FROM queue_job WHERE %s" % (where, )) with closing(self.conn.cursor("select_jobs", withhold=True)) as cr: From 8bd124edd248412396c61732c9079e2c4cc0bf67 Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Mon, 28 Nov 2022 17:31:08 +0100 Subject: [PATCH 03/10] forward port job.py --- queue_job/job.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index 66d82f865e..b8e4d8c065 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -55,7 +55,7 @@ class DelayableRecordset(object): def __init__(self, recordset, priority=None, eta=None, max_retries=None, description=None, channel=None, - identity_key=None): + identity_key=None, sequence_group=None): self.recordset = recordset self.priority = priority self.eta = eta @@ -63,6 +63,7 @@ def __init__(self, recordset, priority=None, eta=None, self.description = description self.channel = channel self.identity_key = identity_key + self.sequence_group = sequence_group def __getattr__(self, name): if name in self.recordset: @@ -87,7 +88,8 @@ def delay(*args, **kwargs): eta=self.eta, description=self.description, channel=self.channel, - identity_key=self.identity_key) + identity_key=self.identity_key, + sequence_group=self.sequence_group) return delay def __str__(self): @@ -272,7 +274,8 @@ def _load_from_db_record(cls, job_db_record): job_ = cls(method, args=args, kwargs=kwargs, priority=stored.priority, eta=eta, job_uuid=stored.uuid, description=stored.name, channel=stored.channel, - identity_key=stored.identity_key) + identity_key=stored.identity_key, + sequence_group=stored.sequence_group) if stored.date_created: job_.date_created = stored.date_created @@ -296,6 +299,7 @@ def _load_from_db_record(cls, job_db_record): if stored.company_id: job_.company_id = stored.company_id.id job_.identity_key = stored.identity_key + job_.sequence_group = stored.sequence_group return job_ def job_record_with_same_identity_key(self): @@ -310,7 +314,7 @@ def job_record_with_same_identity_key(self): @classmethod def enqueue(cls, func, args=None, kwargs=None, priority=None, eta=None, max_retries=None, description=None, - channel=None, identity_key=None): + channel=None, identity_key=None, sequence_group=None): """Create a Job and enqueue it in the queue. Return the job uuid. This expects the arguments specific to the job to be already extracted @@ -323,7 +327,8 @@ def enqueue(cls, func, args=None, kwargs=None, new_job = cls(func=func, args=args, kwargs=kwargs, priority=priority, eta=eta, max_retries=max_retries, description=description, - channel=channel, identity_key=identity_key) + channel=channel, identity_key=identity_key, + sequence_group=sequence_group) if new_job.identity_key: existing = new_job.job_record_with_same_identity_key() if existing: @@ -354,7 +359,8 @@ def db_record_from_uuid(env, job_uuid): def __init__(self, func, args=None, kwargs=None, priority=None, eta=None, job_uuid=None, max_retries=None, - description=None, channel=None, identity_key=None): + description=None, channel=None, identity_key=None, + sequence_group=None): """ Create a Job :param func: function to execute @@ -378,6 +384,9 @@ def __init__(self, func, :param identity_key: A hash to uniquely identify a job, or a function that returns this hash (the function takes the job as argument) + :param sequence_group: Used to group jobs based in ir.sequence to prevent + jobs failing regularly when a nogaps sequence is + used. :param env: Odoo Environment :type env: :class:`odoo.api.Environment` """ @@ -496,6 +505,7 @@ def store(self): 'date_done': False, 'eta': False, 'identity_key': False, + 'sequence_group': self.sequence_group, } if self.date_enqueued: From e2f25a444da05766aaa04635a5af08db82c89451 Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Tue, 29 Nov 2022 14:17:07 +0100 Subject: [PATCH 04/10] forward port model.py --- queue_job/models/queue_job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 696ed9da56..11e6d0d7fd 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -90,6 +90,7 @@ class QueueJob(models.Model): index=True) identity_key = fields.Char() + sequence_group = fields.Char() @api.model_cr def init(self): From 342ddd7eeb754c9647a16f9188c4d1387bfd065f Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Tue, 29 Nov 2022 14:40:21 +0100 Subject: [PATCH 05/10] Add docstring for sequence_group --- queue_job/job.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/queue_job/job.py b/queue_job/job.py index b8e4d8c065..317b9a01c7 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -243,6 +243,10 @@ class Job(object): be added to a channel if the existing job with the same key is not yet started or executed. + .. attribute::sequence_group + + Process jobs sequentially if they have the same sequence_group. + """ @classmethod def load(cls, env, job_uuid): From 15588cc073ec794c592e2f076ee882d33203b6e4 Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Tue, 29 Nov 2022 14:41:09 +0100 Subject: [PATCH 06/10] [FIX] Use continue instead of break (6d5685b1231b443d023e16c3396ce3fdeac8a6ea) --- queue_job/jobrunner/channels.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index eef6e26f34..7bdd5e8274 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -112,6 +112,7 @@ class SafeSet(set): 0 >>> s.remove(1) """ + def remove(self, o): # pylint: disable=missing-return,except-pass try: @@ -560,8 +561,8 @@ def get_jobs_to_run(self, now): self._pause_until = 0 _logger.debug("channel %s unpaused at %s", self, now) # yield jobs that are ready to run, while we have capacity - while self.has_capacity(): _deferred = SafeSet() + while self.has_capacity(): job = self._queue.pop(now) if not job: for job in _deferred: @@ -587,7 +588,7 @@ def get_jobs_to_run(self, now): map(lambda j: j.uuid, deferred_sequence_jobs), job.sequence_group, self) - break + continue self._running.add(job) _logger.debug("job %s marked running in channel %s", job.uuid, self) From eeb31b63824b072e74533b08b9859daf961b91f0 Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Tue, 29 Nov 2022 15:47:25 +0100 Subject: [PATCH 07/10] [FIX] keep sequence group linked to job on read (e99a83259605ef5b109837f5cbf38ec8849b0465) --- queue_job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/job.py b/queue_job/job.py index 317b9a01c7..b30ee9a65a 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -303,7 +303,7 @@ def _load_from_db_record(cls, job_db_record): if stored.company_id: job_.company_id = stored.company_id.id job_.identity_key = stored.identity_key - job_.sequence_group = stored.sequence_group + job_.sequence_group = stored.sequence_group if stored.sequence_group else None return job_ def job_record_with_same_identity_key(self): From 9b98f1b1851142316e858371cccf2fd6d878d44a Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Tue, 29 Nov 2022 16:00:57 +0100 Subject: [PATCH 08/10] [FIX] Deal with large number of jobs with the same sequence group (199bb104a41594a91e026fc9360ef99bbe926607) --- queue_job/jobrunner/channels.py | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 7bdd5e8274..76da9f015b 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -569,26 +569,17 @@ def get_jobs_to_run(self, now): self._queue.add(job) return # Maintain sequence for jobs with same sequence group - running_sequence_jobs = filter( - lambda j: j.sequence_group == job.sequence_group, - self._running) - _logger.debug("running sequence %s" % running_sequence_jobs) - _logger.debug("running %s" % self._running) - deferred_sequence_jobs = filter( - lambda j: j.sequence_group == job.sequence_group, - _deferred) - _logger.debug("defer sequence %s" % deferred_sequence_jobs) - if job.sequence_group and (running_sequence_jobs or deferred_sequence_jobs): - _deferred.add(job) - _logger.debug("job %s re-queued because job %s with same " - "sequence group %s is already running " - "in channel %s", - job.uuid, - map(lambda j: j.uuid, running_sequence_jobs) or - map(lambda j: j.uuid, deferred_sequence_jobs), - job.sequence_group, - self) - continue + if job.sequence_group: + if any(j.sequence_group == job.sequence_group + for j in self._running): + _deferred.add(job) + _logger.debug("job %s re-queued because a job with the same " + "sequence group %s is already running " + "in channel %s", + job.uuid, + job.sequence_group, + self) + continue self._running.add(job) _logger.debug("job %s marked running in channel %s", job.uuid, self) From a472ae5523be89ce34c58dfd76d61888db567cef Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Fri, 13 Jan 2023 08:01:38 +0100 Subject: [PATCH 09/10] OD-1593: [ADD] Queue Job Sequence Rules --- queue_job/__manifest__.py | 2 +- queue_job/job.py | 57 ++++++++++++---- queue_job/jobrunner/channels.py | 87 ++++++++++++++++++++----- queue_job/jobrunner/runner.py | 2 +- queue_job/models/__init__.py | 1 + queue_job/models/queue_job.py | 4 +- queue_job/models/queue_sequence_rule.py | 58 +++++++++++++++++ queue_job/readme/DESCRIPTION.rst | 3 + queue_job/readme/USAGE.rst | 6 ++ queue_job/security/ir.model.access.csv | 1 + queue_job/views/queue_job_views.xml | 44 +++++++++++++ 11 files changed, 233 insertions(+), 32 deletions(-) create mode 100644 queue_job/models/queue_sequence_rule.py diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index e6538f94dc..f5b905fc80 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ {'name': 'Job Queue', - 'version': '12.0.1.5.3', + 'version': '12.0.1.6.0', 'author': 'Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)', 'website': 'https://github.com/OCA/queue/tree/12.0/queue_job', 'license': 'LGPL-3', diff --git a/queue_job/job.py b/queue_job/job.py index b30ee9a65a..acdad63ee7 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -55,7 +55,7 @@ class DelayableRecordset(object): def __init__(self, recordset, priority=None, eta=None, max_retries=None, description=None, channel=None, - identity_key=None, sequence_group=None): + identity_key=None, sequence_rule_ids=None): self.recordset = recordset self.priority = priority self.eta = eta @@ -63,7 +63,7 @@ def __init__(self, recordset, priority=None, eta=None, self.description = description self.channel = channel self.identity_key = identity_key - self.sequence_group = sequence_group + self.sequence_rule_ids = sequence_rule_ids def __getattr__(self, name): if name in self.recordset: @@ -89,7 +89,7 @@ def delay(*args, **kwargs): description=self.description, channel=self.channel, identity_key=self.identity_key, - sequence_group=self.sequence_group) + sequence_rule_ids=self.sequence_rule_ids) return delay def __str__(self): @@ -243,9 +243,9 @@ class Job(object): be added to a channel if the existing job with the same key is not yet started or executed. - .. attribute::sequence_group + .. attribute::sequence_rule_ids - Process jobs sequentially if they have the same sequence_group. + Reference to rules to process jobs sequentially. """ @classmethod @@ -279,7 +279,7 @@ def _load_from_db_record(cls, job_db_record): priority=stored.priority, eta=eta, job_uuid=stored.uuid, description=stored.name, channel=stored.channel, identity_key=stored.identity_key, - sequence_group=stored.sequence_group) + sequence_rule_ids=stored.sequence_rule_ids) if stored.date_created: job_.date_created = stored.date_created @@ -303,7 +303,7 @@ def _load_from_db_record(cls, job_db_record): if stored.company_id: job_.company_id = stored.company_id.id job_.identity_key = stored.identity_key - job_.sequence_group = stored.sequence_group if stored.sequence_group else None + job_.sequence_rule_ids = stored.sequence_rule_ids if stored.sequence_rule_ids else None return job_ def job_record_with_same_identity_key(self): @@ -318,7 +318,7 @@ def job_record_with_same_identity_key(self): @classmethod def enqueue(cls, func, args=None, kwargs=None, priority=None, eta=None, max_retries=None, description=None, - channel=None, identity_key=None, sequence_group=None): + channel=None, identity_key=None, sequence_rule_ids=None): """Create a Job and enqueue it in the queue. Return the job uuid. This expects the arguments specific to the job to be already extracted @@ -332,7 +332,7 @@ def enqueue(cls, func, args=None, kwargs=None, kwargs=kwargs, priority=priority, eta=eta, max_retries=max_retries, description=description, channel=channel, identity_key=identity_key, - sequence_group=sequence_group) + sequence_rule_ids=sequence_rule_ids) if new_job.identity_key: existing = new_job.job_record_with_same_identity_key() if existing: @@ -364,7 +364,7 @@ def __init__(self, func, args=None, kwargs=None, priority=None, eta=None, job_uuid=None, max_retries=None, description=None, channel=None, identity_key=None, - sequence_group=None): + sequence_rule_ids=None): """ Create a Job :param func: function to execute @@ -388,9 +388,7 @@ def __init__(self, func, :param identity_key: A hash to uniquely identify a job, or a function that returns this hash (the function takes the job as argument) - :param sequence_group: Used to group jobs based in ir.sequence to prevent - jobs failing regularly when a nogaps sequence is - used. + :param sequence_rule_ids: Reference to rules to process jobs sequentially. :param env: Odoo Environment :type env: :class:`odoo.api.Environment` """ @@ -467,6 +465,35 @@ def __init__(self, func, self._eta = None self.eta = eta self.channel = channel + # Handle automatic.workflow.job, normally it is a normal model name. + # In practice only _validate_invoice_job is called through automatic.workflow.job + # This allows the user to select the field on account.invoice, instead of having + # to pick automatic.workflow.job, which doesn't have the fields of the invoice model. + if self.model_name == 'automatic.workflow.job': + model_name = 'account.invoice' + record_ids = self.env[model_name].browse(self.args) + else: + model_name = self.model_name + record_ids = self.recordset + sequence_rules = env['queue.sequence.rule'].search([('model_id.model', '=', model_name)]) + if sequence_rules: + self.sequence_rule_ids = [(6, 0, sequence_rules.ids)] + # Change the following when implementing multiple rules per model + self.rule_name = sequence_rules[0].name + if len(sequence_rules) > 1: + _logger.warning('More than one sequence rule defined for %s', + self.model_name) + if len(record_ids) > 1: + _logger.warning( + 'Applying sequence rule for job %s failed because it has multiple related ' + 'records.', self.uuid + ) + else: + for rule in sequence_rules: + value = str(record_ids[rule.field_id.name]) + self.rule_value = value + else: + self.sequence_rule_ids = None def perform(self): """Execute the job. @@ -509,7 +536,7 @@ def store(self): 'date_done': False, 'eta': False, 'identity_key': False, - 'sequence_group': self.sequence_group, + 'sequence_rule_ids': self.sequence_rule_ids if self.sequence_rule_ids else None, } if self.date_enqueued: @@ -536,6 +563,8 @@ def store(self): 'model_name': self.model_name, 'method_name': self.method_name, 'record_ids': self.recordset.ids, + 'rule_name': self.rule_name, + 'rule_value': self.rule_value, 'args': self.args, 'kwargs': self.kwargs, }) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 76da9f015b..c3d0336d49 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -193,7 +193,7 @@ class ChannelJob(object): """ def __init__(self, db_name, channel, uuid, - seq, date_created, priority, eta, sequence_group=None): + seq, date_created, priority, eta, model_name=None, method_name=None, rule_name=None, rule_value=None): self.db_name = db_name self.channel = channel self.uuid = uuid @@ -201,7 +201,10 @@ def __init__(self, db_name, channel, uuid, self.date_created = date_created self.priority = priority self.eta = eta - self.sequence_group = sequence_group + self.rule_value = rule_value + self.rule_name = rule_name + self.model_name = model_name + self.method_name = method_name def __repr__(self): return "" % self.uuid @@ -527,6 +530,56 @@ def has_capacity(self): return True return len(self._running) < self.capacity + def get_matching_rule(self, job): + """Check if a running job has a matching QueueSequenceRule. + + The QueueSequenceRule defines a field on a model. + This method checks if the field's value for the related record of job matches + that of any running job. + + To avoid performance issues, we store the value on the job. The implication is that if field values + on the related record are modified between the job creation and execution, the job will be run with undefined + behavior (either sequential or not). Modify queue_job.rule_value during the write on the related record to avoid + this. + + :return: (str) QueueSequenceRule.name which matches with job and a running job or None + + >>> from pprint import pprint as pp + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,A:4') + + # Test matching rule for a channel with capacity 4 + >>> db = 'db' + # db_name, channel_name, uuid, seq, date_created, priority, eta, state, + # method_name=None, rule_name=None, rule_value=None + >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1') + >>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1') + >>> cm.notify(db, 'A', 'A3', 3, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1') + >>> cm.notify(db, 'A', 'A4', 4, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '2') + >>> pp(list(cm.get_jobs_to_run(now=100))) + # A2 has the same method_name and rule_value as A1, so it has to wait until A1 is done. + # A4 runs concurrently because it has a different rule_value. + [, ] + >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'done', 'model_a', 'method_a', 'rule_a', '1') + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + >>> cm.notify(db, 'A', 'A2', 1, 0, 10, None, 'done', 'model_a', 'method_a', 'rule_a', '1') + >>> pp(list(cm.get_jobs_to_run(now=100))) + # Finally, A3 is run + [] + """ + # Channel doesn't have env, so we set values during the job init. + # Note that job.model_name does not guarantee the same related record model; see automatic.workflow.job. + # Likewise a method name can arbitrarily be the same. So we check both. + if job.rule_value and any( + job.rule_value == running_job.rule_value + for running_job in self._running + if running_job.method_name == job.method_name + and running_job.model_name == job.model_name + ): + return job.rule_name + return None + def get_jobs_to_run(self, now): """Get jobs that are ready to run in channel. @@ -569,17 +622,20 @@ def get_jobs_to_run(self, now): self._queue.add(job) return # Maintain sequence for jobs with same sequence group - if job.sequence_group: - if any(j.sequence_group == job.sequence_group - for j in self._running): - _deferred.add(job) - _logger.debug("job %s re-queued because a job with the same " - "sequence group %s is already running " - "in channel %s", - job.uuid, - job.sequence_group, - self) - continue + # rule_value is computed on job creation. This means that existing jobs will not have it. + # On the view: forbid changing model once a rule has been saved, because once a rule has been assigned to + # a job, changing the model on the rule will mean a mismatch between the job's model and the rule's + # model. + matching_rule = self.get_matching_rule(job) + if matching_rule: + _deferred.add(job) + _logger.debug("job %s re-queued because a job with the same " + "sequence rule '%s' is already running " + "in channel %s", + job.uuid, + matching_rule, + self) + continue self._running.add(job) _logger.debug("job %s marked running in channel %s", job.uuid, self) @@ -1019,7 +1075,8 @@ def get_channel_by_name(self, channel_name, autocreate=False): return parent def notify(self, db_name, channel_name, uuid, - seq, date_created, priority, eta, state, sequence_group=None): + seq, date_created, priority, eta, state, model_name=None, method_name=None, rule_name=None, + rule_value=None): try: channel = self.get_channel_by_name(channel_name) except ChannelNotFound: @@ -1046,7 +1103,7 @@ def notify(self, db_name, channel_name, uuid, job = None if not job: job = ChannelJob(db_name, channel, uuid, - seq, date_created, priority, eta, sequence_group) + seq, date_created, priority, eta, model_name, method_name, rule_name, rule_value) self._jobs_by_uuid[uuid] = job # state transitions if not state or state == DONE: diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 2eeab86f84..f236344e8d 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -305,7 +305,7 @@ def select_jobs(self, where, args): # adding the where conditions, values are added later properly with # parameters query = ("SELECT channel, uuid, id as seq, date_created, " - "priority, EXTRACT(EPOCH FROM eta), state, sequence_group " + "priority, EXTRACT(EPOCH FROM eta), state, model_name, method_name, rule_name, rule_value " "FROM queue_job WHERE %s" % (where, )) with closing(self.conn.cursor("select_jobs", withhold=True)) as cr: diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py index 0909032522..cea77e5522 100644 --- a/queue_job/models/__init__.py +++ b/queue_job/models/__init__.py @@ -1,2 +1,3 @@ from . import base from . import queue_job +from . import queue_sequence_rule diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 11e6d0d7fd..0c91f4e09d 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -90,7 +90,9 @@ class QueueJob(models.Model): index=True) identity_key = fields.Char() - sequence_group = fields.Char() + sequence_rule_ids = fields.Many2many(comodel_name='queue.sequence.rule', string='Sequence Rules') + rule_name = fields.Char(string='Sequence Rule', readonly=True) + rule_value = fields.Char(string='Rule Value', readonly=True) @api.model_cr def init(self): diff --git a/queue_job/models/queue_sequence_rule.py b/queue_job/models/queue_sequence_rule.py new file mode 100644 index 0000000000..fe024c1060 --- /dev/null +++ b/queue_job/models/queue_sequence_rule.py @@ -0,0 +1,58 @@ +# (c) 2022 Vanmoof +# License LGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html) + +from odoo import _, api, fields, models + + +class QueueSequenceRule(models.Model): + """ + A model that defines rules for grouping and running queue jobs sequentially. + + A queue job is considered part of a sequence group if the value of the field specified in the `field_id` + attribute is the same as the value of the field on the related record. + + Example: + - If the `field_id` attribute is set to 'journal_id', jobs will be grouped and run sequentially + if the invoices they are processing have the same journal. + + Don't confuse this with sequential queues. Those are simply channels with capacity 1, while this allows + channels with a larger capacity and can conditionally run jobs in sequence. + + Attributes: + name (str): The name of the rule. + model_id (many2one): The name of the model for which the rule applies. + field_id (many2one): The name of the field on the model used to group jobs. + """ + _name = 'queue.sequence.rule' + _description = 'Sequence rules for queue jobs' + + name = fields.Char(string='Name', required=True) + active = fields.Boolean(default=True) + model_id = fields.Many2one( + comodel_name='ir.model', string='Model', required=True + ) + field_id = fields.Many2one(comodel_name='ir.model.fields', string='Field', required=True, + domain="[('model_id', '=', model_id)]") + + _sql_constraints = [ + ( + "uniq_model_id", + "UNIQUE(model_id)", + ("Only one rule per model allowed"), + ), + ( + "uniq_name", + "UNIQUE(name)", + ("Rule name must be unique"), + ), + ] + + @api.onchange('model_id') + def _onchange_model_id(self): + self.field_id = False + + @api.constrains('model_id', 'field_id') + def _check_field_belongs_to_model(self): + for record in self: + if record.field_id.model_id != record.model_id: + raise ValueError(_('The selected field must belong to the selected model.')) diff --git a/queue_job/readme/DESCRIPTION.rst b/queue_job/readme/DESCRIPTION.rst index 19ab4c041a..0c08fc22fd 100644 --- a/queue_job/readme/DESCRIPTION.rst +++ b/queue_job/readme/DESCRIPTION.rst @@ -48,3 +48,6 @@ Features: description, number of retries * Related Actions: link an action on the job view, such as open the record concerned by the job +* Sequence Rules: specify a model and a field for which the value on the + related records act like a key which determine if a job should be run + in sequence or not. diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst index dc3ca77765..d5f1e1f2c8 100644 --- a/queue_job/readme/USAGE.rst +++ b/queue_job/readme/USAGE.rst @@ -45,3 +45,9 @@ Through the time, two main patterns emerged: 1. For data exposed to users, a model should store the data and the model should be the creator of the job. The job is kept hidden from the users 2. For technical data, that are not exposed to the users, it is generally alright to create directly jobs with data passed as arguments to the job, without intermediary models. + +Sequence Rules +~~~~~~~~~~~~~~ +In the Job Queue/Sequence Rules menu define a new rule having a model and a field. +Newly created jobs will acquire the field values from related records at the time the job is created. +For rules which match, the jobs will be run in sequence. diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv index e90eee9ae4..d51665526c 100644 --- a/queue_job/security/ir.model.access.csv +++ b/queue_job/security/ir.model.access.csv @@ -2,3 +2,4 @@ id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1 access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1 access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1 +access_queue_sequence_rule_manager,queue sequence rule manager,queue_job.model_queue_sequence_rule,queue_job.group_queue_job_manager,1,1,1,1 diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 27b550ccdf..e79a419df0 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -52,6 +52,9 @@ + + +
@@ -289,6 +292,42 @@ + + queue.sequence.rule.form + queue.sequence.rule + form + +
+ + + + + +
+
+
+ + + queue.sequence.rule.tree + queue.sequence.rule + tree + + + + + + + + + + + Sequence Rules + queue.sequence.rule + form + tree,form + + + + + From 3151ffa11739660f4b56f10b6319e9b026714ee3 Mon Sep 17 00:00:00 2001 From: Kevin Graveman Date: Mon, 16 Jan 2023 12:37:10 +0100 Subject: [PATCH 10/10] fixup! OD-1593: [ADD] Queue Job Sequence Rules --- queue_job/job.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index acdad63ee7..5e75b7ee8d 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -471,11 +471,11 @@ def __init__(self, func, # to pick automatic.workflow.job, which doesn't have the fields of the invoice model. if self.model_name == 'automatic.workflow.job': model_name = 'account.invoice' - record_ids = self.env[model_name].browse(self.args) + record_ids = self.env[model_name].sudo().browse(self.args) else: model_name = self.model_name record_ids = self.recordset - sequence_rules = env['queue.sequence.rule'].search([('model_id.model', '=', model_name)]) + sequence_rules = env['queue.sequence.rule'].sudo().search([('model_id.model', '=', model_name)]) if sequence_rules: self.sequence_rule_ids = [(6, 0, sequence_rules.ids)] # Change the following when implementing multiple rules per model @@ -494,6 +494,8 @@ def __init__(self, func, self.rule_value = value else: self.sequence_rule_ids = None + self.rule_name = None + self.rule_value = None def perform(self): """Execute the job.