Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5747104
[DOC] describe how to write queue.job.function in case of function de…
legalsylvain Jul 19, 2021
2ca4d9e
Change technical fields to read-only
Feb 1, 2021
f6d09ca
Optimize queue.job creation
Feb 2, 2021
578292e
Remove initial create notification and follower
Feb 1, 2021
adc8d46
Add model in search view / group by
Feb 10, 2021
18b7ab8
queue_job: add exec time to view some stats
simahawk Feb 8, 2021
9a360b5
Fix missing rollback on retried jobs
wpichler Nov 19, 2020
8a1de9f
Fix date_done set when state changes back to pending
Feb 10, 2021
a56ec10
queue_job: close buffer when done
simahawk Mar 29, 2021
1bfa9d8
queue_job: improve filtering and grouping
simahawk Mar 29, 2021
f973b2d
queue_job: add hook to customize stored values
simahawk Mar 29, 2021
8b5caee
queue_job: migration step to store exception data
simahawk Mar 29, 2021
a0b8b9e
Fix display on exec_time on tree view as seconds
guewen May 31, 2021
192aef8
[IMP] queue_job: black, isort, prettier
dzungtran89 Nov 23, 2021
9db4493
Forward migration scripts from #309 #328
dzungtran89 Nov 23, 2021
93c6570
queue_job: store exception name and message
simahawk Mar 29, 2021
1236ca6
[FIX] queue_job: Migrations raising errors with OpenUpgrade
etobella Jun 22, 2021
bb08610
[IMP] queue_job: Add cancelled state to queue.job
hbrunn May 14, 2021
962c117
[IMP] queue_job: tests for wizards
hbrunn May 21, 2021
5b8390a
[IMP] queue_job: use a widget in eta field in queue job tree view
fernandahf Nov 9, 2021
165f71a
queue_job: use parent channel if configured
simahawk Mar 11, 2022
e282ef0
queue_job: update contributors
simahawk Mar 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

{
"name": "Job Queue",
"version": "15.0.1.0.2",
"version": "15.0.1.1.0",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
Expand All @@ -17,6 +17,7 @@
"views/queue_job_channel_views.xml",
"views/queue_job_function_views.xml",
"wizards/queue_jobs_to_done_views.xml",
"wizards/queue_jobs_to_cancelled_views.xml",
"wizards/queue_requeue_job_views.xml",
"views/queue_job_menus.xml",
"data/queue_data.xml",
Expand Down
35 changes: 27 additions & 8 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ def retry_postpone(job, message, seconds=None):
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise

retry_postpone(
job, tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY
)
_logger.debug("%s OperationalError, postponed", job)
raise RetryableJobError(
tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY
) from err

except NothingToDoJob as err:
if str(err):
Expand All @@ -92,23 +92,42 @@ def retry_postpone(job, message, seconds=None):
# delay the job later, requeue
retry_postpone(job, str(err), seconds=err.seconds)
_logger.debug("%s postponed", job)
# Do not trigger the error up because we don't want an exception
# traceback in the logs we should have the traceback when all
# retries are exhausted
env.cr.rollback()

except (FailedJobError, Exception):
except (FailedJobError, Exception) as orig_exception:
buff = StringIO()
traceback.print_exc(file=buff)
_logger.error(buff.getvalue())
traceback_txt = buff.getvalue()
_logger.error(traceback_txt)
job.env.clear()
with registry(job.env.cr.dbname).cursor() as new_cr:
job.env = api.Environment(new_cr, SUPERUSER_ID, {})
job.set_failed(exc_info=buff.getvalue())
job.env = job.env(cr=new_cr)
vals = self._get_failure_values(job, traceback_txt, orig_exception)
job.set_failed(**vals)
job.store()
buff.close()
raise

return ""

def _get_failure_values(self, job, traceback_txt, orig_exception):
"""Collect relevant data from exception."""
exception_name = orig_exception.__class__.__name__
if hasattr(orig_exception, "__module__"):
exception_name = orig_exception.__module__ + "." + exception_name
exc_message = getattr(orig_exception, "name", str(orig_exception))
return {
"exc_info": traceback_txt,
"exc_name": exception_name,
"exc_message": exc_message,
}

@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
self, priority=None, max_retries=None, channel="root", description="Test job"
self, priority=None, max_retries=None, channel=None, description="Test job"
):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))
Expand Down
137 changes: 109 additions & 28 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

PENDING = "pending"
ENQUEUED = "enqueued"
CANCELLED = "cancelled"
DONE = "done"
STARTED = "started"
FAILED = "failed"
Expand All @@ -25,6 +26,7 @@
(ENQUEUED, "Enqueued"),
(STARTED, "Started"),
(DONE, "Done"),
(CANCELLED, "Cancelled"),
(FAILED, "Failed"),
]

Expand Down Expand Up @@ -214,6 +216,14 @@ class Job(object):

A description of the result (for humans).

.. attribute:: exc_name

Exception error name when the job failed.

.. attribute:: exc_message

Exception error message when the job failed.

.. attribute:: exc_info

Exception information (traceback) when the job failed.
Expand Down Expand Up @@ -293,6 +303,9 @@ def _load_from_db_record(cls, job_db_record):
if stored.date_done:
job_.date_done = stored.date_done

if stored.date_cancelled:
job_.date_cancelled = stored.date_cancelled

job_.state = stored.state
job_.result = stored.result if stored.result else None
job_.exc_info = stored.exc_info if stored.exc_info else None
Expand Down Expand Up @@ -441,13 +454,7 @@ def __init__(
self.job_model_name = "queue.job"

self.job_config = (
self.env["queue.job.function"]
.sudo()
.job_config(
self.env["queue.job.function"].job_function_name(
self.model_name, self.method_name
)
)
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
)

self.state = PENDING
Expand Down Expand Up @@ -482,8 +489,11 @@ def __init__(
self.date_enqueued = None
self.date_started = None
self.date_done = None
self.date_cancelled = None

self.result = None
self.exc_name = None
self.exc_message = None
self.exc_info = None

if "company_id" in env.context:
Expand Down Expand Up @@ -524,17 +534,37 @@ def perform(self):

def store(self):
"""Store the Job"""
job_model = self.env["queue.job"]
# The sentinel is used to prevent edition sensitive fields (such as
# method_name) from RPC methods.
edit_sentinel = job_model.EDIT_SENTINEL

db_record = self.db_record()
if db_record:
db_record.with_context(_job_edit_sentinel=edit_sentinel).write(
self._store_values()
)
else:
job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(
self._store_values(create=True)
)

def _store_values(self, create=False):
vals = {
"state": self.state,
"priority": self.priority,
"retry": self.retry,
"max_retries": self.max_retries,
"exc_name": self.exc_name,
"exc_message": self.exc_message,
"exc_info": self.exc_info,
"company_id": self.company_id,
"result": str(self.result) if self.result else False,
"date_enqueued": False,
"date_started": False,
"date_done": False,
"exec_time": False,
"date_cancelled": False,
"eta": False,
"identity_key": False,
"worker_pid": self.worker_pid,
Expand All @@ -546,40 +576,61 @@ def store(self):
vals["date_started"] = self.date_started
if self.date_done:
vals["date_done"] = self.date_done
if self.exec_time:
vals["exec_time"] = self.exec_time
if self.date_cancelled:
vals["date_cancelled"] = self.date_cancelled
if self.eta:
vals["eta"] = self.eta
if self.identity_key:
vals["identity_key"] = self.identity_key

job_model = self.env["queue.job"]
# The sentinel is used to prevent edition sensitive fields (such as
# method_name) from RPC methods.
edit_sentinel = job_model.EDIT_SENTINEL

db_record = self.db_record()
if db_record:
db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals)
else:
date_created = self.date_created
# The following values must never be modified after the
# creation of the job
if create:
vals.update(
{
"user_id": self.env.uid,
"channel": self.channel,
# The following values must never be modified after the
# creation of the job
"uuid": self.uuid,
"name": self.description,
"date_created": date_created,
"func_string": self.func_string,
"date_created": self.date_created,
"model_name": self.recordset._name,
"method_name": self.method_name,
"job_function_id": self.job_config.job_function_id,
"channel_method_name": self.job_function_name,
"records": self.recordset,
"args": self.args,
"kwargs": self.kwargs,
}
)
# it the channel is not specified, lets the job_model compute
# the right one to use
if self.channel:
vals.update({"channel": self.channel})

job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals)
vals_from_model = self._store_values_from_model()
# Sanitize values: make sure you cannot screw core values
vals_from_model = {k: v for k, v in vals_from_model.items() if k not in vals}
vals.update(vals_from_model)
return vals

def _store_values_from_model(self):
vals = {}
value_handlers_candidates = (
"_job_store_values_for_" + self.method_name,
"_job_store_values",
)
for candidate in value_handlers_candidates:
handler = getattr(self.recordset, candidate, None)
if handler is not None:
vals = handler(self)
return vals

@property
def func_string(self):
model = repr(self.recordset)
args = [repr(arg) for arg in self.args]
kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()]
all_args = ", ".join(args + kwargs)
return "{}.{}({})".format(model, self.method_name, all_args)

def db_record(self):
return self.db_record_from_uuid(self.env, self.uuid)
Expand All @@ -589,6 +640,11 @@ def func(self):
recordset = self.recordset.with_context(job_uuid=self.uuid)
return getattr(recordset, self.method_name)

@property
def job_function_name(self):
func_model = self.env["queue.job.function"].sudo()
return func_model.job_function_name(self.recordset._name, self.method_name)

@property
def identity_key(self):
if self._identity_key is None:
Expand Down Expand Up @@ -646,11 +702,27 @@ def eta(self, value):
else:
self._eta = value

@property
def channel(self):
return self._channel or self.job_config.channel

@channel.setter
def channel(self, value):
self._channel = value

@property
def exec_time(self):
if self.date_done and self.date_started:
return (self.date_done - self.date_started).total_seconds()
return None

def set_pending(self, result=None, reset_retry=True):
self.state = PENDING
self.date_enqueued = None
self.date_started = None
self.date_done = None
self.worker_pid = None
self.date_cancelled = None
if reset_retry:
self.retry = 0
if result is not None:
Expand All @@ -669,15 +741,23 @@ def set_started(self):

def set_done(self, result=None):
self.state = DONE
self.exc_name = None
self.exc_info = None
self.date_done = datetime.now()
if result is not None:
self.result = result

def set_failed(self, exc_info=None):
def set_cancelled(self, result=None):
self.state = CANCELLED
self.date_cancelled = datetime.now()
if result is not None:
self.result = result

def set_failed(self, **kw):
self.state = FAILED
if exc_info is not None:
self.exc_info = exc_info
for k, v in kw.items():
if v is not None:
setattr(self, k, v)

def __repr__(self):
return "<Job %s, priority:%d>" % (self.uuid, self.priority)
Expand Down Expand Up @@ -708,6 +788,7 @@ def postpone(self, result=None, seconds=None):
"""
eta_seconds = self._get_retry_seconds(seconds)
self.eta = timedelta(seconds=eta_seconds)
self.exc_name = None
self.exc_info = None
if result is not None:
self.result = result
Expand Down
Loading