diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py index e85f8d4325c5..996768b4bcd6 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py @@ -23,3 +23,4 @@ def __init__(self, **kwargs): self.auth_timeout = kwargs.get("auth_timeout", 60) # type: int self.encoding = kwargs.get("encoding", "UTF-8") self.auto_reconnect = kwargs.get("auto_reconnect", True) + self.keep_alive = kwargs.get("keep_alive", 30) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index b5d8f3e2fa35..7a322e6b76a0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -188,6 +188,7 @@ def _create_handler(self, auth): send_settle_mode=SenderSettleMode.Settled if self._mode == ReceiveSettleMode.ReceiveAndDelete else None, timeout=self._max_wait_time * 1000 if self._max_wait_time else 0, prefetch=self._prefetch, + keep_alive_interval=self._config.keep_alive, shutdown_after_timeout=False ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 841b4eec7a4c..d7b199502c69 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -163,6 +163,7 @@ def _create_handler(self, auth): properties=self._properties, error_policy=self._error_policy, client_name=self._name, + keep_alive_interval=self._config.keep_alive, encoding=self._config.encoding ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index aa8c94b8e2f2..571bcdc1610b 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -188,6 +188,7 @@ def _create_handler(self, auth): send_settle_mode=SenderSettleMode.Settled if self._mode == ReceiveSettleMode.ReceiveAndDelete else None, timeout=self._max_wait_time * 1000 if self._max_wait_time else 0, prefetch=self._prefetch, + keep_alive_interval=self._config.keep_alive, shutdown_after_timeout=False ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index a5176d23d7f6..b82d8d754e51 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -109,6 +109,7 @@ def _create_handler(self, auth): properties=self._properties, error_policy=self._error_policy, client_name=self._name, + keep_alive_interval=self._config.keep_alive, encoding=self._config.encoding ) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 335b854d934a..785e12d60418 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1271,7 +1271,6 @@ def message_content(): # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration assert receive_counter < 10 # Dynamic link credit issuing come info effect - @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -1323,6 +1322,34 @@ async def test_async_queue_receiver_alive_after_timeout(self, servicebus_namespa messages = await receiver.receive_messages() assert not messages + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True, lock_duration='PT5M') + async def test_queue_receive_keep_conn_alive_async(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name) + + async with sender, receiver: + await sender.send_messages([Message("message1"), Message("message2")]) + + messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) + receiver_handler = receiver._handler + assert len(messages) == 2 + await asyncio.sleep(4 * 60 + 5) # 240s is the service defined connection idle timeout + await messages[0].renew_lock() # check mgmt link operation + await messages[0].complete() + await messages[1].complete() # check receiver link operation + + await asyncio.sleep(60) # sleep another one minute to ensure we pass the lock_duration time + messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) + assert len(messages) == 0 # make sure messages are removed from the queue + assert receiver_handler == receiver._handler # make sure no reconnection happened + @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') diff --git a/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py b/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py index 46a2da594677..14ae04a1b4c3 100644 --- a/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py +++ b/sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py @@ -255,6 +255,7 @@ def __init__(self, requires_duplicate_detection=False, dead_lettering_on_message_expiration=False, requires_session=False, + lock_duration='PT30S', parameter_name=SERVICEBUS_QUEUE_PARAM, resource_group_parameter_name=RESOURCE_GROUP_PARAM, servicebus_namespace_parameter_name=SERVICEBUS_NAMESPACE_PARAM, @@ -268,12 +269,13 @@ def __init__(self, playback_fake_resource=playback_fake_resource, client_kwargs=client_kwargs) self.parameter_name = parameter_name - self.set_cache(use_cache, requires_duplicate_detection, dead_lettering_on_message_expiration, requires_session) + self.set_cache(use_cache, requires_duplicate_detection, dead_lettering_on_message_expiration, requires_session, lock_duration) # Queue parameters self.requires_duplicate_detection=requires_duplicate_detection self.dead_lettering_on_message_expiration=dead_lettering_on_message_expiration self.requires_session=requires_session + self.lock_duration=lock_duration if random_name_enabled: self.resource_moniker = self.name_prefix + "sbqueue" @@ -287,7 +289,7 @@ def create_resource(self, name, **kwargs): namespace.name, name, SBQueue( - lock_duration='PT30S', + lock_duration=self.lock_duration, requires_duplicate_detection = self.requires_duplicate_detection, dead_lettering_on_message_expiration = self.dead_lettering_on_message_expiration, requires_session = self.requires_session) diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index bbe1eb04a6b1..5d1b27892554 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1649,7 +1649,6 @@ def message_content(): # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration assert receive_counter < 10 # Dynamic link credit issuing come info effect - @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -1701,7 +1700,33 @@ def test_queue_receiver_alive_after_timeout(self, servicebus_namespace_connectio messages = receiver.receive_messages() assert not messages + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True, lock_duration='PT5M') + def test_queue_receive_keep_conn_alive(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name) + + with sender, receiver: + sender.send_messages([Message("message1"), Message("message2")]) + + messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5) + receiver_handler = receiver._handler + assert len(messages) == 2 + time.sleep(4 * 60 + 5) # 240s is the service defined connection idle timeout + messages[0].renew_lock() # check mgmt link operation + messages[0].complete() + messages[1].complete() # check receiver link operation + time.sleep(60) # sleep another one minute to ensure we pass the lock_duration time + messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5) + assert len(messages) == 0 # make sure messages are removed from the queue + assert receiver_handler == receiver._handler # make sure no reconnection happened @pytest.mark.liveTest @pytest.mark.live_test_only