diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index e6538f94dc..f5b905fc80 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ {'name': 'Job Queue', - 'version': '12.0.1.5.3', + 'version': '12.0.1.6.0', 'author': 'Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)', 'website': 'https://github.com/OCA/queue/tree/12.0/queue_job', 'license': 'LGPL-3', diff --git a/queue_job/job.py b/queue_job/job.py index 66d82f865e..5e75b7ee8d 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -55,7 +55,7 @@ class DelayableRecordset(object): def __init__(self, recordset, priority=None, eta=None, max_retries=None, description=None, channel=None, - identity_key=None): + identity_key=None, sequence_rule_ids=None): self.recordset = recordset self.priority = priority self.eta = eta @@ -63,6 +63,7 @@ def __init__(self, recordset, priority=None, eta=None, self.description = description self.channel = channel self.identity_key = identity_key + self.sequence_rule_ids = sequence_rule_ids def __getattr__(self, name): if name in self.recordset: @@ -87,7 +88,8 @@ def delay(*args, **kwargs): eta=self.eta, description=self.description, channel=self.channel, - identity_key=self.identity_key) + identity_key=self.identity_key, + sequence_rule_ids=self.sequence_rule_ids) return delay def __str__(self): @@ -241,6 +243,10 @@ class Job(object): be added to a channel if the existing job with the same key is not yet started or executed. + .. attribute::sequence_rule_ids + + Reference to rules to process jobs sequentially. 
+ """ @classmethod def load(cls, env, job_uuid): @@ -272,7 +278,8 @@ def _load_from_db_record(cls, job_db_record): job_ = cls(method, args=args, kwargs=kwargs, priority=stored.priority, eta=eta, job_uuid=stored.uuid, description=stored.name, channel=stored.channel, - identity_key=stored.identity_key) + identity_key=stored.identity_key, + sequence_rule_ids=stored.sequence_rule_ids) if stored.date_created: job_.date_created = stored.date_created @@ -296,6 +303,7 @@ def _load_from_db_record(cls, job_db_record): if stored.company_id: job_.company_id = stored.company_id.id job_.identity_key = stored.identity_key + job_.sequence_rule_ids = stored.sequence_rule_ids if stored.sequence_rule_ids else None return job_ def job_record_with_same_identity_key(self): @@ -310,7 +318,7 @@ def job_record_with_same_identity_key(self): @classmethod def enqueue(cls, func, args=None, kwargs=None, priority=None, eta=None, max_retries=None, description=None, - channel=None, identity_key=None): + channel=None, identity_key=None, sequence_rule_ids=None): """Create a Job and enqueue it in the queue. Return the job uuid. 
This expects the arguments specific to the job to be already extracted @@ -323,7 +331,8 @@ def enqueue(cls, func, args=None, kwargs=None, new_job = cls(func=func, args=args, kwargs=kwargs, priority=priority, eta=eta, max_retries=max_retries, description=description, - channel=channel, identity_key=identity_key) + channel=channel, identity_key=identity_key, + sequence_rule_ids=sequence_rule_ids) if new_job.identity_key: existing = new_job.job_record_with_same_identity_key() if existing: @@ -354,7 +363,8 @@ def db_record_from_uuid(env, job_uuid): def __init__(self, func, args=None, kwargs=None, priority=None, eta=None, job_uuid=None, max_retries=None, - description=None, channel=None, identity_key=None): + description=None, channel=None, identity_key=None, + sequence_rule_ids=None): """ Create a Job :param func: function to execute @@ -378,6 +388,7 @@ def __init__(self, func, :param identity_key: A hash to uniquely identify a job, or a function that returns this hash (the function takes the job as argument) + :param sequence_rule_ids: Reference to rules to process jobs sequentially. :param env: Odoo Environment :type env: :class:`odoo.api.Environment` """ @@ -454,6 +465,37 @@ def __init__(self, func, self._eta = None self.eta = eta self.channel = channel + # Handle automatic.workflow.job, normally it is a normal model name. + # In practice only _validate_invoice_job is called through automatic.workflow.job + # This allows the user to select the field on account.invoice, instead of having + # to pick automatic.workflow.job, which doesn't have the fields of the invoice model. 
+ if self.model_name == 'automatic.workflow.job': + model_name = 'account.invoice' + record_ids = self.env[model_name].sudo().browse(self.args) + else: + model_name = self.model_name + record_ids = self.recordset + sequence_rules = env['queue.sequence.rule'].sudo().search([('model_id.model', '=', model_name)]) + if sequence_rules: + self.sequence_rule_ids = [(6, 0, sequence_rules.ids)] + # Change the following when implementing multiple rules per model + self.rule_name = sequence_rules[0].name + if len(sequence_rules) > 1: + _logger.warning('More than one sequence rule defined for %s', + self.model_name) + if len(record_ids) > 1: + _logger.warning( + 'Applying sequence rule for job %s failed because it has multiple related ' + 'records.', self.uuid + ) + else: + for rule in sequence_rules: + value = str(record_ids[rule.field_id.name]) + self.rule_value = value + else: + self.sequence_rule_ids = None + self.rule_name = None + self.rule_value = None def perform(self): """Execute the job. @@ -496,6 +538,7 @@ def store(self): 'date_done': False, 'eta': False, 'identity_key': False, + 'sequence_rule_ids': self.sequence_rule_ids if self.sequence_rule_ids else None, } if self.date_enqueued: @@ -522,6 +565,8 @@ def store(self): 'model_name': self.model_name, 'method_name': self.method_name, 'record_ids': self.recordset.ids, + 'rule_name': self.rule_name, + 'rule_value': self.rule_value, 'args': self.args, 'kwargs': self.kwargs, }) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index ec3d9b9228..c3d0336d49 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -112,6 +112,7 @@ class SafeSet(set): 0 >>> s.remove(1) """ + def remove(self, o): # pylint: disable=missing-return,except-pass try: @@ -192,7 +193,7 @@ class ChannelJob(object): """ def __init__(self, db_name, channel, uuid, - seq, date_created, priority, eta): + seq, date_created, priority, eta, model_name=None, method_name=None, rule_name=None, 
rule_value=None): self.db_name = db_name self.channel = channel self.uuid = uuid @@ -200,6 +201,10 @@ def __init__(self, db_name, channel, uuid, self.date_created = date_created self.priority = priority self.eta = eta + self.rule_value = rule_value + self.rule_name = rule_name + self.model_name = model_name + self.method_name = method_name def __repr__(self): return "<ChannelJob %s>" % self.uuid @@ -525,6 +530,56 @@ def has_capacity(self): return True return len(self._running) < self.capacity + def get_matching_rule(self, job): + """Check if a running job has a matching QueueSequenceRule. + + The QueueSequenceRule defines a field on a model. + This method checks if the field's value for the related record of job matches + that of any running job. + + To avoid performance issues, we store the value on the job. The implication is that if field values + on the related record are modified between the job creation and execution, the job will be run with undefined + behavior (either sequential or not). Modify queue_job.rule_value during the write on the related record to avoid + this. + + :return: (str) QueueSequenceRule.name which matches with job and a running job or None + + >>> from pprint import pprint as pp + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,A:4') + + # Test matching rule for a channel with capacity 4 + >>> db = 'db' + # db_name, channel_name, uuid, seq, date_created, priority, eta, state, + # model_name=None, method_name=None, rule_name=None, rule_value=None + >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1') + >>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1') + >>> cm.notify(db, 'A', 'A3', 3, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1') + >>> cm.notify(db, 'A', 'A4', 4, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '2') + >>> pp(list(cm.get_jobs_to_run(now=100))) + # A2 has the same method_name and rule_value as A1, so it has to wait until A1 is done.
+ # A4 runs concurrently because it has a different rule_value. + [<ChannelJob A1>, <ChannelJob A4>] + >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'done', 'model_a', 'method_a', 'rule_a', '1') + >>> pp(list(cm.get_jobs_to_run(now=100))) + [<ChannelJob A2>] + >>> cm.notify(db, 'A', 'A2', 1, 0, 10, None, 'done', 'model_a', 'method_a', 'rule_a', '1') + >>> pp(list(cm.get_jobs_to_run(now=100))) + # Finally, A3 is run + [<ChannelJob A3>] + """ + # Channel doesn't have env, so we set values during the job init. + # Note that job.model_name does not guarantee the same related record model; see automatic.workflow.job. + # Likewise a method name can arbitrarily be the same. So we check both. + if job.rule_value and any( + job.rule_value == running_job.rule_value + for running_job in self._running + if running_job.method_name == job.method_name + and running_job.model_name == job.model_name + ): + return job.rule_name + return None + def get_jobs_to_run(self, now): """Get jobs that are ready to run in channel. @@ -559,19 +614,42 @@ def get_jobs_to_run(self, now): self._pause_until = 0 _logger.debug("channel %s unpaused at %s", self, now) # yield jobs that are ready to run, while we have capacity + _deferred = SafeSet() while self.has_capacity(): job = self._queue.pop(now) if not job: + for job in _deferred: + self._queue.add(job) return + # Maintain sequence for jobs with same sequence group + # rule_value is computed on job creation. This means that existing jobs will not have it. + # On the view: forbid changing model once a rule has been saved, because once a rule has been assigned to + # a job, changing the model on the rule will mean a mismatch between the job's model and the rule's + # model.
+ matching_rule = self.get_matching_rule(job) + if matching_rule: + _deferred.add(job) + _logger.debug("job %s re-queued because a job with the same " + "sequence rule '%s' is already running " + "in channel %s", + job.uuid, + matching_rule, + self) + continue self._running.add(job) _logger.debug("job %s marked running in channel %s", job.uuid, self) yield job if self.throttle: - self._pause_until = now + self.throttle - _logger.debug("pausing channel %s until %s", - self, self._pause_until) - return + if _deferred: + _logger.debug("Throttling deferred jobs is not implemented") + else: + self._pause_until = now + self.throttle + _logger.debug("pausing channel %s until %s", + self, self._pause_until) + return + for job in _deferred: + self._queue.add(job) def get_wakeup_time(self, wakeup_time=0): if not self.has_capacity(): @@ -997,7 +1075,8 @@ def get_channel_by_name(self, channel_name, autocreate=False): return parent def notify(self, db_name, channel_name, uuid, - seq, date_created, priority, eta, state): + seq, date_created, priority, eta, state, model_name=None, method_name=None, rule_name=None, + rule_value=None): try: channel = self.get_channel_by_name(channel_name) except ChannelNotFound: @@ -1024,7 +1103,7 @@ def notify(self, db_name, channel_name, uuid, job = None if not job: job = ChannelJob(db_name, channel, uuid, - seq, date_created, priority, eta) + seq, date_created, priority, eta, model_name, method_name, rule_name, rule_value) self._jobs_by_uuid[uuid] = job # state transitions if not state or state == DONE: diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 11d5df15d1..f236344e8d 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -305,7 +305,7 @@ def select_jobs(self, where, args): # adding the where conditions, values are added later properly with # parameters query = ("SELECT channel, uuid, id as seq, date_created, " - "priority, EXTRACT(EPOCH FROM eta), state " + "priority, 
EXTRACT(EPOCH FROM eta), state, model_name, method_name, rule_name, rule_value " "FROM queue_job WHERE %s" % (where, )) with closing(self.conn.cursor("select_jobs", withhold=True)) as cr: diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py index 0909032522..cea77e5522 100644 --- a/queue_job/models/__init__.py +++ b/queue_job/models/__init__.py @@ -1,2 +1,3 @@ from . import base from . import queue_job +from . import queue_sequence_rule diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 696ed9da56..0c91f4e09d 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -90,6 +90,9 @@ class QueueJob(models.Model): index=True) identity_key = fields.Char() + sequence_rule_ids = fields.Many2many(comodel_name='queue.sequence.rule', string='Sequence Rules') + rule_name = fields.Char(string='Sequence Rule', readonly=True) + rule_value = fields.Char(string='Rule Value', readonly=True) @api.model_cr def init(self): diff --git a/queue_job/models/queue_sequence_rule.py b/queue_job/models/queue_sequence_rule.py new file mode 100644 index 0000000000..fe024c1060 --- /dev/null +++ b/queue_job/models/queue_sequence_rule.py @@ -0,0 +1,58 @@ +# (c) 2022 Vanmoof +# License LGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html) + +from odoo import _, api, fields, models + + +class QueueSequenceRule(models.Model): + """ + A model that defines rules for grouping and running queue jobs sequentially. + + A queue job is considered part of a sequence group if the value of the field specified in the `field_id` + attribute is the same as the value of the field on the related record. + + Example: + - If the `field_id` attribute is set to 'journal_id', jobs will be grouped and run sequentially + if the invoices they are processing have the same journal. + + Don't confuse this with sequential queues. 
Those are simply channels with capacity 1, while this allows + channels with a larger capacity and can conditionally run jobs in sequence. + + Attributes: + name (str): The name of the rule. + model_id (many2one): The name of the model for which the rule applies. + field_id (many2one): The name of the field on the model used to group jobs. + """ + _name = 'queue.sequence.rule' + _description = 'Sequence rules for queue jobs' + + name = fields.Char(string='Name', required=True) + active = fields.Boolean(default=True) + model_id = fields.Many2one( + comodel_name='ir.model', string='Model', required=True + ) + field_id = fields.Many2one(comodel_name='ir.model.fields', string='Field', required=True, + domain="[('model_id', '=', model_id)]") + + _sql_constraints = [ + ( + "uniq_model_id", + "UNIQUE(model_id)", + ("Only one rule per model allowed"), + ), + ( + "uniq_name", + "UNIQUE(name)", + ("Rule name must be unique"), + ), + ] + + @api.onchange('model_id') + def _onchange_model_id(self): + self.field_id = False + + @api.constrains('model_id', 'field_id') + def _check_field_belongs_to_model(self): + for record in self: + if record.field_id.model_id != record.model_id: + raise ValueError(_('The selected field must belong to the selected model.')) diff --git a/queue_job/readme/DESCRIPTION.rst b/queue_job/readme/DESCRIPTION.rst index 19ab4c041a..0c08fc22fd 100644 --- a/queue_job/readme/DESCRIPTION.rst +++ b/queue_job/readme/DESCRIPTION.rst @@ -48,3 +48,6 @@ Features: description, number of retries * Related Actions: link an action on the job view, such as open the record concerned by the job +* Sequence Rules: specify a model and a field for which the value on the + related records act like a key which determine if a job should be run + in sequence or not. 
diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst index dc3ca77765..d5f1e1f2c8 100644 --- a/queue_job/readme/USAGE.rst +++ b/queue_job/readme/USAGE.rst @@ -45,3 +45,9 @@ Through the time, two main patterns emerged: 1. For data exposed to users, a model should store the data and the model should be the creator of the job. The job is kept hidden from the users 2. For technical data, that are not exposed to the users, it is generally alright to create directly jobs with data passed as arguments to the job, without intermediary models. + +Sequence Rules +~~~~~~~~~~~~~~ +In the Job Queue/Sequence Rules menu define a new rule having a model and a field. +Newly created jobs will acquire the field values from related records at the time the job is created. +For rules which match, the jobs will be run in sequence. diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv index e90eee9ae4..d51665526c 100644 --- a/queue_job/security/ir.model.access.csv +++ b/queue_job/security/ir.model.access.csv @@ -2,3 +2,4 @@ id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1 access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1 access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1 +access_queue_sequence_rule_manager,queue sequence rule manager,queue_job.model_queue_sequence_rule,queue_job.group_queue_job_manager,1,1,1,1 diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 27b550ccdf..e79a419df0 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -52,6 +52,9 @@ + + +
 @@ -289,6 +292,42 @@
+        <!-- NOTE(review): this view block was reconstructed from a tag-stripped
+             diff; only the view names, model, and view_type/view_mode text
+             survived. Verify record ids, field lists, and the menu parent
+             against the original patch before applying. -->
+        <record id="view_queue_sequence_rule_form" model="ir.ui.view">
+            <field name="name">queue.sequence.rule.form</field>
+            <field name="model">queue.sequence.rule</field>
+            <field name="arch" type="xml">
+                <form>
+                    <group>
+                        <field name="name"/>
+                        <field name="model_id"/>
+                        <field name="field_id"/>
+                        <field name="active"/>
+                    </group>
+                </form>
+            </field>
+        </record>
+
+        <record id="view_queue_sequence_rule_tree" model="ir.ui.view">
+            <field name="name">queue.sequence.rule.tree</field>
+            <field name="model">queue.sequence.rule</field>
+            <field name="arch" type="xml">
+                <tree>
+                    <field name="name"/>
+                    <field name="model_id"/>
+                    <field name="field_id"/>
+                </tree>
+            </field>
+        </record>
+
+        <record id="action_queue_sequence_rule" model="ir.actions.act_window">
+            <field name="name">Sequence Rules</field>
+            <field name="res_model">queue.sequence.rule</field>
+            <field name="view_type">form</field>
+            <field name="view_mode">tree,form</field>
+        </record>
+
+        <menuitem id="menu_queue_sequence_rule"
+                  name="Sequence Rules"
+                  parent="menu_queue_job_root"
+                  action="action_queue_sequence_rule"/>