From ba7d811bc01cbbba0c1ff78d98d37ff7c381f74b Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Fri, 5 Jul 2019 18:40:48 -0700 Subject: [PATCH 1/5] Init create batch event --- .../azure/eventhub/__init__.py | 2 +- .../azure-eventhubs/azure/eventhub/common.py | 42 ++++++++++++++++++- .../azure/eventhub/producer.py | 7 +++- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 616107e86125..537845bb004a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -5,7 +5,7 @@ __version__ = "5.0.0b1" -from azure.eventhub.common import EventData, EventPosition +from azure.eventhub.common import EventData, EventDataBatch, EventPosition from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \ AuthenticationError, EventDataSendError, ConnectionLostError from azure.eventhub.client import EventHubClient diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 5a6702a60324..ba959ad1cecc 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -9,9 +9,13 @@ import json import six -from uamqp import BatchMessage, Message, types +from azure.eventhub.error import EventDataError +from uamqp import BatchMessage, Message, types, constants, errors from uamqp.message import MessageHeader, MessageProperties +MAX_MESSAGE_SIZE = constants.MAX_MESSAGE_LENGTH_BYTES +# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each +_BATCH_MESSAGE_OVERHEAD_BYTE_COST = [5, 8] def parse_sas_token(sas_token): """Parse a SAS token into its components. @@ -261,6 +265,42 @@ def _set_partition_key(self, value): self.message.header = header +class EventDataBatch(_BatchSendEventData): + def __init__(self, max_batch_size=None, partition_key=None): + if max_batch_size and MAX_MESSAGE_SIZE and max_batch_size > MAX_MESSAGE_SIZE: + raise EventDataError('Max batch size is too large, acceptable max batch size is: {} bytes.' + .format(MAX_MESSAGE_SIZE)) + + self.max_batch_size = max_batch_size if max_batch_size else MAX_MESSAGE_SIZE + + self.message = BatchMessage(data=[], multi_messages=False, properties=None) + + super(EventDataBatch, self)._set_partition_key(partition_key) + self._size = self.message.gather()[0].get_message_encoded_size() + + def try_add(self, event_data): + if not isinstance(event_data, EventData): + raise EventDataError('event_data should be type of EventData') + + event_data_size = event_data.message.get_message_encoded_size() + + # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that + # into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes + size_after_append = self._size + event_data_size\ + + _BATCH_MESSAGE_OVERHEAD_BYTE_COST[0 if event_data_size < 256 else 1] + + if size_after_append > self.max_batch_size: + return False + + self.message._body_gen.append(event_data) + self._size = size_after_append + return True + + @property + def size(self): + return self.message.gather()[0].get_message_encoded_size() + + class EventPosition(object): """ The position(offset, sequence or timestamp) where a consumer starts. Examples: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 3f95b7be08c3..8aee0236f817 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -13,7 +13,8 @@ from uamqp import compat from uamqp import SendClient -from azure.eventhub.common import EventData, _BatchSendEventData +from azure.eventhub import common +from azure.eventhub.common import EventData, _BatchSendEventData, EventDataBatch from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler @@ -108,6 +109,8 @@ def _open(self): if not self.running: self._connect() self.running = True + if self._handler.message_handler._link.peer_max_message_size: + common.MAX_MESSAGE_SIZE = self._handler.message_handler._link.peer_max_message_size def _connect(self): connected = self._build_connection() @@ -330,6 +333,8 @@ def send(self, event_data, partition_key=None): event_data._set_partition_key(partition_key) wrapper_event_data = event_data else: + if isinstance(event_data, EventDataBatch): + event_data = event_data.event_list event_data_with_pk = self._set_partition_key(event_data, partition_key) wrapper_event_data = _BatchSendEventData( event_data_with_pk, From 28ad2f3d227ce6519e2ac21999e4f6592a775d52 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Mon, 8 Jul 2019 18:32:08 -0700 Subject: [PATCH 2/5] create_batch implementation --- .../azure/eventhub/__init__.py | 1 + .../azure/eventhub/aio/producer_async.py | 39 +++++++++--- .../azure-eventhubs/azure/eventhub/common.py | 63 +++++++++++-------- .../azure/eventhub/consumer.py | 12 ++-- .../azure/eventhub/producer.py | 53 +++++++++++----- 5 files changed, 113 insertions(+), 55 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 537845bb004a..c4faad253c16 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -18,6 +18,7 @@ __all__ = [ "EventData", + "EventDataBatch", "EventHubError", "ConnectError", "ConnectionLostError", diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index aef8dc50ff02..26cf44220c92 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -10,7 +10,7 @@ from uamqp import constants, errors, compat from uamqp import SendClientAsync -from azure.eventhub.common import EventData, _BatchSendEventData +from azure.eventhub.common import EventData, EventDataBatch, _BatchSendEventData from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler @@ -52,6 +52,7 @@ def __init__( # pylint: disable=super-init-not-called :param loop: An event loop. If not specified the default event loop will be used. """ self.loop = loop or asyncio.get_event_loop() + self._max_message_size_on_link = None self.running = False self.client = client self.target = target @@ -110,6 +111,10 @@ async def _open(self): await self._connect() self.running = True + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ + self._handler.message_handler._link.peer_max_message_size\ + else constants.MAX_MESSAGE_LENGTH_BYTES + async def _connect(self): connected = await self._build_connection() if not connected: @@ -301,6 +306,23 @@ def _set_partition_key(event_datas, partition_key): ed._set_partition_key(partition_key) yield ed + async def create_batch(self, max_message_size=None, partition_key=None): + """ + Create an EventDataBatch object with max message size being max_message_size. + The max_message_size should be no greater than the max allowed message size defined by the service side. + :param max_message_size: + :param partition_key: + :return: + """ + if not self._max_message_size_on_link: + await self._open() + + if max_message_size and max_message_size > self._max_message_size_on_link: + raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' + .format(max_message_size, self._max_message_size_on_link)) + + return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) + async def send(self, event_data, partition_key=None): # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None """ @@ -328,14 +350,17 @@ async def send(self, event_data, partition_key=None): """ self._check_closed() if isinstance(event_data, EventData): - if partition_key: - event_data._set_partition_key(partition_key) + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: - event_data_with_pk = self._set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData( - event_data_with_pk, - partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) + if isinstance(event_data, EventDataBatch): + wrapper_event_data = event_data + else: + if partition_key: + event_data_with_pk = self._set_partition_key(event_data, partition_key) + wrapper_event_data = _BatchSendEventData(event_data_with_pk, partition_key=partition_key) + else: + wrapper_event_data = _BatchSendEventData(event_data) wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] await self._send_event_data() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index ba959ad1cecc..773e060217c2 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -13,9 +13,9 @@ from uamqp import BatchMessage, Message, types, constants, errors from uamqp.message import MessageHeader, MessageProperties -MAX_MESSAGE_SIZE = constants.MAX_MESSAGE_LENGTH_BYTES # event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each -_BATCH_MESSAGE_OVERHEAD_BYTE_COST = [5, 8] +_BATCH_MESSAGE_OVERHEAD_COST = [5, 8] + def parse_sas_token(sas_token): """Parse a SAS token into its components. @@ -113,13 +113,14 @@ def _set_partition_key(self, value): :param value: The partition key to set. :type value: str or bytes """ - annotations = dict(self._annotations) - annotations[self._partition_key] = value - header = MessageHeader() - header.durable = True - self.message.annotations = annotations - self.message.header = header - self._annotations = annotations + if value: + annotations = dict(self._annotations) + annotations[self._partition_key] = value + header = MessageHeader() + header.durable = True + self.message.annotations = annotations + self.message.header = header + self._annotations = annotations @property def sequence_number(self): @@ -266,40 +267,48 @@ def _set_partition_key(self, value): class EventDataBatch(_BatchSendEventData): - def __init__(self, max_batch_size=None, partition_key=None): - if max_batch_size and MAX_MESSAGE_SIZE and max_batch_size > MAX_MESSAGE_SIZE: - raise EventDataError('Max batch size is too large, acceptable max batch size is: {} bytes.' - .format(MAX_MESSAGE_SIZE)) - - self.max_batch_size = max_batch_size if max_batch_size else MAX_MESSAGE_SIZE - + """ + The EventDataBatch class is a holder of a batch of event date within max message size bytes. + Do not instantiate an EventDataBatch object directly. + Do use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. + """ + def __init__(self, max_message_size, partition_key=None): + self.max_message_size = max_message_size + self._partition_key = partition_key self.message = BatchMessage(data=[], multi_messages=False, properties=None) - super(EventDataBatch, self)._set_partition_key(partition_key) + self._set_partition_key(partition_key) self._size = self.message.gather()[0].get_message_encoded_size() def try_add(self, event_data): + """ + The message size is a sum up of body, properties, header, etc. + :param event_data: + :return: + """ if not isinstance(event_data, EventData): raise EventDataError('event_data should be type of EventData') + if self._partition_key: + if event_data.partition_key and not (event_data.partition_key == self._partition_key): + raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch') + if not event_data.partition_key: + event_data._set_partition_key(self._partition_key) + event_data_size = event_data.message.get_message_encoded_size() # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that - # into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes - size_after_append = self._size + event_data_size\ - + _BATCH_MESSAGE_OVERHEAD_BYTE_COST[0 if event_data_size < 256 else 1] + # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes. + size_after_add = self._size + event_data_size\ + + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1] - if size_after_append > self.max_batch_size: + if size_after_add > self.max_message_size: return False - self.message._body_gen.append(event_data) - self._size = size_after_append + self.message._body_gen.append(event_data) # pylint: disable=protected-access + self._size = size_after_add return True - @property - def size(self): - return self.message.gather()[0].get_message_encoded_size() - class EventPosition(object): """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 856c77d6fb65..08db5118e925 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -23,15 +23,15 @@ class EventHubConsumer(object): """ A consumer responsible for reading EventData from a specific Event Hub - partition and as a member of a specific consumer group. + partition and as a member of a specific consumer group. A consumer may be exclusive, which asserts ownership over the partition for the consumer - group to ensure that only one consumer from that group is reading the from the partition. - These exclusive consumers are sometimes referred to as "Epoch Consumers." + group to ensure that only one consumer from that group is reading the from the partition. + These exclusive consumers are sometimes referred to as "Epoch Consumers." A consumer may also be non-exclusive, allowing multiple consumers from the same consumer - group to be actively reading events from the partition. These non-exclusive consumers are - sometimes referred to as "Non-Epoch Consumers." + group to be actively reading events from the partition. These non-exclusive consumers are + sometimes referred to as "Non-Epoch Consumers." """ timeout = 0 @@ -41,7 +41,7 @@ def __init__(self, client, source, event_position=None, prefetch=300, owner_leve keep_alive=None, auto_reconnect=True): """ Instantiate a consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 8aee0236f817..ae7d2bddb8af 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -13,7 +13,6 @@ from uamqp import compat from uamqp import SendClient -from azure.eventhub import common from azure.eventhub.common import EventData, _BatchSendEventData, EventDataBatch from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler @@ -21,19 +20,21 @@ log = logging.getLogger(__name__) + + class EventHubProducer(object): """ A producer responsible for transmitting EventData to a specific Event Hub, - grouped together in batches. Depending on the options specified at creation, the producer may - be created to allow event data to be automatically routed to an available partition or specific - to a partition. + grouped together in batches. Depending on the options specified at creation, the producer may + be created to allow event data to be automatically routed to an available partition or specific + to a partition. """ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True): """ Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient. @@ -52,6 +53,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N Default value is `True`. :type auto_reconnect: bool """ + self._max_message_size_on_link = None self.running = False self.client = client self.target = target @@ -109,8 +111,10 @@ def _open(self): if not self.running: self._connect() self.running = True - if self._handler.message_handler._link.peer_max_message_size: - common.MAX_MESSAGE_SIZE = self._handler.message_handler._link.peer_max_message_size + + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ + self._handler.message_handler._link.peer_max_message_size\ + else constants.MAX_MESSAGE_LENGTH_BYTES def _connect(self): connected = self._build_connection() @@ -301,6 +305,23 @@ def _error(outcome, condition): if outcome != constants.MessageSendResult.Ok: raise condition + def create_batch(self, max_message_size=None, partition_key=None): + """ + Create an EventDataBatch object with max message size being max_message_size. + The max_message_size should be no greater than the max allowed message size defined by the service side. + :param max_message_size: + :param partition_key: + :return: + """ + if not self._max_message_size_on_link: + self._open() + + if max_message_size and max_message_size > self._max_message_size_on_link: + raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' + .format(max_message_size, self._max_message_size_on_link)) + + return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key) + def send(self, event_data, partition_key=None): # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None """ @@ -310,7 +331,8 @@ def send(self, event_data, partition_key=None): :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to - a particular partition of the Event Hub decided by the service. + a particular partition of the Event Hub decided by the service. partition_key + will be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError @@ -329,16 +351,17 @@ def send(self, event_data, partition_key=None): """ self._check_closed() if isinstance(event_data, EventData): - if partition_key: - event_data._set_partition_key(partition_key) + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: if isinstance(event_data, EventDataBatch): - event_data = event_data.event_list - event_data_with_pk = self._set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData( - event_data_with_pk, - partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) + wrapper_event_data = event_data + else: + if partition_key: + event_data_with_pk = self._set_partition_key(event_data, partition_key) + wrapper_event_data = _BatchSendEventData(event_data_with_pk, partition_key=partition_key) + else: + wrapper_event_data = _BatchSendEventData(event_data) wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] self._send_event_data() From 3d638fc2ef70b7d327859fdc7c500c02c13654ee Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 9 Jul 2019 13:52:41 -0700 Subject: [PATCH 3/5] Revert _set_partition_key method and update comment --- .../azure/eventhub/aio/consumer_async.py | 12 ++++++------ .../azure/eventhub/aio/producer_async.py | 8 ++++---- .../azure-eventhubs/azure/eventhub/common.py | 19 +++++++++---------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 6cf020176d96..38f4586ce5a6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -19,15 +19,15 @@ class EventHubConsumer(object): """ A consumer responsible for reading EventData from a specific Event Hub - partition and as a member of a specific consumer group. + partition and as a member of a specific consumer group. A consumer may be exclusive, which asserts ownership over the partition for the consumer - group to ensure that only one consumer from that group is reading the from the partition. - These exclusive consumers are sometimes referred to as "Epoch Consumers." + group to ensure that only one consumer from that group is reading the from the partition. + These exclusive consumers are sometimes referred to as "Epoch Consumers." A consumer may also be non-exclusive, allowing multiple consumers from the same consumer - group to be actively reading events from the partition. These non-exclusive consumers are - sometimes referred to as "Non-Epoch Consumers." + group to be actively reading events from the partition. These non-exclusive consumers are + sometimes referred to as "Non-Epoch Consumers." """ timeout = 0 @@ -38,7 +38,7 @@ def __init__( # pylint: disable=super-init-not-called keep_alive=None, auto_reconnect=True, loop=None): """ Instantiate an async consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 26cf44220c92..fe0d372e9db6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -20,9 +20,9 @@ class EventHubProducer(object): """ A producer responsible for transmitting EventData to a specific Event Hub, - grouped together in batches. Depending on the options specified at creation, the producer may - be created to allow event data to be automatically routed to an available partition or specific - to a partition. + grouped together in batches. Depending on the options specified at creation, the producer may + be created to allow event data to be automatically routed to an available partition or specific + to a partition. """ @@ -31,7 +31,7 @@ def __init__( # pylint: disable=super-init-not-called keep_alive=None, auto_reconnect=True, loop=None): """ Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` - method in EventHubClient. + method in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 773e060217c2..2094d8b60426 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -113,14 +113,13 @@ def _set_partition_key(self, value): :param value: The partition key to set. :type value: str or bytes """ - if value: - annotations = dict(self._annotations) - annotations[self._partition_key] = value - header = MessageHeader() - header.durable = True - self.message.annotations = annotations - self.message.header = header - self._annotations = annotations + annotations = dict(self._annotations) + annotations[self._partition_key] = value + header = MessageHeader() + header.durable = True + self.message.annotations = annotations + self.message.header = header + self._annotations = annotations @property def sequence_number(self): @@ -269,8 +268,8 @@ def _set_partition_key(self, value): class EventDataBatch(_BatchSendEventData): """ The EventDataBatch class is a holder of a batch of event date within max message size bytes. - Do not instantiate an EventDataBatch object directly. - Do use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. + Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. + Do not instantiate an EventDataBatch object directly. """ def __init__(self, max_message_size, partition_key=None): self.max_message_size = max_message_size From 9779be04e139be8e3be2753fcb30651f74ab1a5d Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 9 Jul 2019 21:03:49 -0700 Subject: [PATCH 4/5] Refacor EventDataBatch class --- .../azure/eventhub/aio/producer_async.py | 10 +++--- .../azure-eventhubs/azure/eventhub/common.py | 36 +++++++++---------- .../azure/eventhub/producer.py | 12 +++---- 3 files changed, 26 insertions(+), 32 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index fe0d372e9db6..dfd6c890f998 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -10,7 +10,7 @@ from uamqp import constants, errors, compat from uamqp import SendClientAsync -from azure.eventhub.common import EventData, EventDataBatch, _BatchSendEventData +from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler @@ -357,10 +357,8 @@ async def send(self, event_data, partition_key=None): wrapper_event_data = event_data else: if partition_key: - event_data_with_pk = self._set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData(event_data_with_pk, partition_key=partition_key) - else: - wrapper_event_data = _BatchSendEventData(event_data) + event_data = self._set_partition_key(event_data, partition_key) + wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] await self._send_event_data() @@ -398,4 +396,4 @@ async def close(self, exception=None): self.error = EventHubError(str(exception)) else: self.error = EventHubError("This send handler is now closed.") - await self._handler.close_async() \ No newline at end of file + await self._handler.close_async() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 2094d8b60426..24138bb2b6cb 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -248,10 +248,25 @@ def encode_message(self): return self.message.encode_message() -class _BatchSendEventData(EventData): - def __init__(self, batch_event_data, partition_key=None): - self.message = BatchMessage(data=batch_event_data, multi_messages=False, properties=None) +class EventDataBatch(object): + """ + The EventDataBatch class is a holder of a batch of event date within max message size bytes. + Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. + Do not instantiate an EventDataBatch object directly. + """ + def __init__(self, max_message_size=None, partition_key=None): + self.max_message_size = max_message_size if max_message_size else constants.MAX_MESSAGE_LENGTH_BYTES + self._partition_key = partition_key + self.message = BatchMessage(data=[], multi_messages=False, properties=None) + self._set_partition_key(partition_key) + self._size = self.message.gather()[0].get_message_encoded_size() + + @staticmethod + def _from_batch(batch_data, partition_key=None): + batch_data_instance = EventDataBatch(partition_key=partition_key) + batch_data_instance.message._body_gen = batch_data + return batch_data_instance def _set_partition_key(self, value): if value: @@ -264,21 +279,6 @@ def _set_partition_key(self, value): self.message.annotations = annotations self.message.header = header - -class EventDataBatch(_BatchSendEventData): - """ - The EventDataBatch class is a holder of a batch of event date within max message size bytes. - Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. - Do not instantiate an EventDataBatch object directly. - """ - def __init__(self, max_message_size, partition_key=None): - self.max_message_size = max_message_size - self._partition_key = partition_key - self.message = BatchMessage(data=[], multi_messages=False, properties=None) - - self._set_partition_key(partition_key) - self._size = self.message.gather()[0].get_message_encoded_size() - def try_add(self, event_data): """ The message size is a sum up of body, properties, header, etc. diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index ae7d2bddb8af..9883a53fe583 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -13,15 +13,13 @@ from uamqp import compat from uamqp import SendClient -from azure.eventhub.common import EventData, _BatchSendEventData, EventDataBatch +from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import EventHubError, ConnectError, \ AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler log = logging.getLogger(__name__) - - class EventHubProducer(object): """ A producer responsible for transmitting EventData to a specific Event Hub, @@ -354,14 +352,12 @@ def send(self, event_data, partition_key=None): event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: - if isinstance(event_data, EventDataBatch): + if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. wrapper_event_data = event_data else: if partition_key: - event_data_with_pk = self._set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData(event_data_with_pk, partition_key=partition_key) - else: - wrapper_event_data = _BatchSendEventData(event_data) + event_data = self._set_partition_key(event_data, partition_key) + wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] self._send_event_data() From d0a51bb9fbdcbbe6394b00390968639e298cfd49 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 9 Jul 2019 21:15:08 -0700 Subject: [PATCH 5/5] Revert logic when setting partition_key of event_data --- .../azure-eventhubs/azure/eventhub/aio/producer_async.py | 3 ++- sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index dfd6c890f998..5a893f308724 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -350,7 +350,8 @@ async def send(self, event_data, partition_key=None): """ self._check_closed() if isinstance(event_data, EventData): - event_data._set_partition_key(partition_key) # pylint: disable=protected-access + if partition_key: + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: if isinstance(event_data, EventDataBatch): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 9883a53fe583..4317b993ceb1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -349,7 +349,8 @@ def send(self, event_data, partition_key=None): """ self._check_closed() if isinstance(event_data, EventData): - event_data._set_partition_key(partition_key) # pylint: disable=protected-access + if partition_key: + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted.