diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index e6538f94dc..8faaeef37f 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -2,7 +2,7 @@
{'name': 'Job Queue',
- 'version': '12.0.1.5.3',
+ 'version': '12.0.2.3.0',
'author': 'Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)',
'website': 'https://github.com/OCA/queue/tree/12.0/queue_job',
'license': 'LGPL-3',
@@ -17,7 +17,7 @@
'security/ir.model.access.csv',
'views/queue_job_views.xml',
'data/queue_data.xml',
- ],
+ "data/queue_job_function_data.xml"],
'installable': True,
'development_status': 'Mature',
'maintainers': ['guewen'],
diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index 68aba17ab8..f7f7da66de 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -7,6 +7,7 @@
from io import StringIO
from psycopg2 import OperationalError
+from werkzeug.exceptions import Forbidden
import odoo
from odoo import _, http, tools
@@ -113,17 +114,64 @@ def retry_postpone(job, message, seconds=None):
# retries are exhausted
env.cr.rollback()
- except (FailedJobError, Exception):
+ except (FailedJobError, Exception) as orig_exception:
buff = StringIO()
traceback.print_exc(file=buff)
- _logger.error(buff.getvalue())
+ traceback_txt = buff.getvalue()
+ _logger.error(traceback_txt)
job.env.clear()
with odoo.api.Environment.manage():
with odoo.registry(job.env.cr.dbname).cursor() as new_cr:
job.env = job.env(cr=new_cr)
- job.set_failed(exc_info=buff.getvalue())
+ vals = self._get_failure_values(job, traceback_txt, orig_exception)
+ job.set_failed(**vals)
job.store()
new_cr.commit()
+ buff.close()
raise
return ""
+
+ def _get_failure_values(self, job, traceback_txt, orig_exception):
+ """Collect relevant data from exception."""
+ exception_name = orig_exception.__class__.__name__
+ if hasattr(orig_exception, "__module__"):
+ exception_name = orig_exception.__module__ + "." + exception_name
+ exc_message = getattr(orig_exception, "name", str(orig_exception))
+ return {
+ "exc_info": traceback_txt,
+ "exc_name": exception_name,
+ "exc_message": exc_message,
+ }
+
+ @http.route("/queue_job/create_test_job", type="http", auth="user")
+ def create_test_job(
+ self, priority=None, max_retries=None, channel=None, description="Test job"
+ ):
+ if not http.request.env.user.has_group("base.group_erp_manager"):
+ raise Forbidden(_("Access Denied"))
+
+ if priority is not None:
+ try:
+ priority = int(priority)
+ except ValueError:
+ priority = None
+
+ if max_retries is not None:
+ try:
+ max_retries = int(max_retries)
+ except ValueError:
+ max_retries = None
+
+ delayed = (
+ http.request.env["queue.job"]
+ .with_delay(
+ priority=priority,
+ max_retries=max_retries,
+ channel=channel,
+ description=description,
+ )
+ ._test_job()
+ )
+
+ return delayed.db_record().uuid
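For a quick smoke test of the whole chain (HTTP endpoint, ``with_delay()``, jobrunner), the new route can be called directly. A minimal sketch, assuming a local instance and the session cookie of a user in ``base.group_erp_manager`` (host, port and cookie value are placeholders):

```python
# hedged sketch: exercising /queue_job/create_test_job as a logged-in user
import requests

resp = requests.get(
    "http://localhost:8069/queue_job/create_test_job",
    params={"priority": 5, "description": "smoke test"},
    cookies={"session_id": "<session-id>"},  # placeholder
)
print(resp.text)  # UUID of the enqueued job
```

Per the implementation above, non-integer ``priority`` or ``max_retries`` values silently fall back to the defaults.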
diff --git a/queue_job/data/queue_job_function_data.xml b/queue_job/data/queue_job_function_data.xml
new file mode 100644
index 0000000000..0105dbc508
--- /dev/null
+++ b/queue_job/data/queue_job_function_data.xml
@@ -0,0 +1,6 @@
+<odoo>
+    <record id="job_function_queue_job__test_job" model="queue.job.function">
+        <field name="model_id" ref="model_queue_job" />
+        <field name="method">_test_job</field>
+    </record>
+</odoo>
diff --git a/queue_job/fields.py b/queue_job/fields.py
index 171f7206e9..72ce5db753 100644
--- a/queue_job/fields.py
+++ b/queue_job/fields.py
@@ -5,24 +5,63 @@
from datetime import datetime, date
import dateutil
+import lxml
from odoo import fields, models
class JobSerialized(fields.Field):
- """Serialized fields provide the storage for sparse fields."""
+ """Provide the storage for job fields stored as json
+
+ A base_type must be set, it must be dict, list or tuple.
+ When the field is not set, the json will be the corresponding
+ json string ("{}" or "[]").
+
+ Support for some custom types has been added to the json decoder/encoder
+ (see JobEncoder and JobDecoder).
+ """
+
type = 'job_serialized'
column_type = ('text', 'text')
- def convert_to_column(self, value, record, values=None):
- return json.dumps(value, cls=JobEncoder)
+ _slots = {"_base_type": type}
+
+ _default_json_mapping = {
+ dict: "{}",
+ list: "[]",
+ tuple: "[]",
+ models.BaseModel: lambda env: json.dumps(
+ {"_type": "odoo_recordset", "model": "base", "ids": [], "uid": env.uid}
+ ),
+ }
+
+ def __init__(self, string=fields.Default, base_type=fields.Default, **kwargs):
+ super().__init__(string=string, _base_type=base_type, **kwargs)
+
+ def _setup_attrs(self, model, name):
+ super()._setup_attrs(model, name)
+ if self._base_type not in self._default_json_mapping:
+ raise ValueError("%s is not a supported base type" % (self._base_type))
+
+ def _base_type_default_json(self, env):
+ default_json = self._default_json_mapping.get(self._base_type)
+ if not isinstance(default_json, str):
+ default_json = default_json(env)
+ return default_json
+
+ def convert_to_column(self, value, record, values=None, validate=True):
+ return self.convert_to_cache(value, record, validate=validate)
def convert_to_cache(self, value, record, validate=True):
- # cache format: dict
- value = value or {}
- if isinstance(value, dict):
- return value
- return json.loads(value, cls=JobDecoder, env=record.env)
+ # cache format: json.dumps(value) or None
+ if isinstance(value, self._base_type):
+ return json.dumps(value, cls=JobEncoder)
+ else:
+ return value or None
+
+ def convert_to_record(self, value, record):
+ default = self._base_type_default_json(record.env)
+ return json.loads(value or default, cls=JobDecoder, env=record.env)
class JobEncoder(json.JSONEncoder):
@@ -41,6 +80,11 @@ def default(self, obj):
elif isinstance(obj, date):
return {'_type': 'date_isoformat',
'value': obj.isoformat()}
+ elif isinstance(obj, lxml.etree._Element):
+ return {
+ "_type": "etree_element",
+ "value": lxml.etree.tostring(obj, encoding=str),
+ }
return json.JSONEncoder.default(self, obj)
@@ -63,9 +107,12 @@ def object_hook(self, obj):
model = self.env[obj['model']]
if obj.get('uid'):
model = model.sudo(obj['uid'])
+
return model.browse(obj['ids'])
elif type_ == 'datetime_isoformat':
return dateutil.parser.parse(obj['value'])
elif type_ == 'date_isoformat':
return dateutil.parser.parse(obj['value']).date()
+ elif type_ == "etree_element":
+ return lxml.etree.fromstring(obj["value"])
return obj
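The encoder/decoder pair can be exercised directly with the standard ``json`` module. A minimal sketch, assuming it runs in an Odoo shell where ``env`` is available:

```python
# sketch: round-tripping the supported custom types through the job codec
import json
from datetime import datetime

from lxml import etree

from odoo.addons.queue_job.fields import JobDecoder, JobEncoder

value = [datetime(2020, 1, 1, 12, 0), etree.Element("root"), env.user.partner_id]
encoded = json.dumps(value, cls=JobEncoder)
decoded = json.loads(encoded, cls=JobDecoder, env=env)
assert decoded[0] == value[0]  # datetime restored
assert decoded[2] == value[2]  # recordset restored (model + ids + uid)
```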
diff --git a/queue_job/i18n/queue_job.pot b/queue_job/i18n/queue_job.pot
index 0dc339b89b..54af22e31b 100644
--- a/queue_job/i18n/queue_job.pot
+++ b/queue_job/i18n/queue_job.pot
@@ -481,8 +481,8 @@ msgid "Queue Job"
msgstr ""
#. module: queue_job
-#: model:ir.model.fields,field_description:queue_job.field_queue_job__record_ids
-msgid "Record"
+#: model:ir.model.fields,field_description:queue_job.field_queue_job__records
+msgid "Record(s)"
msgstr ""
#. module: queue_job
diff --git a/queue_job/job.py b/queue_job/job.py
index 66d82f865e..c121e16eef 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -1,4 +1,4 @@
-# Copyright 2013-2016 Camptocamp
+# Copyright 2013-2020 Camptocamp
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import inspect
@@ -6,6 +6,7 @@
import hashlib
import logging
import uuid
+import os
import sys
from datetime import datetime, timedelta
@@ -43,9 +44,6 @@ class DelayableRecordset(object):
delayable = DelayableRecordset(recordset, priority=20)
delayable.method(args, kwargs)
- ``method`` must be a method of the recordset's Model, decorated with
- :func:`~odoo.addons.queue_job.job.job`.
-
The method call will be processed asynchronously in the job queue, with
the passed arguments.
@@ -71,12 +69,6 @@ def __getattr__(self, name):
(name, self.recordset)
)
recordset_method = getattr(self.recordset, name)
- if not getattr(recordset_method, 'delayable', None):
- raise AttributeError(
- 'method %s on %s is not allowed to be delayed, '
- 'it should be decorated with odoo.addons.queue_job.job.job' %
- (name, self.recordset)
- )
def delay(*args, **kwargs):
return Job.enqueue(recordset_method,
@@ -213,6 +205,14 @@ class Job(object):
A description of the result (for humans).
+ .. attribute:: exc_name
+
+ Exception error name when the job failed.
+
+ .. attribute:: exc_message
+
+ Exception error message when the job failed.
+
.. attribute:: exc_info
Exception information (traceback) when the job failed.
@@ -254,15 +254,12 @@ def load(cls, env, job_uuid):
@classmethod
def _load_from_db_record(cls, job_db_record):
stored = job_db_record
- env = job_db_record.env
args = stored.args
kwargs = stored.kwargs
method_name = stored.method_name
- model = env[stored.model_name]
-
- recordset = model.browse(stored.record_ids)
+ recordset = stored.records
method = getattr(recordset, method_name)
eta = None
@@ -289,13 +286,12 @@ def _load_from_db_record(cls, job_db_record):
job_.state = stored.state
job_.result = stored.result if stored.result else None
job_.exc_info = stored.exc_info if stored.exc_info else None
- job_.user_id = stored.user_id.id if stored.user_id else None
- job_.model_name = stored.model_name if stored.model_name else None
job_.retry = stored.retry
job_.max_retries = stored.max_retries
if stored.company_id:
job_.company_id = stored.company_id.id
job_.identity_key = stored.identity_key
+ job_.worker_pid = stored.worker_pid
return job_
def job_record_with_same_identity_key(self):
@@ -349,7 +345,7 @@ def enqueue(cls, func, args=None, kwargs=None,
def db_record_from_uuid(env, job_uuid):
model = env['queue.job'].sudo()
record = model.search([('uuid', '=', job_uuid)], limit=1)
- return record.with_env(env)
+ return record.with_env(env).sudo()
def __init__(self, func,
args=None, kwargs=None, priority=None,
@@ -396,7 +392,6 @@ def __init__(self, func,
recordset = func.__self__
env = recordset.env
- self.model_name = recordset._name
self.method_name = func.__name__
self.recordset = recordset
@@ -404,6 +399,10 @@ def __init__(self, func,
self.job_model = self.env['queue.job']
self.job_model_name = 'queue.job'
+ self.job_config = (
+ self.env["queue.job.function"].sudo().job_config(self.job_function_name)
+ )
+
self.state = PENDING
self.retry = 0
@@ -438,9 +437,10 @@ def __init__(self, func,
self.date_done = None
self.result = None
+ self.exc_name = None
+ self.exc_message = None
self.exc_info = None
- self.user_id = env.uid
if 'company_id' in env.context:
company_id = env.context['company_id']
else:
@@ -454,6 +454,7 @@ def __init__(self, func,
self._eta = None
self.eta = eta
self.channel = channel
+ self.worker_pid = None
def perform(self):
"""Execute the job.
@@ -483,10 +484,28 @@ def perform(self):
def store(self):
"""Store the Job"""
+ job_model = self.env["queue.job"]
+        # The sentinel is used to prevent editing sensitive fields (such as
+        # method_name) from RPC calls.
+ edit_sentinel = job_model.EDIT_SENTINEL
+
+ db_record = self.db_record()
+ if db_record:
+ db_record.with_context(_job_edit_sentinel=edit_sentinel).write(
+ self._store_values()
+ )
+ else:
+ job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(
+ self._store_values(create=True)
+ )
+
+ def _store_values(self, create=False):
vals = {'state': self.state,
'priority': self.priority,
'retry': self.retry,
'max_retries': self.max_retries,
+ 'exc_name': self.exc_name,
+ 'exc_message': self.exc_message,
'exc_info': self.exc_info,
'user_id': self.user_id or self.env.uid,
'company_id': self.company_id,
@@ -494,8 +513,10 @@ def store(self):
'date_enqueued': False,
'date_started': False,
'date_done': False,
+ 'exec_time': False,
'eta': False,
'identity_key': False,
+ "worker_pid": self.worker_pid,
}
if self.date_enqueued:
@@ -504,33 +525,59 @@ def store(self):
vals['date_started'] = self.date_started
if self.date_done:
vals['date_done'] = self.date_done
+ if self.exec_time:
+ vals["exec_time"] = self.exec_time
if self.eta:
vals['eta'] = self.eta
if self.identity_key:
vals['identity_key'] = self.identity_key
- db_record = self.db_record()
- if db_record:
- db_record.write(vals)
- else:
- date_created = self.date_created
- # The following values must never be modified after the
- # creation of the job
- vals.update({'uuid': self.uuid,
- 'name': self.description,
- 'date_created': date_created,
- 'model_name': self.model_name,
- 'method_name': self.method_name,
- 'record_ids': self.recordset.ids,
- 'args': self.args,
- 'kwargs': self.kwargs,
- })
- # it the channel is not specified, lets the job_model compute
- # the right one to use
- if self.channel:
- vals.update({'channel': self.channel})
-
- self.env[self.job_model_name].sudo().create(vals)
+ if create:
+ vals.update(
+ {
+ "user_id": self.env.uid,
+ "channel": self.channel,
+ # The following values must never be modified after the
+ # creation of the job
+ "uuid": self.uuid,
+ "name": self.description,
+ "func_string": self.func_string,
+ "date_created": self.date_created,
+ "model_name": self.recordset._name,
+ "method_name": self.method_name,
+ "job_function_id": self.job_config.job_function_id,
+ "channel_method_name": self.job_function_name,
+ "records": self.recordset,
+ "args": self.args,
+ "kwargs": self.kwargs,
+ }
+ )
+
+ vals_from_model = self._store_values_from_model()
+        # Sanitize values: make sure model-provided values cannot override core values
+ vals_from_model = {k: v for k, v in vals_from_model.items() if k not in vals}
+ vals.update(vals_from_model)
+ return vals
+
+ def _store_values_from_model(self):
+ vals = {}
+ value_handlers_candidates = (
+ "_job_store_values_for_" + self.method_name,
+ "_job_store_values",
+ )
+        for candidate in value_handlers_candidates:
+            handler = getattr(self.recordset, candidate, None)
+            if handler is not None:
+                vals = handler(self)
+                # stop at the most specific handler found, otherwise the
+                # generic hook would override its result
+                break
+ return vals
+
+ @property
+ def func_string(self):
+ model = repr(self.recordset)
+ args = [repr(arg) for arg in self.args]
+ kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()]
+ all_args = ", ".join(args + kwargs)
+ return "{}.{}({})".format(model, self.method_name, all_args)
def db_record(self):
return self.db_record_from_uuid(self.env, self.uuid)
@@ -541,6 +588,11 @@ def func(self):
recordset = recordset.sudo(self.user_id)
return getattr(recordset, self.method_name)
+ @property
+ def job_function_name(self):
+ func_model = self.env["queue.job.function"].sudo()
+ return func_model.job_function_name(self.recordset._name, self.method_name)
+
@property
def identity_key(self):
if self._identity_key is None:
@@ -575,6 +627,14 @@ def uuid(self):
self._uuid = str(uuid.uuid4())
return self._uuid
+ @property
+ def model_name(self):
+ return self.recordset._name
+
+ @property
+ def user_id(self):
+ return self.recordset.env.uid
+
@property
def eta(self):
return self._eta
@@ -590,11 +650,27 @@ def eta(self, value):
else:
self._eta = value
+ @property
+ def channel(self):
+ return self._channel or self.job_config.channel
+
+ @channel.setter
+ def channel(self, value):
+ self._channel = value
+
+ @property
+ def exec_time(self):
+ if self.date_done and self.date_started:
+ return (self.date_done - self.date_started).total_seconds()
+ return None
+
def set_pending(self, result=None, reset_retry=True):
self.state = PENDING
self.date_enqueued = None
self.date_started = None
self.date_done = None
+        self.worker_pid = None
if reset_retry:
self.retry = 0
if result is not None:
@@ -604,28 +680,35 @@ def set_enqueued(self):
self.state = ENQUEUED
self.date_enqueued = datetime.now()
self.date_started = None
+ self.worker_pid = None
def set_started(self):
self.state = STARTED
self.date_started = datetime.now()
+ self.worker_pid = os.getpid()
def set_done(self, result=None):
self.state = DONE
+        self.exc_name = None
+        self.exc_message = None
self.exc_info = None
self.date_done = datetime.now()
if result is not None:
self.result = result
- def set_failed(self, exc_info=None):
+ def set_failed(self, **kw):
self.state = FAILED
- if exc_info is not None:
- self.exc_info = exc_info
+ for k, v in kw.items():
+ if v is not None:
+ setattr(self, k, v)
def __repr__(self):
        return '<Job %s, priority:%s>' % (self.uuid, self.priority)
def _get_retry_seconds(self, seconds=None):
- retry_pattern = self.func.retry_pattern
+ retry_pattern = self.job_config.retry_pattern
+ if not retry_pattern:
+ # TODO deprecated by :job-no-decorator:
+ retry_pattern = getattr(self.func, "retry_pattern", None)
if not seconds and retry_pattern:
# ordered from higher to lower count of retries
patt = sorted(retry_pattern.items(), key=lambda t: t[0])
@@ -648,24 +731,34 @@ def postpone(self, result=None, seconds=None):
"""
eta_seconds = self._get_retry_seconds(seconds)
self.eta = timedelta(seconds=eta_seconds)
+        self.exc_name = None
+        self.exc_message = None
self.exc_info = None
if result is not None:
self.result = result
def related_action(self):
record = self.db_record()
- if hasattr(self.func, 'related_action'):
+ if not self.job_config.related_action_enable:
+ return None
+
+ funcname = self.job_config.related_action_func_name
+ if not funcname and hasattr(self.func, 'related_action'):
+ # TODO deprecated by :job-no-decorator:
funcname = self.func.related_action
# decorator is set but empty: disable the default one
if not funcname:
return None
- else:
+
+ if not funcname:
funcname = record._default_related_action
if not isinstance(funcname, str):
raise ValueError('related_action must be the name of the '
'method on queue.job as string')
action = getattr(record, funcname)
- action_kwargs = getattr(self.func, 'kwargs', {})
+ action_kwargs = self.job_config.related_action_kwargs
+ if not action_kwargs:
+ # TODO deprecated by :job-no-decorator:
+ action_kwargs = getattr(self.func, 'kwargs', {})
return action(**action_kwargs)
@@ -674,9 +767,13 @@ def _is_model_method(func):
isinstance(func.__self__.__class__, odoo.models.MetaModel))
+# TODO deprecated by :job-no-decorator:
def job(func=None, default_channel='root', retry_pattern=None):
"""Decorator for job methods.
+ Deprecated. Use ``queue.job.function`` XML records (details in
+ ``readme/USAGE.rst``).
+
It enables the possibility to use a Model's method as a job function.
Optional argument:
@@ -760,6 +857,31 @@ def retryable_example():
return functools.partial(job, default_channel=default_channel,
retry_pattern=retry_pattern)
+    xml_fields = [
+        '    <field name="model_id" ref="model_" />\n'
+        '    <field name="method">{method}</field>\n'
+    ]
+    if default_channel:
+        xml_fields.append('    <field name="channel_id" ref="" />')
+    if retry_pattern:
+        xml_fields.append('    <field name="retry_pattern">{retry_pattern}</field>')
+
+ _logger.info(
+ "@job is deprecated and no longer needed (on %s), it is advised to use an "
+ "XML record (activate DEBUG log for snippet)",
+ func.__name__,
+ )
+ if _logger.isEnabledFor(logging.DEBUG):
+        xml_record = (
+            '<record id="" model="queue.job.function">\n'
+            + "\n".join(xml_fields)
+            + "\n</record>"
+        ).format(**{"method": func.__name__, "retry_pattern": retry_pattern})
+ _logger.debug(
+ "XML snippet (to complete) for replacing @job on %s:\n%s",
+ func.__name__,
+ xml_record,
+ )
+
def delay_from_model(*args, **kwargs):
raise AttributeError(
"method.delay() can no longer be used, the general form is "
@@ -781,9 +903,13 @@ def delay_from_model(*args, **kwargs):
return func
+# TODO deprecated by :job-no-decorator:
def related_action(action=None, **kwargs):
"""Attach a *Related Action* to a job (decorator)
+ Deprecated. Use ``queue.job.function`` XML records (details in
+ ``readme/USAGE.rst``).
+
A *Related Action* will appear as a button on the Odoo view.
The button will execute the action, usually it will open the
form view of the record related to the job.
@@ -801,7 +927,7 @@ class QueueJob(models.Model):
def related_action_partner(self):
self.ensure_one()
model = self.model_name
- partner = self.env[model].browse(self.record_ids)
+ partner = self.records
# possibly get the real ID if partner_id is a binding ID
action = {
'name': _("Partner"),
@@ -846,6 +972,34 @@ def export_product(self):
"""
def decorate(func):
+ related_action_dict = {
+ "func_name": action,
+ }
+ if kwargs:
+ related_action_dict["kwargs"] = kwargs
+
+        xml_fields = (
+            '    <field name="model_id" ref="model_" />\n'
+            '    <field name="method">{method}</field>\n'
+            '    <field name="related_action">{related_action}</field>'
+        )
+
+ _logger.info(
+ "@related_action is deprecated and no longer needed (on %s),"
+ " it is advised to use an XML record (activate DEBUG log for snippet)",
+ func.__name__,
+ )
+ if _logger.isEnabledFor(logging.DEBUG):
+            xml_record = (
+                '<record id="" model="queue.job.function">\n'
+                + xml_fields
+                + "\n</record>"
+            ).format(**{"method": func.__name__, "related_action": related_action_dict})
+ _logger.debug(
+ "XML snippet (to complete) for replacing @related_action on %s:\n%s",
+ func.__name__,
+ xml_record,
+ )
+
func.related_action = action
func.kwargs = kwargs
return func
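With the decorators gone, the behavior formerly carried by ``@job`` is resolved at enqueue time through ``queue.job.function`` records. A sketch of the lookup, assuming an Odoo shell and an illustrative function name:

```python
# hedged sketch: how a job resolves its configuration
func_model = env["queue.job.function"].sudo()
config = func_model.job_config("<res.partner>.export_partner")  # illustrative name
# config is a JobConfig namedtuple; when no record matches,
# job_default_config() applies (channel "root", empty retry pattern,
# default related action enabled)
print(config.channel, config.retry_pattern)
```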
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 11d5df15d1..31164f930a 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -287,12 +287,28 @@ def _has_queue_job(self):
cr.execute("SELECT 1 FROM pg_tables WHERE tablename=%s",
('ir_module_module',))
if not cr.fetchone():
+ _logger.debug("%s doesn't seem to be an odoo db", self.db_name)
return False
cr.execute(
"SELECT 1 FROM ir_module_module WHERE name=%s AND state=%s",
('queue_job', 'installed')
)
- return cr.fetchone()
+ if not cr.fetchone():
+ _logger.debug("queue_job is not installed for db %s", self.db_name)
+ return False
+ cr.execute(
+ """SELECT COUNT(1)
+ FROM information_schema.triggers
+ WHERE event_object_table = %s
+ AND trigger_name = %s""",
+ ("queue_job", "queue_job_notify"),
+ )
+ if cr.fetchone()[0] != 3: # INSERT, DELETE, UPDATE
+ _logger.error(
+ "queue_job_notify trigger is missing in db %s", self.db_name
+ )
+ return False
+ return True
def _initialize(self):
with closing(self.conn.cursor()) as cr:
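The stricter ``_has_queue_job`` check can be reproduced by hand when debugging a database the runner refuses to pick up. A sketch with plain ``psycopg2`` (the database name is a placeholder):

```python
# sketch: verifying the queue_job_notify trigger outside the runner
import psycopg2

with psycopg2.connect(dbname="odoodb") as conn:  # placeholder db name
    with conn.cursor() as cr:
        cr.execute(
            """SELECT COUNT(1) FROM information_schema.triggers
               WHERE event_object_table = %s AND trigger_name = %s""",
            ("queue_job", "queue_job_notify"),
        )
        print(cr.fetchone()[0])  # the runner expects 3 (INSERT, DELETE, UPDATE)
```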
diff --git a/queue_job/migrations/12.0.2.0.0/post-migration.py b/queue_job/migrations/12.0.2.0.0/post-migration.py
new file mode 100644
index 0000000000..a399e53e36
--- /dev/null
+++ b/queue_job/migrations/12.0.2.0.0/post-migration.py
@@ -0,0 +1,23 @@
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+import logging
+
+from odoo import SUPERUSER_ID, api, exceptions
+
+_logger = logging.getLogger(__name__)
+
+
+def migrate(cr, version):
+ with api.Environment.manage():
+ env = api.Environment(cr, SUPERUSER_ID, {})
+ for job_func in env["queue.job.function"].search([]):
+ try:
+ # trigger inverse field to set model_id and method
+ job_func.name = job_func.name
+ except exceptions.UserError:
+ # ignore invalid entries not to block migration
+ _logger.error(
+ "could not migrate job function '%s' (id: %s), invalid name",
+ job_func.name,
+ job_func.id,
+ )
diff --git a/queue_job/migrations/12.0.2.1.0/pre-migration.py b/queue_job/migrations/12.0.2.1.0/pre-migration.py
new file mode 100644
index 0000000000..897846fa83
--- /dev/null
+++ b/queue_job/migrations/12.0.2.1.0/pre-migration.py
@@ -0,0 +1,28 @@
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+import logging
+
+from odoo.tools.sql import column_exists
+
+_logger = logging.getLogger(__name__)
+
+
+def migrate(cr, version):
+ if not column_exists(cr, "queue_job", "records"):
+ cr.execute(
+ """
+ ALTER TABLE queue_job
+ ADD COLUMN records text;
+ """
+ )
+ cr.execute(
+ """
+ UPDATE queue_job
+ SET records = '{"_type": "odoo_recordset"'
+ || ', "model": "' || model_name || '"'
+ || ', "uid": ' || user_id
+ || ', "ids": ' || record_ids
+ || '}'
+ WHERE records IS NULL;
+ """
+ )
diff --git a/queue_job/migrations/12.0.2.2.0/pre-migration.py b/queue_job/migrations/12.0.2.2.0/pre-migration.py
new file mode 100644
index 0000000000..c14d6800ad
--- /dev/null
+++ b/queue_job/migrations/12.0.2.2.0/pre-migration.py
@@ -0,0 +1,35 @@
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+import logging
+
+from odoo.tools.sql import column_exists
+
+_logger = logging.getLogger(__name__)
+
+
+def migrate(cr, version):
+ if not column_exists(cr, "queue_job", "exec_time"):
+ # Disable trigger otherwise the update takes ages.
+ cr.execute(
+ """
+ ALTER TABLE queue_job DISABLE TRIGGER queue_job_notify;
+ """
+ )
+ cr.execute(
+ """
+ ALTER TABLE queue_job ADD COLUMN exec_time double precision DEFAULT 0;
+ """
+ )
+ cr.execute(
+ """
+ UPDATE
+ queue_job
+ SET
+ exec_time = EXTRACT(EPOCH FROM (date_done - date_started));
+ """
+ )
+ cr.execute(
+ """
+ ALTER TABLE queue_job ENABLE TRIGGER queue_job_notify;
+ """
+ )
diff --git a/queue_job/migrations/12.0.2.3.0/post-migration.py b/queue_job/migrations/12.0.2.3.0/post-migration.py
new file mode 100644
index 0000000000..358bdb5b31
--- /dev/null
+++ b/queue_job/migrations/12.0.2.3.0/post-migration.py
@@ -0,0 +1,49 @@
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+import logging
+
+from odoo import SUPERUSER_ID, api
+
+_logger = logging.getLogger(__name__)
+
+
+def migrate(cr, version):
+ with api.Environment.manage():
+ env = api.Environment(cr, SUPERUSER_ID, {})
+ _logger.info("Computing exception name for failed jobs")
+ _compute_jobs_new_values(env)
+
+
+def _compute_jobs_new_values(env):
+ job_exc_infos = env["queue.job"].search(
+ [("state", "=", "failed"), ("exc_info", "!=", False)]
+ ).read(fields=["exc_info"])
+ for job_row in job_exc_infos:
+ exception_details = _get_exception_details(job_row["exc_info"])
+ if exception_details:
+ job = env["queue.job"].browse(job_row["id"])
+ job.write(exception_details)
+
+
+def _get_exception_details(exc_info):
+ for line in reversed(exc_info.splitlines()):
+ if _find_exception(line):
+ name, msg = line.split(":", 1)
+ return {
+ "exc_name": name.strip(),
+ "exc_message": msg.strip("()', \""),
+ }
+
+
+def _find_exception(line):
+ # Just a list of common errors.
+ # If you want to target others, add your own migration step for your db.
+ exceptions = (
+ "Error:", # catch all well named exceptions
+ # other live instance errors found
+ "requests.exceptions.MissingSchema",
+ "botocore.errorfactory.NoSuchKey",
+ )
+ for exc in exceptions:
+ if exc in line:
+ return exc
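A short illustration of what the heuristic extracts from a stored traceback (the traceback text is made up):

```python
# sketch: expected output of _get_exception_details
exc_info = (
    "Traceback (most recent call last):\n"
    "  ...\n"
    "odoo.exceptions.UserError: Access denied"
)
assert _get_exception_details(exc_info) == {
    "exc_name": "odoo.exceptions.UserError",
    "exc_message": "Access denied",
}
```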
diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py
index 0909032522..b7b1791462 100644
--- a/queue_job/models/__init__.py
+++ b/queue_job/models/__init__.py
@@ -1,2 +1,3 @@
from . import base
+from . import ir_model_fields
from . import queue_job
diff --git a/queue_job/models/base.py b/queue_job/models/base.py
index 82ae657c08..4e763aa10f 100644
--- a/queue_job/models/base.py
+++ b/queue_job/models/base.py
@@ -1,11 +1,12 @@
# Copyright 2016 Camptocamp
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+import functools
import inspect
import logging
import os
-from odoo import models, api
+from odoo import api, models
from ..job import DelayableRecordset
_logger = logging.getLogger(__name__)
@@ -19,6 +20,7 @@ class Base(models.AbstractModel):
"""
_inherit = 'base'
+ # TODO deprecated by :job-no-decorator:
@api.model_cr
def _register_hook(self):
"""Register marked jobs"""
@@ -37,15 +39,22 @@ def with_delay(self, priority=None, eta=None,
channel=None, identity_key=None):
""" Return a ``DelayableRecordset``
- The returned instance allow to enqueue any method of the recordset's
- Model which is decorated by :func:`~odoo.addons.queue_job.job.job`.
+ The returned instance allows to enqueue any method of the recordset's
+ Model.
Usage::
self.env['res.users'].with_delay().write({'name': 'test'})
- In the line above, in so far ``write`` is allowed to be delayed with
- ``@job``, the write will be executed in an asynchronous job.
+ ``with_delay()`` accepts job properties which specify how the job will
+ be executed.
+
+ Usage with job properties::
+
+ delayable = env['a.model'].with_delay(priority=30, eta=60*60*5)
+ delayable.export_one_thing(the_thing_to_export)
+ # => the job will be executed with a low priority and not before a
+ # delay of 5 hours from now
:param priority: Priority of the job, 0 being the higher priority.
Default is 10.
@@ -61,7 +70,9 @@ def with_delay(self, priority=None, eta=None,
defined on the function
:param identity_key: key uniquely identifying the job, if specified
and a job with the same key has not yet been run,
- the new job will not be added.
+ the new job will not be added. It is either a
+                             string or a function that takes the job as
+ argument (see :py:func:`..job.identity_exact`).
:return: instance of a DelayableRecordset
:rtype: :class:`odoo.addons.queue_job.job.DelayableRecordset`
@@ -91,3 +102,110 @@ def with_delay(self, priority=None, eta=None,
description=description,
channel=channel,
identity_key=identity_key)
+
+ def _patch_job_auto_delay(self, method_name, context_key=None):
+ """Patch a method to be automatically delayed as job method when called
+
+ This patch method has to be called in ``_register_hook`` (example
+ below).
+
+ When a method is patched, any call to the method will not directly
+ execute the method's body, but will instead enqueue a job.
+
+ When a ``context_key`` is set when calling ``_patch_job_auto_delay``,
+ the patched method is automatically delayed only when this key is
+ ``True`` in the caller's context. It is advised to patch the method
+ with a ``context_key``, because making the automatic delay *in any
+ case* can produce nasty and unexpected side effects (e.g. another
+ module calls the method and expects it to be computed before doing
+ something else, expecting a result, ...).
+
+ A typical use case is when a method in a module we don't control is
+ called synchronously in the middle of another method, and we'd like all
+ the calls to this method become asynchronous.
+
+ The options of the job usually passed to ``with_delay()`` (priority,
+ description, identity_key, ...) can be returned in a dictionary by a
+ method named after the name of the method suffixed by ``_job_options``
+ which takes the same parameters as the initial method.
+
+ It is still possible to force synchronous execution of the method by
+ setting a key ``_job_force_sync`` to True in the environment context.
+
+ Example patching the "foo" method to be automatically delayed as job
+ (the job options method is optional):
+
+ .. code-block:: python
+
+ # original method:
+ def foo(self, arg1):
+ print("hello", arg1)
+
+ def large_method(self):
+ # doing a lot of things
+ self.foo("world)
+ # doing a lot of other things
+
+ def button_x(self):
+ self.with_context(auto_delay_foo=True).large_method()
+
+ # auto delay patch:
+ def foo_job_options(self, arg1):
+ return {
+ "priority": 100,
+ "description": "Saying hello to {}".format(arg1)
+ }
+
+ def _register_hook(self):
+ self._patch_method(
+ "foo",
+ self._patch_job_auto_delay("foo", context_key="auto_delay_foo")
+ )
+ return super()._register_hook()
+
+ The result when ``button_x`` is called, is that a new job for ``foo``
+ is delayed.
+ """
+
+ def auto_delay_wrapper(self, *args, **kwargs):
+ # when no context_key is set, we delay in any case (warning, can be
+ # dangerous)
+ context_delay = self.env.context.get(context_key) if context_key else True
+ if (
+ self.env.context.get("job_uuid")
+ or not context_delay
+ or self.env.context.get("_job_force_sync")
+ or self.env.context.get("test_queue_job_no_delay")
+ ):
+ # we are in the job execution
+ return auto_delay_wrapper.origin(self, *args, **kwargs)
+ else:
+ # replace the synchronous call by a job on itself
+ method_name = auto_delay_wrapper.origin.__name__
+ job_options_method = getattr(
+ self, "{}_job_options".format(method_name), None
+ )
+ job_options = {}
+ if job_options_method:
+ job_options.update(job_options_method(*args, **kwargs))
+ delayed = self.with_delay(**job_options)
+ return getattr(delayed, method_name)(*args, **kwargs)
+
+ origin = getattr(self, method_name)
+ return functools.update_wrapper(auto_delay_wrapper, origin)
+
+ @api.model
+ def _job_store_values(self, job):
+ """Hook for manipulating job stored values.
+
+ You can define a more specific hook for a job function
+ by defining a method name with this pattern:
+
+        `_job_store_values_for_<method_name>`
+
+ NOTE: values will be stored only if they match stored fields on `queue.job`.
+
+ :param job: current queue_job.job.Job instance.
+ :return: dictionary for setting job values.
+ """
+ return {}
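Besides the auto-delay patch, ``with_delay()`` keeps its deduplication support via ``identity_key``, now documented to accept a function such as ``identity_exact``. A sketch (record and method names are illustrative):

```python
# hedged sketch: deduplicating identical jobs with identity_exact
from odoo.addons.queue_job.job import identity_exact

# identity_exact hashes model, method, args and kwargs into the identity key
partner.with_delay(identity_key=identity_exact).export_partner()
# enqueuing the same call again while the first job is still pending or
# enqueued will not create a second job
```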
diff --git a/queue_job/models/ir_model_fields.py b/queue_job/models/ir_model_fields.py
new file mode 100644
index 0000000000..30d48dc236
--- /dev/null
+++ b/queue_job/models/ir_model_fields.py
@@ -0,0 +1,10 @@
+# Copyright 2020 Camptocamp
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+from odoo import fields, models
+
+
+class IrModelFields(models.Model):
+ _inherit = "ir.model.fields"
+
+ ttype = fields.Selection(selection_add=[("job_serialized", "Job Serialized")])
diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py
index 696ed9da56..d58744f387 100644
--- a/queue_job/models/queue_job.py
+++ b/queue_job/models/queue_job.py
@@ -1,25 +1,22 @@
# Copyright 2013-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+import ast
import logging
+import re
+from collections import namedtuple
from datetime import datetime, timedelta
-from odoo import models, fields, api, exceptions, _
+from odoo import _, api, exceptions, fields, models, tools
from odoo.osv import expression
-# Import `Serialized` field straight to avoid:
-# * remember to use --load=base_sparse_field...
-# * make pytest happy
-# * make everybody happy :
-from odoo.addons.base_sparse_field.models.fields import Serialized
-from ..job import STATES, DONE, PENDING, Job
from ..fields import JobSerialized
+from ..job import DONE, PENDING, STATES, Job
_logger = logging.getLogger(__name__)
-def channel_func_name(model, method):
- return '<%s>.%s' % (model._name, method.__name__)
+regex_job_function_name = re.compile(r"^<([0-9a-z_\.]+)>\.([0-9a-zA-Z_]+)$")
class QueueJob(models.Model):
@@ -34,30 +31,54 @@ class QueueJob(models.Model):
_removal_interval = 30 # days
_default_related_action = 'related_action_open_record'
+ # This must be passed in a context key "_job_edit_sentinel" to write on
+ # protected fields. It protects against crafting "queue.job" records from
+ # RPC (e.g. on internal methods). When ``with_delay`` is used, the sentinel
+ # is set.
+ EDIT_SENTINEL = object()
+ _protected_fields = (
+ "uuid",
+ "name",
+ "date_created",
+ "model_name",
+ "method_name",
+ "func_string",
+ "channel_method_name",
+ "job_function_id",
+ "records",
+ "args",
+ "kwargs",
+ )
+
uuid = fields.Char(string='UUID',
readonly=True,
index=True,
required=True)
user_id = fields.Many2one(comodel_name='res.users',
- string='User ID',
- required=True)
+ string='User ID')
company_id = fields.Many2one(comodel_name='res.company',
string='Company', index=True)
name = fields.Char(string='Description', readonly=True)
model_name = fields.Char(string='Model', readonly=True)
method_name = fields.Char(readonly=True)
- record_ids = Serialized(readonly=True)
- args = JobSerialized(readonly=True)
- kwargs = JobSerialized(readonly=True)
- func_string = fields.Char(string='Task', compute='_compute_func_string',
- readonly=True, store=True)
+ # record_ids field is only for backward compatibility (e.g. used in related
+ # actions), can be removed (replaced by "records") in 14.0
+ record_ids = JobSerialized(compute="_compute_record_ids", base_type=list)
+ records = JobSerialized(
+ string="Record(s)", readonly=True, base_type=models.BaseModel,
+ )
+ args = JobSerialized(readonly=True, base_type=tuple)
+ kwargs = JobSerialized(readonly=True, base_type=dict)
+ func_string = fields.Char(string="Task", readonly=True)
state = fields.Selection(STATES,
readonly=True,
required=True,
index=True)
priority = fields.Integer()
+ exc_name = fields.Char(string="Exception", readonly=True)
+ exc_message = fields.Char(string="Exception Message", readonly=True)
exc_info = fields.Text(string='Exception Info', readonly=True)
result = fields.Text(readonly=True)
@@ -65,6 +86,11 @@ class QueueJob(models.Model):
date_started = fields.Datetime(string='Start Date', readonly=True)
date_enqueued = fields.Datetime(string='Enqueue Time', readonly=True)
date_done = fields.Datetime(readonly=True)
+ exec_time = fields.Float(
+ string="Execution Time (avg)",
+ group_operator="avg",
+ help="Time required to execute this job in seconds. Average when grouped.",
+ )
eta = fields.Datetime(string='Execute only after')
retry = fields.Integer(string='Current try')
@@ -74,22 +100,19 @@ class QueueJob(models.Model):
"max. retries.\n"
"Retries are infinite when empty.",
)
+ # FIXME the name of this field is very confusing
+
channel_method_name = fields.Char(readonly=True,
compute='_compute_job_function',
store=True)
job_function_id = fields.Many2one(comodel_name='queue.job.function',
- compute='_compute_job_function',
string='Job Function',
- readonly=True,
- store=True)
+ readonly=True)
- override_channel = fields.Char()
- channel = fields.Char(compute='_compute_channel',
- inverse='_inverse_channel',
- store=True,
- index=True)
+ channel = fields.Char(index=True)
- identity_key = fields.Char()
+ identity_key = fields.Char(readonly=True)
+ worker_pid = fields.Integer(readonly=True)
@api.model_cr
def init(self):
@@ -104,43 +127,55 @@ def init(self):
"'enqueued') AND identity_key IS NOT NULL;"
)
- @api.multi
- def _inverse_channel(self):
+ @api.depends("records")
+ def _compute_record_ids(self):
for record in self:
- record.override_channel = record.channel
+ record.record_ids = record.records.ids
+
+ @api.model_create_multi
+ def create(self, vals_list):
+ if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL:
+ # Prevent to create a queue.job record "raw" from RPC.
+ # ``with_delay()`` must be used.
+ raise exceptions.AccessError(
+ _("Queue jobs must be created by calling 'with_delay()'.")
+ )
+ return super(
+ QueueJob,
+ self.with_context(mail_create_nolog=True, mail_create_nosubscribe=True),
+ ).create(vals_list)
- @api.multi
- @api.depends('job_function_id.channel_id')
- def _compute_channel(self):
- for record in self:
- record.channel = (record.override_channel or
- record.job_function_id.channel)
+ def write(self, vals):
+ if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL:
+ write_on_protected_fields = [
+ fieldname for fieldname in vals if fieldname in self._protected_fields
+ ]
+ if write_on_protected_fields:
+ raise exceptions.AccessError(
+ _("Not allowed to change field(s): {}").format(
+ write_on_protected_fields
+ )
+ )
+
+ different_user_jobs = self.browse()
+ if vals.get("user_id"):
+ different_user_jobs = self.filtered(
+ lambda records: records.env.user.id != vals["user_id"]
+ )
- @api.multi
- @api.depends('model_name', 'method_name', 'job_function_id.channel_id')
- def _compute_job_function(self):
- for record in self:
- model = self.env[record.model_name]
- method = getattr(model, record.method_name)
- channel_method_name = channel_func_name(model, method)
- func_model = self.env['queue.job.function']
- function = func_model.search([('name', '=', channel_method_name)], limit=1)
- record.channel_method_name = channel_method_name
- record.job_function_id = function
+ if vals.get("state") == "failed":
+ self._message_post_on_failure()
- @api.multi
- @api.depends('model_name', 'method_name', 'record_ids', 'args', 'kwargs')
- def _compute_func_string(self):
- for record in self:
- record_ids = record.record_ids
- model = repr(self.env[record.model_name].browse(record_ids))
- args = [repr(arg) for arg in record.args]
- kwargs = ['%s=%r' % (key, val) for key, val
- in record.kwargs.items()]
- all_args = ', '.join(args + kwargs)
- record.func_string = (
- "%s.%s(%s)" % (model, record.method_name, all_args)
+ result = super().write(vals)
+
+ for record in different_user_jobs:
+ # the user is stored in the env of the record, but we still want to
+ # have a stored user_id field to be able to search/groupby, so
+ # synchronize the env of records with user_id
+ super(QueueJob, record).write(
+ {"records": record.records.sudo(vals["user_id"])}
)
+ return result
@api.multi
def open_related_action(self):
@@ -184,22 +219,15 @@ def _message_post_on_failure(self):
# subscribe the users now to avoid to subscribe them
# at every job creation
domain = self._subscribe_users_domain()
- users = self.env['res.users'].search(domain)
- self.message_subscribe(partner_ids=users.mapped('partner_id').ids)
+ base_users = self.env["res.users"].search(domain)
for record in self:
+ users = base_users | record.user_id
+ record.message_subscribe(partner_ids=users.mapped("partner_id").ids)
msg = record._message_failed_job()
if msg:
record.message_post(body=msg,
subtype='queue_job.mt_job_failed')
- @api.multi
- def write(self, vals):
- res = super(QueueJob, self).write(vals)
- if vals.get('state') == 'failed':
- self._message_post_on_failure()
- return res
-
- @api.multi
def _subscribe_users_domain(self):
"""Subscribe all users having the 'Queue Job Manager' group"""
group = self.env.ref('queue_job.group_queue_job_manager')
@@ -312,8 +340,7 @@ def related_action_open_record(self):
"""
self.ensure_one()
- model_name = self.model_name
- records = self.env[model_name].browse(self.record_ids).exists()
+ records = self.records.exists()
if not records:
return None
action = {
@@ -333,6 +360,9 @@ def related_action_open_record(self):
})
return action
+ def _test_job(self):
+ _logger.info("Running test job.")
+
class RequeueJob(models.TransientModel):
_name = 'queue.requeue.job'
@@ -414,6 +444,31 @@ def parent_required(self):
if record.name != 'root' and not record.parent_id:
raise exceptions.ValidationError(_('Parent channel required.'))
+ @api.model_create_multi
+ def create(self, vals_list):
+ records = self.browse()
+ if self.env.context.get("install_mode"):
+ # installing a module that creates a channel: rebinds the channel
+ # to an existing one (likely we already had the channel created by
+ # the @job decorator previously)
+ new_vals_list = []
+ for vals in vals_list:
+ name = vals.get("name")
+ parent_id = vals.get("parent_id")
+ if name and parent_id:
+ existing = self.search(
+ [("name", "=", name), ("parent_id", "=", parent_id)]
+ )
+ if existing:
+ if not existing.get_metadata()[0].get("noupdate"):
+ existing.write(vals)
+ records |= existing
+ continue
+ new_vals_list.append(vals)
+ vals_list = new_vals_list
+ records |= super().create(vals_list)
+ return records
+
@api.multi
def write(self, values):
for channel in self:
@@ -443,11 +498,31 @@ class JobFunction(models.Model):
_description = 'Job Functions'
_log_access = False
+ JobConfig = namedtuple(
+ "JobConfig",
+ "channel "
+ "retry_pattern "
+ "related_action_enable "
+ "related_action_func_name "
+ "related_action_kwargs "
+ "job_function_id ",
+ )
+
@api.model
def _default_channel(self):
return self.env.ref('queue_job.channel_root')
- name = fields.Char(index=True)
+ name = fields.Char(
+ compute="_compute_name", inverse="_inverse_name", index=True, store=True,
+ )
+
+ # model and method should be required, but the required flag doesn't
+    # give _inverse_name a chance to be executed
+ model_id = fields.Many2one(
+ comodel_name="ir.model", string="Model", ondelete="cascade"
+ )
+ method = fields.Char()
+
channel_id = fields.Many2one(comodel_name='queue.job.channel',
string='Channel',
required=True,
@@ -455,7 +530,76 @@ def _default_channel(self):
channel = fields.Char(related='channel_id.complete_name',
store=True,
readonly=True)
+ retry_pattern = JobSerialized(string="Retry Pattern (serialized)", base_type=dict)
+ edit_retry_pattern = fields.Text(
+ string="Retry Pattern",
+ compute="_compute_edit_retry_pattern",
+ inverse="_inverse_edit_retry_pattern",
+ help="Pattern expressing from the count of retries on retryable errors,"
+ " the number of of seconds to postpone the next execution.\n"
+ "Example: {1: 10, 5: 20, 10: 30, 15: 300}.\n"
+ "See the module description for details.",
+ )
+ related_action = JobSerialized(string="Related Action (serialized)", base_type=dict)
+ edit_related_action = fields.Text(
+ string="Related Action",
+ compute="_compute_edit_related_action",
+ inverse="_inverse_edit_related_action",
+ help="The action when the button *Related Action* is used on a job. "
+ "The default action is to open the view of the record related "
+ "to the job. Configured as a dictionary with optional keys: "
+ "enable, func_name, kwargs.\n"
+ "See the module description for details.",
+ )
+ @api.depends("model_id.model", "method")
+ def _compute_name(self):
+ for record in self:
+ if not (record.model_id and record.method):
+ record.name = ""
+ continue
+ record.name = self.job_function_name(record.model_id.model, record.method)
+
+ def _inverse_name(self):
+ groups = regex_job_function_name.match(self.name)
+ if not groups:
+ raise exceptions.UserError(_("Invalid job function: {}").format(self.name))
+ model_name = groups.group(1)
+ method = groups.group(2)
+ model = self.env["ir.model"].search([("model", "=", model_name)], limit=1)
+ if not model:
+ raise exceptions.UserError(_("Model {} not found").format(model_name))
+ self.model_id = model.id
+ self.method = method
+
+ @api.depends("retry_pattern")
+ def _compute_edit_retry_pattern(self):
+ for record in self:
+ retry_pattern = record._parse_retry_pattern()
+ record.edit_retry_pattern = str(retry_pattern)
+
+ def _inverse_edit_retry_pattern(self):
+ try:
+ self.retry_pattern = ast.literal_eval(self.edit_retry_pattern or "{}")
+ except (ValueError, TypeError):
+ raise exceptions.UserError(self._retry_pattern_format_error_message())
+
+ @api.depends("related_action")
+ def _compute_edit_related_action(self):
+ for record in self:
+ record.edit_related_action = str(record.related_action)
+
+ def _inverse_edit_related_action(self):
+ try:
+ self.related_action = ast.literal_eval(self.edit_related_action or "{}")
+ except (ValueError, TypeError):
+ raise exceptions.UserError(self._related_action_format_error_message())
+
+ @staticmethod
+ def job_function_name(model_name, method_name):
+ return "<{}>.{}".format(model_name, method_name)
+
+ # TODO deprecated by :job-no-decorator:
@api.model
def _find_or_create_channel(self, channel_path):
channel_model = self.env['queue.job.channel']
@@ -479,9 +623,129 @@ def _find_or_create_channel(self, channel_path):
})
return channel
- @api.model
+ def job_default_config(self):
+ return self.JobConfig(
+ channel="root",
+ retry_pattern={},
+ related_action_enable=True,
+ related_action_func_name=None,
+ related_action_kwargs={},
+ job_function_id=None,
+ )
+
+ def _parse_retry_pattern(self):
+ try:
+ # as json can't have integers as keys and the field is stored
+ # as json, convert back to int
+ retry_pattern = {
+ int(try_count): postpone_seconds
+ for try_count, postpone_seconds in self.retry_pattern.items()
+ }
+ except ValueError:
+ _logger.error(
+ "Invalid retry pattern for job function %s,"
+ " keys could not be parsed as integers, fallback"
+ " to the default retry pattern.",
+ self.name,
+ )
+ retry_pattern = {}
+ return retry_pattern
+
+ @tools.ormcache("name")
+ def job_config(self, name):
+ config = self.search([("name", "=", name)], limit=1)
+ if not config:
+ return self.job_default_config()
+ retry_pattern = config._parse_retry_pattern()
+ return self.JobConfig(
+ channel=config.channel,
+ retry_pattern=retry_pattern,
+ related_action_enable=config.related_action.get("enable", True),
+ related_action_func_name=config.related_action.get("func_name"),
+ related_action_kwargs=config.related_action.get("kwargs"),
+ job_function_id=config.id,
+ )
+
+ def _retry_pattern_format_error_message(self):
+ return _(
+ "Unexpected format of Retry Pattern for {}.\n"
+ "Example of valid format:\n"
+ "{{1: 300, 5: 600, 10: 1200, 15: 3000}}"
+ ).format(self.name)
+
+ @api.constrains("retry_pattern")
+ def _check_retry_pattern(self):
+ for record in self:
+ retry_pattern = record.retry_pattern
+ if not retry_pattern:
+ continue
+
+ all_values = list(retry_pattern) + list(retry_pattern.values())
+ for value in all_values:
+ try:
+ int(value)
+ except ValueError:
+ raise exceptions.UserError(
+ record._retry_pattern_format_error_message()
+ )
+
+ def _related_action_format_error_message(self):
+ return _(
+ "Unexpected format of Related Action for {}.\n"
+ "Example of valid format:\n"
+ '{{"enable": True, "func_name": "related_action_foo",'
+ ' "kwargs" {{"limit": 10}}}}'
+ ).format(self.name)
+
+ @api.constrains("related_action")
+ def _check_related_action(self):
+ valid_keys = ("enable", "func_name", "kwargs")
+ for record in self:
+ related_action = record.related_action
+ if not related_action:
+ continue
+
+ if any(key not in valid_keys for key in related_action):
+ raise exceptions.UserError(
+ record._related_action_format_error_message()
+ )
+
+ @api.model_create_multi
+ def create(self, vals_list):
+ records = self.browse()
+ if self.env.context.get("install_mode"):
+ # installing a module that creates a job function: rebinds the record
+ # to an existing one (likely we already had the job function created by
+ # the @job decorator previously)
+ new_vals_list = []
+ for vals in vals_list:
+ name = vals.get("name")
+ if name:
+ existing = self.search([("name", "=", name)], limit=1)
+ if existing:
+ if not existing.get_metadata()[0].get("noupdate"):
+ existing.write(vals)
+ records |= existing
+ continue
+ new_vals_list.append(vals)
+ vals_list = new_vals_list
+ records |= super().create(vals_list)
+ self.clear_caches()
+ return records
+
+ def write(self, values):
+ res = super().write(values)
+ self.clear_caches()
+ return res
+
+ def unlink(self):
+ res = super().unlink()
+ self.clear_caches()
+ return res
+
+ # TODO deprecated by :job-no-decorator:
def _register_job(self, model, job_method):
- func_name = channel_func_name(model, job_method)
+ func_name = self.job_function_name(model._name, job_method.__name__)
if not self.search_count([('name', '=', func_name)]):
channel = self._find_or_create_channel(job_method.default_channel)
self.create({'name': func_name, 'channel_id': channel.id})
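The new editable fields make it possible to configure a job function entirely from code or data instead of the removed decorators. A sketch creating one programmatically, assuming an Odoo shell (model and method are illustrative):

```python
# hedged sketch: configuring a job function without @job
env["queue.job.function"].create(
    {
        "model_id": env.ref("base.model_res_partner").id,  # illustrative
        "method": "export_partner",  # illustrative
        "edit_retry_pattern": "{1: 10, 5: 20, 10: 30, 15: 300}",
        "edit_related_action": '{"func_name": "related_action_open_record"}',
    }
)
# name is computed as "<res.partner>.export_partner"; channel_id defaults
# to the root channel
```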
diff --git a/queue_job/readme/DESCRIPTION.rst b/queue_job/readme/DESCRIPTION.rst
index 19ab4c041a..b06904d699 100644
--- a/queue_job/readme/DESCRIPTION.rst
+++ b/queue_job/readme/DESCRIPTION.rst
@@ -9,13 +9,10 @@ Example:
.. code-block:: python
from odoo import models, fields, api
- from odoo.addons.queue_job.job import job
class MyModel(models.Model):
_name = 'my.model'
- @api.multi
- @job
def my_method(self, a, k=None):
_logger.info('executed with a: %s and k: %s', a, k)
@@ -28,8 +25,8 @@ Example:
self.env['my.model'].with_delay().my_method('a', k=2)
-In the snippet of code above, when we call ``button_do_stuff``, a job capturing
-the method and arguments will be postponed. It will be executed as soon as the
+In the snippet of code above, when we call ``button_do_stuff``, a job **capturing
+the method and arguments** will be postponed. It will be executed as soon as the
Jobrunner has a free bucket, which can be instantaneous if no other job is
running.
diff --git a/queue_job/readme/HISTORY.rst b/queue_job/readme/HISTORY.rst
index 90c654ba65..cbb77efda4 100644
--- a/queue_job/readme/HISTORY.rst
+++ b/queue_job/readme/HISTORY.rst
@@ -13,6 +13,8 @@ Next
* [ADD] Run jobrunner as a worker process instead of a thread in the main
process (when running with --workers > 0)
+* [REF] ``@job`` and ``@related_action`` deprecated, any method can be delayed,
+ and configured using ``queue.job.function`` records
12.0.1.1.0 (2019-11-01)
~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst
index dc3ca77765..0b3d62c03d 100644
--- a/queue_job/readme/USAGE.rst
+++ b/queue_job/readme/USAGE.rst
@@ -5,6 +5,119 @@ To use this module, you need to:
Developers
~~~~~~~~~~
+**Configure default options for jobs**
+
+In earlier versions, jobs could be configured using the ``@job`` decorator.
+This is now obsolete, they can be configured using optional ``queue.job.function``
+and ``queue.job.channel`` XML records.
+
+Example of channel:
+
+.. code-block:: XML
+
+    <record id="channel_sale" model="queue.job.channel">
+        <field name="name">sale</field>
+        <field name="parent_id" ref="queue_job.channel_root" />
+    </record>
+
+Example of job function:
+
+.. code-block:: XML
+
+    <record id="job_function_sale_order_action_done" model="queue.job.function">
+        <field name="model_id" ref="sale.model_sale_order" />
+        <field name="method">action_done</field>
+        <field name="channel_id" ref="channel_sale" />
+        <field name="related_action" eval='{"func_name": "related_action_foo"}' />
+        <field name="retry_pattern" eval="{1: 60, 2: 180, 3: 10, 5: 300}" />
+    </record>
+
+The general form for the ``name`` is: ``<model.name>.method``.
+
+The channel, related action and retry pattern options are optional, they are
+documented below.
+
+When writing modules, if 2+ modules add a job function or channel with the same
+name (and parent for channels), they'll be merged in the same record, even if
+they have different xmlids. On uninstall, the merged record is deleted when all
+the modules using it are uninstalled.
+
+
+**Job function: channel**
+
+The channel where the job will be delayed. The default channel is ``root``.
+
+**Job function: related action**
+
+The *Related Action* appears as a button on the Job's view.
+The button will execute the defined action.
+
+The default one is to open the view of the record related to the job (form view
+when there is a single record, list view for several records).
+In many cases, the default related action is enough and doesn't need
+customization, but it can be customized by providing a dictionary on the job
+function:
+
+.. code-block:: python
+
+ {
+ "enable": False,
+ "func_name": "related_action_partner",
+ "kwargs": {"name": "Partner"},
+ }
+
+* ``enable``: when ``False``, the button has no effect (default: ``True``)
+* ``func_name``: name of the method on ``queue.job`` that returns an action
+* ``kwargs``: extra arguments to pass to the related action method
+
+Example of related action code:
+
+.. code-block:: python
+
+ class QueueJob(models.Model):
+ _inherit = 'queue.job'
+
+ def related_action_partner(self, name):
+ self.ensure_one()
+ model = self.model_name
+ partner = self.records
+ action = {
+ 'name': name,
+ 'type': 'ir.actions.act_window',
+ 'res_model': model,
+ 'view_type': 'form',
+ 'view_mode': 'form',
+ 'res_id': partner.id,
+ }
+ return action
+
+
+**Job function: retry pattern**
+
+When a job fails with a retryable error type, it is automatically
+retried later. By default, the retry is always 10 minutes later.
+
+A retry pattern can be configured on the job function. A pattern entry means
+"from the Xth try onwards, postpone the next execution by Y seconds". It is
+expressed as a dictionary where keys are try counts and values are numbers of
+seconds to postpone, both as integers:
+
+.. code-block:: python
+
+ {
+ 1: 10,
+ 5: 20,
+ 10: 30,
+ 15: 300,
+ }
+
+Based on this configuration, we can tell that:
+
+* the first 4 retries are postponed 10 seconds
+* retries 5 to 9 are postponed 20 seconds
+* retries 10 to 14 are postponed 30 seconds
+* retries 15 and beyond are postponed 5 minutes
+
**Bypass jobs on running Odoo**
When you are developing (ie: connector modules) you might want
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py
index 75f3a5536c..10138c469e 100644
--- a/queue_job/tests/__init__.py
+++ b/queue_job/tests/__init__.py
@@ -2,3 +2,5 @@
from . import test_runner_runner
from . import test_json_field
from . import test_model_job_channel
+from . import test_model_job_function
+from . import test_queue_job_protected_write
diff --git a/queue_job/tests/common.py b/queue_job/tests/common.py
index 867f647fae..9d636df130 100644
--- a/queue_job/tests/common.py
+++ b/queue_job/tests/common.py
@@ -1,8 +1,13 @@
# Copyright 2019 Camptocamp
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
+import doctest
+
import mock
from contextlib import contextmanager
+from odoo.tests import BaseCase, tagged
+
+# pylint: disable=odoo-addons-relative-import
from odoo.addons.queue_job.job import Job
@@ -97,3 +102,37 @@ def test_export(self):
delayable = mock.MagicMock(name='DelayableBinding')
delayable_cls.return_value = delayable
yield delayable_cls, delayable
+
+
+@tagged("doctest")
+class OdooDocTestCase(BaseCase):
+ """
+ We need a custom DocTestCase class in order to:
+ - define test_tags to run as part of standard tests
+ - output a more meaningful test name than default "DocTestCase.runTest"
+ """
+
+ __qualname__ = "doctests for "
+
+ def __init__(self, test):
+ self.__test = test
+ self.__name = test._dt_test.name
+ super().__init__(self.__name)
+
+ def __getattr__(self, item):
+ if item == self.__name:
+ return self.__test
+
+
+def load_doctests(module):
+ """
+ Generates a tests loading method for the doctests of the given module
+ https://docs.python.org/3/library/unittest.html#load-tests-protocol
+ """
+
+ def load_tests(loader, tests, ignore):
+ for test in doctest.DocTestSuite(module):
+ tests.addTest(OdooDocTestCase(test))
+ return tests
+
+ return load_tests
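``load_doctests`` plugs a module's doctests into unittest's load-tests protocol. A sketch of how a test package could use it (the target module is illustrative):

```python
# sketch: exposing a module's doctests to the Odoo test loader
from odoo.addons.queue_job import fields as queue_job_fields
from odoo.addons.queue_job.tests.common import load_doctests

load_tests = load_doctests(queue_job_fields)  # illustrative target module
```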
diff --git a/queue_job/tests/test_json_field.py b/queue_job/tests/test_json_field.py
index 72c7c1816d..37f787dd46 100644
--- a/queue_job/tests/test_json_field.py
+++ b/queue_job/tests/test_json_field.py
@@ -4,6 +4,8 @@
from datetime import datetime, date
import json
+from lxml import etree
+
from odoo.tests import common
# pylint: disable=odoo-addons-relative-import
@@ -14,6 +16,19 @@
class TestJson(common.TransactionCase):
def test_encoder_recordset(self):
+ demo_user = self.env.ref("base.user_demo")
+ partner = self.env(user=demo_user).ref("base.main_partner")
+ value = partner
+ value_json = json.dumps(value, cls=JobEncoder)
+ expected = {
+ "uid": demo_user.id,
+ "_type": "odoo_recordset",
+ "model": "res.partner",
+ "ids": [partner.id],
+ }
+ self.assertEqual(json.loads(value_json), expected)
+
+ def test_encoder_recordset_list(self):
demo_user = self.env.ref('base.user_demo')
partner = self.env(user=demo_user).ref('base.main_partner')
value = ['a', 1, partner]
@@ -23,12 +38,25 @@ def test_encoder_recordset(self):
"_type": "odoo_recordset",
"model": "res.partner",
"ids": [partner.id],
- }]
+ }]
self.assertEqual(json.loads(value_json), expected)
def test_decoder_recordset(self):
demo_user = self.env.ref('base.user_demo')
partner = self.env(user=demo_user).ref('base.main_partner')
+ value_json = (
+ '{"_type": "odoo_recordset",'
+ '"model": "res.partner",'
+ '"ids": [%s],"uid": %s}' % (partner.id, demo_user.id)
+ )
+ expected = partner
+ value = json.loads(value_json, cls=JobDecoder, env=self.env)
+ self.assertEqual(value, expected)
+ self.assertEqual(demo_user, expected.env.user)
+
+ def test_decoder_recordset_list(self):
+ demo_user = self.env.ref("base.user_demo")
+ partner = self.env(user=demo_user).ref("base.main_partner")
value_json = (
'["a", 1, '
'{"_type": "odoo_recordset",'
@@ -40,7 +68,7 @@ def test_decoder_recordset(self):
self.assertEqual(value, expected)
self.assertEqual(demo_user, expected[2].env.user)
- def test_decoder_recordset_without_user(self):
+ def test_decoder_recordset_list_without_user(self):
value_json = ('["a", 1, {"_type": "odoo_recordset",'
'"model": "res.users", "ids": [1]}]')
expected = ['a', 1, self.env.ref('base.user_root')]
@@ -79,3 +107,28 @@ def test_decoder_date(self):
expected = ['a', 1, date(2017, 4, 19)]
value = json.loads(value_json, cls=JobDecoder, env=self.env)
self.assertEqual(value, expected)
+
+ def test_encoder_etree(self):
+ etree_el = etree.Element("root", attr="val")
+ etree_el.append(etree.Element("child", attr="val"))
+ value = ["a", 1, etree_el]
+ value_json = json.dumps(value, cls=JobEncoder)
+ expected = [
+ "a",
+ 1,
+ {
+ "_type": "etree_element",
+ "value": '',
+ },
+ ]
+ self.assertEqual(json.loads(value_json), expected)
+
+ def test_decoder_etree(self):
+ value_json = '["a", 1, {"_type": "etree_element", "value": \
+ ""}]'
+ etree_el = etree.Element("root", attr="val")
+ etree_el.append(etree.Element("child", attr="val"))
+ expected = ["a", 1, etree.tostring(etree_el)]
+ value = json.loads(value_json, cls=JobDecoder, env=self.env)
+ value[2] = etree.tostring(value[2])
+ self.assertEqual(value, expected)
diff --git a/queue_job/tests/test_model_job_function.py b/queue_job/tests/test_model_job_function.py
new file mode 100644
index 0000000000..e6ddf3fcc3
--- /dev/null
+++ b/queue_job/tests/test_model_job_function.py
@@ -0,0 +1,57 @@
+# Copyright 2020 Camptocamp
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+from odoo import exceptions
+from odoo.tests import common
+
+
+class TestJobFunction(common.SavepointCase):
+ def test_function_name_compute(self):
+ function = self.env["queue.job.function"].create(
+ {"model_id": self.env.ref("base.model_res_users").id, "method": "read"}
+ )
+ self.assertEqual(function.name, "<res.users>.read")
+
+ def test_function_name_inverse(self):
+ function = self.env["queue.job.function"].create({"name": "<res.users>.read"})
+ self.assertEqual(function.model_id.model, "res.users")
+ self.assertEqual(function.method, "read")
+
+ def test_function_name_inverse_invalid_regex(self):
+ with self.assertRaises(exceptions.UserError):
+ self.env["queue.job.function"].create({"name": ".read"}
+ )
+
+ def test_function_job_config(self):
+ channel = self.env["queue.job.channel"].create(
+ {"name": "foo", "parent_id": self.env.ref("queue_job.channel_root").id}
+ )
+ job_function = self.env["queue.job.function"].create(
+ {
+ "model_id": self.env.ref("base.model_res_users").id,
+ "method": "read",
+ "channel_id": channel.id,
+ "edit_retry_pattern": "{1: 2, 3: 4}",
+ "edit_related_action": (
+ '{"enable": True,'
+ ' "func_name": "related_action_foo",'
+ ' "kwargs": {"b": 1}}'
+ ),
+ }
+ )
+ self.assertEqual(
+ self.env["queue.job.function"].job_config(".read"),
+ self.env["queue.job.function"].JobConfig(
+ channel="root.foo",
+ retry_pattern={1: 2, 3: 4},
+ related_action_enable=True,
+ related_action_func_name="related_action_foo",
+ related_action_kwargs={"b": 1},
+ job_function_id=job_function.id,
+ ),
+ )
diff --git a/queue_job/tests/test_queue_job_protected_write.py b/queue_job/tests/test_queue_job_protected_write.py
new file mode 100644
index 0000000000..cf8380bcec
--- /dev/null
+++ b/queue_job/tests/test_queue_job_protected_write.py
@@ -0,0 +1,25 @@
+# Copyright 2020 Camptocamp
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+from odoo import exceptions
+from odoo.tests import common
+
+
+class TestJobWriteProtected(common.SavepointCase):
+ def test_create_error(self):
+ with self.assertRaises(exceptions.AccessError):
+ self.env["queue.job"].create(
+ {"uuid": "test", "model_name": "res.partner", "method_name": "write"}
+ )
+
+ def test_write_protected_field_error(self):
+ job_ = self.env["res.partner"].with_delay().create({"name": "test"})
+ db_job = job_.db_record()
+ with self.assertRaises(exceptions.AccessError):
+ db_job.method_name = "unlink"
+
+ def test_write_allow_no_protected_field_error(self):
+ job_ = self.env["res.partner"].with_delay().create({"name": "test"})
+ db_job = job_.db_record()
+ db_job.priority = 30
+ self.assertEqual(db_job.priority, 30)
diff --git a/queue_job/tests/test_runner_channels.py b/queue_job/tests/test_runner_channels.py
index 54e7223c92..d323d00683 100644
--- a/queue_job/tests/test_runner_channels.py
+++ b/queue_job/tests/test_runner_channels.py
@@ -1,12 +1,10 @@
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
-import doctest
# pylint: disable=odoo-addons-relative-import
# we are testing, we want to test as we were an external consumer of the API
from odoo.addons.queue_job.jobrunner import channels
+from .common import load_doctests
-def load_tests(loader, tests, ignore):
- tests.addTests(doctest.DocTestSuite(channels))
- return tests
+load_tests = load_doctests(channels)
diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py
index 5f5ef3c56d..c6486e27ef 100644
--- a/queue_job/tests/test_runner_runner.py
+++ b/queue_job/tests/test_runner_runner.py
@@ -1,12 +1,10 @@
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
-import doctest
# pylint: disable=odoo-addons-relative-import
# we are testing, we want to test as we were an external consumer of the API
from odoo.addons.queue_job.jobrunner import runner
+from .common import load_doctests
-def load_tests(loader, tests, ignore):
- tests.addTests(doctest.DocTestSuite(runner))
- return tests
+load_tests = load_doctests(runner)
diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml
index 27b550ccdf..b8e4d33fe5 100644
--- a/queue_job/views/queue_job_views.xml
+++ b/queue_job/views/queue_job_views.xml
@@ -45,12 +45,15 @@
+
+
+
@@ -61,12 +64,21 @@
If the max. retries is 0, the number of retries is infinite.
+
+
+
+
+
+
+
-
-
-
@@ -87,15 +99,39 @@
-
+
+
+
+
+
+ queue.job.pivot
+ queue.job
+
+
+
+
+
+
+
+
+
+ queue.job.graph
+ queue.job
+
+
+
+
+
+
+
queue.job.search
@@ -107,6 +143,11 @@
+
+
+
+
+
@@ -122,6 +163,21 @@
+
+
+
@@ -131,7 +187,7 @@
Jobs
queue.job
form
- tree,form
+ tree,form,pivot,graph
{'search_default_pending': 1,
'search_default_enqueued': 1,
'search_default_started': 1,
@@ -246,10 +302,14 @@
queue.job.function.form
queue.job.function
-
@@ -259,7 +319,7 @@
queue.job.function.tree
queue.job.function
-
+
diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py
index 0a2f00d0fc..4ba5d6b075 100644
--- a/test_queue_job/__manifest__.py
+++ b/test_queue_job/__manifest__.py
@@ -9,7 +9,10 @@
'depends': ['queue_job',
],
'website': 'http://www.camptocamp.com',
- 'data': ['security/ir.model.access.csv',
- ],
+ 'data': [
+ 'data/queue_job_channel_data.xml',
+ 'data/queue_job_function_data.xml',
+ 'security/ir.model.access.csv',
+ ],
'installable': True,
}
diff --git a/test_queue_job/data/queue_job_channel_data.xml b/test_queue_job/data/queue_job_channel_data.xml
new file mode 100644
index 0000000000..2b442117eb
--- /dev/null
+++ b/test_queue_job/data/queue_job_channel_data.xml
@@ -0,0 +1,10 @@
+<odoo>
+    <record id="channel_sub" model="queue.job.channel">
+        <field name="name">sub</field>
+        <field name="parent_id" ref="queue_job.channel_root"/>
+    </record>
+    <record id="channel_subsub" model="queue.job.channel">
+        <field name="name">subsub</field>
+        <field name="parent_id" ref="channel_sub"/>
+    </record>
+</odoo>
diff --git a/test_queue_job/data/queue_job_function_data.xml b/test_queue_job/data/queue_job_function_data.xml
new file mode 100644
index 0000000000..8338045141
--- /dev/null
+++ b/test_queue_job/data/queue_job_function_data.xml
@@ -0,0 +1,65 @@
+
+
+
+ testing_method
+
+
+
+
+ job_with_retry_pattern
+
+
+
+
+ job_with_retry_pattern__no_zero
+
+
+
+
+ job_sub_channel
+
+
+
+
+ job_a
+
+
+
+ testing_related_action__return_none
+
+
+
+
+ testing_related_action__kwargs
+
+
+
+
+ testing_related_action__store
+
+
+
diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py
index 6dc13ba2a5..c2cf9bd68d 100644
--- a/test_queue_job/models/test_models.py
+++ b/test_queue_job/models/test_models.py
@@ -2,7 +2,7 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
from odoo import api, fields, models
-from odoo.addons.queue_job.job import job, related_action
+
from odoo.addons.queue_job.exception import RetryableJobError
@@ -10,6 +10,8 @@ class QueueJob(models.Model):
_inherit = 'queue.job'
+ additional_info = fields.Char()
+
@api.multi
def testing_related_method(self, **kwargs):
return self, kwargs
@@ -36,9 +38,6 @@ class TestQueueJob(models.Model):
name = fields.Char()
- @job
- @related_action(action='testing_related_method')
- @api.multi
def testing_method(self, *args, **kwargs):
""" Method used for tests
@@ -50,46 +49,72 @@ def testing_method(self, *args, **kwargs):
return self.env.context
return args, kwargs
- @job
def no_description(self):
return
- @job(retry_pattern={1: 60, 2: 180, 3: 10, 5: 300})
def job_with_retry_pattern(self):
return
- @job(retry_pattern={3: 180})
def job_with_retry_pattern__no_zero(self):
return
- @job
def mapped(self, func):
return super(TestQueueJob, self).mapped(func)
- @job
def job_alter_mutable(self, mutable_arg, mutable_kwarg=None):
mutable_arg.append(2)
mutable_kwarg['b'] = 2
return mutable_arg, mutable_kwarg
+ def delay_me(self, arg, kwarg=None):
+ return arg, kwarg
+
+ def delay_me_options_job_options(self):
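+ # provides the job options applied when "delay_me_options" below is
+ # auto-delayed; the patching in _register_hook looks this method up
+ # by naming convention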
+ return {
+ "identity_key": "my_job_identity",
+ }
+
+ def delay_me_options(self):
+ return "ok"
+
+ def delay_me_context_key(self):
+ return "ok"
+
+ def _register_hook(self):
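+ # patch these methods so that calling them enqueues a queue.job
+ # instead of running synchronously; they still run directly when
+ # performed inside a job or when _job_force_sync is in the context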
+ self._patch_method("delay_me", self._patch_job_auto_delay("delay_me"))
+ self._patch_method(
+ "delay_me_options", self._patch_job_auto_delay("delay_me_options")
+ )
+ self._patch_method(
+ "delay_me_context_key",
+ self._patch_job_auto_delay(
+ "delay_me_context_key", context_key="auto_delay_delay_me_context_key"
+ ),
+ )
+ return super()._register_hook()
+
+ def _job_store_values(self, job):
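+ # hook called when a job is stored: the returned values are written
+ # on the queue.job record (here, filling additional_info)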
+ value = "JUST_TESTING"
+ if job.state == "failed":
+ value += "_BUT_FAILED"
+ return {"additional_info": value}
+
class TestQueueChannel(models.Model):
_name = 'test.queue.channel'
_description = "Test model for queue.channel"
- @job
def job_a(self):
return
- @job
def job_b(self):
return
- @job(default_channel='root.sub.subsub')
def job_sub_channel(self):
return
+ # TODO deprecated by :job-no-decorator:
@property
def dummy_property(self):
""" Return foo
@@ -106,22 +131,14 @@ class TestRelatedAction(models.Model):
_name = 'test.related.action'
_description = "Test model for related actions"
- @job
def testing_related_action__no(self):
return
- @job
- @related_action() # default action returns None
def testing_related_action__return_none(self):
return
- @job
- @related_action(action='testing_related_method', b=4)
def testing_related_action__kwargs(self):
return
- @job
- @related_action(action='testing_related__url',
- url='https://en.wikipedia.org/wiki/{subject}')
def testing_related_action__store(self):
return
diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py
index 9af8df15a0..502a0752fd 100644
--- a/test_queue_job/tests/__init__.py
+++ b/test_queue_job/tests/__init__.py
@@ -1,4 +1,5 @@
from . import test_autovacuum
from . import test_job
+from . import test_job_auto_delay
from . import test_job_channels
from . import test_related_actions
diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py
index 8b5c4fc052..4add6c7b3b 100644
--- a/test_queue_job/tests/test_job.py
+++ b/test_queue_job/tests/test_job.py
@@ -6,7 +6,6 @@
from datetime import datetime, timedelta
import mock
-from odoo import SUPERUSER_ID
import odoo.tests.common as common
from odoo.addons.queue_job.exception import (
@@ -142,8 +141,23 @@ def test_set_started(self):
self.assertEquals(job_a.date_started,
datetime(2015, 3, 15, 16, 41, 0))
+ def test_worker_pid(self):
+ """When a job is started, it gets the PID of the worker that starts it"""
+ method = self.env["res.users"].mapped
+ job_a = Job(method)
+ self.assertFalse(job_a.worker_pid)
+ with mock.patch("os.getpid", autospec=True) as mock_getpid:
+ mock_getpid.return_value = 99999
+ job_a.set_started()
+ self.assertEqual(job_a.worker_pid, 99999)
+
+ # reset the pid
+ job_a.set_pending()
+ self.assertFalse(job_a.worker_pid)
+
def test_set_done(self):
job_a = Job(self.method)
+ job_a.date_started = datetime(2015, 3, 15, 16, 40, 0)
datetime_path = 'odoo.addons.queue_job.job.datetime'
with mock.patch(datetime_path, autospec=True) as mock_datetime:
mock_datetime.now.return_value = datetime(2015, 3, 15, 16, 41, 0)
@@ -153,13 +167,20 @@ def test_set_done(self):
self.assertEquals(job_a.result, 'test')
self.assertEquals(job_a.date_done,
datetime(2015, 3, 15, 16, 41, 0))
+ self.assertEquals(job_a.exec_time, 60.0)
self.assertFalse(job_a.exc_info)
def test_set_failed(self):
job_a = Job(self.method)
- job_a.set_failed(exc_info='failed test')
+ job_a.set_failed(
+ exc_info="failed test",
+ exc_name="FailedTest",
+ exc_message="Sadly this job failed",
+ )
self.assertEquals(job_a.state, FAILED)
self.assertEquals(job_a.exc_info, 'failed test')
+ self.assertEquals(job_a.exc_name, "FailedTest")
+ self.assertEquals(job_a.exc_message, "Sadly this job failed")
def test_postpone(self):
job_a = Job(self.method)
@@ -178,6 +199,16 @@ def test_store(self):
stored = self.queue_job.search([('uuid', '=', test_job.uuid)])
self.assertEqual(len(stored), 1)
+ def test_store_extra_data(self):
+ test_job = Job(self.method)
+ test_job.store()
+ stored = self.queue_job.search([("uuid", "=", test_job.uuid)])
+ self.assertEqual(stored.additional_info, "JUST_TESTING")
+ test_job.set_failed(exc_info="failed test", exc_name="FailedTest")
+ test_job.store()
+ stored.invalidate_cache()
+ self.assertEqual(stored.additional_info, "JUST_TESTING_BUT_FAILED")
+
def test_read(self):
eta = datetime.now() + timedelta(hours=5)
test_job = Job(self.method,
@@ -186,7 +217,7 @@ def test_read(self):
priority=15,
eta=eta,
description="My description")
- test_job.user_id = 1
+ test_job.worker_pid = 99999 # normally set on "set_started"
test_job.company_id = self.env.ref("base.main_company").id
test_job.store()
job_read = Job.load(self.env, test_job.uuid)
@@ -203,6 +234,7 @@ def test_read(self):
self.assertEqual(test_job.result, job_read.result)
self.assertEqual(test_job.user_id, job_read.user_id)
self.assertEqual(test_job.company_id, job_read.company_id)
+ self.assertEqual(test_job.worker_pid, job_read.worker_pid)
delta = timedelta(seconds=1) # DB does not keep milliseconds
self.assertAlmostEqual(test_job.date_created, job_read.date_created,
delta=delta)
@@ -228,6 +260,7 @@ def test_read(self):
delta=delta)
self.assertAlmostEqual(job_read.date_done, test_date,
delta=delta)
+ self.assertAlmostEqual(job_read.exec_time, 0.0)
def test_job_unlinked(self):
test_job = Job(self.method,
@@ -245,7 +278,6 @@ def test_unicode(self):
kwargs={'c': u'ßø'},
priority=15,
description=u"My dé^Wdescription")
- test_job.user_id = 1
test_job.store()
job_read = Job.load(self.env, test_job.uuid)
self.assertEqual(test_job.args, job_read.args)
@@ -261,7 +293,6 @@ def test_accented_bytestring(self):
kwargs={'c': 'ßø'},
priority=15,
description="My dé^Wdescription")
- test_job.user_id = 1
test_job.store()
job_read = Job.load(self.env, test_job.uuid)
self.assertEqual(job_read.args, ('öô¿‽', 'ñě'))
@@ -295,7 +326,6 @@ def test_job_identity_key_str(self):
priority=15,
description="Test I am the first one",
identity_key=id_key)
- test_job_1.user_id = 1
test_job_1.store()
job1 = Job.load(self.env, test_job_1.uuid)
self.assertEqual(job1.identity_key, id_key)
@@ -469,7 +499,7 @@ def test_message_when_write_fail(self):
stored.write({'state': 'failed'})
self.assertEqual(stored.state, FAILED)
messages = stored.message_ids
- self.assertEqual(len(messages), 2)
+ self.assertEqual(len(messages), 1)
def test_follower_when_write_fail(self):
"""Check that inactive users doesn't are not followers even if
@@ -513,6 +543,12 @@ def test_override_channel(self):
test_job = delayable.testing_method(return_context=True)
self.assertEqual('root.sub.sub', test_job.channel)
+ def test_job_change_user_id(self):
+ demo_user = self.env.ref("base.user_demo")
+ stored = self._create_job()
+ stored.user_id = demo_user
+ self.assertEqual(stored.records.env.uid, demo_user.id)
+
class TestJobStorageMultiCompany(common.TransactionCase):
""" Test storage of jobs """
@@ -524,6 +560,21 @@ def setUp(self):
User = self.env['res.users']
Company = self.env['res.company']
Partner = self.env['res.partner']
+
+ main_company = self.env.ref("base.main_company")
+
+ self.partner_user = Partner.create(
+ {"name": "Simple User", "email": "simple.user@example.com"}
+ )
+ self.simple_user = User.create(
+ {
+ "partner_id": self.partner_user.id,
+ "company_ids": [(4, main_company.id)],
+ "login": "simple_user",
+ "name": "simple user",
+ "groups_id": [],
+ }
+ )
self.other_partner_a = Partner.create(
{"name": "My Company a",
"is_company": True,
@@ -539,7 +590,7 @@ def setUp(self):
"company_id": self.other_company_a.id,
"company_ids": [(4, self.other_company_a.id)],
"login": "my_login a",
- "name": "my user",
+ "name": "my user A",
"groups_id": [(4, grp_queue_job_manager)]
})
self.other_partner_b = Partner.create(
@@ -557,15 +608,10 @@ def setUp(self):
"company_id": self.other_company_b.id,
"company_ids": [(4, self.other_company_b.id)],
"login": "my_login_b",
- "name": "my user 1",
+ "name": "my user B",
"groups_id": [(4, grp_queue_job_manager)]
})
- def _subscribe_users(self, stored):
- domain = stored._subscribe_users_domain()
- users = self.env['res.users'].search(domain)
- stored.message_subscribe(partner_ids=users.mapped('partner_id').ids)
-
def _create_job(self, env):
self.cr.execute('delete from queue_job')
env['test.queue.job'].with_delay().testing_method()
@@ -608,11 +654,14 @@ def test_job_subscription(self):
# queue_job.group_queue_job_manager must be followers
User = self.env['res.users']
no_company_context = dict(self.env.context, company_id=None)
- no_company_env = self.env(context=no_company_context)
+ no_company_env = self.env(user=self.simple_user, context=no_company_context)
stored = self._create_job(no_company_env)
- self._subscribe_users(stored)
- users = User.with_context(active_test=False).search(
- [('groups_id', '=', self.ref('queue_job.group_queue_job_manager'))]
+ stored._message_post_on_failure()
+ users = (
+ User.search(
+ [('groups_id', '=', self.ref('queue_job.group_queue_job_manager'))]
+ )
+ + stored.user_id
)
self.assertEqual(len(stored.message_follower_ids), len(users))
expected_partners = [u.partner_id for u in users]
@@ -626,13 +675,13 @@ def test_job_subscription(self):
# company's members
company_a_context = dict(self.env.context,
company_id=self.other_company_a.id)
- company_a_env = self.env(context=company_a_context)
+ company_a_env = self.env(user=self.simple_user, context=company_a_context)
stored = self._create_job(company_a_env)
stored.sudo(self.other_user_a.id)
- self._subscribe_users(stored)
- # 2 because admin + self.other_partner_a
+ stored._message_post_on_failure()
+ # 2 because simple_user (creator of job) + self.other_partner_a
self.assertEqual(len(stored.message_follower_ids), 2)
- users = User.browse([SUPERUSER_ID, self.other_user_a.id])
+ users = self.simple_user + self.other_user_a
expected_partners = [u.partner_id for u in users]
self.assertSetEqual(
set(stored.message_follower_ids.mapped('partner_id')),
diff --git a/test_queue_job/tests/test_job_auto_delay.py b/test_queue_job/tests/test_job_auto_delay.py
new file mode 100644
index 0000000000..5549fc7487
--- /dev/null
+++ b/test_queue_job/tests/test_job_auto_delay.py
@@ -0,0 +1,54 @@
+# Copyright 2020 Camptocamp SA
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+from odoo.tests.common import tagged
+
+from odoo.addons.queue_job.job import Job
+
+from .common import JobCommonCase
+
+
+@tagged("post_install", "-at_install")
+class TestJobAutoDelay(JobCommonCase):
+ """Test auto delay of jobs"""
+
+ def test_auto_delay(self):
+ """method decorated by @job_auto_delay is automatically delayed"""
+ result = self.env["test.queue.job"].delay_me(1, kwarg=2)
+ self.assertTrue(isinstance(result, Job))
+ self.assertEqual(result.args, (1,))
+ self.assertEqual(result.kwargs, {"kwarg": 2})
+
+ def test_auto_delay_options(self):
+ """method automatically delayed une _job_options arguments"""
+ result = self.env["test.queue.job"].delay_me_options()
+ self.assertTrue(isinstance(result, Job))
+ self.assertEqual(result.identity_key, "my_job_identity")
+
+ def test_auto_delay_inside_job(self):
+ """when a delayed job is processed, it must not delay itself"""
+ job_ = self.env["test.queue.job"].delay_me(1, kwarg=2)
+ self.assertEqual(job_.perform(), (1, 2))
+
+ def test_auto_delay_force_sync(self):
+ """method forced to run synchronously"""
+ result = (
+ self.env["test.queue.job"]
+ .with_context(_job_force_sync=True)
+ .delay_me(1, kwarg=2)
+ )
+ self.assertEqual(result, (1, 2))
+
+ def test_auto_delay_context_key_set(self):
+ """patched with context_key delays only if context keys is set"""
+ result = (
+ self.env["test.queue.job"]
+ .with_context(auto_delay_delay_me_context_key=True)
+ .delay_me_context_key()
+ )
+ self.assertTrue(isinstance(result, Job))
+
+ def test_auto_delay_context_key_unset(self):
+ """patched with context_key do not delay if context keys is not set"""
+ result = self.env["test.queue.job"].delay_me_context_key()
+ self.assertEqual(result, "ok")
diff --git a/test_queue_job/tests/test_job_channels.py b/test_queue_job/tests/test_job_channels.py
index 316e98e830..5729db13d3 100644
--- a/test_queue_job/tests/test_job_channels.py
+++ b/test_queue_job/tests/test_job_channels.py
@@ -35,79 +35,49 @@ def test_channel_root(self):
with self.assertRaises(exceptions.Warning):
self.root_channel.name = 'leaf'
- def test_register_jobs(self):
- self.env['queue.job.function'].search([]).unlink()
- self.env['queue.job.channel'].search([('name', '!=', 'root')]).unlink()
-
- method_a = self.env['test.queue.channel'].job_a
- self.env['queue.job.function']._register_job(
- self.env['test.queue.channel'],
- method_a
- )
- method_b = self.env['test.queue.channel'].job_b
- self.env['queue.job.function']._register_job(
- self.env['test.queue.channel'],
- method_b
- )
-
- path_a = '.job_a'
- path_b = '.job_b'
- self.assertTrue(
- self.function_model.search([('name', '=', path_a)])
- )
- self.assertTrue(
- self.function_model.search([('name', '=', path_b)])
- )
-
def test_channel_on_job(self):
- self.env['queue.job.function'].search([]).unlink()
- self.env['queue.job.channel'].search([('name', '!=', 'root')]).unlink()
-
- method = self.env['test.queue.channel'].job_a
- self.env['queue.job.function']._register_job(
- self.env['test.queue.channel'],
- method
+ method = self.env["test.queue.channel"].job_a
+ path_a = self.env["queue.job.function"].job_function_name(
+ "test.queue.channel", "job_a"
)
- path_a = '<%s>.%s' % (method.__self__.__class__._name, method.__name__)
- job_func = self.function_model.search([('name', '=', path_a)])
- self.assertEquals(job_func.channel, 'root')
+ job_func = self.function_model.search([("name", "=", path_a)])
+
+ self.assertEquals(job_func.channel, "root")
test_job = Job(method)
test_job.store()
- stored = self.env['queue.job'].search([('uuid', '=', test_job.uuid)])
- self.assertEquals(stored.channel, 'root')
+ stored = test_job.db_record()
+ self.assertEquals(stored.channel, "root")
job_read = Job.load(self.env, test_job.uuid)
- self.assertEquals(job_read.channel, 'root')
+ self.assertEquals(job_read.channel, "root")
- channel = self.channel_model.create(
- {'name': 'sub', 'parent_id': self.root_channel.id}
- )
- job_func.channel_id = channel
+ sub_channel = self.env.ref("test_queue_job.channel_sub")
+ job_func.channel_id = sub_channel
test_job = Job(method)
test_job.store()
- stored = self.env['queue.job'].search([('uuid', '=', test_job.uuid)])
- self.assertEquals(stored.channel, 'root.sub')
+ stored = test_job.db_record()
+ self.assertEquals(stored.channel, "root.sub")
# it's also possible to override the channel
- test_job = Job(method, channel='root.sub.sub.sub')
+ test_job = Job(method, channel="root.sub")
test_job.store()
- stored = self.env['queue.job'].search([('uuid', '=', test_job.uuid)])
+ stored = test_job.db_record()
self.assertEquals(stored.channel, test_job.channel)
- def test_default_channel(self):
- self.env['queue.job.function'].search([]).unlink()
- self.env['queue.job.channel'].search([('name', '!=', 'root')]).unlink()
+ def test_default_channel_no_xml(self):
+ """Channel on job is root if there is no queue.job.function record"""
+ test_job = Job(self.env["res.users"].browse)
+ test_job.store()
+ stored = test_job.db_record()
+ self.assertEquals(stored.channel, "root")
- method = self.env['test.queue.channel'].job_sub_channel
- self.env['queue.job.function']._register_job(
- self.env['test.queue.channel'],
- method
+ def test_set_channel_from_record(self):
+ func_name = self.env["queue.job.function"].job_function_name(
+ "test.queue.channel", "job_sub_channel"
)
- self.assertEquals(method.default_channel, 'root.sub.subsub')
-
- path_a = '<%s>.%s' % (method.__self__.__class__._name, method.__name__)
- job_func = self.function_model.search([('name', '=', path_a)])
+ job_func = self.function_model.search([("name", "=", func_name)])
+ self.assertEqual(job_func.channel, "root.sub.subsub")
channel = job_func.channel_id
self.assertEquals(channel.name, 'subsub')
@@ -115,6 +85,7 @@ def test_default_channel(self):
self.assertEquals(channel.parent_id.parent_id.name, 'root')
self.assertEquals(job_func.channel, 'root.sub.subsub')
+ # TODO deprecated by :job-no-decorator:
def test_job_decorator(self):
""" Test the job decorator """
default_channel = 'channel'
diff --git a/test_queue_job/tests/test_related_actions.py b/test_queue_job/tests/test_related_actions.py
index 3a865e755e..41d3a62521 100644
--- a/test_queue_job/tests/test_related_actions.py
+++ b/test_queue_job/tests/test_related_actions.py
@@ -24,15 +24,9 @@ def test_attributes(self):
self.assertEqual(act_kwargs, {'b': 4})
def test_decorator_empty(self):
- """ Job with decorator without value disable the default action
-
- The function is::
-
- @job
- @related_action() # default action returns None
- def testing_related_action__return_none(self):
- return
+ """Job with decorator without value disable the default action
+ The ``related_action`` configuration is: ``{"enable": False}``
"""
# default action returns None
job_ = self.record.with_delay().testing_related_action__return_none()
@@ -51,12 +45,7 @@ def test_default_no_record(self):
When called on no record.
- The function is::
-
- @job
- def testing_related_action__no(self):
- return
-
+ The ``related_action`` configuration is: ``{}``
"""
job_ = self.model.with_delay().testing_related_action__no()
expected = None
@@ -76,12 +65,7 @@ def test_default_one_record(self):
When called on one record.
- The function is::
-
- @job
- def testing_related_action__no(self):
- return
-
+ The ``related_action`` configuration is: ``{}``
"""
job_ = self.record.with_delay().testing_related_action__no()
expected = {
@@ -99,12 +83,7 @@ def test_default_several_record(self):
When called on several record.
- The function is::
-
- @job
- def testing_related_action__no(self):
- return
-
+ The ``related_action`` configuration is: ``{}``
"""
job_ = self.records.with_delay().testing_related_action__no()
expected = {
@@ -122,12 +101,12 @@ def test_decorator(self):
- The function is::
- @job
- @related_action(action='testing_related__url',
- url='https://en.wikipedia.org/wiki/{subject}')
- def testing_related_action__store(self):
- return
+ The ``related_action`` configuration is::
+
+     {
+         "func_name": "testing_related__url",
+         "kwargs": {"url": "https://en.wikipedia.org/wiki/{subject}"}
+     }
"""
job_ = self.record.with_delay().testing_related_action__store(
'Discworld'