From 020e02e0021cba0331e572a61a3c8fc9fe26ac02 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 23 Aug 2021 10:40:45 +0530 Subject: [PATCH 1/7] Consumer shutdown. --- st2common/st2common/transport/consumers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/st2common/st2common/transport/consumers.py b/st2common/st2common/transport/consumers.py index dd2f47cb55..0bfe17b32f 100644 --- a/st2common/st2common/transport/consumers.py +++ b/st2common/st2common/transport/consumers.py @@ -156,7 +156,7 @@ def process(self, body, message): def shutdown(self): self._workflows_dispatcher.shutdown() self._actions_dispatcher.shutdown() - + self.should_stop = True class VariableMessageQueueConsumer(QueueConsumer): """ From 4c47d084f4f8d402a736a92a187a0724232a2a7d Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 23 Aug 2021 14:54:50 +0530 Subject: [PATCH 2/7] Black reformat --- st2common/st2common/transport/consumers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/st2common/st2common/transport/consumers.py b/st2common/st2common/transport/consumers.py index 0bfe17b32f..47752f035f 100644 --- a/st2common/st2common/transport/consumers.py +++ b/st2common/st2common/transport/consumers.py @@ -158,6 +158,7 @@ def shutdown(self): self._actions_dispatcher.shutdown() self.should_stop = True + class VariableMessageQueueConsumer(QueueConsumer): """ Used by ``VariableMessageHandler`` to processes multiple message types. From 62f85db75932a058814de85f99bc37ed835eb2fc Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 25 Aug 2021 09:23:21 +0530 Subject: [PATCH 3/7] Updated changelog --- CHANGELOG.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 877f8d40fa..c59a78eeb1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -33,6 +33,10 @@ Changed Contributed by @lukepatrick +* Actionrunner worker shutdown should stop Kombu consumer thread. #5338 + + Contributed by @khushboobhatia01 + 3.5.0 - June 23, 2021 --------------------- From fd75dfef2b54a1bf49e3de5e4bab0d7547ca17d8 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 25 Aug 2021 09:27:35 +0530 Subject: [PATCH 4/7] Updated changelog --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c59a78eeb1..f3f3d768e3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -35,7 +35,7 @@ Changed * Actionrunner worker shutdown should stop Kombu consumer thread. #5338 - Contributed by @khushboobhatia01 + Contributed by @khushboobhatia01 3.5.0 - June 23, 2021 --------------------- From 400f5c3bdc2aef90e957107fe1b1432e808301b4 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sun, 5 Sep 2021 16:28:24 +0530 Subject: [PATCH 5/7] Test case added --- .../tests/unit/test_action_runner_worker.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/st2actions/tests/unit/test_action_runner_worker.py b/st2actions/tests/unit/test_action_runner_worker.py index 1d0c7bbbd0..ff7a1c1630 100644 --- a/st2actions/tests/unit/test_action_runner_worker.py +++ b/st2actions/tests/unit/test_action_runner_worker.py @@ -14,10 +14,17 @@ # limitations under the License. from __future__ import absolute_import +import random +import eventlet + +from kombu import Exchange +from kombu import Queue from unittest2 import TestCase from mock import Mock from st2common.transport.consumers import ActionsQueueConsumer +from st2common.transport.publishers import PoolPublisher +from st2common.transport import utils as transport_utils from st2common.models.db.liveaction import LiveActionDB from st2tests import config as test_config @@ -28,6 +35,9 @@ class ActionsQueueConsumerTestCase(TestCase): + message_count = 0 + message_type = LiveActionDB + def test_process_right_dispatcher_is_used(self): handler = Mock() handler.message_type = LiveActionDB @@ -58,3 +68,38 @@ def test_process_right_dispatcher_is_used(self): self.assertEqual(consumer._workflows_dispatcher.dispatch.call_count, 1) self.assertEqual(consumer._actions_dispatcher.dispatch.call_count, 0) + + def test_stop_consumption_on_shutdown(self): + exchange = Exchange("st2.execution.test", type="topic") + queue_name = "test-" + str(random.randint(1, 10000)) + queue = Queue( + name=queue_name, exchange=exchange, routing_key="#", auto_delete=True + ) + publisher = PoolPublisher() + with transport_utils.get_connection() as connection: + connection.connect() + watcher = ActionsQueueConsumer( + connection=connection, queues=queue, handler=self + ) + watcher_thread = eventlet.greenthread.spawn(watcher.run) + + # Give it some time to start up since we are publishing on a new queue + eventlet.sleep(0.5) + body = LiveActionDB( + status="scheduled", action="core.local", action_is_workflow=False + ) + publisher.publish(payload=body, exchange=exchange) + eventlet.sleep(0.2) + self.assertEqual(self.message_count, 1) + body = LiveActionDB( + status="scheduled", action="core.local", action_is_workflow=True + ) + watcher.shutdown() + eventlet.sleep(1) + publisher.publish(payload=body, exchange=exchange) + # Second published message won't be consumed. + self.assertEqual(self.message_count, 1) + watcher_thread.kill() + + def process(self, liveaction): + self.message_count = self.message_count + 1 From c391bdd8764295a69e80c3e697bef5f5b6fbdd75 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 6 Sep 2021 09:55:32 +0530 Subject: [PATCH 6/7] Move test case to new file --- .../test_actions_queue_consumer.py | 69 +++++++++++++++++++ .../tests/unit/test_action_runner_worker.py | 44 ------------ 2 files changed, 69 insertions(+), 44 deletions(-) create mode 100644 st2actions/tests/integration/test_actions_queue_consumer.py diff --git a/st2actions/tests/integration/test_actions_queue_consumer.py b/st2actions/tests/integration/test_actions_queue_consumer.py new file mode 100644 index 0000000000..e4b57698de --- /dev/null +++ b/st2actions/tests/integration/test_actions_queue_consumer.py @@ -0,0 +1,69 @@ +# Copyright 2021 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import random +import eventlet + +from kombu import Exchange +from kombu import Queue +from unittest2 import TestCase + +from st2common.transport.consumers import ActionsQueueConsumer +from st2common.transport.publishers import PoolPublisher +from st2common.transport import utils as transport_utils +from st2common.models.db.liveaction import LiveActionDB + +__all__ = ["ActionsQueueConsumerTestCase"] + + +class ActionsQueueConsumerTestCase(TestCase): + message_count = 0 + message_type = LiveActionDB + + def test_stop_consumption_on_shutdown(self): + exchange = Exchange("st2.execution.test", type="topic") + queue_name = "test-" + str(random.randint(1, 10000)) + queue = Queue( + name=queue_name, exchange=exchange, routing_key="#", auto_delete=True + ) + publisher = PoolPublisher() + with transport_utils.get_connection() as connection: + connection.connect() + watcher = ActionsQueueConsumer( + connection=connection, queues=queue, handler=self + ) + watcher_thread = eventlet.greenthread.spawn(watcher.run) + + # Give it some time to start up since we are publishing on a new queue + eventlet.sleep(0.5) + body = LiveActionDB( + status="scheduled", action="core.local", action_is_workflow=False + ) + publisher.publish(payload=body, exchange=exchange) + eventlet.sleep(0.2) + self.assertEqual(self.message_count, 1) + body = LiveActionDB( + status="scheduled", action="core.local", action_is_workflow=True + ) + watcher.shutdown() + eventlet.sleep(1) + publisher.publish(payload=body, exchange=exchange) + # Second published message won't be consumed. + self.assertEqual(self.message_count, 1) + watcher_thread.kill() + + def process(self, liveaction): + self.message_count = self.message_count + 1 diff --git a/st2actions/tests/unit/test_action_runner_worker.py b/st2actions/tests/unit/test_action_runner_worker.py index ff7a1c1630..8cfe84a6cb 100644 --- a/st2actions/tests/unit/test_action_runner_worker.py +++ b/st2actions/tests/unit/test_action_runner_worker.py @@ -14,17 +14,11 @@ # limitations under the License. from __future__ import absolute_import -import random -import eventlet -from kombu import Exchange -from kombu import Queue from unittest2 import TestCase from mock import Mock from st2common.transport.consumers import ActionsQueueConsumer -from st2common.transport.publishers import PoolPublisher -from st2common.transport import utils as transport_utils from st2common.models.db.liveaction import LiveActionDB from st2tests import config as test_config @@ -35,9 +29,6 @@ class ActionsQueueConsumerTestCase(TestCase): - message_count = 0 - message_type = LiveActionDB - def test_process_right_dispatcher_is_used(self): handler = Mock() handler.message_type = LiveActionDB @@ -68,38 +59,3 @@ def test_process_right_dispatcher_is_used(self): self.assertEqual(consumer._workflows_dispatcher.dispatch.call_count, 1) self.assertEqual(consumer._actions_dispatcher.dispatch.call_count, 0) - - def test_stop_consumption_on_shutdown(self): - exchange = Exchange("st2.execution.test", type="topic") - queue_name = "test-" + str(random.randint(1, 10000)) - queue = Queue( - name=queue_name, exchange=exchange, routing_key="#", auto_delete=True - ) - publisher = PoolPublisher() - with transport_utils.get_connection() as connection: - connection.connect() - watcher = ActionsQueueConsumer( - connection=connection, queues=queue, handler=self - ) - watcher_thread = eventlet.greenthread.spawn(watcher.run) - - # Give it some time to start up since we are publishing on a new queue - eventlet.sleep(0.5) - body = LiveActionDB( - status="scheduled", action="core.local", action_is_workflow=False - ) - publisher.publish(payload=body, exchange=exchange) - eventlet.sleep(0.2) - self.assertEqual(self.message_count, 1) - body = LiveActionDB( - status="scheduled", action="core.local", action_is_workflow=True - ) - watcher.shutdown() - eventlet.sleep(1) - publisher.publish(payload=body, exchange=exchange) - # Second published message won't be consumed. - self.assertEqual(self.message_count, 1) - watcher_thread.kill() - - def process(self, liveaction): - self.message_count = self.message_count + 1 From abd3217ad24cf6850b8f93436547426ac0bebc8c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 6 Sep 2021 09:58:08 +0530 Subject: [PATCH 7/7] Black reformat --- st2actions/tests/unit/test_action_runner_worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/st2actions/tests/unit/test_action_runner_worker.py b/st2actions/tests/unit/test_action_runner_worker.py index 8cfe84a6cb..1d0c7bbbd0 100644 --- a/st2actions/tests/unit/test_action_runner_worker.py +++ b/st2actions/tests/unit/test_action_runner_worker.py @@ -14,7 +14,6 @@ # limitations under the License. from __future__ import absolute_import - from unittest2 import TestCase from mock import Mock