diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py index b03db7959db5..3006e95dac7b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/sender_async.py @@ -321,8 +321,8 @@ async def send(self, event_data, partition_key=None): Example: .. literalinclude:: ../examples/test_examples_eventhub.py - :start-after: [START eventhub_client_sync_send] - :end-before: [END eventhub_client_sync_send] + :start-after: [START eventhub_client_async_send] + :end-before: [END eventhub_client_async_send] :language: python :dedent: 4 :caption: Sends an event data and blocks until acknowledgement is received or operation times out. diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 5911c54bab0f..7349adcb9b0a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -119,6 +119,12 @@ def __init__(self, host, event_hub_path, credential, **kwargs): :type transport_type: ~azure.eventhub.TransportType :param prefetch: The message prefetch count of the receiver. Default is 300. :type prefetch: int + :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but + will return as soon as service returns no new events. Default value is the same as prefetch. + :type max_batch_size: int + :param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub. + Default value is 0 seconds. + :type receive_timeout: int :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. :type send_timeout: int @@ -177,6 +183,12 @@ def from_connection_string(cls, conn_str, event_hub_path=None, **kwargs): :type transport_type: ~azure.eventhub.TransportType :param prefetch: The message prefetch count of the receiver. Default is 300. :type prefetch: int + :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but + will return as soon as service returns no new events. Default value is the same as prefetch. + :type max_batch_size: int + :param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub. + Default value is 0 seconds. + :type receive_timeout: int :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. :type send_timeout: int @@ -227,6 +239,12 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): :type transport_type: ~azure.eventhub.TransportType :param prefetch: The message prefetch count of the receiver. Default is 300. :type prefetch: int + :param max_batch_size: Receive a batch of events. Batch size will be up to the maximum specified, but + will return as soon as service returns no new events. Default value is the same as prefetch. + :type max_batch_size: int + :param receive_timeout: The timeout time in seconds to receive a batch of events from an Event Hub. + Default value is 0 seconds. + :type receive_timeout: int :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is queued. Default value is 60 seconds. If set to 0, there will be no timeout. :type send_timeout: int diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py index 65b0d8be4bbe..4a36faa88947 100644 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py @@ -31,62 +31,54 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config): # [START create_eventhub_client_async_sender] client = EventHubClient.from_connection_string(connection_str) - # Add a async sender to the async client object. + # Create an async sender. sender = client.create_sender(partition_id="0") # [END create_eventhub_client_async_sender] # [START create_eventhub_client_async_receiver] client = EventHubClient.from_connection_string(connection_str) - # Add a async receiver to the async client object. + # Create an async receiver. receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest')) + # Create an exclusive async receiver. + receiver = client.create_receiver(partition_id="0", event_position=EventPosition('@latest'), exclusive_receiver_priority=1) # [END create_eventhub_client_async_receiver] - # [START create_eventhub_client_async_epoch_receiver] - client = EventHubClient.from_connection_string(connection_str) - # Add a async receiver to the async client object. - epoch_receiver = client.create_receiver(consumer_group="$default", partition_id="0", exclusive_receiver_priority=42) - # [END create_eventhub_client_async_epoch_receiver] - client = EventHubClient.from_connection_string(connection_str) sender = client.create_sender(partition_id="0") receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest')) + + await receiver.receive(timeout=1) + + # [START eventhub_client_async_send] async with sender: - # [START eventhub_client_async_send] event_data = EventData(b"A single event") await sender.send(event_data) - # [END eventhub_client_async_send] - time.sleep(1) - # [START eventhub_client_async_receive] - logger = logging.getLogger("azure.eventhub") - async with receiver: - received = await receiver.receive(timeout=5) - for event_data in received: - logger.info("Message received:{}".format(event_data.body_as_str())) - # [END eventhub_client_async_receive] - assert len(received) == 1 - assert received[0].body_as_str() == "A single event" - assert list(received[-1].body)[0] == b"A single event" + # [END eventhub_client_async_send] + + await asyncio.sleep(1) + + # [START eventhub_client_async_receive] + logger = logging.getLogger("azure.eventhub") + async with receiver: + received = await receiver.receive(timeout=5) + for event_data in received: + logger.info("Message received:{}".format(event_data.body_as_str())) + # [END eventhub_client_async_receive] + assert len(received) == 1 + assert received[0].body_as_str() == "A single event" + assert list(received[-1].body)[0] == b"A single event" @pytest.mark.asyncio async def test_example_eventhub_async_sender_ops(live_eventhub_config, connection_str): - import os - # [START create_eventhub_client_async_sender_instance] from azure.eventhub.aio import EventHubClient from azure.eventhub import EventData - client = EventHubClient.from_connection_string(connection_str) - sender = client.create_sender(partition_id="0") - # [END create_eventhub_client_async_sender_instance] - # [START eventhub_client_async_sender_close] client = EventHubClient.from_connection_string(connection_str) sender = client.create_sender(partition_id="0") try: - # Open the Async EventSender using the supplied conneciton and receive. await sender.send(EventData(b"A single event")) - except: - raise finally: # Close down the send handler. await sender.close() @@ -95,28 +87,9 @@ async def test_example_eventhub_async_sender_ops(live_eventhub_config, connectio @pytest.mark.asyncio async def test_example_eventhub_async_receiver_ops(live_eventhub_config, connection_str): - import os - # [START create_eventhub_client_async_receiver_instance] from azure.eventhub.aio import EventHubClient from azure.eventhub import EventPosition - client = EventHubClient.from_connection_string(connection_str) - receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest')) - # [END create_eventhub_client_async_receiver_instance] - - # [START eventhub_client_async_receiver_open] - client = EventHubClient.from_connection_string(connection_str) - receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest')) - try: - # Open and receive - await receiver.receive(timeout=1) - except: - raise - finally: - # Close down the receive handler. - await receiver.close() - # [END eventhub_client_async_receiver_open] - # [START eventhub_client_async_receiver_close] client = EventHubClient.from_connection_string(connection_str) receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest')) @@ -128,4 +101,4 @@ async def test_example_eventhub_async_receiver_ops(live_eventhub_config, connect finally: # Close down the receive handler. await receiver.close() - # [END eventhub_client_async_receiver_close] \ No newline at end of file + # [END eventhub_client_async_receiver_close] diff --git a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py index 41e07d034e25..fd1c1ba1773e 100644 --- a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py +++ b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py @@ -16,34 +16,22 @@ def create_eventhub_client(live_eventhub_config): # [START create_eventhub_client] import os - from azure.eventhub import EventHubClient + from azure.eventhub import EventHubClient, EventHubSharedKeyCredential - address = os.environ['EVENT_HUB_ADDRESS'] + host = os.environ['EVENT_HUB_HOSTNAME'] + event_hub_path = os.environ['EVENT_HUB_NAME'] shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY'] shared_access_key = os.environ['EVENT_HUB_SAS_KEY'] client = EventHubClient( - address=address, - username=shared_access_policy, - password=shared_access_key) + host=host, + event_hub_path=event_hub_path, + credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key) + ) # [END create_eventhub_client] return client -def create_eventhub_client_from_sas_token(live_eventhub_config): - # [START create_eventhub_client_sas_token] - import os - from azure.eventhub import EventHubClient - - address = os.environ['EVENT_HUB_ADDRESS'] - sas_token = os.environ['EVENT_HUB_SAS_TOKEN'] - - client = EventHubClient.from_sas_token( - address=address, - sas_token=sas_token) - # [END create_eventhub_client_sas_token] - - def create_eventhub_client_from_iothub_connection_string(live_eventhub_config): # [START create_eventhub_client_iot_connstr] import os @@ -67,163 +55,82 @@ def test_example_eventhub_sync_send_and_receive(live_eventhub_config): client = EventHubClient.from_connection_string(connection_str) # [END create_eventhub_client_connstr] - from azure.eventhub import EventData, Offset + from azure.eventhub import EventData, EventPosition # [START create_eventhub_client_sender] - client = EventHubClient.from_connection_string(connection_str) - # Add a sender to the client object. - sender = client.add_sender(partition="0") + client = EventHubClient.from_connection_string(connection_str) + # Create a sender. + sender = client.create_sender(partition_id="0") # [END create_eventhub_client_sender] # [START create_eventhub_client_receiver] - client = EventHubClient.from_connection_string(connection_str) - # Add a receiver to the client object. - receiver = client.add_receiver(consumer_group="$default", partition="0", offset=Offset('@latest')) - # [END create_eventhub_client_receiver] - - # [START create_eventhub_client_epoch_receiver] - client = EventHubClient.from_connection_string(connection_str) - # Add a receiver to the client object with an epoch value. - epoch_receiver = client.add_epoch_receiver(consumer_group="$default", partition="0", epoch=42) - # [END create_eventhub_client_epoch_receiver] - - # [START eventhub_client_run] client = EventHubClient.from_connection_string(connection_str) - # Add Senders/Receivers - try: - client.run() - # Start sending and receiving - except: - raise - finally: - client.stop() - # [END eventhub_client_run] + # Create a receiver. + receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest')) + # Create an exclusive receiver object. + exclusive_receiver = client.create_receiver(partition_id="0", exclusive_receiver_priority=1) + # [END create_eventhub_client_receiver] client = EventHubClient.from_connection_string(connection_str) - sender = client.add_sender(partition="0") - receiver = client.add_receiver(consumer_group="$default", partition="0", offset=Offset('@latest')) + sender = client.create_sender(partition_id="0") + receiver = client.create_receiver(partition_id="0", event_position=EventPosition('@latest')) try: - # Opens the connection and starts running all EventSender/EventReceiver clients. - client.run() - # Start sending and receiving + receiver.receive(timeout=1) # [START create_event_data] event_data = EventData("String data") event_data = EventData(b"Bytes data") event_data = EventData([b"A", b"B", b"C"]) - def batched(): - for i in range(10): - yield "Batch data, Event number {}".format(i) - - event_data = EventData(batch=batched()) + list_data = ['Message {}'.format(i) for i in range(10)] + event_data = EventData(body=list_data) # [END create_event_data] # [START eventhub_client_sync_send] - event_data = EventData(b"A single event") - sender.send(event_data) + with sender: + event_data = EventData(b"A single event") + sender.send(event_data) # [END eventhub_client_sync_send] time.sleep(1) # [START eventhub_client_sync_receive] - logger = logging.getLogger("azure.eventhub") - received = receiver.receive(timeout=5, max_batch_size=1) - for event_data in received: - logger.info("Message received:{}".format(event_data.body_as_str())) + with receiver: + logger = logging.getLogger("azure.eventhub") + received = receiver.receive(timeout=5, max_batch_size=1) + for event_data in received: + logger.info("Message received:{}".format(event_data.body_as_str())) # [END eventhub_client_sync_receive] - assert len(received) == 1 - assert received[0].body_as_str() == "A single event" - assert list(received[-1].body)[0] == b"A single event" - except: - raise - + assert len(received) == 1 + assert received[0].body_as_str() == "A single event" + assert list(received[-1].body)[0] == b"A single event" finally: - client.stop() - - # [START eventhub_client_stop] - client = EventHubClient.from_connection_string(connection_str) - # Add Senders/Receivers - try: - client.run() - # Start sending and receiving - except: - raise - finally: - client.stop() - # [END eventhub_client_stop] - + pass -def test_example_eventhub_sync_sender_ops(live_eventhub_config, connection_str): - import os - # [START create_eventhub_client_sender_instance] - from azure.eventhub import EventHubClient - client = EventHubClient.from_connection_string(connection_str) - sender = client.add_sender(partition="0") - # [END create_eventhub_client_sender_instance] - - # [START eventhub_client_sender_open] - client = EventHubClient.from_connection_string(connection_str) - sender = client.add_sender(partition="0") - try: - # Open the EventSender using the supplied conneciton. - sender.open() - # Start sending - except: - raise - finally: - # Close down the send handler. - sender.close() - # [END eventhub_client_sender_open] +def test_example_eventhub_sender_ops(live_eventhub_config, connection_str): + from azure.eventhub import EventHubClient, EventData # [START eventhub_client_sender_close] client = EventHubClient.from_connection_string(connection_str) - sender = client.add_sender(partition="0") + sender = client.create_sender(partition_id="0") try: - # Open the EventSender using the supplied conneciton. - sender.open() - # Start sending - except: - raise + sender.send(EventData(b"A single event")) finally: # Close down the send handler. sender.close() - # [END eventhub_client_sender_close] + # [END eventhub_client_sender_close] -def test_example_eventhub_sync_receiver_ops(live_eventhub_config, connection_str): - import os - # [START create_eventhub_client_receiver_instance] - from azure.eventhub import EventHubClient, Offset - - client = EventHubClient.from_connection_string(connection_str) - receiver = client.add_receiver(consumer_group="$default", partition="0", offset=Offset('@latest')) - # [END create_eventhub_client_receiver_instance] - - # [START eventhub_client_receiver_open] - client = EventHubClient.from_connection_string(connection_str) - receiver = client.add_receiver(consumer_group="$default", partition="0", offset=Offset('@latest')) - try: - # Open the EventReceiver using the supplied conneciton. - receiver.open() - # Start receiving - except: - raise - finally: - # Close down the receive handler. - receiver.close() - # [END eventhub_client_receiver_open] +def test_example_eventhub_receiver_ops(live_eventhub_config, connection_str): + from azure.eventhub import EventHubClient + from azure.eventhub import EventPosition # [START eventhub_client_receiver_close] client = EventHubClient.from_connection_string(connection_str) - receiver = client.add_receiver(consumer_group="$default", partition="0", offset=Offset('@latest')) + receiver = client.create_receiver(partition_id="0", consumer_group="$default", event_position=EventPosition('@latest')) try: - # Open the EventReceiver using the supplied conneciton. - receiver.open() - # Start receiving - except: - raise + receiver.receive(timeout=1) finally: # Close down the receive handler. receiver.close() - # [END eventhub_client_receiver_close] \ No newline at end of file + # [END eventhub_client_receiver_close] diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py index 427163aea410..4799df1a8634 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py @@ -10,12 +10,16 @@ from azure.eventhub import EventData, EventPosition from azure.eventhub.aio import EventHubClient -from azure.identity.aio import AsyncClientSecretCredential @pytest.mark.liveTest @pytest.mark.asyncio async def test_client_secret_credential_async(aad_credential, live_eventhub): + try: + from azure.identity.aio import AsyncClientSecretCredential + except ImportError: + pytest.skip("No azure identity library") + client_id, secret, tenant_id = aad_credential credential = AsyncClientSecretCredential(client_id=client_id, secret=secret, tenant_id=tenant_id) client = EventHubClient(host=live_eventhub['hostname'], diff --git a/sdk/eventhub/azure-eventhubs/tests/test_auth.py b/sdk/eventhub/azure-eventhubs/tests/test_auth.py index 2d97f42264ab..aa728d2d54de 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_auth.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_auth.py @@ -8,11 +8,14 @@ import time from azure.eventhub import EventData, EventHubClient, EventPosition -from azure.identity import ClientSecretCredential @pytest.mark.liveTest def test_client_secret_credential(aad_credential, live_eventhub): + try: + from azure.identity import ClientSecretCredential + except ImportError: + pytest.skip("No azure identity library") client_id, secret, tenant_id = aad_credential credential = ClientSecretCredential(client_id=client_id, secret=secret, tenant_id=tenant_id) client = EventHubClient(host=live_eventhub['hostname'],