diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md index 4e731b864491..e82dd99b5dea 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md @@ -1,5 +1,11 @@ # Release History +## 2019-11-04 1.0.0b5 + +**New features** + +- Added method `list_checkpoints` which lists all the checkpoints under the given eventhub namespace, eventhub name and consumer group. + ## 1.0.0b4 (2019-10-09) This release has trivial internal changes only. No feature changes. diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md index 276259766384..bceae6dbd80d 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md @@ -3,7 +3,7 @@ Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs. This Checkpoint Store package works as a plug-in package to `EventProcessor`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information. 
-[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/) +[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/) ## Getting started @@ -47,8 +47,8 @@ sequence number and the timestamp of when it was enqueued. ## Examples - [Create an Azure Storage Blobs `ContainerClient`](#create-an-azure-storage-blobs-containerclient) -- [Create an Azure EventHubs `EventHubClient`](#create-an-eventhubclient) -- [Consume events using an `EventProessor` that uses a `BlobPartitionManager`](#consume-events-using-an-eventprocessor-that-uses-a-blobpartitionmanager-to-do-checkpointing) +- [Create an Azure EventHubs `EventHubConsumerClient`](#create-an-eventhubconsumerclient) +- [Consume events using a `BlobPartitionManager`](#consume-events-using-a-blobpartitionmanager-to-do-checkpoint) ### Create an Azure Storage Blobs `ContainerClient` The easiest way to create a `ContainerClient` is to use a connection string. 
@@ -58,20 +58,19 @@ container_client = ContainerClient.from_connection_string("my_storageacount_conn ``` For other ways of creating a `ContainerClient`, go to [Blob Storage library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob) for more details. -### Create an `EventHubClient` -The easiest way to create a `EventHubClient` is to use a connection string. +### Create an `EventHubConsumerClient` +The easiest way to create a `EventHubConsumerClient` is to use a connection string. ```python -from azure.eventhub.aio import EventHubClient -eventhub_client = EventHubClient.from_connection_string("my_eventhub_namespace_connection_string", event_hub_path="myeventhub") +from azure.eventhub.aio import EventHubConsumerClient +eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", event_hub_path="myeventhub") ``` -For other ways of creating a `EventHubClient`, refer to [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details. +For other ways of creating a `EventHubConsumerClient`, refer to [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details. 
-### Consume events using an `EventProcessor` that uses a `BlobPartitionManager` to do checkpointing +### Consume events using a `BlobPartitionManager` to do checkpoint ```python import asyncio -from azure.eventhub.aio import EventHubClient -from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor +from azure.eventhub.aio import EventHubConsumerClient from azure.storage.blob.aio import ContainerClient from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager @@ -79,22 +78,23 @@ eventhub_connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' storage_container_connection_str = '<< CONNECTION STRING OF THE STORAGE >>' storage_container_name = '<< STORAGE CONTAINER NAME>>' -class MyPartitionProcessor(PartitionProcessor): - async def process_events(self, events, partition_context): - if events: - # write your code here to process events - # save checkpoint to the data store - await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number) +async def do_operation(events): + # do some operations to the events. 
+ pass + +async def process_events(partition_context, events): + await do_operation(events) + partition_context.update_checkpoint(events[-1]) async def main(): - eventhub_client = EventHubClient.from_connection_string(eventhub_connection_str, receive_timeout=5, retry_total=3) storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, storage_container_name) - partition_manager = BlobPartitionManager(storage_container_client) # use the BlobPartitonManager to save - event_processor = EventProcessor(eventhub_client, "$default", MyPartitionProcessor, partition_manager) - async with storage_container_client: - asyncio.ensure_future(event_processor.start()) - await asyncio.sleep(60) # run for a while - await event_processor.stop() + partition_manager = BlobPartitionManager(storage_container_client) # use the BlobPartitonManager to save + client = EventHubConsumerClient.from_connection_string(eventhub_connection_str, partition_manager=partition_manager, receive_timeout=5, retry_total=3) + + try: + await client.receive(process_events, "$default") + except KeyboardInterrupt: + await client.close() if __name__ == '__main__': loop = asyncio.get_event_loop() @@ -110,20 +110,19 @@ Refer to [Logging](#logging) to enable loggers for related libraries. 
## Next steps ### Examples -- [./examples/eventprocessor/event_processor_blob_storage_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py) - event processor with blob partition manager example +- [./samples/event_processor_blob_storage_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py) - EventHubConsumerClient with blob partition manager example ### Documentation -Reference documentation is available at https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub.extensions.html. +Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html ### Logging - Enable `azure.eventhub.extensions.checkpointstoreblobaio` logger to collect traces from the library. -- Enable `azure.eventhub.aio.eventprocessor` logger to collect traces from package eventprocessor of the azure-eventhub library. - Enable `azure.eventhub` logger to collect traces from the main azure-eventhub library. - Enable `azure.storage.blob` logger to collect traces from azure storage blob library. - Enable `uamqp` logger to collect traces from the underlying uAMQP library. -- Enable AMQP frame level trace by setting `network_tracing=True` when creating the client. +- Enable AMQP frame level trace by setting `logging_enable=True` when creating the client. 
### Provide Feedback diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py index e69de29bb2d1..34913fb394d7 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py @@ -0,0 +1,4 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py index 32ea741b11d3..aa5333dbdbc1 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py @@ -3,9 +3,9 @@ # Licensed under the MIT License. See License.txt in the project root for license information. 
# -------------------------------------------------------------------------------------------- -__version__ = "1.0.0b4" +__version__ = "1.0.0b5" -from .blobstoragepm import BlobPartitionManager +from .blobstoragepmaio import BlobPartitionManager __all__ = [ "BlobPartitionManager", diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py deleted file mode 100644 index 244c1a8ffcd6..000000000000 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py +++ /dev/null @@ -1,132 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- -from typing import Iterable, Dict, Any -import logging -from collections import defaultdict -import asyncio -from azure.eventhub.aio.eventprocessor import PartitionManager, OwnershipLostError # type: ignore -from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore -from azure.storage.blob.aio import ContainerClient, BlobClient # type: ignore - -logger = logging.getLogger(__name__) -UPLOAD_DATA = "" - - -class BlobPartitionManager(PartitionManager): - """An PartitionManager that uses Azure Blob Storage to store the partition ownership and checkpoint data. - - This class implements methods list_ownership, claim_ownership, and update_checkpoint that are defined in class - azure.eventhub.eventprocessor.PartitionManager of package azure-eventhub. 
- - """ - def __init__(self, container_client: ContainerClient): - """Create a BlobPartitionManager - - :param container_client: The Azure Blob Storage Container client that is used to save checkpoint data to Azure - Blob Storage Container. - """ - self._container_client = container_client - self._cached_blob_clients = defaultdict() # type:Dict[str, BlobClient] - self._cached_ownership_dict = defaultdict(dict) # type: Dict[str, Dict[str, Any]] - # lock each partition for list_ownership, claim_ownership and update_checkpoint etag doesn't get out of sync - # when the three methods are running concurrently - self._cached_ownership_locks = defaultdict(asyncio.Lock) # type:Dict[str, asyncio.Lock] - - def _get_blob_client(self, blob_name): - result = self._cached_blob_clients.get(blob_name) - if not result: - result = self._container_client.get_blob_client(blob_name) - self._cached_blob_clients[blob_name] = result - return result - - async def _upload_blob(self, ownership, metadata): - etag = ownership.get("etag") - if etag: - etag_match = {"if_match": etag} - else: - etag_match = {"if_none_match": '*'} - partition_id = ownership["partition_id"] - uploaded_blob_properties = await self._get_blob_client(partition_id).upload_blob( - data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match - ) - ownership["etag"] = uploaded_blob_properties["etag"] - ownership["last_modified_time"] = uploaded_blob_properties["last_modified"].timestamp() - ownership.update(metadata) - - async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]: - try: - blobs = self._container_client.list_blobs(include=['metadata']) - except Exception as err: # pylint:disable=broad-except - logger.warning("An exception occurred during list_ownership for eventhub %r consumer group %r. 
" - "Exception is %r", eventhub_name, consumer_group_name, err) - raise - async for b in blobs: - async with self._cached_ownership_locks[b.name]: - if b.name not in self._cached_ownership_dict \ - or b.last_modified.timestamp() > self._cached_ownership_dict[b.name].get("last_modified_time"): - metadata = b.metadata - ownership = { - "eventhub_name": eventhub_name, - "consumer_group_name": consumer_group_name, - "partition_id": b.name, - "owner_id": metadata["owner_id"], - "etag": b.etag, - "last_modified_time": b.last_modified.timestamp() if b.last_modified else None - } - ownership.update(metadata) - self._cached_ownership_dict[b.name] = ownership - return self._cached_ownership_dict.values() - - async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: - result = [] - for ownership in ownership_list: - partition_id = ownership["partition_id"] - eventhub_name = ownership["eventhub_name"] - consumer_group_name = ownership["consumer_group_name"] - owner_id = ownership["owner_id"] - - async with self._cached_ownership_locks[partition_id]: - metadata = {"owner_id": ownership["owner_id"]} - if "offset" in ownership: - metadata["offset"] = ownership["offset"] - if "sequence_number" in ownership: - metadata["sequence_number"] = ownership["sequence_number"] - try: - await self._upload_blob(ownership, metadata) - self._cached_ownership_dict[partition_id] = ownership - result.append(ownership) - except (ResourceModifiedError, ResourceExistsError): - logger.info( - "EventProcessor instance %r of eventhub %r consumer group %r lost ownership to partition %r", - owner_id, eventhub_name, consumer_group_name, partition_id) - except Exception as err: # pylint:disable=broad-except - logger.warning("An exception occurred when EventProcessor instance %r claim_ownership for " - "eventhub %r consumer group %r partition %r. The ownership is now lost. 
Exception " - "is %r", owner_id, eventhub_name, consumer_group_name, partition_id, err) - return result - - async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, - offset, sequence_number) -> None: - metadata = { - "owner_id": owner_id, - "offset": offset, - "sequence_number": str(sequence_number) - } - cached_ownership = self._cached_ownership_dict[partition_id] - async with self._cached_ownership_locks[partition_id]: - try: - await self._upload_blob(cached_ownership, metadata) - except (ResourceModifiedError, ResourceExistsError): - logger.info( - "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to " - "partition %r because the ownership has been stolen", - owner_id, eventhub_name, consumer_group_name, partition_id) - raise OwnershipLostError() - except Exception as err: - logger.warning( - "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to " - "partition %r because of unexpected error. Exception is %r", - owner_id, eventhub_name, consumer_group_name, partition_id, err) - raise # EventProcessor will catch the exception and handle it diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepmaio.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepmaio.py new file mode 100644 index 000000000000..225603d91828 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepmaio.py @@ -0,0 +1,139 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. 
+# -------------------------------------------------------------------------------------------- +from typing import Iterable, Dict, Any +import logging +from collections import defaultdict +import asyncio +from azure.eventhub import OwnershipLostError # type: ignore #pylint:disable=no-name-in-module +from azure.eventhub.aio import PartitionManager # type: ignore +from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore +from azure.storage.blob.aio import ContainerClient, BlobClient # type: ignore + +logger = logging.getLogger(__name__) +UPLOAD_DATA = "" + + +class BlobPartitionManager(PartitionManager): + """A PartitionManager that uses Azure Blob Storage to store the partition ownership and checkpoint data. + + This class implements methods list_ownership, claim_ownership, update_checkpoint and list_checkpoints that are + defined in class azure.eventhub.aio.PartitionManager of package azure-eventhub. + + """ + def __init__(self, container_client: ContainerClient): + """Create a BlobPartitionManager + + :param container_client: The Azure Blob Storage Container client that is used to save checkpoint data to Azure + Blob Storage Container. 
+ """ + self._container_client = container_client + self._cached_blob_clients = defaultdict() # type:Dict[str, BlobClient] + + def _get_blob_client(self, blob_name): + result = self._cached_blob_clients.get(blob_name) + if not result: + result = self._container_client.get_blob_client(blob_name) + self._cached_blob_clients[blob_name] = result + return result + + async def _upload_ownership(self, ownership, metadata): + etag = ownership.get("etag") + if etag: + etag_match = {"if_match": etag} + else: + etag_match = {"if_none_match": '*'} + blob_name = "{}/{}/{}/ownership/{}".format(ownership["fully_qualified_namespace"], ownership["eventhub_name"], + ownership["consumer_group_name"], ownership["partition_id"]) + uploaded_blob_properties = await self._get_blob_client(blob_name).upload_blob( + data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match + ) + ownership["etag"] = uploaded_blob_properties["etag"] + ownership["last_modified_time"] = uploaded_blob_properties["last_modified"].timestamp() + + async def list_ownership(self, fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str) \ + -> Iterable[Dict[str, Any]]: + try: + blobs = self._container_client.list_blobs( + name_starts_with="{}/{}/{}/ownership".format( + fully_qualified_namespace, eventhub_name, consumer_group_name), + include=['metadata']) + result = [] + async for b in blobs: + ownership = { + "fully_qualified_namespace": fully_qualified_namespace, + "eventhub_name": eventhub_name, + "consumer_group_name": consumer_group_name, + "partition_id": b.name.split("/")[-1], + "owner_id": b.metadata["ownerId"], + "etag": b.etag, + "last_modified_time": b.last_modified.timestamp() if b.last_modified else None + } + result.append(ownership) + return result + except Exception as err: # pylint:disable=broad-except + logger.warning("An exception occurred during list_ownership for " + "namespace %r eventhub %r consumer group %r. 
" + "Exception is %r", fully_qualified_namespace, eventhub_name, consumer_group_name, err) + raise + + async def _claim_one_partition(self, ownership): + partition_id = ownership["partition_id"] + namespace = ownership["fully_qualified_namespace"] + eventhub_name = ownership["eventhub_name"] + consumer_group_name = ownership["consumer_group_name"] + owner_id = ownership["owner_id"] + metadata = {"ownerId": owner_id} + try: + await self._upload_ownership(ownership, metadata) + return ownership + except (ResourceModifiedError, ResourceExistsError): + logger.info( + "EventProcessor instance %r of namespace %r eventhub %r consumer group %r " + "lost ownership to partition %r", + owner_id, namespace, eventhub_name, consumer_group_name, partition_id) + raise OwnershipLostError() + except Exception as err: # pylint:disable=broad-except + logger.warning("An exception occurred when EventProcessor instance %r claim_ownership for " + "namespace %r eventhub %r consumer group %r partition %r. " + "The ownership is now lost. 
Exception " + "is %r", owner_id, namespace, eventhub_name, consumer_group_name, partition_id, err) + return ownership # Keep the ownership if an unexpected error happens + + async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: + gathered_results = await asyncio.gather(*[self._claim_one_partition(x) + for x in ownership_list], return_exceptions=True) + return [claimed_ownership for claimed_ownership in gathered_results + if not isinstance(claimed_ownership, Exception)] + + async def update_checkpoint(self, fully_qualified_namespace, eventhub_name, consumer_group_name, partition_id, + offset, sequence_number) -> None: + metadata = { + "Offset": offset, + "SequenceNumber": str(sequence_number), + } + blob_name = "{}/{}/{}/checkpoint/{}".format(fully_qualified_namespace, eventhub_name, + consumer_group_name, partition_id) + await self._get_blob_client(blob_name).upload_blob( + data=UPLOAD_DATA, overwrite=True, metadata=metadata + ) + + async def list_checkpoints(self, fully_qualified_namespace, eventhub_name, consumer_group_name): + blobs = self._container_client.list_blobs( + name_starts_with="{}/{}/{}/checkpoint".format( + fully_qualified_namespace, eventhub_name, consumer_group_name), + include=['metadata']) + result = [] + async for b in blobs: + metadata = b.metadata + checkpoint = { + "fully_qualified_namespace": fully_qualified_namespace, + "eventhub_name": eventhub_name, + "consumer_group_name": consumer_group_name, + "partition_id": b.name.split("/")[-1], + "offset": metadata["Offset"], + "sequence_number": metadata["SequenceNumber"] + } + result.append(checkpoint) + return result diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py index 9a3312220d84..c16166aaa394 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py @@ -35,7 +35,7 @@ 
exclude_packages = [ 'tests', - 'examples', + 'samples', # Exclude packages that will be covered by PEP420 or nspkg 'azure', 'azure.eventhub', @@ -66,7 +66,7 @@ packages=find_packages(exclude=exclude_packages), python_requires=">=3.5.3", install_requires=[ - 'azure-storage-blob<=12.1,>=12.0.0b2', + 'azure-storage-blob<13.0.0,>=12.0.0', 'azure-eventhub<6.0.0,>=5.0.0b5', 'aiohttp<4.0,>=3.0', ], diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager_aio.py similarity index 66% rename from sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py rename to sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager_aio.py index b37aed20805b..96a5352dfbad 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager_aio.py @@ -43,33 +43,37 @@ def remove_live_storage_blob_client(container_str): async def _claim_and_list_ownership(live_storage_blob_client): + fully_qualified_namespace = 'test_namespace' eventhub_name = 'eventhub' consumer_group_name = '$default' ownership_cnt = 8 async with live_storage_blob_client: partition_manager = BlobPartitionManager(container_client=live_storage_blob_client) - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, - consumer_group_name=consumer_group_name) + ownership_list = await partition_manager.list_ownership( + fully_qualified_namespace=fully_qualified_namespace, + eventhub_name=eventhub_name, + consumer_group_name=consumer_group_name) assert len(ownership_list) == 0 ownership_list = [] for i in range(ownership_cnt): ownership = {} + ownership['fully_qualified_namespace'] = fully_qualified_namespace ownership['eventhub_name'] 
= eventhub_name ownership['consumer_group_name'] = consumer_group_name ownership['owner_id'] = 'ownerid' ownership['partition_id'] = str(i) ownership['last_modified_time'] = time.time() - ownership["offset"] = "1" - ownership["sequence_number"] = "1" ownership_list.append(ownership) await partition_manager.claim_ownership(ownership_list) - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, - consumer_group_name=consumer_group_name) + ownership_list = await partition_manager.list_ownership( + fully_qualified_namespace=fully_qualified_namespace, + eventhub_name=eventhub_name, + consumer_group_name=consumer_group_name) assert len(ownership_list) == ownership_cnt @@ -86,46 +90,26 @@ def test_claim_and_list_ownership(): async def _update_checkpoint(live_storage_blob_client): + fully_qualified_namespace = 'test_namespace' eventhub_name = 'eventhub' consumer_group_name = '$default' - owner_id = 'owner' partition_cnt = 8 async with live_storage_blob_client: partition_manager = BlobPartitionManager(container_client=live_storage_blob_client) - - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, - consumer_group_name=consumer_group_name) - assert len(ownership_list) == 0 - - ownership_list = [] - - for i in range(partition_cnt): - ownership = {} - ownership['eventhub_name'] = eventhub_name - ownership['consumer_group_name'] = consumer_group_name - ownership['owner_id'] = owner_id - ownership['partition_id'] = str(i) - ownership['last_modified_time'] = time.time() - ownership['offset'] = '1' - ownership['sequence_number'] = '10' - ownership_list.append(ownership) - - await partition_manager.claim_ownership(ownership_list) - - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, - consumer_group_name=consumer_group_name) - assert len(ownership_list) == partition_cnt - for i in range(partition_cnt): - await partition_manager.update_checkpoint(eventhub_name, consumer_group_name, 
str(i), - owner_id, '2', '20') - - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, - consumer_group_name=consumer_group_name) - for ownership in ownership_list: - assert ownership['offset'] == '2' - assert ownership['sequence_number'] == '20' + await partition_manager.update_checkpoint( + fully_qualified_namespace, eventhub_name, consumer_group_name, str(i), + '2', 20) + + checkpoint_list = await partition_manager.list_checkpoints( + fully_qualified_namespace=fully_qualified_namespace, + eventhub_name=eventhub_name, + consumer_group_name=consumer_group_name) + assert len(checkpoint_list) == partition_cnt + for checkpoint in checkpoint_list: + assert checkpoint['offset'] == '2' + assert checkpoint['sequence_number'] == '20' @pytest.mark.liveTest diff --git a/sdk/eventhub/azure-eventhubs/MANIFEST.in b/sdk/eventhub/azure-eventhubs/MANIFEST.in index 9916e9199bcd..5433e4aaf689 100644 --- a/sdk/eventhub/azure-eventhubs/MANIFEST.in +++ b/sdk/eventhub/azure-eventhubs/MANIFEST.in @@ -1,4 +1,4 @@ include *.md include azure/__init__.py recursive-include tests *.py *.yaml -recursive-include examples *.py \ No newline at end of file +recursive-include samples *.py \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py index 7cf8b74d32fe..91a17df639cc 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py @@ -2,12 +2,16 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. 
# -------------------------------------------------------------------------------------------- -from .client_async import EventHubClient -from .consumer_async import EventHubConsumer -from .producer_async import EventHubProducer +from ._consumer_client_async import EventHubConsumerClient +from ._producer_client_async import EventHubProducerClient +from .eventprocessor.partition_manager import PartitionManager +from .eventprocessor.partition_context import PartitionContext +from .eventprocessor.event_processor import CloseReason __all__ = [ - "EventHubClient", - "EventHubConsumer", - "EventHubProducer" + "EventHubConsumerClient", + "EventHubProducerClient", + "PartitionManager", + "PartitionContext", + "CloseReason", ] diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py index 2b38f2fd220e..cff3fbd3d8fe 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py @@ -6,6 +6,7 @@ from asyncio import Lock from uamqp import TransportType, c_uamqp # type: ignore from uamqp.async_ops import ConnectionAsync # type: ignore +from .._connection_manager import _ConnectionMode class _SharedConnectionManager(object): # pylint:disable=too-many-instance-attributes @@ -75,4 +76,7 @@ async def reset_connection_if_broken(self): def get_connection_manager(**kwargs): + connection_mode = kwargs.get("connection_mode", _ConnectionMode.SeparateConnection) + if connection_mode == _ConnectionMode.ShareConnection: + return _SharedConnectionManager(**kwargs) return _SeparateConnectionManager(**kwargs) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py new file mode 100644 index 000000000000..570d4f052b45 --- /dev/null +++ 
b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py @@ -0,0 +1,202 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +import logging +from typing import Any, Union, TYPE_CHECKING, Callable, Dict, List, Tuple +from azure.eventhub import EventPosition, EventData, EventHubSharedKeyCredential, EventHubSASTokenCredential +from .eventprocessor.event_processor import EventProcessor, CloseReason +from .eventprocessor.partition_context import PartitionContext +from .client_async import EventHubClient +if TYPE_CHECKING: + from azure.core.credentials import TokenCredential # type: ignore + +log = logging.getLogger(__name__) + + +class EventHubConsumerClient(EventHubClient): + """ The EventHubConsumerClient class defines a high level interface for + receiving events from the Azure Event Hubs service. + + The main goal of `EventHubConsumerClient` is to receive events from all partitions of an EventHub with + load balancing and checkpointing. + + When multiple `EventHubConsumerClient` works with one process, multiple processes, or multiple computer machines + and if they use the same repository as the load balancing and checkpointing store, they will balance automatically. + To enable the load balancing and / or checkpointing, partition_manager must be set when creating the + `EventHubConsumerClient`. + + An `EventHubConsumerClient` can also receive from a specific partition when you call its method `receive()` + and specify the partition_id. + Load balancing won't work in single-partition mode. But users can still save checkpoint if the partition_manager + is set. + + :param str host: The hostname of the Event Hub. 
+ :param str event_hub_path: The path of the specific Event Hub to connect the client to. + :param credential: The credential object used for authentication which implements particular interface + of getting tokens. + :type credential: ~azure.eventhub.EventHubSharedKeyCredential,~azure.eventhub.EventHubSASTokenCredential, + Credential objects in azure-identity and objects that implement `get_token(self, *scopes)` method + :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. + :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :keyword str user_agent: The user agent that needs to be appended to the built in user agent string. + :keyword int retry_total: The total number of attempts to redo the failed operation when an error happened. Default + value is 3. + :keyword transport_type: The type of transport protocol that will be used for communicating with + the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp. + :paramtype transport_type: ~azure.eventhub.TransportType + :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :keyword partition_manager: stores the load balancing data and checkpoint data when receiving events + if partition_manager is specified. If it's None, this EventHubConsumerClient instance will receive + events without load balancing and checkpoint. + :paramtype partition_manager: Implementation classes of ~azure.eventhub.aio.PartitionManager + :keyword float load_balancing_interval: When load balancing kicks in, this is the interval in seconds + between two load balancing. Default is 10. + + .. admonition:: Example: + + .. 
literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py + :start-after: [START create_eventhub_consumer_client_async] + :end-before: [END create_eventhub_consumer_client_async] + :language: python + :dedent: 4 + :caption: Create a new instance of the EventHubConsumerClient. + """ + + def __init__(self, host, event_hub_path, credential, **kwargs) -> None: + # type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None + self._partition_manager = kwargs.pop("partition_manager", None) + self._load_balancing_interval = kwargs.pop("load_balancing_interval", 10) + super(EventHubConsumerClient, self).__init__( + host=host, event_hub_path=event_hub_path, credential=credential, + network_tracing=kwargs.get("logging_enable"), **kwargs) + self._event_processors = dict() # type: Dict[Tuple[str, str], EventProcessor] + self._closed = False + + async def receive( + self, on_events: Callable[[PartitionContext, List[EventData]], None], consumer_group: str, + *, + partition_id: str = None, + owner_level: int = None, + prefetch: int = 300, + track_last_enqueued_event_properties: bool = False, + initial_event_position: Union[EventPosition, Dict[str, EventPosition]] = None, + on_error: Callable[[PartitionContext, Exception], None] = None, + on_partition_initialize: Callable[[PartitionContext], None] = None, + on_partition_close: Callable[[PartitionContext, CloseReason], None] = None + ) -> None: + """Receive events from partition(s) optionally with load balancing and checkpointing. + + :param on_events: The callback function for handling received events. The callback takes two + parameters: `partition_context` which contains partition information and `events` which are the received events. + Please define the callback like `on_events(partition_context, events)`. + For detailed partition context information, please refer to ~azure.eventhub.PartitionContext. 
+ :type on_events: Callable[PartitionContext, List[EventData]] + :param consumer_group: Receive events from the event hub for this consumer group + :keyword partition_id: Receive from this partition only if it's not None. Receive from all partitions otherwise. + :keyword owner_level: The priority of the exclusive consumer. An exclusive + consumer will be created if owner_level is set. Higher owner_level has higher exclusive priority. + :keyword prefetch: The number of events to prefetch from the service + for processing. Default is 300. + :keyword track_last_enqueued_event_properties: Indicates whether or not the consumer should request information + on the last enqueued event on its associated partition, and track that information as events are received. + When information about the partition's last enqueued event is being tracked, each event received from the + Event Hubs service will carry metadata about the partition. This results in a small amount of additional + network bandwidth consumption that is generally a favorable trade-off when considered against periodically + making requests for partition properties using the Event Hub client. + It is set to `False` by default. + :keyword initial_event_position: Start receiving from this initial_event_position + if there isn't checkpoint data for a partition. Use the checkpoint data if it's available. This can be + a dict with partition id as the key and position as the value for individual partitions, or a single + EventPosition instance for all partitions. + :type initial_event_position: ~azure.eventhub.EventPosition, dict[str,~azure.eventhub.EventPosition] + :keyword on_error: The callback function which would be called when there is an error met during the receiving + time. The callback takes two parameters: `partition_context` which contains partition information + and `error` being the exception. Please define the callback like `on_error(partition_context, error)`. 
+ :paramtype on_error: Callable[[PartitionContext, Exception]] + :keyword on_partition_initialize: The callback function which will be called after a consumer for a certain + partition finishes initialization. The callback takes one parameter: `partition_context` which contains + the partition information. Please define the callback like `on_partition_initialize(partition_context)`. + :paramtype on_partition_initialize: Callable[[PartitionContext]] + :keyword on_partition_close: The callback function which will be called after a consumer for a certain + partition is closed. The callback takes two parameters: `partition_context` which contains partition + information and `reason` for the close. Please define the callback like `on_partition_close(partition_context, reason)`. + Please refer to `azure.eventhub.CloseReason` for different closing reasons. + :paramtype on_partition_close: Callable[[PartitionContext, CloseReason]] + :rtype: None + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py + :start-after: [START eventhub_consumer_client_receive_async] + :end-before: [END eventhub_consumer_client_receive_async] + :language: python + :dedent: 4 + :caption: Receive events from the EventHub. + """ + async with self._lock: + error = None + if (consumer_group, '-1') in self._event_processors: + error = ValueError("This consumer client is already receiving events from all partitions for" + " consumer group {}. ".format(consumer_group)) + elif partition_id is None and any(x[0] == consumer_group for x in self._event_processors): + error = ValueError("This consumer client is already receiving events for consumer group {}. " + .format(consumer_group)) + elif (consumer_group, partition_id) in self._event_processors: + error = ValueError("This consumer is already receiving events from partition {} for consumer group {}. 
" + .format(partition_id, consumer_group)) + if error: + log.warning(error) + raise error + + event_processor = EventProcessor( + self, consumer_group, on_events, + partition_id=partition_id, + partition_manager=self._partition_manager, + error_handler=on_error, + partition_initialize_handler=on_partition_initialize, + partition_close_handler=on_partition_close, + initial_event_position=initial_event_position or EventPosition("-1"), + polling_interval=self._load_balancing_interval, + owner_level=owner_level, + prefetch=prefetch, + track_last_enqueued_event_properties=track_last_enqueued_event_properties, + ) + if partition_id: + self._event_processors[(consumer_group, partition_id)] = event_processor + else: + self._event_processors[(consumer_group, "-1")] = event_processor + try: + await event_processor.start() + finally: + await event_processor.stop() + async with self._lock: + if partition_id and (consumer_group, partition_id) in self._event_processors: + del self._event_processors[(consumer_group, partition_id)] + elif partition_id is None and (consumer_group, '-1') in self._event_processors: + del self._event_processors[(consumer_group, "-1")] + + async def close(self): + # type: () -> None + """Stop retrieving events from event hubs and close the underlying AMQP connection and links. + + :rtype: None + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py + :start-after: [START eventhub_consumer_client_close_async] + :end-before: [END eventhub_consumer_client_close_async] + :language: python + :dedent: 4 + :caption: Close down the client. 
+ + """ + async with self._lock: + for _ in range(len(self._event_processors)): + _, ep = self._event_processors.popitem() + await ep.stop() + await super().close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 939c78a1a458..0fac427f7eae 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -7,7 +7,7 @@ import time from uamqp import errors, constants, compat # type: ignore -from azure.eventhub.error import EventHubError +from ..error import EventHubError from ..aio.error_async import _handle_exception log = logging.getLogger(__name__) @@ -98,15 +98,6 @@ async def close(self): """ Close down the handler. If the handler has already closed, this will be a no op. - - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START eventhub_client_async_receiver_close] - :end-before: [END eventhub_client_async_receiver_close] - :language: python - :dedent: 4 - :caption: Close down the handler. - """ if self._handler: await self._handler.close_async() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py new file mode 100644 index 000000000000..f77a6b8f010b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py @@ -0,0 +1,167 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. 
+# -------------------------------------------------------------------------------------------- +import asyncio +import logging + +from typing import Any, Union, TYPE_CHECKING, Iterable, List +from uamqp import constants # type: ignore +from azure.eventhub import EventData, EventHubSharedKeyCredential, EventHubSASTokenCredential, EventDataBatch +from .client_async import EventHubClient +from .producer_async import EventHubProducer + +if TYPE_CHECKING: + from azure.core.credentials import TokenCredential # type: ignore + +log = logging.getLogger(__name__) + + +class EventHubProducerClient(EventHubClient): + """ + The EventHubProducerClient class defines a high level interface for + sending events to the Azure Event Hubs service. + + :param str host: The hostname of the Event Hub. + :param str event_hub_path: The path of the specific Event Hub to connect the client to. + :param credential: The credential object used for authentication which implements particular interface + of getting tokens. It accepts ~azure.eventhub.EventHubSharedKeyCredential, + ~azure.eventhub.EventHubSASTokenCredential, credential objects generated by the azure-identity library and + objects that implement `get_token(self, *scopes)` method. + :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. + :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. + :keyword str user_agent: The user agent that needs to be appended to the built in user agent string. + :keyword int retry_total: The total number of attempts to redo the failed operation when an error happened. + Default value is 3. + :keyword transport_type: The type of transport protocol that will be used for + communicating with the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp. 
+ :paramtype transport_type: ~azure.eventhub.TransportType + :keyword dict http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys - 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present - 'username', 'password'. + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py + :start-after: [START create_eventhub_producer_client_async] + :end-before: [END create_eventhub_producer_client_async] + :language: python + :dedent: 4 + :caption: Create a new instance of the EventHubProducerClient. + """ + + def __init__(self, host, event_hub_path, credential, **kwargs) -> None: + # type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None + super(EventHubProducerClient, self).__init__( + host=host, event_hub_path=event_hub_path, credential=credential, + network_tracing=kwargs.get("logging_enable"), **kwargs) + self._producers = [] # type: List[EventHubProducer] + self._client_lock = asyncio.Lock() # sync the creation of self._producers + self._producers_locks = [] # type: List[asyncio.Lock] + self._max_message_size_on_link = 0 + + async def _init_locks_for_producers(self): + if not self._producers: + async with self._client_lock: + if not self._producers: + num_of_producers = len(await self.get_partition_ids()) + 1 + self._producers = [None] * num_of_producers + for _ in range(num_of_producers): + self._producers_locks.append(asyncio.Lock()) + # self._producers_locks = [asyncio.Lock()] * num_of_producers + + async def send(self, event_data: Union[EventData, EventDataBatch, Iterable[EventData]], + *, partition_key: Union[str, bytes] = None, partition_id: str = None, timeout: float = None) -> None: + """Sends event data and blocks until acknowledgement is received or operation times out. + + :param event_data: The event to be sent. 
It can be an EventData object, or iterable of EventData objects. + :type event_data: ~azure.eventhub.EventData, ~azure.eventhub.EventDataBatch, EventData Iterator/Generator/list + :keyword str partition_key: With the given partition_key, event data will land to + a particular partition of the Event Hub decided by the service. + :keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service + will assign to all partitions using round-robin. + :keyword float timeout: The maximum wait time to send the event data. + If not specified, the default wait time specified when the producer was created will be used. + :rtype: None + :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, + ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py + :start-after: [START eventhub_producer_client_send_async] + :end-before: [END eventhub_producer_client_send_async] + :language: python + :dedent: 4 + :caption: Asynchronously sends event data + + """ + + await self._init_locks_for_producers() + + producer_index = int(partition_id) if partition_id is not None else -1 + if self._producers[producer_index] is None or self._producers[producer_index]._closed: # pylint:disable=protected-access + async with self._producers_locks[producer_index]: + if self._producers[producer_index] is None: + self._producers[producer_index] = self._create_producer(partition_id=partition_id) + async with self._producers_locks[producer_index]: + await self._producers[producer_index].send(event_data, partition_key=partition_key, timeout=timeout) + + async def create_batch(self, max_size=None): + # type:(int) -> EventDataBatch + """ + Create an EventDataBatch object with max size being max_size. 
+ The max_size should be no greater than the max allowed message size defined by the service side. + + :param int max_size: The maximum size of bytes data that an EventDataBatch object can hold. + :rtype: ~azure.eventhub.EventDataBatch + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py + :start-after: [START eventhub_producer_client_create_batch_async] + :end-before: [END eventhub_producer_client_create_batch_async] + :language: python + :dedent: 4 + :caption: Create EventDataBatch object within limited size + + """ + if not self._max_message_size_on_link: + await self._init_locks_for_producers() + async with self._producers_locks[-1]: + if self._producers[-1] is None: + self._producers[-1] = self._create_producer(partition_id=None) + await self._producers[-1]._open_with_retry() # pylint: disable=protected-access + async with self._client_lock: + self._max_message_size_on_link = \ + self._producers[-1]._handler.message_handler._link.peer_max_message_size or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access, line-too-long + + if max_size and max_size > self._max_message_size_on_link: + raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' + .format(max_size, self._max_message_size_on_link)) + + return EventDataBatch(max_size=(max_size or self._max_message_size_on_link)) + + async def close(self): + # type: () -> None + """ + Close down the handler. If the handler has already closed, + this will be a no op. + + :rtype: None + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py + :start-after: [START eventhub_producer_client_close_async] + :end-before: [END eventhub_producer_client_close_async] + :language: python + :dedent: 4 + :caption: Close down the handler. 
+ + """ + for p in self._producers: + if p: + await p.close() + await self._conn_manager.close_connection() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index e679afaa1cf1..b31b529ad198 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -13,7 +13,7 @@ from uamqp import authentication, constants # type: ignore from uamqp import Message, AMQPClientAsync # type: ignore -from azure.eventhub.common import parse_sas_token, EventPosition, \ +from ..common import parse_sas_token, EventPosition, \ EventHubSharedKeyCredential, EventHubSASTokenCredential from ..client_abstract import EventHubClientAbstract @@ -33,14 +33,6 @@ class EventHubClient(EventHubClientAbstract): The EventHubClient class defines a high level interface for asynchronously sending events to and receiving events from the Azure Event Hubs service. - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START create_eventhub_client_async] - :end-before: [END create_eventhub_client_async] - :language: python - :dedent: 4 - :caption: Create a new instance of the Event Hub client async. - """ def __init__(self, host, event_hub_path, credential, **kwargs): @@ -202,7 +194,7 @@ async def get_partition_properties(self, partition): output['is_empty'] = partition_info[b'is_partition_empty'] return output - def create_consumer( + def _create_consumer( self, consumer_group: str, partition_id: str, @@ -233,15 +225,6 @@ def create_consumer( :type track_last_enqueued_event_properties: bool :param loop: An event loop. If not specified the default event loop will be used. :rtype: ~azure.eventhub.aio.consumer_async.EventHubConsumer - - Example: - .. 
literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START create_eventhub_client_async_receiver] - :end-before: [END create_eventhub_client_async_receiver] - :language: python - :dedent: 4 - :caption: Add an async consumer to the client for a particular consumer group and partition. - """ owner_level = kwargs.get("owner_level") prefetch = kwargs.get("prefetch") or self._config.prefetch @@ -256,7 +239,7 @@ def create_consumer( track_last_enqueued_event_properties=track_last_enqueued_event_properties, loop=loop) return handler - def create_producer( + def _create_producer( self, *, partition_id: str = None, send_timeout: float = None, @@ -274,15 +257,6 @@ def create_producer( :type send_timeout: float :param loop: An event loop. If not specified the default event loop will be used. :rtype: ~azure.eventhub.aio.producer_async.EventHubProducer - - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START create_eventhub_client_async_sender] - :end-before: [END create_eventhub_client_async_sender] - :language: python - :dedent: 4 - :caption: Add an async producer to the client to send EventData. 
- """ target = "amqps://{}{}".format(self._address.hostname, self._address.path) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 7aff6980b9e0..02573b00b262 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -13,8 +13,8 @@ from uamqp import errors, types, utils # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore -from azure.eventhub import EventData, EventPosition -from azure.eventhub.error import _error_handler +from ..common import EventData, EventPosition +from ..error import _error_handler from ._consumer_producer_mixin_async import ConsumerProducerMixin log = logging.getLogger(__name__) @@ -240,15 +240,6 @@ async def receive(self, *, max_batch_size=None, timeout=None): :rtype: list[~azure.eventhub.common.EventData] :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventHubError - - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START eventhub_client_async_receive] - :end-before: [END eventhub_client_async_receive] - :language: python - :dedent: 4 - :caption: Receives events asynchronously - """ self._check_closed() @@ -262,14 +253,5 @@ async def close(self): """ Close down the handler. If the handler has already closed, this will be a no op. - - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START eventhub_client_async_receiver_close] - :end-before: [END eventhub_client_async_receiver_close] - :language: python - :dedent: 4 - :caption: Close down the handler. 
- """ await super(EventHubConsumer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/__init__.py index e3eefa4774f4..ad42bf279a86 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/__init__.py @@ -2,19 +2,3 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # ----------------------------------------------------------------------------------- - -from .event_processor import EventProcessor -from .partition_processor import PartitionProcessor, CloseReason -from .partition_manager import PartitionManager, OwnershipLostError -from .partition_context import PartitionContext -from .sample_partition_manager import SamplePartitionManager - -__all__ = [ - 'CloseReason', - 'EventProcessor', - 'PartitionProcessor', - 'PartitionManager', - 'OwnershipLostError', - 'PartitionContext', - 'SamplePartitionManager', -] diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/_ownership_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/_ownership_manager.py index 094ca8e0ce39..ddd501c6040f 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/_ownership_manager.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/_ownership_manager.py @@ -5,10 +5,8 @@ import time import random -import math from typing import List from collections import Counter, defaultdict -from azure.eventhub.aio import EventHubClient from .partition_manager import PartitionManager @@ -23,74 +21,79 @@ class OwnershipManager(object): """ def __init__( - self, eventhub_client: EventHubClient, consumer_group_name: str, owner_id: str, - partition_manager: PartitionManager, ownership_timeout: float + self, 
eventhub_client, consumer_group_name: str, owner_id: str, + partition_manager: PartitionManager, ownership_timeout: float, + partition_id: str ): self.cached_parition_ids = [] # type: List[str] self.eventhub_client = eventhub_client + self.fully_qualified_namespace = eventhub_client._address.hostname # pylint: disable=protected-access self.eventhub_name = eventhub_client.eh_name self.consumer_group_name = consumer_group_name self.owner_id = owner_id self.partition_manager = partition_manager self.ownership_timeout = ownership_timeout + self.partition_id = partition_id + self._initializing = True async def claim_ownership(self): """Claims ownership for this EventProcessor - 1. Retrieves all partition ids of an event hub from azure event hub service - 2. Retrieves current ownership list via this EventProcessor's PartitionManager. - 3. Balances number of ownership. Refer to _balance_ownership() for details. - 4. Claims the ownership for the balanced number of partitions. - - :return: List[Dict[Any]] """ if not self.cached_parition_ids: await self._retrieve_partition_ids() - to_claim = await self._balance_ownership(self.cached_parition_ids) - claimed_list = await self.partition_manager.claim_ownership(to_claim) if to_claim else None - return claimed_list + + if self.partition_id is not None: + if self.partition_id in self.cached_parition_ids: + return [self.partition_id] + raise ValueError( + "Wrong partition id:{}. The eventhub has partitions: {}.". 
+ format(self.partition_id, self.cached_parition_ids)) + + if self.partition_manager is None: + return self.cached_parition_ids + + ownership_list = await self.partition_manager.list_ownership( + self.fully_qualified_namespace, self.eventhub_name, self.consumer_group_name + ) + to_claim = await self._balance_ownership(ownership_list, self.cached_parition_ids) + claimed_list = await self.partition_manager.claim_ownership(to_claim) if to_claim else [] + return [x["partition_id"] for x in claimed_list] async def _retrieve_partition_ids(self): """List all partition ids of the event hub that the EventProcessor is working on. - - :return: List[str] """ self.cached_parition_ids = await self.eventhub_client.get_partition_ids() - async def _balance_ownership(self, all_partition_ids): + async def _balance_ownership(self, ownership_list, all_partition_ids): """Balances and claims ownership of partitions for this EventProcessor. - The balancing algorithm is: - 1. Find partitions with inactive ownership and partitions that haven never been claimed before - 2. Find the number of active owners, including this EventProcessor, for all partitions. - 3. Calculate the average count of partitions that an owner should own. - (number of partitions // number of active owners) - 4. Calculate the largest allowed count of partitions that an owner can own. - math.ceil(number of partitions / number of active owners). - This should be equal or 1 greater than the average count - 5. Adjust the number of partitions owned by this EventProcessor (owner) - a. if this EventProcessor owns more than largest allowed count, abandon one partition - b. if this EventProcessor owns less than average count, add one from the inactive or unclaimed partitions, - or steal one from another owner that has the largest number of ownership among all owners (EventProcessors) - c. Otherwise, no change to the ownership - - The balancing algorithm adjust one partition at a time to gradually build the balanced ownership. 
- Ownership must be renewed to keep it active. So the returned result includes both existing ownership and - the newly adjusted ownership. - This method balances but doesn't claim ownership. The caller of this method tries to claim the result ownership - list. But it may not successfully claim all of them because of concurrency. Other EventProcessors may happen to - claim a partition at that time. Since balancing and claiming are run in infinite repeatedly, - it achieves balancing among all EventProcessors after some time of running. - - :return: List[Dict[str, Any]], A list of ownership. """ - ownership_list = await self.partition_manager.list_ownership( - self.eventhub_name, self.consumer_group_name - ) + now = time.time() ownership_dict = {x["partition_id"]: x for x in ownership_list} # put the list to dict for fast lookup not_owned_partition_ids = [pid for pid in all_partition_ids if pid not in ownership_dict] - timed_out_partition_ids = [ownership["partition_id"] for ownership in ownership_list - if ownership["last_modified_time"] + self.ownership_timeout < now] + timed_out_partitions = [x for x in ownership_list + if x["last_modified_time"] + self.ownership_timeout < now] + if self._initializing: # greedily claim all available partitions when an EventProcessor is started. 
+ to_claim = timed_out_partitions + for to_claim_item in to_claim: + to_claim_item["owner_id"] = self.owner_id + for pid in not_owned_partition_ids: + to_claim.append( + { + "fully_qualified_namespace": self.fully_qualified_namespace, + "partition_id": pid, + "eventhub_name": self.eventhub_name, + "consumer_group_name": self.consumer_group_name, + "owner_id": self.owner_id + } + ) + self._initializing = False + if to_claim: # if no expired or unclaimed partitions, go ahead with balancing + return to_claim + + timed_out_partition_ids = [ownership["partition_id"] for ownership in timed_out_partitions] claimable_partition_ids = not_owned_partition_ids + timed_out_partition_ids + active_ownership = [ownership for ownership in ownership_list if ownership["last_modified_time"] + self.ownership_timeout >= now] active_ownership_by_owner = defaultdict(list) @@ -105,21 +108,21 @@ async def _balance_ownership(self, all_partition_ids): owners_count = len(active_ownership_by_owner) + \ (0 if self.owner_id in active_ownership_by_owner else 1) expected_count_per_owner = all_partition_count // owners_count - most_count_allowed_per_owner = math.ceil(all_partition_count / owners_count) # end of calculating expected count per owner to_claim = active_ownership_self - if len(active_ownership_self) > most_count_allowed_per_owner: # needs to abandon a partition - to_claim.pop() # abandon one partition if owned too many - elif len(active_ownership_self) < expected_count_per_owner: + if len(active_ownership_self) < expected_count_per_owner: # Either claims an inactive partition, or steals from other owners if claimable_partition_ids: # claim an inactive partition if there is random_partition_id = random.choice(claimable_partition_ids) - random_chosen_to_claim = ownership_dict.get(random_partition_id, - {"partition_id": random_partition_id, - "eventhub_name": self.eventhub_name, - "consumer_group_name": self.consumer_group_name - }) + random_chosen_to_claim = ownership_dict.get( + 
random_partition_id, + {"fully_qualified_namespace": self.fully_qualified_namespace, + "partition_id": random_partition_id, + "eventhub_name": self.eventhub_name, + "consumer_group_name": self.consumer_group_name, + } + ) random_chosen_to_claim["owner_id"] = self.owner_id to_claim.append(random_chosen_to_claim) else: # steal from another owner that has the most count @@ -131,3 +134,10 @@ async def _balance_ownership(self, all_partition_ids): to_steal_partition["owner_id"] = self.owner_id to_claim.append(to_steal_partition) return to_claim + + async def get_checkpoints(self): + if self.partition_manager: + checkpoints = await self.partition_manager.list_checkpoints( + self.fully_qualified_namespace, self.eventhub_name, self.consumer_group_name) + return {x["partition_id"]: x for x in checkpoints} + return {} diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 14af12444c0b..7dc16c4519bb 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -4,7 +4,7 @@ # ----------------------------------------------------------------------------------- from contextlib import contextmanager -from typing import Dict, Type +from typing import Dict, Type, Callable, List, Any import uuid import asyncio import logging @@ -12,182 +12,77 @@ from azure.core.tracing import SpanKind # type: ignore from azure.core.settings import settings # type: ignore -from azure.eventhub import EventPosition, EventHubError -from azure.eventhub.aio import EventHubClient +from azure.eventhub import EventPosition, EventData +from ..._eventprocessor.common import CloseReason from .partition_context import PartitionContext -from .partition_manager import PartitionManager, OwnershipLostError +from .partition_manager import PartitionManager from 
._ownership_manager import OwnershipManager -from .partition_processor import CloseReason, PartitionProcessor from .utils import get_running_loop log = logging.getLogger(__name__) -OWNER_LEVEL = 0 - class EventProcessor(object): # pylint:disable=too-many-instance-attributes """ - An EventProcessor constantly receives events from multiple partitions of the Event Hub in the context of a given - consumer group. The received data will be sent to PartitionProcessor to be processed. - - It provides the user a convenient way to receive events from multiple partitions and save checkpoints. - If multiple EventProcessors are running for an event hub, they will automatically balance load. - - Example: - .. code-block:: python - - import asyncio - import logging - import os - from azure.eventhub.aio import EventHubClient - from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor - from azure.eventhub.aio.eventprocessor import SamplePartitionManager - - RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout - RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. - # Actual number of retries clould be less if RECEIVE_TIMEOUT is too small - CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] - - logging.basicConfig(level=logging.INFO) - - async def do_operation(event): - # do some sync or async operations. 
If the operation is i/o bound, async will have better performance - print(event) - - - class MyPartitionProcessor(PartitionProcessor): - async def process_events(self, events, partition_context): - if events: - await asyncio.gather(*[do_operation(event) for event in events]) - await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number) - - async def main(): - client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, - retry_total=RETRY_TOTAL) - partition_manager = SamplePartitionManager(db_filename=":memory:") # a filename to persist checkpoint - try: - event_processor = EventProcessor(client, "$default", MyPartitionProcessor, - partition_manager, polling_interval=10) - asyncio.create_task(event_processor.start()) - await asyncio.sleep(60) - await event_processor.stop() - finally: - await partition_manager.close() - - if __name__ == '__main__': - asyncio.get_event_loop().run_until_complete(main()) + An EventProcessor constantly receives events from one or multiple partitions of the Event Hub + in the context of a given consumer group. """ def __init__( - self, eventhub_client: EventHubClient, consumer_group_name: str, - partition_processor_type: Type[PartitionProcessor], - partition_manager: PartitionManager, *, - initial_event_position: EventPosition = EventPosition("-1"), polling_interval: float = 10.0 + self, eventhub_client, consumer_group_name: str, + event_handler: Callable[[PartitionContext, List[EventData]], None], + *, + partition_id: str = None, + partition_manager: PartitionManager = None, + initial_event_position=EventPosition("-1"), polling_interval: float = 10.0, + owner_level=None, prefetch=None, track_last_enqueued_event_properties=False, + error_handler, + partition_initialize_handler, + partition_close_handler ): - """ - Instantiate an EventProcessor. 
- - :param eventhub_client: An instance of ~azure.eventhub.aio.EventClient object - :type eventhub_client: ~azure.eventhub.aio.EventClient - :param consumer_group_name: The name of the consumer group this event processor is associated with. Events will - be read only in the context of this group. - :type consumer_group_name: str - :param partition_processor_type: A subclass type of ~azure.eventhub.eventprocessor.PartitionProcessor. - :type partition_processor_type: type - :param partition_manager: Interacts with the data storage that stores ownership and checkpoints data. - ~azure.eventhub.aio.eventprocessor.SamplePartitionManager demonstrates the basic usage of `PartitionManager` - which stores data in memory or a file. - Users can either use the provided `PartitionManager` plug-ins or develop their own `PartitionManager`. - :type partition_manager: Subclass of ~azure.eventhub.eventprocessor.PartitionManager. - :param initial_event_position: The event position to start a partition consumer. - if the partition has no checkpoint yet. This could be replaced by "reset" checkpoint in the near future. 
- :type initial_event_position: EventPosition - :param polling_interval: The interval between any two pollings of balancing and claiming - :type polling_interval: float - - """ - self._consumer_group_name = consumer_group_name self._eventhub_client = eventhub_client + self._namespace = eventhub_client._address.hostname # pylint: disable=protected-access self._eventhub_name = eventhub_client.eh_name - self._partition_processor_factory = partition_processor_type + self._partition_id = partition_id + self._event_handler = event_handler + self._error_handler = error_handler + self._partition_initialize_handler = partition_initialize_handler + self._partition_close_handler = partition_close_handler self._partition_manager = partition_manager self._initial_event_position = initial_event_position # will be replaced by reset event position in preview 4 self._polling_interval = polling_interval self._ownership_timeout = self._polling_interval * 2 self._tasks = {} # type: Dict[str, asyncio.Task] + self._partition_contexts = {} # type: Dict[str, PartitionContext] + self._owner_level = owner_level + self._prefetch = prefetch + self._track_last_enqueued_event_properties = track_last_enqueued_event_properties + self._last_enqueued_event_properties = {} # type: Dict[str, Dict[str, Any]] self._id = str(uuid.uuid4()) self._running = False def __repr__(self): return 'EventProcessor: id {}'.format(self._id) - async def start(self): - """Start the EventProcessor. - - The EventProcessor will try to claim and balance partition ownership with other `EventProcessor` - and asynchronously start receiving EventData from EventHub and processing events. 
- - :return: None - - """ - log.info("EventProcessor %r is being started", self._id) - ownership_manager = OwnershipManager(self._eventhub_client, self._consumer_group_name, self._id, - self._partition_manager, self._ownership_timeout) - if not self._running: - self._running = True - while self._running: - try: - claimed_ownership_list = await ownership_manager.claim_ownership() - except Exception as err: # pylint:disable=broad-except - log.warning("An exception (%r) occurred during balancing and claiming ownership for eventhub %r " - "consumer group %r. Retrying after %r seconds", - err, self._eventhub_name, self._consumer_group_name, self._polling_interval) - await asyncio.sleep(self._polling_interval) - continue - - if claimed_ownership_list: - claimed_partition_ids = [x["partition_id"] for x in claimed_ownership_list] - to_cancel_list = self._tasks.keys() - claimed_partition_ids - self._create_tasks_for_claimed_ownership(claimed_ownership_list) - else: - to_cancel_list = set(self._tasks.keys()) - log.info("EventProcessor %r hasn't claimed an ownership. It keeps claiming.", self._id) - if to_cancel_list: - self._cancel_tasks_for_partitions(to_cancel_list) - log.info("EventProcesor %r has cancelled partitions %r", self._id, to_cancel_list) - await asyncio.sleep(self._polling_interval) - - async def stop(self): - """Stop the EventProcessor. - - The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions - it is working on. - Other running EventProcessor will take over these released partitions. - - A stopped EventProcessor can be restarted by calling method `start` again. - - :return: None - - """ - self._running = False - for _ in range(len(self._tasks)): - _, task = self._tasks.popitem() - task.cancel() - log.info("EventProcessor %r has been cancelled", self._id) - await asyncio.sleep(2) # give some time to finish after cancelled. 
+ def _get_last_enqueued_event_properties(self, partition_id): + if partition_id in self._tasks and partition_id in self._last_enqueued_event_properties: + return self._last_enqueued_event_properties[partition_id] + raise ValueError("You're not receiving events from partition {}".format(partition_id)) def _cancel_tasks_for_partitions(self, to_cancel_partitions): for partition_id in to_cancel_partitions: - if partition_id in self._tasks: - task = self._tasks.pop(partition_id) + task = self._tasks.get(partition_id) + if task: task.cancel() + if to_cancel_partitions: + log.info("EventProcesor %r has cancelled partitions %r", self._id, to_cancel_partitions) - def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list): - for ownership in to_claim_ownership_list: - partition_id = ownership["partition_id"] + def _create_tasks_for_claimed_ownership(self, claimed_partitions, checkpoints=None): + for partition_id in claimed_partitions: if partition_id not in self._tasks or self._tasks[partition_id].done(): - self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership)) + checkpoint = checkpoints.get(partition_id) if checkpoints else None + self._tasks[partition_id] = get_running_loop().create_task(self._receive(partition_id, checkpoint)) @contextmanager def _context(self, events): @@ -205,96 +100,192 @@ def _context(self, events): with child: yield - async def _receive(self, ownership): # pylint: disable=too-many-statements - log.info("start ownership, %r", ownership) - partition_processor = self._partition_processor_factory() - partition_id = ownership["partition_id"] - eventhub_name = ownership["eventhub_name"] - consumer_group_name = ownership["consumer_group_name"] - owner_id = ownership["owner_id"] - partition_context = PartitionContext( - eventhub_name, - consumer_group_name, - partition_id, - owner_id, - self._partition_manager - ) - partition_consumer = self._eventhub_client.create_consumer( - consumer_group_name, - partition_id, - 
EventPosition(ownership.get("offset", self._initial_event_position.value)) + async def _process_error(self, partition_context, err): + log.warning( + "EventProcessor instance %r of eventhub %r partition %r consumer group %r" + " has met an error. The exception is %r.", + partition_context.owner_id, + partition_context.eventhub_name, + partition_context.partition_id, + partition_context.consumer_group_name, + err ) - - async def process_error(err): - log.warning( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" - " has met an error. The exception is %r.", - owner_id, eventhub_name, partition_id, consumer_group_name, err - ) + if self._error_handler: try: - await partition_processor.process_error(err, partition_context) + await self._error_handler(partition_context, err) except Exception as err_again: # pylint:disable=broad-except log.warning( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" - " has another error during running process_error(). The exception is %r.", - owner_id, eventhub_name, partition_id, consumer_group_name, err_again + "EventProcessor instance %r of eventhub %r partition %r consumer group %r. " + "An error occurred while running process_error(). The exception is %r.", + partition_context.owner_id, + partition_context.eventhub_name, + partition_context.partition_id, + partition_context.consumer_group_name, + err_again ) - async def close(reason): + async def _close_partition(self, partition_context, reason): + if self._partition_close_handler: log.info( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" + "EventProcessor instance %r of eventhub %r partition %r consumer group %r" " is being closed. 
Reason is: %r", - owner_id, eventhub_name, partition_id, consumer_group_name, reason + partition_context.owner_id, + partition_context.eventhub_name, + partition_context.partition_id, + partition_context.consumer_group_name, + reason ) try: - await partition_processor.close(reason, partition_context) + await self._partition_close_handler(partition_context, reason) except Exception as err: # pylint:disable=broad-except log.warning( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" - " has an error during running close(). The exception is %r.", - owner_id, eventhub_name, partition_id, consumer_group_name, err + "EventProcessor instance %r of eventhub %r partition %r consumer group %r. " + "An error occurred while running close(). The exception is %r.", + partition_context.owner_id, + partition_context.eventhub_name, + partition_context.partition_id, + partition_context.consumer_group_name, + err ) - try: - try: - await partition_processor.initialize(partition_context) - except Exception as err: # pylint:disable=broad-except - log.warning( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" - " has an error during running initialize(). 
The exception is %r.", - owner_id, eventhub_name, partition_id, consumer_group_name, err + async def _receive(self, partition_id, checkpoint=None): # pylint: disable=too-many-statements + try: # pylint:disable=too-many-nested-blocks + log.info("start ownership %r, checkpoint %r", partition_id, checkpoint) + namespace = self._namespace + eventhub_name = self._eventhub_name + consumer_group_name = self._consumer_group_name + owner_id = self._id + checkpoint_offset = checkpoint.get("offset") if checkpoint else None + if checkpoint_offset: + initial_event_position = EventPosition(checkpoint_offset) + elif isinstance(self._initial_event_position, EventPosition): + initial_event_position = self._initial_event_position + elif isinstance(self._initial_event_position, dict): + initial_event_position = self._initial_event_position.get(partition_id, EventPosition("-1")) + else: + initial_event_position = EventPosition(self._initial_event_position) + if partition_id in self._partition_contexts: + partition_context = self._partition_contexts[partition_id] + else: + partition_context = PartitionContext( + namespace, + eventhub_name, + consumer_group_name, + partition_id, + owner_id, + self._partition_manager ) - while True: - try: - events = await partition_consumer.receive() - with self._context(events): - await partition_processor.process_events(events, partition_context) + self._partition_contexts[partition_id] = partition_context + + partition_consumer = self._eventhub_client._create_consumer( # pylint: disable=protected-access + consumer_group_name, + partition_id, + initial_event_position, + owner_level=self._owner_level, + track_last_enqueued_event_properties=self._track_last_enqueued_event_properties, + prefetch=self._prefetch, + ) - except asyncio.CancelledError: - log.info( - "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" - " is cancelled", - owner_id, - eventhub_name, - partition_id, - consumer_group_name - ) - if 
self._running is False: - await close(CloseReason.SHUTDOWN) - else: - await close(CloseReason.OWNERSHIP_LOST) - raise - except EventHubError as eh_err: - await process_error(eh_err) - await close(CloseReason.EVENTHUB_EXCEPTION) - # An EventProcessor will pick up this partition again after the ownership is released - break - except OwnershipLostError: - await close(CloseReason.OWNERSHIP_LOST) - break - except Exception as other_error: # pylint:disable=broad-except - await process_error(other_error) - await close(CloseReason.PROCESS_EVENTS_ERROR) - break + try: + if self._partition_initialize_handler: + try: + await self._partition_initialize_handler(partition_context) + except Exception as err: # pylint:disable=broad-except + log.warning( + "EventProcessor instance %r of eventhub %r partition %r consumer group %r. " + " An error occurred while running initialize(). The exception is %r.", + owner_id, eventhub_name, partition_id, consumer_group_name, err + ) + while True: + try: + events = await partition_consumer.receive() + if events: + if self._track_last_enqueued_event_properties: + self._last_enqueued_event_properties[partition_id] = \ + partition_consumer.last_enqueued_event_properties + with self._context(events): + await self._event_handler(partition_context, events) + except asyncio.CancelledError: + log.info( + "EventProcessor instance %r of eventhub %r partition %r consumer group %r" + " is cancelled", + owner_id, + eventhub_name, + partition_id, + consumer_group_name + ) + raise + except Exception as error: # pylint:disable=broad-except + await self._process_error(partition_context, error) + break + # Go to finally to stop this partition processor. 
+ # Later an EventProcessor(this one or another one) will pick up this partition again + finally: + await partition_consumer.close() + if self._running is False: + await self._close_partition(partition_context, CloseReason.SHUTDOWN) + else: + await self._close_partition(partition_context, CloseReason.OWNERSHIP_LOST) finally: - await partition_consumer.close() + if partition_id in self._tasks: + del self._tasks[partition_id] + + async def start(self): + """Start the EventProcessor. + + The EventProcessor will try to claim and balance partition ownership with other `EventProcessor` + and asynchronously start receiving EventData from EventHub and processing events. + + :return: None + + """ + log.info("EventProcessor %r is being started", self._id) + ownership_manager = OwnershipManager(self._eventhub_client, self._consumer_group_name, self._id, + self._partition_manager, self._ownership_timeout, self._partition_id) + if not self._running: + self._running = True + while self._running: + try: + checkpoints = await ownership_manager.get_checkpoints() if self._partition_manager else None + claimed_partition_ids = await ownership_manager.claim_ownership() + if claimed_partition_ids: + to_cancel_list = self._tasks.keys() - claimed_partition_ids + self._create_tasks_for_claimed_ownership(claimed_partition_ids, checkpoints) + else: + log.info("EventProcessor %r hasn't claimed an ownership. It keeps claiming.", self._id) + to_cancel_list = set(self._tasks.keys()) + self._cancel_tasks_for_partitions(to_cancel_list) + except Exception as err: # pylint:disable=broad-except + ''' + ownership_manager.get_checkpoints() and ownership_manager.claim_ownership() may raise exceptions + when there are load balancing and/or checkpointing (partition_manager isn't None). + They're swallowed here to retry every self._polling_interval seconds. Meanwhile this event processor + won't lose the partitions it has claimed before. 
+ If it keeps failing, other EventProcessors will start to claim ownership of the partitions + that this EventProcessor is working on. So two or multiple EventProcessors may be working + on the same partition. + ''' # pylint:disable=pointless-string-statement + log.warning("An exception (%r) occurred during balancing and claiming ownership for " + "eventhub %r consumer group %r. Retrying after %r seconds", + err, self._eventhub_name, self._consumer_group_name, self._polling_interval) + await asyncio.sleep(self._polling_interval) + + async def stop(self): + """Stop the EventProcessor. + + The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions + it is working on. + Other running EventProcessor will take over these released partitions. + + A stopped EventProcessor can be restarted by calling method `start` again. + + :return: None + + """ + self._running = False + pids = list(self._tasks.keys()) + self._cancel_tasks_for_partitions(pids) + log.info("EventProcessor %r tasks have been cancelled.", self._id) + while self._tasks: + await asyncio.sleep(1) + log.info("EventProcessor %r has been stopped.", self._id) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/local_partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/local_partition_manager.py new file mode 100644 index 000000000000..77946a42d9b2 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/local_partition_manager.py @@ -0,0 +1,24 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. 
+# ----------------------------------------------------------------------------------- + +from .sqlite3_partition_manager import Sqlite3PartitionManager + + +class InMemoryPartitionManager(Sqlite3PartitionManager): + """A partition manager that stores checkpoint and load balancer partition ownership data in memory. + This is for mock test only. + + """ + def __init__(self): + super(InMemoryPartitionManager, self).__init__(db_filename=":memory:") + + +class FileBasedPartitionManager(Sqlite3PartitionManager): + """A partition manager that stores checkpoint and load balancer partition ownership data in a file. + This is for internal test only. + + """ + def __init__(self, filename): + super(FileBasedPartitionManager, self).__init__(db_filename=filename) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_context.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_context.py index 961540d0dea7..fac34bba567e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_context.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_context.py @@ -3,37 +3,42 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # ----------------------------------------------------------------------------------- - +import logging from .partition_manager import PartitionManager +_LOGGER = logging.getLogger(__name__) + class PartitionContext(object): """Contains partition related context information for a PartitionProcessor instance to use. Users can use update_checkpoint() of this class to save checkpoint data. 
""" - def __init__(self, eventhub_name: str, consumer_group_name: str, - partition_id: str, owner_id: str, partition_manager: PartitionManager): + def __init__(self, fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str, + partition_id: str, owner_id: str, partition_manager: PartitionManager = None): + self.fully_qualified_namespace = fully_qualified_namespace self.partition_id = partition_id self.eventhub_name = eventhub_name self.consumer_group_name = consumer_group_name self.owner_id = owner_id self._partition_manager = partition_manager - async def update_checkpoint(self, offset, sequence_number=None): + async def update_checkpoint(self, event): """ Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service. - :param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with. - :type offset: str - :param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be - associated with. - :type sequence_number: int - :return: None + :param ~azure.eventhub.EventData event: The EventData instance which contains the offset and + sequence number information used for checkpoint. + :rtype: None """ - # TODO: whether change this method to accept event_data as well - await self._partition_manager.update_checkpoint( - self.eventhub_name, self.consumer_group_name, self.partition_id, self.owner_id, offset, - sequence_number - ) + if self._partition_manager: + await self._partition_manager.update_checkpoint( + self.fully_qualified_namespace, self.eventhub_name, self.consumer_group_name, + self.partition_id, event.offset, event.sequence_number + ) + else: + _LOGGER.info( + "namespace %r, eventhub %r, consumer_group %r, partition_id %r " + "update_checkpoint is called without partition manager. 
No checkpoint is updated.", + self.fully_qualified_namespace, self.eventhub_name, self.consumer_group_name, self.partition_id) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py index c90dfac7235e..946d18176e35 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_manager.py @@ -14,25 +14,24 @@ class PartitionManager(ABC): """ @abstractmethod - async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]: + async def list_ownership(self, fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str) \ + -> Iterable[Dict[str, Any]]: """ Retrieves a complete ownership list from the chosen storage service. - :param eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to + :param str fully_qualified_namespace: The fully qualified namespace that the event hub belongs to. + The format is like ".servicebus.windows.net" + :param str eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. - :type eventhub_name: str - :param consumer_group_name: The name of the consumer group the ownership are associated with. - :type consumer_group_name: str - :return: Iterable of dictionaries containing the following partition ownership information: - eventhub_name - consumer_group_name - owner_id - partition_id - owner_level - offset - sequence_number - last_modified_time - etag + :param str consumer_group_name: The name of the consumer group the ownership are associated with. 
+ :rtype: Iterable[Dict[str, Any]], Iterable of dictionaries containing partition ownership information: + * fully_qualified_namespace + * eventhub_name + * consumer_group_name + * owner_id + * partition_id + * last_modified_time + * etag """ @abstractmethod @@ -40,47 +39,57 @@ async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Ite """ Tries to claim a list of specified ownership. - :param ownership_list: Iterable of dictionaries containing all the ownership to claim. - :type ownership_list: Iterable of dict - :return: Iterable of dictionaries containing the following partition ownership information: - eventhub_name - consumer_group_name - owner_id - partition_id - owner_level - offset - sequence_number - last_modified_time - etag + :param Iterable[Dict[str,Any]] ownership_list: Iterable of dictionaries containing all the ownership to claim. + :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information: + * fully_qualified_namespace + * eventhub_name + * consumer_group_name + * owner_id + * partition_id + * last_modified_time + * etag """ @abstractmethod - async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, - offset, sequence_number) -> None: + async def update_checkpoint(self, fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str, + partition_id: str, offset: str, sequence_number: int) -> None: """ Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service. - :param eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to + :param str fully_qualified_namespace: The fully qualified namespace that the event hub belongs to. + The format is like ".servicebus.windows.net" + :param str eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. 
- :type eventhub_name: str - :param consumer_group_name: The name of the consumer group the ownership are associated with. - :type consumer_group_name: str - :param partition_id: The partition id which the checkpoint is created for. - :type partition_id: str - :param owner_id: The identifier of the ~azure.eventhub.eventprocessor.EventProcessor. - :type owner_id: str - :param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with. - :type offset: str - :param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint + :param str consumer_group_name: The name of the consumer group the ownership are associated with. + :param str partition_id: The partition id which the checkpoint is created for. + :param str offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with. + :param int sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with. - :type sequence_number: int - :return: None + :rtype: None :raise: `OwnershipLostError` """ + @abstractmethod + async def list_checkpoints(self, fully_qualified_namespace: str, eventhub_name: str, consumer_group_name: str): + """List the updated checkpoints from the store + + :param str fully_qualified_namespace: The fully qualified namespace that the event hub belongs to. + The format is like ".servicebus.windows.net" + :param str eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to + the Event Hubs namespace that contains it. + :param str consumer_group_name: The name of the consumer group the ownership are associated with. 
+ :rtype: Iterable[Dict[str,Any]], Iterable of dictionaries containing partition ownership information: + * fully_qualified_namespace + * eventhub_name + * consumer_group_name + * partition_id + * sequence_number + * offset + """ + class OwnershipLostError(Exception): - """Raises when update_checkpoint detects the ownership to a partition has been lost + """Raises when an EventHubConsumerClient fails to renew the ownership of a partition """ diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sample_partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sample_partition_manager.py deleted file mode 100644 index cf92d92af89d..000000000000 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sample_partition_manager.py +++ /dev/null @@ -1,144 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ----------------------------------------------------------------------------------- - -import time -import uuid -import sqlite3 -import logging -from azure.eventhub.aio.eventprocessor import PartitionManager, OwnershipLostError - -logger = logging.getLogger(__name__) - - -def _check_table_name(table_name: str): - for c in table_name: - if not (c.isalnum() or c == "_"): - raise ValueError("Table name \"{}\" is not in correct format".format(table_name)) - return table_name - - -class SamplePartitionManager(PartitionManager): - """An implementation of PartitionManager by using the sqlite3 in Python standard library. - Sqlite3 is a mini sql database that runs in memory or files. - Please don't use this PartitionManager for production use. 
- - - """ - primary_keys_dict = {"eventhub_name": "text", "consumer_group_name": "text", "partition_id": "text"} - other_fields_dict = {"owner_id": "text", "owner_level": "integer", "sequence_number": "integer", "offset": "text", - "last_modified_time": "real", "etag": "text"} - checkpoint_fields = ["sequence_number", "offset"] - fields_dict = {**primary_keys_dict, **other_fields_dict} - primary_keys = list(primary_keys_dict.keys()) - other_fields = list(other_fields_dict.keys()) - fields = primary_keys + other_fields - - def __init__(self, db_filename: str = ":memory:", ownership_table: str = "ownership"): - """ - - :param db_filename: name of file that saves the sql data. - Sqlite3 will run in memory without a file when db_filename is ":memory:". - :param ownership_table: The table name of the sqlite3 database. - """ - super(SamplePartitionManager, self).__init__() - self.ownership_table = _check_table_name(ownership_table) - conn = sqlite3.connect(db_filename) - c = conn.cursor() - try: - sql = "create table if not exists " + _check_table_name(ownership_table)\ - + "("\ - + ",".join([x[0]+" "+x[1] for x in self.fields_dict.items()])\ - + ", constraint pk_ownership PRIMARY KEY ("\ - + ",".join(self.primary_keys)\ - + "))" - c.execute(sql) - finally: - c.close() - self.conn = conn - - async def list_ownership(self, eventhub_name, consumer_group_name): - cursor = self.conn.cursor() - try: - cursor.execute("select " + ",".join(self.fields) + - " from "+_check_table_name(self.ownership_table)+" where eventhub_name=? " - "and consumer_group_name=?", - (eventhub_name, consumer_group_name)) - return [dict(zip(self.fields, row)) for row in cursor.fetchall()] - finally: - cursor.close() - - async def claim_ownership(self, ownership_list): - result = [] - cursor = self.conn.cursor() - try: - for p in ownership_list: - cursor.execute("select etag from " + _check_table_name(self.ownership_table) + - " where "+ " and ".join([field+"=?" 
for field in self.primary_keys]), - tuple(p.get(field) for field in self.primary_keys)) - cursor_fetch = cursor.fetchall() - if not cursor_fetch: - p["last_modified_time"] = time.time() - p["etag"] = str(uuid.uuid4()) - try: - fields_without_checkpoint = list(filter(lambda x: x not in self.checkpoint_fields, self.fields)) - sql = "insert into " + _check_table_name(self.ownership_table) + " (" \ - + ",".join(fields_without_checkpoint) \ - + ") values (?,?,?,?,?,?,?)" - cursor.execute(sql, tuple(p.get(field) for field in fields_without_checkpoint)) - except sqlite3.OperationalError as op_err: - logger.info("EventProcessor %r failed to claim partition %r " - "because it was claimed by another EventProcessor at the same time. " - "The Sqlite3 exception is %r", p["owner_id"], p["partition_id"], op_err) - continue - else: - result.append(p) - else: - if p.get("etag") == cursor_fetch[0][0]: - p["last_modified_time"] = time.time() - p["etag"] = str(uuid.uuid4()) - other_fields_without_checkpoint = list( - filter(lambda x: x not in self.checkpoint_fields, self.other_fields) - ) - sql = "update " + _check_table_name(self.ownership_table) + " set "\ - + ','.join([field+"=?" for field in other_fields_without_checkpoint])\ - + " where "\ - + " and ".join([field+"=?" for field in self.primary_keys]) - - cursor.execute(sql, tuple(p.get(field) for field in other_fields_without_checkpoint) - + tuple(p.get(field) for field in self.primary_keys)) - result.append(p) - else: - logger.info("EventProcessor %r failed to claim partition %r " - "because it was claimed by another EventProcessor at the same time", p["owner_id"], - p["partition_id"]) - self.conn.commit() - return result - finally: - cursor.close() - - async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, - offset, sequence_number): - cursor = self.conn.cursor() - try: - cursor.execute("select owner_id from " + _check_table_name(self.ownership_table) - + " where eventhub_name=? 
and consumer_group_name=? and partition_id=?", - (eventhub_name, consumer_group_name, partition_id)) - cursor_fetch = cursor.fetchall() - if cursor_fetch and owner_id == cursor_fetch[0][0]: - cursor.execute("update " + _check_table_name(self.ownership_table) - + " set offset=?, sequence_number=? " - "where eventhub_name=? and consumer_group_name=? and partition_id=?", - (offset, sequence_number, eventhub_name, consumer_group_name, partition_id)) - self.conn.commit() - else: - logger.info("EventProcessor couldn't checkpoint to partition %r because it no longer has the ownership", - partition_id) - raise OwnershipLostError() - - finally: - cursor.close() - - async def close(self): - self.conn.close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sqlite3_partition_manager.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sqlite3_partition_manager.py new file mode 100644 index 000000000000..577ed917f56e --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/sqlite3_partition_manager.py @@ -0,0 +1,160 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# ----------------------------------------------------------------------------------- + +import time +import uuid +import sqlite3 +import logging +from .partition_manager import PartitionManager + +logger = logging.getLogger(__name__) + + +def _check_table_name(table_name: str): + for c in table_name: + if not (c.isalnum() or c == "_"): + raise ValueError("Table name \"{}\" is not in correct format".format(table_name)) + return table_name + + +class Sqlite3PartitionManager(PartitionManager): + """An implementation of PartitionManager by using the sqlite3 in Python standard library. + Sqlite3 is a mini sql database that runs in memory or files. 
+ This is for test only. + + :param db_filename: name of file that saves the sql data. Sqlite3 will run in memory without + a file when db_filename is ":memory:". + + """ + primary_keys_dict = {"fully_qualified_namespace": "text", "eventhub_name": "text", + "consumer_group_name": "text", "partition_id": "text"} + primary_keys = list(primary_keys_dict.keys()) + + ownership_data_fields_dict = {"owner_id": "text", "last_modified_time": "real", "etag": "text"} + ownership_fields_dict = {**primary_keys_dict, **ownership_data_fields_dict} + ownership_data_fields = list(ownership_data_fields_dict.keys()) + ownership_fields = primary_keys + ownership_data_fields + + checkpoint_data_fields_dict = {"sequence_number": "integer", "offset": "text"} + checkpoint_data_fields = list(checkpoint_data_fields_dict.keys()) + checkpoint_fields_dict = {**primary_keys_dict, **checkpoint_data_fields_dict} + checkpoint_fields = primary_keys + checkpoint_data_fields + + def __init__(self, db_filename: str = ":memory:", + ownership_table: str = "ownership", checkpoint_table: str = "checkpoint"): + super(Sqlite3PartitionManager, self).__init__() + self.ownership_table = _check_table_name(ownership_table) + self.checkpoint_table = _check_table_name(checkpoint_table) + conn = sqlite3.connect(db_filename) + c = conn.cursor() + try: + ownership_sql = "create table if not exists " + self.ownership_table\ + + "("\ + + ",".join([x[0]+" "+x[1] for x in self.ownership_fields_dict.items()])\ + + ", constraint pk_ownership PRIMARY KEY ("\ + + ",".join(self.primary_keys)\ + + "))" + c.execute(ownership_sql) + + checkpoint_sql = "create table if not exists " + self.checkpoint_table \ + + "(" \ + + ",".join([x[0] + " " + x[1] for x in self.checkpoint_fields_dict.items()]) \ + + ", constraint pk_ownership PRIMARY KEY (" \ + + ",".join(self.primary_keys) \ + + "))" + c.execute(checkpoint_sql) + finally: + c.close() + self.conn = conn + + async def list_ownership(self, fully_qualified_namespace, eventhub_name, 
consumer_group_name): + cursor = self.conn.cursor() + try: + cursor.execute("select " + ",".join(self.ownership_fields) + + " from "+_check_table_name(self.ownership_table) + + " where fully_qualified_namespace=? and eventhub_name=? and consumer_group_name=?", + (fully_qualified_namespace, eventhub_name, consumer_group_name)) + return [dict(zip(self.ownership_fields, row)) for row in cursor.fetchall()] + finally: + cursor.close() + + async def claim_ownership(self, ownership_list): + result = [] + cursor = self.conn.cursor() + try: + for p in ownership_list: + cursor.execute("select etag from " + _check_table_name(self.ownership_table) + + " where "+ " and ".join([field+"=?" for field in self.primary_keys]), + tuple(p.get(field) for field in self.primary_keys)) + cursor_fetch = cursor.fetchall() + if not cursor_fetch: + p["last_modified_time"] = time.time() + p["etag"] = str(uuid.uuid4()) + try: + sql = "insert into " + _check_table_name(self.ownership_table) + " (" \ + + ",".join(self.ownership_fields) \ + + ") values ("+",".join(["?"] * len(self.ownership_fields)) + ")" + cursor.execute(sql, tuple(p.get(field) for field in self.ownership_fields)) + except sqlite3.OperationalError as op_err: + logger.info("EventProcessor %r failed to claim partition %r " + "because it was claimed by another EventProcessor at the same time. " + "The Sqlite3 exception is %r", p["owner_id"], p["partition_id"], op_err) + continue + else: + result.append(p) + else: + if p.get("etag") == cursor_fetch[0][0]: + p["last_modified_time"] = time.time() + p["etag"] = str(uuid.uuid4()) + sql = "update " + _check_table_name(self.ownership_table) + " set "\ + + ','.join([field+"=?" for field in self.ownership_data_fields])\ + + " where "\ + + " and ".join([field+"=?" 
for field in self.primary_keys]) + + cursor.execute(sql, tuple(p.get(field) for field in self.ownership_data_fields) + + tuple(p.get(field) for field in self.primary_keys)) + result.append(p) + else: + logger.info("EventProcessor %r failed to claim partition %r " + "because it was claimed by another EventProcessor at the same time", p["owner_id"], + p["partition_id"]) + self.conn.commit() + return result + finally: + cursor.close() + + async def update_checkpoint( + self, fully_qualified_namespace, eventhub_name, consumer_group_name, partition_id, offset, sequence_number): + cursor = self.conn.cursor() + localvars = locals() + try: + cursor.execute("insert or replace into " + self.checkpoint_table + "(" + + ",".join([field for field in self.checkpoint_fields]) + + ") values (" + + ",".join(["?"] * len(self.checkpoint_fields)) + + ")", + tuple(localvars[field] for field in self.checkpoint_fields) + ) + self.conn.commit() + finally: + cursor.close() + + async def list_checkpoints(self, fully_qualified_namespace, eventhub_name, consumer_group_name): + cursor = self.conn.cursor() + try: + cursor.execute("select " + + ",".join(self.checkpoint_fields) + + " from " + + self.checkpoint_table + + " where fully_qualified_namespace=? and eventhub_name=? 
and consumer_group_name=?", + (fully_qualified_namespace, eventhub_name, consumer_group_name) + ) + return [dict(zip(self.checkpoint_fields, row)) for row in cursor.fetchall()] + + finally: + cursor.close() + + async def close(self): + self.conn.close() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 75b39217ddb4..e8981e3eca07 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -14,8 +14,8 @@ from azure.core.tracing import SpanKind, AbstractSpan # type: ignore from azure.core.settings import settings # type: ignore -from azure.eventhub.common import EventData, EventDataBatch -from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError +from ..common import EventData, EventDataBatch +from ..error import _error_handler, OperationTimeoutError, EventDataError from ..producer import _error, _set_partition_key, _set_trace_message from ._consumer_producer_mixin_async import ConsumerProducerMixin @@ -138,28 +138,16 @@ def _on_outcome(self, outcome, condition): self._outcome = outcome self._condition = condition - async def create_batch(self, max_size=None, partition_key=None): - # type:(int, str) -> EventDataBatch + async def create_batch(self, max_size=None): + # type:(int) -> EventDataBatch """ Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side. :param max_size: The maximum size of bytes data that an EventDataBatch object can hold. :type max_size: int - :param partition_key: With the given partition_key, event data will land to - a particular partition of the Event Hub decided by the service. - :type partition_key: str :return: an EventDataBatch instance :rtype: ~azure.eventhub.EventDataBatch - - Example: - .. 
literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START eventhub_client_async_create_batch] - :end-before: [END eventhub_client_async_create_batch] - :language: python - :dedent: 4 - :caption: Create EventDataBatch object within limited size - """ if not self._max_message_size_on_link: @@ -169,7 +157,7 @@ async def create_batch(self, max_size=None, partition_key=None): raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_size, self._max_message_size_on_link)) - return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key) + return EventDataBatch(max_size=(max_size or self._max_message_size_on_link)) async def send( self, event_data: Union[EventData, EventDataBatch, Iterable[EventData]], @@ -192,15 +180,6 @@ async def send( ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError :return: None :rtype: None - - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START eventhub_client_async_send] - :end-before: [END eventhub_client_async_send] - :language: python - :dedent: 4 - :caption: Sends an event data and blocks until acknowledgement is received or operation times out. - """ # Tracing code span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] @@ -241,14 +220,5 @@ async def close(self): """ Close down the handler. If the handler has already closed, this will be a no op. - - Example: - .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py - :start-after: [START eventhub_client_async_sender_close] - :end-before: [END eventhub_client_async_sender_close] - :language: python - :dedent: 4 - :caption: Close down the handler. 
- """ await super(EventHubProducer, self).close() diff --git a/sdk/eventhub/azure-eventhubs/conftest.py b/sdk/eventhub/azure-eventhubs/conftest.py index b55f0ad6f204..b5e07e8915ca 100644 --- a/sdk/eventhub/azure-eventhubs/conftest.py +++ b/sdk/eventhub/azure-eventhubs/conftest.py @@ -19,6 +19,7 @@ collect_ignore.append("tests/eventprocessor") collect_ignore.append("features") collect_ignore.append("samples/async_samples") + collect_ignore.append("examples/async_examples") from azure.eventhub.client import EventHubClient from azure.eventhub import EventPosition diff --git a/sdk/eventhub/azure-eventhubs/dev_requirements.txt b/sdk/eventhub/azure-eventhubs/dev_requirements.txt index f355027adfa4..c808b7948163 100644 --- a/sdk/eventhub/azure-eventhubs/dev_requirements.txt +++ b/sdk/eventhub/azure-eventhubs/dev_requirements.txt @@ -5,4 +5,4 @@ docutils>=0.14 pygments>=2.2.0 behave==1.2.6 -wheel \ No newline at end of file +wheel diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_auth_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_auth_async.py index 935fa944379f..1dc9a517f24e 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_auth_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_auth_async.py @@ -5,42 +5,40 @@ #-------------------------------------------------------------------------- import pytest -import time import asyncio -from azure.eventhub import EventData, EventPosition -from azure.eventhub.aio import EventHubClient +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient @pytest.mark.liveTest @pytest.mark.asyncio async def test_client_secret_credential_async(aad_credential, live_eventhub): try: - from azure.identity.aio import ClientSecretCredential + from azure.identity.aio import EnvironmentCredential except ImportError: pytest.skip("No azure identity library") - client_id, secret, tenant_id = aad_credential 
- credential = ClientSecretCredential(client_id=client_id, client_secret=secret, tenant_id=tenant_id) - client = EventHubClient(host=live_eventhub['hostname'], - event_hub_path=live_eventhub['event_hub'], - credential=credential, - user_agent='customized information') - sender = client.create_producer(partition_id='0') - receiver = client.create_consumer(consumer_group="$default", partition_id='0', event_position=EventPosition("@latest")) - - async with receiver: - - received = await receiver.receive(timeout=3) - assert len(received) == 0 - - async with sender: - event = EventData(body='A single message') - await sender.send(event) - - await asyncio.sleep(1) - - received = await receiver.receive(timeout=3) - - assert len(received) == 1 - assert list(received[0].body)[0] == 'A single message'.encode('utf-8') + credential = EnvironmentCredential() + producer_client = EventHubProducerClient(host=live_eventhub['hostname'], + event_hub_path=live_eventhub['event_hub'], + credential=credential, + user_agent='customized information') + consumer_client = EventHubConsumerClient(host=live_eventhub['hostname'], + event_hub_path=live_eventhub['event_hub'], + credential=credential, + user_agent='customized information') + + async with producer_client: + await producer_client.send(EventData(body='A single message')) + + async def event_handler(partition_context, events): + assert partition_context.partition_id == '0' + assert len(events) == 1 + assert list(events[0].body)[0] == 'A single message'.encode('utf-8') + + async with consumer_client: + task = asyncio.ensure_future( + consumer_client.receive(event_handler=event_handler, consumer_group='$default', partition_id='0')) + await asyncio.sleep(2) + task.cancel() diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_consumer_client_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_consumer_client_async.py new file mode 100644 index 000000000000..85811d35fc73 --- /dev/null +++ 
b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_consumer_client_async.py @@ -0,0 +1,75 @@ +import pytest +import asyncio +from azure.eventhub import EventData, EventPosition +from azure.eventhub.aio import EventHubConsumerClient +from azure.eventhub.aio.eventprocessor.local_partition_manager import InMemoryPartitionManager + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_receive_no_partition_async(connstr_senders): + connection_str, senders = connstr_senders + senders[0].send(EventData("Test EventData")) + senders[1].send(EventData("Test EventData")) + client = EventHubConsumerClient.from_connection_string(connection_str) + received = 0 + + async def on_events(partition_context, events): + nonlocal received + received += len(events) + + async with client: + task = asyncio.ensure_future( + client.receive(on_events, consumer_group="$default", initial_event_position="-1")) + await asyncio.sleep(10) + assert received == 2 + # task.cancel() + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_receive_partition_async(connstr_senders): + connection_str, senders = connstr_senders + senders[0].send(EventData("Test EventData")) + client = EventHubConsumerClient.from_connection_string(connection_str) + received = 0 + + async def on_events(partition_context, events): + nonlocal received + received += len(events) + assert partition_context.partition_id == "0" + assert partition_context.consumer_group_name == "$default" + assert partition_context.fully_qualified_namespace in connection_str + assert partition_context.eventhub_name == senders[0]._client.eh_name + + async with client: + task = asyncio.ensure_future( + client.receive(on_events, consumer_group="$default", partition_id="0", initial_event_position="-1")) + await asyncio.sleep(10) + assert received == 1 + # task.cancel() + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_receive_load_balancing_async(connstr_senders): + connection_str, senders = connstr_senders + pm 
= InMemoryPartitionManager() + client1 = EventHubConsumerClient.from_connection_string( + connection_str, partition_manager=pm, load_balancing_interval=1) + client2 = EventHubConsumerClient.from_connection_string( + connection_str, partition_manager=pm, load_balancing_interval=1) + + async def on_events(partition_context, events): + pass + + async with client1, client2: + task1 = asyncio.ensure_future( + client1.receive(on_events, consumer_group="$default", initial_event_position="-1")) + task2 = asyncio.ensure_future( + client2.receive(on_events, consumer_group="$default", initial_event_position="-1")) + await asyncio.sleep(10) + assert len(client1._event_processors[("$default", "-1")]._tasks) == 1 + assert len(client2._event_processors[("$default", "-1")]._tasks) == 1 + # task1.cancel() + # task2.cancel() diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_negative_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_negative_async.py index b7d7142a6f51..cce75cac93af 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_negative_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_negative_async.py @@ -4,10 +4,8 @@ # license information. 
#-------------------------------------------------------------------------- -import os import asyncio import pytest -import time import sys from azure.eventhub import ( @@ -15,108 +13,116 @@ EventPosition, EventHubError, ConnectError, - ConnectionLostError, AuthenticationError, - EventDataError, EventDataSendError, ) -from azure.eventhub.aio import EventHubClient +from azure.eventhub.aio.client_async import EventHubClient + @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_with_invalid_hostname_async(invalid_hostname, connstr_receivers): _, receivers = connstr_receivers - client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(invalid_hostname) + sender = client._create_producer() with pytest.raises(AuthenticationError): await sender.send(EventData("test data")) await sender.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_invalid_hostname_async(invalid_hostname): - client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + client = EventHubClient.from_connection_string(invalid_hostname) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=3) await receiver.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_with_invalid_key_async(invalid_key, connstr_receivers): _, receivers = connstr_receivers - client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(invalid_key) + sender = client._create_producer() with pytest.raises(AuthenticationError): await 
sender.send(EventData("test data")) await sender.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_invalid_key_async(invalid_key): - client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + client = EventHubClient.from_connection_string(invalid_key) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=3) await receiver.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_with_invalid_policy_async(invalid_policy, connstr_receivers): _, receivers = connstr_receivers - client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(invalid_policy) + sender = client._create_producer() with pytest.raises(AuthenticationError): await sender.send(EventData("test data")) await sender.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_invalid_policy_async(invalid_policy): - client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + client = EventHubClient.from_connection_string(invalid_policy) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=3) await receiver.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_partition_key_with_partition_async(connection_str): pytest.skip("No longer raise value error. 
EventData will be sent to partition_id") - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer(partition_id="1") + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer(partition_id="1") try: data = EventData(b"Data") with pytest.raises(ValueError): await sender.send(EventData("test data")) finally: await sender.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_non_existing_entity_sender_async(connection_str): - client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) - sender = client.create_producer(partition_id="1") + client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo") + sender = client._create_producer(partition_id="1") with pytest.raises(AuthenticationError): await sender.send(EventData("test data")) await sender.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_non_existing_entity_receiver_async(connection_str): - client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo") + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=5) await receiver.close() + await client.close() @pytest.mark.liveTest @@ -124,11 +130,12 @@ async def test_non_existing_entity_receiver_async(connection_str): async def test_receive_from_invalid_partitions_async(connection_str): partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - 
receiver = client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1")) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1")) with pytest.raises(ConnectError): await receiver.receive(timeout=5) await receiver.close() + await client.close() @pytest.mark.liveTest @@ -136,11 +143,12 @@ async def test_receive_from_invalid_partitions_async(connection_str): async def test_send_to_invalid_partitions_async(connection_str): partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer(partition_id=p) + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer(partition_id=p) with pytest.raises(ConnectError): await sender.send(EventData("test data")) await sender.close() + await client.close() @pytest.mark.liveTest @@ -148,27 +156,29 @@ async def test_send_to_invalid_partitions_async(connection_str): async def test_send_too_large_message_async(connection_str): if sys.platform.startswith('darwin'): pytest.skip("Skipping on OSX - open issue regarding message size") - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() try: data = EventData(b"A" * 1100000) with pytest.raises(EventDataSendError): await sender.send(data) finally: await sender.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_null_body_async(connection_str): - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() 
try: with pytest.raises(ValueError): data = EventData(None) await sender.send(data) finally: await sender.close() + await client.close() async def pump(receiver): @@ -187,10 +197,10 @@ async def pump(receiver): @pytest.mark.asyncio async def test_max_receivers_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) + client = EventHubClient.from_connection_string(connection_str) receivers = [] for i in range(6): - receivers.append(client.create_consumer(consumer_group="$default", partition_id="0", prefetch=1000, event_position=EventPosition('@latest'))) + receivers.append(client._create_consumer(consumer_group="$default", partition_id="0", prefetch=1000, event_position=EventPosition('@latest'))) outputs = await asyncio.gather( pump(receivers[0]), @@ -204,27 +214,30 @@ async def test_max_receivers_async(connstr_senders): failed = [o for o in outputs if isinstance(o, EventHubError)] assert len(failed) == 1 print(failed[0].message) + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_create_batch_with_invalid_hostname_async(invalid_hostname): - client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(invalid_hostname) + sender = client._create_producer() try: with pytest.raises(AuthenticationError): - batch_event_data = await sender.create_batch(max_size=300, partition_key="key") + batch_event_data = await sender.create_batch(max_size=300) finally: await sender.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_create_batch_with_too_large_size_async(connection_str): - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() try: 
with pytest.raises(ValueError): - batch_event_data = await sender.create_batch(max_size=5 * 1024 * 1024, partition_key="key") + batch_event_data = await sender.create_batch(max_size=5 * 1024 * 1024) finally: await sender.close() + await client.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_producer_client_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_producer_client_async.py new file mode 100644 index 000000000000..71879dad6e63 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_producer_client_async.py @@ -0,0 +1,77 @@ +import pytest +import asyncio +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubProducerClient + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_with_partition_key_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + + async with client: + data_val = 0 + for partition in [b"a", b"b", b"c", b"d", b"e", b"f"]: + partition_key = b"test_partition_" + partition + for i in range(50): + data = EventData(str(data_val)) + data_val += 1 + await client.send(data, partition_key=partition_key) + + found_partition_keys = {} + for index, partition in enumerate(receivers): + received = partition.receive(timeout=5) + for message in received: + try: + existing = found_partition_keys[message.partition_key] + assert existing == index + except KeyError: + found_partition_keys[message.partition_key] = index + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_partition_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + async with client: + await client.send(EventData(b"Data"), partition_id="1") + + partition_0 = receivers[0].receive(timeout=2) + assert len(partition_0) == 0 + partition_1 = receivers[1].receive(timeout=2) + assert 
len(partition_1) == 1 + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_partitio_concurrent_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + async with client: + await asyncio.gather(client.send(EventData(b"Data"), partition_id="1"), + client.send(EventData(b"Data"), partition_id="1")) + + partition_0 = receivers[0].receive(timeout=2) + assert len(partition_0) == 0 + partition_1 = receivers[1].receive(timeout=2) + assert len(partition_1) == 2 + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_no_partition_batch_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubProducerClient.from_connection_string(connection_str) + async with client: + event_batch = await client.create_batch() + try: + while True: + event_batch.try_add(EventData(b"Data")) + except ValueError: + await client.send(event_batch) + + partition_0 = receivers[0].receive(timeout=2) + partition_1 = receivers[1].receive(timeout=2) + assert len(partition_0) + len(partition_1) > 10 diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py index 9401c65515aa..475640ed9b28 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py @@ -6,7 +6,7 @@ import pytest from azure.eventhub import EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubClient +from azure.eventhub.aio.client_async import EventHubClient @pytest.mark.liveTest @@ -17,7 +17,7 @@ async def test_get_properties(live_eventhub): ) properties = await client.get_properties() assert properties['path'] == live_eventhub['event_hub'] and properties['partition_ids'] == ['0', '1'] - + await client.close() @pytest.mark.liveTest 
@pytest.mark.asyncio @@ -27,6 +27,7 @@ async def test_get_partition_ids(live_eventhub): ) partition_ids = await client.get_partition_ids() assert partition_ids == ['0', '1'] + await client.close() @pytest.mark.liveTest @@ -43,3 +44,4 @@ async def test_get_partition_properties(live_eventhub): and 'last_enqueued_offset' in properties \ and 'last_enqueued_time_utc' in properties \ and 'is_empty' in properties + await client.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receive_async.py index 55e2fa60ff91..2584e505da8a 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receive_async.py @@ -4,21 +4,20 @@ # license information. #-------------------------------------------------------------------------- -import os import asyncio import pytest import time -from azure.eventhub import EventData, EventPosition, EventHubError, TransportType, ConnectionLostError, ConnectError -from azure.eventhub.aio import EventHubClient +from azure.eventhub import EventData, EventPosition, TransportType, ConnectionLostError +from azure.eventhub.aio.client_async import EventHubClient @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_end_of_stream_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 @@ -27,14 +26,15 @@ async def 
test_receive_end_of_stream_async(connstr_senders): assert len(received) == 1 assert list(received[-1].body)[0] == b"Receiving only a single event" + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_offset_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 @@ -44,21 +44,22 @@ async def test_receive_with_offset_async(connstr_senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset, inclusive=False)) + offset_receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset, inclusive=False)) async with offset_receiver: received = await offset_receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Message after offset")) received = await offset_receiver.receive(timeout=5) assert len(received) == 1 + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_inclusive_offset_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", 
event_position=EventPosition('@latest')) async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 @@ -68,18 +69,19 @@ async def test_receive_with_inclusive_offset_async(connstr_senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset, inclusive=True)) + offset_receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset, inclusive=True)) async with offset_receiver: received = await offset_receiver.receive(timeout=5) assert len(received) == 1 + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_datetime_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 @@ -88,7 +90,7 @@ async def test_receive_with_datetime_async(connstr_senders): assert len(received) == 1 offset = received[0].enqueued_time - offset_receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset)) + offset_receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset)) async with offset_receiver: received = await offset_receiver.receive(timeout=5) assert len(received) == 0 @@ -96,14 +98,15 @@ async def test_receive_with_datetime_async(connstr_senders): time.sleep(1) received = await offset_receiver.receive(timeout=5) assert len(received) == 1 + 
await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_sequence_no_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 @@ -112,7 +115,7 @@ async def test_receive_with_sequence_no_async(connstr_senders): assert len(received) == 1 offset = received[0].sequence_number - offset_receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset)) + offset_receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset)) async with offset_receiver: received = await offset_receiver.receive(timeout=5) assert len(received) == 0 @@ -120,14 +123,15 @@ async def test_receive_with_sequence_no_async(connstr_senders): time.sleep(1) received = await offset_receiver.receive(timeout=5) assert len(received) == 1 + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_inclusive_sequence_no_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) async with receiver: received = await receiver.receive(timeout=5) assert 
len(received) == 0 @@ -136,18 +140,19 @@ async def test_receive_with_inclusive_sequence_no_async(connstr_senders): assert len(received) == 1 offset = received[0].sequence_number - offset_receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset, inclusive=True)) + offset_receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset, inclusive=True)) async with offset_receiver: received = await offset_receiver.receive(timeout=5) assert len(received) == 1 + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_batch_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), prefetch=500) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), prefetch=500) async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 @@ -161,6 +166,7 @@ async def test_receive_batch_async(connstr_senders): assert event.sequence_number is not None assert event.offset assert event.enqueued_time + await client.close() async def pump(receiver, sleep=None): @@ -184,9 +190,9 @@ async def test_exclusive_receiver_async(connstr_senders): connection_str, senders = connstr_senders senders[0].send(EventData(b"Receiving only a single event")) - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver1 = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=10, prefetch=5) - receiver2 = client.create_consumer(consumer_group="$default", partition_id="0", 
event_position=EventPosition("-1"), owner_level=20, prefetch=10) + client = EventHubClient.from_connection_string(connection_str) + receiver1 = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=10, prefetch=5) + receiver2 = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=20, prefetch=10) try: await pump(receiver1) output2 = await pump(receiver2) @@ -196,6 +202,7 @@ async def test_exclusive_receiver_async(connstr_senders): finally: await receiver1.close() await receiver2.close() + await client.close() @pytest.mark.liveTest @@ -204,12 +211,12 @@ async def test_multiple_receiver_async(connstr_senders): connection_str, senders = connstr_senders senders[0].send(EventData(b"Receiving only a single event")) - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) + client = EventHubClient.from_connection_string(connection_str) partitions = await client.get_properties() assert partitions["partition_ids"] == ["0", "1"] receivers = [] for i in range(2): - receivers.append(client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=10)) + receivers.append(client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=10)) try: more_partitions = await client.get_properties() assert more_partitions["partition_ids"] == ["0", "1"] @@ -221,6 +228,7 @@ async def test_multiple_receiver_async(connstr_senders): finally: for r in receivers: await r.close() + await client.close() @pytest.mark.liveTest @@ -229,9 +237,9 @@ async def test_exclusive_receiver_after_non_exclusive_receiver_async(connstr_sen connection_str, senders = connstr_senders senders[0].send(EventData(b"Receiving only a single event")) - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver1 = 
client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=10) - receiver2 = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=15, prefetch=10) + client = EventHubClient.from_connection_string(connection_str) + receiver1 = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=10) + receiver2 = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=15, prefetch=10) try: await pump(receiver1) output2 = await pump(receiver2) @@ -241,6 +249,7 @@ async def test_exclusive_receiver_after_non_exclusive_receiver_async(connstr_sen finally: await receiver1.close() await receiver2.close() + await client.close() @pytest.mark.liveTest @@ -249,9 +258,9 @@ async def test_non_exclusive_receiver_after_exclusive_receiver_async(connstr_sen connection_str, senders = connstr_senders senders[0].send(EventData(b"Receiving only a single event")) - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver1 = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=15, prefetch=10) - receiver2 = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=10) + client = EventHubClient.from_connection_string(connection_str) + receiver1 = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), owner_level=15, prefetch=10) + receiver2 = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=10) try: output1 = await pump(receiver1) with pytest.raises(ConnectionLostError): @@ -260,6 +269,7 @@ async def test_non_exclusive_receiver_after_exclusive_receiver_async(connstr_sen finally: await receiver1.close() 
await receiver2.close() + await client.close() @pytest.mark.liveTest @@ -280,8 +290,8 @@ def batched(): ed.application_properties = app_prop yield ed - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), prefetch=500) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), prefetch=500) async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 @@ -297,14 +307,15 @@ def batched(): assert list(message.body)[0] == "Event Data {}".format(index).encode('utf-8') assert (app_prop_key.encode('utf-8') in message.application_properties) \ and (dict(message.application_properties)[app_prop_key.encode('utf-8')] == app_prop_value.encode('utf-8')) + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_over_websocket_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), prefetch=500) + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest'), prefetch=500) event_list = [] for i in range(20): @@ -320,6 +331,7 @@ async def test_receive_over_websocket_async(connstr_senders): received = await receiver.receive(max_batch_size=50, timeout=5) assert len(received) == 20 + await client.close() @pytest.mark.asyncio @@ -330,11 +342,10 @@ async def test_receive_run_time_metric_async(connstr_senders): if 
StrictVersion(uamqp_version) < StrictVersion('1.2.3'): pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.") connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, - network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", - event_position=EventPosition('@latest'), prefetch=500, - track_last_enqueued_event_properties=True) + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", + event_position=EventPosition('@latest'), prefetch=500, + track_last_enqueued_event_properties=True) event_list = [] for i in range(20): @@ -355,3 +366,4 @@ async def test_receive_run_time_metric_async(connstr_senders): assert receiver.last_enqueued_event_properties.get('offset', None) assert receiver.last_enqueued_event_properties.get('enqueued_time', None) assert receiver.last_enqueued_event_properties.get('retrieval_time', None) + await client.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receiver_iterator_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receiver_iterator_async.py index 366f1fa80880..e10fc60db09c 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receiver_iterator_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_receiver_iterator_async.py @@ -4,21 +4,18 @@ # license information. 
#-------------------------------------------------------------------------- -import os -import asyncio import pytest -import time from azure.eventhub import EventData, EventPosition, EventHubError, TransportType -from azure.eventhub.aio import EventHubClient +from azure.eventhub.aio.client_async import EventHubClient @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_iterator_async(connstr_senders): connection_str, senders = connstr_senders - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) + client = EventHubClient.from_connection_string(connection_str) + receiver = client._create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition('@latest')) async with receiver: received = await receiver.receive(timeout=5) assert len(received) == 0 @@ -27,4 +24,5 @@ async def test_receive_iterator_async(connstr_senders): received.append(item) break assert len(received) == 1 - assert list(received[-1].body)[0] == b"Receiving only a single event" \ No newline at end of file + assert list(received[-1].body)[0] == b"Receiving only a single event" + await client.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py index 24258bc532ae..b260af3c6b07 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py @@ -4,24 +4,20 @@ # license information. 
#-------------------------------------------------------------------------- -import os -import time + import asyncio import pytest -from azure.eventhub import ( - EventData, - EventPosition, - EventHubError) -from azure.eventhub.aio import EventHubClient +from azure.eventhub import EventData +from azure.eventhub.aio.client_async import EventHubClient @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_with_long_interval_async(connstr_receivers, sleep): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() try: await sender.send(EventData(b"A single event")) for _ in range(1): @@ -40,3 +36,4 @@ async def test_send_with_long_interval_async(connstr_receivers, sleep): received.extend(r.receive(timeout=5)) assert len(received) == 2 assert list(received[0].body)[0] == b"A single event" + await client.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_send_async.py index 306e547ac439..a4ca028f4f16 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_send_async.py @@ -12,15 +12,15 @@ import json from azure.eventhub import EventData, TransportType -from azure.eventhub.aio import EventHubClient +from azure.eventhub.aio.client_async import EventHubClient @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_with_partition_key_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() async with sender: 
data_val = 0 @@ -41,14 +41,15 @@ async def test_send_with_partition_key_async(connstr_receivers): assert existing == index except KeyError: found_partition_keys[message.partition_key] = index + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_and_receive_zero_length_body_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() async with sender: await sender.send(EventData("")) @@ -58,14 +59,15 @@ async def test_send_and_receive_zero_length_body_async(connstr_receivers): assert len(received) == 1 assert list(received[0].body)[0] == b"" + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_single_event_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() async with sender: await sender.send(EventData(b"A single event")) @@ -75,6 +77,7 @@ async def test_send_single_event_async(connstr_receivers): assert len(received) == 1 assert list(received[0].body)[0] == b"A single event" + await client.close() @pytest.mark.liveTest @@ -86,8 +89,8 @@ def batched(): for i in range(10): yield EventData("Event number {}".format(i)) - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() async with sender: await sender.send(batched()) @@ -99,14 +102,15 @@ def batched(): assert len(received) == 10 for index, message in enumerate(received): assert list(message.body)[0] == 
"Event number {}".format(index).encode('utf-8') + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_partition_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer(partition_id="1") + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer(partition_id="1") async with sender: await sender.send(EventData(b"Data")) @@ -114,14 +118,15 @@ async def test_send_partition_async(connstr_receivers): assert len(partition_0) == 0 partition_1 = receivers[1].receive(timeout=2) assert len(partition_1) == 1 + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_non_ascii_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer(partition_id="0") + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer(partition_id="0") async with sender: await sender.send(EventData("é,è,à,ù,â,ê,î,ô,û")) await sender.send(EventData(json.dumps({"foo": "漢字"}))) @@ -130,6 +135,7 @@ async def test_send_non_ascii_async(connstr_receivers): assert len(partition_0) == 2 assert partition_0[0].body_as_str() == "é,è,à,ù,â,ê,î,ô,û" assert partition_0[1].body_as_json() == {"foo": "漢字"} + await client.close() @pytest.mark.liveTest @@ -141,8 +147,8 @@ def batched(): for i in range(10): yield EventData("Event number {}".format(i)) - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer(partition_id="1") + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer(partition_id="1") async with sender: await sender.send(batched()) @@ -150,14 +156,15 @@ def batched(): assert len(partition_0) == 0 
partition_1 = receivers[1].receive(timeout=2) assert len(partition_1) == 10 + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_array_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() async with sender: await sender.send(EventData([b"A", b"B", b"C"])) @@ -167,15 +174,16 @@ async def test_send_array_async(connstr_receivers): assert len(received) == 1 assert list(received[0].body) == [b"A", b"B", b"C"] + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_multiple_clients_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender_0 = client.create_producer(partition_id="0") - sender_1 = client.create_producer(partition_id="1") + client = EventHubClient.from_connection_string(connection_str) + sender_0 = client._create_producer(partition_id="0") + sender_1 = client._create_producer(partition_id="1") async with sender_0: await sender_0.send(EventData(b"Message 0")) async with sender_1: @@ -185,6 +193,7 @@ async def test_send_multiple_clients_async(connstr_receivers): assert len(partition_0) == 1 partition_1 = receivers[1].receive(timeout=2) assert len(partition_1) == 1 + await client.close() @pytest.mark.liveTest @@ -205,8 +214,8 @@ def batched(): ed.application_properties = app_prop yield ed - client = EventHubClient.from_connection_string(connection_str, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str) + sender = client._create_producer() async with sender: await sender.send(batched()) @@ -221,14 +230,15 @@ def batched(): assert list(message.body)[0] == "Event number 
{}".format(index).encode('utf-8') assert (app_prop_key.encode('utf-8') in message.application_properties) \ and (dict(message.application_properties)[app_prop_key.encode('utf-8')] == app_prop_value.encode('utf-8')) + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_over_websocket_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket) + sender = client._create_producer() event_list = [] for i in range(20): @@ -246,16 +256,17 @@ async def test_send_over_websocket_async(connstr_receivers): for r in receivers: r.close() + await client.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_with_create_event_batch_async(connstr_receivers): connection_str, receivers = connstr_receivers - client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) - sender = client.create_producer() + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket) + sender = client._create_producer() - event_data_batch = await sender.create_batch(max_size=100000, partition_key="0") + event_data_batch = await sender.create_batch(max_size=100000) while True: try: event_data_batch.try_add(EventData('A single event data')) @@ -273,3 +284,4 @@ async def test_send_with_create_event_batch_async(connstr_receivers): await sender.send(event_data_batch) await sender.close() + await client.close() diff --git a/shared_requirements.txt b/shared_requirements.txt index d950629eff9b..b8afebf8da6d 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -113,8 +113,8 @@ opencensus>=0.6.0 opencensus-ext-threading 
opencensus-ext-azure>=0.3.1 #override azure-cognitiveservices-inkrecognizer azure-core<2.0.0,>=1.0.0b2 +#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<13.0.0,>=12.0.0 #override azure-eventhub-checkpointstoreblob azure-storage-blob<13.0.0,>=12.0.0 -#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<=12.1,>=12.0.0b2 #override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0 #override azure-eventhub uamqp<2.0,>=1.2.3 #override azure-appconfiguration msrest>=0.6.10