Conversation
|
The function |
annatisch
left a comment
There was a problem hiding this comment.
Just one comment (x2) regarding the assignment statements :)
Otherwise looks good!
uamqp/client.py
Outdated
| """ | ||
| self._message_received_callback = on_message_received | ||
| self._received_messages = compat.queue.Queue() | ||
| self._received_messages = self._received_messages or compat.queue.Queue() |
There was a problem hiding this comment.
Is this assignment statement still necessary? self._received_messages should never be None now right?
There was a problem hiding this comment.
you're right! it's no longer needed.
Removed 😀
uamqp/async_ops/client_async.py
Outdated
| """ | ||
| self._message_received_callback = on_message_received | ||
| self._received_messages = queue.Queue() | ||
| self._received_messages = self._received_messages or compat.queue.Queue() |
There was a problem hiding this comment.
Is this assignment statement still necessary? self._received_messages should never be None now right?
|
|
||
| @pytest.mark.asyncio | ||
| async def test_event_hubs_shared_connection_async(live_eventhub_config): | ||
| pytest.skip("Unstable on OSX and Linux - need to fix") # TODO |
There was a problem hiding this comment.
Awesome that these are now resolved :)
Once this is merged I think we have an issue for these that we can now close?
There was a problem hiding this comment.
yup, we have #83 tracking the issue.
also I find the receive timeout = 1s (too short) in these two tests may also be the reason for the unstability.
I extend the timeout to be 3 seconds and now the tests are more stable(all green!!) now.
There was a problem hiding this comment.
Hello Anna,
One thing I noticed when receiving messages using the receive_message_batch function, is that if the operation is timeout, we would set self._shutdown = True:
azure-uamqp-python/uamqp/client.py
Line 947 in 7f667c1
But we don't close or reset the state of the receiver client.
The result is that I can still call the receive_message_batch but it would simply return nothing, as the do_work function would return immediately after checking that self._shutdown is True.
I think that behavior may seem to be wrong. Should we close the receiver here?
Is it by design or it's actually a bug.
* initial _received_messages object * Use flag to control the streaming behavior * Update comments and code * release_async isn't a sync function * Add missing module * Wait too short cause the receive client to close so that the test would fail * Remove unnecessary queue assignment * remove unused import module * Update timeout as the unit is milliseconds
* Share CBS Session (#88) * initial _received_messages object * Use flag to control the streaming behavior * Update comments and code * release_async isn't a sync function * Add missing module * Wait too short cause the receive client to close so that the test would fail * Remove unnecessary queue assignment * remove unused import module * Update timeout as the unit is milliseconds * Update for 1.2.3 (#91) * review update (#97)
* Runtime metric (#95) * Init commit for desired compatibility * runtime metric init commit * Small fix of runtime receiver metric * Fix pylint error * Update according to review * update link destroy * Remove offered capabilities for now * add sample/test code * Update test * Share CBS Session (#89) * Share CBS Session (#88) * initial _received_messages object * Use flag to control the streaming behavior * Update comments and code * release_async isn't a sync function * Add missing module * Wait too short cause the receive client to close so that the test would fail * Remove unnecessary queue assignment * remove unused import module * Update timeout as the unit is milliseconds * Update for 1.2.3 (#91) * review update (#97) * Service Bus message transfer fixes (#96) * Support message delivery tag * Added headers * Removed null init * Added memory cleanup * Fix build error * Moved delivery tag to message * Cython fixes * Binary type * Attempt to set message tag * Converted to AMQP_VALUE * Syntax fixes * Build error * Renamed value * Get tag type * Fixed name * Extract tag bytes * Some C cleanup * More logging * Updated test * pylint fix * More logging * More logging * More logging * Fixed print formatting * More logging * Syntax error * TLSIO logging * Log socket error * Added sleep * Fixed sleep * Reduced sleep * Another attempt * Ping CI * Attempt to move outgoing flow * Moved send flow frame * Removed debug logging * Update link status * Fix diff * pylint fixes * Py2.7 * Updated status description * Fixed executor * Some review feedback * Performance improvement (#98) * Remove deepcopy and increase buffer size * Move parse into cython * Small fix * lazy parse * small fix * Update name * Update message property * remove unused import * put deepcopy back as it influences the performance little * Add footer setter * Update setters of message * fix bug in batch message (#99) * Bug fix, properties of message can be None type (#100) * on_transfer_received should not fail due to lack of a delivery tag (#101) * on_transfer_received should not fail due to lack of a delivery tag * Test reversing if-statements * Try initialize value * Trying to make Windows happy * Fix proxy test (#104) * Update docs (#103) * Update docs * fix typo * More typo
Initialize the
_received_messageswhen in the__init__function of the AMQP client so that received messages won't be simply dropped during the authentication process of multiple clients.@annatisch
I'm wondering whether the
self._streaming_receiveshould be thread-safe or not.The situation where thread-unsafe may happen to the
_streaming_receivevariable is that both functionsreceive_message_batchandreceive_messagesare called simultaneously.But I don't think our SDK support such kind of operation, do I miss something?