diff --git a/queue_job/README.rst b/queue_job/README.rst index 439fc6ffc1..b0c92dd241 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -144,6 +144,8 @@ Configuration * ``started_delta``: Spent time in minutes after which a started job is considered stuck. This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration. Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter. + * These parameters are also configurable on a channel-by-channel basis: a channel value of 0 falls back to the global value. + A channel value of -1 is derived from the server's ``--limit-time-real`` config parameter, as above. .. code-block:: python diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 8af7468b7c..304e7faeb0 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -417,6 +417,9 @@ def autovacuum(self): break return True + def _get_limit_time_real_delta(self): + return (config["limit_time_real"] // 60) + 1 + def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0): """Fix jobs that are in a bad states @@ -430,37 +433,75 @@ def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0): -1 will use `--limit-time-real` config value """ if started_delta == -1: - started_delta = (config["limit_time_real"] // 60) + 1 + started_delta = self._get_limit_time_real_delta() return self._get_stuck_jobs_to_requeue( enqueued_delta=enqueued_delta, started_delta=started_delta ).requeue() def _get_stuck_jobs_domain(self, queue_dl, started_dl): + def _enqueued_domain(enqueued_dl): + return [ + "&", + ( + "date_enqueued", + "<=", + fields.Datetime.to_string(enqueued_dl), + ), + ("state", "=", "enqueued"), + ] + + def _started_domain(started_dl): + return [ + "&", + ( + "date_started", + "<=", + fields.Datetime.to_string(started_dl), + ), + ("state", "=", "started"), + ] + domain = [] now = fields.datetime.now() - if queue_dl: - queue_dl = now - timedelta(minutes=queue_dl) - 
domain.append( + searched_channels = [] + for channel in self.env["queue.job.channel"].search([]): + channel_domain = [] + channel_queue_dl = channel.enqueued_delta or queue_dl + if channel_queue_dl == -1: + channel_queue_dl = self._get_limit_time_real_delta() + if channel_queue_dl: + channel_queue_dl = now - timedelta(minutes=channel_queue_dl) + channel_domain.append(_enqueued_domain(channel_queue_dl)) + channel_started_dl = channel.started_delta or started_dl + if channel_started_dl == -1: + channel_started_dl = self._get_limit_time_real_delta() + if channel_started_dl: + channel_started_dl = now - timedelta(minutes=channel_started_dl) + channel_domain.append(_started_domain(channel_started_dl)) + searched_channels.append(channel.complete_name) + channel_domain = expression.AND( [ - "&", - ("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)), - ("state", "=", "enqueued"), + expression.OR(channel_domain), + [("channel", "=", channel.complete_name)], ] ) + domain = expression.OR([domain, channel_domain]) + other_domain = [] + if queue_dl: + queue_dl = now - timedelta(minutes=queue_dl) + other_domain.append(_enqueued_domain(queue_dl)) if started_dl: started_dl = now - timedelta(minutes=started_dl) - domain.append( - [ - "&", - ("date_started", "<=", fields.Datetime.to_string(started_dl)), - ("state", "=", "started"), - ] - ) + other_domain.append(_started_domain(started_dl)) + other_domain = expression.AND( + [expression.OR(other_domain), [("channel", "not in", searched_channels)]] + ) + domain = expression.OR([domain, other_domain]) - if not domain: + if domain == expression.FALSE_DOMAIN: raise exceptions.ValidationError( _("If both parameters are 0, ALL jobs will be requeued!") ) - return expression.OR(domain) + return domain def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta): job_model = self.env["queue.job"] diff --git a/queue_job/models/queue_job_channel.py b/queue_job/models/queue_job_channel.py index 920b021261..08371ecc1c 100644 --- a/queue_job/models/queue_job_channel.py 
+++ b/queue_job/models/queue_job_channel.py @@ -24,6 +24,20 @@ class QueueJobChannel(models.Model): removal_interval = fields.Integer( default=lambda self: self.env["queue.job"]._removal_interval, required=True ) + enqueued_delta = fields.Integer( + help="If this value is set (in minutes), it will take preference over the " + "enqueued_delta parameter in the 'Jobs Garbage Collector' scheduled action. " + "This action is responsible for requeuing jobs stuck in the 'enqueued' " + "state for more than the configured time. If set to -1 it will use " + "the server's --limit-time-real config parameter.", + ) + started_delta = fields.Integer( + help="If this value is set (in minutes), it will take preference over the " + "started_delta parameter in the 'Jobs Garbage Collector' scheduled action. " + "This action is responsible for requeuing jobs stuck in the 'started' " + "state for more than the configured time. If set to -1 it will use " + "the server's --limit-time-real config parameter.", + ) _sql_constraints = [ ("name_uniq", "unique(complete_name)", "Channel complete name must be unique") diff --git a/queue_job/readme/CONFIGURE.rst b/queue_job/readme/CONFIGURE.rst index fdd3dd1598..7c99d5a453 100644 --- a/queue_job/readme/CONFIGURE.rst +++ b/queue_job/readme/CONFIGURE.rst @@ -54,6 +54,8 @@ * ``started_delta``: Spent time in minutes after which a started job is considered stuck. This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration. Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter. + * These parameters are also configurable on a channel-by-channel basis: a channel value of 0 falls back to the global value. + A channel value of -1 is derived from the server's ``--limit-time-real`` config parameter, as above. .. 
code-block:: python diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index 20ec9c0e0a..257cfd78ef 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -503,6 +503,8 @@
# `model` corresponds to 'queue.job' model diff --git a/queue_job/tests/test_model_job_channel.py b/queue_job/tests/test_model_job_channel.py index 9f93305da9..efb46c50b3 100644 --- a/queue_job/tests/test_model_job_channel.py +++ b/queue_job/tests/test_model_job_channel.py @@ -1,6 +1,9 @@ # copyright 2018 Camptocamp # license lgpl-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +import uuid +from datetime import timedelta + from psycopg2 import IntegrityError import odoo @@ -57,3 +60,104 @@ def test_channel_name_get(self): {"name": "test", "parent_id": self.root_channel.id} ) self.assertEqual(channel.name_get(), [(channel.id, "root.test")]) + + def _create_job(self, channel_name): + return ( + self.env["queue.job"] + .with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ) + .create( + { + "uuid": str(uuid.uuid4()), + "user_id": self.env.user.id, + "state": "pending", + "model_name": "queue.job", + "method_name": "write", + "args": (), + "channel": channel_name, + } + ) + ) + + def test_requeue_stuck_jobs(self): + def _update_started_job_date(job, minutes): + date = odoo.fields.datetime.now() - timedelta(minutes=minutes) + job.write({"state": "started", "date_started": date}) + self.assertEqual(job.state, "started") + + def _update_enqueued_job_date(job, minutes): + date = odoo.fields.datetime.now() - timedelta(minutes=minutes) + job.write({"state": "enqueued", "date_enqueued": date}) + self.assertEqual(job.state, "enqueued") + + channel_1 = self.Channel.create( + {"name": "test", "parent_id": self.root_channel.id} + ) + channel_2 = self.Channel.create( + {"name": "test2", "parent_id": self.root_channel.id} + ) + job = self._create_job("root.test") + job_2 = self._create_job("root.test2") + self.assertEqual(job.channel, "root.test") + self.assertEqual(job_2.channel, "root.test2") + self.assertEqual(job.state, "pending") + self.assertEqual(job_2.state, "pending") + # Started + # Global config + _update_started_job_date(job, 10) + 
_update_started_job_date(job_2, 20) + self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=0, started_delta=15) + self.assertEqual(job.state, "started") + self.assertEqual(job_2.state, "pending") + # Per channel config + _update_started_job_date(job, 10) + _update_started_job_date(job_2, 20) + channel_1.write({"started_delta": 5}) + channel_2.write({"started_delta": 25}) + self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=0, started_delta=15) + self.assertEqual(job.state, "pending") + self.assertEqual(job_2.state, "started") + # Mixed + channel_1.write({"started_delta": 0}) + channel_2.write({"started_delta": 25}) + _update_started_job_date(job, 20) + _update_started_job_date(job_2, 20) + self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=0, started_delta=15) + self.assertEqual(job.state, "pending") + self.assertEqual(job_2.state, "started") + # Enqueued + # Global config + _update_enqueued_job_date(job, 10) + _update_enqueued_job_date(job_2, 20) + self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=15, started_delta=0) + self.assertEqual(job.state, "enqueued") + self.assertEqual(job_2.state, "pending") + # Per channel config + _update_enqueued_job_date(job, 10) + _update_enqueued_job_date(job_2, 20) + channel_1.write({"enqueued_delta": 5}) + channel_2.write({"enqueued_delta": 25}) + self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=15, started_delta=0) + self.assertEqual(job.state, "pending") + self.assertEqual(job_2.state, "enqueued") + # Mixed + channel_1.write({"enqueued_delta": 0}) + channel_2.write({"enqueued_delta": 25}) + _update_enqueued_job_date(job, 20) + _update_enqueued_job_date(job_2, 20) + self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=15, started_delta=0) + self.assertEqual(job.state, "pending") + self.assertEqual(job_2.state, "enqueued") + # job without queue.job.channel record for its channel + # it uses the global value + job_3 = self._create_job("root.test3") + channel_1.write({"started_delta": 50}) + 
channel_2.write({"enqueued_delta": 50}) + _update_started_job_date(job, 10) + _update_enqueued_job_date(job_2, 20) + _update_started_job_date(job_3, 30) + self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=5, started_delta=5) + self.assertEqual(job.state, "started") + self.assertEqual(job_2.state, "enqueued") + self.assertEqual(job_3.state, "pending") diff --git a/queue_job/views/queue_job_channel_views.xml b/queue_job/views/queue_job_channel_views.xml index 0841a2514e..e6601f47fd 100644 --- a/queue_job/views/queue_job_channel_views.xml +++ b/queue_job/views/queue_job_channel_views.xml @@ -17,6 +17,8 @@ />+ +