Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 22 additions & 38 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from ..job import Job, ENQUEUED
from ..exception import (NoSuchJobError,
NotReadableJobError,
RetryableJobError,
FailedJobError,
NothingToDoJob)
from ..exception import (RetryableJobError, FailedJobError, NothingToDoJob)

_logger = logging.getLogger(__name__)

Expand All @@ -26,42 +22,17 @@

class RunJobController(http.Controller):

def _load_job(self, env, job_uuid):
"""Reload a job from the backend"""
try:
job = Job.load(env, job_uuid)
except NoSuchJobError:
# just skip it
job = None
except NotReadableJobError:
_logger.exception('Could not read job: %s', job_uuid)
raise
return job

def _try_perform_job(self, env, job):
"""Try to perform the job."""

# if the job has been manually set to DONE or PENDING,
# or if something tries to run a job that is not enqueued
# before its execution, stop
if job.state != ENQUEUED:
_logger.warning('job %s is in state %s '
'instead of enqueued in /runjob',
job.uuid, job.state)
return

# TODO: set_started should be done atomically with
# update queue_job set=state=started
# where state=enqueid and id=
job.set_started()
job.store()
http.request.env.cr.commit()

env.cr.commit()
_logger.debug('%s started', job)

job.perform()
job.set_done()
job.store()
http.request.env.cr.commit()
env.cr.commit()
_logger.debug('%s done', job)

@http.route('/queue_job/session', type='http', auth="none")
Expand Down Expand Up @@ -91,10 +62,23 @@ def retry_postpone(job, message, seconds=None):
job.store()
new_cr.commit()

job = self._load_job(env, job_uuid)
if job is None:
# ensure the job to run is in the correct state and lock the record
env.cr.execute(
"SELECT state FROM queue_job "
"WHERE uuid=%s AND state=%s "
"FOR UPDATE",
(job_uuid, ENQUEUED)
)
if not env.cr.fetchone():
_logger.warn(
"was requested to run job %s, but it does not exist, "
"or is not in state %s",
job_uuid, ENQUEUED
)
return ""
env.cr.commit()

job = Job.load(env, job_uuid)
assert job and job.state == ENQUEUED

try:
try:
Expand All @@ -105,9 +89,8 @@ def retry_postpone(job, message, seconds=None):
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise

retry_postpone(job, tools.ustr(err.pgerror, errors='replace'),
seconds=PG_RETRY)
_logger.debug('%s OperationalError, postponed', job)
raise RetryableJobError(tools.ustr(err.pgerror, errors='replace'), seconds=PG_RETRY)

except NothingToDoJob as err:
if str(err):
Expand All @@ -122,6 +105,7 @@ def retry_postpone(job, message, seconds=None):
# delay the job later, requeue
retry_postpone(job, str(err), seconds=err.seconds)
_logger.debug('%s postponed', job)
raise

except (FailedJobError, Exception):
buff = StringIO()
Expand Down
1 change: 1 addition & 0 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ def set_pending(self, result=None, reset_retry=True):
self.state = PENDING
self.date_enqueued = None
self.date_started = None
self.date_done = None
if reset_retry:
self.retry = 0
if result is not None:
Expand Down