Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ include *.md
include azure/__init__.py
include azure/eventhub/__init__.py
include azure/eventhub/extensions/__init__.py
recursive-include tests *.py *.yaml
recursive-include samples *.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This Checkpoint Store package works as a plug-in package to `EventHubConsumerCli

Please note that this is an async library, for sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob).

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)
[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.aio.html#azure.eventhub.aio.PartitionManager) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

## Getting started

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ include *.md
include azure/__init__.py
include azure/eventhub/__init__.py
include azure/eventhub/extensions/__init__.py
recursive-include tests *.py *.yaml
recursive-include samples *.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This Checkpoint Store package works as a plug-in package to `EventHubConsumerCli

Please note that this is a sync library, for async version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob-aio](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio).

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)
[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.html#azure.eventhub.PartitionManager) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

## Getting started

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

**Breaking changes**

- `EventHubClient` has been split into two separate clients: `EventHubProducerClient` and `EventHubConsumerClient`.
- `EventHubClient`, `EventHubConsumer` and `EventHubProducer` has been removed. Use `EventHubProducerClient` and `EventHubConsumerClient` instead.
- Construction of both objects is the same as it was for the previous client.
- Introduced `EventHubProducerClient` as substitution for`EventHubProducer`.
- `EventHubProducerClient` supports sending events to different partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def from_connection_string(cls, conn_str, **kwargs):
:param str conn_str: The connection string of an eventhub.
:keyword str event_hub_path: The path of the specific Event Hub to connect the client to.
:keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
:keyword dict[str,Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys - 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present - 'username', 'password'.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
Expand All @@ -124,6 +124,16 @@ def from_connection_string(cls, conn_str, **kwargs):
:paramtype partition_manager: ~azure.eventhub.PartitionManager
:keyword float load_balancing_interval:
When load balancing kicks in, this is the interval in seconds between two load balancing. Default is 10.

.. admonition:: Example:

.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START create_eventhub_consumer_client_from_conn_str_sync]
:end-before: [END create_eventhub_consumer_client_from_conn_str_sync]
:language: python
:dedent: 4
:caption: Create a new instance of the EventHubConsumerClient from connection string.

"""
return super(EventHubConsumerClient, cls).from_connection_string(conn_str, **kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ def update_checkpoint(self, fully_qualified_namespace, eventhub_name, consumer_g
the Event Hubs namespace that contains it.
:param str consumer_group_name: The name of the consumer group the ownership are associated with.
:param str partition_id: The partition id which the checkpoint is created for.
:param str offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:param int sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint
will be associated with.
:param str offset: The offset of the :class:`EventData<azure.eventhub.EventData>`
the new checkpoint will be associated with.
:param int sequence_number: The sequence_number of the :class:`EventData<azure.eventhub.EventData>`
the new checkpoint will be associated with.
:rtype: None
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def from_connection_string(cls, conn_str, **kwargs):
:param str conn_str: The connection string of an eventhub.
:keyword str event_hub_path: The path of the specific Event Hub to connect the client to.
:keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
:keyword dict[str,Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys - 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present - 'username', 'password'.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
Expand All @@ -92,6 +92,15 @@ def from_connection_string(cls, conn_str, **kwargs):
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType

.. admonition:: Example:

.. literalinclude:: ../samples/sync_samples/sample_code_eventhub.py
:start-after: [START create_eventhub_producer_client_from_conn_str_sync]
:end-before: [END create_eventhub_producer_client_from_conn_str_sync]
:language: python
:dedent: 4
:caption: Create a new instance of the EventHubProducerClient from connection string.
"""
return super(EventHubProducerClient, cls).from_connection_string(conn_str, **kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def from_connection_string(cls, conn_str, **kwargs):
:param str conn_str: The connection string of an eventhub.
:keyword str event_hub_path: The path of the specific Event Hub to connect the client to.
:keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
:keyword dict[str,Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys - 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present - 'username', 'password'.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
Expand All @@ -103,6 +103,16 @@ def from_connection_string(cls, conn_str, **kwargs):
:paramtype partition_manager: ~azure.eventhub.aio.PartitionManager
:keyword float load_balancing_interval:
When load balancing kicks in, this is the interval in seconds between two load balancing. Default is 10.

.. admonition:: Example:

.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START create_eventhub_consumer_client_from_conn_str_async]
:end-before: [END create_eventhub_consumer_client_from_conn_str_async]
:language: python
:dedent: 4
:caption: Create a new instance of the EventHubConsumerClient from connection string.

"""
return super(EventHubConsumerClient, cls).from_connection_string(conn_str, **kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def from_connection_string(cls, conn_str, **kwargs):
:param str conn_str: The connection string of an eventhub.
:keyword str event_hub_path: The path of the specific Event Hub to connect the client to.
:keyword bool network_tracing: Whether to output network trace logs to the logger. Default is `False`.
:keyword dict[str, Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
:keyword dict[str,Any] http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys - 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present - 'username', 'password'.
:keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service.
Expand All @@ -92,6 +92,15 @@ def from_connection_string(cls, conn_str, **kwargs):
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Event Hubs service. Default is `TransportType.Amqp`.
:paramtype transport_type: ~azure.eventhub.TransportType

.. admonition:: Example:

.. literalinclude:: ../samples/async_samples/sample_code_eventhub_async.py
:start-after: [START create_eventhub_producer_client_from_conn_str_async]
:end-before: [END create_eventhub_producer_client_from_conn_str_async]
:language: python
:dedent: 4
:caption: Create a new instance of the EventHubProducerClient from connection string.
"""
return super(EventHubProducerClient, cls).from_connection_string(conn_str, **kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ async def update_checkpoint(self, fully_qualified_namespace: str, eventhub_name:
the Event Hubs namespace that contains it.
:param str consumer_group_name: The name of the consumer group the ownership are associated with.
:param str partition_id: The partition id which the checkpoint is created for.
:param str offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:param int sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint
will be associated with.
:param str offset: The offset of the :class:`EventData<azure.eventhub.EventData>`
the new checkpoint will be associated with.
:param int sequence_number: The sequence_number of the :class:`EventData<azure.eventhub.EventData>`
the new checkpoint will be associated with.
:rtype: None
"""

Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ class EventDataBatch(object):

Use `try_add` method to add events until the maximum batch size limit in bytes has been reached -
a `ValueError` will be raised.
Use `send` method of ~azure.eventhub.EventHubProducerClient or ~azure.eventhub.aio.EventHubProducerClient
Use `send` method of :class:`EventHubProducerClient<azure.eventhub.EventHubProducerClient>`
or the async :class:`EventHubProducerClient<azure.eventhub.aio.EventHubProducerClient>`
for sending. The `send` method accepts partition_key as a parameter for sending a particular partition.

**Please use the create_batch method of EventHubProducerClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,65 @@
# license information.
#--------------------------------------------------------------------------

import pytest
import logging
import asyncio


def create_async_eventhub_producer_client():
# [START create_eventhub_producer_client_from_conn_str_async]
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
event_hub = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_str,
event_hub_path=event_hub)
# [END create_eventhub_producer_client_from_conn_str_async]

# [START create_eventhub_producer_client_async]
import os
from azure.eventhub import EventHubSharedKeyCredential
from azure.eventhub.aio import EventHubProducerClient

EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENT_HUB = os.environ['EVENT_HUB_NAME']
hostname = os.environ['EVENT_HUB_HOSTNAME']
event_hub = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR,
event_hub_path=EVENT_HUB)
producer = EventHubProducerClient(host=hostname,
event_hub_path=event_hub,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
# [END create_eventhub_producer_client_async]
return producer


def create_async_eventhub_consumer_client():
# [START create_eventhub_consumer_client_from_conn_str_async]
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
event_hub = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(conn_str=event_hub_connection_str,
event_hub_path=event_hub)
# [END create_eventhub_consumer_client_from_conn_str_async]

# [START create_eventhub_consumer_client_async]
import os
from azure.eventhub import EventHubSharedKeyCredential
from azure.eventhub.aio import EventHubConsumerClient

EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENT_HUB = os.environ['EVENT_HUB_NAME']
hostname = os.environ['EVENT_HUB_HOSTNAME']
event_hub = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=EVENT_HUB_CONNECTION_STR,
event_hub_path=EVENT_HUB
)
consumer = EventHubConsumerClient(host=hostname,
event_hub_path=event_hub,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
# [END create_eventhub_consumer_client_async]
return consumer


async def example_eventhub_async_send_and_receive(live_eventhub_config):
async def example_eventhub_async_send_and_receive():
producer = create_async_eventhub_producer_client()
consumer = create_async_eventhub_consumer_client()
try:
Expand Down Expand Up @@ -70,25 +93,23 @@ async def on_events(partition_context, events):
len(events), partition_context.partition_id))
# Do ops on received events
async with consumer:
task = asyncio.ensure_future(consumer.receive(on_events=on_events, consumer_group="$default"))
await asyncio.sleep(3) # keep receiving for 3 seconds
task.cancel() # stop receiving
await consumer.receive(on_events=on_events, consumer_group="$default")
# [END eventhub_consumer_client_receive_async]
finally:
pass


async def example_eventhub_async_producer_ops(live_eventhub_config, connection_str):
async def example_eventhub_async_producer_ops():
# [START eventhub_producer_client_close_async]
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENT_HUB = os.environ['EVENT_HUB_NAME']
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
event_hub = os.environ['EVENT_HUB_NAME']

producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STR,
event_hub_path=EVENT_HUB)
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_str,
event_hub_path=event_hub)
try:
await producer.send(EventData(b"A single event"))
finally:
Expand All @@ -97,17 +118,17 @@ async def example_eventhub_async_producer_ops(live_eventhub_config, connection_s
# [END eventhub_producer_client_close_async]


async def example_eventhub_async_consumer_ops(live_eventhub_config, connection_str):
async def example_eventhub_async_consumer_ops():
# [START eventhub_consumer_client_close_async]
import os

EVENT_HUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENT_HUB = os.environ['EVENT_HUB_NAME']
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
event_hub = os.environ['EVENT_HUB_NAME']

from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=EVENT_HUB_CONNECTION_STR,
event_hub_path=EVENT_HUB
conn_str=event_hub_connection_str,
event_hub_path=event_hub
)

logger = logging.getLogger("azure.eventhub")
Expand All @@ -127,3 +148,9 @@ async def on_events(partition_context, events):
# Close down the consumer handler explicitly.
await consumer.close()
# [END eventhub_consumer_client_close_async]

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(example_eventhub_async_producer_ops())
loop.run_until_complete(example_eventhub_async_consumer_ops())
# loop.run_until_complete(example_eventhub_async_send_and_receive())
Loading