From 057c916fdb4c2c14d71990f5d4479331c2917311 Mon Sep 17 00:00:00 2001
From: nans
Date: Mon, 11 Apr 2022 14:24:34 +0200
Subject: [PATCH] [IMP] queue_job: requeue zombie jobs after hard shutdown

---
Review notes: line structure restored; lock_jobs_by_uuids now ignores an
empty uuid list instead of sending "IN ()" to PostgreSQL; docstrings added.
The post-image blob ids on the job.py and runner.py "index" lines predate
these changes.

 queue_job/controllers/main.py |  1 +
 queue_job/job.py              | 22 ++++++++++++++++++++++
 queue_job/jobrunner/runner.py | 40 ++++++++++++++++++++++++----------------
 3 files changed, 47 insertions(+), 16 deletions(-)

diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index dc1add1380..fd3a6ce312 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -59,6 +59,7 @@ def _try_perform_job(self, env, job):
         http.request.env.cr.commit()
 
         _logger.debug('%s started', job)
+        job.lock()
         job.perform()
         job.set_done()
         job.store()
diff --git a/queue_job/job.py b/queue_job/job.py
index 4247d7c1b2..7e16b6d88b 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -252,6 +252,28 @@ def load(cls, env, job_uuid):
                 'Job %s does no longer exist in the storage.' % job_uuid)
         return cls._load_from_db_record(stored)
 
+    @classmethod
+    def lock_jobs_by_uuids(cls, env, job_uuid_list):
+        """Lock the ``queue_job`` rows of the given jobs.
+
+        The rows are selected ``FOR UPDATE``, so the lock is held until the
+        current transaction of ``env.cr`` is committed or rolled back.
+
+        :param env: environment whose cursor takes the lock
+        :param job_uuid_list: iterable of job uuids; may be empty
+        """
+        job_uuids = tuple(job_uuid_list)
+        if not job_uuids:
+            # psycopg2 renders an empty tuple as "()" and PostgreSQL
+            # rejects "IN ()" as a syntax error: nothing to lock.
+            return
+        query = "SELECT state FROM queue_job WHERE uuid in %s FOR UPDATE;"
+        env.cr.execute(query, (job_uuids,))
+
+    def lock(self):
+        """Lock this job's row until the current transaction ends."""
+        self.lock_jobs_by_uuids(self.env, [self.uuid])
+
     @classmethod
     def _load_from_db_record(cls, job_db_record):
         stored = job_db_record
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 138a9bc951..33d8301db4 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -113,22 +113,6 @@
 * After creating a new database or installing queue_job on an
   existing database, Odoo must be restarted for the runner to detect it.
 
-* When Odoo shuts down normally, it waits for running jobs to finish.
-  However, when the Odoo server crashes or is otherwise force-stopped,
-  running jobs are interrupted while the runner has no chance to know
-  they have been aborted. In such situations, jobs may remain in
-  ``started`` or ``enqueued`` state after the Odoo server is halted.
-  Since the runner has no way to know if they are actually running or
-  not, and does not know for sure if it is safe to restart the jobs,
-  it does not attempt to restart them automatically. Such stale jobs
-  therefore fill the running queue and prevent other jobs to start.
-  You must therefore requeue them manually, either from the Jobs view,
-  or by running the following SQL statement *before starting Odoo*:
-
-.. code-block:: sql
-
-  update queue_job set state='pending' where state in ('started', 'enqueued')
-
 .. rubric:: Footnotes
 
 .. [1] From a security standpoint, it is safe to have an anonymous HTTP
@@ -333,6 +317,29 @@ def set_job_enqueued(self, uuid):
                        "WHERE uuid=%s",
                        (ENQUEUED, uuid))
 
+    def reset_dead_jobs(self):
+        """Set started or enqueued jobs back to pending.
+
+        Called from ``initialize_databases`` when the runner connects to
+        a database. Rows locked by a running job (see ``Job.lock``) are
+        left untouched thanks to ``SKIP LOCKED``.
+        """
+        # When Odoo shuts down normally, it waits for running jobs to finish.
+        # However, when the Odoo server crashes or is otherwise force-stopped,
+        # running jobs are interrupted while the runner has no chance to know
+        # they have been aborted. In such situations, jobs may remain in
+        # ``started`` or ``enqueued`` state after the Odoo server is halted.
+        # inspired from https://github.com/OCA/queue/issues/386
+        query = """
+UPDATE queue_job SET state='pending'
+WHERE uuid in (
+    SELECT uuid FROM queue_job
+    WHERE state in ('started', 'enqueued')
+    FOR UPDATE SKIP LOCKED
+);"""
+        with closing(self.conn.cursor()) as cr:
+            cr.execute(query)
+
 
 class QueueJobRunner(object):
 
@@ -381,6 +388,7 @@ def initialize_databases(self):
                 _logger.debug('queue_job is not installed for db %s', db_name)
             else:
                 self.db_by_name[db_name] = db
+                db.reset_dead_jobs()
                 for job_data in db.select_jobs('state in %s', (NOT_DONE,)):
                     self.channel_manager.notify(db_name, *job_data)
                 _logger.info('queue job runner ready for db %s', db_name)