From 57471040719b113e24233f85f72b210602de7f4b Mon Sep 17 00:00:00 2001 From: Sylvain LE GAL Date: Mon, 19 Jul 2021 15:18:35 +0200 Subject: [PATCH 01/22] [DOC] describe how to write queue.job.function in case of function defined in abstract model --- queue_job/readme/USAGE.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst index 6c472eccf9..b595a9c595 100644 --- a/queue_job/readme/USAGE.rst +++ b/queue_job/readme/USAGE.rst @@ -43,6 +43,13 @@ they have different xmlids. On uninstall, the merged record is deleted when all the modules using it are uninstalled. +**Job function: model** + +If the function is defined in an abstract model, you can not write +```` +but you have to define a function for each model that inherits from the abstract model. + + **Job function: channel** The channel where the job will be delayed. The default channel is ``root``. From 2ca4d9eba889d49e16c57c363effe7d61b2512e2 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 1 Feb 2021 11:28:10 +0100 Subject: [PATCH 02/22] Change technical fields to read-only These fields should not be changed by users. --- queue_job/models/queue_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 157e4d8cbc..a80eb2f9af 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -107,8 +107,8 @@ class QueueJob(models.Model): compute="_compute_channel", inverse="_inverse_channel", store=True, index=True ) - identity_key = fields.Char() - worker_pid = fields.Integer() + identity_key = fields.Char(readonly=True) + worker_pid = fields.Integer(readonly=True) def init(self): self._cr.execute( From f6d09ca902b065d65807a548f3a6bcad42c25e69 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 2 Feb 2021 07:57:50 +0100 Subject: [PATCH 03/22] Optimize queue.job creation Several fields on queue.job are initialized using computed fields, then never changed again. On creation of a queue.job record, we'll have an initial INSERT + at least one following UPDATE for the computed fields. Replace all the stored computed fields by a raw initialization of the values in `Job.store()` when the job is created, so we have only a single INSERT. Highlights: * as channel is no longer a compute/inverse field, override_channel is useless, I dropped it (actually the same value was stored in both channel and override_channel as the channel field was stored) * one functional diff is that now, when changing a channel on a job.function, the channel is no longer synchronized on existing jobs, it will be applied only on new jobs: actually this is an improvement, because it was impossible to change the channel of a job function in a large queue_job table as it meant writing on all the done/started jobs * searching the queue.job.function is now cached, as each job using the same will run a query on queue_job_function --- queue_job/controllers/main.py | 2 +- queue_job/job.py | 47 ++++++---- queue_job/models/queue_job.py | 101 ++++++--------------- queue_job/models/queue_job_function.py | 5 +- queue_job/tests/test_model_job_function.py | 3 +- 5 files changed, 65 insertions(+), 93 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index a68897e7f4..f2997ce8a0 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -108,7 +108,7 @@ def retry_postpone(job, message, seconds=None): @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( - self, priority=None, max_retries=None, channel="root", description="Test job" + self, priority=None, max_retries=None, channel=None, description="Test job" ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) diff --git a/queue_job/job.py b/queue_job/job.py index 4ee90390e6..5b76957928 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -441,13 +441,7 @@ def __init__( self.job_model_name = "queue.job" self.job_config = ( - self.env["queue.job.function"] - .sudo() - .job_config( - self.env["queue.job.function"].job_function_name( - self.model_name, self.method_name - ) - ) + self.env["queue.job.function"].sudo().job_config(self.job_function_name) ) self.state = PENDING @@ -560,27 +554,35 @@ def store(self): if db_record: db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals) else: - date_created = self.date_created - # The following values must never be modified after the - # creation of the job vals.update( { + "user_id": self.env.uid, + "channel": self.channel, + # The following values must never be modified after the + # creation of the job "uuid": self.uuid, "name": self.description, - "date_created": date_created, + "func_string": self.func_string, + "date_created": self.date_created, + "model_name": self.recordset._name, "method_name": self.method_name, + "job_function_id": self.job_config.job_function_id, + "channel_method_name": self.job_function_name, "records": self.recordset, "args": self.args, "kwargs": self.kwargs, } ) - # it the channel is not specified, lets the job_model compute - # the right one to use - if self.channel: - vals.update({"channel": self.channel}) - job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals) + @property + def func_string(self): + model = repr(self.recordset) + args = [repr(arg) for arg in self.args] + kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()] + all_args = ", ".join(args + kwargs) + return "{}.{}({})".format(model, self.method_name, all_args) + def db_record(self): return self.db_record_from_uuid(self.env, self.uuid) @@ -589,6 +591,11 @@ def func(self): recordset = self.recordset.with_context(job_uuid=self.uuid) return getattr(recordset, self.method_name) + @property + def job_function_name(self): + func_model = self.env["queue.job.function"].sudo() + return func_model.job_function_name(self.recordset._name, self.method_name) + @property def identity_key(self): if self._identity_key is None: @@ -646,6 +653,14 @@ def eta(self, value): else: self._eta = value + @property + def channel(self): + return self._channel or self.job_config.channel + + @channel.setter + def channel(self, value): + self._channel = value + def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index a80eb2f9af..b360bdc502 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -37,27 +37,22 @@ class QueueJob(models.Model): "date_created", "model_name", "method_name", + "func_string", + "channel_method_name", + "job_function_id", "records", "args", "kwargs", ) uuid = fields.Char(string="UUID", readonly=True, index=True, required=True) - user_id = fields.Many2one( - comodel_name="res.users", - string="User ID", - compute="_compute_user_id", - inverse="_inverse_user_id", - store=True, - ) + user_id = fields.Many2one(comodel_name="res.users", string="User ID") company_id = fields.Many2one( comodel_name="res.company", string="Company", index=True ) name = fields.Char(string="Description", readonly=True) - model_name = fields.Char( - string="Model", compute="_compute_model_name", store=True, readonly=True - ) + model_name = fields.Char(string="Model", readonly=True) method_name = fields.Char(readonly=True) # record_ids field is only for backward compatibility (e.g. used in related # actions), can be removed (replaced by "records") in 14.0 @@ -69,9 +64,7 @@ class QueueJob(models.Model): ) args = JobSerialized(readonly=True, base_type=tuple) kwargs = JobSerialized(readonly=True, base_type=dict) - func_string = fields.Char( - string="Task", compute="_compute_func_string", readonly=True, store=True - ) + func_string = fields.Char(string="Task", readonly=True) state = fields.Selection(STATES, readonly=True, required=True, index=True) priority = fields.Integer() @@ -91,21 +84,13 @@ class QueueJob(models.Model): "max. retries.\n" "Retries are infinite when empty.", ) - channel_method_name = fields.Char( - readonly=True, compute="_compute_job_function", store=True - ) + # FIXME the name of this field is very confusing + channel_method_name = fields.Char(readonly=True) job_function_id = fields.Many2one( - comodel_name="queue.job.function", - compute="_compute_job_function", - string="Job Function", - readonly=True, - store=True, + comodel_name="queue.job.function", string="Job Function", readonly=True, ) - override_channel = fields.Char() - channel = fields.Char( - compute="_compute_channel", inverse="_inverse_channel", store=True, index=True - ) + channel = fields.Char(index=True) identity_key = fields.Char(readonly=True) worker_pid = fields.Integer(readonly=True) @@ -122,65 +107,18 @@ def init(self): "'enqueued') AND identity_key IS NOT NULL;" ) - @api.depends("records") - def _compute_user_id(self): - for record in self: - record.user_id = record.records.env.uid - - def _inverse_user_id(self): - for record in self.with_context(_job_edit_sentinel=self.EDIT_SENTINEL): - record.records = record.records.with_user(record.user_id.id) - - @api.depends("records") - def _compute_model_name(self): - for record in self: - record.model_name = record.records._name - @api.depends("records") def _compute_record_ids(self): for record in self: record.record_ids = record.records.ids - def _inverse_channel(self): - for record in self: - record.override_channel = record.channel - - @api.depends("job_function_id.channel_id") - def _compute_channel(self): - for record in self: - channel = ( - record.override_channel or record.job_function_id.channel or "root" - ) - if record.channel != channel: - record.channel = channel - - @api.depends("model_name", "method_name", "job_function_id.channel_id") - def _compute_job_function(self): - for record in self: - func_model = self.env["queue.job.function"] - channel_method_name = func_model.job_function_name( - record.model_name, record.method_name - ) - function = func_model.search([("name", "=", channel_method_name)], limit=1) - record.channel_method_name = channel_method_name - record.job_function_id = function - - @api.depends("model_name", "method_name", "records", "args", "kwargs") - def _compute_func_string(self): - for record in self: - model = repr(record.records) - args = [repr(arg) for arg in record.args] - kwargs = ["{}={!r}".format(key, val) for key, val in record.kwargs.items()] - all_args = ", ".join(args + kwargs) - record.func_string = "{}.{}({})".format(model, record.method_name, all_args) - @api.model_create_multi def create(self, vals_list): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: # Prevent to create a queue.job record "raw" from RPC. # ``with_delay()`` must be used. raise exceptions.AccessError( - _("Queue jobs must created by calling 'with_delay()'.") + _("Queue jobs must be created by calling 'with_delay()'.") ) return super().create(vals_list) @@ -196,10 +134,25 @@ def write(self, vals): ) ) + different_user_jobs = self.browse() + if vals.get("user_id"): + different_user_jobs = self.filtered( + lambda records: records.env.user.id != vals["user_id"] + ) + if vals.get("state") == "failed": self._message_post_on_failure() - return super().write(vals) + result = super().write(vals) + + for record in different_user_jobs: + # the user is stored in the env of the record, but we still want to + # have a stored user_id field to be able to search/groupby, so + # synchronize the env of records with user_id + super(QueueJob, record).write( + {"records": record.records.with_user(vals["user_id"])} + ) + return result def open_related_action(self): """Open the related action associated to the job""" diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py index db9eea3c94..4f351659bd 100644 --- a/queue_job/models/queue_job_function.py +++ b/queue_job/models/queue_job_function.py @@ -27,7 +27,8 @@ class QueueJobFunction(models.Model): "retry_pattern " "related_action_enable " "related_action_func_name " - "related_action_kwargs ", + "related_action_kwargs " + "job_function_id ", ) def _default_channel(self): @@ -147,6 +148,7 @@ def job_default_config(self): related_action_enable=True, related_action_func_name=None, related_action_kwargs={}, + job_function_id=None, ) def _parse_retry_pattern(self): @@ -179,6 +181,7 @@ def job_config(self, name): related_action_enable=config.related_action.get("enable", True), related_action_func_name=config.related_action.get("func_name"), related_action_kwargs=config.related_action.get("kwargs", {}), + job_function_id=config.id, ) def _retry_pattern_format_error_message(self): diff --git a/queue_job/tests/test_model_job_function.py b/queue_job/tests/test_model_job_function.py index 965e26d8f2..84676fdb65 100644 --- a/queue_job/tests/test_model_job_function.py +++ b/queue_job/tests/test_model_job_function.py @@ -31,7 +31,7 @@ def test_function_job_config(self): channel = self.env["queue.job.channel"].create( {"name": "foo", "parent_id": self.env.ref("queue_job.channel_root").id} ) - self.env["queue.job.function"].create( + job_function = self.env["queue.job.function"].create( { "model_id": self.env.ref("base.model_res_users").id, "method": "read", @@ -52,5 +52,6 @@ def test_function_job_config(self): related_action_enable=True, related_action_func_name="related_action_foo", related_action_kwargs={"b": 1}, + job_function_id=job_function.id, ), ) From 578292ebc373fd1dfa1b4a06ef32366c5e64ff5b Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 1 Feb 2021 10:32:56 +0100 Subject: [PATCH 04/22] Remove initial create notification and follower Everytime a job is created, a mail.message "Queue Job created" is created. This is useless, as we already have the creation date and user, and nobody will ever want to receive a notification for a created job anyway. Removing the on creation auto-subscription of the user that created the job makes sense as well since we automatically subscribe the queue job managers for failures. Add the owner of the jobs to the followers on failures only as well. It allows to remove a lot of insertions of records (and of deletions when autovacuuming jobs). --- queue_job/models/queue_job.py | 10 +++++-- test_queue_job/tests/test_job.py | 47 ++++++++++++++++++++------------ 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index b360bdc502..6b729c68e8 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -120,7 +120,10 @@ def create(self, vals_list): raise exceptions.AccessError( _("Queue jobs must be created by calling 'with_delay()'.") ) - return super().create(vals_list) + return super( + QueueJob, + self.with_context(mail_create_nolog=True, mail_create_nosubscribe=True), + ).create(vals_list) def write(self, vals): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: @@ -192,9 +195,10 @@ def _message_post_on_failure(self): # subscribe the users now to avoid to subscribe them # at every job creation domain = self._subscribe_users_domain() - users = self.env["res.users"].search(domain) - self.message_subscribe(partner_ids=users.mapped("partner_id").ids) + base_users = self.env["res.users"].search(domain) for record in self: + users = base_users | record.user_id + record.message_subscribe(partner_ids=users.mapped("partner_id").ids) msg = record._message_failed_job() if msg: record.message_post(body=msg, subtype_xmlid="queue_job.mt_job_failed") diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 4414e870d5..33593ed039 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -7,7 +7,6 @@ import mock import odoo.tests.common as common -from odoo import SUPERUSER_ID from odoo.addons.queue_job import identity_exact from odoo.addons.queue_job.exception import ( @@ -481,7 +480,7 @@ def test_message_when_write_fail(self): stored.write({"state": "failed"}) self.assertEqual(stored.state, FAILED) messages = stored.message_ids - self.assertEqual(len(messages), 2) + self.assertEqual(len(messages), 1) def test_follower_when_write_fail(self): """Check that inactive users doesn't are not followers even if @@ -540,6 +539,22 @@ def setUp(self): User = self.env["res.users"] Company = self.env["res.company"] Partner = self.env["res.partner"] + + main_company = self.env.ref("base.main_company") + + self.partner_user = Partner.create( + {"name": "Simple User", "email": "simple.user@example.com"} + ) + self.simple_user = User.create( + { + "partner_id": self.partner_user.id, + "company_ids": [(4, main_company.id)], + "login": "simple_user", + "name": "simple user", + "groups_id": [], + } + ) + self.other_partner_a = Partner.create( {"name": "My Company a", "is_company": True, "email": "test@tes.ttest"} ) @@ -556,7 +571,7 @@ def setUp(self): "company_id": self.other_company_a.id, "company_ids": [(4, self.other_company_a.id)], "login": "my_login a", - "name": "my user", + "name": "my user A", "groups_id": [(4, grp_queue_job_manager)], } ) @@ -576,16 +591,11 @@ def setUp(self): "company_id": self.other_company_b.id, "company_ids": [(4, self.other_company_b.id)], "login": "my_login_b", - "name": "my user 1", + "name": "my user B", "groups_id": [(4, grp_queue_job_manager)], } ) - def _subscribe_users(self, stored): - domain = stored._subscribe_users_domain() - users = self.env["res.users"].search(domain) - stored.message_subscribe(partner_ids=users.mapped("partner_id").ids) - def _create_job(self, env): self.cr.execute("delete from queue_job") env["test.queue.job"].with_delay().testing_method() @@ -631,11 +641,14 @@ def test_job_subscription(self): # queue_job.group_queue_job_manager must be followers User = self.env["res.users"] no_company_context = dict(self.env.context, company_id=None) - no_company_env = self.env(context=no_company_context) + no_company_env = self.env(user=self.simple_user, context=no_company_context) stored = self._create_job(no_company_env) - self._subscribe_users(stored) - users = User.with_context(active_test=False).search( - [("groups_id", "=", self.ref("queue_job.group_queue_job_manager"))] + stored._message_post_on_failure() + users = ( + User.search( + [("groups_id", "=", self.ref("queue_job.group_queue_job_manager"))] + ) + + stored.user_id ) self.assertEqual(len(stored.message_follower_ids), len(users)) expected_partners = [u.partner_id for u in users] @@ -649,13 +662,13 @@ def test_job_subscription(self): # jobs created for a specific company_id are followed only by # company's members company_a_context = dict(self.env.context, company_id=self.other_company_a.id) - company_a_env = self.env(context=company_a_context) + company_a_env = self.env(user=self.simple_user, context=company_a_context) stored = self._create_job(company_a_env) stored.with_user(self.other_user_a.id) - self._subscribe_users(stored) - # 2 because admin + self.other_partner_a + stored._message_post_on_failure() + # 2 because simple_user (creator of job) + self.other_partner_a self.assertEqual(len(stored.message_follower_ids), 2) - users = User.browse([SUPERUSER_ID, self.other_user_a.id]) + users = self.simple_user + self.other_user_a expected_partners = [u.partner_id for u in users] self.assertSetEqual( set(stored.message_follower_ids.mapped("partner_id")), From adc8d46c89d2917fc752a6afd24831c61c02ee82 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 10 Feb 2021 09:42:06 +0100 Subject: [PATCH 05/22] Add model in search view / group by --- queue_job/views/queue_job_views.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index a68c03d34f..2bb4d58771 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -126,6 +126,7 @@ + + From 18b7ab8fc55f4de7016ab640615567e8b63659d2 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 8 Feb 2021 16:43:43 +0100 Subject: [PATCH 06/22] queue_job: add exec time to view some stats --- queue_job/job.py | 9 +++++ .../migrations/13.0.3.7.0/pre-migration.py | 35 +++++++++++++++++++ queue_job/models/queue_job.py | 5 +++ queue_job/views/queue_job_views.xml | 28 ++++++++++++++- test_queue_job/tests/test_job.py | 9 +++-- 5 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 queue_job/migrations/13.0.3.7.0/pre-migration.py diff --git a/queue_job/job.py b/queue_job/job.py index 5b76957928..133c5b2a4e 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -529,6 +529,7 @@ def store(self): "date_enqueued": False, "date_started": False, "date_done": False, + "exec_time": False, "eta": False, "identity_key": False, "worker_pid": self.worker_pid, @@ -540,6 +541,8 @@ def store(self): vals["date_started"] = self.date_started if self.date_done: vals["date_done"] = self.date_done + if self.exec_time: + vals["exec_time"] = self.exec_time if self.eta: vals["eta"] = self.eta if self.identity_key: @@ -661,6 +664,12 @@ def channel(self): def channel(self, value): self._channel = value + @property + def exec_time(self): + if self.date_done and self.date_started: + return (self.date_done - self.date_started).total_seconds() + return None + def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None diff --git a/queue_job/migrations/13.0.3.7.0/pre-migration.py b/queue_job/migrations/13.0.3.7.0/pre-migration.py new file mode 100644 index 0000000000..c14d6800ad --- /dev/null +++ b/queue_job/migrations/13.0.3.7.0/pre-migration.py @@ -0,0 +1,35 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import logging + +from odoo.tools.sql import column_exists + +_logger = logging.getLogger(__name__) + + +def migrate(cr, version): + if not column_exists(cr, "queue_job", "exec_time"): + # Disable trigger otherwise the update takes ages. + cr.execute( + """ + ALTER TABLE queue_job DISABLE TRIGGER queue_job_notify; + """ + ) + cr.execute( + """ + ALTER TABLE queue_job ADD COLUMN exec_time double precision DEFAULT 0; + """ + ) + cr.execute( + """ + UPDATE + queue_job + SET + exec_time = EXTRACT(EPOCH FROM (date_done - date_started)); + """ + ) + cr.execute( + """ + ALTER TABLE queue_job ENABLE TRIGGER queue_job_notify; + """ + ) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 6b729c68e8..7a1992972a 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -75,6 +75,11 @@ class QueueJob(models.Model): date_started = fields.Datetime(string="Start Date", readonly=True) date_enqueued = fields.Datetime(string="Enqueue Time", readonly=True) date_done = fields.Datetime(readonly=True) + exec_time = fields.Float( + string="Execution Time (avg)", + group_operator="avg", + help="Time required to execute this job in seconds. Average when grouped.", + ) eta = fields.Datetime(string="Execute only after") retry = fields.Integer(string="Current try") diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 2bb4d58771..59d2fa01df 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -57,6 +57,8 @@ + + @@ -109,6 +111,7 @@ + @@ -116,6 +119,29 @@ + + queue.job.pivot + queue.job + + + + + + + + + + + queue.job.graph + queue.job + + + + + + + + queue.job.search queue.job @@ -182,7 +208,7 @@ Jobs queue.job - tree,form + tree,form,pivot,graph {'search_default_pending': 1, 'search_default_enqueued': 1, 'search_default_started': 1, diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 33593ed039..6a1fc9a7de 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -149,14 +149,16 @@ def test_worker_pid(self): def test_set_done(self): job_a = Job(self.method) + job_a.date_started = datetime(2015, 3, 15, 16, 40, 0) datetime_path = "odoo.addons.queue_job.job.datetime" with mock.patch(datetime_path, autospec=True) as mock_datetime: mock_datetime.now.return_value = datetime(2015, 3, 15, 16, 41, 0) job_a.set_done(result="test") - self.assertEqual(job_a.state, DONE) - self.assertEqual(job_a.result, "test") - self.assertEqual(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEquals(job_a.state, DONE) + self.assertEquals(job_a.result, "test") + self.assertEquals(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEquals(job_a.exec_time, 60.0) self.assertFalse(job_a.exc_info) def test_set_failed(self): @@ -233,6 +235,7 @@ def test_read(self): self.assertAlmostEqual(job_read.date_started, test_date, delta=delta) self.assertAlmostEqual(job_read.date_enqueued, test_date, delta=delta) self.assertAlmostEqual(job_read.date_done, test_date, delta=delta) + self.assertAlmostEqual(job_read.exec_time, 0.0) def test_job_unlinked(self): test_job = Job(self.method, args=("o", "k"), kwargs={"c": "!"}) From 9a360b57baf9a8f8155321d96d95cb9f20922ab0 Mon Sep 17 00:00:00 2001 From: Wolfgang Pichler Date: Thu, 19 Nov 2020 08:16:47 +0100 Subject: [PATCH 07/22] Fix missing rollback on retried jobs When RetryableJobError was raised, any change done by the job was not rollbacked. Using `raise` would throw the exception up to the core and rollback, but we would have a stack trace in the logs for each try. Calling rollback manually (rollback also clears the env) hide the tracebacks, however, when the last try fails, the full traceback is still shown in the logs. Fixes #261 --- queue_job/controllers/main.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index f2997ce8a0..a087bb2e1b 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -74,10 +74,10 @@ def retry_postpone(job, message, seconds=None): if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise - retry_postpone( - job, tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY - ) _logger.debug("%s OperationalError, postponed", job) + raise RetryableJobError( + tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY + ) except NothingToDoJob as err: if str(err): @@ -92,6 +92,10 @@ def retry_postpone(job, message, seconds=None): # delay the job later, requeue retry_postpone(job, str(err), seconds=err.seconds) _logger.debug("%s postponed", job) + # Do not trigger the error up because we don't want an exception + # traceback in the logs we should have the traceback when all + # retries are exhausted + env.cr.rollback() except (FailedJobError, Exception): buff = StringIO() From 8a1de9fec3c318bc12d13f854c1d7e418f99572d Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 10 Feb 2021 12:01:51 +0100 Subject: [PATCH 08/22] Fix date_done set when state changes back to pending When Job.date_done has been set, for instance because the job has been performed, if the job is set back to pending (e.g. a RetryableJobError is raised), the date_done is kept. --- queue_job/job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/job.py b/queue_job/job.py index 133c5b2a4e..e74a114b53 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -674,6 +674,7 @@ def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None self.date_started = None + self.date_done = None self.worker_pid = None if reset_retry: self.retry = 0 From a56ec10e3cc7edc551f00e4b8f76468ff68bbb81 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 08:39:28 +0200 Subject: [PATCH 09/22] queue_job: close buffer when done --- queue_job/controllers/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index a087bb2e1b..602eb75993 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -106,6 +106,7 @@ def retry_postpone(job, message, seconds=None): job.env = api.Environment(new_cr, SUPERUSER_ID, {}) job.set_failed(exc_info=buff.getvalue()) job.store() + buff.close() raise return "" From 1bfa9d831f0292272edf54359f01d411df787cd7 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 09:15:36 +0200 Subject: [PATCH 10/22] queue_job: improve filtering and grouping --- queue_job/views/queue_job_views.xml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 59d2fa01df..ac62e973fd 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -112,6 +112,8 @@ + + @@ -153,6 +155,10 @@ + + + + + + From f973b2de503b8c866ea7662ac9da27fb2d1bf4d6 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 10:06:32 +0200 Subject: [PATCH 11/22] queue_job: add hook to customize stored values --- queue_job/job.py | 45 +++++++++++++++++++++------- queue_job/models/base.py | 18 ++++++++++- test_queue_job/models/test_models.py | 8 +++++ test_queue_job/tests/test_job.py | 10 +++++++ 4 files changed, 70 insertions(+), 11 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index e74a114b53..a458628f9c 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -518,6 +518,22 @@ def perform(self): def store(self): """Store the Job""" + job_model = self.env["queue.job"] + # The sentinel is used to prevent edition sensitive fields (such as + # method_name) from RPC methods. + edit_sentinel = job_model.EDIT_SENTINEL + + db_record = self.db_record() + if db_record: + db_record.with_context(_job_edit_sentinel=edit_sentinel).write( + self._store_values() + ) + else: + job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create( + self._store_values(create=True) + ) + + def _store_values(self, create=False): vals = { "state": self.state, "priority": self.priority, @@ -548,15 +564,7 @@ def store(self): if self.identity_key: vals["identity_key"] = self.identity_key - job_model = self.env["queue.job"] - # The sentinel is used to prevent edition sensitive fields (such as - # method_name) from RPC methods. - edit_sentinel = job_model.EDIT_SENTINEL - - db_record = self.db_record() - if db_record: - db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals) - else: + if create: vals.update( { "user_id": self.env.uid, @@ -576,7 +584,24 @@ def store(self): "kwargs": self.kwargs, } ) - job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals) + + vals_from_model = self._store_values_from_model() + # Sanitize values: make sure you cannot screw core values + vals_from_model = {k: v for k, v in vals_from_model.items() if k not in vals} + vals.update(vals_from_model) + return vals + + def _store_values_from_model(self): + vals = {} + value_handlers_candidates = ( + "_job_store_values_for_" + self.method_name, + "_job_store_values", + ) + for candidate in value_handlers_candidates: + handler = getattr(self.recordset, candidate, None) + if handler is not None: + vals = handler(self) + return vals @property def func_string(self): diff --git a/queue_job/models/base.py b/queue_job/models/base.py index 5b1f8fee79..a398bf85fb 100644 --- a/queue_job/models/base.py +++ b/queue_job/models/base.py @@ -5,7 +5,7 @@ import logging import os -from odoo import models +from odoo import api, models from ..job import DelayableRecordset @@ -189,3 +189,19 @@ def auto_delay_wrapper(self, *args, **kwargs): origin = getattr(self, method_name) return functools.update_wrapper(auto_delay_wrapper, origin) + + @api.model + def _job_store_values(self, job): + """Hook for manipulating job stored values. + + You can define a more specific hook for a job function + by defining a method name with this pattern: + + `_queue_job_store_values_${func_name}` + + NOTE: values will be stored only if they match stored fields on `queue.job`. + + :param job: current queue_job.job.Job instance. + :return: dictionary for setting job values. + """ + return {} diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py index a5a3843230..a9f32dec0e 100644 --- a/test_queue_job/models/test_models.py +++ b/test_queue_job/models/test_models.py @@ -10,6 +10,8 @@ class QueueJob(models.Model): _inherit = "queue.job" + additional_info = fields.Char() + def testing_related_method(self, **kwargs): return self, kwargs @@ -88,6 +90,12 @@ def _register_hook(self): ) return super()._register_hook() + def _job_store_values(self, job): + value = "JUST_TESTING" + if job.state == "failed": + value += "_BUT_FAILED" + return {"additional_info": value} + class TestQueueChannel(models.Model): diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 6a1fc9a7de..9e432cbbe4 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -184,6 +184,16 @@ def test_store(self): stored = self.queue_job.search([("uuid", "=", test_job.uuid)]) self.assertEqual(len(stored), 1) + def test_store_extra_data(self): + test_job = Job(self.method) + test_job.store() + stored = self.queue_job.search([("uuid", "=", test_job.uuid)]) + self.assertEqual(stored.additional_info, "JUST_TESTING") + test_job.set_failed(exc_info="failed test", exc_name="FailedTest") + test_job.store() + stored.invalidate_cache() + self.assertEqual(stored.additional_info, "JUST_TESTING_BUT_FAILED") + def test_read(self): eta = datetime.now() + timedelta(hours=5) test_job = Job( From 8b5caee08ddd676f52861651f1a6288803c66d33 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 11:34:30 +0200 Subject: [PATCH 12/22] queue_job: migration step to store exception data --- .../migrations/13.0.3.8.0/post-migration.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 queue_job/migrations/13.0.3.8.0/post-migration.py diff --git a/queue_job/migrations/13.0.3.8.0/post-migration.py b/queue_job/migrations/13.0.3.8.0/post-migration.py new file mode 100644 index 0000000000..f6eff72707 --- /dev/null +++ b/queue_job/migrations/13.0.3.8.0/post-migration.py @@ -0,0 +1,47 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import logging + +from odoo import SUPERUSER_ID, api + +_logger = logging.getLogger(__name__) + + +def migrate(cr, version): + with api.Environment.manage(): + env = api.Environment(cr, SUPERUSER_ID, {}) + _logger.info("Computing exception name for failed jobs") + _compute_jobs_new_values(env) + + +def _compute_jobs_new_values(env): + for job in env["queue.job"].search( + [("state", "=", "failed"), ("exc_info", "!=", False)] + ): + exception_details = _get_exception_details(job) + if exception_details: + job.update(exception_details) + + +def _get_exception_details(job): + for line in reversed(job.exc_info.splitlines()): + if _find_exception(line): + name, msg = line.split(":", 1) + return { + "exc_name": name.strip(), + "exc_message": msg.strip("()', \""), + } + + +def _find_exception(line): + # Just a list of common errors. + # If you want to target others, add your own migration step for your db. + exceptions = ( + "Error:", # catch all well named exceptions + # other live instance errors found + "requests.exceptions.MissingSchema", + "botocore.errorfactory.NoSuchKey", + ) + for exc in exceptions: + if exc in line: + return exc From a0b8b9ea5e968cf89ce20e9c31dbdfb0cab3666b Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 31 May 2021 21:34:19 +0200 Subject: [PATCH 13/22] Fix display on exec_time on tree view as seconds The float_time widget shows hours seconds, we only want seconds. The widget had been removed on the form view, but not on the tree view. --- queue_job/views/queue_job_views.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index ac62e973fd..5032fdd8bb 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -111,7 +111,7 @@ - + From 192aef84ad358f5aa6d90cef3c593a0f7ea527ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C5=A9ng=20=28Tr=E1=BA=A7n=20=C4=90=C3=ACnh=29?= Date: Tue, 23 Nov 2021 18:24:18 +0700 Subject: [PATCH 14/22] [IMP] queue_job: black, isort, prettier --- queue_job/models/queue_job.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 7a1992972a..46d78bfe14 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -92,7 +92,9 @@ class QueueJob(models.Model): # FIXME the name of this field is very confusing channel_method_name = fields.Char(readonly=True) job_function_id = fields.Many2one( - comodel_name="queue.job.function", string="Job Function", readonly=True, + comodel_name="queue.job.function", + string="Job Function", + readonly=True, ) channel = fields.Char(index=True) From 9db4493dc7c761a04bb0650f4513f37557ba78af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C5=A9ng=20=28Tr=E1=BA=A7n=20=C4=90=C3=ACnh=29?= Date: Tue, 23 Nov 2021 19:37:47 +0700 Subject: [PATCH 15/22] Forward migration scripts from #309 #328 --- queue_job/__manifest__.py | 2 +- .../migrations/{13.0.3.8.0 => 15.0.1.1.0}/post-migration.py | 0 .../migrations/{13.0.3.7.0 => 15.0.1.1.0}/pre-migration.py | 4 ---- 3 files changed, 1 insertion(+), 5 deletions(-) rename queue_job/migrations/{13.0.3.8.0 => 15.0.1.1.0}/post-migration.py (100%) rename queue_job/migrations/{13.0.3.7.0 => 15.0.1.1.0}/pre-migration.py (93%) diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index ecf64fd8f3..3da1c8a4fa 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -3,7 +3,7 @@ { "name": "Job Queue", - "version": "15.0.1.0.2", + "version": "15.0.1.1.0", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", diff --git a/queue_job/migrations/13.0.3.8.0/post-migration.py b/queue_job/migrations/15.0.1.1.0/post-migration.py similarity index 100% rename from queue_job/migrations/13.0.3.8.0/post-migration.py rename to queue_job/migrations/15.0.1.1.0/post-migration.py diff --git a/queue_job/migrations/13.0.3.7.0/pre-migration.py b/queue_job/migrations/15.0.1.1.0/pre-migration.py similarity index 93% rename from queue_job/migrations/13.0.3.7.0/pre-migration.py rename to queue_job/migrations/15.0.1.1.0/pre-migration.py index c14d6800ad..7b15de8e58 100644 --- a/queue_job/migrations/13.0.3.7.0/pre-migration.py +++ b/queue_job/migrations/15.0.1.1.0/pre-migration.py @@ -1,11 +1,7 @@ # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) -import logging - from odoo.tools.sql import column_exists -_logger = logging.getLogger(__name__) - def migrate(cr, version): if not column_exists(cr, "queue_job", "exec_time"): From 93c65703cb960ea0600b2d50476e7261805cd685 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 09:12:07 +0200 Subject: [PATCH 16/22] queue_job: store exception name and message --- queue_job/controllers/main.py | 24 +++++++++++++++++++----- queue_job/job.py | 21 ++++++++++++++++++--- queue_job/models/queue_job.py | 2 ++ queue_job/views/queue_job_views.xml | 21 +++++++++++++-------- test_queue_job/tests/test_job.py | 16 +++++++++++----- 5 files changed, 63 insertions(+), 21 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 602eb75993..33b5778476 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -77,7 +77,7 @@ def retry_postpone(job, message, seconds=None): _logger.debug("%s OperationalError, postponed", job) raise RetryableJobError( tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY - ) + ) from err except NothingToDoJob as err: if str(err): @@ -97,20 +97,34 @@ def retry_postpone(job, message, seconds=None): # retries are exhausted env.cr.rollback() - except (FailedJobError, Exception): + except (FailedJobError, Exception) as orig_exception: buff = StringIO() traceback.print_exc(file=buff) - _logger.error(buff.getvalue()) + traceback_txt = buff.getvalue() + _logger.error(traceback_txt) job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: - job.env = api.Environment(new_cr, SUPERUSER_ID, {}) - job.set_failed(exc_info=buff.getvalue()) + job.env = job.env(cr=new_cr) + vals = self._get_failure_values(job, traceback_txt, orig_exception) + job.set_failed(**vals) job.store() buff.close() raise return "" + def _get_failure_values(self, job, traceback_txt, orig_exception): + """Collect relevant data from exception.""" + exception_name = orig_exception.__class__.__name__ + if hasattr(orig_exception, "__module__"): + exception_name = orig_exception.__module__ + "." + exception_name + exc_message = getattr(orig_exception, "name", str(orig_exception)) + return { + "exc_info": traceback_txt, + "exc_name": exception_name, + "exc_message": exc_message, + } + @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( self, priority=None, max_retries=None, channel=None, description="Test job" diff --git a/queue_job/job.py b/queue_job/job.py index a458628f9c..6d1adf1cdc 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -214,6 +214,14 @@ class Job(object): A description of the result (for humans). + .. attribute:: exc_name + + Exception error name when the job failed. + + .. attribute:: exc_message + + Exception error message when the job failed. + .. attribute:: exc_info Exception information (traceback) when the job failed. @@ -478,6 +486,8 @@ def __init__( self.date_done = None self.result = None + self.exc_name = None + self.exc_message = None self.exc_info = None if "company_id" in env.context: @@ -539,6 +549,8 @@ def _store_values(self, create=False): "priority": self.priority, "retry": self.retry, "max_retries": self.max_retries, + "exc_name": self.exc_name, + "exc_message": self.exc_message, "exc_info": self.exc_info, "company_id": self.company_id, "result": str(self.result) if self.result else False, @@ -719,15 +731,17 @@ def set_started(self): def set_done(self, result=None): self.state = DONE + self.exc_name = None self.exc_info = None self.date_done = datetime.now() if result is not None: self.result = result - def set_failed(self, exc_info=None): + def set_failed(self, **kw): self.state = FAILED - if exc_info is not None: - self.exc_info = exc_info + for k, v in kw.items(): + if v is not None: + setattr(self, k, v) def __repr__(self): return "" % (self.uuid, self.priority) @@ -758,6 +772,7 @@ def postpone(self, result=None, seconds=None): """ eta_seconds = self._get_retry_seconds(seconds) self.eta = timedelta(seconds=eta_seconds) + self.exc_name = None self.exc_info = None if result is not None: self.result = result diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 46d78bfe14..2d8ef74538 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -68,6 +68,8 @@ class QueueJob(models.Model): state = fields.Selection(STATES, readonly=True, required=True, index=True) priority = fields.Integer() + exc_name = fields.Char(string="Exception", readonly=True) + exc_message = fields.Char(string="Exception Message", readonly=True) exc_info = fields.Text(string="Exception Info", readonly=True) result = fields.Text(readonly=True) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 5032fdd8bb..cb7aeac28b 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -71,6 +71,18 @@ > If the max. retries is 0, the number of retries is infinite. + +
+
+ +
- - -
@@ -108,8 +113,8 @@ - + diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 9e432cbbe4..c382289608 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -155,17 +155,23 @@ def test_set_done(self): mock_datetime.now.return_value = datetime(2015, 3, 15, 16, 41, 0) job_a.set_done(result="test") - self.assertEquals(job_a.state, DONE) - self.assertEquals(job_a.result, "test") - self.assertEquals(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) - self.assertEquals(job_a.exec_time, 60.0) + self.assertEqual(job_a.state, DONE) + self.assertEqual(job_a.result, "test") + self.assertEqual(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEqual(job_a.exec_time, 60.0) self.assertFalse(job_a.exc_info) def test_set_failed(self): job_a = Job(self.method) - job_a.set_failed(exc_info="failed test") + job_a.set_failed( + exc_info="failed test", + exc_name="FailedTest", + exc_message="Sadly this job failed", + ) self.assertEqual(job_a.state, FAILED) self.assertEqual(job_a.exc_info, "failed test") + self.assertEqual(job_a.exc_name, "FailedTest") + self.assertEqual(job_a.exc_message, "Sadly this job failed") def test_postpone(self): job_a = Job(self.method) From 1236ca6340a91f6262a77e7576da64befab93777 Mon Sep 17 00:00:00 2001 From: Enric Tobella Date: Tue, 22 Jun 2021 15:11:13 +0200 Subject: [PATCH 17/22] [FIX] queue_job: Migrations raising errors with OpenUpgrade --- queue_job/migrations/15.0.1.1.0/pre-migration.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/queue_job/migrations/15.0.1.1.0/pre-migration.py b/queue_job/migrations/15.0.1.1.0/pre-migration.py index 7b15de8e58..8ae6cb3a5f 100644 --- a/queue_job/migrations/15.0.1.1.0/pre-migration.py +++ b/queue_job/migrations/15.0.1.1.0/pre-migration.py @@ -1,10 +1,12 @@ # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) -from odoo.tools.sql import column_exists +from odoo.tools.sql import column_exists, table_exists def migrate(cr, version): - if not column_exists(cr, "queue_job", "exec_time"): + if table_exists(cr, "queue_job") and not column_exists( + cr, "queue_job", "exec_time" + ): # Disable trigger otherwise the update takes ages. cr.execute( """ From bb08610547ba94d1de4fcebb4826c4fe2f61142c Mon Sep 17 00:00:00 2001 From: Holger Brunn Date: Fri, 14 May 2021 15:37:20 +0200 Subject: [PATCH 18/22] [IMP] queue_job: Add cancelled state to queue.job --- queue_job/__manifest__.py | 1 + queue_job/job.py | 16 +++++++++ queue_job/models/queue_job.py | 12 ++++++- queue_job/security/ir.model.access.csv | 1 + queue_job/views/queue_job_views.xml | 10 +++++- queue_job/wizards/__init__.py | 1 + queue_job/wizards/queue_jobs_to_cancelled.py | 17 ++++++++++ .../wizards/queue_jobs_to_cancelled_views.xml | 34 +++++++++++++++++++ 8 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 queue_job/wizards/queue_jobs_to_cancelled.py create mode 100644 queue_job/wizards/queue_jobs_to_cancelled_views.xml diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 3da1c8a4fa..57084deccc 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -17,6 +17,7 @@ "views/queue_job_channel_views.xml", "views/queue_job_function_views.xml", "wizards/queue_jobs_to_done_views.xml", + "wizards/queue_jobs_to_cancelled_views.xml", "wizards/queue_requeue_job_views.xml", "views/queue_job_menus.xml", "data/queue_data.xml", diff --git a/queue_job/job.py b/queue_job/job.py index 6d1adf1cdc..794c7fb030 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -16,6 +16,7 @@ PENDING = "pending" ENQUEUED = "enqueued" +CANCELLED = "cancelled" DONE = "done" STARTED = "started" FAILED = "failed" @@ -25,6 +26,7 @@ (ENQUEUED, "Enqueued"), (STARTED, "Started"), (DONE, "Done"), + (CANCELLED, "Cancelled"), (FAILED, "Failed"), ] @@ -301,6 +303,9 @@ def _load_from_db_record(cls, job_db_record): if stored.date_done: job_.date_done = stored.date_done + if stored.date_cancelled: + job_.date_cancelled = stored.date_cancelled + job_.state = stored.state job_.result = stored.result if stored.result else None job_.exc_info = stored.exc_info if stored.exc_info else None @@ -484,6 +489,7 @@ def __init__( self.date_enqueued = None self.date_started = None self.date_done = None + self.date_cancelled = None self.result = None self.exc_name = None @@ -558,6 +564,7 @@ def _store_values(self, create=False): "date_started": False, "date_done": False, "exec_time": False, + "date_cancelled": False, "eta": False, "identity_key": False, "worker_pid": self.worker_pid, @@ -571,6 +578,8 @@ def _store_values(self, create=False): vals["date_done"] = self.date_done if self.exec_time: vals["exec_time"] = self.exec_time + if self.date_cancelled: + vals["date_cancelled"] = self.date_cancelled if self.eta: vals["eta"] = self.eta if self.identity_key: @@ -713,6 +722,7 @@ def set_pending(self, result=None, reset_retry=True): self.date_started = None self.date_done = None self.worker_pid = None + self.date_cancelled = None if reset_retry: self.retry = 0 if result is not None: @@ -737,6 +747,12 @@ def set_done(self, result=None): if result is not None: self.result = result + def set_cancelled(self, result=None): + self.state = CANCELLED + self.date_cancelled = datetime.now() + if result is not None: + self.result = result + def set_failed(self, **kw): self.state = FAILED for k, v in kw.items(): diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 2d8ef74538..b6f8b52a6e 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -8,7 +8,7 @@ from odoo.osv import expression from ..fields import JobSerialized -from ..job import DONE, PENDING, STATES, Job +from ..job import CANCELLED, DONE, PENDING, STATES, Job _logger = logging.getLogger(__name__) @@ -82,6 +82,7 @@ class QueueJob(models.Model): group_operator="avg", help="Time required to execute this job in seconds. Average when grouped.", ) + date_cancelled = fields.Datetime(readonly=True) eta = fields.Datetime(string="Execute only after") retry = fields.Integer(string="Current try") @@ -187,6 +188,8 @@ def _change_job_state(self, state, result=None): job_.set_done(result=result) elif state == PENDING: job_.set_pending(result=result) + elif state == CANCELLED: + job_.set_cancelled(result=result) else: raise ValueError("State not supported: %s" % state) job_.store() @@ -196,6 +199,11 @@ def button_done(self): self._change_job_state(DONE, result=result) return True + def button_cancelled(self): + result = _("Cancelled by %s") % self.env.user.name + self._change_job_state(CANCELLED, result=result) + return True + def requeue(self): self._change_job_state(PENDING) return True @@ -255,7 +263,9 @@ def autovacuum(self): while True: jobs = self.search( [ + "|", ("date_done", "<=", deadline), + ("date_cancelled", "<=", deadline), ("channel", "=", channel.complete_name), ], limit=1000, diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv index 9242305158..634daf8ede 100644 --- a/queue_job/security/ir.model.access.csv +++ b/queue_job/security/ir.model.access.csv @@ -4,3 +4,4 @@ access_queue_job_function_manager,queue job functions manager,queue_job.model_qu access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1 access_queue_requeue_job,queue requeue job manager,queue_job.model_queue_requeue_job,queue_job.group_queue_job_manager,1,1,1,1 access_queue_jobs_to_done,queue jobs to done manager,queue_job.model_queue_jobs_to_done,queue_job.group_queue_job_manager,1,1,1,1 +access_queue_jobs_to_cancelled,queue jobs to cancelled manager,queue_job.model_queue_jobs_to_cancelled,queue_job.group_queue_job_manager,1,1,1,1 diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index cb7aeac28b..9121e1b188 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -23,12 +23,20 @@ type="object" groups="queue_job.group_queue_job_manager" /> +