Skip to content
Closed
24 changes: 16 additions & 8 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from psycopg2 import OperationalError
from werkzeug.exceptions import Forbidden

import odoo
from odoo import SUPERUSER_ID, _, api, http, registry, tools
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

Expand Down Expand Up @@ -74,10 +75,10 @@ def retry_postpone(job, message, seconds=None):
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise

retry_postpone(
job, tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY
)
_logger.debug("%s OperationalError, postponed", job)
raise RetryableJobError(
tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY
)

except NothingToDoJob as err:
if str(err):
Expand All @@ -92,23 +93,30 @@ def retry_postpone(job, message, seconds=None):
# delay the job later, requeue
retry_postpone(job, str(err), seconds=err.seconds)
_logger.debug("%s postponed", job)
# Do not trigger the error up because we don't want an exception
# traceback in the logs we should have the traceback when all
# retries are exhausted
env.cr.rollback()

except (FailedJobError, Exception):
buff = StringIO()
traceback.print_exc(file=buff)
_logger.error(buff.getvalue())
job.env.clear()
with registry(job.env.cr.dbname).cursor() as new_cr:
job.env = api.Environment(new_cr, SUPERUSER_ID, {})
job.set_failed(exc_info=buff.getvalue())
job.store()
with odoo.api.Environment.manage():
with odoo.registry(job.env.cr.dbname).cursor() as new_cr:
job.env = job.env(cr=new_cr)
job.set_failed(exc_info=buff.getvalue())
job.store()
new_cr.commit()
buff.close()
raise

return ""

@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
self, priority=None, max_retries=None, channel="root", description="Test job"
self, priority=None, max_retries=None, channel=None, description="Test job"
):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))
Expand Down
63 changes: 47 additions & 16 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,7 @@ def __init__(
self.job_model_name = "queue.job"

self.job_config = (
self.env["queue.job.function"]
.sudo()
.job_config(
self.env["queue.job.function"].job_function_name(
self.model_name, self.method_name
)
)
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
)

self.state = PENDING
Expand Down Expand Up @@ -535,6 +529,7 @@ def store(self):
"date_enqueued": False,
"date_started": False,
"date_done": False,
"exec_time": False,
"eta": False,
"identity_key": False,
"worker_pid": self.worker_pid,
Expand All @@ -546,6 +541,8 @@ def store(self):
vals["date_started"] = self.date_started
if self.date_done:
vals["date_done"] = self.date_done
if self.exec_time:
vals["exec_time"] = self.exec_time
if self.eta:
vals["eta"] = self.eta
if self.identity_key:
Expand All @@ -560,35 +557,54 @@ def store(self):
if db_record:
db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals)
else:
date_created = self.date_created
# The following values must never be modified after the
# creation of the job
vals.update(
{
"user_id": self.env.uid,
"channel": self.channel,
# The following values must never be modified after the
# creation of the job
"uuid": self.uuid,
"name": self.description,
"date_created": date_created,
"func_string": self.func_string,
"date_created": self.date_created,
"model_name": self.recordset._name,
"method_name": self.method_name,
"job_function_id": self.job_config.job_function_id,
"channel_method_name": self.job_function_name,
"records": self.recordset,
"args": self.args,
"kwargs": self.kwargs,
}
)
# it the channel is not specified, lets the job_model compute
# the right one to use
if self.channel:
vals.update({"channel": self.channel})

job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals)

@property
def func_string(self):
model = repr(self.recordset)
args = [repr(arg) for arg in self.args]
kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()]
all_args = ", ".join(args + kwargs)
return "{}.{}({})".format(model, self.method_name, all_args)

def db_record(self):
return self.db_record_from_uuid(self.env, self.uuid)

@property
def func(self):
# We can fill only one company into allowed_company_ids.
# Because if you have many, you can have unexpected records due to ir.rule.
# ir.rule use allowed_company_ids to load every records in many companies.
# But most of the time, a job should be executed on a single company.
recordset = self.recordset.with_context(job_uuid=self.uuid)
if self.company_id:
recordset = recordset.with_company(self.company_id)
return getattr(recordset, self.method_name)

@property
def job_function_name(self):
func_model = self.env["queue.job.function"].sudo()
return func_model.job_function_name(self.recordset._name, self.method_name)

@property
def identity_key(self):
if self._identity_key is None:
Expand Down Expand Up @@ -646,10 +662,25 @@ def eta(self, value):
else:
self._eta = value

@property
def channel(self):
return self._channel or self.job_config.channel

@channel.setter
def channel(self, value):
self._channel = value

@property
def exec_time(self):
if self.date_done and self.date_started:
return (self.date_done - self.date_started).total_seconds()
return None

def set_pending(self, result=None, reset_retry=True):
self.state = PENDING
self.date_enqueued = None
self.date_started = None
self.date_done = None
self.worker_pid = None
if reset_retry:
self.retry = 0
Expand Down
35 changes: 35 additions & 0 deletions queue_job/migrations/13.0.3.7.0/pre-migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging

from odoo.tools.sql import column_exists

_logger = logging.getLogger(__name__)


def migrate(cr, version):
if not column_exists(cr, "queue_job", "exec_time"):
# Disable trigger otherwise the update takes ages.
cr.execute(
"""
ALTER TABLE queue_job DISABLE TRIGGER queue_job_notify;
"""
)
cr.execute(
"""
ALTER TABLE queue_job ADD COLUMN exec_time double precision DEFAULT 0;
"""
)
cr.execute(
"""
UPDATE
queue_job
SET
exec_time = EXTRACT(EPOCH FROM (date_done - date_started));
"""
)
cr.execute(
"""
ALTER TABLE queue_job ENABLE TRIGGER queue_job_notify;
"""
)
Loading