67 changes: 65 additions & 2 deletions sdk/eventhub/azure-eventhubs/README.md
@@ -109,6 +109,8 @@ partition_ids = client.get_partition_ids()

Publish events to an Event Hub.

#### Send a single event or an array of events

```python
from azure.eventhub import EventHubClient, EventData

@@ -130,6 +132,34 @@ finally:
pass
```

#### Send a batch of events

Use the `create_batch` method on `EventHubProducer` to create an `EventDataBatch` object, which can then be sent using the `send` method. Add events to the `EventDataBatch` with the `try_add` method until the maximum batch size limit in bytes is reached; at that point `try_add` raises a `ValueError`.

```python
from azure.eventhub import EventHubClient, EventData

try:
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubClient.from_connection_string(connection_str, event_hub_path)
producer = client.create_producer(partition_id="0")

event_data_batch = producer.create_batch(max_size=10000)
can_add = True
while can_add:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.

with producer:
producer.send(event_data_batch)
except:
raise
finally:
pass
```
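The size-limited accumulation contract shown above can be sketched without the SDK. The `ToyBatch` class below is an illustrative stand-in for the size-limit behavior, not the SDK's `EventDataBatch`:

```python
# Minimal stand-in modeling the try_add contract: adding an event that
# would push the batch past max_size raises ValueError, signaling that
# the batch is full and should be sent.
class ToyBatch:
    def __init__(self, max_size):
        self.max_size = max_size
        self.events = []
        self.size = 0

    def try_add(self, event: bytes):
        if self.size + len(event) > self.max_size:
            raise ValueError("batch would exceed max_size")
        self.events.append(event)
        self.size += len(event)

batch = ToyBatch(max_size=10)
while True:
    try:
        batch.try_add(b"abcd")  # 4 bytes per event
    except ValueError:
        break                   # batch is full; send it now
print(len(batch.events))  # -> 2 (a third 4-byte event would exceed 10 bytes)
```

Treating `ValueError` as the "batch full" signal, rather than checking sizes up front, is what lets the SDK account for per-event encoding overhead internally.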

### Consume events from an Event Hub

Consume events from an Event Hub.
@@ -159,6 +189,7 @@ finally:

Publish events to an Event Hub asynchronously.

#### Send a single event or an array of events
```python
from azure.eventhub.aio import EventHubClient
from azure.eventhub import EventData
@@ -174,7 +205,37 @@ try:
event_list.append(EventData(b"A single event"))

async with producer:
await producer.send(event_list)
await producer.send(event_list) # Send a list of events
await producer.send(EventData(b"A single event")) # Send a single event
except:
raise
finally:
pass
```

#### Send a batch of events

Use the `create_batch` method on `EventHubProducer` to create an `EventDataBatch` object, which can then be sent using the `send` method. Add events to the `EventDataBatch` with the `try_add` method until the maximum batch size limit in bytes is reached; at that point `try_add` raises a `ValueError`.

```python
from azure.eventhub.aio import EventHubClient
from azure.eventhub import EventData

try:
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubClient.from_connection_string(connection_str, event_hub_path)
producer = client.create_producer(partition_id="0")

event_data_batch = await producer.create_batch(max_size=10000)
can_add = True
while can_add:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.

async with producer:
await producer.send(event_data_batch)
except:
raise
finally:
@@ -213,9 +274,11 @@ Using an `EventHubConsumer` to consume events like in the previous examples puts

The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.

Load balancing is typically useful when running multiple instances of `EventProcessor` across multiple processes, or even on multiple machines. When running in production, it is recommended to store checkpoints in a persistent store. Search PyPI with the prefix `azure-eventhubs-checkpoint` to find packages that support persistent checkpoint storage.

You can see how to use the `EventProcessor` in the example below, where we use an in-memory `PartitionManager` that does checkpointing in memory.

[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is another `PartitionManager` implementation that allows multiple EventProcessors to share the load balancing and checkpoint data in a central storage.
[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is one of the `PartitionManager` implementations we provide; it uses Azure Blob Storage as the persistent store.
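The delegation-and-checkpointing pattern described above can be modeled without any Azure dependency. The classes below are illustrative stand-ins (not the azure-eventhub API) showing how a processor delegates events to user logic and records a checkpoint so a restart can resume:

```python
# Toy model of the EventProcessor -> PartitionProcessor delegation pattern.
# These classes are stand-ins for illustration, NOT the azure-eventhub API.

class ToyPartitionProcessor:
    """Holds the user's business logic, mirroring the PartitionProcessor role."""
    def __init__(self):
        self.processed = []

    def process_events(self, events, partition_id):
        # Business logic lives here; the processor owns everything else.
        self.processed.extend(events)

class ToyEventProcessor:
    """Mirrors EventProcessor: owns partitions, delegates event handling,
    and records checkpoints so a restarted instance can resume."""
    def __init__(self, partition_processor, checkpoints=None):
        self.partition_processor = partition_processor
        self.checkpoints = checkpoints if checkpoints is not None else {}

    def run_once(self, partition_id, events):
        self.partition_processor.process_events(events, partition_id)
        if events:
            # Checkpoint the last processed event for this partition.
            self.checkpoints[partition_id] = events[-1]

processor = ToyEventProcessor(ToyPartitionProcessor())
processor.run_once("0", ["event-1", "event-2"])
print(processor.checkpoints["0"])  # -> event-2
```

In the real SDK the checkpoint dictionary is replaced by a `PartitionManager` backed by durable storage, which is what allows multiple processor instances to share load-balancing and checkpoint data.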


```python
@@ -91,9 +91,9 @@ def __init__(
:param partition_processor_type: A subclass type of ~azure.eventhub.eventprocessor.PartitionProcessor.
:type partition_processor_type: type
:param partition_manager: Interacts with the data storage that stores ownership and checkpoints data.
~azure.eventhub.aio.eventprocessor.SamplePartitionManager can be used to save data in memory or to a file.
More sophisticated partition managers are / will be provided as plug-ins. Users can also develop their own
partition managers.
~azure.eventhub.aio.eventprocessor.SamplePartitionManager demonstrates the basic usage of `PartitionManager`
which stores data in memory or a file.
Users can either use the provided `PartitionManager` plug-ins or develop their own `PartitionManager`.
:type partition_manager: Subclass of ~azure.eventhub.eventprocessor.PartitionManager.
:param initial_event_position: The event position to start a partition consumer.
if the partition has no checkpoint yet. This could be replaced by "reset" checkpoint in the near future.
@@ -119,10 +119,10 @@ def __repr__(self):
return 'EventProcessor: id {}'.format(self._id)

async def start(self):
"""Start the EventProcessor
"""Start the EventProcessor.

This EventProcessor will then start to balance partition ownership with other EventProcessors
and asynchronously start to receive EventData from EventHub and process events.
The EventProcessor will try to claim and balance partition ownership with other EventProcessors
and asynchronously start receiving EventData from EventHub and processing events.

:return: None

@@ -155,13 +155,13 @@ async def start(self):
await asyncio.sleep(self._polling_interval)

async def stop(self):
"""Stop the EventProcessor
"""Stop the EventProcessor.

This EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions
The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions
it is working on.
If other EventProcessors are still working, they will take over these partitions.
Other running EventProcessors will take over these released partitions.

A stopped EventProcessor can be restarted by calling method start() again.
A stopped EventProcessor can be restarted by calling its `start` method again.

:return: None

@@ -26,12 +26,14 @@ class PartitionProcessor(ABC):
"""

async def initialize(self, partition_context: PartitionContext):
"""Called when EventProcessor creates this PartitionProcessor.
"""This method will be called when `EventProcessor` creates a `PartitionProcessor`.

:param partition_context: The context information of this partition.
:type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext
"""

# Put PartitionProcessor initialization code here.

async def close(self, reason, partition_context: PartitionContext):
"""Called when EventProcessor stops processing this PartitionProcessor.

@@ -46,6 +48,8 @@ async def close(self, reason, partition_context: PartitionContext):

"""

# Put PartitionProcessor cleanup code here.

@abstractmethod
async def process_events(self, events: List[EventData], partition_context: PartitionContext):
"""Called when a batch of events have been received.
@@ -58,6 +62,8 @@ async def process_events(self, events: List[EventData], partition_context: PartitionContext):

"""

# Put event-processing code here.

async def process_error(self, error, partition_context: PartitionContext):
"""Called when an error happens when receiving or processing events

@@ -68,3 +74,5 @@ async def process_error(self, error, partition_context: PartitionContext):
:type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext

"""

# Put error-handling code here.
20 changes: 8 additions & 12 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
@@ -261,18 +261,14 @@ def encode_message(self):

class EventDataBatch(object):
"""
It's much faster to send EventData in a batch than individually. But putting too much EventData in one batch
may exceed the frame size limit of the event hub.
EventDataBatch helps you build the maximum allowed size batch of EventData to improve performance
within the size limit

Use create_batch method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer
to create an EventDataBatch object. It retrieves the frame size limit from the service.
Use method EventDataBatch.try_add to build the list until a ValueError is raised,
and use send method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer
to send out the EventData batch to EventHub

Do not instantiate an EventDataBatch object using its constructor.
Sending events in a batch yields better performance than sending events individually.
EventDataBatch helps you build a batch of `EventData` up to the maximum allowed size to improve sending performance.

Use the `try_add` method to add events until the maximum batch size limit in bytes is reached, at which point a `ValueError` is raised.
Use the `send` method of ~azure.eventhub.EventHubProducer or ~azure.eventhub.aio.EventHubProducer to send the batch.

Please use the `create_batch` method of `EventHubProducer`
to create an `EventDataBatch` object instead of instantiating one directly.
"""

def __init__(self, max_size=None, partition_key=None):