Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import logging
import datetime
import functools
import asyncio
from typing import Any, List, Dict, Union

from typing import Any, List, Dict

from uamqp import authentication, constants, types, errors
from uamqp import authentication, constants
from uamqp import (
Message,
AMQPClientAsync,
Expand Down Expand Up @@ -188,11 +186,12 @@ def create_consumer(
"""
Create an async consumer to the client for a particular consumer group and partition.

:param consumer_group: The name of the consumer group. Default value is `$Default`.
:param consumer_group: The name of the consumer group this consumer is associated with.
Events are read in the context of this group.
:type consumer_group: str
:param partition_id: The ID of the partition.
:param partition_id: The identifier of the Event Hub partition from which events will be received.
:type partition_id: str
:param event_position: The position from which to start receiving.
:param event_position: The position within the partition where the consumer should begin reading events.
:type event_position: ~azure.eventhub.common.EventPosition
:param owner_level: The priority of the exclusive consumer. The client will create an exclusive
consumer if owner_level is set.
Expand Down Expand Up @@ -228,8 +227,7 @@ def create_producer(
self, partition_id=None, operation=None, send_timeout=None, loop=None):
# type: (str, str, float, asyncio.AbstractEventLoop) -> EventHubProducer
"""
Create an async producer to the client to send ~azure.eventhub.common.EventData object
to an EventHub.
Create an async producer to send EventData object to an EventHub.

:param partition_id: Optionally specify a particular partition to send to.
If omitted, the events will be distributed to available partitions via
Expand All @@ -244,15 +242,13 @@ def create_producer(
:param loop: An event loop. If not specified the default event loop will be used.
:rtype ~azure.eventhub.aio.sender_async.EventHubProducer


Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START create_eventhub_client_async_sender]
:end-before: [END create_eventhub_client_async_sender]
:language: python
:dedent: 4
:caption: Add an async producer to the client to
send ~azure.eventhub.common.EventData object to an EventHub.
:caption: Add an async producer to the client to send EventData.

"""
target = "amqps://{}{}".format(self.address.hostname, self.address.path)
Expand Down
95 changes: 52 additions & 43 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio
import uuid
import logging
Expand All @@ -19,7 +18,16 @@

class EventHubConsumer(object):
"""
Implements the async API of a EventHubConsumer.
A consumer responsible for reading EventData from a specific Event Hub
partition and as a member of a specific consumer group.

A consumer may be exclusive, which asserts ownership over the partition for the consumer
group to ensure that only one consumer from that group is reading the from the partition.
These exclusive consumers are sometimes referred to as "Epoch Consumers."

A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
group to be actively reading events from the partition. These non-exclusive consumers are
sometimes referred to as "Non-Epoch Consumers."

"""
timeout = 0
Expand All @@ -29,7 +37,8 @@ def __init__( # pylint: disable=super-init-not-called
self, client, source, event_position=None, prefetch=300, owner_level=None,
keep_alive=None, auto_reconnect=True, loop=None):
"""
Instantiate an async consumer.
Instantiate an async consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method
in EventHubClient.

:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub.aio.EventHubClientAsync
Expand Down Expand Up @@ -158,6 +167,7 @@ def _check_closed(self):
if self.error:
raise EventHubError("This consumer has been closed. Please create a new consumer to receive event data.",
self.error)

async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
Expand Down Expand Up @@ -282,41 +292,6 @@ async def _reconnect(self):
a retryable error - attempt to reconnect."""
return await self._build_connection(is_reconnect=True)

async def close(self, exception=None):
# type: (Exception) -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.

:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START eventhub_client_async_receiver_close]
:end-before: [END eventhub_client_async_receiver_close]
:language: python
:dedent: 4
:caption: Close down the handler.

"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This receive handler is now closed.")
await self._handler.close_async()

@property
def queue_size(self):
# type: () -> int
Expand All @@ -341,10 +316,8 @@ async def receive(self, max_batch_size=None, timeout=None):
retrieve before the time, the result will be empty. If no batch
size is supplied, the prefetch size will be the maximum.
:type max_batch_size: int
:param timeout: The timeout time in seconds to receive a batch of events
from an Event Hub. Results will be returned after timeout. If combined
with max_batch_size, it will return after either the count of received events
reaches the max_batch_size or the operation has timed out.
:param timeout: The maximum wait time to build up the requested message count for the batch.
If not specified, the default wait time specified when the consumer was created will be used.
:type timeout: float
:rtype: list[~azure.eventhub.common.EventData]

Expand All @@ -357,6 +330,7 @@ async def receive(self, max_batch_size=None, timeout=None):
:caption: Receives events asynchronously

"""
self._check_closed()
await self._open()

max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
Expand Down Expand Up @@ -425,4 +399,39 @@ async def receive(self, max_batch_size=None, timeout=None):
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receive failed: {}".format(e))
await self.close(exception=error)
raise error
raise error

async def close(self, exception=None):
# type: (Exception) -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.

:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START eventhub_client_async_receiver_close]
:end-before: [END eventhub_client_async_receiver_close]
:language: python
:dedent: 4
:caption: Close down the handler.

"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This receive handler is now closed.")
await self._handler.close_async()
106 changes: 55 additions & 51 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import uuid
import asyncio
import logging
Expand All @@ -19,15 +18,19 @@

class EventHubProducer(object):
"""
Implements the async API of a EventHubProducer.
A producer responsible for transmitting EventData to a specific Event Hub,
grouped together in batches. Depending on the options specified at creation, the producer may
be created to allow event data to be automatically routed to an available partition or specific
to a partition.

"""

def __init__( # pylint: disable=super-init-not-called
self, client, target, partition=None, send_timeout=60,
keep_alive=None, auto_reconnect=True, loop=None):
"""
Instantiate an async EventHubProducer.
Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer`
method in EventHubClient.

:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub.aio.EventHubClientAsync
Expand Down Expand Up @@ -131,7 +134,8 @@ async def _build_connection(self, is_reconnect=False):
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client._create_properties(self.client.config.user_agent))
properties=self.client._create_properties(self.client.config.user_agent),
loop=self.loop)
try:
await self._handler.open_async()
while not await self._handler.client_ready_async():
Expand Down Expand Up @@ -191,41 +195,6 @@ async def _build_connection(self, is_reconnect=False):
async def _reconnect(self):
return await self._build_connection(is_reconnect=True)

async def close(self, exception=None):
# type: (Exception) -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.

:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START eventhub_client_async_sender_close]
:end-before: [END eventhub_client_async_sender_close]
:language: python
:dedent: 4
:caption: Close down the handler.

"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This send handler is now closed.")
await self._handler.close_async()

async def _send_event_data(self):
await self._open()
max_retries = self.client.config.max_retries
Expand Down Expand Up @@ -307,6 +276,23 @@ def _check_closed(self):
raise EventHubError("This producer has been closed. Please create a new producer to send event data.",
self.error)

def _on_outcome(self, outcome, condition):
"""
Called when the outcome is received for a delivery.

:param outcome: The outcome of the message delivery - success or failure.
:type outcome: ~uamqp.constants.MessageSendResult
:param condition: Detail information of the outcome.

"""
self._outcome = outcome
self._condition = condition

@staticmethod
def _error(outcome, condition):
if outcome != constants.MessageSendResult.Ok:
raise condition

@staticmethod
def _set_partition_key(event_datas, partition_key):
ed_iter = iter(event_datas)
Expand Down Expand Up @@ -352,19 +338,37 @@ async def send(self, event_data, partition_key=None):
self.unsent_events = [wrapper_event_data.message]
await self._send_event_data()

def _on_outcome(self, outcome, condition):
async def close(self, exception=None):
# type: (Exception) -> None
"""
Called when the outcome is received for a delivery.
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.

:param outcome: The outcome of the message delivery - success or failure.
:type outcome: ~uamqp.constants.MessageSendResult
:param condition: Detail information of the outcome.
:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception

"""
self._outcome = outcome
self._condition = condition
Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START eventhub_client_async_sender_close]
:end-before: [END eventhub_client_async_sender_close]
:language: python
:dedent: 4
:caption: Close down the handler.

@staticmethod
def _error(outcome, condition):
if outcome != constants.MessageSendResult.Ok:
raise condition
"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This send handler is now closed.")
await self._handler.close_async()
Loading