diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index d877254bfe..8732d7c700 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -9,6 +9,7 @@ from psycopg2 import OperationalError from werkzeug.exceptions import Forbidden +import odoo from odoo import SUPERUSER_ID, _, api, http, registry, tools from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY @@ -74,10 +75,10 @@ def retry_postpone(job, message, seconds=None): if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise - retry_postpone( - job, tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY - ) _logger.debug("%s OperationalError, postponed", job) + raise RetryableJobError( + tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY + ) except NothingToDoJob as err: if str(err): @@ -92,23 +93,30 @@ def retry_postpone(job, message, seconds=None): # delay the job later, requeue retry_postpone(job, str(err), seconds=err.seconds) _logger.debug("%s postponed", job) + # Do not trigger the error up because we don't want an exception + # traceback in the logs we should have the traceback when all + # retries are exhausted + env.cr.rollback() except (FailedJobError, Exception): buff = StringIO() traceback.print_exc(file=buff) _logger.error(buff.getvalue()) job.env.clear() - with registry(job.env.cr.dbname).cursor() as new_cr: - job.env = api.Environment(new_cr, SUPERUSER_ID, {}) - job.set_failed(exc_info=buff.getvalue()) - job.store() + with odoo.api.Environment.manage(): + with odoo.registry(job.env.cr.dbname).cursor() as new_cr: + job.env = job.env(cr=new_cr) + job.set_failed(exc_info=buff.getvalue()) + job.store() + new_cr.commit() + buff.close() raise return "" @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( - self, priority=None, max_retries=None, channel="root", description="Test job" + self, priority=None, max_retries=None, channel=None, description="Test job" ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) diff --git a/queue_job/job.py b/queue_job/job.py index 4ee90390e6..a5d5513adc 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -441,13 +441,7 @@ def __init__( self.job_model_name = "queue.job" self.job_config = ( - self.env["queue.job.function"] - .sudo() - .job_config( - self.env["queue.job.function"].job_function_name( - self.model_name, self.method_name - ) - ) + self.env["queue.job.function"].sudo().job_config(self.job_function_name) ) self.state = PENDING @@ -535,6 +529,7 @@ def store(self): "date_enqueued": False, "date_started": False, "date_done": False, + "exec_time": False, "eta": False, "identity_key": False, "worker_pid": self.worker_pid, @@ -546,6 +541,8 @@ def store(self): vals["date_started"] = self.date_started if self.date_done: vals["date_done"] = self.date_done + if self.exec_time: + vals["exec_time"] = self.exec_time if self.eta: vals["eta"] = self.eta if self.identity_key: @@ -560,35 +557,54 @@ def store(self): if db_record: db_record.with_context(_job_edit_sentinel=edit_sentinel).write(vals) else: - date_created = self.date_created - # The following values must never be modified after the - # creation of the job vals.update( { + "user_id": self.env.uid, + "channel": self.channel, + # The following values must never be modified after the + # creation of the job "uuid": self.uuid, "name": self.description, - "date_created": date_created, + "func_string": self.func_string, + "date_created": self.date_created, + "model_name": self.recordset._name, "method_name": self.method_name, + "job_function_id": self.job_config.job_function_id, + "channel_method_name": self.job_function_name, "records": self.recordset, "args": self.args, "kwargs": self.kwargs, } ) - # it the channel is not specified, lets the job_model compute - # the right one to use - if self.channel: - vals.update({"channel": self.channel}) - job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create(vals) + @property + def func_string(self): + model = repr(self.recordset) + args = [repr(arg) for arg in self.args] + kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()] + all_args = ", ".join(args + kwargs) + return "{}.{}({})".format(model, self.method_name, all_args) + def db_record(self): return self.db_record_from_uuid(self.env, self.uuid) @property def func(self): + # We can fill only one company into allowed_company_ids. + # Because if you have many, you can have unexpected records due to ir.rule. + # ir.rule use allowed_company_ids to load every records in many companies. + # But most of the time, a job should be executed on a single company. recordset = self.recordset.with_context(job_uuid=self.uuid) + if self.company_id: + recordset = recordset.with_company(self.company_id) return getattr(recordset, self.method_name) + @property + def job_function_name(self): + func_model = self.env["queue.job.function"].sudo() + return func_model.job_function_name(self.recordset._name, self.method_name) + @property def identity_key(self): if self._identity_key is None: @@ -646,10 +662,25 @@ def eta(self, value): else: self._eta = value + @property + def channel(self): + return self._channel or self.job_config.channel + + @channel.setter + def channel(self, value): + self._channel = value + + @property + def exec_time(self): + if self.date_done and self.date_started: + return (self.date_done - self.date_started).total_seconds() + return None + def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None self.date_started = None + self.date_done = None self.worker_pid = None if reset_retry: self.retry = 0 diff --git a/queue_job/migrations/13.0.3.7.0/pre-migration.py b/queue_job/migrations/13.0.3.7.0/pre-migration.py new file mode 100644 index 0000000000..c14d6800ad --- /dev/null +++ b/queue_job/migrations/13.0.3.7.0/pre-migration.py @@ -0,0 +1,35 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import logging + +from odoo.tools.sql import column_exists + +_logger = logging.getLogger(__name__) + + +def migrate(cr, version): + if not column_exists(cr, "queue_job", "exec_time"): + # Disable trigger otherwise the update takes ages. + cr.execute( + """ + ALTER TABLE queue_job DISABLE TRIGGER queue_job_notify; + """ + ) + cr.execute( + """ + ALTER TABLE queue_job ADD COLUMN exec_time double precision DEFAULT 0; + """ + ) + cr.execute( + """ + UPDATE + queue_job + SET + exec_time = EXTRACT(EPOCH FROM (date_done - date_started)); + """ + ) + cr.execute( + """ + ALTER TABLE queue_job ENABLE TRIGGER queue_job_notify; + """ + ) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 157e4d8cbc..7a1992972a 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -37,27 +37,22 @@ class QueueJob(models.Model): "date_created", "model_name", "method_name", + "func_string", + "channel_method_name", + "job_function_id", "records", "args", "kwargs", ) uuid = fields.Char(string="UUID", readonly=True, index=True, required=True) - user_id = fields.Many2one( - comodel_name="res.users", - string="User ID", - compute="_compute_user_id", - inverse="_inverse_user_id", - store=True, - ) + user_id = fields.Many2one(comodel_name="res.users", string="User ID") company_id = fields.Many2one( comodel_name="res.company", string="Company", index=True ) name = fields.Char(string="Description", readonly=True) - model_name = fields.Char( - string="Model", compute="_compute_model_name", store=True, readonly=True - ) + model_name = fields.Char(string="Model", readonly=True) method_name = fields.Char(readonly=True) # record_ids field is only for backward compatibility (e.g. used in related # actions), can be removed (replaced by "records") in 14.0 @@ -69,9 +64,7 @@ class QueueJob(models.Model): ) args = JobSerialized(readonly=True, base_type=tuple) kwargs = JobSerialized(readonly=True, base_type=dict) - func_string = fields.Char( - string="Task", compute="_compute_func_string", readonly=True, store=True - ) + func_string = fields.Char(string="Task", readonly=True) state = fields.Selection(STATES, readonly=True, required=True, index=True) priority = fields.Integer() @@ -82,6 +75,11 @@ class QueueJob(models.Model): date_started = fields.Datetime(string="Start Date", readonly=True) date_enqueued = fields.Datetime(string="Enqueue Time", readonly=True) date_done = fields.Datetime(readonly=True) + exec_time = fields.Float( + string="Execution Time (avg)", + group_operator="avg", + help="Time required to execute this job in seconds. Average when grouped.", + ) eta = fields.Datetime(string="Execute only after") retry = fields.Integer(string="Current try") @@ -91,24 +89,16 @@ class QueueJob(models.Model): "max. retries.\n" "Retries are infinite when empty.", ) - channel_method_name = fields.Char( - readonly=True, compute="_compute_job_function", store=True - ) + # FIXME the name of this field is very confusing + channel_method_name = fields.Char(readonly=True) job_function_id = fields.Many2one( - comodel_name="queue.job.function", - compute="_compute_job_function", - string="Job Function", - readonly=True, - store=True, + comodel_name="queue.job.function", string="Job Function", readonly=True, ) - override_channel = fields.Char() - channel = fields.Char( - compute="_compute_channel", inverse="_inverse_channel", store=True, index=True - ) + channel = fields.Char(index=True) - identity_key = fields.Char() - worker_pid = fields.Integer() + identity_key = fields.Char(readonly=True) + worker_pid = fields.Integer(readonly=True) def init(self): self._cr.execute( @@ -122,67 +112,23 @@ def init(self): "'enqueued') AND identity_key IS NOT NULL;" ) - @api.depends("records") - def _compute_user_id(self): - for record in self: - record.user_id = record.records.env.uid - - def _inverse_user_id(self): - for record in self.with_context(_job_edit_sentinel=self.EDIT_SENTINEL): - record.records = record.records.with_user(record.user_id.id) - - @api.depends("records") - def _compute_model_name(self): - for record in self: - record.model_name = record.records._name - @api.depends("records") def _compute_record_ids(self): for record in self: record.record_ids = record.records.ids - def _inverse_channel(self): - for record in self: - record.override_channel = record.channel - - @api.depends("job_function_id.channel_id") - def _compute_channel(self): - for record in self: - channel = ( - record.override_channel or record.job_function_id.channel or "root" - ) - if record.channel != channel: - record.channel = channel - - @api.depends("model_name", "method_name", "job_function_id.channel_id") - def _compute_job_function(self): - for record in self: - func_model = self.env["queue.job.function"] - channel_method_name = func_model.job_function_name( - record.model_name, record.method_name - ) - function = func_model.search([("name", "=", channel_method_name)], limit=1) - record.channel_method_name = channel_method_name - record.job_function_id = function - - @api.depends("model_name", "method_name", "records", "args", "kwargs") - def _compute_func_string(self): - for record in self: - model = repr(record.records) - args = [repr(arg) for arg in record.args] - kwargs = ["{}={!r}".format(key, val) for key, val in record.kwargs.items()] - all_args = ", ".join(args + kwargs) - record.func_string = "{}.{}({})".format(model, record.method_name, all_args) - @api.model_create_multi def create(self, vals_list): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: # Prevent to create a queue.job record "raw" from RPC. # ``with_delay()`` must be used. raise exceptions.AccessError( - _("Queue jobs must created by calling 'with_delay()'.") + _("Queue jobs must be created by calling 'with_delay()'.") ) - return super().create(vals_list) + return super( + QueueJob, + self.with_context(mail_create_nolog=True, mail_create_nosubscribe=True), + ).create(vals_list) def write(self, vals): if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: @@ -196,10 +142,25 @@ def write(self, vals): ) ) + different_user_jobs = self.browse() + if vals.get("user_id"): + different_user_jobs = self.filtered( + lambda records: records.env.user.id != vals["user_id"] + ) + if vals.get("state") == "failed": self._message_post_on_failure() - return super().write(vals) + result = super().write(vals) + + for record in different_user_jobs: + # the user is stored in the env of the record, but we still want to + # have a stored user_id field to be able to search/groupby, so + # synchronize the env of records with user_id + super(QueueJob, record).write( + {"records": record.records.with_user(vals["user_id"])} + ) + return result def open_related_action(self): """Open the related action associated to the job""" @@ -239,9 +200,10 @@ def _message_post_on_failure(self): # subscribe the users now to avoid to subscribe them # at every job creation domain = self._subscribe_users_domain() - users = self.env["res.users"].search(domain) - self.message_subscribe(partner_ids=users.mapped("partner_id").ids) + base_users = self.env["res.users"].search(domain) for record in self: + users = base_users | record.user_id + record.message_subscribe(partner_ids=users.mapped("partner_id").ids) msg = record._message_failed_job() if msg: record.message_post(body=msg, subtype_xmlid="queue_job.mt_job_failed") diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py index db9eea3c94..4f351659bd 100644 --- a/queue_job/models/queue_job_function.py +++ b/queue_job/models/queue_job_function.py @@ -27,7 +27,8 @@ class QueueJobFunction(models.Model): "retry_pattern " "related_action_enable " "related_action_func_name " - "related_action_kwargs ", + "related_action_kwargs " + "job_function_id ", ) def _default_channel(self): @@ -147,6 +148,7 @@ def job_default_config(self): related_action_enable=True, related_action_func_name=None, related_action_kwargs={}, + job_function_id=None, ) def _parse_retry_pattern(self): @@ -179,6 +181,7 @@ def job_config(self, name): related_action_enable=config.related_action.get("enable", True), related_action_func_name=config.related_action.get("func_name"), related_action_kwargs=config.related_action.get("kwargs", {}), + job_function_id=config.id, ) def _retry_pattern_format_error_message(self): diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst index 6c472eccf9..66827e716a 100644 --- a/queue_job/readme/USAGE.rst +++ b/queue_job/readme/USAGE.rst @@ -25,7 +25,7 @@ Example of job function: .. code-block:: XML - + action_done @@ -43,6 +43,13 @@ they have different xmlids. On uninstall, the merged record is deleted when all the modules using it are uninstalled. +**Job function: model** + +If the function is defined in an abstract model, you can not write +```` +but you have to define a function for each model that inherits from the abstract model. + + **Job function: channel** The channel where the job will be delayed. The default channel is ``root``. diff --git a/queue_job/tests/test_model_job_function.py b/queue_job/tests/test_model_job_function.py index 965e26d8f2..84676fdb65 100644 --- a/queue_job/tests/test_model_job_function.py +++ b/queue_job/tests/test_model_job_function.py @@ -31,7 +31,7 @@ def test_function_job_config(self): channel = self.env["queue.job.channel"].create( {"name": "foo", "parent_id": self.env.ref("queue_job.channel_root").id} ) - self.env["queue.job.function"].create( + job_function = self.env["queue.job.function"].create( { "model_id": self.env.ref("base.model_res_users").id, "method": "read", @@ -52,5 +52,6 @@ def test_function_job_config(self): related_action_enable=True, related_action_func_name="related_action_foo", related_action_kwargs={"b": 1}, + job_function_id=job_function.id, ), ) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index a68c03d34f..a8b9a4ab92 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -57,6 +57,8 @@ + + @@ -109,6 +111,9 @@ + + + @@ -116,6 +121,29 @@ + + queue.job.pivot + queue.job + + + + + + + + + + + queue.job.graph + queue.job + + + + + + + + queue.job.search queue.job @@ -126,6 +154,7 @@ + + @@ -176,7 +210,7 @@ Jobs queue.job - tree,form + tree,form,pivot,graph {'search_default_pending': 1, 'search_default_enqueued': 1, 'search_default_started': 1, diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index ad9d2b37f5..d2d886b090 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -6,7 +6,6 @@ from unittest import mock import odoo.tests.common as common -from odoo import SUPERUSER_ID from odoo.addons.queue_job import identity_exact from odoo.addons.queue_job.exception import ( @@ -149,14 +148,16 @@ def test_worker_pid(self): def test_set_done(self): job_a = Job(self.method) + job_a.date_started = datetime(2015, 3, 15, 16, 40, 0) datetime_path = "odoo.addons.queue_job.job.datetime" with mock.patch(datetime_path, autospec=True) as mock_datetime: mock_datetime.now.return_value = datetime(2015, 3, 15, 16, 41, 0) job_a.set_done(result="test") - self.assertEqual(job_a.state, DONE) - self.assertEqual(job_a.result, "test") - self.assertEqual(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEquals(job_a.state, DONE) + self.assertEquals(job_a.result, "test") + self.assertEquals(job_a.date_done, datetime(2015, 3, 15, 16, 41, 0)) + self.assertEquals(job_a.exec_time, 60.0) self.assertFalse(job_a.exc_info) def test_set_failed(self): @@ -176,6 +177,49 @@ def test_postpone(self): self.assertEqual(job_a.result, "test") self.assertFalse(job_a.exc_info) + def test_company_simple(self): + company = self.env.ref("base.main_company") + eta = datetime.now() + timedelta(hours=5) + test_job = Job( + self.method, + args=("o", "k"), + kwargs={"return_context": 1}, + priority=15, + eta=eta, + description="My description", + ) + test_job.worker_pid = 99999 # normally set on "set_start" + test_job.company_id = company.id + test_job.store() + job_read = Job.load(self.env, test_job.uuid) + self.assertEqual(test_job.func, job_read.func) + result_ctx = test_job.func(*tuple(test_job.args), **test_job.kwargs) + self.assertEqual(result_ctx.get("allowed_company_ids"), company.ids) + + def test_company_complex(self): + company1 = self.env.ref("base.main_company") + company2 = company1.create({"name": "Queue job company"}) + companies = company1 | company2 + self.env.user.write({"company_ids": [(6, False, companies.ids)]}) + # Ensure the main company still the first + self.assertEqual(self.env.user.company_id, company1) + eta = datetime.now() + timedelta(hours=5) + test_job = Job( + self.method, + args=("o", "k"), + kwargs={"return_context": 1}, + priority=15, + eta=eta, + description="My description", + ) + test_job.worker_pid = 99999 # normally set on "set_start" + test_job.company_id = company2.id + test_job.store() + job_read = Job.load(self.env, test_job.uuid) + self.assertEqual(test_job.func, job_read.func) + result_ctx = test_job.func(*tuple(test_job.args), **test_job.kwargs) + self.assertEqual(result_ctx.get("allowed_company_ids"), company2.ids) + def test_store(self): test_job = Job(self.method) test_job.store() @@ -233,6 +277,7 @@ def test_read(self): self.assertAlmostEqual(job_read.date_started, test_date, delta=delta) self.assertAlmostEqual(job_read.date_enqueued, test_date, delta=delta) self.assertAlmostEqual(job_read.date_done, test_date, delta=delta) + self.assertAlmostEqual(job_read.exec_time, 0.0) def test_job_unlinked(self): test_job = Job(self.method, args=("o", "k"), kwargs={"c": "!"}) @@ -480,7 +525,7 @@ def test_message_when_write_fail(self): stored.write({"state": "failed"}) self.assertEqual(stored.state, FAILED) messages = stored.message_ids - self.assertEqual(len(messages), 2) + self.assertEqual(len(messages), 1) def test_follower_when_write_fail(self): """Check that inactive users doesn't are not followers even if @@ -539,6 +584,22 @@ def setUp(self): User = self.env["res.users"] Company = self.env["res.company"] Partner = self.env["res.partner"] + + main_company = self.env.ref("base.main_company") + + self.partner_user = Partner.create( + {"name": "Simple User", "email": "simple.user@example.com"} + ) + self.simple_user = User.create( + { + "partner_id": self.partner_user.id, + "company_ids": [(4, main_company.id)], + "login": "simple_user", + "name": "simple user", + "groups_id": [], + } + ) + self.other_partner_a = Partner.create( {"name": "My Company a", "is_company": True, "email": "test@tes.ttest"} ) @@ -555,7 +616,7 @@ def setUp(self): "company_id": self.other_company_a.id, "company_ids": [(4, self.other_company_a.id)], "login": "my_login a", - "name": "my user", + "name": "my user A", "groups_id": [(4, grp_queue_job_manager)], } ) @@ -575,16 +636,11 @@ def setUp(self): "company_id": self.other_company_b.id, "company_ids": [(4, self.other_company_b.id)], "login": "my_login_b", - "name": "my user 1", + "name": "my user B", "groups_id": [(4, grp_queue_job_manager)], } ) - def _subscribe_users(self, stored): - domain = stored._subscribe_users_domain() - users = self.env["res.users"].search(domain) - stored.message_subscribe(partner_ids=users.mapped("partner_id").ids) - def _create_job(self, env): self.cr.execute("delete from queue_job") env["test.queue.job"].with_delay().testing_method() @@ -630,11 +686,14 @@ def test_job_subscription(self): # queue_job.group_queue_job_manager must be followers User = self.env["res.users"] no_company_context = dict(self.env.context, company_id=None) - no_company_env = self.env(context=no_company_context) + no_company_env = self.env(user=self.simple_user, context=no_company_context) stored = self._create_job(no_company_env) - self._subscribe_users(stored) - users = User.with_context(active_test=False).search( - [("groups_id", "=", self.ref("queue_job.group_queue_job_manager"))] + stored._message_post_on_failure() + users = ( + User.search( + [("groups_id", "=", self.ref("queue_job.group_queue_job_manager"))] + ) + + stored.user_id ) self.assertEqual(len(stored.message_follower_ids), len(users)) expected_partners = [u.partner_id for u in users] @@ -648,13 +707,13 @@ def test_job_subscription(self): # jobs created for a specific company_id are followed only by # company's members company_a_context = dict(self.env.context, company_id=self.other_company_a.id) - company_a_env = self.env(context=company_a_context) + company_a_env = self.env(user=self.simple_user, context=company_a_context) stored = self._create_job(company_a_env) stored.with_user(self.other_user_a.id) - self._subscribe_users(stored) - # 2 because admin + self.other_partner_a + stored._message_post_on_failure() + # 2 because simple_user (creator of job) + self.other_partner_a self.assertEqual(len(stored.message_follower_ids), 2) - users = User.browse([SUPERUSER_ID, self.other_user_a.id]) + users = self.simple_user + self.other_user_a expected_partners = [u.partner_id for u in users] self.assertSetEqual( set(stored.message_follower_ids.mapped("partner_id")),