From 3bc55fa5a28bccca33ef2ae93c42d11bc0ae4c2b Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 10 Sep 2019 00:44:06 -0700 Subject: [PATCH] Update docstring --- sdk/eventhub/azure-eventhubs/README.md | 67 ++++++++++++++++++- .../aio/eventprocessor/event_processor.py | 20 +++--- .../aio/eventprocessor/partition_processor.py | 10 ++- .../azure-eventhubs/azure/eventhub/common.py | 20 +++--- 4 files changed, 92 insertions(+), 25 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/README.md b/sdk/eventhub/azure-eventhubs/README.md index e3b99aad051d..6dc04539f0b8 100644 --- a/sdk/eventhub/azure-eventhubs/README.md +++ b/sdk/eventhub/azure-eventhubs/README.md @@ -109,6 +109,8 @@ partition_ids = client.get_partition_ids() Publish events to an Event Hub. +#### Send a single event or an array of events + ```python from azure.eventhub import EventHubClient, EventData @@ -130,6 +132,34 @@ finally: pass ``` +#### Send a batch of events + +Use the `create_batch` method on `EventHubProducer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached. +```python +from azure.eventhub import EventHubClient, EventData + +try: + connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' + event_hub_path = '<< NAME OF THE EVENT HUB >>' + client = EventHubClient.from_connection_string(connection_str, event_hub_path) + producer = client.create_producer(partition_id="0") + + event_data_batch = producer.create_batch(max_size=10000) + can_add = True + while can_add: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + can_add = False # EventDataBatch object reaches max_size. + + with producer: + producer.send(event_data_batch) +except: + raise +finally: + pass +``` + ### Consume events from an Event Hub Consume events from an Event Hub. 
@@ -159,6 +189,7 @@ finally: Publish events to an Event Hub asynchronously. +#### Send a single event or an array of events ```python from azure.eventhub.aio import EventHubClient from azure.eventhub import EventData @@ -174,7 +205,37 @@ try: event_list.append(EventData(b"A single event")) async with producer: - await producer.send(event_list) + await producer.send(event_list) # Send a list of events + await producer.send(EventData(b"A single event")) # Send a single event +except: + raise +finally: + pass +``` + +#### Send a batch of events + +Use the `create_batch` method on `EventHubProducer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached. +```python +from azure.eventhub.aio import EventHubClient +from azure.eventhub import EventData + +try: + connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' + event_hub_path = '<< NAME OF THE EVENT HUB >>' + client = EventHubClient.from_connection_string(connection_str, event_hub_path) + producer = client.create_producer(partition_id="0") + + event_data_batch = await producer.create_batch(max_size=10000) + can_add = True + while can_add: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + can_add = False # EventDataBatch object reaches max_size. + + async with producer: + await producer.send(event_data_batch) except: raise finally: @@ -213,9 +274,11 @@ Using an `EventHubConsumer` to consume events like in the previous examples puts The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing. 
+Load balancing is typically useful when running multiple instances of `EventProcessor` across multiple processes or even machines. It is recommended to store checkpoints to a persistent store when running in production. Search pypi with the prefix `azure-eventhubs-checkpoint` to find packages that support persistent storage of checkpoints. + You can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory. -[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is another `PartitionManager` implementation that allows multiple EventProcessors to share the load balancing and checkpoint data in a central storage. +[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is one of the `PartitionManager` implementations we provide, which uses Azure Blob Storage as the persistent store. ```python diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index ec65d0453d5c..18446249ff23 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -91,9 +91,9 @@ def __init__( :param partition_processor_type: A subclass type of ~azure.eventhub.eventprocessor.PartitionProcessor. :type partition_processor_type: type :param partition_manager: Interacts with the data storage that stores ownership and checkpoints data. - ~azure.eventhub.aio.eventprocessor.SamplePartitionManager can be used to save data in memory or to a file. - More sophisticated partition managers are / will be provided as plug-ins. Users can also develop their own - partition managers. 
+ ~azure.eventhub.aio.eventprocessor.SamplePartitionManager demonstrates the basic usage of `PartitionManager` + which stores data in memory or a file. + Users can either use the provided `PartitionManager` plug-ins or develop their own `PartitionManager`. :type partition_manager: Subclass of ~azure.eventhub.eventprocessor.PartitionManager. :param initial_event_position: The event position to start a partition consumer. if the partition has no checkpoint yet. This could be replaced by "reset" checkpoint in the near future. @@ -119,10 +119,10 @@ def __repr__(self): return 'EventProcessor: id {}'.format(self._id) async def start(self): - """Start the EventProcessor + """Start the EventProcessor. - This EventProcessor will then start to balance partition ownership with other EventProcessors - and asynchronously start to receive EventData from EventHub and process events. + The EventProcessor will try to claim and balance partition ownership with other `EventProcessor` instances + and asynchronously start receiving EventData from EventHub and processing events. :return: None @@ -155,13 +155,13 @@ async def start(self): await asyncio.sleep(self._polling_interval) async def stop(self): - """Stop the EventProcessor + """Stop the EventProcessor. - This EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions + The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions it is working on. - If other EventProcessors are still working, they will take over these partitions. + Other running EventProcessor instances will take over these released partitions. - A stopped EventProcessor can be restarted by calling method start() again. + A stopped EventProcessor can be restarted by calling method `start` again. 
:return: None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py index 9481c032eb86..a16be38e6220 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/partition_processor.py @@ -26,12 +26,14 @@ class PartitionProcessor(ABC): """ async def initialize(self, partition_context: PartitionContext): - """Called when EventProcessor creates this PartitionProcessor. + """This method will be called when `EventProcessor` creates a `PartitionProcessor`. :param partition_context: The context information of this partition. :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext """ + # Please put the code for initialization of PartitionProcessor here. + async def close(self, reason, partition_context: PartitionContext): """Called when EventProcessor stops processing this PartitionProcessor. @@ -46,6 +48,8 @@ async def close(self, reason, partition_context: PartitionContext): """ + # Please put the code for closing PartitionProcessor here. + @abstractmethod async def process_events(self, events: List[EventData], partition_context: PartitionContext): """Called when a batch of events have been received. @@ -58,6 +62,8 @@ async def process_events(self, events: List[EventData], partition_context: Parti """ + # Please put the code for processing events here. + async def process_error(self, error, partition_context: PartitionContext): """Called when an error happens when receiving or processing events @@ -68,3 +74,5 @@ async def process_error(self, error, partition_context: PartitionContext): :type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext """ + + # Please put the code for processing error here. 
diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 7a25fbdd7bac..a9656eb773d7 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -261,18 +261,14 @@ def encode_message(self): class EventDataBatch(object): """ - It's much faster to send EventData in a batch than individually. But putting too much EventData in one batch - may exceed the frame size limit of the event hub. - EventDataBatch helps you build the maximum allowed size batch of EventData to improve performance - within the size limit - - Use create_batch method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer - to create an EventDataBatch object. It retrieves the frame size limit from the service. - Use method EventDataBatch.try_add to build the list until a ValueError is raised, - and use send method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer - to send out the EventData batch to EventHub - - Do not instantiate an EventDataBatch object using its constructor. + Sending events in batches gets better performance than sending individual events. + EventDataBatch helps you create the maximum allowed size batch of `EventData` to improve sending performance. + + Use the `try_add` method to add events until the maximum batch size limit in bytes has been reached - a `ValueError` will be raised. + Use the `send` method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer for sending. + + Please use the `create_batch` method of `EventHubProducer` + to create an `EventDataBatch` object instead of instantiating an `EventDataBatch` object directly. """ def __init__(self, max_size=None, partition_key=None):