From 0fa4a217fbb5aad6b2e4bb4ab8e47a57a8e9ed33 Mon Sep 17 00:00:00 2001 From: Eugene Molotov Date: Wed, 17 Nov 2021 17:47:14 +0500 Subject: [PATCH 01/12] queue_job: fix typo in USAGE.rst --- queue_job/readme/USAGE.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst index 6c472eccf9..c8ff94b793 100644 --- a/queue_job/readme/USAGE.rst +++ b/queue_job/readme/USAGE.rst @@ -25,7 +25,7 @@ Example of job function: .. code-block:: XML - + action_done From 296311e66c714f570c6a092a7844ae4fc8788ec8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Honor=C3=A9?= Date: Mon, 2 Aug 2021 16:12:16 +0200 Subject: [PATCH 02/12] [14.0][IMP] queue_job current company Use the current company to trigger the job (+ add related tests) [14.0][FIX] queu_job: allowed_company_ids => use with_company(...) Fill allowed_company_ids from context with the job's company instead of every allowed companies of the user. Because most of the time, a job is related to only one company. And adding every allowed companies of the user into the context may load some unexpected records (during search for example). Because standards ir.rule use ['|',('company_id','=',False),('company_id', 'in', company_ids)] and this 'company_ids' is filled with every allowed companies from the context. --- queue_job/job.py | 6 +++++ test_queue_job/tests/test_job.py | 43 ++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/queue_job/job.py b/queue_job/job.py index 4ee90390e6..7bf261cb4b 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -586,7 +586,13 @@ def db_record(self): @property def func(self): + # We can fill only one company into allowed_company_ids. + # Because if you have many, you can have unexpected records due to ir.rule. + # ir.rule use allowed_company_ids to load every records in many companies. + # But most of the time, a job should be executed on a single company. recordset = self.recordset.with_context(job_uuid=self.uuid) + if self.company_id: + recordset = recordset.with_company(self.company_id) return getattr(recordset, self.method_name) @property diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index ad9d2b37f5..0edcdd8d9f 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -176,6 +176,49 @@ def test_postpone(self): self.assertEqual(job_a.result, "test") self.assertFalse(job_a.exc_info) + def test_company_simple(self): + company = self.env.ref("base.main_company") + eta = datetime.now() + timedelta(hours=5) + test_job = Job( + self.method, + args=("o", "k"), + kwargs={"return_context": 1}, + priority=15, + eta=eta, + description="My description", + ) + test_job.worker_pid = 99999 # normally set on "set_start" + test_job.company_id = company.id + test_job.store() + job_read = Job.load(self.env, test_job.uuid) + self.assertEqual(test_job.func, job_read.func) + result_ctx = test_job.func(*tuple(test_job.args), **test_job.kwargs) + self.assertEqual(result_ctx.get("allowed_company_ids"), company.ids) + + def test_company_complex(self): + company1 = self.env.ref("base.main_company") + company2 = company1.create({"name": "Queue job company"}) + companies = company1 | company2 + self.env.user.write({"company_ids": [(6, False, companies.ids)]}) + # Ensure the main company still the first + self.assertEqual(self.env.user.company_id, company1) + eta = datetime.now() + timedelta(hours=5) + test_job = Job( + self.method, + args=("o", "k"), + kwargs={"return_context": 1}, + priority=15, + eta=eta, + description="My description", + ) + test_job.worker_pid = 99999 # normally set on "set_start" + test_job.company_id = company2.id + test_job.store() + job_read = Job.load(self.env, test_job.uuid) + self.assertEqual(test_job.func, job_read.func) + result_ctx = test_job.func(*tuple(test_job.args), **test_job.kwargs) + self.assertEqual(result_ctx.get("allowed_company_ids"), company2.ids) + def test_store(self): test_job = Job(self.method) test_job.store() From 16495db2dae9d67cff6c0131e7fbc8d9ccbb4de6 Mon Sep 17 00:00:00 2001 From: Sylvain LE GAL Date: Mon, 19 Jul 2021 15:18:35 +0200 Subject: [PATCH 03/12] [DOC] describe how to write queue.job.function in case of function defined in abstract model --- queue_job/readme/USAGE.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst index c8ff94b793..66827e716a 100644 --- a/queue_job/readme/USAGE.rst +++ b/queue_job/readme/USAGE.rst @@ -43,6 +43,13 @@ they have different xmlids. On uninstall, the merged record is deleted when all the modules using it are uninstalled. +**Job function: model** + +If the function is defined in an abstract model, you can not write +```` +but you have to define a function for each model that inherits from the abstract model. + + **Job function: channel** The channel where the job will be delayed. The default channel is ``root``. From 257ad971fb85942e2fbeb9712cedbdd66f814d71 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 1 Feb 2021 11:28:10 +0100 Subject: [PATCH 04/12] Change technical fields to read-only These fields should not be changed by users. --- queue_job/models/queue_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 157e4d8cbc..a80eb2f9af 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -107,8 +107,8 @@ class QueueJob(models.Model): compute="_compute_channel", inverse="_inverse_channel", store=True, index=True ) - identity_key = fields.Char() - worker_pid = fields.Integer() + identity_key = fields.Char(readonly=True) + worker_pid = fields.Integer(readonly=True) def init(self): self._cr.execute( From b9e83899ad7298c10e330eae61abfef2690e3df2 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 2 Feb 2021 07:57:50 +0100 Subject: [PATCH 05/12] Optimize queue.job creation Several fields on queue.job are initialized using computed fields, then never changed again. On creation of a queue.job record, we'll have an initial INSERT + at least one following UPDATE for the computed fields. Replace all the stored computed fields by a raw initialization of the values in `Job.store()` when the job is created, so we have only a single INSERT. Highlights: * as channel is no longer a compute/inverse field, override_channel is useless, I dropped it (actually the same value was stored in both channel and override_channel as the channel field was stored) * one functional diff is that now, when changing a channel on a job.function, the channel is no longer synchronized on existing jobs, it will be applied only on new jobs: actually this is an improvement, because it was impossible to change the channel of a job function in a large queue_job table as it meant writing on all the done/started jobs * searching the queue.job.function is now cached, as each job using the same will run a query on queue_job_function --- queue_job/controllers/main.py | 2 +- queue_job/job.py | 47 ++++++---- queue_job/models/queue_job.py | 101 ++++++--------------- queue_job/models/queue_job_function.py | 5 +- queue_job/tests/test_model_job_function.py | 3 +- 5 files changed, 65 insertions(+), 93 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index d877254bfe..f42af7beaa 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -108,7 +108,7 @@ def retry_postpone(job, message, seconds=None): @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( - self, priority=None, max_retries=None, channel="root", description="Test job" + self, priority=None, max_retries=None, channel=None, description="Test job" ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) diff --git a/queue_job/job.py b/queue_job/job.py index 7bf261cb4b..7bf12a0363 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -441,13 +441,7 @@ def __init__( self.job_model_name = "queue.job" self.job_config = ( - self.env["queue.job.function"] - .sudo() - .job_config( - self.env["queue.job.function"].job_function_name( - self.model_name, self.method_name - ) - ) + self.env["queue.job.function"].sudo().job_config(self.job_function_name) ) self.state = PENDING @@ -560,27 +554,35 @@ def store(self): if db_record: db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals) else: - date_created = self.date_created - # The following values must never be modified after the - # creation of the job vals.update( { + "user_id": self.env.uid, + "channel": self.channel, + # The following values must never be modified after the + # creation of the job "uuid": self.uuid, "name": self.description, - "date_created": date_created, + "func_string": self.func_string, + "date_created": self.date_created, + "model_name": self.recordset._name, "method_name": self.method_name, + "job_function_id": self.job_config.job_function_id, + "channel_method_name": self.job_function_name, "records": self.recordset, "args": self.args, "kwargs": self.kwargs, } ) - # it the channel is not specified, lets the job_model compute - # the right one to use - if self.channel: - vals.update({"channel": self.channel}) - job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals) + @property + def func_string(self): + model = repr(self.recordset) + args = [repr(arg) for arg in self.args] + kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()] + all_args = ", ".join(args + kwargs) + return "{}.{}({})".format(model, self.method_name, all_args) + def db_record(self): return self.db_record_from_uuid(self.env, self.uuid) @@ -595,6 +597,11 @@ def func(self): recordset = recordset.with_company(self.company_id) return getattr(recordset, self.method_name) + @property + def job_function_name(self): + func_model = self.env["queue.job.function"].sudo() + return func_model.job_function_name(self.recordset._name, self.method_name) + @property def identity_key(self): if self._identity_key is None: @@ -652,6 +659,14 @@ def eta(self, value): else: self._eta = value + @property + def channel(self): + return self._channel or self.job_config.channel + + @channel.setter + def channel(self, value): + self._channel = value + def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index a80eb2f9af..b360bdc502 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -37,27 +37,22 @@ class QueueJob(models.Model): "date_created", "model_name", "method_name", + "func_string", + "channel_method_name", + "job_function_id", "records", "args", "kwargs", ) uuid = fields.Char(string="UUID", readonly=True, index=True, required=True) - user_id = fields.Many2one( - comodel_name="res.users", - string="User ID", - compute="_compute_user_id", - inverse="_inverse_user_id", - store=True, - ) + user_id = fields.Many2one(comodel_name="res.users", string="User ID") company_id = fields.Many2one( comodel_name="res.company", string="Company", index=True ) name = fields.Char(string="Description", readonly=True) - model_name = fields.Char( - string="Model", compute="_compute_model_name", store=True, readonly=True - ) + model_name = fields.Char(string="Model", readonly=True) method_name = fields.Char(readonly=True) # record_ids field is only for backward compatibility (e.g. used in related # actions), can be removed (replaced by "records") in 14.0 @@ -69,9 +64,7 @@ class QueueJob(models.Model): ) args = JobSerialized(readonly=True, base_type=tuple) kwargs = JobSerialized(readonly=True, base_type=dict) - func_string = fields.Char( - string="Task", compute="_compute_func_string", readonly=True, store=True - ) + func_string = fields.Char(string="Task", readonly=True) state = fields.Selection(STATES, readonly=True, required=True, index=True) priority = fields.Integer() @@ -91,21 +84,13 @@ class QueueJob(models.Model): "max. retries.\n" "Retries are infinite when empty.", ) - channel_method_name = fields.Char( - readonly=True, compute="_compute_job_function", store=True - ) + # FIXME the name of this field is very confusing + channel_method_name = fields.Char(readonly=True) job_function_id = fields.Many2one( - comodel_name="queue.job.function", - compute="_compute_job_function", - string="Job Function", - readonly=True, - store=True, + comodel_name="queue.job.function", string="Job Function", readonly=True, ) - override_channel = fields.Char() - channel = fields.Char( - compute="_compute_channel", inverse="_inverse_channel", store=True, index=True - ) + channel = fields.Char(index=True) identity_key = fields.Char(readonly=True) worker_pid = fields.Integer(readonly=True) @@ -122,65 +107,18 @@ def init(self): "'enqueued') AND identity_key IS NOT NULL;" ) - @api.depends("records") - def _compute_user_id(self): - for record in self: - record.user_id = record.records.env.uid - - def _inverse_user_id(self): - for record in self.with_context(_job_edit_sentinel=self.EDIT_SENTINEL): - record.records = record.records.with_user(record.user_id.id) - - @api.depends("records") - def _compute_model_name(self): - for record in self: - record.model_name = record.records._name - @api.depends("records") def _compute_record_ids(self): for record in self: record.record_ids = record.records.ids - def _inverse_channel(self): - for record in self: - record.override_channel = record.channel - - @api.depends("job_function_id.channel_id") - def _compute_channel(self): - for record in self: - channel = ( - record.override_channel or record.job_function_id.channel or "root" - ) - if record.channel != channel: - record.channel = channel - - @api.depends("model_name", "method_name", "job_function_id.channel_id") - def _compute_job_function(self): - for record in self: - func_model = self.env["queue.job.function"] - channel_method_name = func_model.job_function_name( - record.model_name, record.method_name - ) - function = func_model.search([("name", "=", channel_method_name)], limit=1) - record.channel_method_name = channel_method_name - record.job_function_id = function - - @api.depends("model_name", "method_name", "records", "args", "kwargs") - def _compute_func_string(self): - for record in self: - model = repr(record.records) - args = [repr(arg) for arg in record.args] - kwargs = ["{}={!r}".format(key, val) for key, val in record.kwargs.items()] - all_args = ", ".join(args + kwargs) - record.func_string = "{}.{}({})".format(model, record.method_name, all_args) - @api.model_create_multi def create(self, vals_list): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: # Prevent to create a queue.job record "raw" from RPC. # ``with_delay()`` must be used. raise exceptions.AccessError( - _("Queue jobs must created by calling 'with_delay()'.") + _("Queue jobs must be created by calling 'with_delay()'.") ) return super().create(vals_list) @@ -196,10 +134,25 @@ def write(self, vals): ) ) + different_user_jobs = self.browse() + if vals.get("user_id"): + different_user_jobs = self.filtered( + lambda records: records.env.user.id != vals["user_id"] + ) + if vals.get("state") == "failed": self._message_post_on_failure() - return super().write(vals) + result = super().write(vals) + + for record in different_user_jobs: + # the user is stored in the env of the record, but we still want to + # have a stored user_id field to be able to search/groupby, so + # synchronize the env of records with user_id + super(QueueJob, record).write( + {"records": record.records.with_user(vals["user_id"])} + ) + return result def open_related_action(self): """Open the related action associated to the job""" diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py index db9eea3c94..4f351659bd 100644 --- a/queue_job/models/queue_job_function.py +++ b/queue_job/models/queue_job_function.py @@ -27,7 +27,8 @@ class QueueJobFunction(models.Model): "retry_pattern " "related_action_enable " "related_action_func_name " - "related_action_kwargs ", + "related_action_kwargs " + "job_function_id ", ) def _default_channel(self): @@ -147,6 +148,7 @@ def job_default_config(self): related_action_enable=True, related_action_func_name=None, related_action_kwargs={}, + job_function_id=None, ) def _parse_retry_pattern(self): @@ -179,6 +181,7 @@ def job_config(self, name): related_action_enable=config.related_action.get("enable", True), related_action_func_name=config.related_action.get("func_name"), related_action_kwargs=config.related_action.get("kwargs", {}), + job_function_id=config.id, ) def _retry_pattern_format_error_message(self): diff --git a/queue_job/tests/test_model_job_function.py b/queue_job/tests/test_model_job_function.py index 965e26d8f2..84676fdb65 100644 --- a/queue_job/tests/test_model_job_function.py +++ b/queue_job/tests/test_model_job_function.py @@ -31,7 +31,7 @@ def test_function_job_config(self): channel = self.env["queue.job.channel"].create( {"name": "foo", "parent_id": self.env.ref("queue_job.channel_root").id} ) - self.env["queue.job.function"].create( + job_function = self.env["queue.job.function"].create( { "model_id": self.env.ref("base.model_res_users").id, "method": "read", @@ -52,5 +52,6 @@ def test_function_job_config(self): related_action_enable=True, related_action_func_name="related_action_foo", related_action_kwargs={"b": 1}, + job_function_id=job_function.id, ), ) From 65144fdaecf62f9a57b35308ceb9101bbb4671a4 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 1 Feb 2021 10:32:56 +0100 Subject: [PATCH 06/12] Remove initial create notification and follower Everytime a job is created, a mail.message "Queue Job created" is created. This is useless, as we already have the creation date and user, and nobody will ever want to receive a notification for a created job anyway. Removing the on creation auto-subscription of the user that created the job makes sense as well since we automatically subscribe the queue job managers for failures. Add the owner of the jobs to the followers on failures only as well. It allows to remove a lot of insertions of records (and of deletions when autovacuuming jobs). --- queue_job/models/queue_job.py | 10 +++++-- test_queue_job/tests/test_job.py | 47 ++++++++++++++++++++------------ 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index b360bdc502..6b729c68e8 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -120,7 +120,10 @@ def create(self, vals_list): raise exceptions.AccessError( _("Queue jobs must be created by calling 'with_delay()'.") ) - return super().create(vals_list) + return super( + QueueJob, + self.with_context(mail_create_nolog=True, mail_create_nosubscribe=True), + ).create(vals_list) def write(self, vals): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: @@ -192,9 +195,10 @@ def _message_post_on_failure(self): # subscribe the users now to avoid to subscribe them # at every job creation domain = self._subscribe_users_domain() - users = self.env["res.users"].search(domain) - self.message_subscribe(partner_ids=users.mapped("partner_id").ids) + base_users = self.env["res.users"].search(domain) for record in self: + users = base_users | record.user_id + record.message_subscribe(partner_ids=users.mapped("partner_id").ids) msg = record._message_failed_job() if msg: record.message_post(body=msg, subtype_xmlid="queue_job.mt_job_failed") diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 0edcdd8d9f..868a706ebf 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -6,7 +6,6 @@ from unittest import mock import odoo.tests.common as common -from odoo import SUPERUSER_ID from odoo.addons.queue_job import identity_exact from odoo.addons.queue_job.exception import ( @@ -523,7 +522,7 @@ def test_message_when_write_fail(self): stored.write({"state": "failed"}) self.assertEqual(stored.state, FAILED) messages = stored.message_ids - self.assertEqual(len(messages), 2) + self.assertEqual(len(messages), 1) def test_follower_when_write_fail(self): """Check that inactive users doesn't are not followers even if @@ -582,6 +581,22 @@ def setUp(self): User = self.env["res.users"] Company = self.env["res.company"] Partner = self.env["res.partner"] + + main_company = self.env.ref("base.main_company") + + self.partner_user = Partner.create( + {"name": "Simple User", "email": "simple.user@example.com"} + ) + self.simple_user = User.create( + { + "partner_id": self.partner_user.id, + "company_ids": [(4, main_company.id)], + "login": "simple_user", + "name": "simple user", + "groups_id": [], + } + ) + self.other_partner_a = Partner.create( {"name": "My Company a", "is_company": True, "email": "test@tes.ttest"} ) @@ -598,7 +613,7 @@ def setUp(self): "company_id": self.other_company_a.id, "company_ids": [(4, self.other_company_a.id)], "login": "my_login a", - "name": "my user", + "name": "my user A", "groups_id": [(4, grp_queue_job_manager)], } ) @@ -618,16 +633,11 @@ def setUp(self): "company_id": self.other_company_b.id, "company_ids": [(4, self.other_company_b.id)], "login": "my_login_b", - "name": "my user 1", + "name": "my user B", "groups_id": [(4, grp_queue_job_manager)], } ) - def _subscribe_users(self, stored): - domain = stored._subscribe_users_domain() - users = self.env["res.users"].search(domain) - stored.message_subscribe(partner_ids=users.mapped("partner_id").ids) - def _create_job(self, env): self.cr.execute("delete from queue_job") env["test.queue.job"].with_delay().testing_method() @@ -673,11 +683,14 @@ def test_job_subscription(self): # queue_job.group_queue_job_manager must be followers User = self.env["res.users"] no_company_context = dict(self.env.context, company_id=None) - no_company_env = self.env(context=no_company_context) + no_company_env = self.env(user=self.simple_user, context=no_company_context) stored = self._create_job(no_company_env) - self._subscribe_users(stored) - users = User.with_context(active_test=False).search( - [("groups_id", "=", self.ref("queue_job.group_queue_job_manager"))] + stored._message_post_on_failure() + users = ( + User.search( + [("groups_id", "=", self.ref("queue_job.group_queue_job_manager"))] + ) + + stored.user_id ) self.assertEqual(len(stored.message_follower_ids), len(users)) expected_partners = [u.partner_id for u in users] @@ -691,13 +704,13 @@ def test_job_subscription(self): # jobs created for a specific company_id are followed only by # company's members company_a_context = dict(self.env.context, company_id=self.other_company_a.id) - company_a_env = self.env(context=company_a_context) + company_a_env = self.env(user=self.simple_user, context=company_a_context) stored = self._create_job(company_a_env) stored.with_user(self.other_user_a.id) - self._subscribe_users(stored) - # 2 because admin + self.other_partner_a + stored._message_post_on_failure() + # 2 because simple_user (creator of job) + self.other_partner_a self.assertEqual(len(stored.message_follower_ids), 2) - users = User.browse([SUPERUSER_ID, self.other_user_a.id]) + users = self.simple_user + self.other_user_a expected_partners = [u.partner_id for u in users] self.assertSetEqual( set(stored.message_follower_ids.mapped("partner_id")), From 88812046606c616ee6d937c2ff2ceb26432ab41b Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 10 Feb 2021 09:42:06 +0100 Subject: [PATCH 07/12] Add model in search view / group by --- queue_job/views/queue_job_views.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index a68c03d34f..2bb4d58771 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -126,6 +126,7 @@ + + From 7722313b863a34ac12d1c2125aa0339417c8cb2d Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 8 Feb 2021 16:43:43 +0100 Subject: [PATCH 08/12] queue_job: add exec time to view some stats --- queue_job/job.py | 9 +++++ .../migrations/13.0.3.7.0/pre-migration.py | 35 +++++++++++++++++++ queue_job/models/queue_job.py | 5 +++ queue_job/views/queue_job_views.xml | 28 ++++++++++++++- test_queue_job/tests/test_job.py | 9 +++-- 5 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 queue_job/migrations/13.0.3.7.0/pre-migration.py diff --git a/queue_job/job.py b/queue_job/job.py index 7bf12a0363..259bea14a2 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -529,6 +529,7 @@ def store(self): "date_enqueued": False, "date_started": False, "date_done": False, + "exec_time": False, "eta": False, "identity_key": False, "worker_pid": self.worker_pid, @@ -540,6 +541,8 @@ def store(self): vals["date_started"] = self.date_started if self.date_done: vals["date_done"] = self.date_done + if self.exec_time: + vals["exec_time"] = self.exec_time if self.eta: vals["eta"] = self.eta if self.identity_key: @@ -667,6 +670,12 @@ def channel(self): def channel(self, value): self._channel = value + @property + def exec_time(self): + if self.date_done and self.date_started: + return (self.date_done - self.date_started).total_seconds() + return None + def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None diff --git a/queue_job/migrations/13.0.3.7.0/pre-migration.py b/queue_job/migrations/13.0.3.7.0/pre-migration.py new file mode 100644 index 0000000000..c14d6800ad --- /dev/null +++ b/queue_job/migrations/13.0.3.7.0/pre-migration.py @@ -0,0 +1,35 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import logging + +from odoo.tools.sql import column_exists + +_logger = logging.getLogger(__name__) + + +def migrate(cr, version): + if not column_exists(cr, "queue_job", "exec_time"): + # Disable trigger otherwise the update takes ages. + cr.execute( + """ + ALTER TABLE queue_job DISABLE TRIGGER queue_job_notify; + """ + ) + cr.execute( + """ + ALTER TABLE queue_job ADD COLUMN exec_time double precision DEFAULT 0; + """ + ) + cr.execute( + """ + UPDATE + queue_job + SET + exec_time = EXTRACT(EPOCH FROM (date_done - date_started)); + """ + ) + cr.execute( + """ + ALTER TABLE queue_job ENABLE TRIGGER queue_job_notify; + """ + ) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 6b729c68e8..7a1992972a 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -75,6 +75,11 @@ class QueueJob(models.Model): date_started = fields.Datetime(string="Start Date", readonly=True) date_enqueued = fields.Datetime(string="Enqueue Time", readonly=True) date_done = fields.Datetime(readonly=True) + exec_time = fields.Float( + string="Execution Time (avg)", + group_operator="avg", + help="Time required to execute this job in seconds. Average when grouped.", + ) eta = fields.Datetime(string="Execute only after") retry = fields.Integer(string="Current try") diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 2bb4d58771..59d2fa01df 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -57,6 +57,8 @@ + + @@ -109,6 +111,7 @@ + @@ -116,6 +119,29 @@ + + queue.job.pivot + queue.job + + + + + + + + + + + queue.job.graph + queue.job + + + + + + + + queue.job.search queue.job @@ -182,7 +208,7 @@ Jobs queue.job - tree,form + tree,form,pivot,graph {'search_default_pending': 1, 'search_default_enqueued': 1, 'search_default_started': 1, diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 868a706ebf..d2d886b090 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -148,14 +148,16 @@ def test_worker_pid(self): def test_set_done(self): job_a = Job(self.method) + job_a.date_started = datetime(2015, 3, 15, 16, 40, 0) datetime_path = "odoo.addons.queue_job.job.datetime" with mock.patch(datetime_path, autospec=True) as mock_datetime: mock_datetime.now.return_value = datetime(2015, 3, 15, 16, 41, 0) job_a.set_done(result="test") - self.assertEqual(job_a.state, DONE) - self.assertEqual(job_a.result, "test") - self.assertEqual(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEquals(job_a.state, DONE) + self.assertEquals(job_a.result, "test") + self.assertEquals(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEquals(job_a.exec_time, 60.0) self.assertFalse(job_a.exc_info) def test_set_failed(self): @@ -275,6 +277,7 @@ def test_read(self): self.assertAlmostEqual(job_read.date_started, test_date, delta=delta) self.assertAlmostEqual(job_read.date_enqueued, test_date, delta=delta) self.assertAlmostEqual(job_read.date_done, test_date, delta=delta) + self.assertAlmostEqual(job_read.exec_time, 0.0) def test_job_unlinked(self): test_job = Job(self.method, args=("o", "k"), kwargs={"c": "!"}) From afd597f23fa70e9a4918bac8162dd8bb0b1e91eb Mon Sep 17 00:00:00 2001 From: Wolfgang Pichler Date: Thu, 19 Nov 2020 08:16:47 +0100 Subject: [PATCH 09/12] Fix missing rollback on retried jobs When RetryableJobError was raised, any change done by the job was not rollbacked. Using `raise` would throw the exception up to the core and rollback, but we would have a stack trace in the logs for each try. Calling rollback manually (rollback also clears the env) hide the tracebacks, however, when the last try fails, the full traceback is still shown in the logs. Fixes #261 --- queue_job/controllers/main.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index f42af7beaa..2ef6e4144a 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -74,10 +74,10 @@ def retry_postpone(job, message, seconds=None): if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise - retry_postpone( - job, tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY - ) _logger.debug("%s OperationalError, postponed", job) + raise RetryableJobError( + tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY + ) except NothingToDoJob as err: if str(err): @@ -92,6 +92,10 @@ def retry_postpone(job, message, seconds=None): # delay the job later, requeue retry_postpone(job, str(err), seconds=err.seconds) _logger.debug("%s postponed", job) + # Do not trigger the error up because we don't want an exception + # traceback in the logs we should have the traceback when all + # retries are exhausted + env.cr.rollback() except (FailedJobError, Exception): buff = StringIO() From be8342017d32eb482c00846542354e8ab9ab374c Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Wed, 10 Feb 2021 12:01:51 +0100 Subject: [PATCH 10/12] Fix date_done set when state changes back to pending When Job.date_done has been set, for instance because the job has been performed, if the job is set back to pending (e.g. a RetryableJobError is raised), the date_done is kept. --- queue_job/job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/job.py b/queue_job/job.py index 259bea14a2..a5d5513adc 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -680,6 +680,7 @@ def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None self.date_started = None + self.date_done = None self.worker_pid = None if reset_retry: self.retry = 0 From 86a2439d6c9a0b890602793d371dfaf9cd3eec3a Mon Sep 17 00:00:00 2001 From: Simone Orsi Date: Mon, 29 Mar 2021 08:39:28 +0200 Subject: [PATCH 11/12] queue_job: close buffer when done --- queue_job/controllers/main.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 2ef6e4144a..8732d7c700 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -9,6 +9,7 @@ from psycopg2 import OperationalError from werkzeug.exceptions import Forbidden +import odoo from odoo import SUPERUSER_ID, _, api, http, registry, tools from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY @@ -102,10 +103,13 @@ def retry_postpone(job, message, seconds=None): traceback.print_exc(file=buff) _logger.error(buff.getvalue()) job.env.clear() - with registry(job.env.cr.dbname).cursor() as new_cr: - job.env = api.Environment(new_cr, SUPERUSER_ID, {}) - job.set_failed(exc_info=buff.getvalue()) - job.store() + with odoo.api.Environment.manage(): + with odoo.registry(job.env.cr.dbname).cursor() as new_cr: + job.env = job.env(cr=new_cr) + job.set_failed(exc_info=buff.getvalue()) + job.store() + new_cr.commit() + buff.close() raise return "" From ef7222acb856c3d6cb84cd9545d6787de7224a61 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 31 May 2021 21:34:19 +0200 Subject: [PATCH 12/12] Fix display on exec_time on tree view as seconds The float_time widget shows hours seconds, we only want seconds. The widget had been removed on the form view, but not on the tree view. --- queue_job/views/queue_job_views.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 59d2fa01df..a8b9a4ab92 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -112,6 +112,8 @@ + +