CHANGELOG.rst (3 additions, 0 deletions)
@@ -31,6 +31,9 @@ Changed
level is set to ``DEBUG`` or ``system.debug`` config option is set to ``True``.

Reported by Nick Maludy. (improvement) #4538 #4502
* Moved the lock from concurrency policies into the scheduler to fix a race condition when there
are multiple scheduler instances scheduling executions for an action with concurrency policies.
#4481 (bug fix)

Fixed
~~~~~
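The race described in the changelog entry above existed because each concurrency policy took its own lock around its database query, so with multiple scheduler instances the check-then-schedule window was not serialized at the scheduler level. The sketch below is a minimal, self-contained illustration of the pattern the new handler code (further down in this diff) applies: acquire a per-action lock without blocking, schedule while holding it, and fall back to a short delay when another instance holds the lock. It uses a plain threading.Lock as a stand-in for the lock returned by the coordination service, and the helper names are illustrative only, not the project's actual API.

```python
import threading

# Stand-in for the distributed lock the scheduler obtains via
# coordination_service.get_coordinator().get_lock(lock_uid) in handler.py.
_action_lock = threading.Lock()

# Illustrative value; the handler reschedules delayed items using
# POLICY_DELAYED_EXECUTION_RESCHEDULE_TIME_MS, whose value is not shown in this diff.
RESCHEDULE_DELAY_SECONDS = 0.5


def handle_execution(action_ref, has_policies_requiring_lock, schedule, delay):
    """Schedule one queued execution, serializing per action when policies require it."""
    if not has_policies_requiring_lock:
        # No concurrency policy attached, so no lock is needed.
        schedule(action_ref)
        return

    # Non-blocking acquire: if another scheduler instance already holds the lock for
    # this action, push the item back for a later pass instead of blocking a worker.
    if _action_lock.acquire(blocking=False):
        try:
            schedule(action_ref)
        finally:
            _action_lock.release()
    else:
        delay(action_ref, RESCHEDULE_DELAY_SECONDS)


if __name__ == '__main__':
    handle_execution(
        'wolfpack.action-1',
        has_policies_requiring_lock=True,
        schedule=lambda ref: print('scheduled %s' % ref),
        delay=lambda ref, secs: print('delayed %s for %.1fs' % (ref, secs)),
    )
```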
st2actions/st2actions/policies/concurrency.py (1 addition, 43 deletions)
@@ -20,7 +20,6 @@
from st2common.persistence import action as action_access
from st2common.policies.concurrency import BaseConcurrencyApplicator
from st2common.services import action as action_service
from st2common.services import coordination


__all__ = [
@@ -89,47 +88,6 @@ def apply_before(self, target):
'"%s" cannot be applied. %s', self._policy_ref, target)
return target

# Warn users that the coordination service is not configured.
if not coordination.configured():
LOG.warn('Coordination service is not configured. Policy enforcement is best effort.')

# Acquire a distributed lock before querying the database to make sure that only one
# scheduler is scheduling execution for this action. Even if the coordination service
# is not configured, the fake driver using zake or the file driver can still acquire
# a lock for the local process or server respectively.
lock_uid = self._get_lock_uid(target)
LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid)
with self.coordinator.get_lock(lock_uid):
target = self._apply_before(target)

return target

def _apply_after(self, target):
# Schedule the oldest delayed executions.
requests = action_access.LiveAction.query(
action=target.action,
status=action_constants.LIVEACTION_STATUS_DELAYED,
order_by=['start_timestamp'],
limit=1
)

if requests:
action_service.update_status(
requests[0],
action_constants.LIVEACTION_STATUS_REQUESTED,
publish=True
)

def apply_after(self, target):
target = super(ConcurrencyApplicator, self).apply_after(target=target)

# Acquire a distributed lock before querying the database to make sure that only one
# scheduler is scheduling execution for this action. Even if the coordination service
# is not configured, the fake driver using zake or the file driver can still acquire
# a lock for the local process or server respectively.
lock_uid = self._get_lock_uid(target)
LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid)
with self.coordinator.get_lock(lock_uid):
self._apply_after(target)
target = self._apply_before(target)

return target
st2actions/st2actions/policies/concurrency_by_attr.py (1 addition, 42 deletions)
@@ -115,47 +115,6 @@ def apply_before(self, target):
if not coordination.configured():
LOG.warn('Coordination service is not configured. Policy enforcement is best effort.')

# Acquire a distributed lock before querying the database to make sure that only one
# scheduler is scheduling execution for this action. Even if the coordination service
# is not configured, the fake driver using zake or the file driver can still acquire
# a lock for the local process or server respectively.
lock_uid = self._get_lock_uid(target)
LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid)
with self.coordinator.get_lock(lock_uid):
target = self._apply_before(target)

return target

def _apply_after(self, target):
# Schedule the oldest delayed executions.
filters = self._get_filters(target)
filters['status'] = action_constants.LIVEACTION_STATUS_DELAYED

requests = action_access.LiveAction.query(
order_by=['start_timestamp'],
limit=1,
**filters
)

if requests:
action_service.update_status(
requests[0],
action_constants.LIVEACTION_STATUS_REQUESTED,
publish=True
)

def apply_after(self, target):
# Warn users that the coordination service is not configured.
if not coordination.configured():
LOG.warn('Coordination service is not configured. Policy enforcement is best effort.')

# Acquire a distributed lock before querying the database to make sure that only one
# scheduler is scheduling execution for this action. Even if the coordination service
# is not configured, the fake driver using zake or the file driver can still acquire
# a lock for the local process or server respectively.
lock_uid = self._get_lock_uid(target)
LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid)
with self.coordinator.get_lock(lock_uid):
self._apply_after(target)
target = self._apply_before(target)

return target
st2actions/st2actions/scheduler/handler.py (69 additions, 13 deletions)
@@ -21,9 +21,11 @@
from st2common import log as logging
from st2common.util import date
from st2common.constants import action as action_constants
from st2common.constants import policy as policy_constants
from st2common.exceptions.db import StackStormDBObjectNotFoundError
from st2common.models.db.liveaction import LiveActionDB
from st2common.services import action as action_service
from st2common.services import coordination as coordination_service
from st2common.services import policies as policy_service
from st2common.persistence.liveaction import LiveAction
from st2common.persistence.execution_queue import ActionExecutionSchedulingQueue
@@ -57,6 +59,7 @@ def __init__(self):
self.message_type = LiveActionDB
self._shutdown = False
self._pool = eventlet.GreenPool(size=cfg.CONF.scheduler.pool_size)
self._coordinator = coordination_service.get_coordinator()

def run(self):
LOG.debug('Entering scheduler loop')
@@ -161,16 +164,41 @@ def _handle_execution(self, execution_queue_item_db):
ActionExecutionSchedulingQueue.delete(execution_queue_item_db)
raise

liveaction_db = self._apply_pre_run(liveaction_db, execution_queue_item_db)
# Identify if the action has policies that require locking.
action_has_policies_require_lock = policy_service.has_policies(
liveaction_db,
policy_types=policy_constants.POLICY_TYPES_REQUIRING_LOCK
)

if not liveaction_db:
return
# Acquire a distributed lock if the referenced action has specific policies attached.
if action_has_policies_require_lock:
# Warn users that the coordination service is not configured.
if not coordination_service.configured():
LOG.warn(
'Coordination backend is not configured. '
'Policy enforcement is best effort.'
)

if self._is_execution_queue_item_runnable(liveaction_db, execution_queue_item_db):
self._update_to_scheduled(liveaction_db, execution_queue_item_db)
# Acquire a distributed lock before querying the database to make sure that only one
# scheduler is scheduling execution for this action. Even if the coordination service
# is not configured, the fake driver using zake or the file driver can still acquire
# a lock for the local process or server respectively.
lock_uid = liveaction_db.action
LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid)
lock = self._coordinator.get_lock(lock_uid)

@staticmethod
def _apply_pre_run(liveaction_db, execution_queue_item_db):
try:
if lock.acquire(blocking=False):
self._regulate_and_schedule(liveaction_db, execution_queue_item_db)
else:
self._delay(liveaction_db, execution_queue_item_db)
finally:
lock.release()
else:
# Otherwise if there is no policy, then schedule away.
self._schedule(liveaction_db, execution_queue_item_db)

def _regulate_and_schedule(self, liveaction_db, execution_queue_item_db):
# Apply policies defined for the action.
liveaction_db = policy_service.apply_pre_run_policies(liveaction_db)

@@ -190,10 +218,13 @@ def _apply_pre_run(liveaction_db, execution_queue_item_db):
liveaction_db = action_service.update_status(
liveaction_db, action_constants.LIVEACTION_STATUS_DELAYED, publish=False
)

execution_queue_item_db.handling = False
execution_queue_item_db.scheduled_start_timestamp = date.append_milliseconds_to_time(
date.get_datetime_utc_now(),
POLICY_DELAYED_EXECUTION_RESCHEDULE_TIME_MS
)

try:
ActionExecutionSchedulingQueue.add_or_update(execution_queue_item_db, publish=False)
except db_exc.StackStormDBObjectWriteConflictError:
@@ -202,16 +233,40 @@ def _apply_pre_run(liveaction_db, execution_queue_item_db):
execution_queue_item_db.id
)

return None
return

if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES or
liveaction_db.status in action_constants.LIVEACTION_CANCEL_STATES):
ActionExecutionSchedulingQueue.delete(execution_queue_item_db)
return None
return

self._schedule(liveaction_db, execution_queue_item_db)

return liveaction_db
def _delay(self, liveaction_db, execution_queue_item_db):
liveaction_db = action_service.update_status(
liveaction_db, action_constants.LIVEACTION_STATUS_DELAYED, publish=False
)

execution_queue_item_db.scheduled_start_timestamp = date.append_milliseconds_to_time(
date.get_datetime_utc_now(),
POLICY_DELAYED_EXECUTION_RESCHEDULE_TIME_MS
)

def _is_execution_queue_item_runnable(self, liveaction_db, execution_queue_item_db):
try:
execution_queue_item_db.handling = False
ActionExecutionSchedulingQueue.add_or_update(execution_queue_item_db, publish=False)
except db_exc.StackStormDBObjectWriteConflictError:
LOG.warning(
'Execution queue item update conflict during scheduling: %s',
execution_queue_item_db.id
)

def _schedule(self, liveaction_db, execution_queue_item_db):
if self._is_execution_queue_item_runnable(liveaction_db, execution_queue_item_db):
self._update_to_scheduled(liveaction_db, execution_queue_item_db)

@staticmethod
def _is_execution_queue_item_runnable(liveaction_db, execution_queue_item_db):
"""
Return True if a particular execution request is runnable.

@@ -228,13 +283,14 @@ def _is_execution_queue_item_runnable(self, liveaction_db, execution_queue_item_db):
return True

LOG.info(
'%s is ignoring %s (id=%s) with "%s" status after policies are applied.',
self.__class__.__name__,
'Scheduler is ignoring %s (id=%s) with "%s" status after policies are applied.',
type(execution_queue_item_db),
execution_queue_item_db.id,
liveaction_db.status
)

ActionExecutionSchedulingQueue.delete(execution_queue_item_db)

return False

@staticmethod
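For context on the _delay path added above: a contended queue item is not dropped. Its live action is marked delayed without publishing the state change, the item is released (handling set back to False), and its scheduled start timestamp is pushed forward by POLICY_DELAYED_EXECUTION_RESCHEDULE_TIME_MS so a later scheduler pass retries it. Below is a rough, self-contained sketch of that requeue arithmetic, using a plain dict in place of the real ActionExecutionSchedulingQueue model and an assumed reschedule interval.

```python
import datetime

# Assumed value; the real POLICY_DELAYED_EXECUTION_RESCHEDULE_TIME_MS constant is
# defined in handler.py but its value is not visible in this diff.
RESCHEDULE_TIME_MS = 500


def append_milliseconds_to_time(dt, millis):
    # Same idea as st2common.util.date.append_milliseconds_to_time.
    return dt + datetime.timedelta(milliseconds=millis)


def delay_queue_item(queue_item):
    """Release a contended queue item and push its start time into the near future."""
    queue_item['handling'] = False  # let another scheduler pass claim it later
    queue_item['scheduled_start_timestamp'] = append_milliseconds_to_time(
        datetime.datetime.utcnow(), RESCHEDULE_TIME_MS
    )
    return queue_item


if __name__ == '__main__':
    item = {'id': 'abc123', 'handling': True, 'scheduled_start_timestamp': None}
    print(delay_queue_item(item))
```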
st2actions/tests/unit/policies/test_concurrency.py (30 additions, 10 deletions)
@@ -149,30 +149,38 @@ def test_over_threshold_delay_executions(self):
# Execution is expected to be delayed since concurrency threshold is reached.
liveaction = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo-last'})
liveaction, _ = action_service.request(liveaction)
expected_num_exec += 1 # This request is expected to be executed.

expected_num_pubs += 1 # Tally requested state.
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)

# Run the scheduler to schedule action executions.
self._process_scheduling_queue()

# Since states are being processed async, wait for the liveaction to go into delayed state.
liveaction = self._wait_on_status(liveaction, action_constants.LIVEACTION_STATUS_DELAYED)

expected_num_exec += 0 # This request will not be scheduled for execution.
expected_num_pubs += 0 # The delayed status change should not be published.
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)
self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count)

# Mark one of the scheduled/running execution as completed.
action_service.update_status(
scheduled[0],
action_constants.LIVEACTION_STATUS_SUCCEEDED,
publish=True
)

expected_num_pubs += 1 # Tally requested state.

# Once capacity freed up, the delayed execution is published as requested again.
expected_num_pubs += 3 # Tally requested, scheduled, and running state.
expected_num_pubs += 1 # Tally succeeded state.
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)

# Run the scheduler to schedule action executions.
self._process_scheduling_queue()

# Once capacity freed up, the delayed execution is published as scheduled.
expected_num_exec += 1 # This request is expected to be executed.
expected_num_pubs += 2 # Tally scheduled and running state.

# Since states are being processed async, wait for the liveaction to be scheduled.
liveaction = self._wait_on_statuses(liveaction, SCHEDULED_STATES)
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)
@@ -212,8 +220,9 @@ def test_over_threshold_cancel_executions(self):
# Execution is expected to be canceled since concurrency threshold is reached.
liveaction = LiveActionDB(action='wolfpack.action-2', parameters={'actionstr': 'foo'})
liveaction, _ = action_service.request(liveaction)
expected_num_exec += 0 # This request will not be scheduled for execution.

expected_num_pubs += 1 # Tally requested state.
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)

# Run the scheduler to schedule action executions.
self._process_scheduling_queue()
@@ -222,6 +231,9 @@ def test_over_threshold_cancel_executions(self):
calls = [call(liveaction, action_constants.LIVEACTION_STATUS_CANCELING)]
LiveActionPublisher.publish_state.assert_has_calls(calls)
expected_num_pubs += 2 # Tally canceling and canceled state changes.
expected_num_exec += 0 # This request will not be scheduled for execution.
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)
self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count)

# Assert the action is canceled.
liveaction = LiveAction.get_by_id(str(liveaction.id))
@@ -262,25 +274,33 @@ def test_on_cancellation(self):
# Execution is expected to be delayed since concurrency threshold is reached.
liveaction = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo'})
liveaction, _ = action_service.request(liveaction)
expected_num_exec += 1 # This request will be scheduled for execution.

expected_num_pubs += 1 # Tally requested state.
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)

# Run the scheduler to schedule action executions.
self._process_scheduling_queue()

# Since states are being processed async, wait for the liveaction to go into delayed state.
liveaction = self._wait_on_status(liveaction, action_constants.LIVEACTION_STATUS_DELAYED)

expected_num_exec += 0 # This request will not be scheduled for execution.
expected_num_pubs += 0 # The delayed status change should not be published.
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)
self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count)

# Cancel execution.
action_service.request_cancellation(scheduled[0], 'stanley')
expected_num_pubs += 2 # Tally the canceling and canceled states.

# Once capacity freed up, the delayed execution is published as requested again.
expected_num_pubs += 3 # Tally requested, scheduled, and running state.
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count)

# Run the scheduler to schedule action executions.
self._process_scheduling_queue()

# Once capacity freed up, the delayed execution is published as requested again.
expected_num_exec += 1 # This request is expected to be executed.
expected_num_pubs += 2 # Tally scheduled and running state.

# Execution is expected to be rescheduled.
liveaction = LiveAction.get_by_id(str(liveaction.id))
self.assertIn(liveaction.status, SCHEDULED_STATES)