Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ Configuration
* ``started_delta``: Spent time in minutes after which a started job is considered stuck.
This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter.
* These parameters are also configurable on a channel by channel basis, if set to 0 it will use the global value.
If set to -1 it will work the same as above.

.. code-block:: python

Expand Down
71 changes: 56 additions & 15 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ def autovacuum(self):
break
return True

def _get_limit_time_real_delta(self):
return (config["limit_time_real"] // 60) + 1

def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
"""Fix jobs that are in a bad states

Expand All @@ -430,37 +433,75 @@ def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
-1 will use `--limit-time-real` config value
"""
if started_delta == -1:
started_delta = (config["limit_time_real"] // 60) + 1
started_delta = self._get_limit_time_real_delta()
return self._get_stuck_jobs_to_requeue(
enqueued_delta=enqueued_delta, started_delta=started_delta
).requeue()

def _get_stuck_jobs_domain(self, queue_dl, started_dl):
def _enqueued_domain(enqueued_dl):
return [
"&",
(
"date_enqueued",
"<=",
fields.Datetime.to_string(enqueued_dl),
),
("state", "=", "enqueued"),
]

def _started_domain(started_dl):
return [
"&",
(
"date_started",
"<=",
fields.Datetime.to_string(started_dl),
),
("state", "=", "started"),
]

domain = []
now = fields.datetime.now()
if queue_dl:
queue_dl = now - timedelta(minutes=queue_dl)
domain.append(
searched_channels = []
for channel in self.env["queue.job.channel"].search([]):
channel_domain = []
channel_queue_dl = channel.enqueued_delta or queue_dl
if channel_queue_dl == -1:
channel_queue_dl = self._get_limit_time_real_delta()
if channel_queue_dl:
channel_queue_dl = now - timedelta(minutes=channel_queue_dl)
channel_domain.append(_enqueued_domain(channel_queue_dl))
channel_started_dl = channel.started_delta or started_dl
if channel_started_dl == -1:
channel_started_dl = self._get_limit_time_real_delta()
if channel_started_dl:
channel_started_dl = now - timedelta(minutes=channel_started_dl)
channel_domain.append(_started_domain(channel_started_dl))
searched_channels.append(channel.complete_name)
channel_domain = expression.AND(
[
"&",
("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)),
("state", "=", "enqueued"),
expression.OR(channel_domain),
[("channel", "=", channel.complete_name)],
]
)
domain = expression.OR([domain, channel_domain])
other_domain = []
if queue_dl:
queue_dl = now - timedelta(minutes=queue_dl)
other_domain.append(_enqueued_domain(queue_dl))
if started_dl:
started_dl = now - timedelta(minutes=started_dl)
domain.append(
[
"&",
("date_started", "<=", fields.Datetime.to_string(started_dl)),
("state", "=", "started"),
]
)
other_domain.append(_started_domain(started_dl))
other_domain = expression.AND(
[expression.OR(other_domain), [("channel", "not in", searched_channels)]]
)
domain = expression.OR([domain, other_domain])
if not domain:
raise exceptions.ValidationError(
_("If both parameters are 0, ALL jobs will be requeued!")
)
return expression.OR(domain)
return domain

def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta):
job_model = self.env["queue.job"]
Expand Down
14 changes: 14 additions & 0 deletions queue_job/models/queue_job_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ class QueueJobChannel(models.Model):
removal_interval = fields.Integer(
default=lambda self: self.env["queue.job"]._removal_interval, required=True
)
enqueued_delta = fields.Integer(
help="If this value is set (in minutes), it will take preference over the "
"enqueued_delta parameter in the 'Jobs Garbage Collector' scheduled action. "
"This action is responsible for requeuing jobs stuck in the 'enqueued' "
"state for more than the configured time. If set to -1 it will use "
"the server's --limit-time-real config parameter.",
)
started_delta = fields.Integer(
help="If this value is set (in minutes), it will take preference over the "
"started_delta parameter in the 'Jobs Garbage Collector' scheduled action. "
"This action is responsible for requeuing jobs stuck in the 'started' "
"state for more than the configured time. If set to -1 it will use "
"the server's --limit-time-real config parameter.",
)

_sql_constraints = [
("name_uniq", "unique(complete_name)", "Channel complete name must be unique")
Expand Down
2 changes: 2 additions & 0 deletions queue_job/readme/CONFIGURE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
* ``started_delta``: Spent time in minutes after which a started job is considered stuck.
This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter.
* These parameters are also configurable on a channel by channel basis, if set to 0 it will use the global value.
If set to -1 it will work the same as above.

.. code-block:: python

Expand Down
2 changes: 2 additions & 0 deletions queue_job/static/description/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ <h1><a class="toc-backref" href="#toc-entry-2">Configuration</a></h1>
<li><tt class="docutils literal">started_delta</tt>: Spent time in minutes after which a started job is considered stuck.
This parameter should not be less than <tt class="docutils literal"><span class="pre">--limit-time-real</span> // 60</tt> parameter in your configuration.
Set it to 0 to disable this check. Set it to -1 to automate it, based in the server’s <tt class="docutils literal"><span class="pre">--limit-time-real</span></tt> config parameter.</li>
<li>These parameters are also configurable on a channel by channel basis, if set to 0 it will use the global value.
If set to -1 it will work the same as above.</li>
</ul>
<pre class="code python literal-block">
<span class="c1"># `model` corresponds to 'queue.job' model</span><span class="w">
Expand Down
104 changes: 104 additions & 0 deletions queue_job/tests/test_model_job_channel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# copyright 2018 Camptocamp
# license lgpl-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import uuid
from datetime import timedelta

from psycopg2 import IntegrityError

import odoo
Expand Down Expand Up @@ -57,3 +60,104 @@ def test_channel_name_get(self):
{"name": "test", "parent_id": self.root_channel.id}
)
self.assertEqual(channel.name_get(), [(channel.id, "root.test")])

def _create_job(self, channel_name):
return (
self.env["queue.job"]
.with_context(
_job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL,
)
.create(
{
"uuid": str(uuid.uuid4()),
"user_id": self.env.user.id,
"state": "pending",
"model_name": "queue.job",
"method_name": "write",
"args": (),
"channel": channel_name,
}
)
)

def test_requeue_stuck_jobs(self):
def _update_started_job_date(job, minutes):
date = odoo.fields.datetime.now() - timedelta(minutes=minutes)
job.write({"state": "started", "date_started": date})
self.assertEqual(job.state, "started")

def _update_enqueued_job_date(job, minutes):
date = odoo.fields.datetime.now() - timedelta(minutes=minutes)
job.write({"state": "enqueued", "date_enqueued": date})
self.assertEqual(job.state, "enqueued")

channel_1 = self.Channel.create(
{"name": "test", "parent_id": self.root_channel.id}
)
channel_2 = self.Channel.create(
{"name": "test2", "parent_id": self.root_channel.id}
)
job = self._create_job("root.test")
job_2 = self._create_job("root.test2")
self.assertEqual(job.channel, "root.test")
self.assertEqual(job_2.channel, "root.test2")
self.assertEqual(job.state, "pending")
self.assertEqual(job_2.state, "pending")
# Started
# Global config
_update_started_job_date(job, 10)
_update_started_job_date(job_2, 20)
self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=0, started_delta=15)
self.assertEqual(job.state, "started")
self.assertEqual(job_2.state, "pending")
# Per channel config
_update_started_job_date(job, 10)
_update_started_job_date(job_2, 20)
channel_1.write({"started_delta": 5})
channel_2.write({"started_delta": 25})
self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=0, started_delta=15)
self.assertEqual(job.state, "pending")
self.assertEqual(job_2.state, "started")
# Mixed
channel_1.write({"started_delta": 0})
channel_2.write({"started_delta": 25})
_update_started_job_date(job, 20)
_update_started_job_date(job_2, 20)
self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=0, started_delta=15)
self.assertEqual(job.state, "pending")
self.assertEqual(job_2.state, "started")
# Enqueued
# Global config
_update_enqueued_job_date(job, 10)
_update_enqueued_job_date(job_2, 20)
self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=15, started_delta=0)
self.assertEqual(job.state, "enqueued")
self.assertEqual(job_2.state, "pending")
# Per channel config
_update_enqueued_job_date(job, 10)
_update_enqueued_job_date(job_2, 20)
channel_1.write({"enqueued_delta": 5})
channel_2.write({"enqueued_delta": 25})
self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=15, started_delta=0)
self.assertEqual(job.state, "pending")
self.assertEqual(job_2.state, "enqueued")
# Mixed
channel_1.write({"enqueued_delta": 0})
channel_2.write({"enqueued_delta": 25})
_update_enqueued_job_date(job, 20)
_update_enqueued_job_date(job_2, 20)
self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=15, started_delta=0)
self.assertEqual(job.state, "pending")
self.assertEqual(job_2.state, "enqueued")
# job without queue.job.channel record for its channel
# it uses the global value
job_3 = self._create_job("root.test3")
channel_1.write({"started_delta": 50})
channel_2.write({"enqueued_delta": 50})
_update_started_job_date(job, 10)
_update_enqueued_job_date(job_2, 20)
_update_started_job_date(job_3, 30)
self.env["queue.job"].requeue_stuck_jobs(enqueued_delta=5, started_delta=5)
self.assertEqual(job.state, "started")
self.assertEqual(job_2.state, "enqueued")
self.assertEqual(job_3.state, "pending")
2 changes: 2 additions & 0 deletions queue_job/views/queue_job_channel_views.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
/>
<field name="complete_name" />
<field name="removal_interval" />
<field name="enqueued_delta" />
<field name="started_delta" />
</group>
<group>
<field name="job_function_ids" widget="many2many_tags" />
Expand Down