diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index a0814bbbd5..eaa78e9a3b 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -113,7 +113,7 @@ def retry_postpone(job, message, seconds=None): @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( - self, priority=None, max_retries=None, channel="root", description="Test job" + self, priority=None, max_retries=None, channel=None, description="Test job" ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) diff --git a/queue_job/job.py b/queue_job/job.py index 19742deb48..45dca7cbb6 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -441,13 +441,7 @@ def __init__( self.job_model_name = "queue.job" self.job_config = ( - self.env["queue.job.function"] - .sudo() - .job_config( - self.env["queue.job.function"].job_function_name( - self.model_name, self.method_name - ) - ) + self.env["queue.job.function"].sudo().job_config(self.job_function_name) ) self.state = PENDING @@ -560,27 +554,35 @@ def store(self): if db_record: db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals) else: - date_created = self.date_created - # The following values must never be modified after the - # creation of the job vals.update( { + "user_id": self.env.uid, + "channel": self.channel, + # The following values must never be modified after the + # creation of the job "uuid": self.uuid, "name": self.description, - "date_created": date_created, + "func_string": self.func_string, + "date_created": self.date_created, + "model_name": self.recordset._name, "method_name": self.method_name, + "job_function_id": self.job_config.job_function_id, + "channel_method_name": self.job_function_name, "records": self.recordset, "args": self.args, "kwargs": self.kwargs, } ) - # it the channel is not specified, lets the job_model compute - # the right one to use - if self.channel: - vals.update({"channel": self.channel}) - job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals) + @property + def func_string(self): + model = repr(self.recordset) + args = [repr(arg) for arg in self.args] + kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()] + all_args = ", ".join(args + kwargs) + return "{}.{}({})".format(model, self.method_name, all_args) + def db_record(self): return self.db_record_from_uuid(self.env, self.uuid) @@ -589,6 +591,11 @@ def func(self): recordset = self.recordset.with_context(job_uuid=self.uuid) return getattr(recordset, self.method_name) + @property + def job_function_name(self): + func_model = self.env["queue.job.function"].sudo() + return func_model.job_function_name(self.recordset._name, self.method_name) + @property def identity_key(self): if self._identity_key is None: @@ -646,6 +653,14 @@ def eta(self, value): else: self._eta = value + @property + def channel(self): + return self._channel or self.job_config.channel + + @channel.setter + def channel(self, value): + self._channel = value + def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 1da0eaf86d..7fa834640a 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -43,27 +43,22 @@ class QueueJob(models.Model): "date_created", "model_name", "method_name", + "func_string", + "channel_method_name", + "job_function_id", "records", "args", "kwargs", ) uuid = fields.Char(string="UUID", readonly=True, index=True, required=True) - user_id = fields.Many2one( - comodel_name="res.users", - string="User ID", - compute="_compute_user_id", - inverse="_inverse_user_id", - store=True, - ) + user_id = fields.Many2one(comodel_name="res.users", string="User ID") company_id = fields.Many2one( comodel_name="res.company", string="Company", index=True ) name = fields.Char(string="Description", readonly=True) - model_name = fields.Char( - string="Model", compute="_compute_model_name", store=True, readonly=True - ) + model_name = fields.Char(string="Model", readonly=True) method_name = fields.Char(readonly=True) # record_ids field is only for backward compatibility (e.g. used in related # actions), can be removed (replaced by "records") in 14.0 @@ -73,9 +68,7 @@ class QueueJob(models.Model): ) args = JobSerialized(readonly=True, base_type=tuple) kwargs = JobSerialized(readonly=True, base_type=dict) - func_string = fields.Char( - string="Task", compute="_compute_func_string", readonly=True, store=True - ) + func_string = fields.Char(string="Task", readonly=True) state = fields.Selection(STATES, readonly=True, required=True, index=True) priority = fields.Integer() @@ -95,21 +88,13 @@ class QueueJob(models.Model): "max. retries.\n" "Retries are infinite when empty.", ) - channel_method_name = fields.Char( - readonly=True, compute="_compute_job_function", store=True - ) + # FIXME the name of this field is very confusing + channel_method_name = fields.Char(readonly=True) job_function_id = fields.Many2one( - comodel_name="queue.job.function", - compute="_compute_job_function", - string="Job Function", - readonly=True, - store=True, + comodel_name="queue.job.function", string="Job Function", readonly=True, ) - override_channel = fields.Char() - channel = fields.Char( - compute="_compute_channel", inverse="_inverse_channel", store=True, index=True - ) + channel = fields.Char(index=True) identity_key = fields.Char() worker_pid = fields.Integer() @@ -126,65 +111,18 @@ def init(self): "'enqueued') AND identity_key IS NOT NULL;" ) - @api.depends("records") - def _compute_user_id(self): - for record in self: - record.user_id = record.records.env.uid - - def _inverse_user_id(self): - for record in self.with_context(_job_edit_sentinel=self.EDIT_SENTINEL): - record.records = record.records.with_user(record.user_id.id) - - @api.depends("records") - def _compute_model_name(self): - for record in self: - record.model_name = record.records._name - @api.depends("records") def _compute_record_ids(self): for record in self: record.record_ids = record.records.ids - def _inverse_channel(self): - for record in self: - record.override_channel = record.channel - - @api.depends("job_function_id.channel_id") - def _compute_channel(self): - for record in self: - channel = ( - record.override_channel or record.job_function_id.channel or "root" - ) - if record.channel != channel: - record.channel = channel - - @api.depends("model_name", "method_name", "job_function_id.channel_id") - def _compute_job_function(self): - for record in self: - func_model = self.env["queue.job.function"] - channel_method_name = func_model.job_function_name( - record.model_name, record.method_name - ) - function = func_model.search([("name", "=", channel_method_name)], limit=1) - record.channel_method_name = channel_method_name - record.job_function_id = function - - @api.depends("model_name", "method_name", "records", "args", "kwargs") - def _compute_func_string(self): - for record in self: - model = repr(record.records) - args = [repr(arg) for arg in record.args] - kwargs = ["{}={!r}".format(key, val) for key, val in record.kwargs.items()] - all_args = ", ".join(args + kwargs) - record.func_string = "{}.{}({})".format(model, record.method_name, all_args) - @api.model_create_multi def create(self, vals_list): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: # Prevent to create a queue.job record "raw" from RPC. # ``with_delay()`` must be used. raise exceptions.AccessError( - _("Queue jobs must created by calling 'with_delay()'.") + _("Queue jobs must be created by calling 'with_delay()'.") ) return super().create(vals_list) @@ -200,10 +138,25 @@ def write(self, vals): ) ) + different_user_jobs = self.browse() + if vals.get("user_id"): + different_user_jobs = self.filtered( + lambda records: records.env.user.id != vals["user_id"] + ) + if vals.get("state") == "failed": self._message_post_on_failure() - return super().write(vals) + result = super().write(vals) + + for record in different_user_jobs: + # the user is stored in the env of the record, but we still want to + # have a stored user_id field to be able to search/groupby, so + # synchronize the env of records with user_id + super(QueueJob, record).write( + {"records": record.records.with_user(vals["user_id"])} + ) + return result def open_related_action(self): """Open the related action associated to the job""" @@ -515,7 +468,8 @@ class JobFunction(models.Model): "retry_pattern " "related_action_enable " "related_action_func_name " - "related_action_kwargs ", + "related_action_kwargs " + "job_function_id ", ) def _default_channel(self): @@ -637,6 +591,7 @@ def job_default_config(self): related_action_enable=True, related_action_func_name=None, related_action_kwargs={}, + job_function_id=None, ) def _parse_retry_pattern(self): @@ -669,6 +624,7 @@ def job_config(self, name): related_action_enable=config.related_action.get("enable", True), related_action_func_name=config.related_action.get("func_name"), related_action_kwargs=config.related_action.get("kwargs"), + job_function_id=config.id, ) def _retry_pattern_format_error_message(self): diff --git a/queue_job/tests/test_model_job_function.py b/queue_job/tests/test_model_job_function.py index c9bdea56e8..e6ddf3fcc3 100644 --- a/queue_job/tests/test_model_job_function.py +++ b/queue_job/tests/test_model_job_function.py @@ -31,7 +31,7 @@ def test_function_job_config(self): channel = self.env["queue.job.channel"].create( {"name": "foo", "parent_id": self.env.ref("queue_job.channel_root").id} ) - self.env["queue.job.function"].create( + job_function = self.env["queue.job.function"].create( { "model_id": self.env.ref("base.model_res_users").id, "method": "read", @@ -52,5 +52,6 @@ def test_function_job_config(self): related_action_enable=True, related_action_func_name="related_action_foo", related_action_kwargs={"b": 1}, + job_function_id=job_function.id, ), )