From 6775e56f263bfce568fe864481ecfc1e7f189da0 Mon Sep 17 00:00:00 2001 From: nottegeo Date: Sat, 6 Feb 2021 10:29:08 +0100 Subject: [PATCH 1/2] [FIX] Fixed #261. Retryable Job Exception must also throw the exception and rollback Backport from https://github.com/OCA/queue/commit/ca2af1582f8d8f0b770afe392ada94991319d2f9 --- queue_job/controllers/main.py | 60 +++++++++++++---------------------- 1 file changed, 22 insertions(+), 38 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 843bfa6e46..de388f51da 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -13,11 +13,7 @@ from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY from ..job import Job, ENQUEUED -from ..exception import (NoSuchJobError, - NotReadableJobError, - RetryableJobError, - FailedJobError, - NothingToDoJob) +from ..exception import (RetryableJobError, FailedJobError, NothingToDoJob) _logger = logging.getLogger(__name__) @@ -26,42 +22,17 @@ class RunJobController(http.Controller): - def _load_job(self, env, job_uuid): - """Reload a job from the backend""" - try: - job = Job.load(env, job_uuid) - except NoSuchJobError: - # just skip it - job = None - except NotReadableJobError: - _logger.exception('Could not read job: %s', job_uuid) - raise - return job - def _try_perform_job(self, env, job): """Try to perform the job.""" - - # if the job has been manually set to DONE or PENDING, - # or if something tries to run a job that is not enqueued - # before its execution, stop - if job.state != ENQUEUED: - _logger.warning('job %s is in state %s ' - 'instead of enqueued in /runjob', - job.uuid, job.state) - return - - # TODO: set_started should be done atomically with - # update queue_job set=state=started - # where state=enqueid and id= job.set_started() job.store() - http.request.env.cr.commit() - + env.cr.commit() _logger.debug('%s started', job) + job.perform() job.set_done() job.store() - http.request.env.cr.commit() + env.cr.commit() _logger.debug('%s done', job) @http.route('/queue_job/session', type='http', auth="none") @@ -91,10 +62,23 @@ def retry_postpone(job, message, seconds=None): job.store() new_cr.commit() - job = self._load_job(env, job_uuid) - if job is None: + # ensure the job to run is in the correct state and lock the record + env.cr.execute( + "SELECT state FROM queue_job " + "WHERE uuid=%s AND state=%s " + "FOR UPDATE", + (job_uuid, ENQUEUED) + ) + if not env.cr.fetchone(): + _logger.warn( + "was requested to run job %s, but it does not exist, " + "or is not in state %s", + job_uuid, ENQUEUED + ) return "" - env.cr.commit() + + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED try: try: @@ -105,9 +89,8 @@ def retry_postpone(job, message, seconds=None): if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise - retry_postpone(job, tools.ustr(err.pgerror, errors='replace'), - seconds=PG_RETRY) _logger.debug('%s OperationalError, postponed', job) + raise RetryableJobError(tools.ustr(err.pgerror, errors='replace'), seconds=PG_RETRY) except NothingToDoJob as err: if str(err): @@ -122,6 +105,7 @@ def retry_postpone(job, message, seconds=None): # delay the job later, requeue retry_postpone(job, str(err), seconds=err.seconds) _logger.debug('%s postponed', job) + raise except (FailedJobError, Exception): buff = StringIO() From 5ca07fcdfe9857c4430f432b6325cb6eac5cdbbe Mon Sep 17 00:00:00 2001 From: nottegeo Date: Thu, 25 Mar 2021 14:51:50 +0100 Subject: [PATCH 2/2] Fix date_done set when state changes back to pending When Job.date_done has been set, for instance because the job has been performed, if the job is set back to pending (e.g. a RetryableJobError is raised), the date_done is kept --- queue_job/job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/job.py b/queue_job/job.py index 4463968b48..65f77d9bd9 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -596,6 +596,7 @@ def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None self.date_started = None + self.date_done = None if reset_retry: self.retry = 0 if result is not None: