Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

{
"name": "Job Queue",
"version": "14.0.1.3.4",
"version": "14.0.1.4.0",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
Expand Down
33 changes: 26 additions & 7 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ def retry_postpone(job, message, seconds=None):
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise

retry_postpone(
job, tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY
)
_logger.debug("%s OperationalError, postponed", job)
raise RetryableJobError(
tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY
)

except NothingToDoJob as err:
if str(err):
Expand All @@ -95,25 +95,44 @@ def retry_postpone(job, message, seconds=None):
# delay the job later, requeue
retry_postpone(job, str(err), seconds=err.seconds)
_logger.debug("%s postponed", job)
# Do not trigger the error up because we don't want an exception
# traceback in the logs we should have the traceback when all
# retries are exhausted
env.cr.rollback()

except (FailedJobError, Exception):
except (FailedJobError, Exception) as orig_exception:
buff = StringIO()
traceback.print_exc(file=buff)
_logger.error(buff.getvalue())
traceback_txt = buff.getvalue()
_logger.error(traceback_txt)
job.env.clear()
with odoo.api.Environment.manage():
with odoo.registry(job.env.cr.dbname).cursor() as new_cr:
job.env = job.env(cr=new_cr)
job.set_failed(exc_info=buff.getvalue())
vals = self._get_failure_values(job, traceback_txt, orig_exception)
job.set_failed(**vals)
job.store()
new_cr.commit()
buff.close()
raise

return ""

def _get_failure_values(self, job, traceback_txt, orig_exception):
"""Collect relevant data from exception."""
exception_name = orig_exception.__class__.__name__
if hasattr(orig_exception, "__module__"):
exception_name = orig_exception.__module__ + "." + exception_name
exc_message = getattr(orig_exception, "name", str(orig_exception))
return {
"exc_info": traceback_txt,
"exc_name": exception_name,
"exc_message": exc_message,
}

@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
self, priority=None, max_retries=None, channel="root", description="Test job"
self, priority=None, max_retries=None, channel=None, description="Test job"
):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))
Expand Down
121 changes: 93 additions & 28 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ class Job(object):

A description of the result (for humans).

.. attribute:: exc_name

Exception error name when the job failed.

.. attribute:: exc_message

Exception error message when the job failed.

.. attribute:: exc_info

Exception information (traceback) when the job failed.
Expand Down Expand Up @@ -441,13 +449,7 @@ def __init__(
self.job_model_name = "queue.job"

self.job_config = (
self.env["queue.job.function"]
.sudo()
.job_config(
self.env["queue.job.function"].job_function_name(
self.model_name, self.method_name
)
)
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
)

self.state = PENDING
Expand Down Expand Up @@ -484,6 +486,8 @@ def __init__(
self.date_done = None

self.result = None
self.exc_name = None
self.exc_message = None
self.exc_info = None

if "company_id" in env.context:
Expand Down Expand Up @@ -524,17 +528,36 @@ def perform(self):

def store(self):
"""Store the Job"""
job_model = self.env["queue.job"]
# The sentinel is used to prevent edition sensitive fields (such as
# method_name) from RPC methods.
edit_sentinel = job_model.EDIT_SENTINEL

db_record = self.db_record()
if db_record:
db_record.with_context(_job_edit_sentinel=edit_sentinel).write(
self._store_values()
)
else:
job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(
self._store_values(create=True)
)

def _store_values(self, create=False):
vals = {
"state": self.state,
"priority": self.priority,
"retry": self.retry,
"max_retries": self.max_retries,
"exc_name": self.exc_name,
"exc_message": self.exc_message,
"exc_info": self.exc_info,
"company_id": self.company_id,
"result": str(self.result) if self.result else False,
"date_enqueued": False,
"date_started": False,
"date_done": False,
"exec_time": False,
"eta": False,
"identity_key": False,
"worker_pid": self.worker_pid,
Expand All @@ -546,40 +569,59 @@ def store(self):
vals["date_started"] = self.date_started
if self.date_done:
vals["date_done"] = self.date_done
if self.exec_time:
vals["exec_time"] = self.exec_time
if self.eta:
vals["eta"] = self.eta
if self.identity_key:
vals["identity_key"] = self.identity_key

job_model = self.env["queue.job"]
# The sentinel is used to prevent edition sensitive fields (such as
# method_name) from RPC methods.
edit_sentinel = job_model.EDIT_SENTINEL

db_record = self.db_record()
if db_record:
db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals)
else:
date_created = self.date_created
# The following values must never be modified after the
# creation of the job
if create:
vals.update(
{
"user_id": self.env.uid,
"channel": self.channel,
# The following values must never be modified after the
# creation of the job
"uuid": self.uuid,
"name": self.description,
"date_created": date_created,
"func_string": self.func_string,
"date_created": self.date_created,
"model_name": self.recordset._name,
"method_name": self.method_name,
"job_function_id": self.job_config.job_function_id,
"channel_method_name": self.job_function_name,
"records": self.recordset,
"args": self.args,
"kwargs": self.kwargs,
}
)
# it the channel is not specified, lets the job_model compute
# the right one to use
if self.channel:
vals.update({"channel": self.channel})

job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals)
vals_from_model = self._store_values_from_model()
# Sanitize values: make sure you cannot screw core values
vals_from_model = {k: v for k, v in vals_from_model.items() if k not in vals}
vals.update(vals_from_model)
return vals

def _store_values_from_model(self):
vals = {}
value_handlers_candidates = (
"_job_store_values_for_" + self.method_name,
"_job_store_values",
)
for candidate in value_handlers_candidates:
handler = getattr(self.recordset, candidate, None)
if handler is not None:
vals = handler(self)
return vals

@property
def func_string(self):
model = repr(self.recordset)
args = [repr(arg) for arg in self.args]
kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()]
all_args = ", ".join(args + kwargs)
return "{}.{}({})".format(model, self.method_name, all_args)

def db_record(self):
return self.db_record_from_uuid(self.env, self.uuid)
Expand All @@ -595,6 +637,11 @@ def func(self):
recordset = recordset.with_company(self.company_id)
return getattr(recordset, self.method_name)

@property
def job_function_name(self):
func_model = self.env["queue.job.function"].sudo()
return func_model.job_function_name(self.recordset._name, self.method_name)

@property
def identity_key(self):
if self._identity_key is None:
Expand Down Expand Up @@ -652,10 +699,25 @@ def eta(self, value):
else:
self._eta = value

@property
def channel(self):
return self._channel or self.job_config.channel

@channel.setter
def channel(self, value):
self._channel = value

@property
def exec_time(self):
if self.date_done and self.date_started:
return (self.date_done - self.date_started).total_seconds()
return None

def set_pending(self, result=None, reset_retry=True):
self.state = PENDING
self.date_enqueued = None
self.date_started = None
self.date_done = None
self.worker_pid = None
if reset_retry:
self.retry = 0
Expand All @@ -675,15 +737,17 @@ def set_started(self):

def set_done(self, result=None):
self.state = DONE
self.exc_name = None
self.exc_info = None
self.date_done = datetime.now()
if result is not None:
self.result = result

def set_failed(self, exc_info=None):
def set_failed(self, **kw):
self.state = FAILED
if exc_info is not None:
self.exc_info = exc_info
for k, v in kw.items():
if v is not None:
setattr(self, k, v)

def __repr__(self):
return "<Job %s, priority:%d>" % (self.uuid, self.priority)
Expand Down Expand Up @@ -714,6 +778,7 @@ def postpone(self, result=None, seconds=None):
"""
eta_seconds = self._get_retry_seconds(seconds)
self.eta = timedelta(seconds=eta_seconds)
self.exc_name = None
self.exc_info = None
if result is not None:
self.result = result
Expand Down
47 changes: 47 additions & 0 deletions queue_job/migrations/14.0.1.4.0/post-migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging

from odoo import SUPERUSER_ID, api

_logger = logging.getLogger(__name__)


def migrate(cr, version):
with api.Environment.manage():
env = api.Environment(cr, SUPERUSER_ID, {})
_logger.info("Computing exception name for failed jobs")
_compute_jobs_new_values(env)


def _compute_jobs_new_values(env):
for job in env["queue.job"].search(
[("state", "=", "failed"), ("exc_info", "!=", False)]
):
exception_details = _get_exception_details(job)
if exception_details:
job.update(exception_details)


def _get_exception_details(job):
for line in reversed(job.exc_info.splitlines()):
if _find_exception(line):
name, msg = line.split(":", 1)
return {
"exc_name": name.strip(),
"exc_message": msg.strip("()', \""),
}


def _find_exception(line):
# Just a list of common errors.
# If you want to target others, add your own migration step for your db.
exceptions = (
"Error:", # catch all well named exceptions
# other live instance errors found
"requests.exceptions.MissingSchema",
"botocore.errorfactory.NoSuchKey",
)
for exc in exceptions:
if exc in line:
return exc
33 changes: 33 additions & 0 deletions queue_job/migrations/14.0.1.4.0/pre-migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

from odoo.tools.sql import column_exists, table_exists


def migrate(cr, version):
if table_exists(cr, "queue_job") and not column_exists(
cr, "queue_job", "exec_time"
):
# Disable trigger otherwise the update takes ages.
cr.execute(
"""
ALTER TABLE queue_job DISABLE TRIGGER queue_job_notify;
"""
)
cr.execute(
"""
ALTER TABLE queue_job ADD COLUMN exec_time double precision DEFAULT 0;
"""
)
cr.execute(
"""
UPDATE
queue_job
SET
exec_time = EXTRACT(EPOCH FROM (date_done - date_started));
"""
)
cr.execute(
"""
ALTER TABLE queue_job ENABLE TRIGGER queue_job_notify;
"""
)
Loading