From cb13bd8718a487427e66cbd3324e170da2163025 Mon Sep 17 00:00:00 2001 From: yijxie Date: Fri, 14 Feb 2020 11:23:21 -0800 Subject: [PATCH 01/12] Increment version --- .../azure-eventhub-checkpointstoreblob-aio/CHANGELOG.md | 2 ++ .../eventhub/extensions/checkpointstoreblobaio/_version.py | 2 +- sdk/eventhub/azure-eventhub-checkpointstoreblob/CHANGELOG.md | 2 ++ .../azure/eventhub/extensions/checkpointstoreblob/_version.py | 2 +- sdk/eventhub/azure-eventhub/CHANGELOG.md | 2 ++ sdk/eventhub/azure-eventhub/azure/eventhub/_version.py | 2 +- 6 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/CHANGELOG.md b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/CHANGELOG.md index 8e929a30f01f..4d6010a3249a 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/CHANGELOG.md @@ -1,5 +1,7 @@ # Release History +## 1.0.1 (Unreleased) + ## 1.0.0 (2020-01-13) Stable release. No new features or API changes. diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_version.py b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_version.py index 8eedef9ba349..010063f9dd93 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_version.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_version.py @@ -3,4 +3,4 @@ # Licensed under the MIT License. # ------------------------------------ -VERSION = "1.0.0" +VERSION = "1.0.1" diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob/CHANGELOG.md b/sdk/eventhub/azure-eventhub-checkpointstoreblob/CHANGELOG.md index 2498f3200d83..bbae3dace509 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob/CHANGELOG.md @@ -1,5 +1,7 @@ # Release History +## 1.0.1 (Unreleased) + ## 1.0.0 (2020-01-13) Stable release. No new features or API changes. diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_version.py b/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_version.py index 8eedef9ba349..010063f9dd93 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_version.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_version.py @@ -3,4 +3,4 @@ # Licensed under the MIT License. # ------------------------------------ -VERSION = "1.0.0" +VERSION = "1.0.1" diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index 6e48b02ca198..43675644db6b 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -1,5 +1,7 @@ # Release History +## 5.0.1 (Unreleased) + ## 5.0.0 (2020-01-13) **Breaking changes** diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py index 7330e03f2b28..aeddf5ca4e16 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_version.py @@ -3,4 +3,4 @@ # Licensed under the MIT License. # ------------------------------------ -VERSION = "5.0.0" +VERSION = "5.0.1" From 38e9f65493683c04c69040b11433dd6b02017f98 Mon Sep 17 00:00:00 2001 From: yijxie Date: Fri, 14 Feb 2020 11:23:39 -0800 Subject: [PATCH 02/12] Update Development Status --- sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/setup.py | 2 +- sdk/eventhub/azure-eventhub-checkpointstoreblob/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/setup.py b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/setup.py index 0b13ff1bbb0f..88f1fd79f107 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/setup.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/setup.py @@ -53,7 +53,7 @@ author_email='azpysdkhelp@microsoft.com', url='https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub-checkpointstoreblob-aio', classifiers=[ - 'Development Status :: 3 - Alpha', + 'Development Status :: 5 - Production/Stable', 'Programming Language :: Python', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.5', diff --git a/sdk/eventhub/azure-eventhub-checkpointstoreblob/setup.py b/sdk/eventhub/azure-eventhub-checkpointstoreblob/setup.py index a6bbf648336d..40f6e926ed84 100644 --- a/sdk/eventhub/azure-eventhub-checkpointstoreblob/setup.py +++ b/sdk/eventhub/azure-eventhub-checkpointstoreblob/setup.py @@ -53,7 +53,7 @@ author_email='azpysdkhelp@microsoft.com', url='https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub-checkpointstoreblob', classifiers=[ - 'Development Status :: 3 - Alpha', + 'Development Status :: 5 - Production/Stable', 'Programming Language :: Python', 'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2.7', From fcbaf65ae5f4ead7635c4595d67f5281f98ecded Mon Sep 17 00:00:00 2001 From: yijxie Date: Mon, 9 Mar 2020 15:44:12 -0700 Subject: [PATCH 03/12] Remove typing.Deque for Py3.5.3 --- sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py index b5e58ae6e803..6203ae636201 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py @@ -7,7 +7,7 @@ import uuid import logging from collections import deque -from typing import TYPE_CHECKING, Callable, Dict, Optional, Any, Deque +from typing import TYPE_CHECKING, Callable, Dict, Optional, Any import uamqp from uamqp import types, errors, utils @@ -120,7 +120,7 @@ def __init__(self, client, source, **kwargs): self._track_last_enqueued_event_properties = ( track_last_enqueued_event_properties ) - self._message_buffer = deque() # type: Deque[uamqp.Message] + self._message_buffer = deque() # type: ignore self._last_received_event = None # type: Optional[EventData] def _create_handler(self, auth): From 16e7e8b2ce519d02045acd838745c79580768b39 Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 30 Apr 2020 14:40:02 -0700 Subject: [PATCH 04/12] Add batch support in stress test script --- .../azure_eventhub_consumer_stress_async.py | 40 +++++++++++++++++-- .../azure_eventhub_consumer_stress_sync.py | 35 ++++++++++++++-- .../stress/azure_eventhub_producer_stress.py | 18 +++++++++ 3 files changed, 86 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py index 394a4e385ec1..a90be544752b 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py @@ -47,6 +47,10 @@ def parse_starting_position(args): parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss", type=str) parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0) parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int) +parser.add_argument("--max_batch_size", type=int, default=0, + help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()") +parser.add_argument("--max_wait_time", type=float, default=0, + help="max_wait_time of EventHubConsumerClient.receive_batch()") parser.add_argument("--track_last_enqueued_event_properties", action="store_true") parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10) parser.add_argument("--conn_str", help="EventHub connection string", @@ -111,6 +115,24 @@ async def on_event_received(partition_context, event): await partition_context.update_checkpoint(event) +async def on_event_batch_received(partition_context, event_batch): + recv_cnt_map[partition_context.partition_id] += len(event_batch) + if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0: + total_time_elapsed = time.perf_counter() - start_time + + partition_previous_time = recv_time_map.get(partition_context.partition_id) + partition_current_time = time.perf_counter() + recv_time_map[partition_context.partition_id] = partition_current_time + LOGGER.info("Partition: %r, Total received: %r, Time elapsed: %r, Speed since start: %r/s, Current speed: %r/s", + partition_context.partition_id, + recv_cnt_map[partition_context.partition_id], + total_time_elapsed, + recv_cnt_map[partition_context.partition_id] / total_time_elapsed, + LOG_PER_COUNT / (partition_current_time - partition_previous_time) if partition_previous_time else None + ) + await partition_context.update_checkpoint() + + def create_client(args): if args.storage_conn_str: @@ -180,11 +202,19 @@ async def run(args): "track_last_enqueued_event_properties": args.track_last_enqueued_event_properties, "starting_position": starting_position } + if args.max_batch_size: + kwargs_dict["max_batch_size"] = args.max_batch_size + if args.max_wait_time: + kwargs_dict["max_wait_time"] = args.max_wait_time if args.parallel_recv_cnt and args.parallel_recv_cnt > 1: clients = [create_client(args) for _ in range(args.parallel_recv_cnt)] tasks = [ asyncio.ensure_future( - clients[i].receive( + clients[i].receive_batch( + on_event_batch_received, + prefetch=args.link_credit, + max_batch_size=args.max_batch_size + ) if args.max_batch_size else clients[0].receive_batch( on_event_received, **kwargs_dict ) @@ -193,9 +223,13 @@ async def run(args): else: clients = [create_client(args)] tasks = [asyncio.ensure_future( - clients[0].receive( - on_event_received, + clients[0].receive_batch( + on_event_batch_received, prefetch=args.link_credit, + max_batch_size=args.max_batch_size + ) if args.max_batch_size else clients[0].receive_batch( + on_event_received, + **kwargs_dict ) )] diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py index bf377b16d11a..5fe432fd325d 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py @@ -45,6 +45,11 @@ def parse_starting_position(args): parser.add_argument("--starting_datetime", help="Starting datetime string, should be format of YYYY-mm-dd HH:mm:ss") parser.add_argument("--partitions", help="Number of partitions. 0 means to get partitions from eventhubs", type=int, default=0) parser.add_argument("--recv_partition_id", help="Receive from a specific partition if this is set", type=int) +parser.add_argument("--max_batch_size", type=int, default=0, + help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()") +parser.add_argument("--max_wait_time", type=float, default=0, + help="max_wait_time of EventHubConsumerClient.receive_batch()") + parser.add_argument("--track_last_enqueued_event_properties", action="store_true") parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10) parser.add_argument("--conn_str", help="EventHub connection string", @@ -109,6 +114,24 @@ def on_event_received(partition_context, event): partition_context.update_checkpoint(event) +def on_event_batch_received(partition_context, event_batch): + recv_cnt_map[partition_context.partition_id] += len(event_batch) + if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0: + total_time_elapsed = time.perf_counter() - start_time + + partition_previous_time = recv_time_map.get(partition_context.partition_id) + partition_current_time = time.perf_counter() + recv_time_map[partition_context.partition_id] = partition_current_time + LOGGER.info("Partition: %r, Total received: %r, Time elapsed: %r, Speed since start: %r/s, Current speed: %r/s", + partition_context.partition_id, + recv_cnt_map[partition_context.partition_id], + total_time_elapsed, + recv_cnt_map[partition_context.partition_id] / total_time_elapsed, + LOG_PER_COUNT / (partition_current_time - partition_previous_time) if partition_previous_time else None + ) + partition_context.update_checkpoint() + + def create_client(args): if args.storage_conn_str: checkpoint_store = BlobCheckpointStore.from_connection_string(args.storage_conn_str, args.storage_container_name) @@ -176,12 +199,16 @@ def run(args): "track_last_enqueued_event_properties": args.track_last_enqueued_event_properties, "starting_position": starting_position } + if args.max_batch_size: + kwargs_dict["max_batch_size"] = args.max_batch_size + if args.max_wait_time: + kwargs_dict["max_wait_time"] = args.max_wait_time if args.parallel_recv_cnt and args.parallel_recv_cnt > 1: clients = [create_client(args) for _ in range(args.parallel_recv_cnt)] threads = [ threading.Thread( - target=clients[i].receive, - args=(on_event_received,), + target=clients[i].receive_batch if args.max_batch_size else clients[i].receive, + args=(on_event_batch_received if args.max_batch_size else on_event_received,), kwargs=kwargs_dict, daemon=True ) for i in range(args.parallel_recv_cnt) @@ -189,8 +216,8 @@ def run(args): else: clients = [create_client(args)] threads = [threading.Thread( - target=clients[0].receive, - args=(on_event_received,), + target=clients[0].receive_batch if args.max_batch_size else clients[0].receive, + args=(on_event_batch_received if args.max_batch_size else on_event_received,), kwargs=kwargs_dict, daemon=True )] diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py index 99b3188ca11a..3ff550a31891 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_producer_stress.py @@ -28,6 +28,15 @@ def stress_send_sync(producer: EventHubProducerClient, args, logger): return len(batch) +def stress_send_list_sync(producer: EventHubProducerClient, args, logger): + quantity = int(256*1023 / args.payload) + send_list = [] + for _ in range(quantity): + send_list.append(EventData(body=b"D" * args.payload)) + producer.send_batch(send_list) + return len(send_list) + + async def stress_send_async(producer: EventHubProducerClientAsync, args, logger): batch = await producer.create_batch() try: @@ -39,6 +48,15 @@ async def stress_send_async(producer: EventHubProducerClientAsync, args, logger) return len(batch) +async def stress_send_list_async(producer: EventHubProducerClientAsync, args, logger): + quantity = int(256*1023 / args.payload) + send_list = [] + for _ in range(quantity): + send_list.append(EventData(body=b"D" * args.payload)) + await producer.send_batch(send_list) + return len(send_list) + + class StressTestRunner(object): def __init__(self, argument_parser): self.argument_parser = argument_parser From ba4e7885c56a10049eb6cc172aba50160afc0c6a Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 30 Apr 2020 14:57:16 -0700 Subject: [PATCH 05/12] Allow receive() to have max_wait_time --- .../stress/azure_eventhub_consumer_stress_async.py | 6 +++--- .../stress/azure_eventhub_consumer_stress_sync.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py index a90be544752b..b631d8de60fb 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py @@ -98,7 +98,7 @@ async def get_partition_ids(self): async def on_event_received(partition_context, event): - recv_cnt_map[partition_context.partition_id] += 1 + recv_cnt_map[partition_context.partition_id] += 1 if event else 0 if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0: total_time_elapsed = time.perf_counter() - start_time @@ -204,8 +204,8 @@ async def run(args): } if args.max_batch_size: kwargs_dict["max_batch_size"] = args.max_batch_size - if args.max_wait_time: - kwargs_dict["max_wait_time"] = args.max_wait_time + if args.max_wait_time: + kwargs_dict["max_wait_time"] = args.max_wait_time if args.parallel_recv_cnt and args.parallel_recv_cnt > 1: clients = [create_client(args) for _ in range(args.parallel_recv_cnt)] tasks = [ diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py index 5fe432fd325d..0f5599101b20 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py @@ -97,7 +97,7 @@ def get_partition_ids(self): def on_event_received(partition_context, event): - recv_cnt_map[partition_context.partition_id] += 1 + recv_cnt_map[partition_context.partition_id] += 1 if event else 0 if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0: total_time_elapsed = time.perf_counter() - start_time @@ -201,8 +201,8 @@ def run(args): } if args.max_batch_size: kwargs_dict["max_batch_size"] = args.max_batch_size - if args.max_wait_time: - kwargs_dict["max_wait_time"] = args.max_wait_time + if args.max_wait_time: + kwargs_dict["max_wait_time"] = args.max_wait_time if args.parallel_recv_cnt and args.parallel_recv_cnt > 1: clients = [create_client(args) for _ in range(args.parallel_recv_cnt)] threads = [ From 227d6b1920bec936d3277e8f65a2872044d3a828 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 30 Apr 2020 15:24:07 -0700 Subject: [PATCH 06/12] Update sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py Co-authored-by: Adam Ling (MSFT) <47871814+yunhaoling@users.noreply.github.com> --- .../stress/azure_eventhub_consumer_stress_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py index b631d8de60fb..f293ec0b75ce 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py @@ -50,7 +50,7 @@ def parse_starting_position(args): parser.add_argument("--max_batch_size", type=int, default=0, help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()") parser.add_argument("--max_wait_time", type=float, default=0, - help="max_wait_time of EventHubConsumerClient.receive_batch()") + help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()") parser.add_argument("--track_last_enqueued_event_properties", action="store_true") parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10) parser.add_argument("--conn_str", help="EventHub connection string", From 490b398776d827f192c9305385ccd73e012029b9 Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 30 Apr 2020 15:33:30 -0700 Subject: [PATCH 07/12] Fix bug --- .../stress/azure_eventhub_consumer_stress_async.py | 7 +++---- .../stress/azure_eventhub_consumer_stress_sync.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py index f293ec0b75ce..1124e52b32f4 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py @@ -212,9 +212,8 @@ async def run(args): asyncio.ensure_future( clients[i].receive_batch( on_event_batch_received, - prefetch=args.link_credit, - max_batch_size=args.max_batch_size - ) if args.max_batch_size else clients[0].receive_batch( + **kwargs_dict + ) if args.max_batch_size else clients[i].receive( on_event_received, **kwargs_dict ) @@ -227,7 +226,7 @@ async def run(args): on_event_batch_received, prefetch=args.link_credit, max_batch_size=args.max_batch_size - ) if args.max_batch_size else clients[0].receive_batch( + ) if args.max_batch_size else clients[0].receive( on_event_received, **kwargs_dict ) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py index 0f5599101b20..98abb796fdbe 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py @@ -48,7 +48,7 @@ def parse_starting_position(args): parser.add_argument("--max_batch_size", type=int, default=0, help="Call EventHubConsumerClient.receive_batch() if not 0, otherwise call receive()") parser.add_argument("--max_wait_time", type=float, default=0, - help="max_wait_time of EventHubConsumerClient.receive_batch()") + help="max_wait_time of EventHubConsumerClient.receive_batch() or EventHubConsumerClient.receive()") parser.add_argument("--track_last_enqueued_event_properties", action="store_true") parser.add_argument("--load_balancing_interval", help="time duration in seconds between two load balance", type=float, default=10) From aa114c9f01c22a8f95c16e15daa629cf819034bb Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 30 Apr 2020 16:31:40 -0700 Subject: [PATCH 08/12] Fix bug --- .../stress/azure_eventhub_consumer_stress_async.py | 5 ++++- .../stress/azure_eventhub_consumer_stress_sync.py | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py index 1124e52b32f4..8ae6e4ea3cf0 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py @@ -86,6 +86,7 @@ def parse_starting_position(args): start_time = time.perf_counter() recv_cnt_map = defaultdict(int) +recv_cnt_iteration_map = defaultdict(int) recv_time_map = dict() @@ -117,7 +118,8 @@ async def on_event_received(partition_context, event): async def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id] += len(event_batch) - if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0: + recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch) + while recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT: total_time_elapsed = time.perf_counter() - start_time partition_previous_time = recv_time_map.get(partition_context.partition_id) @@ -130,6 +132,7 @@ async def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id] / total_time_elapsed, LOG_PER_COUNT / (partition_current_time - partition_previous_time) if partition_previous_time else None ) + recv_cnt_iteration_map[partition_context.partition_id] -= LOG_PER_COUNT await partition_context.update_checkpoint() diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py index 98abb796fdbe..92db3bf71640 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py @@ -85,6 +85,7 @@ def parse_starting_position(args): start_time = time.perf_counter() recv_cnt_map = defaultdict(int) +recv_cnt_iteration_map = defaultdict(int) recv_time_map = dict() @@ -116,9 +117,9 @@ def on_event_received(partition_context, event): def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id] += len(event_batch) - if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0: + recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch) + while recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT: total_time_elapsed = time.perf_counter() - start_time - partition_previous_time = recv_time_map.get(partition_context.partition_id) partition_current_time = time.perf_counter() recv_time_map[partition_context.partition_id] = partition_current_time @@ -129,6 +130,7 @@ def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id] / total_time_elapsed, LOG_PER_COUNT / (partition_current_time - partition_previous_time) if partition_previous_time else None ) + recv_cnt_iteration_map[partition_context.partition_id] -= LOG_PER_COUNT partition_context.update_checkpoint() From 0d8dda5ae05f82279d9e59f0bfaf3756f0635d04 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 30 Apr 2020 16:53:34 -0700 Subject: [PATCH 09/12] Update sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py Co-authored-by: Adam Ling (MSFT) <47871814+yunhaoling@users.noreply.github.com> --- .../stress/azure_eventhub_consumer_stress_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py index 92db3bf71640..0cad9616064d 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py @@ -128,7 +128,7 @@ def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id], total_time_elapsed, recv_cnt_map[partition_context.partition_id] / total_time_elapsed, - LOG_PER_COUNT / (partition_current_time - partition_previous_time) if partition_previous_time else None + recv_cnt_iteration_map[partition_context.partition_id] / (partition_current_time - partition_previous_time) if partition_previous_time else None ) recv_cnt_iteration_map[partition_context.partition_id] -= LOG_PER_COUNT partition_context.update_checkpoint() From fc4b477da753ccaaf622d801c90ab8c8025c7d22 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 30 Apr 2020 16:53:44 -0700 Subject: [PATCH 10/12] Update sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py Co-authored-by: Adam Ling (MSFT) <47871814+yunhaoling@users.noreply.github.com> --- .../stress/azure_eventhub_consumer_stress_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py index 0cad9616064d..af4f97fc793f 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py @@ -130,7 +130,7 @@ def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id] / total_time_elapsed, recv_cnt_iteration_map[partition_context.partition_id] / (partition_current_time - partition_previous_time) if partition_previous_time else None ) - recv_cnt_iteration_map[partition_context.partition_id] -= LOG_PER_COUNT + recv_cnt_iteration_map[partition_context.partition_id] = 0 partition_context.update_checkpoint() From cad6a3c87c891417e1fcb21c89332950f9af19c1 Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 30 Apr 2020 16:59:00 -0700 Subject: [PATCH 11/12] Log all received event count per iteration --- .../stress/azure_eventhub_consumer_stress_async.py | 6 +++--- .../stress/azure_eventhub_consumer_stress_sync.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py index 8ae6e4ea3cf0..576d82498dc7 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py @@ -119,7 +119,7 @@ async def on_event_received(partition_context, event): async def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id] += len(event_batch) recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch) - while recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT: + if recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT: total_time_elapsed = time.perf_counter() - start_time partition_previous_time = recv_time_map.get(partition_context.partition_id) @@ -130,9 +130,9 @@ async def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id], total_time_elapsed, recv_cnt_map[partition_context.partition_id] / total_time_elapsed, - LOG_PER_COUNT / (partition_current_time - partition_previous_time) if partition_previous_time else None + recv_cnt_iteration_map[partition_context.partition_id] / (partition_current_time - partition_previous_time) if partition_previous_time else None ) - recv_cnt_iteration_map[partition_context.partition_id] -= LOG_PER_COUNT + recv_cnt_iteration_map[partition_context.partition_id] = 0 await partition_context.update_checkpoint() diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py index af4f97fc793f..09a487206e33 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_sync.py @@ -118,7 +118,7 @@ def on_event_received(partition_context, event): def on_event_batch_received(partition_context, event_batch): recv_cnt_map[partition_context.partition_id] += len(event_batch) recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch) - while recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT: + if recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT: total_time_elapsed = time.perf_counter() - start_time partition_previous_time = recv_time_map.get(partition_context.partition_id) partition_current_time = time.perf_counter() From c3e475d8719a90fdf02f98d6c4bf54cc5f20aceb Mon Sep 17 00:00:00 2001 From: yijxie Date: Thu, 30 Apr 2020 17:02:38 -0700 Subject: [PATCH 12/12] bug fix --- .../stress/azure_eventhub_consumer_stress_async.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py index 576d82498dc7..087157bf9cc1 100644 --- a/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py +++ b/sdk/eventhub/azure-eventhub/stress/azure_eventhub_consumer_stress_async.py @@ -227,8 +227,7 @@ async def run(args): tasks = [asyncio.ensure_future( clients[0].receive_batch( on_event_batch_received, - prefetch=args.link_credit, - max_batch_size=args.max_batch_size + **kwargs_dict ) if args.max_batch_size else clients[0].receive( on_event_received, **kwargs_dict