From 41cce7400bc29177780de2f58132136f1ac18d5b Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Sun, 29 Sep 2019 15:40:26 -0700 Subject: [PATCH 1/5] performance improvement --- .../azure/eventhub/aio/consumer_async.py | 3 +++ .../azure-eventhubs/azure/eventhub/common.py | 22 +++++++++---------- .../azure/eventhub/consumer.py | 3 +++ 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index efad6a3cb7db..7b9c7b3eeb58 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -10,6 +10,7 @@ from uamqp import errors, types # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore +import uamqp from azure.eventhub import EventData, EventPosition from azure.eventhub.error import EventHubError, ConnectError, _error_handler @@ -132,6 +133,8 @@ def _create_handler(self): error_policy=self._retry_policy, keep_alive_interval=self._keep_alive, client_name=self._name, + receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete, + auto_complete=False, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access loop=self._loop) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 5923d7f57972..e4d34ec6d3cd 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -118,9 +118,9 @@ def _set_partition_key(self, value): def _from_message(message): event_data = EventData(body='') event_data.message = message - event_data._msg_properties = message.properties # pylint:disable=protected-access - event_data._annotations = message.annotations # pylint:disable=protected-access - 
event_data._app_properties = message.application_properties # pylint:disable=protected-access + #event_data._msg_properties = message.properties # pylint:disable=protected-access + #event_data._annotations = message.annotations # pylint:disable=protected-access + #event_data._app_properties = message.application_properties # pylint:disable=protected-access return event_data @property @@ -130,7 +130,7 @@ def sequence_number(self): :rtype: int or long """ - return self._annotations.get(EventData.PROP_SEQ_NUMBER, None) + return self.message.annotations.get(EventData.PROP_SEQ_NUMBER, None) @property def offset(self): @@ -140,7 +140,7 @@ def offset(self): :rtype: str """ try: - return self._annotations[EventData.PROP_OFFSET].decode('UTF-8') + return self.message.annotations[EventData.PROP_OFFSET].decode('UTF-8') except (KeyError, AttributeError): return None @@ -151,7 +151,7 @@ def enqueued_time(self): :rtype: datetime.datetime """ - timestamp = self._annotations.get(EventData.PROP_TIMESTAMP, None) + timestamp = self.message.annotations.get(EventData.PROP_TIMESTAMP, None) if timestamp: return datetime.datetime.utcfromtimestamp(float(timestamp)/1000) return None @@ -164,7 +164,7 @@ def device_id(self): :rtype: bytes """ - return self._annotations.get(EventData.PROP_DEVICE_ID, None) + return self.message.annotations.get(EventData.PROP_DEVICE_ID, None) @property def partition_key(self): @@ -174,9 +174,9 @@ def partition_key(self): :rtype: bytes """ try: - return self._annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] + return self.message.annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] except KeyError: - return self._annotations.get(EventData.PROP_PARTITION_KEY, None) + return self.message.annotations.get(EventData.PROP_PARTITION_KEY, None) @property def application_properties(self): @@ -185,7 +185,7 @@ def application_properties(self): :rtype: dict """ - return self._app_properties + return self.message.application_properties @application_properties.setter def 
application_properties(self, value): @@ -206,7 +206,7 @@ def system_properties(self): :rtype: dict """ - return self._annotations + return self.message.annotations @property def body(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 604d9c7d7b82..a3f5c9f1a288 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -11,6 +11,7 @@ from uamqp import types, errors # type: ignore from uamqp import ReceiveClient, Source # type: ignore +import uamqp from azure.eventhub.common import EventData, EventPosition from azure.eventhub.error import _error_handler @@ -128,6 +129,8 @@ def _create_handler(self): error_policy=self._retry_policy, keep_alive_interval=self._keep_alive, client_name=self._name, + receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete, + auto_complete=False, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent)) # pylint:disable=protected-access self._messages_iter = None From 25fa5d7b341fa5cfca3b3d631fb393b758a7fc4e Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Sun, 29 Sep 2019 15:45:15 -0700 Subject: [PATCH 2/5] batch offset --- .../azure-eventhubs/azure/eventhub/aio/consumer_async.py | 3 ++- sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 7b9c7b3eeb58..052a8a05e433 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -180,8 +180,9 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): timeout=remaining_time_ms) for message in message_batch: event_data = 
EventData._from_message(message) # pylint:disable=protected-access - self._offset = EventPosition(event_data.offset) data_batch.append(event_data) + if data_batch: + self._offset = EventPosition(data_batch[-1].offset) return data_batch async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index a3f5c9f1a288..744456aa1e69 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -174,8 +174,9 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): timeout=remaining_time_ms) for message in message_batch: event_data = EventData._from_message(message) # pylint:disable=protected-access - self._offset = EventPosition(event_data.offset) data_batch.append(event_data) + if data_batch: + self._offset = EventPosition(data_batch[-1].offset) return data_batch def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): From 647b2561318ae127870b0ca503b09b5fbe09a7f9 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 1 Oct 2019 19:51:02 -0700 Subject: [PATCH 3/5] Review feedback --- .../azure/eventhub/aio/consumer_async.py | 4 ++-- .../azure-eventhubs/azure/eventhub/common.py | 23 +++++-------------- .../azure/eventhub/consumer.py | 4 ++-- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index ef17379a0d76..c523d021eb5d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -119,7 +119,7 @@ async def __anext__(self): self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 if self._track_last_enqueued_event_properties: - 
self._last_enqueued_event_properties = event_data._get_runtime_info() # pylint:disable=protected-access + self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access return event_data except Exception as exception: # pylint:disable=broad-except last_exception = await self._handle_exception(exception) @@ -209,7 +209,7 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): self._offset = EventPosition(data_batch[-1].offset) if self._track_last_enqueued_event_properties and len(data_batch): - self._last_enqueued_event_properties = data_batch[-1]._get_runtime_info() # pylint:disable=protected-access + self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # pylint:disable=protected-access return data_batch diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 66468c5d24b4..9f51b8a02a80 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -78,7 +78,7 @@ def __init__(self, body=None, to_device=None): self._delivery_annotations = {} self._app_properties = {} self._msg_properties = MessageProperties() - self._runtime_info = {} + self._last_enqueued_event_properties = {} self._need_further_parse = False if to_device: self._msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device) @@ -153,12 +153,12 @@ def _trace_link_message(self, parent_span=None): if traceparent: current_span.link(traceparent) - def _get_runtime_info(self): - if self._runtime_info: - return self._runtime_info + def _get_last_enqueued_event_properties(self): + if self._last_enqueued_event_properties: + return self._last_enqueued_event_properties if self.message.delivery_annotations: - self._runtime_info = { + self._last_enqueued_event_properties = { "sequence_number": 
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None), "offset": @@ -168,7 +168,7 @@ def _get_runtime_info(self): "retrieval_time": self.message.delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None) } - return self._runtime_info + return self._last_enqueued_event_properties return None @@ -185,17 +185,6 @@ def _parse_message_properties(self): self._annotations = self.message.annotations self._app_properties = self.message.application_properties self._delivery_annotations = self.message.delivery_annotations - if self._delivery_annotations: - self._runtime_info = { - "sequence_number": - self._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None), - "offset": - self._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None), - "enqueued_time": - self._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None), - "retrieval_time": - self._delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None) - } self._need_further_parse = False @property diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 0e942e309c83..185b353772b6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -115,7 +115,7 @@ def __next__(self): self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 if self._track_last_enqueued_event_properties: - self._last_enqueued_event_properties = event_data._get_runtime_info() # pylint:disable=protected-access + self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access return event_data except Exception as exception: # pylint:disable=broad-except last_exception = self._handle_exception(exception) @@ -203,7 +203,7 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): 
self._offset = EventPosition(data_batch[-1].offset) if self._track_last_enqueued_event_properties and len(data_batch): - self._last_enqueued_event_properties = data_batch[-1]._get_runtime_info() # pylint:disable=protected-access + self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # pylint:disable=protected-access return data_batch From 64c7152d9ce8066a3bba0a13532287882ab677e5 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Wed, 2 Oct 2019 15:04:01 -0700 Subject: [PATCH 4/5] Remove lazy parse in EventData --- .../azure-eventhubs/azure/eventhub/common.py | 76 ++++--------------- 1 file changed, 15 insertions(+), 61 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 9f51b8a02a80..f29a4c35bbc2 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -58,38 +58,28 @@ class EventData(object): PROP_PARTITION_KEY = b"x-opt-partition-key" PROP_PARTITION_KEY_AMQP_SYMBOL = types.AMQPSymbol(PROP_PARTITION_KEY) PROP_TIMESTAMP = b"x-opt-enqueued-time" - PROP_DEVICE_ID = b"iothub-connection-device-id" PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number" PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset" PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc" PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc" - def __init__(self, body=None, to_device=None): + def __init__(self, body=None): """ Initialize EventData. :param body: The data to send in a single message. :type body: str, bytes or list - :param to_device: An IoT device to route to. 
- :type to_device: str """ - self._annotations = {} - self._delivery_annotations = {} - self._app_properties = {} - self._msg_properties = MessageProperties() self._last_enqueued_event_properties = {} - self._need_further_parse = False - if to_device: - self._msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device) if body and isinstance(body, list): - self.message = Message(body[0], properties=self._msg_properties) + self.message = Message(body[0]) for more in body[1:]: self.message._body.append(more) # pylint: disable=protected-access elif body is None: raise ValueError("EventData cannot be None.") else: - self.message = Message(body, properties=self._msg_properties) + self.message = Message(body) def __str__(self): dic = { @@ -103,8 +93,6 @@ def __str__(self): dic['offset'] = str(self.offset) if self.enqueued_time: dic['enqueued_time'] = str(self.enqueued_time) - if self.device_id: - dic['device_id'] = str(self.device_id) if self.partition_key: dic['partition_key'] = str(self.partition_key) return str(dic) @@ -116,13 +104,12 @@ def _set_partition_key(self, value): :param value: The partition key to set. :type value: str or bytes """ - annotations = dict(self._annotations) + annotations = dict(self.message.annotations) annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] = value header = MessageHeader() header.durable = True self.message.annotations = annotations self.message.header = header - self._annotations = annotations def _trace_message(self, parent_span=None): """Add tracing information to this message. 
@@ -172,21 +159,13 @@ def _get_last_enqueued_event_properties(self): return None - @staticmethod - def _from_message(message): + @classmethod + def _from_message(cls, message): # pylint:disable=protected-access - event_data = EventData(body='') + event_data = cls(body='') event_data.message = message - event_data._need_further_parse = True return event_data - def _parse_message_properties(self): - self._msg_properties = self.message.properties - self._annotations = self.message.annotations - self._app_properties = self.message.application_properties - self._delivery_annotations = self.message.delivery_annotations - self._need_further_parse = False - @property def sequence_number(self): """ @@ -194,9 +173,7 @@ def sequence_number(self): :rtype: int or long """ - if self._need_further_parse: - self._parse_message_properties() - return self._annotations.get(EventData.PROP_SEQ_NUMBER, None) + return self.message.annotations.get(EventData.PROP_SEQ_NUMBER, None) @property def offset(self): @@ -205,10 +182,8 @@ def offset(self): :rtype: str """ - if self._need_further_parse: - self._parse_message_properties() try: - return self._annotations[EventData.PROP_OFFSET].decode('UTF-8') + return self.message.annotations[EventData.PROP_OFFSET].decode('UTF-8') except (KeyError, AttributeError): return None @@ -219,25 +194,11 @@ def enqueued_time(self): :rtype: datetime.datetime """ - if self._need_further_parse: - self._parse_message_properties() - timestamp = self._annotations.get(EventData.PROP_TIMESTAMP, None) + timestamp = self.message.annotations.get(EventData.PROP_TIMESTAMP, None) if timestamp: return datetime.datetime.utcfromtimestamp(float(timestamp)/1000) return None - @property - def device_id(self): - """ - The device ID of the event data object. This is only used for - IoT Hub implementations. 
- - :rtype: bytes - """ - if self._need_further_parse: - self._parse_message_properties() - return self._annotations.get(EventData.PROP_DEVICE_ID, None) - @property def partition_key(self): """ @@ -245,12 +206,10 @@ def partition_key(self): :rtype: bytes """ - if self._need_further_parse: - self._parse_message_properties() try: - return self._annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] + return self.message.annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] except KeyError: - return self._annotations.get(EventData.PROP_PARTITION_KEY, None) + return self.message.annotations.get(EventData.PROP_PARTITION_KEY, None) @property def application_properties(self): @@ -259,9 +218,7 @@ def application_properties(self): :rtype: dict """ - if self._need_further_parse: - self._parse_message_properties() - return self._app_properties + return self.message.application_properties @application_properties.setter def application_properties(self, value): @@ -271,8 +228,7 @@ def application_properties(self, value): :param value: The application properties for the EventData. 
:type value: dict """ - self._app_properties = value - properties = None if value is None else dict(self._app_properties) + properties = None if value is None else dict(value) self.message.application_properties = properties @property @@ -282,9 +238,7 @@ def system_properties(self): :rtype: dict """ - if self._need_further_parse: - self._parse_message_properties() - return self._annotations + return self.message.annotations @property def body(self): From 803bc1eead6e1ec4ec0d215d278660fc0b8a90b0 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Wed, 2 Oct 2019 15:22:02 -0700 Subject: [PATCH 5/5] Add annotation assertions --- .../azure-eventhubs/tests/asynctests/test_receive_async.py | 6 ++++++ sdk/eventhub/azure-eventhubs/tests/test_receive.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py index 03987fbf3b77..440f0d76161e 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py @@ -156,6 +156,12 @@ async def test_receive_batch_async(connstr_senders): received = await receiver.receive(max_batch_size=5, timeout=5) assert len(received) == 5 + for event in received: + assert event.system_properties + assert event.sequence_number is not None + assert event.offset + assert event.enqueued_time + async def pump(receiver, sleep=None): messages = 0 diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_receive.py index eed03ae62bfe..6d8f04ee1086 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_receive.py @@ -214,6 +214,12 @@ def test_receive_batch(connstr_senders): received = receiver.receive(max_batch_size=5, timeout=5) assert len(received) == 5 + for event in received: + assert event.system_properties + assert 
event.sequence_number is not None + assert event.offset + assert event.enqueued_time + @pytest.mark.liveTest def test_receive_batch_with_app_prop_sync(connstr_senders):