Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,8 @@ async def _open(self):
await self._connect()
self.running = True

self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\
self._handler.message_handler._link.peer_max_message_size\
else constants.MAX_MESSAGE_LENGTH_BYTES
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access

async def _connect(self):
connected = await self._build_connection()
Expand Down Expand Up @@ -308,18 +307,22 @@ def _set_partition_key(event_datas, partition_key):

async def create_batch(self, max_size=None, partition_key=None):
"""
Create an EventDataBatch object with max message size being max_message_size.
The max_message_size should be no greater than the max allowed message size defined by the service side.
:param max_message_size:
:param partition_key:
:return:
Create an EventDataBatch object with max size being max_size.
The max_size should be no greater than the max allowed message size defined by the service side.
:param max_size: The maximum size of bytes data that an EventDataBatch object can hold.
:type max_size: int
:param partition_key: With the given partition_key, event data will land to
a particular partition of the Event Hub decided by the service.
:type partition_key: str
:return: an EventDataBatch instance
:rtype: ~azure.eventhub.EventDataBatch
"""
if not self._max_message_size_on_link:
await self._open()

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
.format(max_size, self._max_message_size_on_link))
.format(max_size, self._max_message_size_on_link))

return EventDataBatch(max_size or self._max_message_size_on_link, partition_key)

Expand All @@ -332,7 +335,8 @@ async def send(self, event_data, partition_key=None):
:param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects
:type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list
:param partition_key: With the given partition_key, event data will land to
a particular partition of the Event Hub decided by the service.
a particular partition of the Event Hub decided by the service. partition_key
could be omitted if event_data is of type ~azure.eventhub.EventDataBatch.
:type partition_key: str
:raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError
Expand All @@ -355,6 +359,8 @@ async def send(self, event_data, partition_key=None):
wrapper_event_data = event_data
else:
if isinstance(event_data, EventDataBatch):
if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access
raise EventDataError('The partition_key does not match the one of the EventDataBatch')
wrapper_event_data = event_data
else:
if partition_key:
Expand Down
10 changes: 5 additions & 5 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from uamqp import BatchMessage, Message, types, constants, errors
from uamqp.message import MessageHeader, MessageProperties

log = logging.getLogger(__name__)

# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each
_BATCH_MESSAGE_OVERHEAD_COST = [5, 8]

Expand Down Expand Up @@ -251,15 +253,13 @@ def encode_message(self):

class EventDataBatch(object):
"""
The EventDataBatch class is a holder of a batch of event date within max message size bytes.
The EventDataBatch class is a holder of a batch of event data within max size bytes.
Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object.
Do not instantiate an EventDataBatch object directly.
"""

log = logging.getLogger(__name__)

def __init__(self, max_size=None, partition_key=None):
self.max_size = max_size if max_size else constants.MAX_MESSAGE_LENGTH_BYTES
self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES
self._partition_key = partition_key
self.message = BatchMessage(data=[], multi_messages=False, properties=None)

Expand Down Expand Up @@ -302,7 +302,7 @@ def try_add(self, event_data):
:return:
"""
if event_data is None:
self.log.warning("event_data is None when calling EventDataBatch.try_add. Ignored")
log.warning("event_data is None when calling EventDataBatch.try_add. Ignored")
return
if not isinstance(event_data, EventData):
raise TypeError('event_data should be type of EventData')
Expand Down
21 changes: 13 additions & 8 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,18 +304,22 @@ def _error(outcome, condition):

def create_batch(self, max_size=None, partition_key=None):
"""
Create an EventDataBatch object with max message size being max_message_size.
The max_message_size should be no greater than the max allowed message size defined by the service side.
:param max_message_size:
:param partition_key:
:return:
Create an EventDataBatch object with max size being max_size.
The max_size should be no greater than the max allowed message size defined by the service side.
:param max_size: The maximum size of bytes data that an EventDataBatch object can hold.
:type max_size: int
:param partition_key: With the given partition_key, event data will land to
a particular partition of the Event Hub decided by the service.
:type partition_key: str
:return: an EventDataBatch instance
:rtype: ~azure.eventhub.EventDataBatch
"""
if not self._max_message_size_on_link:
self._open()

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
.format(max_size, self._max_message_size_on_link))
.format(max_size, self._max_message_size_on_link))

return EventDataBatch(max_size or self._max_message_size_on_link, partition_key)

Expand All @@ -329,11 +333,10 @@ def send(self, event_data, partition_key=None):
:type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list
:param partition_key: With the given partition_key, event data will land to
a particular partition of the Event Hub decided by the service. partition_key
will be omitted if event_data is of type ~azure.eventhub.EventDataBatch.
could be omitted if event_data is of type ~azure.eventhub.EventDataBatch.
:type partition_key: str
:raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError

:return: None
:rtype: None

Expand All @@ -353,6 +356,8 @@ def send(self, event_data, partition_key=None):
wrapper_event_data = event_data
else:
if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted.
if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access
raise EventDataError('The partition_key does not match the one of the EventDataBatch')
wrapper_event_data = event_data
else:
if partition_key:
Expand Down