Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ async def send(self, event_data, partition_key=None):

Example:
.. literalinclude:: ../examples/test_examples_eventhub.py
:start-after: [START eventhub_client_sync_send]
:end-before: [END eventhub_client_sync_send]
:start-after: [START eventhub_client_async_send]
:end-before: [END eventhub_client_async_send]
:language: python
:dedent: 4
:caption: Sends an event data and blocks until acknowledgement is received or operation times out.
Expand Down
18 changes: 18 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ def __init__(self, host, event_hub_path, credential, **kwargs):
:type transport_type: ~azure.eventhub.TransportType
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but
will return as soon as service returns no new events. Default value is the same as prefetch.
:type max_batch_size: int
:param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub.
Default value is 0 seconds.
:type receive_timeout: int
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
Expand Down Expand Up @@ -177,6 +183,12 @@ def from_connection_string(cls, conn_str, event_hub_path=None, **kwargs):
:type transport_type: ~azure.eventhub.TransportType
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but
will return as soon as service returns no new events. Default value is the same as prefetch.
:type max_batch_size: int
:param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub.
Default value is 0 seconds.
:type receive_timeout: int
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
Expand Down Expand Up @@ -227,6 +239,12 @@ def from_iothub_connection_string(cls, conn_str, **kwargs):
:type transport_type: ~azure.eventhub.TransportType
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but
will return as soon as service returns no new events. Default value is the same as prefetch.
:type max_batch_size: int
:param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub.
Default value is 0 seconds.
:type receive_timeout: int
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,62 +31,54 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config):

# [START create_eventhub_client_async_sender]
client = EventHubClient.from_connection_string(connection_str)
# Add a async sender to the async client object.
# Create an async sender.
sender = client.create_sender(partition_id="0")
# [END create_eventhub_client_async_sender]

# [START create_eventhub_client_async_receiver]
client = EventHubClient.from_connection_string(connection_str)
# Add a async receiver to the async client object.
# Create an async receiver.
receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest'))
# Create an exclusive async receiver.
receiver = client.create_receiver(partition_id="0", event_position=EventPosition('@latest'), exclusive_receiver_priority=1)
# [END create_eventhub_client_async_receiver]

# [START create_eventhub_client_async_epoch_receiver]
client = EventHubClient.from_connection_string(connection_str)
# Add a async receiver to the async client object.
epoch_receiver = client.create_receiver(consumer_group="$default", partition_id="0", exclusive_receiver_priority=42)
# [END create_eventhub_client_async_epoch_receiver]

client = EventHubClient.from_connection_string(connection_str)
sender = client.create_sender(partition_id="0")
receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest'))

await receiver.receive(timeout=1)

# [START eventhub_client_async_send]
async with sender:
# [START eventhub_client_async_send]
event_data = EventData(b"A single event")
await sender.send(event_data)
# [END eventhub_client_async_send]
time.sleep(1)
# [START eventhub_client_async_receive]
logger = logging.getLogger("azure.eventhub")
async with receiver:
received = await receiver.receive(timeout=5)
for event_data in received:
logger.info("Message received:{}".format(event_data.body_as_str()))
# [END eventhub_client_async_receive]
assert len(received) == 1
assert received[0].body_as_str() == "A single event"
assert list(received[-1].body)[0] == b"A single event"
# [END eventhub_client_async_send]

await asyncio.sleep(1)

# [START eventhub_client_async_receive]
logger = logging.getLogger("azure.eventhub")
async with receiver:
received = await receiver.receive(timeout=5)
for event_data in received:
logger.info("Message received:{}".format(event_data.body_as_str()))
# [END eventhub_client_async_receive]
assert len(received) == 1
assert received[0].body_as_str() == "A single event"
assert list(received[-1].body)[0] == b"A single event"


@pytest.mark.asyncio
async def test_example_eventhub_async_sender_ops(live_eventhub_config, connection_str):
import os
# [START create_eventhub_client_async_sender_instance]
from azure.eventhub.aio import EventHubClient
from azure.eventhub import EventData

client = EventHubClient.from_connection_string(connection_str)
sender = client.create_sender(partition_id="0")
# [END create_eventhub_client_async_sender_instance]

# [START eventhub_client_async_sender_close]
client = EventHubClient.from_connection_string(connection_str)
sender = client.create_sender(partition_id="0")
try:
# Open the Async EventSender using the supplied conneciton and receive.
await sender.send(EventData(b"A single event"))
except:
raise
finally:
# Close down the send handler.
await sender.close()
Expand All @@ -95,28 +87,9 @@ async def test_example_eventhub_async_sender_ops(live_eventhub_config, connectio

@pytest.mark.asyncio
async def test_example_eventhub_async_receiver_ops(live_eventhub_config, connection_str):
import os
# [START create_eventhub_client_async_receiver_instance]
from azure.eventhub.aio import EventHubClient
from azure.eventhub import EventPosition

client = EventHubClient.from_connection_string(connection_str)
receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest'))
# [END create_eventhub_client_async_receiver_instance]

# [START eventhub_client_async_receiver_open]
client = EventHubClient.from_connection_string(connection_str)
receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest'))
try:
# Open and receive
await receiver.receive(timeout=1)
except:
raise
finally:
# Close down the receive handler.
await receiver.close()
# [END eventhub_client_async_receiver_open]

# [START eventhub_client_async_receiver_close]
client = EventHubClient.from_connection_string(connection_str)
receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest'))
Expand All @@ -128,4 +101,4 @@ async def test_example_eventhub_async_receiver_ops(live_eventhub_config, connect
finally:
# Close down the receive handler.
await receiver.close()
# [END eventhub_client_async_receiver_close]
# [END eventhub_client_async_receiver_close]
Loading