Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 220 additions & 29 deletions queue_job/jobrunner/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class ChannelJob(object):
Channel jobs are comparable according to the following rules:
* jobs with an eta come before all other jobs
* then jobs with a smaller eta come first
* then jobs with smaller priority come first
* then jobs with a smaller priority come first
* then jobs with a smaller creation time come first
* then jobs with a smaller sequence come first

Expand Down Expand Up @@ -182,6 +182,11 @@ class ChannelJob(object):
False
>>> j0 == j0
True

Comparison excluding eta:

>>> j1.cmp_no_eta(j2)
-1
"""

def __init__(self, db_name, channel, uuid,
Expand All @@ -203,21 +208,26 @@ def __eq__(self, other):
def __hash__(self):
return id(self)

def cmp_no_eta(self, other):
return (cmp(self.priority, other.priority) or
cmp(self.date_created, other.date_created) or
cmp(self.seq, other.seq))

def __cmp__(self, other):
if self.eta and not other.eta:
return -1
elif not self.eta and other.eta:
return 1
else:
return (cmp(self.eta, other.eta) or
cmp(self.priority, other.priority) or
cmp(self.date_created, other.date_created) or
cmp(self.seq, other.seq))
self.cmp_no_eta(other))


class ChannelQueue(object):
"""A channel queue is a priority queue for jobs that returns
jobs with a past ETA first.
"""A channel queue is a priority queue for jobs.

Jobs with an eta are set aside until their eta is past due, at
which point they start competing normally with other jobs.

>>> q = ChannelQueue()
>>> j1 = ChannelJob(None, None, 1,
Expand Down Expand Up @@ -257,11 +267,49 @@ class ChannelQueue(object):
>>> q.get_wakeup_time()
0
>>> q.pop(now=13)

Observe that job with past eta still run after jobs with higher priority.

>>> j4 = ChannelJob(None, None, 4,
... seq=0, date_created=4, priority=10, eta=20)
>>> j5 = ChannelJob(None, None, 5,
... seq=0, date_created=5, priority=1, eta=None)
>>> q.add(j4)
>>> q.add(j5)
>>> q.get_wakeup_time()
20
>>> q.pop(21)
<ChannelJob 5>
>>> q.get_wakeup_time()
0
>>> q.pop(22)
<ChannelJob 4>

Test a sequential queue.

>>> sq = ChannelQueue(sequential=True)
>>> j6 = ChannelJob(None, None, 6,
... seq=0, date_created=6, priority=1, eta=None)
>>> j7 = ChannelJob(None, None, 7,
... seq=0, date_created=7, priority=1, eta=20)
>>> j8 = ChannelJob(None, None, 8,
... seq=0, date_created=8, priority=1, eta=None)
>>> sq.add(j6)
>>> sq.add(j7)
>>> sq.add(j8)
>>> sq.pop(10)
<ChannelJob 6>
>>> sq.pop(15)
>>> sq.pop(20)
<ChannelJob 7>
>>> sq.pop(30)
<ChannelJob 8>
"""

def __init__(self):
def __init__(self, sequential=False):
self._queue = PriorityQueue()
self._eta_queue = PriorityQueue()
self.sequential = sequential

def __len__(self):
return len(self._eta_queue) + len(self._queue)
Expand All @@ -280,10 +328,19 @@ def remove(self, job):
self._queue.remove(job)

def pop(self, now):
if len(self._eta_queue) and self._eta_queue[0].eta <= now:
return self._eta_queue.pop()
else:
return self._queue.pop()
while len(self._eta_queue) and self._eta_queue[0].eta <= now:
eta_job = self._eta_queue.pop()
eta_job.eta = None
self._queue.add(eta_job)
if self.sequential and len(self._eta_queue) and len(self._queue):
eta_job = self._eta_queue[0]
job = self._queue[0]
if eta_job.cmp_no_eta(job) < 0:
# eta ignored, the job with eta has higher priority
# than the job without eta; since it's a sequential
# queue we wait until eta
return
return self._queue.pop()

def get_wakeup_time(self, wakeup_time=0):
if len(self._eta_queue):
Expand Down Expand Up @@ -348,13 +405,21 @@ def __init__(self, name, parent, capacity=None, sequential=False,
if self.parent:
self.parent.children[name] = self
self.children = {}
self.capacity = capacity
self.sequential = sequential
self.throttle = throttle # seconds
self._queue = ChannelQueue()
self._running = SafeSet()
self._failed = SafeSet()
self._pause_until = 0 # utc seconds since the epoch
self.capacity = capacity
self.throttle = throttle # seconds
self.sequential = sequential

@property
def sequential(self):
return self._queue.sequential

@sequential.setter
def sequential(self, val):
self._queue.sequential = val

def configure(self, config):
""" Configure a channel from a dictionary.
Expand All @@ -363,6 +428,7 @@ def configure(self, config):

* capacity
* sequential
* throttle
"""
assert self.fullname.endswith(config['name'])
self.capacity = config.get('capacity', None)
Expand Down Expand Up @@ -447,6 +513,15 @@ def set_failed(self, job):
_logger.debug("job %s marked failed in channel %s",
job.uuid, self)

def has_capacity(self):
if self.sequential and self._failed:
# a sequential queue blocks on failed jobs
return False
if not self.capacity:
# unlimited capacity
return True
return len(self._running) < self.capacity

def get_jobs_to_run(self, now):
""" Get jobs that are ready to run in channel.

Expand All @@ -468,25 +543,20 @@ def get_jobs_to_run(self, now):
for job in child.get_jobs_to_run(now):
self._queue.add(job)
# is this channel paused?
if self.throttle:
if self.throttle and self._pause_until:
if now < self._pause_until:
if not self.capacity or len(self._running) < self.capacity:
_logger.debug("channel %s paused because of throttle "
"delay between jobs", self)
if self.has_capacity():
_logger.debug("channel %s paused until %s because "
"of throttle delay between jobs",
self, self._pause_until)
return
else:
# unpause, this is important to avoid perpetual wakeup
# while the channel is at full capacity
self._pause_until = 0
# sequential channels block when there are failed jobs
# TODO: this is probably not sufficient to ensure
# sequentiality because of the behaviour in presence
# of jobs with eta; plus: check if there are no
# race conditions.
if self.sequential and len(self._failed):
return
_logger.debug("channel %s unpaused at %s", self, now)
# yield jobs that are ready to run, while we have capacity
while not self.capacity or len(self._running) < self.capacity:
while self.has_capacity():
job = self._queue.pop(now)
if not job:
return
Expand All @@ -496,10 +566,12 @@ def get_jobs_to_run(self, now):
yield job
if self.throttle:
self._pause_until = now + self.throttle
_logger.debug("pausing channel %s until %s",
self, self._pause_until)
return

def get_wakeup_time(self, wakeup_time=0):
if self.capacity and len(self._running) >= self.capacity:
if not self.has_capacity():
# this channel is full, do not request timed wakeup, as
# a notification will wakeup the runner when a job finishes
return wakeup_time
Expand All @@ -518,14 +590,16 @@ def get_wakeup_time(self, wakeup_time=0):
wakeup_time = child.get_wakeup_time(wakeup_time)
return wakeup_time


def split_strip(s, sep, maxsplit=-1):
"""Split string and strip each component.

>>> ChannelManager.split_strip("foo: bar baz\\n: fred:", ":")
>>> split_strip("foo: bar baz\\n: fred:", ":")
['foo', 'bar baz', 'fred', '']
"""
return [x.strip() for x in s.split(sep, maxsplit)]


class ChannelManager(object):
""" High level interface for channels

Expand Down Expand Up @@ -613,6 +687,115 @@ class ChannelManager(object):
[<ChannelJob A2>]
>>> cm.get_wakeup_time()
104

Let's test throttling in combination with a queue reaching full capacity.

>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,T:2:throttle=2')
>>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'T', 'T2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'T', 'T3', 3, 0, 10, None, 'pending')

>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob T1>]
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob T2>]

Channel is now full, so no job to run even though throttling
delay is over.

>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> cm.get_wakeup_time() # no wakeup time, since queue is full
0
>>> pp(list(cm.get_jobs_to_run(now=104)))
[]
>>> cm.get_wakeup_time() # queue is still full
0

>>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob T3>]
>>> cm.get_wakeup_time() # queue is full
0
>>> cm.notify(db, 'T', 'T2', 1, 0, 10, None, 'done')
>>> cm.get_wakeup_time()
107

Test wakeup time behaviour in presence of eta.

>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,E:1')
>>> cm.notify(db, 'E', 'E1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'E', 'E2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'E', 'E3', 3, 0, 10, None, 'pending')

>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob E1>]
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.notify(db, 'E', 'E1', 1, 0, 10, 105, 'pending')
>>> cm.get_wakeup_time() # wakeup at eta
105
>>> pp(list(cm.get_jobs_to_run(now=102))) # but there is capacity
[<ChannelJob E2>]
>>> pp(list(cm.get_jobs_to_run(now=106))) # no capacity anymore
[]
>>> cm.get_wakeup_time() # no timed wakeup because no capacity
0
>>> cm.notify(db, 'E', 'E2', 1, 0, 10, None, 'done')
>>> cm.get_wakeup_time()
105
>>> pp(list(cm.get_jobs_to_run(now=107))) # no capacity anymore
[<ChannelJob E1>]
>>> cm.get_wakeup_time()
0

Test wakeup time behaviour in a sequential queue.

>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,S:1:sequential')
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'pending')

>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob S1>]
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'failed')
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=102)))
[]

No wakeup time because due to eta, because the sequential queue
is waiting for a failed job.

>>> cm.get_wakeup_time()
0
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
>>> cm.get_wakeup_time()
105
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob S1>]
>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'done')

At this stage, we have S2 with an eta of 105 and since the
queue is sequential, we wait for it.

>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob S2>]
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob S3>]
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[]

"""

def __init__(self):
Expand Down Expand Up @@ -720,14 +903,22 @@ def simple_configure(self, config_string):
>>> c = cm.get_channel_by_name('root')
>>> c.capacity
1
>>> cm.simple_configure('root:4,autosub.sub:2')
>>> cm.simple_configure('root:4,autosub.sub:2,seq:1:sequential')
>>> cm.get_channel_by_name('root').capacity
4
>>> cm.get_channel_by_name('root').sequential
False
>>> cm.get_channel_by_name('root.autosub').capacity
>>> cm.get_channel_by_name('root.autosub.sub').capacity
2
>>> cm.get_channel_by_name('root.autosub.sub').sequential
False
>>> cm.get_channel_by_name('autosub.sub').capacity
2
>>> cm.get_channel_by_name('seq').capacity
1
>>> cm.get_channel_by_name('seq').sequential
True
"""
for config in ChannelManager.parse_simple_config(config_string):
self.get_channel_from_config(config)
Expand Down