Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def retry_postpone(job, message, seconds=None):

@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
self, priority=None, max_retries=None, channel="root", description="Test job"
self, priority=None, max_retries=None, channel=None, description="Test job"
):
if not http.request.env.user.has_group("base.group_erp_manager"):
raise Forbidden(_("Access Denied"))
Expand Down
47 changes: 31 additions & 16 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,7 @@ def __init__(
self.job_model_name = "queue.job"

self.job_config = (
self.env["queue.job.function"]
.sudo()
.job_config(
self.env["queue.job.function"].job_function_name(
self.model_name, self.method_name
)
)
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
)

self.state = PENDING
Expand Down Expand Up @@ -560,27 +554,35 @@ def store(self):
if db_record:
db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals)
else:
date_created = self.date_created
# The following values must never be modified after the
# creation of the job
vals.update(
{
"user_id": self.env.uid,
"channel": self.channel,
# The following values must never be modified after the
# creation of the job
"uuid": self.uuid,
"name": self.description,
"date_created": date_created,
"func_string": self.func_string,
"date_created": self.date_created,
"model_name": self.recordset._name,
"method_name": self.method_name,
"job_function_id": self.job_config.job_function_id,
"channel_method_name": self.job_function_name,
"records": self.recordset,
"args": self.args,
"kwargs": self.kwargs,
}
)
# it the channel is not specified, lets the job_model compute
# the right one to use
if self.channel:
vals.update({"channel": self.channel})

job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals)

@property
def func_string(self):
model = repr(self.recordset)
args = [repr(arg) for arg in self.args]
kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()]
all_args = ", ".join(args + kwargs)
return "{}.{}({})".format(model, self.method_name, all_args)

def db_record(self):
return self.db_record_from_uuid(self.env, self.uuid)

Expand All @@ -589,6 +591,11 @@ def func(self):
recordset = self.recordset.with_context(job_uuid=self.uuid)
return getattr(recordset, self.method_name)

@property
def job_function_name(self):
func_model = self.env["queue.job.function"].sudo()
return func_model.job_function_name(self.recordset._name, self.method_name)

@property
def identity_key(self):
if self._identity_key is None:
Expand Down Expand Up @@ -646,6 +653,14 @@ def eta(self, value):
else:
self._eta = value

@property
def channel(self):
return self._channel or self.job_config.channel

@channel.setter
def channel(self, value):
self._channel = value

def set_pending(self, result=None, reset_retry=True):
self.state = PENDING
self.date_enqueued = None
Expand Down
106 changes: 31 additions & 75 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,22 @@ class QueueJob(models.Model):
"date_created",
"model_name",
"method_name",
"func_string",
"channel_method_name",
"job_function_id",
"records",
"args",
"kwargs",
)

uuid = fields.Char(string="UUID", readonly=True, index=True, required=True)
user_id = fields.Many2one(
comodel_name="res.users",
string="User ID",
compute="_compute_user_id",
inverse="_inverse_user_id",
store=True,
)
user_id = fields.Many2one(comodel_name="res.users", string="User ID")
company_id = fields.Many2one(
comodel_name="res.company", string="Company", index=True
)
name = fields.Char(string="Description", readonly=True)

model_name = fields.Char(
string="Model", compute="_compute_model_name", store=True, readonly=True
)
model_name = fields.Char(string="Model", readonly=True)
method_name = fields.Char(readonly=True)
# record_ids field is only for backward compatibility (e.g. used in related
# actions), can be removed (replaced by "records") in 14.0
Expand All @@ -73,9 +68,7 @@ class QueueJob(models.Model):
)
args = JobSerialized(readonly=True, base_type=tuple)
kwargs = JobSerialized(readonly=True, base_type=dict)
func_string = fields.Char(
string="Task", compute="_compute_func_string", readonly=True, store=True
)
func_string = fields.Char(string="Task", readonly=True)

state = fields.Selection(STATES, readonly=True, required=True, index=True)
priority = fields.Integer()
Expand All @@ -95,21 +88,13 @@ class QueueJob(models.Model):
"max. retries.\n"
"Retries are infinite when empty.",
)
channel_method_name = fields.Char(
readonly=True, compute="_compute_job_function", store=True
)
# FIXME the name of this field is very confusing
channel_method_name = fields.Char(readonly=True)
job_function_id = fields.Many2one(
comodel_name="queue.job.function",
compute="_compute_job_function",
string="Job Function",
readonly=True,
store=True,
comodel_name="queue.job.function", string="Job Function", readonly=True,
)

override_channel = fields.Char()
channel = fields.Char(
compute="_compute_channel", inverse="_inverse_channel", store=True, index=True
)
channel = fields.Char(index=True)

identity_key = fields.Char()
worker_pid = fields.Integer()
Expand All @@ -126,65 +111,18 @@ def init(self):
"'enqueued') AND identity_key IS NOT NULL;"
)

@api.depends("records")
def _compute_user_id(self):
for record in self:
record.user_id = record.records.env.uid

def _inverse_user_id(self):
for record in self.with_context(_job_edit_sentinel=self.EDIT_SENTINEL):
record.records = record.records.with_user(record.user_id.id)

@api.depends("records")
def _compute_model_name(self):
for record in self:
record.model_name = record.records._name

@api.depends("records")
def _compute_record_ids(self):
for record in self:
record.record_ids = record.records.ids

def _inverse_channel(self):
for record in self:
record.override_channel = record.channel

@api.depends("job_function_id.channel_id")
def _compute_channel(self):
for record in self:
channel = (
record.override_channel or record.job_function_id.channel or "root"
)
if record.channel != channel:
record.channel = channel

@api.depends("model_name", "method_name", "job_function_id.channel_id")
def _compute_job_function(self):
for record in self:
func_model = self.env["queue.job.function"]
channel_method_name = func_model.job_function_name(
record.model_name, record.method_name
)
function = func_model.search([("name", "=", channel_method_name)], limit=1)
record.channel_method_name = channel_method_name
record.job_function_id = function

@api.depends("model_name", "method_name", "records", "args", "kwargs")
def _compute_func_string(self):
for record in self:
model = repr(record.records)
args = [repr(arg) for arg in record.args]
kwargs = ["{}={!r}".format(key, val) for key, val in record.kwargs.items()]
all_args = ", ".join(args + kwargs)
record.func_string = "{}.{}({})".format(model, record.method_name, all_args)

@api.model_create_multi
def create(self, vals_list):
if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL:
# Prevent to create a queue.job record "raw" from RPC.
# ``with_delay()`` must be used.
raise exceptions.AccessError(
_("Queue jobs must created by calling 'with_delay()'.")
_("Queue jobs must be created by calling 'with_delay()'.")
)
return super().create(vals_list)

Expand All @@ -200,10 +138,25 @@ def write(self, vals):
)
)

different_user_jobs = self.browse()
if vals.get("user_id"):
different_user_jobs = self.filtered(
lambda records: records.env.user.id != vals["user_id"]
)

if vals.get("state") == "failed":
self._message_post_on_failure()

return super().write(vals)
result = super().write(vals)

for record in different_user_jobs:
# the user is stored in the env of the record, but we still want to
# have a stored user_id field to be able to search/groupby, so
# synchronize the env of records with user_id
super(QueueJob, record).write(
{"records": record.records.with_user(vals["user_id"])}
)
return result

def open_related_action(self):
"""Open the related action associated to the job"""
Expand Down Expand Up @@ -515,7 +468,8 @@ class JobFunction(models.Model):
"retry_pattern "
"related_action_enable "
"related_action_func_name "
"related_action_kwargs ",
"related_action_kwargs "
"job_function_id ",
)

def _default_channel(self):
Expand Down Expand Up @@ -637,6 +591,7 @@ def job_default_config(self):
related_action_enable=True,
related_action_func_name=None,
related_action_kwargs={},
job_function_id=None,
)

def _parse_retry_pattern(self):
Expand Down Expand Up @@ -669,6 +624,7 @@ def job_config(self, name):
related_action_enable=config.related_action.get("enable", True),
related_action_func_name=config.related_action.get("func_name"),
related_action_kwargs=config.related_action.get("kwargs"),
job_function_id=config.id,
)

def _retry_pattern_format_error_message(self):
Expand Down
3 changes: 2 additions & 1 deletion queue_job/tests/test_model_job_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_function_job_config(self):
channel = self.env["queue.job.channel"].create(
{"name": "foo", "parent_id": self.env.ref("queue_job.channel_root").id}
)
self.env["queue.job.function"].create(
job_function = self.env["queue.job.function"].create(
{
"model_id": self.env.ref("base.model_res_users").id,
"method": "read",
Expand All @@ -52,5 +52,6 @@ def test_function_job_config(self):
related_action_enable=True,
related_action_func_name="related_action_foo",
related_action_kwargs={"b": 1},
job_function_id=job_function.id,
),
)