diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 843bfa6e46..de388f51da 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -13,11 +13,7 @@ from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY from ..job import Job, ENQUEUED -from ..exception import (NoSuchJobError, - NotReadableJobError, - RetryableJobError, - FailedJobError, - NothingToDoJob) +from ..exception import (RetryableJobError, FailedJobError, NothingToDoJob) _logger = logging.getLogger(__name__) @@ -26,42 +22,17 @@ class RunJobController(http.Controller): - def _load_job(self, env, job_uuid): - """Reload a job from the backend""" - try: - job = Job.load(env, job_uuid) - except NoSuchJobError: - # just skip it - job = None - except NotReadableJobError: - _logger.exception('Could not read job: %s', job_uuid) - raise - return job - def _try_perform_job(self, env, job): """Try to perform the job.""" - - # if the job has been manually set to DONE or PENDING, - # or if something tries to run a job that is not enqueued - # before its execution, stop - if job.state != ENQUEUED: - _logger.warning('job %s is in state %s ' - 'instead of enqueued in /runjob', - job.uuid, job.state) - return - - # TODO: set_started should be done atomically with - # update queue_job set=state=started - # where state=enqueid and id= job.set_started() job.store() - http.request.env.cr.commit() - + env.cr.commit() _logger.debug('%s started', job) + job.perform() job.set_done() job.store() - http.request.env.cr.commit() + env.cr.commit() _logger.debug('%s done', job) @http.route('/queue_job/session', type='http', auth="none") @@ -91,10 +62,23 @@ def retry_postpone(job, message, seconds=None): job.store() new_cr.commit() - job = self._load_job(env, job_uuid) - if job is None: + # ensure the job to run is in the correct state and lock the record + env.cr.execute( + "SELECT state FROM queue_job " + "WHERE uuid=%s AND state=%s " + "FOR UPDATE", + (job_uuid, ENQUEUED) + ) + if not env.cr.fetchone(): + _logger.warn( + "was requested to run job %s, but it does not exist, " + "or is not in state %s", + job_uuid, ENQUEUED + ) return "" - env.cr.commit() + + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED try: try: @@ -105,9 +89,8 @@ def retry_postpone(job, message, seconds=None): if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise - retry_postpone(job, tools.ustr(err.pgerror, errors='replace'), - seconds=PG_RETRY) _logger.debug('%s OperationalError, postponed', job) + raise RetryableJobError(tools.ustr(err.pgerror, errors='replace'), seconds=PG_RETRY) except NothingToDoJob as err: if str(err): @@ -122,6 +105,7 @@ def retry_postpone(job, message, seconds=None): # delay the job later, requeue retry_postpone(job, str(err), seconds=err.seconds) _logger.debug('%s postponed', job) + raise except (FailedJobError, Exception): buff = StringIO() diff --git a/queue_job/job.py b/queue_job/job.py index 4463968b48..65f77d9bd9 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -596,6 +596,7 @@ def set_pending(self, result=None, reset_retry=True): self.state = PENDING self.date_enqueued = None self.date_started = None + self.date_done = None if reset_retry: self.retry = 0 if result is not None: