From 08eab3f1757d88cf4f594e89f6f985ba7a939bfe Mon Sep 17 00:00:00 2001 From: W Chan Date: Wed, 6 Feb 2019 00:54:13 +0000 Subject: [PATCH 1/4] Move lock for concurrency policies into scheduler Move the lock for coordinating concurrency policies into the scheduler. With the current approach, when there is more than one scheduler, there is a race in scheduling that results in failure to enforce the concurrency accurately. --- CHANGELOG.rst | 1 + st2actions/st2actions/policies/concurrency.py | 44 +------- .../policies/concurrency_by_attr.py | 43 +------ st2actions/st2actions/scheduler/handler.py | 82 +++++++++++--- .../tests/unit/policies/test_concurrency.py | 40 +++++-- .../unit/policies/test_concurrency_by_attr.py | 54 ++++----- st2common/st2common/constants/policy.py | 25 +++++ st2common/st2common/policies/base.py | 9 -- st2common/st2common/services/policies.py | 14 +++ st2common/tests/unit/services/test_policy.py | 106 ++++++++++++++++++ 10 files changed, 276 insertions(+), 142 deletions(-) create mode 100644 st2common/st2common/constants/policy.py create mode 100644 st2common/tests/unit/services/test_policy.py diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c4be32dd9d..34f8afc26f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -31,6 +31,7 @@ Changed level is set to ``DEBUG`` or ``system.debug`` config option is set to ``True``. Reported by Nick Maludy. (improvement) #4538 #4502 +* Moved the lock from concurrency policies into the scheduler. 
#4481 (bug fix) Fixed ~~~~~ diff --git a/st2actions/st2actions/policies/concurrency.py b/st2actions/st2actions/policies/concurrency.py index 43dbf287b7..a55c5cd0be 100644 --- a/st2actions/st2actions/policies/concurrency.py +++ b/st2actions/st2actions/policies/concurrency.py @@ -20,7 +20,6 @@ from st2common.persistence import action as action_access from st2common.policies.concurrency import BaseConcurrencyApplicator from st2common.services import action as action_service -from st2common.services import coordination __all__ = [ @@ -89,47 +88,6 @@ def apply_before(self, target): '"%s" cannot be applied. %s', self._policy_ref, target) return target - # Warn users that the coordination service is not configured. - if not coordination.configured(): - LOG.warn('Coordination service is not configured. Policy enforcement is best effort.') - - # Acquire a distributed lock before querying the database to make sure that only one - # scheduler is scheduling execution for this action. Even if the coordination service - # is not configured, the fake driver using zake or the file driver can still acquire - # a lock for the local process or server respectively. - lock_uid = self._get_lock_uid(target) - LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid) - with self.coordinator.get_lock(lock_uid): - target = self._apply_before(target) - - return target - - def _apply_after(self, target): - # Schedule the oldest delayed executions. 
- requests = action_access.LiveAction.query( - action=target.action, - status=action_constants.LIVEACTION_STATUS_DELAYED, - order_by=['start_timestamp'], - limit=1 - ) - - if requests: - action_service.update_status( - requests[0], - action_constants.LIVEACTION_STATUS_REQUESTED, - publish=True - ) - - def apply_after(self, target): - target = super(ConcurrencyApplicator, self).apply_after(target=target) - - # Acquire a distributed lock before querying the database to make sure that only one - # scheduler is scheduling execution for this action. Even if the coordination service - # is not configured, the fake driver using zake or the file driver can still acquire - # a lock for the local process or server respectively. - lock_uid = self._get_lock_uid(target) - LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid) - with self.coordinator.get_lock(lock_uid): - self._apply_after(target) + target = self._apply_before(target) return target diff --git a/st2actions/st2actions/policies/concurrency_by_attr.py b/st2actions/st2actions/policies/concurrency_by_attr.py index f9d4061147..b4cb160333 100644 --- a/st2actions/st2actions/policies/concurrency_by_attr.py +++ b/st2actions/st2actions/policies/concurrency_by_attr.py @@ -115,47 +115,6 @@ def apply_before(self, target): if not coordination.configured(): LOG.warn('Coordination service is not configured. Policy enforcement is best effort.') - # Acquire a distributed lock before querying the database to make sure that only one - # scheduler is scheduling execution for this action. Even if the coordination service - # is not configured, the fake driver using zake or the file driver can still acquire - # a lock for the local process or server respectively. 
- lock_uid = self._get_lock_uid(target) - LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid) - with self.coordinator.get_lock(lock_uid): - target = self._apply_before(target) - - return target - - def _apply_after(self, target): - # Schedule the oldest delayed executions. - filters = self._get_filters(target) - filters['status'] = action_constants.LIVEACTION_STATUS_DELAYED - - requests = action_access.LiveAction.query( - order_by=['start_timestamp'], - limit=1, - **filters - ) - - if requests: - action_service.update_status( - requests[0], - action_constants.LIVEACTION_STATUS_REQUESTED, - publish=True - ) - - def apply_after(self, target): - # Warn users that the coordination service is not configured. - if not coordination.configured(): - LOG.warn('Coordination service is not configured. Policy enforcement is best effort.') - - # Acquire a distributed lock before querying the database to make sure that only one - # scheduler is scheduling execution for this action. Even if the coordination service - # is not configured, the fake driver using zake or the file driver can still acquire - # a lock for the local process or server respectively. 
- lock_uid = self._get_lock_uid(target) - LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid) - with self.coordinator.get_lock(lock_uid): - self._apply_after(target) + target = self._apply_before(target) return target diff --git a/st2actions/st2actions/scheduler/handler.py b/st2actions/st2actions/scheduler/handler.py index fcd50a9292..e93adf6ec9 100644 --- a/st2actions/st2actions/scheduler/handler.py +++ b/st2actions/st2actions/scheduler/handler.py @@ -21,9 +21,11 @@ from st2common import log as logging from st2common.util import date from st2common.constants import action as action_constants +from st2common.constants import policy as policy_constants from st2common.exceptions.db import StackStormDBObjectNotFoundError from st2common.models.db.liveaction import LiveActionDB from st2common.services import action as action_service +from st2common.services import coordination as coordination_service from st2common.services import policies as policy_service from st2common.persistence.liveaction import LiveAction from st2common.persistence.execution_queue import ActionExecutionSchedulingQueue @@ -57,6 +59,7 @@ def __init__(self): self.message_type = LiveActionDB self._shutdown = False self._pool = eventlet.GreenPool(size=cfg.CONF.scheduler.pool_size) + self._coordinator = coordination_service.get_coordinator() def run(self): LOG.debug('Entering scheduler loop') @@ -161,16 +164,41 @@ def _handle_execution(self, execution_queue_item_db): ActionExecutionSchedulingQueue.delete(execution_queue_item_db) raise - liveaction_db = self._apply_pre_run(liveaction_db, execution_queue_item_db) + # Identify if the action has policies that require locking. + action_has_policies_require_lock = policy_service.has_policies( + liveaction_db, + policy_types=policy_constants.POLICY_TYPES_REQUIRING_LOCK + ) - if not liveaction_db: - return + # Acquire a distributed lock if the referenced action has specific policies attached. 
+ if action_has_policies_require_lock: + # Warn users that the coordination service is not configured. + if not coordination_service.configured(): + LOG.warn( + 'Coordination backend is not configured. ' + 'Policy enforcement is best effort.' + ) - if self._is_execution_queue_item_runnable(liveaction_db, execution_queue_item_db): - self._update_to_scheduled(liveaction_db, execution_queue_item_db) + # Acquire a distributed lock before querying the database to make sure that only one + # scheduler is scheduling execution for this action. Even if the coordination service + # is not configured, the fake driver using zake or the file driver can still acquire + # a lock for the local process or server respectively. + lock_uid = liveaction_db.action + LOG.debug('%s is attempting to acquire lock "%s".', self.__class__.__name__, lock_uid) + lock = self._coordinator.get_lock(lock_uid) - @staticmethod - def _apply_pre_run(liveaction_db, execution_queue_item_db): + try: + if lock.acquire(blocking=False): + self._regulate_and_schedule(liveaction_db, execution_queue_item_db) + else: + self._delay(liveaction_db, execution_queue_item_db) + finally: + lock.release() + else: + # Otherwise if there is no policy, then schedule away. + self._schedule(liveaction_db, execution_queue_item_db) + + def _regulate_and_schedule(self, liveaction_db, execution_queue_item_db): # Apply policies defined for the action. 
liveaction_db = policy_service.apply_pre_run_policies(liveaction_db) @@ -190,10 +218,13 @@ def _apply_pre_run(liveaction_db, execution_queue_item_db): liveaction_db = action_service.update_status( liveaction_db, action_constants.LIVEACTION_STATUS_DELAYED, publish=False ) + + execution_queue_item_db.handling = False execution_queue_item_db.scheduled_start_timestamp = date.append_milliseconds_to_time( date.get_datetime_utc_now(), POLICY_DELAYED_EXECUTION_RESCHEDULE_TIME_MS ) + try: ActionExecutionSchedulingQueue.add_or_update(execution_queue_item_db, publish=False) except db_exc.StackStormDBObjectWriteConflictError: @@ -202,16 +233,40 @@ def _apply_pre_run(liveaction_db, execution_queue_item_db): execution_queue_item_db.id ) - return None + return if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES or liveaction_db.status in action_constants.LIVEACTION_CANCEL_STATES): ActionExecutionSchedulingQueue.delete(execution_queue_item_db) - return None + return + + self._schedule(liveaction_db, execution_queue_item_db) - return liveaction_db + def _delay(self, liveaction_db, execution_queue_item_db): + liveaction_db = action_service.update_status( + liveaction_db, action_constants.LIVEACTION_STATUS_DELAYED, publish=False + ) + + execution_queue_item_db.scheduled_start_timestamp = date.append_milliseconds_to_time( + date.get_datetime_utc_now(), + POLICY_DELAYED_EXECUTION_RESCHEDULE_TIME_MS + ) - def _is_execution_queue_item_runnable(self, liveaction_db, execution_queue_item_db): + try: + execution_queue_item_db.handling = False + ActionExecutionSchedulingQueue.add_or_update(execution_queue_item_db, publish=False) + except db_exc.StackStormDBObjectWriteConflictError: + LOG.warning( + 'Execution queue item update conflict during scheduling: %s', + execution_queue_item_db.id + ) + + def _schedule(self, liveaction_db, execution_queue_item_db): + if self._is_execution_queue_item_runnable(liveaction_db, execution_queue_item_db): + 
self._update_to_scheduled(liveaction_db, execution_queue_item_db) + + @staticmethod + def _is_execution_queue_item_runnable(liveaction_db, execution_queue_item_db): """ Return True if a particular execution request is runnable. @@ -228,13 +283,14 @@ def _is_execution_queue_item_runnable(self, liveaction_db, execution_queue_item_ return True LOG.info( - '%s is ignoring %s (id=%s) with "%s" status after policies are applied.', - self.__class__.__name__, + 'Scheduler is ignoring %s (id=%s) with "%s" status after policies are applied.', type(execution_queue_item_db), execution_queue_item_db.id, liveaction_db.status ) + ActionExecutionSchedulingQueue.delete(execution_queue_item_db) + return False @staticmethod diff --git a/st2actions/tests/unit/policies/test_concurrency.py b/st2actions/tests/unit/policies/test_concurrency.py index d8285ba69b..301fae2b7d 100644 --- a/st2actions/tests/unit/policies/test_concurrency.py +++ b/st2actions/tests/unit/policies/test_concurrency.py @@ -149,8 +149,9 @@ def test_over_threshold_delay_executions(self): # Execution is expected to be delayed since concurrency threshold is reached. liveaction = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo-last'}) liveaction, _ = action_service.request(liveaction) - expected_num_exec += 1 # This request is expected to be executed. + expected_num_pubs += 1 # Tally requested state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() @@ -158,6 +159,11 @@ def test_over_threshold_delay_executions(self): # Since states are being processed async, wait for the liveaction to go into delayed state. liveaction = self._wait_on_status(liveaction, action_constants.LIVEACTION_STATUS_DELAYED) + expected_num_exec += 0 # This request will not be scheduled for execution. + expected_num_pubs += 0 # The delayed status change should not be published. 
+ self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) + self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) + # Mark one of the scheduled/running execution as completed. action_service.update_status( scheduled[0], @@ -165,14 +171,16 @@ def test_over_threshold_delay_executions(self): publish=True ) - expected_num_pubs += 1 # Tally requested state. - - # Once capacity freed up, the delayed execution is published as requested again. - expected_num_pubs += 3 # Tally requested, scheduled, and running state. + expected_num_pubs += 1 # Tally succeeded state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() + # Once capacity freed up, the delayed execution is published as scheduled. + expected_num_exec += 1 # This request is expected to be executed. + expected_num_pubs += 2 # Tally scheduled and running state. + # Since states are being processed async, wait for the liveaction to be scheduled. liveaction = self._wait_on_statuses(liveaction, SCHEDULED_STATES) self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) @@ -212,8 +220,9 @@ def test_over_threshold_cancel_executions(self): # Execution is expected to be canceled since concurrency threshold is reached. liveaction = LiveActionDB(action='wolfpack.action-2', parameters={'actionstr': 'foo'}) liveaction, _ = action_service.request(liveaction) - expected_num_exec += 0 # This request will not be scheduled for execution. + expected_num_pubs += 1 # Tally requested state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. 
self._process_scheduling_queue() @@ -222,6 +231,9 @@ def test_over_threshold_cancel_executions(self): calls = [call(liveaction, action_constants.LIVEACTION_STATUS_CANCELING)] LiveActionPublisher.publish_state.assert_has_calls(calls) expected_num_pubs += 2 # Tally canceling and canceled state changes. + expected_num_exec += 0 # This request will not be scheduled for execution. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) + self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) # Assert the action is canceled. liveaction = LiveAction.get_by_id(str(liveaction.id)) @@ -262,8 +274,9 @@ def test_on_cancellation(self): # Execution is expected to be delayed since concurrency threshold is reached. liveaction = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo'}) liveaction, _ = action_service.request(liveaction) - expected_num_exec += 1 # This request will be scheduled for execution. + expected_num_pubs += 1 # Tally requested state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() @@ -271,16 +284,23 @@ def test_on_cancellation(self): # Since states are being processed async, wait for the liveaction to go into delayed state. liveaction = self._wait_on_status(liveaction, action_constants.LIVEACTION_STATUS_DELAYED) + expected_num_exec += 0 # This request will not be scheduled for execution. + expected_num_pubs += 0 # The delayed status change should not be published. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) + self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) + # Cancel execution. action_service.request_cancellation(scheduled[0], 'stanley') expected_num_pubs += 2 # Tally the canceling and canceled states. - - # Once capacity freed up, the delayed execution is published as requested again. 
- expected_num_pubs += 3 # Tally requested, scheduled, and running state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() + # Once capacity freed up, the delayed execution is published as requested again. + expected_num_exec += 1 # This request is expected to be executed. + expected_num_pubs += 2 # Tally scheduled and running state. + # Execution is expected to be rescheduled. liveaction = LiveAction.get_by_id(str(liveaction.id)) self.assertIn(liveaction.status, SCHEDULED_STATES) diff --git a/st2actions/tests/unit/policies/test_concurrency_by_attr.py b/st2actions/tests/unit/policies/test_concurrency_by_attr.py index b38f4e7412..0056b33a4f 100644 --- a/st2actions/tests/unit/policies/test_concurrency_by_attr.py +++ b/st2actions/tests/unit/policies/test_concurrency_by_attr.py @@ -147,7 +147,9 @@ def test_over_threshold_delay_executions(self): # Execution is expected to be delayed since concurrency threshold is reached. liveaction = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo'}) liveaction, _ = action_service.request(liveaction) + expected_num_pubs += 1 # Tally requested state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() @@ -156,9 +158,8 @@ def test_over_threshold_delay_executions(self): # liveaction to go into delayed state. liveaction = self._wait_on_status(liveaction, action_constants.LIVEACTION_STATUS_DELAYED) - # Assert the action is delayed. - delayed = liveaction - self.assertEqual(delayed.status, action_constants.LIVEACTION_STATUS_DELAYED) + expected_num_exec += 0 # This request will not be scheduled for execution. + expected_num_pubs += 0 # The delayed status change should not be published. 
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) @@ -166,8 +167,6 @@ def test_over_threshold_delay_executions(self): # The execution with actionstr "fu" is over the threshold but actionstr "bar" is not. liveaction = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'bar'}) liveaction, _ = action_service.request(liveaction) - expected_num_exec += 1 # This request is expected to be executed. - expected_num_pubs += 3 # Tally requested, scheduled, and running states. # Run the scheduler to schedule action executions. self._process_scheduling_queue() @@ -175,6 +174,8 @@ def test_over_threshold_delay_executions(self): # Since states are being processed asynchronously, wait for the # liveaction to go into scheduled state. liveaction = self._wait_on_statuses(liveaction, SCHEDULED_STATES) + expected_num_exec += 1 # This request is expected to be executed. + expected_num_pubs += 3 # Tally requested, scheduled, and running state. self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) @@ -186,17 +187,15 @@ def test_over_threshold_delay_executions(self): ) expected_num_pubs += 1 # Tally succeeded state. - - # Once capacity freed up, the delayed execution is published as requested again. - expected_num_exec += 1 # The delayed request is expected to be executed. - expected_num_pubs += 3 # Tally requested, scheduled, and running state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() - # Since states are being processed asynchronously, wait for the - # liveaction to go into scheduled state. + # Once capacity freed up, the delayed execution is published as requested again. 
liveaction = self._wait_on_statuses(liveaction, SCHEDULED_STATES) + expected_num_exec += 1 # The delayed request is expected to be executed. + expected_num_pubs += 2 # Tally scheduled and running state. self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) @@ -235,8 +234,9 @@ def test_over_threshold_cancel_executions(self): # Execution is expected to be delayed since concurrency threshold is reached. liveaction = LiveActionDB(action='wolfpack.action-2', parameters={'actionstr': 'foo'}) liveaction, _ = action_service.request(liveaction) - expected_num_exec += 0 # This request will not be scheduled for execution. + expected_num_pubs += 1 # Tally requested state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() @@ -245,12 +245,13 @@ def test_over_threshold_cancel_executions(self): calls = [call(liveaction, action_constants.LIVEACTION_STATUS_CANCELING)] LiveActionPublisher.publish_state.assert_has_calls(calls) expected_num_pubs += 2 # Tally canceling and canceled state changes. + expected_num_exec += 0 # This request will not be scheduled for execution. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) + self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) # Assert the action is canceled. canceled = LiveAction.get_by_id(str(liveaction.id)) self.assertEqual(canceled.status, action_constants.LIVEACTION_STATUS_CANCELED) - self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) - self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) @mock.patch.object( runner.MockActionRunner, 'run', @@ -285,7 +286,9 @@ def test_on_cancellation(self): # Execution is expected to be delayed since concurrency threshold is reached. 
liveaction = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'foo'}) liveaction, _ = action_service.request(liveaction) + expected_num_pubs += 1 # Tally requested state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() @@ -293,10 +296,10 @@ def test_on_cancellation(self): # Since states are being processed asynchronously, wait for the # liveaction to go into delayed state. liveaction = self._wait_on_status(liveaction, action_constants.LIVEACTION_STATUS_DELAYED) - - # Assert the action is delayed. delayed = liveaction - self.assertEqual(delayed.status, action_constants.LIVEACTION_STATUS_DELAYED) + + expected_num_exec += 0 # This request will not be scheduled for execution. + expected_num_pubs += 0 # The delayed status change should not be published. self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) @@ -304,8 +307,6 @@ def test_on_cancellation(self): # The execution with actionstr "fu" is over the threshold but actionstr "bar" is not. liveaction = LiveActionDB(action='wolfpack.action-1', parameters={'actionstr': 'bar'}) liveaction, _ = action_service.request(liveaction) - expected_num_exec += 1 # This request is expected to be executed. - expected_num_pubs += 3 # Tally requested, scheduled, and running states. # Run the scheduler to schedule action executions. self._process_scheduling_queue() @@ -313,23 +314,26 @@ def test_on_cancellation(self): # Since states are being processed asynchronously, wait for the # liveaction to go into scheduled state. liveaction = self._wait_on_statuses(liveaction, SCHEDULED_STATES) + expected_num_exec += 1 # This request is expected to be executed. + expected_num_pubs += 3 # Tally requested, scheduled, and running states. 
self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) # Cancel execution. action_service.request_cancellation(scheduled[0], 'stanley') expected_num_pubs += 2 # Tally the canceling and canceled states. - - # Once capacity freed up, the delayed execution is published as requested again. - expected_num_exec += 1 # The delayed request is expected to be executed. - expected_num_pubs += 3 # Tally requested, scheduled, and running state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) # Run the scheduler to schedule action executions. self._process_scheduling_queue() + # Once capacity freed up, the delayed execution is published as requested again. + expected_num_exec += 1 # The delayed request is expected to be executed. + expected_num_pubs += 2 # Tally scheduled and running state. + self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) + self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) + # Since states are being processed asynchronously, wait for the # liveaction to go into scheduled state. liveaction = LiveAction.get_by_id(str(delayed.id)) liveaction = self._wait_on_statuses(liveaction, SCHEDULED_STATES) - self.assertEqual(expected_num_pubs, LiveActionPublisher.publish_state.call_count) - self.assertEqual(expected_num_exec, runner.MockActionRunner.run.call_count) diff --git a/st2common/st2common/constants/policy.py b/st2common/st2common/constants/policy.py new file mode 100644 index 0000000000..b1303ac2af --- /dev/null +++ b/st2common/st2common/constants/policy.py @@ -0,0 +1,25 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = [ + 'POLICY_TYPES_REQUIRING_LOCK' +] + +# Concurrency policies require scheduler to acquire a distributed lock to prevent race +# in scheduling when there are multiple scheduler instances. +POLICY_TYPES_REQUIRING_LOCK = [ + 'action.concurrency', + 'action.concurrency.attr' +] diff --git a/st2common/st2common/policies/base.py b/st2common/st2common/policies/base.py index 092d847b31..5528397fbe 100644 --- a/st2common/st2common/policies/base.py +++ b/st2common/st2common/policies/base.py @@ -21,7 +21,6 @@ from st2common import log as logging from st2common.persistence import policy as policy_access -from st2common.services import coordination LOG = logging.getLogger(__name__) @@ -48,10 +47,6 @@ def apply_before(self, target): :rtype: ``object`` """ - # Warn users that the coordination service is not configured - if not coordination.configured(): - LOG.warn('Coordination service is not configured. Policy enforcement is best effort.') - return target def apply_after(self, target): @@ -63,10 +58,6 @@ def apply_after(self, target): :rtype: ``object`` """ - # Warn users that the coordination service is not configured - if not coordination.configured(): - LOG.warn('Coordination service is not configured. 
Policy enforcement is best effort.') - return target def _get_lock_name(self, values): diff --git a/st2common/st2common/services/policies.py b/st2common/st2common/services/policies.py index cd8ce74280..ccb3274339 100644 --- a/st2common/st2common/services/policies.py +++ b/st2common/st2common/services/policies.py @@ -24,6 +24,20 @@ LOG = logging.getLogger(__name__) +def has_policies(lv_ac_db, policy_types=None): + query_params = { + 'resource_ref': lv_ac_db.action, + 'enabled': True + } + + if policy_types: + query_params['policy_type__in'] = policy_types + + policy_dbs = pc_db_access.Policy.query(**query_params) + + return len(policy_dbs) > 0 + + def apply_pre_run_policies(lv_ac_db): LOG.debug('Applying pre-run policies for liveaction "%s".' % str(lv_ac_db.id)) diff --git a/st2common/tests/unit/services/test_policy.py b/st2common/tests/unit/services/test_policy.py new file mode 100644 index 0000000000..274a4ff6e1 --- /dev/null +++ b/st2common/tests/unit/services/test_policy.py @@ -0,0 +1,106 @@ +# Licensed to the StackStorm, Inc ('StackStorm') under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import absolute_import + +import st2tests.config as tests_config +tests_config.parse_args() + +import st2common + +from st2common.bootstrap import policiesregistrar as policies_registrar +from st2common.bootstrap import runnersregistrar as runners_registrar +from st2common.constants import action as action_constants +from st2common.constants import policy as policy_constants +from st2common.models.db import action as action_db_models +from st2common.services import action as action_service +from st2common.services import policies as policy_service + +import st2tests +from st2tests import fixturesloader as fixtures + + +PACK = 'generic' + +TEST_FIXTURES = { + 'actions': [ + 'action1.yaml', # wolfpack.action-1 + 'action2.yaml', # wolfpack.action-2 + 'local.yaml' # core.local + ], + 'policies': [ + 'policy_2.yaml', # mock policy on wolfpack.action-1 + 'policy_5.yaml' # concurrency policy on wolfpack.action-2 + ] +} + + +class PolicyServiceTestCase(st2tests.DbTestCase): + + @classmethod + def setUpClass(cls): + super(PolicyServiceTestCase, cls).setUpClass() + + # Register runners + runners_registrar.register_runners() + + # Register common policy types + policies_registrar.register_policy_types(st2common) + + loader = fixtures.FixturesLoader() + loader.save_fixtures_to_db(fixtures_pack=PACK, + fixtures_dict=TEST_FIXTURES) + + def setUp(self): + super(PolicyServiceTestCase, self).setUp() + + params = {'action': 'wolfpack.action-1', 'parameters': {'actionstr': 'foo-last'}} + self.lv_ac_db_1 = action_db_models.LiveActionDB(**params) + self.lv_ac_db_1, _ = action_service.request(self.lv_ac_db_1) + + params = {'action': 'wolfpack.action-2', 'parameters': {'actionstr': 'foo-last'}} + self.lv_ac_db_2 = action_db_models.LiveActionDB(**params) + self.lv_ac_db_2, _ = action_service.request(self.lv_ac_db_2) + + params = {'action': 'core.local', 'parameters': {'cmd': 'date'}} + self.lv_ac_db_3 = action_db_models.LiveActionDB(**params) + self.lv_ac_db_3, _ = 
action_service.request(self.lv_ac_db_3) + + def tearDown(self): + action_service.update_status(self.lv_ac_db_1, action_constants.LIVEACTION_STATUS_CANCELED) + action_service.update_status(self.lv_ac_db_2, action_constants.LIVEACTION_STATUS_CANCELED) + action_service.update_status(self.lv_ac_db_3, action_constants.LIVEACTION_STATUS_CANCELED) + + def test_action_has_policies(self): + self.assertTrue(policy_service.has_policies(self.lv_ac_db_1)) + + def test_action_does_not_have_policies(self): + self.assertFalse(policy_service.has_policies(self.lv_ac_db_3)) + + def test_action_has_specific_policies(self): + self.assertTrue( + policy_service.has_policies( + self.lv_ac_db_2, + policy_types=policy_constants.POLICY_TYPES_REQUIRING_LOCK + ) + ) + + def test_action_does_not_have_specific_policies(self): + self.assertFalse( + policy_service.has_policies( + self.lv_ac_db_1, + policy_types=policy_constants.POLICY_TYPES_REQUIRING_LOCK + ) + ) From 97fd7d0c3a8ded2d00360e19e96081f96d1a666c Mon Sep 17 00:00:00 2001 From: W Chan Date: Wed, 6 Feb 2019 19:35:01 +0000 Subject: [PATCH 2/4] Use count instead of len when querying if action has policies Use the count method instead of len so the querying is done server side at MongoDB. 
--- st2common/st2common/services/policies.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/st2common/st2common/services/policies.py b/st2common/st2common/services/policies.py index ccb3274339..94da12444e 100644 --- a/st2common/st2common/services/policies.py +++ b/st2common/st2common/services/policies.py @@ -35,7 +35,7 @@ def has_policies(lv_ac_db, policy_types=None): policy_dbs = pc_db_access.Policy.query(**query_params) - return len(policy_dbs) > 0 + return policy_dbs.count() > 0 def apply_pre_run_policies(lv_ac_db): From 9bc62947aa2e11345d845ff22eecefdf1b2cc385 Mon Sep 17 00:00:00 2001 From: W Chan Date: Wed, 6 Feb 2019 19:39:36 +0000 Subject: [PATCH 3/4] Update changelog entry to include information on fixing the scheduler bug Update the changelog entry to be more descriptive on the fixing of the scheduler race related bug. --- CHANGELOG.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 34f8afc26f..c75117f82e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -31,7 +31,9 @@ Changed level is set to ``DEBUG`` or ``system.debug`` config option is set to ``True``. Reported by Nick Maludy. (improvement) #4538 #4502 -* Moved the lock from concurrency policies into the scheduler. #4481 (bug fix) +* Moved the lock from concurrency policies into the scheduler to fix a race condition when there + are multiple scheduler instances scheduling execution for action with concurrency policies. + #4481 (bug fix) Fixed ~~~~~ From a4f8b443fd39adfd0b949b0f8a7ed828ac8fda71 Mon Sep 17 00:00:00 2001 From: W Chan Date: Wed, 6 Feb 2019 23:20:30 +0000 Subject: [PATCH 4/4] Remove commented out code from coordination service Clean up and remove commented out code from the coordination service. 
--- st2common/st2common/services/coordination.py | 43 -------------------- 1 file changed, 43 deletions(-) diff --git a/st2common/st2common/services/coordination.py b/st2common/st2common/services/coordination.py index 108dc62ca1..e7dcd41937 100644 --- a/st2common/st2common/services/coordination.py +++ b/st2common/st2common/services/coordination.py @@ -180,46 +180,3 @@ def get_coordinator(): COORDINATOR = coordinator_setup() return COORDINATOR - - -# class LockAcquireError(Exception): -# pass - - -# class lock(object): -# def __init__(self, name, timeout=5000): -# self._name = name -# self._lock = None -# self._timeout = timeout - -# def __call__(self, func): -# @wraps(func) -# def with_lock(*args, **kwds): -# with self: -# return func(*args, **kwds) -# return with_lock - -# def _setup(self): -# if COORDINATOR is None: -# get_coordinator() - -# if not self._lock: -# self._lock = COORDINATOR.get_lock(self._name) - -# if self._timeout <= 0: -# LOG.warning("Failed to secure lock for %s.", self._name) -# raise LockAcquireError("Could not acquire lock for %s" % self._name) - -# def __enter__(self): -# self._setup() - -# LOG.debug("Attempting to secure lock for: %s", self._name) -# if not self._lock.acquire(): -# LOG.info("Could not secure lock for %s. Retrying.", self._name) -# self._timeout -= 1 -# eventlet.sleep(.25) -# self.__enter__() - -# def __exit__(self, *_args, **_kwargs): -# LOG.debug("Releasing lock for: %s", self._name) -# self._lock.release()