diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 9d576ac378df..ac3fa9d3a102 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -11,6 +11,7 @@ import uamqp # type: ignore from uamqp import errors, types, utils # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore +import uamqp from azure.eventhub import EventData, EventPosition from azure.eventhub.error import EventHubError, ConnectError, _error_handler @@ -116,7 +117,7 @@ async def __anext__(self): self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 if self._track_last_enqueued_event_properties: - self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access + self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access return event_data except Exception as exception: # pylint:disable=broad-except last_exception = await self._handle_exception(exception) @@ -149,6 +150,8 @@ def _create_handler(self): error_policy=self._retry_policy, keep_alive_interval=self._keep_alive, client_name=self._name, + receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete, + auto_complete=False, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access **desired_capabilities, # pylint:disable=protected-access @@ -176,12 +179,14 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): timeout=remaining_time_ms) for message in message_batch: event_data = EventData._from_message(message) # pylint:disable=protected-access - self._offset = EventPosition(event_data.offset) data_batch.append(event_data) event_data._trace_link_message() # pylint:disable=protected-access + if data_batch: + self._offset = EventPosition(data_batch[-1].offset) + if self._track_last_enqueued_event_properties and len(data_batch): - self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access + self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # pylint:disable=protected-access return data_batch diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 5ebb5a430b23..f29a4c35bbc2 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -71,19 +71,15 @@ def __init__(self, body=None): :type body: str, bytes or list """ - self._annotations = {} - self._delivery_annotations = {} - self._app_properties = {} - self._msg_properties = MessageProperties() - self._runtime_info = {} + self._last_enqueued_event_properties = {} if body and isinstance(body, list): - self.message = Message(body[0], properties=self._msg_properties) + self.message = Message(body[0]) for more in body[1:]: self.message._body.append(more) # pylint: disable=protected-access elif body is None: raise ValueError("EventData cannot be None.") else: - self.message = Message(body, properties=self._msg_properties) + self.message = Message(body) def __str__(self): dic = { @@ -108,13 +104,12 @@ def _set_partition_key(self, value): :param value: The partition key to set. :type value: str or bytes """ - annotations = dict(self._annotations) + annotations = dict(self.message.annotations) annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] = value header = MessageHeader() header.durable = True self.message.annotations = annotations self.message.header = header - self._annotations = annotations def _trace_message(self, parent_span=None): """Add tracing information to this message. @@ -145,25 +140,30 @@ def _trace_link_message(self, parent_span=None): if traceparent: current_span.link(traceparent) - @staticmethod - def _from_message(message): - event_data = EventData(body='') - event_data.message = message - event_data._msg_properties = message.properties - event_data._annotations = message.annotations - event_data._app_properties = message.application_properties - event_data._delivery_annotations = message.delivery_annotations - if event_data._delivery_annotations: - event_data._runtime_info = { + def _get_last_enqueued_event_properties(self): + if self._last_enqueued_event_properties: + return self._last_enqueued_event_properties + + if self.message.delivery_annotations: + self._last_enqueued_event_properties = { "sequence_number": - event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None), + self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None), "offset": - event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None), + self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None), "enqueued_time": - event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None), + self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None), "retrieval_time": - event_data._delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None) + self.message.delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None) } + return self._last_enqueued_event_properties + + return None + + @classmethod + def _from_message(cls, message): + # pylint:disable=protected-access + event_data = cls(body='') + event_data.message = message return event_data @property @@ -173,7 +173,7 @@ def sequence_number(self): :rtype: int or long """ - return self._annotations.get(EventData.PROP_SEQ_NUMBER, None) + return self.message.annotations.get(EventData.PROP_SEQ_NUMBER, None) @property def offset(self): @@ -183,7 +183,7 @@ def offset(self): :rtype: str """ try: - return self._annotations[EventData.PROP_OFFSET].decode('UTF-8') + return self.message.annotations[EventData.PROP_OFFSET].decode('UTF-8') except (KeyError, AttributeError): return None @@ -194,7 +194,7 @@ def enqueued_time(self): :rtype: datetime.datetime """ - timestamp = self._annotations.get(EventData.PROP_TIMESTAMP, None) + timestamp = self.message.annotations.get(EventData.PROP_TIMESTAMP, None) if timestamp: return datetime.datetime.utcfromtimestamp(float(timestamp)/1000) return None @@ -207,9 +207,9 @@ def partition_key(self): :rtype: bytes """ try: - return self._annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] + return self.message.annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] except KeyError: - return self._annotations.get(EventData.PROP_PARTITION_KEY, None) + return self.message.annotations.get(EventData.PROP_PARTITION_KEY, None) @property def application_properties(self): @@ -218,7 +218,7 @@ def application_properties(self): :rtype: dict """ - return self._app_properties + return self.message.application_properties @application_properties.setter def application_properties(self, value): @@ -228,8 +228,7 @@ def application_properties(self, value): :param value: The application properties for the EventData. :type value: dict """ - self._app_properties = value - properties = None if value is None else dict(self._app_properties) + properties = None if value is None else dict(value) self.message.application_properties = properties @property @@ -239,7 +238,7 @@ def system_properties(self): :rtype: dict """ - return self._annotations + return self.message.annotations @property def body(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 3f4e11d982c9..196531511f0d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -12,6 +12,7 @@ import uamqp # type: ignore from uamqp import types, errors, utils # type: ignore from uamqp import ReceiveClient, Source # type: ignore +import uamqp from azure.eventhub.common import EventData, EventPosition from azure.eventhub.error import _error_handler @@ -113,7 +114,7 @@ def __next__(self): self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 if self._track_last_enqueued_event_properties: - self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access + self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access return event_data except Exception as exception: # pylint:disable=broad-except last_exception = self._handle_exception(exception) @@ -146,6 +147,8 @@ def _create_handler(self): error_policy=self._retry_policy, keep_alive_interval=self._keep_alive, client_name=self._name, + receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete, + auto_complete=False, properties=self._client._create_properties( # pylint:disable=protected-access self._client._config.user_agent), # pylint:disable=protected-access **desired_capabilities) # pylint:disable=protected-access @@ -171,12 +174,14 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): timeout=remaining_time_ms) for message in message_batch: event_data = EventData._from_message(message) # pylint:disable=protected-access - self._offset = EventPosition(event_data.offset) data_batch.append(event_data) event_data._trace_link_message() # pylint:disable=protected-access + if data_batch: + self._offset = EventPosition(data_batch[-1].offset) + if self._track_last_enqueued_event_properties and len(data_batch): - self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access + self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # pylint:disable=protected-access return data_batch diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py index 03987fbf3b77..440f0d76161e 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py @@ -156,6 +156,12 @@ async def test_receive_batch_async(connstr_senders): received = await receiver.receive(max_batch_size=5, timeout=5) assert len(received) == 5 + for event in received: + assert event.system_properties + assert event.sequence_number is not None + assert event.offset + assert event.enqueued_time + async def pump(receiver, sleep=None): messages = 0 diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_receive.py index eed03ae62bfe..6d8f04ee1086 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_receive.py @@ -214,6 +214,12 @@ def test_receive_batch(connstr_senders): received = receiver.receive(max_batch_size=5, timeout=5) assert len(received) == 5 + for event in received: + assert event.system_properties + assert event.sequence_number is not None + assert event.offset + assert event.enqueued_time + @pytest.mark.liveTest def test_receive_batch_with_app_prop_sync(connstr_senders):