From e7ee218235ef50a36d6594661fea75bc75421be7 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Fri, 26 Jul 2019 14:38:44 -0700 Subject: [PATCH 1/2] Update according to the review --- .../azure/eventhub/aio/producer_async.py | 26 ++++++++++++------- .../azure-eventhubs/azure/eventhub/common.py | 10 +++---- .../azure/eventhub/producer.py | 21 +++++++++------ 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index a14142672f27..48e82091c1d8 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -111,9 +111,8 @@ async def _open(self): await self._connect() self.running = True - self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ - self._handler.message_handler._link.peer_max_message_size\ - else constants.MAX_MESSAGE_LENGTH_BYTES + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ + or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access async def _connect(self): connected = await self._build_connection() @@ -308,18 +307,22 @@ def _set_partition_key(event_datas, partition_key): async def create_batch(self, max_size=None, partition_key=None): """ - Create an EventDataBatch object with max message size being max_message_size. - The max_message_size should be no greater than the max allowed message size defined by the service side. - :param max_message_size: - :param partition_key: - :return: + Create an EventDataBatch object with max size being max_size. + The max_size should be no greater than the max allowed message size defined by the service side. + :param max_size: The maximum size of bytes data that an EventDataBatch object can hold. + :type max_size: int + :param partition_key: With the given partition_key, event data will land to + a particular partition of the Event Hub decided by the service. + :type partition_key: str + :return: None + :rtype: None """ if not self._max_message_size_on_link: await self._open() if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' - .format(max_size, self._max_message_size_on_link)) + .format(max_size, self._max_message_size_on_link)) return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) @@ -332,7 +335,8 @@ async def send(self, event_data, partition_key=None): :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to - a particular partition of the Event Hub decided by the service. + a particular partition of the Event Hub decided by the service. partition_key + could be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError @@ -355,6 +359,8 @@ async def send(self, event_data, partition_key=None): wrapper_event_data = event_data else: if isinstance(event_data, EventDataBatch): + if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access + raise EventDataError('The partition_key does not match the one of the EventDataBatch') wrapper_event_data = event_data else: if partition_key: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 6c7d68092842..00015c5fd5dc 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -14,6 +14,8 @@ from uamqp import BatchMessage, Message, types, constants, errors from uamqp.message import MessageHeader, MessageProperties +log = logging.getLogger(__name__) + # event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each _BATCH_MESSAGE_OVERHEAD_COST = [5, 8] @@ -251,15 +253,13 @@ def encode_message(self): class EventDataBatch(object): """ - The EventDataBatch class is a holder of a batch of event date within max message size bytes. + The EventDataBatch class is a holder of a batch of event data within max size bytes. Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. Do not instantiate an EventDataBatch object directly. """ - log = logging.getLogger(__name__) - def __init__(self, max_size=None, partition_key=None): - self.max_size = max_size if max_size else constants.MAX_MESSAGE_LENGTH_BYTES + self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES self._partition_key = partition_key self.message = BatchMessage(data=[], multi_messages=False, properties=None) @@ -302,7 +302,7 @@ def try_add(self, event_data): :return: """ if event_data is None: - self.log.warning("event_data is None when calling EventDataBatch.try_add. Ignored") + log.warning("event_data is None when calling EventDataBatch.try_add. Ignored") return if not isinstance(event_data, EventData): raise TypeError('event_data should be type of EventData') diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 44bd24b16553..d1358699f991 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -304,18 +304,22 @@ def _error(outcome, condition): def create_batch(self, max_size=None, partition_key=None): """ - Create an EventDataBatch object with max message size being max_message_size. - The max_message_size should be no greater than the max allowed message size defined by the service side. - :param max_message_size: - :param partition_key: - :return: + Create an EventDataBatch object with max size being max_size. + The max_size should be no greater than the max allowed message size defined by the service side. + :param max_size: The maximum size of bytes data that an EventDataBatch object can hold. + :type max_size: int + :param partition_key: With the given partition_key, event data will land to + a particular partition of the Event Hub decided by the service. + :type partition_key: str + :return: None + :rtype: None """ if not self._max_message_size_on_link: self._open() if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' - .format(max_size, self._max_message_size_on_link)) + .format(max_size, self._max_message_size_on_link)) return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) @@ -329,11 +333,10 @@ def send(self, event_data, partition_key=None): :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. partition_key - will be omitted if event_data is of type ~azure.eventhub.EventDataBatch. + could be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError - :return: None :rtype: None @@ -353,6 +356,8 @@ def send(self, event_data, partition_key=None): wrapper_event_data = event_data else: if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. + if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access + raise EventDataError('The partition_key does not match the one of the EventDataBatch') wrapper_event_data = event_data else: if partition_key: From af06a02d480f0b1da772310d2277759b0cf29967 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Fri, 26 Jul 2019 14:44:51 -0700 Subject: [PATCH 2/2] Update comment --- .../azure-eventhubs/azure/eventhub/aio/producer_async.py | 4 ++-- sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 48e82091c1d8..93ea12460eca 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -314,8 +314,8 @@ async def create_batch(self, max_size=None, partition_key=None): :param partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. :type partition_key: str - :return: None - :rtype: None + :return: an EventDataBatch instance + :rtype: ~azure.eventhub.EventDataBatch """ if not self._max_message_size_on_link: await self._open() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index d1358699f991..4a2a5f0cc81d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -311,8 +311,8 @@ def create_batch(self, max_size=None, partition_key=None): :param partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. :type partition_key: str - :return: None - :rtype: None + :return: an EventDataBatch instance + :rtype: ~azure.eventhub.EventDataBatch """ if not self._max_message_size_on_link: self._open()