From 31a78cea049f316dcc57cb53db116196f25c8ae6 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Thu, 3 Sep 2020 19:38:53 -0700 Subject: [PATCH 1/3] Expose internal amqp message properties via AMQPMessage wrapper object on Message. Add test, changelog notes and docstring. (Note: Cannot rename old message as uamqp relies on the internal property name. Should likely be adapted.) --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 1 + .../azure/servicebus/_common/message.py | 79 +++++++++++++++++++ .../azure-servicebus/tests/test_queues.py | 39 +++++++++ 3 files changed, 119 insertions(+) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index f1b832deb546..b80ba58c5888 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -4,6 +4,7 @@ **New Features** * Messages can now be sent twice in succession. +* Internal AMQP message properties (header, footer, annotations, properties, etc) are now exposed via `Message.amqp_message` **Breaking Changes** diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 77871922401a..7d9e0e545a7c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -85,6 +85,8 @@ class Message(object): # pylint: disable=too-many-public-methods,too-many-insta :keyword str reply_to_session_id: The session identifier augmenting the `reply_to` address. :keyword str encoding: The encoding for string data. Default is UTF-8. + :ivar AMQPMessage amqp_message: Advanced use only. The internal AMQP message payload that is sent or received. + .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py @@ -105,6 +107,7 @@ def __init__(self, body, **kwargs): self._amqp_header = uamqp.message.MessageHeader() if 'message' in kwargs: + # Note: This cannot be renamed until UAMQP no longer relies on this specific name. self.message = kwargs['message'] self._amqp_properties = self.message.properties self._amqp_header = self.message.header @@ -123,6 +126,8 @@ def __init__(self, body, **kwargs): self.time_to_live = kwargs.pop("time_to_live", None) self.partition_key = kwargs.pop("partition_key", None) self.via_partition_key = kwargs.pop("via_partition_key", None) + # If message is the full message, amqp_message is the "public facing interface" for what we expose. + self.amqp_message = AMQPMessage(self.message) def __str__(self): return str(self.message) @@ -1069,3 +1074,77 @@ def renew_lock(self): expiry = self._receiver._renew_locks(token) # type: ignore self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) + + +class AMQPMessage(object): + """ + The internal AMQP message that this ServiceBusMessage represents. + + :param properties: Properties to add to the message. + :type properties: ~uamqp.message.MessageProperties + :param application_properties: Service specific application properties. + :type application_properties: dict + :param annotations: Service specific message annotations. Keys in the dictionary + must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`. + :type annotations: dict + :param delivery_annotations: Delivery-specific non-standard properties at the head of the message. + Delivery annotations convey information from the sending peer to the receiving peer. + Keys in the dictionary must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`. + :type annotations: dict + :param header: The message header. + :type header: ~uamqp.message.MessageHeader + :param header: The message footer. + :type header: dict + + """ + def __init__(self, message): + # type: (uamqp.Message) -> None + self._message = message + + @property + def properties(self): + return self._message.properties + + @properties.setter + def properties(self, value): + self._message.properties = value + + @property + def application_properties(self): + return self._message.application_properties + + @application_properties.setter + def application_properties(self, value): + self._message.application_properties = value + + @property + def annotations(self): + return self._message.annotations + + @annotations.setter + def annotations(self, value): + self._message.annotations = value + + @property + def delivery_annotations(self): + return self._message.delivery_annotations + + @delivery_annotations.setter + def delivery_annotations(self, value): + self._message.delivery_annotations = value + + @property + def header(self): + return self._message.header + + @header.setter + def header(self, value): + self._message.header = value + + @property + def footer(self): + return self._message.footer + + @footer.setter + def footer(self, value): + self._message.footer = value \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index c17e4e9b00a9..64412ddce7c7 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1857,3 +1857,42 @@ def test_queue_receiver_invalid_mode(self, servicebus_namespace_connection_strin max_wait_time="oij") as receiver: assert receiver + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest') + def test_message_inner_amqp_properties(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + + message = Message("body") + + with pytest.raises(TypeError): + message.amqp_message.properties = {"properties":1} + message.amqp_message.properties.subject = "subject" + + message.amqp_message.application_properties = {b"application_properties":1} + + message.amqp_message.annotations = {b"annotations":2} + message.amqp_message.delivery_annotations = {b"delivery_annotations":3} + + with pytest.raises(TypeError): + message.amqp_message.header = {"header":4} + message.amqp_message.header.priority = 5 + + message.amqp_message.footer = {b"footer":6} + + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + + with sb_client.get_queue_sender(servicebus_queue.name) as sender: + sender.send_messages(message) + with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: + message = receiver.receive_messages()[0] + assert message.amqp_message.properties.subject == b"subject" + assert message.amqp_message.application_properties[b"application_properties"] == 1 + assert message.amqp_message.annotations[b"annotations"] == 2 + # delivery_annotations and footer disabled pending uamqp bug https://github.com/Azure/azure-uamqp-python/issues/169 + #assert message.amqp_message.delivery_annotations[b"delivery_annotations"] == 3 + assert message.amqp_message.header.priority == 5 + #assert message.amqp_message.footer[b"footer"] == 6 \ No newline at end of file From 2775064c7b8ac93e9132f272de35e680df249c88 Mon Sep 17 00:00:00 2001 From: KieranBrantnerMagee Date: Thu, 3 Sep 2020 23:04:54 -0700 Subject: [PATCH 2/3] Update message.py Add missing final newline for pylint --- .../azure-servicebus/azure/servicebus/_common/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 7d9e0e545a7c..f24d036f2497 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -1147,4 +1147,4 @@ def footer(self): @footer.setter def footer(self, value): - self._message.footer = value \ No newline at end of file + self._message.footer = value From 34c236c2a07e863be44b2e6911d18ab793e89fcd Mon Sep 17 00:00:00 2001 From: KieranBrantnerMagee Date: Fri, 4 Sep 2020 15:56:46 -0700 Subject: [PATCH 3/3] Apply suggestions from code review Fix annotation and footer docstring type typos Co-authored-by: Adam Ling (MSFT) --- .../azure-servicebus/azure/servicebus/_common/message.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index f24d036f2497..050828f2bb06 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -1090,11 +1090,11 @@ class AMQPMessage(object): :param delivery_annotations: Delivery-specific non-standard properties at the head of the message. Delivery annotations convey information from the sending peer to the receiving peer. Keys in the dictionary must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`. - :type annotations: dict + :type delivery_annotations: dict :param header: The message header. :type header: ~uamqp.message.MessageHeader - :param header: The message footer. - :type header: dict + :param footer: The message footer. + :type footer: dict """ def __init__(self, message):