Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ def __init__(self, **kwargs):
self.auth_timeout = kwargs.get("auth_timeout", 60) # type: int
self.encoding = kwargs.get("encoding", "UTF-8")
self.auto_reconnect = kwargs.get("auto_reconnect", True)
self.keep_alive = kwargs.get("keep_alive", 30)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume 30 aligns with cross-language consistency? (just double-checking)

Copy link
Contributor Author

@yunhaoling yunhaoling Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This 30 seconds default value is used in eventhub track1.

The working mechanism in uamqp is "every 30 seconds, call connection.do_work(), connection.do_work() would check whether it has passed 0.5*remote idle timeout (240s), if passed, send a empty frame out"

.Net is sending out empty frame like every ~ 50s seconds.
JS is sending out empty frame every 0.5 *remote idle timeout.

I think 30 seconds is a reasonable interval in our case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering how we tell the users to configure this value. 240s is the hard expiry time. So is 220s always better than 200s to keep the connection alive because it does the work with less traffic? If so, a bool value is better than a number.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YijunXieMS , right now we don't expose this parameter to users to keep consistent with other languages -- JS and .Net don't allow users to set/tweak this interval, it is turned on by default in their SDK.

If there're customer needs to configure the value/turn on the switch, this could be a post ga feature.

Copy link
Member

@KieranBrantnerMagee KieranBrantnerMagee Aug 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This echoes a question I posed for autolockrenew, and I don't recall if we settled on an answer: Should we disallow setting a keepalive > 240s? Or caveat emptor? I might classify it a "semantic error"; if someone wants to disable keepalive they should show intent and pass None. (Context: Have had at least one user who adjusted a value such as this and unintentionally ran into lock expiry as a result)

Should be precise that this is likely a consideration for whenever we would expose this setting and thus lock it for backcompat, but is something to be mentioned/kept in mind in case there were strong feelings. (I've added it to our discussion list)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as person discussion OOB:

The proposal of flag is preferred as it's simple, users don't need to care about the value.
But we will leave it untouched (turned on by default) until there're customer requests for turning it off.

Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def _create_handler(self, auth):
send_settle_mode=SenderSettleMode.Settled if self._mode == ReceiveSettleMode.ReceiveAndDelete else None,
timeout=self._max_wait_time * 1000 if self._max_wait_time else 0,
prefetch=self._prefetch,
keep_alive_interval=self._config.keep_alive,
shutdown_after_timeout=False
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def _create_handler(self, auth):
properties=self._properties,
error_policy=self._error_policy,
client_name=self._name,
keep_alive_interval=self._config.keep_alive,
encoding=self._config.encoding
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def _create_handler(self, auth):
send_settle_mode=SenderSettleMode.Settled if self._mode == ReceiveSettleMode.ReceiveAndDelete else None,
timeout=self._max_wait_time * 1000 if self._max_wait_time else 0,
prefetch=self._prefetch,
keep_alive_interval=self._config.keep_alive,
shutdown_after_timeout=False
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def _create_handler(self, auth):
properties=self._properties,
error_policy=self._error_policy,
client_name=self._name,
keep_alive_interval=self._config.keep_alive,
encoding=self._config.encoding
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,6 @@ def message_content():
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect


@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
Expand Down Expand Up @@ -1323,6 +1322,34 @@ async def test_async_queue_receiver_alive_after_timeout(self, servicebus_namespa
messages = await receiver.receive_messages()
assert not messages

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True, lock_duration='PT5M')
async def test_queue_receive_keep_conn_alive_async(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)

async with sender, receiver:
await sender.send_messages([Message("message1"), Message("message2")])

messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
receiver_handler = receiver._handler
assert len(messages) == 2
await asyncio.sleep(4 * 60 + 5) # 240s is the service defined connection idle timeout
await messages[0].renew_lock() # check mgmt link operation
await messages[0].complete()
await messages[1].complete() # check receiver link operation

await asyncio.sleep(60) # sleep another one minute to ensure we pass the lock_duration time
messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
assert len(messages) == 0 # make sure messages are removed from the queue
assert receiver_handler == receiver._handler # make sure no reconnection happened

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
Expand Down
6 changes: 4 additions & 2 deletions sdk/servicebus/azure-servicebus/tests/servicebus_preparer.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def __init__(self,
requires_duplicate_detection=False,
dead_lettering_on_message_expiration=False,
requires_session=False,
lock_duration='PT30S',
parameter_name=SERVICEBUS_QUEUE_PARAM,
resource_group_parameter_name=RESOURCE_GROUP_PARAM,
servicebus_namespace_parameter_name=SERVICEBUS_NAMESPACE_PARAM,
Expand All @@ -268,12 +269,13 @@ def __init__(self,
playback_fake_resource=playback_fake_resource,
client_kwargs=client_kwargs)
self.parameter_name = parameter_name
self.set_cache(use_cache, requires_duplicate_detection, dead_lettering_on_message_expiration, requires_session)
self.set_cache(use_cache, requires_duplicate_detection, dead_lettering_on_message_expiration, requires_session, lock_duration)

# Queue parameters
self.requires_duplicate_detection=requires_duplicate_detection
self.dead_lettering_on_message_expiration=dead_lettering_on_message_expiration
self.requires_session=requires_session
self.lock_duration=lock_duration
if random_name_enabled:
self.resource_moniker = self.name_prefix + "sbqueue"

Expand All @@ -287,7 +289,7 @@ def create_resource(self, name, **kwargs):
namespace.name,
name,
SBQueue(
lock_duration='PT30S',
lock_duration=self.lock_duration,
requires_duplicate_detection = self.requires_duplicate_detection,
dead_lettering_on_message_expiration = self.dead_lettering_on_message_expiration,
requires_session = self.requires_session)
Expand Down
27 changes: 26 additions & 1 deletion sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -1649,7 +1649,6 @@ def message_content():
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect


@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
Expand Down Expand Up @@ -1701,7 +1700,33 @@ def test_queue_receiver_alive_after_timeout(self, servicebus_namespace_connectio
messages = receiver.receive_messages()
assert not messages

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True, lock_duration='PT5M')
def test_queue_receive_keep_conn_alive(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)

with sender, receiver:
sender.send_messages([Message("message1"), Message("message2")])

messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5)
receiver_handler = receiver._handler
assert len(messages) == 2
time.sleep(4 * 60 + 5) # 240s is the service defined connection idle timeout
messages[0].renew_lock() # check mgmt link operation
messages[0].complete()
messages[1].complete() # check receiver link operation

time.sleep(60) # sleep another one minute to ensure we pass the lock_duration time
messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5)
assert len(messages) == 0 # make sure messages are removed from the queue
assert receiver_handler == receiver._handler # make sure no reconnection happened

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down