diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index efc96d30526f..cdcba824ad95 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -2,15 +2,13 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- - import logging import datetime import functools import asyncio -from typing import Any, List, Dict, Union - +from typing import Any, List, Dict -from uamqp import authentication, constants, types, errors +from uamqp import authentication, constants from uamqp import ( Message, AMQPClientAsync, @@ -188,11 +186,12 @@ def create_consumer( """ Create an async consumer to the client for a particular consumer group and partition. - :param consumer_group: The name of the consumer group. Default value is `$Default`. + :param consumer_group: The name of the consumer group this consumer is associated with. + Events are read in the context of this group. :type consumer_group: str - :param partition_id: The ID of the partition. + :param partition_id: The identifier of the Event Hub partition from which events will be received. :type partition_id: str - :param event_position: The position from which to start receiving. + :param event_position: The position within the partition where the consumer should begin reading events. :type event_position: ~azure.eventhub.common.EventPosition :param owner_level: The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set. @@ -228,8 +227,7 @@ def create_producer( self, partition_id=None, operation=None, send_timeout=None, loop=None): # type: (str, str, float, asyncio.AbstractEventLoop) -> EventHubProducer """ - Create an async producer to the client to send ~azure.eventhub.common.EventData object - to an EventHub. + Create an async producer to send EventData object to an EventHub. :param partition_id: Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via @@ -244,15 +242,13 @@ def create_producer( :param loop: An event loop. If not specified the default event loop will be used. :rtype ~azure.eventhub.aio.sender_async.EventHubProducer - Example: .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py :start-after: [START create_eventhub_client_async_sender] :end-before: [END create_eventhub_client_async_sender] :language: python :dedent: 4 - :caption: Add an async producer to the client to - send ~azure.eventhub.common.EventData object to an EventHub. + :caption: Add an async producer to the client to send EventData. """ target = "amqps://{}{}".format(self.address.hostname, self.address.path) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 3cf285106c62..3ff10b64ed04 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -2,7 +2,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- - import asyncio import uuid import logging @@ -19,7 +18,16 @@ class EventHubConsumer(object): """ - Implements the async API of a EventHubConsumer. + A consumer responsible for reading EventData from a specific Event Hub + partition and as a member of a specific consumer group. + + A consumer may be exclusive, which asserts ownership over the partition for the consumer + group to ensure that only one consumer from that group is reading the from the partition. + These exclusive consumers are sometimes referred to as "Epoch Consumers." + + A consumer may also be non-exclusive, allowing multiple consumers from the same consumer + group to be actively reading events from the partition. These non-exclusive consumers are + sometimes referred to as "Non-Epoch Consumers." """ timeout = 0 @@ -29,7 +37,8 @@ def __init__( # pylint: disable=super-init-not-called self, client, source, event_position=None, prefetch=300, owner_level=None, keep_alive=None, auto_reconnect=True, loop=None): """ - Instantiate an async consumer. + Instantiate an async consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method + in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync @@ -158,6 +167,7 @@ def _check_closed(self): if self.error: raise EventHubError("This consumer has been closed. Please create a new consumer to receive event data.", self.error) + async def _open(self): """ Open the EventHubConsumer using the supplied connection. @@ -282,41 +292,6 @@ async def _reconnect(self): a retryable error - attempt to reconnect.""" return await self._build_connection(is_reconnect=True) - async def close(self, exception=None): - # type: (Exception) -> None - """ - Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception - - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START eventhub_client_async_receiver_close] - :end-before: [END eventhub_client_async_receiver_close] - :language: python - :dedent: 4 - :caption: Close down the handler. - - """ - self.running = False - if self.error: - return - if isinstance(exception, errors.LinkRedirect): - self.redirected = exception - elif isinstance(exception, EventHubError): - self.error = exception - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self.error = ConnectError(str(exception), exception) - elif exception: - self.error = EventHubError(str(exception)) - else: - self.error = EventHubError("This receive handler is now closed.") - await self._handler.close_async() - @property def queue_size(self): # type: () -> int @@ -341,10 +316,8 @@ async def receive(self, max_batch_size=None, timeout=None): retrieve before the time, the result will be empty. If no batch size is supplied, the prefetch size will be the maximum. :type max_batch_size: int - :param timeout: The timeout time in seconds to receive a batch of events - from an Event Hub. Results will be returned after timeout. If combined - with max_batch_size, it will return after either the count of received events - reaches the max_batch_size or the operation has timed out. + :param timeout: The maximum wait time to build up the requested message count for the batch. + If not specified, the default wait time specified when the consumer was created will be used. :type timeout: float :rtype: list[~azure.eventhub.common.EventData] @@ -357,6 +330,7 @@ async def receive(self, max_batch_size=None, timeout=None): :caption: Receives events asynchronously """ + self._check_closed() await self._open() max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size @@ -425,4 +399,39 @@ async def receive(self, max_batch_size=None, timeout=None): log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Receive failed: {}".format(e)) await self.close(exception=error) - raise error \ No newline at end of file + raise error + + async def close(self, exception=None): + # type: (Exception) -> None + """ + Close down the handler. If the handler has already closed, + this will be a no op. An optional exception can be passed in to + indicate that the handler was shutdown due to error. + + :param exception: An optional exception if the handler is closing + due to an error. + :type exception: Exception + + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_receiver_close] + :end-before: [END eventhub_client_async_receiver_close] + :language: python + :dedent: 4 + :caption: Close down the handler. + + """ + self.running = False + if self.error: + return + if isinstance(exception, errors.LinkRedirect): + self.redirected = exception + elif isinstance(exception, EventHubError): + self.error = exception + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + self.error = ConnectError(str(exception), exception) + elif exception: + self.error = EventHubError(str(exception)) + else: + self.error = EventHubError("This receive handler is now closed.") + await self._handler.close_async() \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 5398186d0e39..169ad3a3e702 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -2,7 +2,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- - import uuid import asyncio import logging @@ -19,7 +18,10 @@ class EventHubProducer(object): """ - Implements the async API of a EventHubProducer. + A producer responsible for transmitting EventData to a specific Event Hub, + grouped together in batches. Depending on the options specified at creation, the producer may + be created to allow event data to be automatically routed to an available partition or specific + to a partition. """ @@ -27,7 +29,8 @@ def __init__( # pylint: disable=super-init-not-called self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True, loop=None): """ - Instantiate an async EventHubProducer. + Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` + method in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync @@ -131,7 +134,8 @@ async def _build_connection(self, is_reconnect=False): error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, - properties=self.client._create_properties(self.client.config.user_agent)) + properties=self.client._create_properties(self.client.config.user_agent), + loop=self.loop) try: await self._handler.open_async() while not await self._handler.client_ready_async(): @@ -191,41 +195,6 @@ async def _build_connection(self, is_reconnect=False): async def _reconnect(self): return await self._build_connection(is_reconnect=True) - async def close(self, exception=None): - # type: (Exception) -> None - """ - Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception - - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START eventhub_client_async_sender_close] - :end-before: [END eventhub_client_async_sender_close] - :language: python - :dedent: 4 - :caption: Close down the handler. - - """ - self.running = False - if self.error: - return - if isinstance(exception, errors.LinkRedirect): - self.redirected = exception - elif isinstance(exception, EventHubError): - self.error = exception - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self.error = ConnectError(str(exception), exception) - elif exception: - self.error = EventHubError(str(exception)) - else: - self.error = EventHubError("This send handler is now closed.") - await self._handler.close_async() - async def _send_event_data(self): await self._open() max_retries = self.client.config.max_retries @@ -307,6 +276,23 @@ def _check_closed(self): raise EventHubError("This producer has been closed. Please create a new producer to send event data.", self.error) + def _on_outcome(self, outcome, condition): + """ + Called when the outcome is received for a delivery. + + :param outcome: The outcome of the message delivery - success or failure. + :type outcome: ~uamqp.constants.MessageSendResult + :param condition: Detail information of the outcome. + + """ + self._outcome = outcome + self._condition = condition + + @staticmethod + def _error(outcome, condition): + if outcome != constants.MessageSendResult.Ok: + raise condition + @staticmethod def _set_partition_key(event_datas, partition_key): ed_iter = iter(event_datas) @@ -352,19 +338,37 @@ async def send(self, event_data, partition_key=None): self.unsent_events = [wrapper_event_data.message] await self._send_event_data() - def _on_outcome(self, outcome, condition): + async def close(self, exception=None): + # type: (Exception) -> None """ - Called when the outcome is received for a delivery. + Close down the handler. If the handler has already closed, + this will be a no op. An optional exception can be passed in to + indicate that the handler was shutdown due to error. - :param outcome: The outcome of the message delivery - success or failure. - :type outcome: ~uamqp.constants.MessageSendResult - :param condition: Detail information of the outcome. + :param exception: An optional exception if the handler is closing + due to an error. + :type exception: Exception - """ - self._outcome = outcome - self._condition = condition + Example: + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_sender_close] + :end-before: [END eventhub_client_async_sender_close] + :language: python + :dedent: 4 + :caption: Close down the handler. - @staticmethod - def _error(outcome, condition): - if outcome != constants.MessageSendResult.Ok: - raise condition + """ + self.running = False + if self.error: + return + if isinstance(exception, errors.LinkRedirect): + self.redirected = exception + elif isinstance(exception, EventHubError): + self.error = exception + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + self.error = ConnectError(str(exception), exception) + elif exception: + self.error = EventHubError(str(exception)) + else: + self.error = EventHubError("This send handler is now closed.") + await self._handler.close_async() \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index 976ee5723cd5..c2dc694bbe9b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -12,10 +12,10 @@ from urllib import unquote_plus, urlencode, quote_plus except ImportError: from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus -from typing import Any, List, Dict, Union +from typing import Any, List, Dict import uamqp -from uamqp import Message, AMQPClient +from uamqp import Message from uamqp import authentication from uamqp import constants from uamqp import errors @@ -194,11 +194,12 @@ def create_consumer( """ Create a consumer to the client for a particular consumer group and partition. - :param consumer_group: The name of the consumer group. Default value is `$Default`. + :param consumer_group: The name of the consumer group this consumer is associated with. + Events are read in the context of this group. :type consumer_group: str - :param partition_id: The ID of the partition. + :param partition_id: The identifier of the Event Hub partition from which events will be received. :type partition_id: str - :param event_position: The position from which to start receiving. + :param event_position: The position within the partition where the consumer should begin reading events. :type event_position: ~azure.eventhub.common.EventPosition :param owner_level: The priority of the exclusive consumer. The client will create an exclusive consumer if owner_level is set. @@ -232,7 +233,7 @@ def create_consumer( def create_producer(self, partition_id=None, operation=None, send_timeout=None): # type: (str, str, float) -> EventHubProducer """ - Create a EventHubProducer to send EventData object to an EventHub. + Create an producer to send EventData object to an EventHub. :param partition_id: Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via @@ -252,7 +253,7 @@ def create_producer(self, partition_id=None, operation=None, send_timeout=None): :end-before: [END create_eventhub_client_sender] :language: python :dedent: 4 - :caption: Add a producer to the client to send EventData object to an EventHub. + :caption: Add a producer to the client to send EventData. """ target = "amqps://{}{}".format(self.address.hostname, self.address.path) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index afb58280442f..dc041269c5e5 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -22,8 +22,7 @@ TYPE_CHECKING = False if TYPE_CHECKING: from azure.core.credentials import TokenCredential - from typing import Union, List, Dict - + from typing import Union from azure.eventhub import __version__ from azure.eventhub.configuration import Configuration @@ -32,6 +31,7 @@ log = logging.getLogger(__name__) MAX_USER_AGENT_LENGTH = 512 + def _parse_conn_str(conn_str): endpoint = None shared_access_key_name = None @@ -100,7 +100,7 @@ def __init__(self, host, event_hub_path, credential, **kwargs): :param host: The hostname of the the Event Hub. :type host: str - :param event_hub_path: The path/name of the Event Hub + :param event_hub_path: The path of the specific Event Hub to connect the client to. :type event_hub_path: str :param network_tracing: Whether to output network trace logs to the logger. Default is `False`. @@ -121,16 +121,15 @@ def __init__(self, host, event_hub_path, credential, **kwargs): :param max_retries: The max number of attempts to redo the failed operation when an error happened. Default value is 3. :type max_retries: int - :param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp. - ~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the - transport type is explicitly requested. + :param transport_type: The type of transport protocol that will be used for communicating with + the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp. :type transport_type: ~azure.eventhub.TransportType :param prefetch: The message prefetch count of the consumer. Default is 300. :type prefetch: int :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but will return as soon as service returns no new events. Default value is the same as prefetch. :type max_batch_size: int - :param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub. + :param receive_timeout: The timeout in seconds to receive a batch of events from an Event Hub. Default value is 0 seconds. :type receive_timeout: float :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is @@ -161,115 +160,8 @@ def __init__(self, host, event_hub_path, credential, **kwargs): log.info("%r: Created the Event Hub client", self.container_id) - @classmethod - def from_connection_string(cls, conn_str, event_hub_path=None, **kwargs): - """Create an EventHubClient from a connection string. - - :param conn_str: The connection string. - :type conn_str: str - :param event_hub_path: The path/name of the Event Hub, if the EntityName is - not included in the connection string. - :type event_hub_path: str - :param network_tracing: Whether to output network trace logs to the logger. Default - is `False`. - :type network_tracing: bool - :param http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). - Additionally the following keys may also be present: 'username', 'password'. - :type http_proxy: dict[str, Any] - :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. - The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. - :type auth_timeout: float - :param user_agent: The user agent that needs to be appended to the built in user agent string. - :type user_agent: str - :param max_retries: The max number of attempts to redo the failed operation when an error happened. Default - value is 3. - :type max_retries: int - :param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp. - ~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the - transport type is explicitly requested. - :type transport_type: ~azure.eventhub.TransportType - :param prefetch: The message prefetch count of the consumer. Default is 300. - :type prefetch: int - :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but - will return as soon as service returns no new events. Default value is the same as prefetch. - :type max_batch_size: int - :param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub. - Default value is 0 seconds. - :type receive_timeout: float - :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is - queued. Default value is 60 seconds. If set to 0, there will be no timeout. - :type send_timeout: float - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START create_eventhub_client_connstr] - :end-before: [END create_eventhub_client_connstr] - :language: python - :dedent: 4 - :caption: Create an EventHubClient from a connection string. - - """ - is_iot_conn_str = conn_str.lstrip().lower().startswith("hostname") - if not is_iot_conn_str: - address, policy, key, entity = _parse_conn_str(conn_str) - entity = event_hub_path or entity - left_slash_pos = address.find("//") - if left_slash_pos != -1: - host = address[left_slash_pos + 2:] - else: - host = address - return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs) - else: - return cls._from_iothub_connection_string(conn_str, **kwargs) - @classmethod def _from_iothub_connection_string(cls, conn_str, **kwargs): - """ - Create an EventHubClient from an IoTHub connection string. - - :param conn_str: The connection string. - :type conn_str: str - :param network_tracing: Whether to output network trace logs to the logger. Default - is `False`. - :type network_tracing: bool - :param http_proxy: HTTP proxy settings. This must be a dictionary with the following - keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). - Additionally the following keys may also be present: 'username', 'password'. - :type http_proxy: dict[str, Any] - :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. - The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. - :type auth_timeout: float - :param user_agent: The user agent that needs to be appended to the built in user agent string. - :type user_agent: str - :param max_retries: The max number of attempts to redo the failed operation when an error happened. Default - value is 3. - :type max_retries: int - :param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp. - ~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the - transport type is explicitly requested. - :type transport_type: ~azure.eventhub.TransportType - :param prefetch: The message prefetch count of the consumer. Default is 300. - :type prefetch: int - :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but - will return as soon as service returns no new events. Default value is the same as prefetch. - :type max_batch_size: int - :param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub. - Default value is 0 seconds. - :type receive_timeout: float - :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is - queued. Default value is 60 seconds. If set to 0, there will be no timeout. - :type send_timeout: float - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START create_eventhub_client_iot_connstr] - :end-before: [END create_eventhub_client_iot_connstr] - :language: python - :dedent: 4 - :caption: Create an EventHubClient from an IoTHub connection string. - - """ address, policy, key, _ = _parse_conn_str(conn_str) hub_name = address.split('.')[0] username = "{}@sas.root.{}".format(policy, hub_name) @@ -326,6 +218,67 @@ def _process_redirect_uri(self, redirect): self.eh_name = self.address.path.lstrip('/') self.mgmt_target = redirect_uri + @classmethod + def from_connection_string(cls, conn_str, event_hub_path=None, **kwargs): + """Create an EventHubClient from an EventHub/IotHub connection string. + + :param conn_str: The connection string. + :type conn_str: str + :param event_hub_path: The path of the specific Event Hub to connect the client to, if the EntityName is + not included in the connection string. + :type event_hub_path: str + :param network_tracing: Whether to output network trace logs to the logger. Default + is `False`. + :type network_tracing: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :type http_proxy: dict[str, Any] + :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :type auth_timeout: float + :param user_agent: The user agent that needs to be appended to the built in user agent string. + :type user_agent: str + :param max_retries: The max number of attempts to redo the failed operation when an error happened. Default + value is 3. + :type max_retries: int + :param transport_type: The type of transport protocol that will be used for communicating with + the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp. + :type transport_type: ~azure.eventhub.TransportType + :param prefetch: The message prefetch count of the consumer. Default is 300. + :type prefetch: int + :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but + will return as soon as service returns no new events. Default value is the same as prefetch. + :type max_batch_size: int + :param receive_timeout: The timeout in seconds to receive a batch of events from an Event Hub. + Default value is 0 seconds. + :type receive_timeout: float + :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is + queued. Default value is 60 seconds. If set to 0, there will be no timeout. + :type send_timeout: float + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START create_eventhub_client_connstr] + :end-before: [END create_eventhub_client_connstr] + :language: python + :dedent: 4 + :caption: Create an EventHubClient from a connection string. + + """ + is_iot_conn_str = conn_str.lstrip().lower().startswith("hostname") + if not is_iot_conn_str: + address, policy, key, entity = _parse_conn_str(conn_str) + entity = event_hub_path or entity + left_slash_pos = address.find("//") + if left_slash_pos != -1: + host = address[left_slash_pos + 2:] + else: + host = address + return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs) + else: + return cls._from_iothub_connection_string(conn_str, **kwargs) + @abstractmethod def create_consumer( self, consumer_group, partition_id, event_position, owner_level=None, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index da01ec8603d1..5a6702a60324 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -4,16 +4,12 @@ # -------------------------------------------------------------------------------------------- from __future__ import unicode_literals -from enum import Enum import datetime import calendar import json - import six -import uamqp -from uamqp import BatchMessage -from uamqp import types, constants, errors +from uamqp import BatchMessage, Message, types from uamqp.message import MessageHeader, MessageProperties @@ -33,9 +29,6 @@ def parse_sas_token(sas_token): return sas_data -Message = uamqp.Message - - class EventData(object): """ The EventData class is a holder of event content. @@ -109,7 +102,20 @@ def __str__(self): dic['partition_key'] = str(self.partition_key) return str(dic) + def _set_partition_key(self, value): + """ + Set the partition key of the event data object. + :param value: The partition key to set. + :type value: str or bytes + """ + annotations = dict(self._annotations) + annotations[self._partition_key] = value + header = MessageHeader() + header.durable = True + self.message.annotations = annotations + self.message.header = header + self._annotations = annotations @property def sequence_number(self): @@ -166,22 +172,6 @@ def partition_key(self): except KeyError: return self._annotations.get(EventData.PROP_PARTITION_KEY, None) - def _set_partition_key(self, value): - """ - Set the partition key of the event data object. - - :param value: The partition key to set. - :type value: str or bytes - """ - annotations = dict(self._annotations) - annotations[self._partition_key] = value - header = MessageHeader() - header.durable = True - self.message.annotations = annotations - self.message.header = header - self._annotations = annotations - - @property def application_properties(self): """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/configuration.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/configuration.py index ba8aee6249b0..b3747a0ef581 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/configuration.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/configuration.py @@ -2,7 +2,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- - from uamqp.constants import TransportType diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 661ce983efb9..bcc3c7e0b0c3 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -22,7 +22,16 @@ class EventHubConsumer(object): """ - Implements a EventHubConsumer. + A consumer responsible for reading EventData from a specific Event Hub + partition and as a member of a specific consumer group. + + A consumer may be exclusive, which asserts ownership over the partition for the consumer + group to ensure that only one consumer from that group is reading the from the partition. + These exclusive consumers are sometimes referred to as "Epoch Consumers." + + A consumer may also be non-exclusive, allowing multiple consumers from the same consumer + group to be actively reading events from the partition. These non-exclusive consumers are + sometimes referred to as "Non-Epoch Consumers." """ timeout = 0 @@ -31,7 +40,8 @@ class EventHubConsumer(object): def __init__(self, client, source, event_position=None, prefetch=300, owner_level=None, keep_alive=None, auto_reconnect=True): """ - Instantiate a consumer. + Instantiate a consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient @@ -147,11 +157,10 @@ def __next__(self): raise except KeyboardInterrupt: log.info("EventHubConsumer stops due to keyboard interrupt") - print("EventHubConsumer stopped") self.close() raise except Exception as e: - log.info("Unexpected error occurred (%r). Shutting down.", e) + log.error("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Receive failed: {}".format(e)) self.close(exception=error) raise error @@ -291,7 +300,7 @@ def _build_connection(self, is_reconnect=False): log.info("EventHubConsumer authentication timed out. Attempting reconnect.") return False except Exception as e: - log.info("Unexpected error occurred (%r). Shutting down.", e) + log.error("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("EventHubConsumer reconnect failed: {}".format(e)) self.close(exception=error) raise error @@ -299,42 +308,6 @@ def _build_connection(self, is_reconnect=False): def _reconnect(self): return self._build_connection(is_reconnect=True) - def close(self, exception=None): - # type:(Exception) -> None - """ - Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START eventhub_client_receiver_close] - :end-before: [END eventhub_client_receiver_close] - :language: python - :dedent: 4 - :caption: Close down the handler. - - """ - if self.messages_iter: - self.messages_iter.close() - self.messages_iter = None - self.running = False - if self.error: - return - if isinstance(exception, errors.LinkRedirect): - self.redirected = exception - elif isinstance(exception, EventHubError): - self.error = exception - elif exception: - self.error = EventHubError(str(exception)) - else: - self.error = EventHubError("This receive handler is now closed.") - self._handler.close() - @property def queue_size(self): # type:() -> int @@ -359,10 +332,8 @@ def receive(self, max_batch_size=None, timeout=None): retrieve before the time, the result will be empty. If no batch size is supplied, the prefetch size will be the maximum. :type max_batch_size: int - :param timeout: The timeout time in seconds to receive a batch of events - from an Event Hub. Results will be returned after timeout. If combined - with max_batch_size, it will return after either the count of received events - reaches the max_batch_size or the operation has timed out. + :param timeout: The maximum wait time to build up the requested message count for the batch. + If not specified, the default wait time specified when the consumer was created will be used. :type timeout: float :rtype: list[~azure.eventhub.common.EventData] @@ -442,13 +413,48 @@ def receive(self, max_batch_size=None, timeout=None): raise TimeoutError(str(shutdown), shutdown) except KeyboardInterrupt: log.info("EventHubConsumer stops due to keyboard interrupt") - print("EventHubConsumer stopped") self.close() raise except Exception as e: - log.info("Unexpected error occurred (%r). Shutting down.", e) + log.error("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Receive failed: {}".format(e)) self.close(exception=error) raise error + def close(self, exception=None): + # type:(Exception) -> None + """ + Close down the handler. If the handler has already closed, + this will be a no op. An optional exception can be passed in to + indicate that the handler was shutdown due to error. + + :param exception: An optional exception if the handler is closing + due to an error. + :type exception: Exception + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_receiver_close] + :end-before: [END eventhub_client_receiver_close] + :language: python + :dedent: 4 + :caption: Close down the handler. + + """ + if self.messages_iter: + self.messages_iter.close() + self.messages_iter = None + self.running = False + if self.error: + return + if isinstance(exception, errors.LinkRedirect): + self.redirected = exception + elif isinstance(exception, EventHubError): + self.error = exception + elif exception: + self.error = EventHubError(str(exception)) + else: + self.error = EventHubError("This receive handler is now closed.") + self._handler.close() + next = __next__ # for python2.7 diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py index a93d052af4fe..5921c1b6e7b1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py @@ -2,9 +2,10 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- - -from uamqp import types, constants, errors import six + +from uamqp import constants, errors + from azure.core.exceptions import AzureError _NO_RETRY_ERRORS = ( diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index b3b26293a44f..f06471cd078b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -22,13 +22,17 @@ class EventHubProducer(object): """ - Implements a EventHubProducer. + A producer responsible for transmitting EventData to a specific Event Hub, + grouped together in batches. Depending on the options specified at creation, the producer may + be created to allow event data to be automatically routed to an available partition or specific + to a partition. """ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True): """ - Instantiate an EventHubProducer. + Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` method + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient. @@ -190,39 +194,6 @@ def _build_connection(self, is_reconnect=False): def _reconnect(self): return self._build_connection(is_reconnect=True) - def close(self, exception=None): - # type:(Exception) -> None - """ - Close down the handler. If the handler has already closed, - this will be a no op. An optional exception can be passed in to - indicate that the handler was shutdown due to error. - - :param exception: An optional exception if the handler is closing - due to an error. - :type exception: Exception - - Example: - .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START eventhub_client_sender_close] - :end-before: [END eventhub_client_sender_close] - :language: python - :dedent: 4 - :caption: Close down the handler. - - """ - self.running = False - if self.error: - return - if isinstance(exception, errors.LinkRedirect): - self.redirected = exception - elif isinstance(exception, EventHubError): - self.error = exception - elif exception: - self.error = EventHubError(str(exception)) - else: - self.error = EventHubError("This send handler is now closed.") - self._handler.close() - def _send_event_data(self): self._open() max_retries = self.client.config.max_retries @@ -310,6 +281,23 @@ def _set_partition_key(event_datas, partition_key): ed._set_partition_key(partition_key) yield ed + def _on_outcome(self, outcome, condition): + """ + Called when the outcome is received for a delivery. + + :param outcome: The outcome of the message delivery - success or failure. + :type outcome: ~uamqp.constants.MessageSendResult + :param condition: Detail information of the outcome. + + """ + self._outcome = outcome + self._condition = condition + + @staticmethod + def _error(outcome, condition): + if outcome != constants.MessageSendResult.Ok: + raise condition + def send(self, event_data, partition_key=None): # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None """ @@ -348,19 +336,35 @@ def send(self, event_data, partition_key=None): self.unsent_events = [wrapper_event_data.message] self._send_event_data() - def _on_outcome(self, outcome, condition): + def close(self, exception=None): + # type:(Exception) -> None """ - Called when the outcome is received for a delivery. + Close down the handler. If the handler has already closed, + this will be a no op. An optional exception can be passed in to + indicate that the handler was shutdown due to error. - :param outcome: The outcome of the message delivery - success or failure. - :type outcome: ~uamqp.constants.MessageSendResult - :param condition: Detail information of the outcome. + :param exception: An optional exception if the handler is closing + due to an error. + :type exception: Exception - """ - self._outcome = outcome - self._condition = condition + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_sender_close] + :end-before: [END eventhub_client_sender_close] + :language: python + :dedent: 4 + :caption: Close down the handler. - @staticmethod - def _error(outcome, condition): - if outcome != constants.MessageSendResult.Ok: - raise condition + """ + self.running = False + if self.error: + return + if isinstance(exception, errors.LinkRedirect): + self.redirected = exception + elif isinstance(exception, EventHubError): + self.error = exception + elif exception: + self.error = EventHubError(str(exception)) + else: + self.error = EventHubError("This send handler is now closed.") + self._handler.close()