From c92cd142d8d2db75de8630ffb24b7ac04968283c Mon Sep 17 00:00:00 2001 From: yijxie Date: Wed, 22 Apr 2020 23:00:57 -0700 Subject: [PATCH 01/11] Add partition_key/partition_id into send_batch --- .../azure/eventhub/aio/_producer_client_async.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index 9d69d52ea18f..43d292725ab8 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -12,7 +12,7 @@ from ._client_base_async import ClientBaseAsync from ._producer_async import EventHubProducer from .._constants import ALL_PARTITIONS -from .._common import EventDataBatch +from .._common import EventDataBatch, EventData if TYPE_CHECKING: from uamqp.constants import TransportType @@ -211,7 +211,9 @@ def from_connection_string( async def send_batch( self, - event_data_batch: EventDataBatch, + event_data_batch: Union[EventDataBatch, List[EventData]], + partition_id: Optional[str] = None, + partition_key: Optional[str] = None, *, timeout: Optional[Union[int, float]] = None ) -> None: From 37e5cb301bb300a45c6f74c2e6cb76c5539557c9 Mon Sep 17 00:00:00 2001 From: yijxie Date: Wed, 22 Apr 2020 23:20:34 -0700 Subject: [PATCH 02/11] Add send list sample --- .../samples/async_samples/send_async.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py index a8d104a96188..7afa602a1983 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py @@ -16,12 +16,29 @@ import os from azure.eventhub.aio import EventHubProducerClient +from azure.eventhub.exceptions import EventHubError from azure.eventhub import EventData CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] +async def send_event_data_list(producer): + # If you know beforehand that the list of events you have will not exceed the + # size limits, you can use the `send_batch()` api directly without creating an EventDataBatch + + # Without specifying partition_id or partition_key + # the events will be distributed to available partitions via round-robin. + + event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)] + try: + await producer.send_batch(event_data_list) + except ValueError: # if size of the + print("Size of the event data list exceeds the Event Hub size limit") + except EventHubError as eh_err: + print("Sending error: ", eh_err) + + async def send_event_data_batch(producer): # Without specifying partition_id or partition_key # the events will be distributed to available partitions via round-robin. From fcfac3bfec0c543628b31c57a8a91bd9b070dc50 Mon Sep 17 00:00:00 2001 From: yijxie Date: Wed, 22 Apr 2020 23:22:38 -0700 Subject: [PATCH 03/11] Update send list sample --- .../azure-eventhub/samples/async_samples/send_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py index 7afa602a1983..2a05f5370c1d 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py @@ -33,8 +33,8 @@ async def send_event_data_list(producer): event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)] try: await producer.send_batch(event_data_list) - except ValueError: # if size of the - print("Size of the event data list exceeds the Event Hub size limit") + except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand. + print("Size of the event data list exceeds the size limit of a single send") except EventHubError as eh_err: print("Sending error: ", eh_err) From d0a0810d0a41b07781d0e9b28af108e1c73e69d3 Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 23 Apr 2020 10:41:26 -0700 Subject: [PATCH 04/11] Put partition_key/partition_id after * --- .../azure/eventhub/aio/_producer_client_async.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index 43d292725ab8..6b58309823a4 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -212,15 +212,16 @@ def from_connection_string( async def send_batch( self, event_data_batch: Union[EventDataBatch, List[EventData]], + *, partition_id: Optional[str] = None, partition_key: Optional[str] = None, - *, timeout: Optional[Union[int, float]] = None ) -> None: """Sends event data and blocks until acknowledgement is received or operation times out. :param event_data_batch: The EventDataBatch object to be sent. :type event_data_batch: ~azure.eventhub.EventDataBatch + :param float timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. :rtype: None From 39dd6f37360464c1614d3f3fbd244cd99ad3ad7a Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 27 Apr 2020 09:16:37 -0700 Subject: [PATCH 05/11] Add send_batch List[EventData] --- sdk/eventhub/azure-eventhub/CHANGELOG.md | 3 + .../azure/eventhub/_producer_client.py | 49 +++++++++++++-- .../eventhub/aio/_producer_client_async.py | 50 ++++++++++++--- .../samples/async_samples/send_async.py | 33 +++++----- .../samples/sync_samples/send.py | 19 +++++- .../livetest/asynctests/test_send_async.py | 62 ++++++++++++++++++- .../tests/livetest/synctests/test_send.py | 58 ++++++++++++++++- 7 files changed, 240 insertions(+), 34 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index 4e87eab6dbdf..e235266b4a67 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -2,6 +2,9 @@ ## 5.1.0b2 (Unreleased) +**New Features** + +- `EventHubProducerClient.send_batch` accepts either an `EventDataBatch` or a finite list of `EventData`. #9181 ## 5.1.0b1 (2020-04-06) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 0a30fba39a18..7f610d8ccccf 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -6,6 +6,8 @@ import threading from typing import Any, Union, TYPE_CHECKING, Dict, List, Optional, cast + +from azure.eventhub import EventData from uamqp import constants from .exceptions import ConnectError, EventHubError @@ -183,13 +185,28 @@ def from_connection_string(cls, conn_str, **kwargs): return cls(**constructor_args) def send_batch(self, event_data_batch, **kwargs): - # type: (EventDataBatch, Any) -> None + # type: (Union[EventDataBatch, List[EventData]], Any) -> None """Sends event data and blocks until acknowledgement is received or operation times out. - :param event_data_batch: The EventDataBatch object to be sent. - :type event_data_batch: ~azure.eventhub.EventDataBatch + If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub + frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch` + to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit, + and then call this method to send out the batch. + + :param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent + in a batch. + :type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]] :keyword float timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. + :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service + will assign to all partitions using round-robin. + A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because + `EventDataBatch` itself has partition_id. + :keyword str partition_key: With the given partition_key, event data will be sent to + a particular partition of the Event Hub decided by the service. + A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because + `EventDataBatch` itself has partition_key. + If both partition_id and partition_key is provided, the partition_id will take precedence. :rtype: None :raises: :class:`AuthenticationError` :class:`ConnectError` @@ -197,6 +214,7 @@ def send_batch(self, event_data_batch, **kwargs): :class:`EventDataError` :class:`EventDataSendError` :class:`EventHubError` + `ValueError` .. admonition:: Example: @@ -208,18 +226,37 @@ def send_batch(self, event_data_batch, **kwargs): :caption: Sends event data """ + partition_id = kwargs.get("partition_id") + partition_key = kwargs.get("partition_key") + if isinstance(event_data_batch, EventDataBatch): + if partition_id or partition_key: + raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch " + "because type EventDataBatch itself may have partition_id or partition_key") + to_send_batch = event_data_batch + elif isinstance(event_data_batch, List): + to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key) + for event_data in event_data_batch: + try: + to_send_batch.add(event_data) + except ValueError: + raise ValueError("The list of EventData exceeds the Event Hub frame size limit. " + "Please send a smaller list of EventData, or use EventDataBatch, " + "which is guaranteed to be under the frame size limit") + else: + raise TypeError("event_data_batch must be of type List[EventData] or EventDataBatch") + partition_id = ( - event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access + to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access ) send_timeout = kwargs.pop("timeout", None) try: cast(EventHubProducer, self._producers[partition_id]).send( - event_data_batch, timeout=send_timeout + to_send_batch, timeout=send_timeout ) except (KeyError, AttributeError, EventHubError): self._start_producer(partition_id, send_timeout) cast(EventHubProducer, self._producers[partition_id]).send( - event_data_batch, timeout=send_timeout + to_send_batch, timeout=send_timeout ) def create_batch(self, **kwargs): diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index 6b58309823a4..d6adab4f57a0 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -213,17 +213,30 @@ async def send_batch( self, event_data_batch: Union[EventDataBatch, List[EventData]], *, - partition_id: Optional[str] = None, - partition_key: Optional[str] = None, - timeout: Optional[Union[int, float]] = None + timeout: Optional[Union[int, float]] = None, + **kwargs ) -> None: """Sends event data and blocks until acknowledgement is received or operation times out. - :param event_data_batch: The EventDataBatch object to be sent. - :type event_data_batch: ~azure.eventhub.EventDataBatch + If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub + frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch` + to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit, + and then call this method to send out the batch. - :param float timeout: The maximum wait time to send the event data. + :param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent + in a batch. + :type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]] + :keyword float timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. + :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service + will assign to all partitions using round-robin. + A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because + `EventDataBatch` itself has partition_id. + :keyword str partition_key: With the given partition_key, event data will be sent to + a particular partition of the Event Hub decided by the service. + A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because + `EventDataBatch` itself has partition_key. + If both partition_id and partition_key is provided, the partition_id will take precedence. :rtype: None :raises: :class:`AuthenticationError` :class:`ConnectError` @@ -242,17 +255,36 @@ async def send_batch( :caption: Asynchronously sends event data """ + partition_id = kwargs.get("partition_id") + partition_key = kwargs.get("partition_key") + if isinstance(event_data_batch, EventDataBatch): + if partition_id or partition_key: + raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch " + "because type EventDataBatch itself may have partition_id or partition_key") + to_send_batch = event_data_batch + elif isinstance(event_data_batch, List): + to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key) + for event_data in event_data_batch: + try: + to_send_batch.add(event_data) + except ValueError: + raise ValueError("The list of EventData exceeds the Event Hub frame size limit. " + "Please send a smaller list of EventData, or use EventDataBatch, " + "which is guaranteed to be under the frame size limit") + else: + raise TypeError("event_data_batch must be of type List[EventData] or EventDataBatch") + partition_id = ( - event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access + to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access ) try: await cast(EventHubProducer, self._producers[partition_id]).send( - event_data_batch, timeout=timeout + to_send_batch, timeout=timeout ) except (KeyError, AttributeError, EventHubError): await self._start_producer(partition_id, timeout) await cast(EventHubProducer, self._producers[partition_id]).send( - event_data_batch, timeout=timeout + to_send_batch, timeout=timeout ) async def create_batch( diff --git a/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py b/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py index 2a05f5370c1d..d69f255861ef 100644 --- a/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py +++ b/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py @@ -23,22 +23,6 @@ EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] -async def send_event_data_list(producer): - # If you know beforehand that the list of events you have will not exceed the - # size limits, you can use the `send_batch()` api directly without creating an EventDataBatch - - # Without specifying partition_id or partition_key - # the events will be distributed to available partitions via round-robin. - - event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)] - try: - await producer.send_batch(event_data_list) - except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand. - print("Size of the event data list exceeds the size limit of a single send") - except EventHubError as eh_err: - print("Sending error: ", eh_err) - - async def send_event_data_batch(producer): # Without specifying partition_id or partition_key # the events will be distributed to available partitions via round-robin. @@ -87,6 +71,22 @@ async def send_event_data_batch_with_properties(producer): await producer.send_batch(event_data_batch) +async def send_event_data_list(producer): + # If you know beforehand that the list of events you have will not exceed the + # size limits, you can use the `send_batch()` api directly without creating an EventDataBatch + + # Without specifying partition_id or partition_key + # the events will be distributed to available partitions via round-robin. + + event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)] + try: + await producer.send_batch(event_data_list) + except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand. + print("Size of the event data list exceeds the size limit of a single send") + except EventHubError as eh_err: + print("Sending error: ", eh_err) + + async def run(): producer = EventHubProducerClient.from_connection_string( @@ -99,6 +99,7 @@ async def run(): await send_event_data_batch_with_partition_key(producer) await send_event_data_batch_with_partition_id(producer) await send_event_data_batch_with_properties(producer) + await send_event_data_list(producer) loop = asyncio.get_event_loop() diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/send.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/send.py index c949ddfaec0e..0d4fd467a3a0 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/send.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/send.py @@ -14,7 +14,7 @@ import time import os from azure.eventhub import EventHubProducerClient, EventData - +from azure.eventhub.exceptions import EventHubError CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] @@ -68,6 +68,22 @@ def send_event_data_batch_with_properties(producer): producer.send_batch(event_data_batch) +def send_event_data_list(producer): + # If you know beforehand that the list of events you have will not exceed the + # size limits, you can use the `send_batch()` api directly without creating an EventDataBatch + + # Without specifying partition_id or partition_key + # the events will be distributed to available partitions via round-robin. + + event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)] + try: + producer.send_batch(event_data_list) + except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand. + print("Size of the event data list exceeds the size limit of a single send") + except EventHubError as eh_err: + print("Sending error: ", eh_err) + + producer = EventHubProducerClient.from_connection_string( conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME @@ -80,5 +96,6 @@ def send_event_data_batch_with_properties(producer): send_event_data_batch_with_partition_key(producer) send_event_data_batch_with_partition_id(producer) send_event_data_batch_with_properties(producer) + send_event_data_list(producer) print("Send messages in {} seconds.".format(time.time() - start_time)) diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py index 57143c92fa35..208fc00d51e1 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py @@ -11,7 +11,7 @@ import time import json -from azure.eventhub import EventData, TransportType +from azure.eventhub import EventData, TransportType, EventDataBatch from azure.eventhub.aio import EventHubProducerClient @@ -169,3 +169,63 @@ async def test_send_with_create_event_batch_async(connstr_receivers): received.extend(r.receive_message_batch(timeout=10000)) assert len(received) >= 1 assert EventData._from_message(received[0]).properties[b"raw_prop"] == b"raw_value" + + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_list_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + payload = "A1" + async with client: + await client.send_batch([EventData(payload)]) + received = [] + for r in receivers: + received.extend([EventData._from_message(x) for x in r.receive_message_batch(timeout=10000)]) + + assert len(received) == 1 + assert received[0].body_as_str() == payload + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_list_partition_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + payload = "A1" + async with client: + await client.send_batch([EventData(payload)], partition_id="0") + message = receivers[0].receive_message_batch(timeout=10000)[0] + received = EventData._from_message(message) + assert received.body_as_str() == payload + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_list_too_large_async(connection_str): + client = EventHubProducerClient.from_connection_string(connection_str) + to_send = [] + for i in range(1100): + to_send.append(EventData("A"*1024)) + async with client: + with pytest.raises(ValueError): + await client.send_batch(to_send) + + +@pytest.mark.parametrize("partition_id, partition_key", [("0", None), (None, "pk")]) +async def test_send_batch_pid_pk_async(invalid_hostname, partition_id, partition_key): + # Use invalid_hostname because this is not a live test. + client = EventHubProducerClient.from_connection_string(invalid_hostname) + batch = EventDataBatch(partition_id=partition_id, partition_key=partition_key) + async with client: + with pytest.raises(TypeError): + await client.send_batch(batch, partition_id=partition_id, partition_key=partition_key) + + +async def test_send_str_async(invalid_hostname): + # Use invalid_hostname because this is not a live test. + client = EventHubProducerClient.from_connection_string(invalid_hostname) + async with client: + with pytest.raises(TypeError): + await client.send_batch("aaa") diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py index 69ea1cb44aa9..d12e38dc2623 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py @@ -10,7 +10,7 @@ import json import sys -from azure.eventhub import EventData, TransportType +from azure.eventhub import EventData, TransportType, EventDataBatch from azure.eventhub import EventHubProducerClient @@ -179,3 +179,59 @@ def test_send_with_create_event_batch_with_app_prop_sync(connstr_receivers): received.extend(r.receive_message_batch(timeout=5000)) assert len(received) >= 1 assert EventData._from_message(received[0]).properties[b"raw_prop"] == b"raw_value" + + +@pytest.mark.liveTest +def test_send_list(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + payload = "A1" + with client: + client.send_batch([EventData(payload)]) + received = [] + for r in receivers: + received.extend([EventData._from_message(x) for x in r.receive_message_batch(timeout=10000)]) + + assert len(received) == 1 + assert received[0].body_as_str() == payload + + +@pytest.mark.liveTest +def test_send_list_partition(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + payload = "A1" + with client: + client.send_batch([EventData(payload)], partition_id="0") + message = receivers[0].receive_message_batch(timeout=10000)[0] + received = EventData._from_message(message) + assert received.body_as_str() == payload + + +@pytest.mark.liveTest +def test_send_list_too_large(connection_str): + client = EventHubProducerClient.from_connection_string(connection_str) + to_send = [] + for i in range(1100): + to_send.append(EventData("A"*1024)) + with client: + with pytest.raises(ValueError): + client.send_batch(to_send) + + +@pytest.mark.parametrize("partition_id, partition_key", [("0", None), (None, "pk")]) +def test_send_batch_pid_pk(invalid_hostname, partition_id, partition_key): + # Use invalid_hostname because this is not a live test. + client = EventHubProducerClient.from_connection_string(invalid_hostname) + batch = EventDataBatch(partition_id=partition_id, partition_key=partition_key) + with client: + with pytest.raises(TypeError): + client.send_batch(batch, partition_id=partition_id, partition_key=partition_key) + + +def test_send_str(invalid_hostname): + # Use invalid_hostname because this is not a live test. + client = EventHubProducerClient.from_connection_string(invalid_hostname) + with client: + with pytest.raises(TypeError): + client.send_batch("aaa") From 07ed4abc9f18ff61dd8fd739c7062f18105f1b31 Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 27 Apr 2020 09:22:11 -0700 Subject: [PATCH 06/11] Add raise ValueError in docstring to async send_batch --- .../azure-eventhub/azure/eventhub/aio/_producer_client_async.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index d6adab4f57a0..261eb3bef2f1 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -244,6 +244,7 @@ async def send_batch( :class:`EventDataError` :class:`EventDataSendError` :class:`EventHubError` + `ValueError` .. admonition:: Example: From 087b6d276a3dcfa4bc281a986bc7a4effe97a399 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 28 Apr 2020 13:55:41 -0700 Subject: [PATCH 07/11] Add test case to send empty list --- .../azure-eventhub/tests/livetest/synctests/test_send.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py index d12e38dc2623..249889aac619 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py @@ -12,7 +12,7 @@ from azure.eventhub import EventData, TransportType, EventDataBatch from azure.eventhub import EventHubProducerClient - +from azure.eventhub.exceptions import EventDataSendError @pytest.mark.liveTest def test_send_with_partition_key(connstr_receivers): @@ -208,14 +208,15 @@ def test_send_list_partition(connstr_receivers): assert received.body_as_str() == payload +@pytest.mark.parametrize("num, exception_type", [(0, EventDataSendError), (1100, ValueError)]) @pytest.mark.liveTest -def test_send_list_too_large(connection_str): +def test_send_list_wrong_data(connection_str, num, exception_type): client = EventHubProducerClient.from_connection_string(connection_str) to_send = [] - for i in range(1100): + for i in range(num): to_send.append(EventData("A"*1024)) with client: - with pytest.raises(ValueError): + with pytest.raises(exception_type): client.send_batch(to_send) From d71a94be0fabbb41ce067418647327b26c458e0a Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 28 Apr 2020 14:02:18 -0700 Subject: [PATCH 08/11] Add test case to async send wrong data --- .../tests/livetest/asynctests/test_send_async.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py index 208fc00d51e1..430ca99c5edc 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py @@ -13,7 +13,7 @@ from azure.eventhub import EventData, TransportType, EventDataBatch from azure.eventhub.aio import EventHubProducerClient - +from azure.eventhub.exceptions import EventDataSendError @pytest.mark.liveTest @pytest.mark.asyncio @@ -201,15 +201,17 @@ async def test_send_list_partition_async(connstr_receivers): assert received.body_as_str() == payload +@pytest.mark.parametrize("to_send, exception_type", + [([], EventDataSendError), + ([EventData("A"*1024)]*1100, ValueError), + ("any str", AttributeError) + ]) @pytest.mark.liveTest @pytest.mark.asyncio -async def test_send_list_too_large_async(connection_str): +async def test_send_list_wrong_data_async(connection_str, to_send, exception_type): client = EventHubProducerClient.from_connection_string(connection_str) - to_send = [] - for i in range(1100): - to_send.append(EventData("A"*1024)) async with client: - with pytest.raises(ValueError): + with pytest.raises(exception_type): await client.send_batch(to_send) From d8fcc680de5787a3dd62fc9e97b60911dd45ce38 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 28 Apr 2020 14:03:02 -0700 Subject: [PATCH 09/11] Add test case to send wrong data --- .../tests/livetest/synctests/test_send.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py index 249889aac619..d955c5bcb0bc 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py @@ -208,13 +208,14 @@ def test_send_list_partition(connstr_receivers): assert received.body_as_str() == payload -@pytest.mark.parametrize("num, exception_type", [(0, EventDataSendError), (1100, ValueError)]) +@pytest.mark.parametrize("to_send, exception_type", + [([], EventDataSendError), + ([EventData("A"*1024)]*1100, ValueError), + ("any str", AttributeError) + ]) @pytest.mark.liveTest -def test_send_list_wrong_data(connection_str, num, exception_type): +def test_send_list_wrong_data(connection_str, to_send, exception_type): client = EventHubProducerClient.from_connection_string(connection_str) - to_send = [] - for i in range(num): - to_send.append(EventData("A"*1024)) with client: with pytest.raises(exception_type): client.send_batch(to_send) From b63c415ba6e7e91d1f77dce6cfdb851d160b7d58 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 28 Apr 2020 14:05:18 -0700 Subject: [PATCH 10/11] remove isinstance(event_data_batch, List) --- .../azure-eventhub/azure/eventhub/_common.py | 9 +++++++++ .../azure/eventhub/_producer_client.py | 19 +++++-------------- .../eventhub/aio/_producer_client_async.py | 15 ++++----------- 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py index 74491e4dff99..975cd177ccb3 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py @@ -374,6 +374,15 @@ def _from_batch(cls, batch_data, partition_key=None): ) return batch_data_instance + def _load_events(self, events): + for event_data in events: + try: + self.add(event_data) + except ValueError: + raise ValueError("The combined size of EventData collection exceeds the Event Hub frame size limit. " + "Please send a smaller collection of EventData, or use EventDataBatch, " + "which is guaranteed to be under the frame size limit") + @property def size_in_bytes(self): # type: () -> int diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 7f610d8ccccf..bce3efb9d67a 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -7,14 +7,13 @@ from typing import Any, Union, TYPE_CHECKING, Dict, List, Optional, cast -from azure.eventhub import EventData from uamqp import constants from .exceptions import ConnectError, EventHubError from ._client_base import ClientBase from ._producer import EventHubProducer from ._constants import ALL_PARTITIONS -from ._common import EventDataBatch +from ._common import EventDataBatch, EventData if TYPE_CHECKING: from azure.core.credentials import TokenCredential @@ -214,7 +213,8 @@ def send_batch(self, event_data_batch, **kwargs): :class:`EventDataError` :class:`EventDataSendError` :class:`EventHubError` - `ValueError` + :class:`ValueError` + :class:`TypeError` .. admonition:: Example: @@ -233,18 +233,9 @@ def send_batch(self, event_data_batch, **kwargs): raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch " "because type EventDataBatch itself may have partition_id or partition_key") to_send_batch = event_data_batch - elif isinstance(event_data_batch, List): - to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key) - for event_data in event_data_batch: - try: - to_send_batch.add(event_data) - except ValueError: - raise ValueError("The list of EventData exceeds the Event Hub frame size limit. " - "Please send a smaller list of EventData, or use EventDataBatch, " - "which is guaranteed to be under the frame size limit") else: - raise TypeError("event_data_batch must be of type List[EventData] or EventDataBatch") - + to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key) + to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access partition_id = ( to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access ) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index 261eb3bef2f1..b5eb6ba6fb71 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -244,7 +244,8 @@ async def send_batch( :class:`EventDataError` :class:`EventDataSendError` :class:`EventHubError` - `ValueError` + :class:`ValueError` + :class:`TypeError` .. admonition:: Example: @@ -263,17 +264,9 @@ async def send_batch( raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch " "because type EventDataBatch itself may have partition_id or partition_key") to_send_batch = event_data_batch - elif isinstance(event_data_batch, List): - to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key) - for event_data in event_data_batch: - try: - to_send_batch.add(event_data) - except ValueError: - raise ValueError("The list of EventData exceeds the Event Hub frame size limit. " - "Please send a smaller list of EventData, or use EventDataBatch, " - "which is guaranteed to be under the frame size limit") else: - raise TypeError("event_data_batch must be of type List[EventData] or EventDataBatch") + to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key) + to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access partition_id = ( to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access From 52d1c5bb8a054bc6839345ccfdc205c342014264 Mon Sep 17 00:00:00 2001 From: yijxie Date: Tue, 28 Apr 2020 18:48:17 -0700 Subject: [PATCH 11/11] Remove test_send_str --- .../tests/livetest/asynctests/test_send_async.py | 8 -------- .../azure-eventhub/tests/livetest/synctests/test_send.py | 8 -------- 2 files changed, 16 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py index 430ca99c5edc..851eec7e30a5 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py @@ -223,11 +223,3 @@ async def test_send_batch_pid_pk_async(invalid_hostname, partition_id, partition async with client: with pytest.raises(TypeError): await client.send_batch(batch, partition_id=partition_id, partition_key=partition_key) - - -async def test_send_str_async(invalid_hostname): - # Use invalid_hostname because this is not a live test. - client = EventHubProducerClient.from_connection_string(invalid_hostname) - async with client: - with pytest.raises(TypeError): - await client.send_batch("aaa") diff --git a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py index d955c5bcb0bc..d3790df006d5 100644 --- a/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py +++ b/sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py @@ -229,11 +229,3 @@ def test_send_batch_pid_pk(invalid_hostname, partition_id, partition_key): with client: with pytest.raises(TypeError): client.send_batch(batch, partition_id=partition_id, partition_key=partition_key) - - -def test_send_str(invalid_hostname): - # Use invalid_hostname because this is not a live test. - client = EventHubProducerClient.from_connection_string(invalid_hostname) - with client: - with pytest.raises(TypeError): - client.send_batch("aaa")