Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 2019-11-04 1.0.0b5

**New features**

- Added method `list_checkpoints` which list all the checkpoints under given eventhub namespace, eventhub name and consumer group.

## 1.0.0b4 (2019-10-09)
This release has trivial internal changes only. No feature changes.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ include *.md
include azure/__init__.py
include azure/eventhub/__init__.py
include azure/eventhub/extensions/__init__.py
recursive-include tests *.py *.yaml
recursive-include samples *.py
59 changes: 30 additions & 29 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Azure EventHubs Checkpoint Store client library for Python using Storage Blobs

Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs.
This Checkpoint Store package works as a plug-in package to `EventProcessor`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.
This Checkpoint Store package works as a plug-in package to `EventHubConsumerClient`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)
Please note that this is an async library, for sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob).

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.aio.html#azure.eventhub.aio.PartitionManager) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

## Getting started

Expand Down Expand Up @@ -47,8 +49,8 @@ sequence number and the timestamp of when it was enqueued.

## Examples
- [Create an Azure Storage Blobs `ContainerClient`](#create-an-azure-storage-blobs-containerclient)
- [Create an Azure EventHubs `EventHubClient`](#create-an-eventhubclient)
- [Consume events using an `EventProessor` that uses a `BlobPartitionManager`](#consume-events-using-an-eventprocessor-that-uses-a-blobpartitionmanager-to-do-checkpointing)
- [Create an Azure EventHubs `EventHubConsumerClient`](#create-an-eventhubconsumerclient)
- [Consume events using a `BlobPartitionManager`](#consume-events-using-a-blobpartitionmanager-to-do-checkpoint)

### Create an Azure Storage Blobs `ContainerClient`
The easiest way to create a `ContainerClient` is to use a connection string.
Expand All @@ -58,43 +60,43 @@ container_client = ContainerClient.from_connection_string("my_storageacount_conn
```
For other ways of creating a `ContainerClient`, go to [Blob Storage library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob) for more details.

### Create an `EventHubClient`
The easiest way to create a `EventHubClient` is to use a connection string.
### Create an `EventHubConsumerClient`
The easiest way to create a `EventHubConsumerClient` is to use a connection string.
```python
from azure.eventhub.aio import EventHubClient
eventhub_client = EventHubClient.from_connection_string("my_eventhub_namespace_connection_string", event_hub_path="myeventhub")
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", event_hub_path="myeventhub")
```
For other ways of creating a `EventHubClient`, refer to [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details.
For other ways of creating a `EventHubConsumerClient`, refer to [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details.

### Consume events using an `EventProcessor` that uses a `BlobPartitionManager` to do checkpointing
### Consume events using a `BlobPartitionManager` to do checkpoint
```python
import asyncio

from azure.eventhub.aio import EventHubClient
from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
from azure.eventhub.aio import EventHubConsumerClient
from azure.storage.blob.aio import ContainerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager

eventhub_connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
storage_container_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
storage_container_name = '<< STORAGE CONTAINER NAME>>'

class MyPartitionProcessor(PartitionProcessor):
async def process_events(self, events, partition_context):
if events:
# write your code here to process events
# save checkpoint to the data store
await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)
async def do_operation(events):
# do some operations to the events.
pass

async def process_events(partition_context, events):
await do_operation(events)
partition_context.update_checkpoint(events[-1])

async def main():
eventhub_client = EventHubClient.from_connection_string(eventhub_connection_str, receive_timeout=5, retry_total=3)
storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, storage_container_name)
partition_manager = BlobPartitionManager(storage_container_client) # use the BlobPartitonManager to save
event_processor = EventProcessor(eventhub_client, "$default", MyPartitionProcessor, partition_manager)
async with storage_container_client:
asyncio.ensure_future(event_processor.start())
await asyncio.sleep(60) # run for a while
await event_processor.stop()
partition_manager = BlobPartitionManager(storage_container_client) # use the BlobPartitonManager to save
client = EventHubConsumerClient.from_connection_string(eventhub_connection_str, partition_manager=partition_manager, receive_timeout=5, retry_total=3)

try:
await client.receive(process_events, "$default")
except KeyboardInterrupt:
await client.close()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
Expand All @@ -110,20 +112,19 @@ Refer to [Logging](#logging) to enable loggers for related libraries.
## Next steps

### Examples
- [./examples/eventprocessor/event_processor_blob_storage_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py) - event processor with blob partition manager example
- [./samples/event_processor_blob_storage_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/samples/event_processor_blob_storage_example.py) - EventHubConsumerClient with blob partition manager example

### Documentation

Reference documentation is available at https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub.extensions.html.
Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extensions being empty, this link will not exits. What do you wanted to show here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the link , pointing to the PartitionManager in eventhub docs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still don't see the link updated in the PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's due to the github that can't show the changes.

I checked files in the branch and see the links are up to date.

README of azure-eventhubs-checkpointstoreblob

README of azure-eventhubs-checkpointstoreblob-aio


### Logging

- Enable `azure.eventhub.extensions.checkpointstoreblobaio` logger to collect traces from the library.
- Enable `azure.eventhub.aio.eventprocessor` logger to collect traces from package eventprocessor of the azure-eventhub library.
- Enable `azure.eventhub` logger to collect traces from the main azure-eventhub library.
- Enable `azure.storage.blob` logger to collect traces from azure storage blob library.
- Enable `uamqp` logger to collect traces from the underlying uAMQP library.
- Enable AMQP frame level trace by setting `network_tracing=True` when creating the client.
- Enable AMQP frame level trace by setting `logging_enable=True` when creating the client.

### Provide Feedback

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.0.0b4"
__version__ = "1.0.0b5"

from .blobstoragepm import BlobPartitionManager
from .blobstoragepmaio import BlobPartitionManager

__all__ = [
"BlobPartitionManager",
Expand Down

This file was deleted.

Loading