From 602bcf4201e551761a77b2bc90b3f0ca3e8b87f4 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Thu, 6 Aug 2020 18:20:31 -0700 Subject: [PATCH 1/3] turn on keep_alive --- .../servicebus/_common/_configuration.py | 1 + .../azure/servicebus/_servicebus_receiver.py | 1 + .../azure/servicebus/_servicebus_sender.py | 1 + .../aio/_servicebus_receiver_async.py | 1 + .../aio/_servicebus_sender_async.py | 1 + .../tests/async_tests/test_queues_async.py | 28 +++++++++++++++++++ .../tests/servicebus_preparer.py | 4 ++- .../azure-servicebus/tests/test_queues.py | 28 +++++++++++++++++++ 8 files changed, 64 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py index e85f8d4325c5..996768b4bcd6 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py @@ -23,3 +23,4 @@ def __init__(self, **kwargs): self.auth_timeout = kwargs.get("auth_timeout", 60) # type: int self.encoding = kwargs.get("encoding", "UTF-8") self.auto_reconnect = kwargs.get("auto_reconnect", True) + self.keep_alive = kwargs.get("keep_alive", 30) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index b498eb4fbfcd..f738ce625c82 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -167,6 +167,7 @@ def _create_handler(self, auth): receive_settle_mode=self._mode.value, send_settle_mode=SenderSettleMode.Settled if self._mode == ReceiveSettleMode.ReceiveAndDelete else None, timeout=self._idle_timeout * 1000 if self._idle_timeout else 0, + keep_alive_interval=self._config.keep_alive, prefetch=self._prefetch ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 841b4eec7a4c..d7b199502c69 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -163,6 +163,7 @@ def _create_handler(self, auth): properties=self._properties, error_policy=self._error_policy, client_name=self._name, + keep_alive_interval=self._config.keep_alive, encoding=self._config.encoding ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 86498c2137d8..399cd8d6dfef 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -161,6 +161,7 @@ def _create_handler(self, auth): receive_settle_mode=self._mode.value, send_settle_mode=SenderSettleMode.Settled if self._mode == ReceiveSettleMode.ReceiveAndDelete else None, timeout=self._idle_timeout * 1000 if self._idle_timeout else 0, + keep_alive_interval=self._config.keep_alive, prefetch=self._prefetch ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index a5176d23d7f6..b82d8d754e51 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -109,6 +109,7 @@ def _create_handler(self, auth): properties=self._properties, error_policy=self._error_policy, client_name=self._name, + keep_alive_interval=self._config.keep_alive, encoding=self._config.encoding ) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index c3b609ad906a..d23e4dc29f40 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1270,3 +1270,31 @@ def message_content(): assert message_1st_received_cnt == 20 and message_2nd_received_cnt == 20 # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration assert receive_counter < 10 # Dynamic link credit issuing come info effect + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True, lock_duration='PT5M') + async def test_queue_receive_keep_conn_alive_async(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name) + + async with sender, receiver: + await sender.send_messages([Message("message1"), Message("message2")]) + + messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) + receiver_handler = receiver._handler + assert len(messages) == 2 + await asyncio.sleep(4 * 60 + 5) # 240s is the service defined connection idle timeout + await messages[0].renew_lock() # check mgmt link operation + await messages[0].complete() + await messages[1].complete() # check receiver link operation + + await asyncio.sleep(60) # sleep another one minute to ensure we pass the lock_duration time + messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) + assert len(messages) == 0 # make sure messages are removed from the queue + assert receiver_handler == receiver._handler # make sure no reconnection happened \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py b/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py index 46a2da594677..021cb36665b0 100644 --- a/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py +++ b/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py @@ -255,6 +255,7 @@ def __init__(self, requires_duplicate_detection=False, dead_lettering_on_message_expiration=False, requires_session=False, + lock_duration='PT30S', parameter_name=SERVICEBUS_QUEUE_PARAM, resource_group_parameter_name=RESOURCE_GROUP_PARAM, servicebus_namespace_parameter_name=SERVICEBUS_NAMESPACE_PARAM, @@ -274,6 +275,7 @@ def __init__(self, self.requires_duplicate_detection=requires_duplicate_detection self.dead_lettering_on_message_expiration=dead_lettering_on_message_expiration self.requires_session=requires_session + self.lock_duration=lock_duration if random_name_enabled: self.resource_moniker = self.name_prefix + "sbqueue" @@ -287,7 +289,7 @@ def create_resource(self, name, **kwargs): namespace.name, name, SBQueue( - lock_duration='PT30S', + lock_duration=self.lock_duration, requires_duplicate_detection = self.requires_duplicate_detection, dead_lettering_on_message_expiration = self.dead_lettering_on_message_expiration, requires_session = self.requires_session) diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index cf610889db57..6d030f4db64a 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1646,3 +1646,31 @@ def message_content(): assert message_1st_received_cnt == 20 and message_2nd_received_cnt == 20 # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration assert receive_counter < 10 # Dynamic link credit issuing come info effect + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True, lock_duration='PT5M') + def test_queue_receive_keep_conn_alive(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name) + + with sender, receiver: + sender.send_messages([Message("message1"), Message("message2")]) + + messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5) + receiver_handler = receiver._handler + assert len(messages) == 2 + time.sleep(4 * 60 + 5) # 240s is the service defined connection idle timeout + messages[0].renew_lock() # check mgmt link operation + messages[0].complete() + messages[1].complete() # check receiver link operation + + time.sleep(60) # sleep another one minute to ensure we pass the lock_duration time + messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5) + assert len(messages) == 0 # make sure messages are removed from the queue + assert receiver_handler == receiver._handler # make sure no reconnection happened From 8e2e7f228c13fac83538d69a0613b1849382c6ad Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Thu, 6 Aug 2020 18:24:20 -0700 Subject: [PATCH 2/3] update changelog --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 5dcd4cc459d2..f27f2a4eb7f4 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -12,6 +12,7 @@ * Added `on_lock_renew_failure` as a parameter to `AutoLockRenew.register`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion). * Added new supported value types int, float, datetime and timedelta for `CorrelationFilter.properties`. * Added new properties `parameters` and `requires_preprocessing` to `SqlRuleFilter` and `SqlRuleAction`. +* Added support for keeping connection alive. **Breaking Changes** From db0133e006dc8379bbf3930d85eeb9572b18b67c Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 11 Aug 2020 14:14:58 -0700 Subject: [PATCH 3/3] add lock_duration in set_cache --- sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py b/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py index 021cb36665b0..14ae04a1b4c3 100644 --- a/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py +++ b/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py @@ -269,7 +269,7 @@ def __init__(self, playback_fake_resource=playback_fake_resource, client_kwargs=client_kwargs) self.parameter_name = parameter_name - self.set_cache(use_cache, requires_duplicate_detection, dead_lettering_on_message_expiration, requires_session) + self.set_cache(use_cache, requires_duplicate_detection, dead_lettering_on_message_expiration, requires_session, lock_duration) # Queue parameters self.requires_duplicate_detection=requires_duplicate_detection