Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import uamqp # type: ignore
from uamqp import errors, types, utils # type: ignore
from uamqp import ReceiveClientAsync, Source # type: ignore
import uamqp

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
Expand Down Expand Up @@ -116,7 +117,7 @@ async def __anext__(self):
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
if self._track_last_enqueued_event_properties:
self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access
return event_data
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
Expand Down Expand Up @@ -149,6 +150,8 @@ def _create_handler(self):
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
auto_complete=False,
properties=self._client._create_properties( # pylint:disable=protected-access
self._client._config.user_agent), # pylint:disable=protected-access
**desired_capabilities, # pylint:disable=protected-access
Expand Down Expand Up @@ -176,12 +179,14 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

if data_batch:
self._offset = EventPosition(data_batch[-1].offset)

if self._track_last_enqueued_event_properties and len(data_batch):
self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # pylint:disable=protected-access

return data_batch

Expand Down
63 changes: 31 additions & 32 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,15 @@ def __init__(self, body=None):
:type body: str, bytes or list
"""

self._annotations = {}
self._delivery_annotations = {}
self._app_properties = {}
self._msg_properties = MessageProperties()
self._runtime_info = {}
self._last_enqueued_event_properties = {}
if body and isinstance(body, list):
self.message = Message(body[0], properties=self._msg_properties)
self.message = Message(body[0])
for more in body[1:]:
self.message._body.append(more) # pylint: disable=protected-access
elif body is None:
raise ValueError("EventData cannot be None.")
else:
self.message = Message(body, properties=self._msg_properties)
self.message = Message(body)

def __str__(self):
dic = {
Expand All @@ -108,13 +104,12 @@ def _set_partition_key(self, value):
:param value: The partition key to set.
:type value: str or bytes
"""
annotations = dict(self._annotations)
annotations = dict(self.message.annotations)
annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL] = value
header = MessageHeader()
header.durable = True
self.message.annotations = annotations
self.message.header = header
self._annotations = annotations

def _trace_message(self, parent_span=None):
"""Add tracing information to this message.
Expand Down Expand Up @@ -145,25 +140,30 @@ def _trace_link_message(self, parent_span=None):
if traceparent:
current_span.link(traceparent)

@staticmethod
def _from_message(message):
event_data = EventData(body='')
event_data.message = message
event_data._msg_properties = message.properties
event_data._annotations = message.annotations
event_data._app_properties = message.application_properties
event_data._delivery_annotations = message.delivery_annotations
if event_data._delivery_annotations:
event_data._runtime_info = {
def _get_last_enqueued_event_properties(self):
if self._last_enqueued_event_properties:
return self._last_enqueued_event_properties

if self.message.delivery_annotations:
self._last_enqueued_event_properties = {
"sequence_number":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_SEQUENCE_NUMBER, None),
"offset":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_OFFSET, None),
"enqueued_time":
event_data._delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
self.message.delivery_annotations.get(EventData.PROP_LAST_ENQUEUED_TIME_UTC, None),
"retrieval_time":
event_data._delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
self.message.delivery_annotations.get(EventData.PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC, None)
}
return self._last_enqueued_event_properties

return None

@classmethod
def _from_message(cls, message):
# pylint:disable=protected-access
event_data = cls(body='')
event_data.message = message
return event_data

@property
Expand All @@ -173,7 +173,7 @@ def sequence_number(self):

:rtype: int or long
"""
return self._annotations.get(EventData.PROP_SEQ_NUMBER, None)
return self.message.annotations.get(EventData.PROP_SEQ_NUMBER, None)

@property
def offset(self):
Expand All @@ -183,7 +183,7 @@ def offset(self):
:rtype: str
"""
try:
return self._annotations[EventData.PROP_OFFSET].decode('UTF-8')
return self.message.annotations[EventData.PROP_OFFSET].decode('UTF-8')
except (KeyError, AttributeError):
return None

Expand All @@ -194,7 +194,7 @@ def enqueued_time(self):

:rtype: datetime.datetime
"""
timestamp = self._annotations.get(EventData.PROP_TIMESTAMP, None)
timestamp = self.message.annotations.get(EventData.PROP_TIMESTAMP, None)
if timestamp:
return datetime.datetime.utcfromtimestamp(float(timestamp)/1000)
return None
Expand All @@ -207,9 +207,9 @@ def partition_key(self):
:rtype: bytes
"""
try:
return self._annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL]
return self.message.annotations[EventData.PROP_PARTITION_KEY_AMQP_SYMBOL]
except KeyError:
return self._annotations.get(EventData.PROP_PARTITION_KEY, None)
return self.message.annotations.get(EventData.PROP_PARTITION_KEY, None)

@property
def application_properties(self):
Expand All @@ -218,7 +218,7 @@ def application_properties(self):

:rtype: dict
"""
return self._app_properties
return self.message.application_properties

@application_properties.setter
def application_properties(self, value):
Expand All @@ -228,8 +228,7 @@ def application_properties(self, value):
:param value: The application properties for the EventData.
:type value: dict
"""
self._app_properties = value
properties = None if value is None else dict(self._app_properties)
properties = None if value is None else dict(value)
self.message.application_properties = properties

@property
Expand All @@ -239,7 +238,7 @@ def system_properties(self):

:rtype: dict
"""
return self._annotations
return self.message.annotations

@property
def body(self):
Expand Down
11 changes: 8 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import uamqp # type: ignore
from uamqp import types, errors, utils # type: ignore
from uamqp import ReceiveClient, Source # type: ignore
import uamqp

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import _error_handler
Expand Down Expand Up @@ -113,7 +114,7 @@ def __next__(self):
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
if self._track_last_enqueued_event_properties:
self._last_enqueued_event_properties = event_data._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = event_data._get_last_enqueued_event_properties() # pylint:disable=protected-access
return event_data
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
Expand Down Expand Up @@ -146,6 +147,8 @@ def _create_handler(self):
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
auto_complete=False,
properties=self._client._create_properties( # pylint:disable=protected-access
self._client._config.user_agent), # pylint:disable=protected-access
**desired_capabilities) # pylint:disable=protected-access
Expand All @@ -171,12 +174,14 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

if data_batch:
self._offset = EventPosition(data_batch[-1].offset)

if self._track_last_enqueued_event_properties and len(data_batch):
self._last_enqueued_event_properties = data_batch[-1]._runtime_info # pylint:disable=protected-access
self._last_enqueued_event_properties = data_batch[-1]._get_last_enqueued_event_properties() # pylint:disable=protected-access

return data_batch

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ async def test_receive_batch_async(connstr_senders):
received = await receiver.receive(max_batch_size=5, timeout=5)
assert len(received) == 5

for event in received:
assert event.system_properties
assert event.sequence_number is not None
assert event.offset
assert event.enqueued_time


async def pump(receiver, sleep=None):
messages = 0
Expand Down
6 changes: 6 additions & 0 deletions sdk/eventhub/azure-eventhubs/tests/test_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ def test_receive_batch(connstr_senders):
received = receiver.receive(max_batch_size=5, timeout=5)
assert len(received) == 5

for event in received:
assert event.system_properties
assert event.sequence_number is not None
assert event.offset
assert event.enqueued_time


@pytest.mark.liveTest
def test_receive_batch_with_app_prop_sync(connstr_senders):
Expand Down