Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


{'name': 'Job Queue',
'version': '12.0.1.5.3',
'version': '12.0.1.6.0',
'author': 'Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)',
'website': 'https://github.com/OCA/queue/tree/12.0/queue_job',
'license': 'LGPL-3',
Expand Down
57 changes: 51 additions & 6 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ class DelayableRecordset(object):

def __init__(self, recordset, priority=None, eta=None,
max_retries=None, description=None, channel=None,
identity_key=None):
identity_key=None, sequence_rule_ids=None):
self.recordset = recordset
self.priority = priority
self.eta = eta
self.max_retries = max_retries
self.description = description
self.channel = channel
self.identity_key = identity_key
self.sequence_rule_ids = sequence_rule_ids

def __getattr__(self, name):
if name in self.recordset:
Expand All @@ -87,7 +88,8 @@ def delay(*args, **kwargs):
eta=self.eta,
description=self.description,
channel=self.channel,
identity_key=self.identity_key)
identity_key=self.identity_key,
sequence_rule_ids=self.sequence_rule_ids)
return delay

def __str__(self):
Expand Down Expand Up @@ -241,6 +243,10 @@ class Job(object):
be added to a channel if the existing job with the same key is not yet
started or executed.

.. attribute::sequence_rule_ids

Reference to rules to process jobs sequentially.

"""
@classmethod
def load(cls, env, job_uuid):
Expand Down Expand Up @@ -272,7 +278,8 @@ def _load_from_db_record(cls, job_db_record):
job_ = cls(method, args=args, kwargs=kwargs,
priority=stored.priority, eta=eta, job_uuid=stored.uuid,
description=stored.name, channel=stored.channel,
identity_key=stored.identity_key)
identity_key=stored.identity_key,
sequence_rule_ids=stored.sequence_rule_ids)

if stored.date_created:
job_.date_created = stored.date_created
Expand All @@ -296,6 +303,7 @@ def _load_from_db_record(cls, job_db_record):
if stored.company_id:
job_.company_id = stored.company_id.id
job_.identity_key = stored.identity_key
job_.sequence_rule_ids = stored.sequence_rule_ids if stored.sequence_rule_ids else None
return job_

def job_record_with_same_identity_key(self):
Expand All @@ -310,7 +318,7 @@ def job_record_with_same_identity_key(self):
@classmethod
def enqueue(cls, func, args=None, kwargs=None,
priority=None, eta=None, max_retries=None, description=None,
channel=None, identity_key=None):
channel=None, identity_key=None, sequence_rule_ids=None):
"""Create a Job and enqueue it in the queue. Return the job uuid.

This expects the arguments specific to the job to be already extracted
Expand All @@ -323,7 +331,8 @@ def enqueue(cls, func, args=None, kwargs=None,
new_job = cls(func=func, args=args,
kwargs=kwargs, priority=priority, eta=eta,
max_retries=max_retries, description=description,
channel=channel, identity_key=identity_key)
channel=channel, identity_key=identity_key,
sequence_rule_ids=sequence_rule_ids)
if new_job.identity_key:
existing = new_job.job_record_with_same_identity_key()
if existing:
Expand Down Expand Up @@ -354,7 +363,8 @@ def db_record_from_uuid(env, job_uuid):
def __init__(self, func,
args=None, kwargs=None, priority=None,
eta=None, job_uuid=None, max_retries=None,
description=None, channel=None, identity_key=None):
description=None, channel=None, identity_key=None,
sequence_rule_ids=None):
""" Create a Job

:param func: function to execute
Expand All @@ -378,6 +388,7 @@ def __init__(self, func,
:param identity_key: A hash to uniquely identify a job, or a function
that returns this hash (the function takes the job
as argument)
:param sequence_rule_ids: Reference to rules to process jobs sequentially.
:param env: Odoo Environment
:type env: :class:`odoo.api.Environment`
"""
Expand Down Expand Up @@ -454,6 +465,37 @@ def __init__(self, func,
self._eta = None
self.eta = eta
self.channel = channel
# Handle automatic.workflow.job, normally it is a normal model name.
# In practice only _validate_invoice_job is called through automatic.workflow.job
# This allows the user to select the field on account.invoice, instead of having
# to pick automatic.workflow.job, which doesn't have the fields of the invoice model.
if self.model_name == 'automatic.workflow.job':
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I did not add a dependency on this module because this won't break this module. Inheriting the Job class in a custom module for this did not seem reasonable for this kind of change. Albeit, this is a shim.

model_name = 'account.invoice'
record_ids = self.env[model_name].sudo().browse(self.args)
else:
model_name = self.model_name
record_ids = self.recordset
sequence_rules = env['queue.sequence.rule'].sudo().search([('model_id.model', '=', model_name)])
if sequence_rules:
self.sequence_rule_ids = [(6, 0, sequence_rules.ids)]
# Change the following when implementing multiple rules per model
self.rule_name = sequence_rules[0].name
if len(sequence_rules) > 1:
_logger.warning('More than one sequence rule defined for %s',
self.model_name)
if len(record_ids) > 1:
_logger.warning(
'Applying sequence rule for job %s failed because it has multiple related '
'records.', self.uuid
)
else:
for rule in sequence_rules:
value = str(record_ids[rule.field_id.name])
self.rule_value = value

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just asking to get a better grasp on it, still in the process of understanding this flow - in this case the value of a different rule can be set then the rule name set above (first one taken), is this expected or it doesn't matter when we have multiple ones?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one rule is allowed per model:

_sql_constraints = [
(
"uniq_model_id",
"UNIQUE(model_id)",
("Only one rule per model allowed"),

I left some stubs to allow for multiple rules per model, as I had started with that approach. I mid-way decided not to implement multiple rules per model because it would make the solution more complex than what we need, especially considering not making the job queue any slower.

else:
self.sequence_rule_ids = None
self.rule_name = None
self.rule_value = None

def perform(self):
"""Execute the job.
Expand Down Expand Up @@ -496,6 +538,7 @@ def store(self):
'date_done': False,
'eta': False,
'identity_key': False,
'sequence_rule_ids': self.sequence_rule_ids if self.sequence_rule_ids else None,
}

if self.date_enqueued:
Expand All @@ -522,6 +565,8 @@ def store(self):
'model_name': self.model_name,
'method_name': self.method_name,
'record_ids': self.recordset.ids,
'rule_name': self.rule_name,
'rule_value': self.rule_value,
'args': self.args,
'kwargs': self.kwargs,
})
Expand Down
93 changes: 86 additions & 7 deletions queue_job/jobrunner/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class SafeSet(set):
0
>>> s.remove(1)
"""

def remove(self, o):
# pylint: disable=missing-return,except-pass
try:
Expand Down Expand Up @@ -192,14 +193,18 @@ class ChannelJob(object):
"""

def __init__(self, db_name, channel, uuid,
seq, date_created, priority, eta):
seq, date_created, priority, eta, model_name=None, method_name=None, rule_name=None, rule_value=None):
self.db_name = db_name
self.channel = channel
self.uuid = uuid
self.seq = seq
self.date_created = date_created
self.priority = priority
self.eta = eta
self.rule_value = rule_value
self.rule_name = rule_name
self.model_name = model_name
self.method_name = method_name

def __repr__(self):
return "<ChannelJob %s>" % self.uuid
Expand Down Expand Up @@ -525,6 +530,56 @@ def has_capacity(self):
return True
return len(self._running) < self.capacity

def get_matching_rule(self, job):
"""Check if a running job has a matching QueueSequenceRule.

The QueueSequenceRule defines a field on a model.
This method checks if the field's value for the related record of job matches
that of any running job.

To avoid performance issues, we store the value on the job. The implication is that if field values
on the related record are modified between the job creation and execution, the job will be run with undefined
behavior (either sequential or not). Modify queue_job.rule_value during the write on the related record to avoid
this.

:return: (str) QueueSequenceRule.name which matches with job and a running job or None

>>> from pprint import pprint as pp
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,A:4')

# Test matching rule for a channel with capacity 4
>>> db = 'db'
# db_name, channel_name, uuid, seq, date_created, priority, eta, state,
# method_name=None, rule_name=None, rule_value=None
>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1')
>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1')
>>> cm.notify(db, 'A', 'A3', 3, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '1')
>>> cm.notify(db, 'A', 'A4', 4, 0, 10, None, 'pending', 'model_a', 'method_a', 'rule_a', '2')
>>> pp(list(cm.get_jobs_to_run(now=100)))
# A2 has the same method_name and rule_value as A1, so it has to wait until A1 is done.
# A4 runs concurrently because it has a different rule_value.
[<ChannelJob A1>, <ChannelJob A4>]
>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'done', 'model_a', 'method_a', 'rule_a', '1')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A2>]
>>> cm.notify(db, 'A', 'A2', 1, 0, 10, None, 'done', 'model_a', 'method_a', 'rule_a', '1')
>>> pp(list(cm.get_jobs_to_run(now=100)))
# Finally, A3 is run
[<ChannelJob A3>]
"""
# Channel doesn't have env, so we set values during the job init.
# Note that job.model_name does not guarantee the same related record model; see automatic.workflow.job.
# Likewise a method name can arbitrarily be the same. So we check both.
if job.rule_value and any(
job.rule_value == running_job.rule_value
for running_job in self._running
if running_job.method_name == job.method_name
and running_job.model_name == job.model_name
):
return job.rule_name
return None

def get_jobs_to_run(self, now):
"""Get jobs that are ready to run in channel.

Expand Down Expand Up @@ -559,19 +614,42 @@ def get_jobs_to_run(self, now):
self._pause_until = 0
_logger.debug("channel %s unpaused at %s", self, now)
# yield jobs that are ready to run, while we have capacity
_deferred = SafeSet()
while self.has_capacity():
job = self._queue.pop(now)
if not job:
for job in _deferred:
self._queue.add(job)
return
# Maintain sequence for jobs with same sequence group
# rule_value is computed on job creation. This means that existing jobs will not have it.
# On the view: forbid changing model once a rule has been saved, because once a rule has been assigned to
# a job, changing the model on the rule will mean a mismatch between the job's model and the rule's
# model.
matching_rule = self.get_matching_rule(job)
if matching_rule:
_deferred.add(job)
_logger.debug("job %s re-queued because a job with the same "
"sequence rule '%s' is already running "
"in channel %s",
job.uuid,
matching_rule,
self)
continue
self._running.add(job)
_logger.debug("job %s marked running in channel %s",
job.uuid, self)
yield job
if self.throttle:
self._pause_until = now + self.throttle
_logger.debug("pausing channel %s until %s",
self, self._pause_until)
return
if _deferred:
_logger.debug("Throttling deferred jobs is not implemented")
else:
self._pause_until = now + self.throttle
_logger.debug("pausing channel %s until %s",
self, self._pause_until)
return
for job in _deferred:
self._queue.add(job)

def get_wakeup_time(self, wakeup_time=0):
if not self.has_capacity():
Expand Down Expand Up @@ -997,7 +1075,8 @@ def get_channel_by_name(self, channel_name, autocreate=False):
return parent

def notify(self, db_name, channel_name, uuid,
seq, date_created, priority, eta, state):
seq, date_created, priority, eta, state, model_name=None, method_name=None, rule_name=None,
rule_value=None):
try:
channel = self.get_channel_by_name(channel_name)
except ChannelNotFound:
Expand All @@ -1024,7 +1103,7 @@ def notify(self, db_name, channel_name, uuid,
job = None
if not job:
job = ChannelJob(db_name, channel, uuid,
seq, date_created, priority, eta)
seq, date_created, priority, eta, model_name, method_name, rule_name, rule_value)
self._jobs_by_uuid[uuid] = job
# state transitions
if not state or state == DONE:
Expand Down
2 changes: 1 addition & 1 deletion queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def select_jobs(self, where, args):
# adding the where conditions, values are added later properly with
# parameters
query = ("SELECT channel, uuid, id as seq, date_created, "
"priority, EXTRACT(EPOCH FROM eta), state "
"priority, EXTRACT(EPOCH FROM eta), state, model_name, method_name, rule_name, rule_value "
"FROM queue_job WHERE %s" %
(where, ))
with closing(self.conn.cursor("select_jobs", withhold=True)) as cr:
Expand Down
1 change: 1 addition & 0 deletions queue_job/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from . import base
from . import queue_job
from . import queue_sequence_rule
3 changes: 3 additions & 0 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class QueueJob(models.Model):
index=True)

identity_key = fields.Char()
sequence_rule_ids = fields.Many2many(comodel_name='queue.sequence.rule', string='Sequence Rules')
rule_name = fields.Char(string='Sequence Rule', readonly=True)
rule_value = fields.Char(string='Rule Value', readonly=True)

@api.model_cr
def init(self):
Expand Down
58 changes: 58 additions & 0 deletions queue_job/models/queue_sequence_rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# (c) 2022 Vanmoof <https://vanmoof.com>
# License LGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)

from odoo import _, api, fields, models


class QueueSequenceRule(models.Model):
"""
A model that defines rules for grouping and running queue jobs sequentially.

A queue job is considered part of a sequence group if the value of the field specified in the `field_id`
attribute is the same as the value of the field on the related record.

Example:
- If the `field_id` attribute is set to 'journal_id', jobs will be grouped and run sequentially
if the invoices they are processing have the same journal.

Don't confuse this with sequential queues. Those are simply channels with capacity 1, while this allows
channels with a larger capacity and can conditionally run jobs in sequence.

Attributes:
name (str): The name of the rule.
model_id (many2one): The name of the model for which the rule applies.
field_id (many2one): The name of the field on the model used to group jobs.
"""
_name = 'queue.sequence.rule'
_description = 'Sequence rules for queue jobs'

name = fields.Char(string='Name', required=True)
active = fields.Boolean(default=True)
model_id = fields.Many2one(
comodel_name='ir.model', string='Model', required=True
)
field_id = fields.Many2one(comodel_name='ir.model.fields', string='Field', required=True,
domain="[('model_id', '=', model_id)]")

_sql_constraints = [
(
"uniq_model_id",
"UNIQUE(model_id)",
("Only one rule per model allowed"),
),
(
"uniq_name",
"UNIQUE(name)",
("Rule name must be unique"),
),
]

@api.onchange('model_id')
def _onchange_model_id(self):
self.field_id = False

@api.constrains('model_id', 'field_id')
def _check_field_belongs_to_model(self):
for record in self:
if record.field_id.model_id != record.model_id:
raise ValueError(_('The selected field must belong to the selected model.'))
3 changes: 3 additions & 0 deletions queue_job/readme/DESCRIPTION.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ Features:
description, number of retries
* Related Actions: link an action on the job view, such as open the record
concerned by the job
* Sequence Rules: specify a model and a field for which the value on the
related records act like a key which determine if a job should be run
in sequence or not.
Loading