From 9297d7e9c6a84b92704159f17ff52a0520aef3e3 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Fri, 22 Apr 2022 13:43:32 -0400 Subject: [PATCH 1/3] samples (tests): separate eod creation and receive in parallel tests --- samples/snippets/subscriber_test.py | 50 ++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 868720fd4..5b386634b 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -40,7 +40,8 @@ SUBSCRIPTION_ASYNC = f"subscription-test-subscription-async-{PY_VERSION}-{UUID}" SUBSCRIPTION_SYNC = f"subscription-test-subscription-sync-{PY_VERSION}-{UUID}" SUBSCRIPTION_DLQ = f"subscription-test-subscription-dlq-{PY_VERSION}-{UUID}" -SUBSCRIPTION_EOD = f"subscription-test-subscription-eod-{PY_VERSION}-{UUID}" +SUBSCRIPTION_EOD_FOR_CREATE = f"subscription-test-subscription-eod-for-create-{PY_VERSION}-{UUID}" +SUBSCRIPTION_EOD_FOR_RECEIVE = f"subscription-test-subscription-eod-for-receive-{PY_VERSION}-{UUID}" ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push" NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2" REGIONAL_ENDPOINT = "us-east1-pubsub.googleapis.com:443" @@ -51,7 +52,6 @@ C = TypeVar("C", bound=Callable[..., Any]) typed_flaky = cast(Callable[[C], C], flaky(max_runs=3, min_passes=1)) -typed_super_flaky = cast(Callable[[C], C], flaky(max_runs=10, min_passes=10)) @pytest.fixture(scope="module") @@ -230,12 +230,12 @@ def subscription_dlq( @pytest.fixture(scope="module") -def subscription_eod( +def subscription_eod_for_receive( subscriber_client: pubsub_v1.SubscriberClient, exactly_once_delivery_topic: str ) -> Generator[str, None, None]: subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_EOD + PROJECT_ID, SUBSCRIPTION_EOD_FOR_RECEIVE ) try: @@ -255,6 +255,33 @@ def subscription_eod( subscriber_client.delete_subscription(request={"subscription": subscription.name}) +@pytest.fixture(scope="module") +def subscription_eod_for_create( + subscriber_client: pubsub_v1.SubscriberClient, exactly_once_delivery_topic: str +) -> Generator[str, None, None]: + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, SUBSCRIPTION_EOD_FOR_CREATE + ) + + try: + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + subscription = subscriber_client.create_subscription( + request={ + "name": subscription_path, + "topic": exactly_once_delivery_topic, + "enable_exactly_once_delivery": True, + } + ) + + yield subscription.name + + subscriber_client.delete_subscription(request={"subscription": subscription.name}) + + def _publish_messages( publisher_client: pubsub_v1.PublisherClient, @@ -469,11 +496,11 @@ def test_create_subscription_with_filtering( def test_create_subscription_with_exactly_once_delivery( subscriber_client: pubsub_v1.SubscriberClient, - subscription_eod: str, + subscription_eod_for_create: str, capsys: CaptureFixture[str], ) -> None: subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_EOD + PROJECT_ID, SUBSCRIPTION_EOD_FOR_CREATE ) try: subscriber_client.delete_subscription( @@ -483,12 +510,12 @@ def test_create_subscription_with_exactly_once_delivery( pass subscriber.create_subscription_with_exactly_once_delivery( - PROJECT_ID, EOD_TOPIC, SUBSCRIPTION_EOD + PROJECT_ID, EOD_TOPIC, SUBSCRIPTION_EOD_FOR_CREATE ) out, _ = capsys.readouterr() assert "Created subscription with exactly once delivery enabled" in out - assert f"{subscription_eod}" in out + assert f"{subscription_eod_for_create}" in out assert "enable_exactly_once_delivery: true" in out @@ -705,11 +732,10 @@ def eventually_consistent_test() -> None: eventually_consistent_test() -@typed_super_flaky def test_receive_messages_with_exactly_once_delivery_enabled( regional_publisher_client: pubsub_v1.PublisherClient, exactly_once_delivery_topic: str, - subscription_eod: str, + subscription_eod_for_receive: str, capsys: CaptureFixture[str], ) -> None: @@ -718,11 +744,11 @@ def test_receive_messages_with_exactly_once_delivery_enabled( ) subscriber.receive_messages_with_exactly_once_delivery_enabled( - PROJECT_ID, SUBSCRIPTION_EOD, 200 + PROJECT_ID, SUBSCRIPTION_EOD_FOR_CREATE, 200 ) out, _ = capsys.readouterr() - assert subscription_eod in out + assert subscription_eod_for_receive in out for message_id in message_ids: assert message_id in out From 8b92fabcedd88164c6fb1bef19b64a3ef1583ee1 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Fri, 22 Apr 2022 13:46:00 -0400 Subject: [PATCH 2/3] running black --- samples/snippets/subscriber_test.py | 43 +++++++++++++++++++---------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 5b386634b..0605c559b 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -40,8 +40,12 @@ SUBSCRIPTION_ASYNC = f"subscription-test-subscription-async-{PY_VERSION}-{UUID}" SUBSCRIPTION_SYNC = f"subscription-test-subscription-sync-{PY_VERSION}-{UUID}" SUBSCRIPTION_DLQ = f"subscription-test-subscription-dlq-{PY_VERSION}-{UUID}" -SUBSCRIPTION_EOD_FOR_CREATE = f"subscription-test-subscription-eod-for-create-{PY_VERSION}-{UUID}" -SUBSCRIPTION_EOD_FOR_RECEIVE = f"subscription-test-subscription-eod-for-receive-{PY_VERSION}-{UUID}" +SUBSCRIPTION_EOD_FOR_CREATE = ( + f"subscription-test-subscription-eod-for-create-{PY_VERSION}-{UUID}" +) +SUBSCRIPTION_EOD_FOR_RECEIVE = ( + f"subscription-test-subscription-eod-for-receive-{PY_VERSION}-{UUID}" +) ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push" NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2" REGIONAL_ENDPOINT = "us-east1-pubsub.googleapis.com:443" @@ -159,7 +163,8 @@ def subscription_sync( yield subscription.name typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=300), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=300), ) @typed_backoff @@ -255,6 +260,7 @@ def subscription_eod_for_receive( subscriber_client.delete_subscription(request={"subscription": subscription.name}) + @pytest.fixture(scope="module") def subscription_eod_for_create( subscriber_client: pubsub_v1.SubscriberClient, exactly_once_delivery_topic: str @@ -282,7 +288,6 @@ def subscription_eod_for_create( subscriber_client.delete_subscription(request={"subscription": subscription.name}) - def _publish_messages( publisher_client: pubsub_v1.PublisherClient, topic: str, @@ -525,7 +530,8 @@ def test_create_push_subscription( capsys: CaptureFixture[str], ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) # The scope of `subscription_path` is limited to this function. @@ -552,11 +558,13 @@ def eventually_consistent_test() -> None: def test_update_push_suscription( - subscription_admin: str, capsys: CaptureFixture[str], + subscription_admin: str, + capsys: CaptureFixture[str], ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -578,7 +586,8 @@ def test_delete_subscription( subscriber.delete_subscription(PROJECT_ID, SUBSCRIPTION_ADMIN) typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -599,7 +608,8 @@ def test_receive( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -624,7 +634,8 @@ def test_receive_with_custom_attributes( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -652,7 +663,8 @@ def test_receive_with_flow_control( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=300), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=300), ) @typed_backoff @@ -682,7 +694,8 @@ def test_receive_with_blocking_shutdown( _shut_down = re.compile(r".*done waiting.*stream shutdown.*", flags=re.IGNORECASE) typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=300), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=300), ) @typed_backoff @@ -761,7 +774,8 @@ def test_listen_for_errors( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -802,7 +816,8 @@ def test_receive_synchronously_with_lease( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=300), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=300), ) @typed_backoff From 64b06814ffe1beaef77672cbf71c83ecdc76e4c7 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Fri, 22 Apr 2022 14:00:50 -0400 Subject: [PATCH 3/3] fixed typo --- samples/snippets/subscriber_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 0605c559b..d656c6ce4 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -757,7 +757,7 @@ def test_receive_messages_with_exactly_once_delivery_enabled( ) subscriber.receive_messages_with_exactly_once_delivery_enabled( - PROJECT_ID, SUBSCRIPTION_EOD_FOR_CREATE, 200 + PROJECT_ID, SUBSCRIPTION_EOD_FOR_RECEIVE, 30 ) out, _ = capsys.readouterr()