diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index 95f6e908c404..f53736feeb03 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -7,12 +7,30 @@ import logging import time -from uamqp import errors +from uamqp import errors, constants from azure.eventhub.error import EventHubError, _handle_exception log = logging.getLogger(__name__) +def _retry_decorator(to_be_wrapped_func): + def wrapped_func(*args, **kwargs): + timeout = kwargs.get("timeout", None) + if not timeout: + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout_time = time.time() + timeout + max_retries = args[0].client.config.max_retries + retry_count = 0 + last_exception = None + while True: + try: + return to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs) + except Exception as exception: + last_exception = args[0]._handle_exception(exception, retry_count, max_retries, timeout_time) + retry_count += 1 + return wrapped_func + + class ConsumerProducerMixin(object): def __init__(self): self.client = None @@ -61,6 +79,8 @@ def _open(self, timeout_time=None): if timeout_time and time.time() >= timeout_time: return time.sleep(0.05) + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ + or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access self.running = True def _close_handler(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index e6b35ad41ae4..648138c93d73 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -6,13 +6,31 @@ import logging import time -from uamqp import errors +from uamqp import errors, constants from azure.eventhub.error import EventHubError, ConnectError from ..aio.error_async import _handle_exception log = logging.getLogger(__name__) +def _retry_decorator(to_be_wrapped_func): + async def wrapped_func(*args, **kwargs): + timeout = kwargs.get("timeout", None) + if not timeout: + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout_time = time.time() + timeout + max_retries = args[0].client.config.max_retries + retry_count = 0 + last_exception = None + while True: + try: + return await to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs) + except Exception as exception: + last_exception = await args[0]._handle_exception(exception, retry_count, max_retries, timeout_time) + retry_count += 1 + return wrapped_func + + class ConsumerProducerMixin(object): def __init__(self): @@ -62,6 +80,8 @@ async def _open(self, timeout_time=None): if timeout_time and time.time() >= timeout_time: return await asyncio.sleep(0.05) + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ + or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access self.running = True async def _close_handler(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index dac2d0c0fa61..fdf029e37764 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -14,7 +14,7 @@ from azure.eventhub import EventData, EventPosition from azure.eventhub.error import EventHubError, AuthenticationError, ConnectError, ConnectionLostError, _error_handler from ..aio.error_async import _handle_exception -from ._consumer_producer_mixin_async import ConsumerProducerMixin +from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator log = logging.getLogger(__name__) @@ -159,11 +159,72 @@ def queue_size(self): return self._handler._received_messages.qsize() return 0 + @_retry_decorator + async def _receive(self, **kwargs): + timeout_time = kwargs.get("timeout_time") + last_exception = kwargs.get("last_exception") + max_batch_size = kwargs.get("max_batch_size") + data_batch = kwargs.get("data_batch") + + await self._open(timeout_time) + remaining_time = timeout_time - time.time() + if remaining_time <= 0.0: + if last_exception: + log.info("%r receive operation timed out. (%r)", self.name, last_exception) + raise last_exception + return data_batch + + remaining_time_ms = 1000 * remaining_time + message_batch = await self._handler.receive_message_batch_async( + max_batch_size=max_batch_size, + timeout=remaining_time_ms) + for message in message_batch: + event_data = EventData(message=message) + self.offset = EventPosition(event_data.offset) + data_batch.append(event_data) + return data_batch + async def receive(self, **kwargs): # type: (int, float) -> List[EventData] """ Receive events asynchronously from the EventHub. + :param max_batch_size: Receive a batch of events. Batch size will + be up to the maximum specified, but will return as soon as service + returns no new events. If combined with a timeout and no events are + retrieve before the time, the result will be empty. If no batch + size is supplied, the prefetch size will be the maximum. + :type max_batch_size: int + :param timeout: The maximum wait time to build up the requested message count for the batch. + If not specified, the default wait time specified when the consumer was created will be used. + :type timeout: float + :rtype: list[~azure.eventhub.common.EventData] + :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, + ~azure.eventhub.EventHubError + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_receive] + :end-before: [END eventhub_client_async_receive] + :language: python + :dedent: 4 + :caption: Receives events asynchronously + + """ + self._check_closed() + + max_batch_size = kwargs.get("max_batch_size", None) + timeout = kwargs.get("timeout", None) or self.client.config.receive_timeout + max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size + data_batch = [] # type: List[EventData] + + return await self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch) + + async def _legacy_receive(self, **kwargs): + # type: (int, float) -> List[EventData] + """ + Receive events asynchronously from the EventHub. + :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. If combined with a timeout and no events are diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index e326aef0a115..0dae1734419a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -14,7 +14,7 @@ from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError from ..producer import _error, _set_partition_key -from ._consumer_producer_mixin_async import ConsumerProducerMixin +from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator log = logging.getLogger(__name__) @@ -98,7 +98,7 @@ def _create_handler(self): self.client.config.user_agent), # pylint: disable=protected-access loop=self.loop) - async def _open(self, timeout_time=None): + async def _open(self, timeout_time=None, **kwargs): """ Open the EventHubProducer using the supplied connection. If the handler has previously been redirected, the redirect @@ -110,7 +110,32 @@ async def _open(self, timeout_time=None): self.target = self.redirected.address await super(EventHubProducer, self)._open(timeout_time) - async def _send_event_data(self, timeout=None): + @_retry_decorator + async def _send_event_data(self, **kwargs): + timeout_time = kwargs.get("timeout_time") + last_exception = kwargs.get("last_exception") + + if self.unsent_events: + await self._open(timeout_time) + remaining_time = timeout_time - time.time() + if remaining_time <= 0.0: + if last_exception: + error = last_exception + else: + error = OperationTimeoutError("send operation timed out") + log.info("%r send operation timed out. (%r)", self.name, error) + raise error + self._handler._msg_timeout = remaining_time # pylint: disable=protected-access + self._handler.queue_message(*self.unsent_events) + await self._handler.wait_async() + self.unsent_events = self._handler.pending_messages + if self._outcome != constants.MessageSendResult.Ok: + if self._outcome == constants.MessageSendResult.Timeout: + self._condition = OperationTimeoutError("send operation timed out") + _error(self._outcome, self._condition) + return + + async def _legacy_send_event_data(self, timeout=None): timeout = timeout or self.client.config.send_timeout if not timeout: timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout @@ -170,14 +195,19 @@ async def create_batch(self, **kwargs): """ max_size = kwargs.get("max_size", None) partition_key = kwargs.get("partition_key", None) + + @_retry_decorator + async def wrapped_open(*args, **kwargs): + await self._open(**kwargs) + if not self._max_message_size_on_link: - await self._open() + await wrapped_open(self, timeout=self.client.config.send_timeout) if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_size, self._max_message_size_on_link)) - return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) + return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key) async def send(self, event_data, **kwargs): # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index e59c440c7c88..1139a8b725d4 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -14,7 +14,7 @@ from azure.eventhub.common import EventData, EventPosition from azure.eventhub.error import _error_handler, EventHubError -from ._consumer_producer_mixin import ConsumerProducerMixin +from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator log = logging.getLogger(__name__) @@ -152,7 +152,65 @@ def queue_size(self): return self._handler._received_messages.qsize() return 0 + @_retry_decorator + def _receive(self, **kwargs): + timeout_time = kwargs.get("timeout_time") + last_exception = kwargs.get("last_exception") + max_batch_size = kwargs.get("max_batch_size") + data_batch = kwargs.get("data_batch") + + self._open(timeout_time) + remaining_time = timeout_time - time.time() + if remaining_time <= 0.0: + if last_exception: + log.info("%r receive operation timed out. (%r)", self.name, last_exception) + raise last_exception + return data_batch + remaining_time_ms = 1000 * remaining_time + message_batch = self._handler.receive_message_batch( + max_batch_size=max_batch_size - (len(data_batch) if data_batch else 0), + timeout=remaining_time_ms) + for message in message_batch: + event_data = EventData(message=message) + self.offset = EventPosition(event_data.offset) + data_batch.append(event_data) + return data_batch + def receive(self, **kwargs): + """ + Receive events from the EventHub. + + :param max_batch_size: Receive a batch of events. Batch size will + be up to the maximum specified, but will return as soon as service + returns no new events. If combined with a timeout and no events are + retrieve before the time, the result will be empty. If no batch + size is supplied, the prefetch size will be the maximum. + :type max_batch_size: int + :param timeout: The maximum wait time to build up the requested message count for the batch. + If not specified, the default wait time specified when the consumer was created will be used. + :type timeout: float + :rtype: list[~azure.eventhub.common.EventData] + :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, + ~azure.eventhub.EventHubError + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_sync_receive] + :end-before: [END eventhub_client_sync_receive] + :language: python + :dedent: 4 + :caption: Receive events from the EventHub. + + """ + self._check_closed() + + max_batch_size = kwargs.get("max_batch_size", None) + timeout = kwargs.get("timeout", None) or self.client.config.receive_timeout + max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size + data_batch = [] # type: List[EventData] + + return self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch) + + def _legacy_receive(self, **kwargs): # type:(int, float) -> List[EventData] """ Receive events from the EventHub. @@ -182,17 +240,19 @@ def receive(self, **kwargs): timeout = kwargs.get("timeout", None) self._check_closed() + max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size + data_batch = [] # type: List[EventData] + timeout = self.client.config.receive_timeout if timeout is None else timeout if not timeout: timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout - - data_batch = [] # type: List[EventData] - start_time = time.time() - timeout_time = start_time + timeout + timeout_time = time.time() + timeout max_retries = self.client.config.max_retries retry_count = 0 last_exception = None + + self._receive() while True: try: self._open(timeout_time) @@ -211,8 +271,6 @@ def receive(self, **kwargs): self.offset = EventPosition(event_data.offset) data_batch.append(event_data) return data_batch - except EventHubError: - raise except Exception as exception: last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) retry_count += 1 diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index da2a9ee95368..01c3c28fe374 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -15,7 +15,7 @@ from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError -from ._consumer_producer_mixin import ConsumerProducerMixin +from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator log = logging.getLogger(__name__) @@ -105,7 +105,7 @@ def _create_handler(self): link_properties=self._link_properties, properties=self.client._create_properties(self.client.config.user_agent)) # pylint: disable=protected-access - def _open(self, timeout_time=None): + def _open(self, timeout_time=None, **kwargs): """ Open the EventHubProducer using the supplied connection. If the handler has previously been redirected, the redirect @@ -118,12 +118,36 @@ def _open(self, timeout_time=None): self.target = self.redirected.address super(EventHubProducer, self)._open(timeout_time) - def _send_event_data(self, timeout=None): + @_retry_decorator + def _send_event_data(self, **kwargs): + timeout_time = kwargs.get("timeout_time") + last_exception = kwargs.get("last_exception") + + if self.unsent_events: + self._open(timeout_time) + remaining_time = timeout_time - time.time() + if remaining_time <= 0.0: + if last_exception: + error = last_exception + else: + error = OperationTimeoutError("send operation timed out") + log.info("%r send operation timed out. (%r)", self.name, error) + raise error + self._handler._msg_timeout = remaining_time # pylint: disable=protected-access + self._handler.queue_message(*self.unsent_events) + self._handler.wait() + self.unsent_events = self._handler.pending_messages + if self._outcome != constants.MessageSendResult.Ok: + if self._outcome == constants.MessageSendResult.Timeout: + self._condition = OperationTimeoutError("send operation timed out") + _error(self._outcome, self._condition) + return + + def _legacy_send_event_data(self, timeout=None): timeout = timeout or self.client.config.send_timeout if not timeout: timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout - start_time = time.time() - timeout_time = start_time + timeout + timeout_time = time.time() + timeout max_retries = self.client.config.max_retries retry_count = 0 last_exception = None @@ -178,14 +202,19 @@ def create_batch(self, **kwargs): """ max_size = kwargs.get("max_size", None) partition_key = kwargs.get("partition_key", None) + + @_retry_decorator + def wrapped_open(*args, **kwargs): + self._open(**kwargs) + if not self._max_message_size_on_link: - self._open() + wrapped_open(self, timeout=self.client.config.send_timeout) if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_size, self._max_message_size_on_link)) - return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) + return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key) def send(self, event_data, **kwargs): # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py index 3d43942fe6c8..4d17002eb9fd 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py @@ -100,6 +100,7 @@ async def test_non_existing_entity_sender_async(connection_str): sender = client.create_producer(partition_id="1") with pytest.raises(AuthenticationError): await sender.send(EventData("test data")) + await sender.close() @pytest.mark.liveTest @@ -109,6 +110,7 @@ async def test_non_existing_entity_receiver_async(connection_str): receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=5) + await receiver.close() @pytest.mark.liveTest @@ -196,3 +198,13 @@ async def test_max_receivers_async(connstr_senders): failed = [o for o in outputs if isinstance(o, EventHubError)] assert len(failed) == 1 print(failed[0].message) + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_create_batch_with_invalid_hostname(invalid_hostname): + client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) + sender = client.create_producer() + with pytest.raises(AuthenticationError): + batch_event_data = await sender.create_batch(max_size=300, partition_key="key") + await sender.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py index ae696bf469b5..2a2e4836c2d5 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py @@ -185,7 +185,7 @@ async def test_exclusive_receiver_async(connstr_senders): await pump(receiver1) output2 = await pump(receiver2) with pytest.raises(ConnectionLostError): - await receiver1.receive(timeout=1) + await receiver1.receive(timeout=3) assert output2 == 1 finally: await receiver1.close() @@ -230,7 +230,7 @@ async def test_exclusive_receiver_after_non_exclusive_receiver_async(connstr_sen await pump(receiver1) output2 = await pump(receiver2) with pytest.raises(ConnectionLostError): - await receiver1.receive(timeout=1) + await receiver1.receive(timeout=3) assert output2 == 1 finally: await receiver1.close() @@ -248,7 +248,7 @@ async def test_non_exclusive_receiver_after_exclusive_receiver_async(connstr_sen receiver2 = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=10) try: output1 = await pump(receiver1) - with pytest.raises(ConnectError): + with pytest.raises(ConnectionLostError): await pump(receiver2) assert output1 == 1 finally: diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py index 3d5fb70601ea..c84268d15f21 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py @@ -246,3 +246,21 @@ async def test_send_over_websocket_async(connstr_receivers): for r in receivers: r.close() + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_with_create_event_batch_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) + sender = client.create_producer() + + event_data_batch = await sender.create_batch(max_size=100 * 1024) + while True: + try: + event_data_batch.try_add(EventData('A single event data')) + except ValueError: + break + + await sender.send(event_data_batch) + await sender.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/test_negative.py b/sdk/eventhub/azure-eventhubs/tests/test_negative.py index 4749df940d9c..01707f4256e8 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_negative.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_negative.py @@ -214,3 +214,12 @@ def test_message_body_types(connstr_senders): raise finally: receiver.close() + + +@pytest.mark.liveTest +def test_create_batch_with_invalid_hostname(invalid_hostname): + client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) + sender = client.create_producer() + with pytest.raises(AuthenticationError): + batch_event_data = sender.create_batch(max_size=300, partition_key="key") + sender.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/test_send.py b/sdk/eventhub/azure-eventhubs/tests/test_send.py index f50ac702fb52..3d7bc3815c22 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_send.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_send.py @@ -249,3 +249,20 @@ def test_send_over_websocket_sync(connstr_receivers): received.extend(r.receive(timeout=3)) assert len(received) == 20 + + +@pytest.mark.liveTest +def test_send_with_create_event_batch_sync(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) + sender = client.create_producer() + + event_data_batch = sender.create_batch(max_size=100 * 1024) + while True: + try: + event_data_batch.try_add(EventData('A single event data')) + except ValueError: + break + + sender.send(event_data_batch) + sender.close()