From c6809345f8e307fde9722a3d3d95ffad82c1e6ba Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 1 Feb 2021 11:28:10 +0100 Subject: [PATCH 01/16] Change technical fields to read-only These fields should not be changed by users. --- queue_job/models/queue_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 157e4d8cbc..a80eb2f9af 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -107,8 +107,8 @@ class QueueJob(models.Model): compute="_compute_channel", inverse="_inverse_channel", store=True, index=True ) - identity_key = fields.Char() - worker_pid = fields.Integer() + identity_key = fields.Char(readonly=True) + worker_pid = fields.Integer(readonly=True) def init(self): self._cr.execute( From 3f27f10d7a4876a7f6d7829deca8b6639356eeb5 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 2 Feb 2021 07:57:50 +0100 Subject: [PATCH 02/16] Optimize queue.job creation Several fields on queue.job are initialized using computed fields, then never changed again. On creation of a queue.job record, we'll have an initial INSERT + at least one following UPDATE for the computed fields. Replace all the stored computed fields by a raw initialization of the values in `Job.store()` when the job is created, so we have only a single INSERT. 
Highlights: * as channel is no longer a compute/inverse field, override_channel is useless, I dropped it (actually the same value was stored in both channel and override_channel as the channel field was stored) * one functional diff is that now, when changing a channel on a job.function, the channel is no longer synchronized on existing jobs, it will be applied only on new jobs: actually this is an improvement, because it was impossible to change the channel of a job function in a large queue_job table as it meant writing on all the done/started jobs * searching the queue.job.function is now cached, as each job using the same will run a query on queue_job_function --- queue_job/controllers/main.py | 2 +- queue_job/job.py | 47 ++++++---- queue_job/models/queue_job.py | 101 ++++++--------------- queue_job/models/queue_job_function.py | 5 +- queue_job/tests/test_model_job_function.py | 3 +- 5 files changed, 65 insertions(+), 93 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 8078285ff3..400ef2389c 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -113,7 +113,7 @@ def retry_postpone(job, message, seconds=None): @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( - self, priority=None, max_retries=None, channel="root", description="Test job" + self, priority=None, max_retries=None, channel=None, description="Test job" ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) diff --git a/queue_job/job.py b/queue_job/job.py index 2145b62cb8..befd1182fa 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -441,13 +441,7 @@ def __init__( self.job_model_name = "queue.job" self.job_config = ( - self.env["queue.job.function"] - .sudo() - .job_config( - self.env["queue.job.function"].job_function_name( - self.model_name, self.method_name - ) - ) + 
self.env["queue.job.function"].sudo().job_config(self.job_function_name) ) self.state = PENDING @@ -560,27 +554,35 @@ def store(self): if db_record: db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals) else: - date_created = self.date_created - # The following values must never be modified after the - # creation of the job vals.update( { + "user_id": self.env.uid, + "channel": self.channel, + # The following values must never be modified after the + # creation of the job "uuid": self.uuid, "name": self.description, - "date_created": date_created, + "func_string": self.func_string, + "date_created": self.date_created, + "model_name": self.recordset._name, "method_name": self.method_name, + "job_function_id": self.job_config.job_function_id, + "channel_method_name": self.job_function_name, "records": self.recordset, "args": self.args, "kwargs": self.kwargs, } ) - # it the channel is not specified, lets the job_model compute - # the right one to use - if self.channel: - vals.update({"channel": self.channel}) - job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals) + @property + def func_string(self): + model = repr(self.recordset) + args = [repr(arg) for arg in self.args] + kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()] + all_args = ", ".join(args + kwargs) + return "{}.{}({})".format(model, self.method_name, all_args) + def db_record(self): return self.db_record_from_uuid(self.env, self.uuid) @@ -595,6 +597,11 @@ def func(self): recordset = recordset.with_company(self.company_id) return getattr(recordset, self.method_name) + @property + def job_function_name(self): + func_model = self.env["queue.job.function"].sudo() + return func_model.job_function_name(self.recordset._name, self.method_name) + @property def identity_key(self): if self._identity_key is None: @@ -652,6 +659,14 @@ def eta(self, value): else: self._eta = value + @property + def channel(self): + return self._channel or 
self.job_config.channel + + @channel.setter + def channel(self, value): + self._channel = value + def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index a80eb2f9af..b360bdc502 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -37,27 +37,22 @@ class QueueJob(models.Model): "date_created", "model_name", "method_name", + "func_string", + "channel_method_name", + "job_function_id", "records", "args", "kwargs", ) uuid = fields.Char(string="UUID", readonly=True, index=True, required=True) - user_id = fields.Many2one( - comodel_name="res.users", - string="User ID", - compute="_compute_user_id", - inverse="_inverse_user_id", - store=True, - ) + user_id = fields.Many2one(comodel_name="res.users", string="User ID") company_id = fields.Many2one( comodel_name="res.company", string="Company", index=True ) name = fields.Char(string="Description", readonly=True) - model_name = fields.Char( - string="Model", compute="_compute_model_name", store=True, readonly=True - ) + model_name = fields.Char(string="Model", readonly=True) method_name = fields.Char(readonly=True) # record_ids field is only for backward compatibility (e.g. used in related # actions), can be removed (replaced by "records") in 14.0 @@ -69,9 +64,7 @@ class QueueJob(models.Model): ) args = JobSerialized(readonly=True, base_type=tuple) kwargs = JobSerialized(readonly=True, base_type=dict) - func_string = fields.Char( - string="Task", compute="_compute_func_string", readonly=True, store=True - ) + func_string = fields.Char(string="Task", readonly=True) state = fields.Selection(STATES, readonly=True, required=True, index=True) priority = fields.Integer() @@ -91,21 +84,13 @@ class QueueJob(models.Model): "max. 
retries.\n" "Retries are infinite when empty.", ) - channel_method_name = fields.Char( - readonly=True, compute="_compute_job_function", store=True - ) + # FIXME the name of this field is very confusing + channel_method_name = fields.Char(readonly=True) job_function_id = fields.Many2one( - comodel_name="queue.job.function", - compute="_compute_job_function", - string="Job Function", - readonly=True, - store=True, + comodel_name="queue.job.function", string="Job Function", readonly=True, ) - override_channel = fields.Char() - channel = fields.Char( - compute="_compute_channel", inverse="_inverse_channel", store=True, index=True - ) + channel = fields.Char(index=True) identity_key = fields.Char(readonly=True) worker_pid = fields.Integer(readonly=True) @@ -122,65 +107,18 @@ def init(self): "'enqueued') AND identity_key IS NOT NULL;" ) - @api.depends("records") - def _compute_user_id(self): - for record in self: - record.user_id = record.records.env.uid - - def _inverse_user_id(self): - for record in self.with_context(_job_edit_sentinel=self.EDIT_SENTINEL): - record.records = record.records.with_user(record.user_id.id) - - @api.depends("records") - def _compute_model_name(self): - for record in self: - record.model_name = record.records._name - @api.depends("records") def _compute_record_ids(self): for record in self: record.record_ids = record.records.ids - def _inverse_channel(self): - for record in self: - record.override_channel = record.channel - - @api.depends("job_function_id.channel_id") - def _compute_channel(self): - for record in self: - channel = ( - record.override_channel or record.job_function_id.channel or "root" - ) - if record.channel != channel: - record.channel = channel - - @api.depends("model_name", "method_name", "job_function_id.channel_id") - def _compute_job_function(self): - for record in self: - func_model = self.env["queue.job.function"] - channel_method_name = func_model.job_function_name( - record.model_name, record.method_name - ) - 
function = func_model.search([("name", "=", channel_method_name)], limit=1) - record.channel_method_name = channel_method_name - record.job_function_id = function - - @api.depends("model_name", "method_name", "records", "args", "kwargs") - def _compute_func_string(self): - for record in self: - model = repr(record.records) - args = [repr(arg) for arg in record.args] - kwargs = ["{}={!r}".format(key, val) for key, val in record.kwargs.items()] - all_args = ", ".join(args + kwargs) - record.func_string = "{}.{}({})".format(model, record.method_name, all_args) - @api.model_create_multi def create(self, vals_list): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: # Prevent to create a queue.job record "raw" from RPC. # ``with_delay()`` must be used. raise exceptions.AccessError( - _("Queue jobs must created by calling 'with_delay()'.") + _("Queue jobs must be created by calling 'with_delay()'.") ) return super().create(vals_list) @@ -196,10 +134,25 @@ def write(self, vals): ) ) + different_user_jobs = self.browse() + if vals.get("user_id"): + different_user_jobs = self.filtered( + lambda records: records.env.user.id != vals["user_id"] + ) + if vals.get("state") == "failed": self._message_post_on_failure() - return super().write(vals) + result = super().write(vals) + + for record in different_user_jobs: + # the user is stored in the env of the record, but we still want to + # have a stored user_id field to be able to search/groupby, so + # synchronize the env of records with user_id + super(QueueJob, record).write( + {"records": record.records.with_user(vals["user_id"])} + ) + return result def open_related_action(self): """Open the related action associated to the job""" diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py index a80f4920d3..f593b6520a 100644 --- a/queue_job/models/queue_job_function.py +++ b/queue_job/models/queue_job_function.py @@ -27,7 +27,8 @@ class QueueJobFunction(models.Model): 
"retry_pattern " "related_action_enable " "related_action_func_name " - "related_action_kwargs ", + "related_action_kwargs " + "job_function_id ", ) def _default_channel(self): @@ -141,6 +142,7 @@ def job_default_config(self): related_action_enable=True, related_action_func_name=None, related_action_kwargs={}, + job_function_id=None, ) def _parse_retry_pattern(self): @@ -173,6 +175,7 @@ def job_config(self, name): related_action_enable=config.related_action.get("enable", True), related_action_func_name=config.related_action.get("func_name"), related_action_kwargs=config.related_action.get("kwargs", {}), + job_function_id=config.id, ) def _retry_pattern_format_error_message(self): diff --git a/queue_job/tests/test_model_job_function.py b/queue_job/tests/test_model_job_function.py index c9bdea56e8..e6ddf3fcc3 100644 --- a/queue_job/tests/test_model_job_function.py +++ b/queue_job/tests/test_model_job_function.py @@ -31,7 +31,7 @@ def test_function_job_config(self): channel = self.env["queue.job.channel"].create( {"name": "foo", "parent_id": self.env.ref("queue_job.channel_root").id} ) - self.env["queue.job.function"].create( + job_function = self.env["queue.job.function"].create( { "model_id": self.env.ref("base.model_res_users").id, "method": "read", @@ -52,5 +52,6 @@ def test_function_job_config(self): related_action_enable=True, related_action_func_name="related_action_foo", related_action_kwargs={"b": 1}, + job_function_id=job_function.id, ), ) From b0156d45d2bb97f503077a01ebc30f46498f3e91 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 1 Feb 2021 10:32:56 +0100 Subject: [PATCH 03/16] Remove initial create notification and follower Everytime a job is created, a mail.message "Queue Job created" is created. This is useless, as we already have the creation date and user, and nobody will ever want to receive a notification for a created job anyway. 
Removing the on creation auto-subscription of the user that created the job makes sense as well since we automatically subscribe the queue job managers for failures. Add the owner of the jobs to the followers on failures only as well. It allows to remove a lot of insertions of records (and of deletions when autovacuuming jobs). --- queue_job/models/queue_job.py | 10 +++++-- test_queue_job/tests/test_job.py | 47 ++++++++++++++++++++------------ 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index b360bdc502..6b729c68e8 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -120,7 +120,10 @@ def create(self, vals_list): raise exceptions.AccessError( _("Queue jobs must be created by calling 'with_delay()'.") ) - return super().create(vals_list) + return super( + QueueJob, + self.with_context(mail_create_nolog=True, mail_create_nosubscribe=True), + ).create(vals_list) def write(self, vals): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: @@ -192,9 +195,10 @@ def _message_post_on_failure(self): # subscribe the users now to avoid to subscribe them # at every job creation domain = self._subscribe_users_domain() - users = self.env["res.users"].search(domain) - self.message_subscribe(partner_ids=users.mapped("partner_id").ids) + base_users = self.env["res.users"].search(domain) for record in self: + users = base_users | record.user_id + record.message_subscribe(partner_ids=users.mapped("partner_id").ids) msg = record._message_failed_job() if msg: record.message_post(body=msg, subtype_xmlid="queue_job.mt_job_failed") diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 3cbb034acb..3cd2c0fb7a 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -7,7 +7,6 @@ import mock import odoo.tests.common as common -from odoo import SUPERUSER_ID from odoo.addons.queue_job import 
identity_exact from odoo.addons.queue_job.exception import ( @@ -524,7 +523,7 @@ def test_message_when_write_fail(self): stored.write({"state": "failed"}) self.assertEqual(stored.state, FAILED) messages = stored.message_ids - self.assertEqual(len(messages), 2) + self.assertEqual(len(messages), 1) def test_follower_when_write_fail(self): """Check that inactive users doesn't are not followers even if @@ -583,6 +582,22 @@ def setUp(self): User = self.env["res.users"] Company = self.env["res.company"] Partner = self.env["res.partner"] + + main_company = self.env.ref("base.main_company") + + self.partner_user = Partner.create( + {"name": "Simple User", "email": "simple.user@example.com"} + ) + self.simple_user = User.create( + { + "partner_id": self.partner_user.id, + "company_ids": [(4, main_company.id)], + "login": "simple_user", + "name": "simple user", + "groups_id": [], + } + ) + self.other_partner_a = Partner.create( {"name": "My Company a", "is_company": True, "email": "test@tes.ttest"} ) @@ -599,7 +614,7 @@ def setUp(self): "company_id": self.other_company_a.id, "company_ids": [(4, self.other_company_a.id)], "login": "my_login a", - "name": "my user", + "name": "my user A", "groups_id": [(4, grp_queue_job_manager)], } ) @@ -619,16 +634,11 @@ def setUp(self): "company_id": self.other_company_b.id, "company_ids": [(4, self.other_company_b.id)], "login": "my_login_b", - "name": "my user 1", + "name": "my user B", "groups_id": [(4, grp_queue_job_manager)], } ) - def _subscribe_users(self, stored): - domain = stored._subscribe_users_domain() - users = self.env["res.users"].search(domain) - stored.message_subscribe(partner_ids=users.mapped("partner_id").ids) - def _create_job(self, env): self.cr.execute("delete from queue_job") env["test.queue.job"].with_delay().testing_method() @@ -674,11 +684,14 @@ def test_job_subscription(self): # queue_job.group_queue_job_manager must be followers User = self.env["res.users"] no_company_context = dict(self.env.context, 
company_id=None) - no_company_env = self.env(context=no_company_context) + no_company_env = self.env(user=self.simple_user, context=no_company_context) stored = self._create_job(no_company_env) - self._subscribe_users(stored) - users = User.with_context(active_test=False).search( - [("groups_id", "=", self.ref("queue_job.group_queue_job_manager"))] + stored._message_post_on_failure() + users = ( + User.search( + [("groups_id", "=", self.ref("queue_job.group_queue_job_manager"))] + ) + + stored.user_id ) self.assertEqual(len(stored.message_follower_ids), len(users)) expected_partners = [u.partner_id for u in users] @@ -692,13 +705,13 @@ def test_job_subscription(self): # jobs created for a specific company_id are followed only by # company's members company_a_context = dict(self.env.context, company_id=self.other_company_a.id) - company_a_env = self.env(context=company_a_context) + company_a_env = self.env(user=self.simple_user, context=company_a_context) stored = self._create_job(company_a_env) stored.with_user(self.other_user_a.id) - self._subscribe_users(stored) - # 2 because admin + self.other_partner_a + stored._message_post_on_failure() + # 2 because simple_user (creator of job) + self.other_partner_a self.assertEqual(len(stored.message_follower_ids), 2) - users = User.browse([SUPERUSER_ID, self.other_user_a.id]) + users = self.simple_user + self.other_user_a expected_partners = [u.partner_id for u in users] self.assertSetEqual( set(stored.message_follower_ids.mapped("partner_id")), From 0d89c40abe8112e22834409cffcc79a81d460cf4 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 10 Feb 2021 09:42:06 +0100 Subject: [PATCH 04/16] Add model in search view / group by --- queue_job/views/queue_job_views.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index a68c03d34f..2bb4d58771 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ 
-126,6 +126,7 @@ + + From ec9d607e1ce66f018903033a7a9a82ecfd9c4378 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 8 Feb 2021 16:43:43 +0100 Subject: [PATCH 05/16] queue_job: add exec time to view some stats --- queue_job/job.py | 9 +++++ .../migrations/13.0.3.7.0/pre-migration.py | 35 +++++++++++++++++++ queue_job/models/queue_job.py | 5 +++ queue_job/views/queue_job_views.xml | 28 ++++++++++++++- test_queue_job/tests/test_job.py | 9 +++-- 5 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 queue_job/migrations/13.0.3.7.0/pre-migration.py diff --git a/queue_job/job.py b/queue_job/job.py index befd1182fa..a6525ac2d6 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -529,6 +529,7 @@ def store(self): "date_enqueued": False, "date_started": False, "date_done": False, + "exec_time": False, "eta": False, "identity_key": False, "worker_pid": self.worker_pid, @@ -540,6 +541,8 @@ def store(self): vals["date_started"] = self.date_started if self.date_done: vals["date_done"] = self.date_done + if self.exec_time: + vals["exec_time"] = self.exec_time if self.eta: vals["eta"] = self.eta if self.identity_key: @@ -667,6 +670,12 @@ def channel(self): def channel(self, value): self._channel = value + @property + def exec_time(self): + if self.date_done and self.date_started: + return (self.date_done - self.date_started).total_seconds() + return None + def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None diff --git a/queue_job/migrations/13.0.3.7.0/pre-migration.py b/queue_job/migrations/13.0.3.7.0/pre-migration.py new file mode 100644 index 0000000000..c14d6800ad --- /dev/null +++ b/queue_job/migrations/13.0.3.7.0/pre-migration.py @@ -0,0 +1,35 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import logging + +from odoo.tools.sql import column_exists + +_logger = logging.getLogger(__name__) + + +def migrate(cr, version): + if not column_exists(cr, "queue_job", "exec_time"): + # 
Disable trigger otherwise the update takes ages. + cr.execute( + """ + ALTER TABLE queue_job DISABLE TRIGGER queue_job_notify; + """ + ) + cr.execute( + """ + ALTER TABLE queue_job ADD COLUMN exec_time double precision DEFAULT 0; + """ + ) + cr.execute( + """ + UPDATE + queue_job + SET + exec_time = EXTRACT(EPOCH FROM (date_done - date_started)); + """ + ) + cr.execute( + """ + ALTER TABLE queue_job ENABLE TRIGGER queue_job_notify; + """ + ) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 6b729c68e8..7a1992972a 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -75,6 +75,11 @@ class QueueJob(models.Model): date_started = fields.Datetime(string="Start Date", readonly=True) date_enqueued = fields.Datetime(string="Enqueue Time", readonly=True) date_done = fields.Datetime(readonly=True) + exec_time = fields.Float( + string="Execution Time (avg)", + group_operator="avg", + help="Time required to execute this job in seconds. Average when grouped.", + ) eta = fields.Datetime(string="Execute only after") retry = fields.Integer(string="Current try") diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 2bb4d58771..59d2fa01df 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -57,6 +57,8 @@ + + @@ -109,6 +111,7 @@ + @@ -116,6 +119,29 @@ + + queue.job.pivot + queue.job + + + + + + + + + + + queue.job.graph + queue.job + + + + + + + + queue.job.search queue.job @@ -182,7 +208,7 @@ Jobs queue.job - tree,form + tree,form,pivot,graph {'search_default_pending': 1, 'search_default_enqueued': 1, 'search_default_started': 1, diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 3cd2c0fb7a..9eef6d2195 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -149,14 +149,16 @@ def test_worker_pid(self): def test_set_done(self): job_a = Job(self.method) + job_a.date_started = 
datetime(2015, 3, 15, 16, 40, 0) datetime_path = "odoo.addons.queue_job.job.datetime" with mock.patch(datetime_path, autospec=True) as mock_datetime: mock_datetime.now.return_value = datetime(2015, 3, 15, 16, 41, 0) job_a.set_done(result="test") - self.assertEqual(job_a.state, DONE) - self.assertEqual(job_a.result, "test") - self.assertEqual(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEquals(job_a.state, DONE) + self.assertEquals(job_a.result, "test") + self.assertEquals(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEquals(job_a.exec_time, 60.0) self.assertFalse(job_a.exc_info) def test_set_failed(self): @@ -276,6 +278,7 @@ def test_read(self): self.assertAlmostEqual(job_read.date_started, test_date, delta=delta) self.assertAlmostEqual(job_read.date_enqueued, test_date, delta=delta) self.assertAlmostEqual(job_read.date_done, test_date, delta=delta) + self.assertAlmostEqual(job_read.exec_time, 0.0) def test_job_unlinked(self): test_job = Job(self.method, args=("o", "k"), kwargs={"c": "!"}) From b20c4d537688a2cd7a267c38ebdf54f41a7b49e1 Mon Sep 17 00:00:00 2001 From: Wolfgang Pichler Date: Thu, 19 Nov 2020 08:16:47 +0100 Subject: [PATCH 06/16] Fix missing rollback on retried jobs When RetryableJobError was raised, any change done by the job was not rollbacked. Using `raise` would throw the exception up to the core and rollback, but we would have a stack trace in the logs for each try. Calling rollback manually (rollback also clears the env) hide the tracebacks, however, when the last try fails, the full traceback is still shown in the logs. 
Fixes #261 --- queue_job/controllers/main.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 400ef2389c..67a0adb716 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -77,10 +77,10 @@ def retry_postpone(job, message, seconds=None): if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise - retry_postpone( - job, tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY - ) _logger.debug("%s OperationalError, postponed", job) + raise RetryableJobError( + tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY + ) except NothingToDoJob as err: if str(err): @@ -95,6 +95,10 @@ def retry_postpone(job, message, seconds=None): # delay the job later, requeue retry_postpone(job, str(err), seconds=err.seconds) _logger.debug("%s postponed", job) + # Do not trigger the error up because we don't want an exception + # traceback in the logs we should have the traceback when all + # retries are exhausted + env.cr.rollback() except (FailedJobError, Exception): buff = StringIO() From 4563fc34e401a16bef96aeaaf8c5d6ac98c8a58c Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 10 Feb 2021 12:01:51 +0100 Subject: [PATCH 07/16] Fix date_done set when state changes back to pending When Job.date_done has been set, for instance because the job has been performed, if the job is set back to pending (e.g. a RetryableJobError is raised), the date_done is kept. 
--- queue_job/job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/job.py b/queue_job/job.py index a6525ac2d6..eda0827193 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -680,6 +680,7 @@ def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None self.date_started = None + self.date_done = None self.worker_pid = None if reset_retry: self.retry = 0 From 8418c9a5e1d44ee66dbabd6685896b41c218b40d Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 08:39:28 +0200 Subject: [PATCH 08/16] queue_job: close buffer when done --- queue_job/controllers/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 67a0adb716..4c3d30b596 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -111,6 +111,7 @@ def retry_postpone(job, message, seconds=None): job.set_failed(exc_info=buff.getvalue()) job.store() new_cr.commit() + buff.close() raise return "" From 2a380b1c0fa42a22c3ee9fb3e375e8be470cc95e Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 09:12:07 +0200 Subject: [PATCH 09/16] queue_job: store exception name and message --- queue_job/controllers/main.py | 20 +++++++++++++++++--- queue_job/job.py | 21 ++++++++++++++++++--- queue_job/models/queue_job.py | 2 ++ queue_job/views/queue_job_views.xml | 22 ++++++++++++++-------- test_queue_job/tests/test_job.py | 12 +++++++++--- 5 files changed, 60 insertions(+), 17 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 4c3d30b596..1b35c1ac99 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -100,15 +100,17 @@ def retry_postpone(job, message, seconds=None): # retries are exhausted env.cr.rollback() - except (FailedJobError, Exception): + except (FailedJobError, Exception) as orig_exception: buff = StringIO() traceback.print_exc(file=buff) - _logger.error(buff.getvalue()) + 
traceback_txt = buff.getvalue() + _logger.error(traceback_txt) job.env.clear() with odoo.api.Environment.manage(): with odoo.registry(job.env.cr.dbname).cursor() as new_cr: job.env = job.env(cr=new_cr) - job.set_failed(exc_info=buff.getvalue()) + vals = self._get_failure_values(job, traceback_txt, orig_exception) + job.set_failed(**vals) job.store() new_cr.commit() buff.close() @@ -116,6 +118,18 @@ def retry_postpone(job, message, seconds=None): return "" + def _get_failure_values(self, job, traceback_txt, orig_exception): + """Collect relevant data from exception.""" + exception_name = orig_exception.__class__.__name__ + if hasattr(orig_exception, "__module__"): + exception_name = orig_exception.__module__ + "." + exception_name + exc_message = getattr(orig_exception, "name", str(orig_exception)) + return { + "exc_info": traceback_txt, + "exc_name": exception_name, + "exc_message": exc_message, + } + @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( self, priority=None, max_retries=None, channel=None, description="Test job" diff --git a/queue_job/job.py b/queue_job/job.py index eda0827193..287ab701db 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -214,6 +214,14 @@ class Job(object): A description of the result (for humans). + .. attribute:: exc_name + + Exception error name when the job failed. + + .. attribute:: exc_message + + Exception error message when the job failed. + .. attribute:: exc_info Exception information (traceback) when the job failed. 
@@ -478,6 +486,8 @@ def __init__( self.date_done = None self.result = None + self.exc_name = None + self.exc_message = None self.exc_info = None if "company_id" in env.context: @@ -523,6 +533,8 @@ def store(self): "priority": self.priority, "retry": self.retry, "max_retries": self.max_retries, + "exc_name": self.exc_name, + "exc_message": self.exc_message, "exc_info": self.exc_info, "company_id": self.company_id, "result": str(self.result) if self.result else False, @@ -700,15 +712,17 @@ def set_started(self): def set_done(self, result=None): self.state = DONE + self.exc_name = None self.exc_info = None self.date_done = datetime.now() if result is not None: self.result = result - def set_failed(self, exc_info=None): + def set_failed(self, **kw): self.state = FAILED - if exc_info is not None: - self.exc_info = exc_info + for k, v in kw.items(): + if v is not None: + setattr(self, k, v) def __repr__(self): return "" % (self.uuid, self.priority) @@ -739,6 +753,7 @@ def postpone(self, result=None, seconds=None): """ eta_seconds = self._get_retry_seconds(seconds) self.eta = timedelta(seconds=eta_seconds) + self.exc_name = None self.exc_info = None if result is not None: self.result = result diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 7a1992972a..23d12abb28 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -68,6 +68,8 @@ class QueueJob(models.Model): state = fields.Selection(STATES, readonly=True, required=True, index=True) priority = fields.Integer() + exc_name = fields.Char(string="Exception", readonly=True) + exc_message = fields.Char(string="Exception Message", readonly=True) exc_info = fields.Text(string="Exception Info", readonly=True) result = fields.Text(readonly=True) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 59d2fa01df..e5f7991593 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -71,6 +71,18 @@ > If 
the max. retries is 0, the number of retries is infinite. + +
+
+ +
- - -
@@ -108,10 +113,11 @@ - + + diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 9eef6d2195..5399d795b5 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -163,9 +163,15 @@ def test_set_done(self): def test_set_failed(self): job_a = Job(self.method) - job_a.set_failed(exc_info="failed test") - self.assertEqual(job_a.state, FAILED) - self.assertEqual(job_a.exc_info, "failed test") + job_a.set_failed( + exc_info="failed test", + exc_name="FailedTest", + exc_message="Sadly this job failed", + ) + self.assertEquals(job_a.state, FAILED) + self.assertEquals(job_a.exc_info, "failed test") + self.assertEquals(job_a.exc_name, "FailedTest") + self.assertEquals(job_a.exc_message, "Sadly this job failed") def test_postpone(self): job_a = Job(self.method) From 5607bd7886780016e671252cafdd8179de2fba38 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 09:15:36 +0200 Subject: [PATCH 10/16] queue_job: improve filtering and grouping --- queue_job/views/queue_job_views.xml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index e5f7991593..fd6f3e0bba 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -118,6 +118,7 @@ + @@ -159,6 +160,10 @@ + + + + + + From 6c1de75d753d215a6f08e5474537cbf73aee8a4a Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 10:06:32 +0200 Subject: [PATCH 11/16] queue_job: add hook to customize stored values --- queue_job/job.py | 45 +++++++++++++++++++++------- queue_job/models/base.py | 18 ++++++++++- test_queue_job/models/test_models.py | 8 +++++ test_queue_job/tests/test_job.py | 10 +++++++ 4 files changed, 70 insertions(+), 11 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index 287ab701db..a4dc224b85 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -528,6 +528,22 @@ def perform(self): def store(self): 
"""Store the Job""" + job_model = self.env["queue.job"] + # The sentinel is used to prevent edition sensitive fields (such as + # method_name) from RPC methods. + edit_sentinel = job_model.EDIT_SENTINEL + + db_record = self.db_record() + if db_record: + db_record.with_context(_job_edit_sentinel=edit_sentinel).write( + self._store_values() + ) + else: + job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create( + self._store_values(create=True) + ) + + def _store_values(self, create=False): vals = { "state": self.state, "priority": self.priority, @@ -560,15 +576,7 @@ def store(self): if self.identity_key: vals["identity_key"] = self.identity_key - job_model = self.env["queue.job"] - # The sentinel is used to prevent edition sensitive fields (such as - # method_name) from RPC methods. - edit_sentinel = job_model.EDIT_SENTINEL - - db_record = self.db_record() - if db_record: - db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals) - else: + if create: vals.update( { "user_id": self.env.uid, @@ -588,7 +596,24 @@ def store(self): "kwargs": self.kwargs, } ) - job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals) + + vals_from_model = self._store_values_from_model() + # Sanitize values: make sure you cannot screw core values + vals_from_model = {k: v for k, v in vals_from_model.items() if k not in vals} + vals.update(vals_from_model) + return vals + + def _store_values_from_model(self): + vals = {} + value_handlers_candidates = ( + "_job_store_values_for_" + self.method_name, + "_job_store_values", + ) + for candidate in value_handlers_candidates: + handler = getattr(self.recordset, candidate, None) + if handler is not None: + vals = handler(self) + return vals @property def func_string(self): diff --git a/queue_job/models/base.py b/queue_job/models/base.py index 5b1f8fee79..a398bf85fb 100644 --- a/queue_job/models/base.py +++ b/queue_job/models/base.py @@ -5,7 +5,7 @@ import logging import os -from odoo import models 
+from odoo import api, models from ..job import DelayableRecordset @@ -189,3 +189,19 @@ def auto_delay_wrapper(self, *args, **kwargs): origin = getattr(self, method_name) return functools.update_wrapper(auto_delay_wrapper, origin) + + @api.model + def _job_store_values(self, job): + """Hook for manipulating job stored values. + + You can define a more specific hook for a job function + by defining a method name with this pattern: + + `_job_store_values_for_${method_name}` + + NOTE: values will be stored only if they match stored fields on `queue.job`. + + :param job: current queue_job.job.Job instance. + :return: dictionary for setting job values. + """ + return {} diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py index a5a3843230..a9f32dec0e 100644 --- a/test_queue_job/models/test_models.py +++ b/test_queue_job/models/test_models.py @@ -10,6 +10,8 @@ class QueueJob(models.Model): _inherit = "queue.job" + additional_info = fields.Char() + def testing_related_method(self, **kwargs): return self, kwargs @@ -88,6 +90,12 @@ def _register_hook(self): ) return super()._register_hook() + def _job_store_values(self, job): + value = "JUST_TESTING" + if job.state == "failed": + value += "_BUT_FAILED" + return {"additional_info": value} + class TestQueueChannel(models.Model): diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 5399d795b5..c78c551f2a 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -233,6 +233,16 @@ def test_store(self): stored = self.queue_job.search([("uuid", "=", test_job.uuid)]) self.assertEqual(len(stored), 1) + def test_store_extra_data(self): + test_job = Job(self.method) + test_job.store() + stored = self.queue_job.search([("uuid", "=", test_job.uuid)]) + self.assertEqual(stored.additional_info, "JUST_TESTING") + test_job.set_failed(exc_info="failed test", exc_name="FailedTest") + test_job.store() + stored.invalidate_cache() + 
self.assertEqual(stored.additional_info, "JUST_TESTING_BUT_FAILED") + def test_read(self): eta = datetime.now() + timedelta(hours=5) test_job = Job( From 4b8781c733e75de9dfe5f25e4629f9d76cafd566 Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 11:34:30 +0200 Subject: [PATCH 12/16] queue_job: migration step to store exception data --- .../migrations/13.0.3.8.0/post-migration.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 queue_job/migrations/13.0.3.8.0/post-migration.py diff --git a/queue_job/migrations/13.0.3.8.0/post-migration.py b/queue_job/migrations/13.0.3.8.0/post-migration.py new file mode 100644 index 0000000000..f6eff72707 --- /dev/null +++ b/queue_job/migrations/13.0.3.8.0/post-migration.py @@ -0,0 +1,47 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import logging + +from odoo import SUPERUSER_ID, api + +_logger = logging.getLogger(__name__) + + +def migrate(cr, version): + with api.Environment.manage(): + env = api.Environment(cr, SUPERUSER_ID, {}) + _logger.info("Computing exception name for failed jobs") + _compute_jobs_new_values(env) + + +def _compute_jobs_new_values(env): + for job in env["queue.job"].search( + [("state", "=", "failed"), ("exc_info", "!=", False)] + ): + exception_details = _get_exception_details(job) + if exception_details: + job.update(exception_details) + + +def _get_exception_details(job): + for line in reversed(job.exc_info.splitlines()): + if _find_exception(line): + name, msg = line.split(":", 1) + return { + "exc_name": name.strip(), + "exc_message": msg.strip("()', \""), + } + + +def _find_exception(line): + # Just a list of common errors. + # If you want to target others, add your own migration step for your db. 
+ exceptions = ( + "Error:", # catch all well named exceptions + # other live instance errors found + "requests.exceptions.MissingSchema", + "botocore.errorfactory.NoSuchKey", + ) + for exc in exceptions: + if exc in line: + return exc From 192cc652d802da2d8c597ccc0fb845f58d742915 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 31 May 2021 21:34:19 +0200 Subject: [PATCH 13/16] Fix display on exec_time on tree view as seconds The float_time widget shows hours seconds, we only want seconds. The widget had been removed on the form view, but not on the tree view. --- queue_job/views/queue_job_views.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index fd6f3e0bba..cb7aeac28b 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -116,7 +116,7 @@ - + From fc484964040d283c4bbe791e33117cd242b70ed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C5=A9ng=20=28Tr=E1=BA=A7n=20=C4=90=C3=ACnh=29?= Date: Tue, 23 Nov 2021 18:24:18 +0700 Subject: [PATCH 14/16] [IMP] queue_job: black, isort, prettier --- queue_job/models/queue_job.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 23d12abb28..2d8ef74538 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -94,7 +94,9 @@ class QueueJob(models.Model): # FIXME the name of this field is very confusing channel_method_name = fields.Char(readonly=True) job_function_id = fields.Many2one( - comodel_name="queue.job.function", string="Job Function", readonly=True, + comodel_name="queue.job.function", + string="Job Function", + readonly=True, ) channel = fields.Char(index=True) From 3051240d5da37fb523aeaf27fa50de8eafb0f322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C5=A9ng=20=28Tr=E1=BA=A7n=20=C4=90=C3=ACnh=29?= Date: Tue, 23 Nov 2021 19:37:47 +0700 Subject: [PATCH 15/16] Forward migration 
scripts from #309 #328 --- queue_job/__manifest__.py | 2 +- .../migrations/{13.0.3.8.0 => 14.0.1.4.0}/post-migration.py | 0 .../migrations/{13.0.3.7.0 => 14.0.1.4.0}/pre-migration.py | 4 ---- 3 files changed, 1 insertion(+), 5 deletions(-) rename queue_job/migrations/{13.0.3.8.0 => 14.0.1.4.0}/post-migration.py (100%) rename queue_job/migrations/{13.0.3.7.0 => 14.0.1.4.0}/pre-migration.py (93%) diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 2e5a2ef610..d100873272 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -3,7 +3,7 @@ { "name": "Job Queue", - "version": "14.0.1.3.4", + "version": "14.0.1.4.0", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", diff --git a/queue_job/migrations/13.0.3.8.0/post-migration.py b/queue_job/migrations/14.0.1.4.0/post-migration.py similarity index 100% rename from queue_job/migrations/13.0.3.8.0/post-migration.py rename to queue_job/migrations/14.0.1.4.0/post-migration.py diff --git a/queue_job/migrations/13.0.3.7.0/pre-migration.py b/queue_job/migrations/14.0.1.4.0/pre-migration.py similarity index 93% rename from queue_job/migrations/13.0.3.7.0/pre-migration.py rename to queue_job/migrations/14.0.1.4.0/pre-migration.py index c14d6800ad..7b15de8e58 100644 --- a/queue_job/migrations/13.0.3.7.0/pre-migration.py +++ b/queue_job/migrations/14.0.1.4.0/pre-migration.py @@ -1,11 +1,7 @@ # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) -import logging - from odoo.tools.sql import column_exists -_logger = logging.getLogger(__name__) - def migrate(cr, version): if not column_exists(cr, "queue_job", "exec_time"): From fb925cb38aa1f916ac9ca0b0983705ce35624123 Mon Sep 17 00:00:00 2001 From: Enric Tobella Date: Tue, 22 Jun 2021 15:11:13 +0200 Subject: [PATCH 16/16] [FIX] queue_job: Migrations raising errors with OpenUpgrade --- queue_job/migrations/14.0.1.4.0/pre-migration.py | 6 ++++-- 1 file 
changed, 4 insertions(+), 2 deletions(-) diff --git a/queue_job/migrations/14.0.1.4.0/pre-migration.py b/queue_job/migrations/14.0.1.4.0/pre-migration.py index 7b15de8e58..8ae6cb3a5f 100644 --- a/queue_job/migrations/14.0.1.4.0/pre-migration.py +++ b/queue_job/migrations/14.0.1.4.0/pre-migration.py @@ -1,10 +1,12 @@ # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) -from odoo.tools.sql import column_exists +from odoo.tools.sql import column_exists, table_exists def migrate(cr, version): - if not column_exists(cr, "queue_job", "exec_time"): + if table_exists(cr, "queue_job") and not column_exists( + cr, "queue_job", "exec_time" + ): # Disable trigger otherwise the update takes ages. cr.execute( """