diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 868720fd4..d656c6ce4 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -40,7 +40,12 @@ SUBSCRIPTION_ASYNC = f"subscription-test-subscription-async-{PY_VERSION}-{UUID}" SUBSCRIPTION_SYNC = f"subscription-test-subscription-sync-{PY_VERSION}-{UUID}" SUBSCRIPTION_DLQ = f"subscription-test-subscription-dlq-{PY_VERSION}-{UUID}" -SUBSCRIPTION_EOD = f"subscription-test-subscription-eod-{PY_VERSION}-{UUID}" +SUBSCRIPTION_EOD_FOR_CREATE = ( + f"subscription-test-subscription-eod-for-create-{PY_VERSION}-{UUID}" +) +SUBSCRIPTION_EOD_FOR_RECEIVE = ( + f"subscription-test-subscription-eod-for-receive-{PY_VERSION}-{UUID}" +) ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push" NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2" REGIONAL_ENDPOINT = "us-east1-pubsub.googleapis.com:443" @@ -51,7 +56,6 @@ C = TypeVar("C", bound=Callable[..., Any]) typed_flaky = cast(Callable[[C], C], flaky(max_runs=3, min_passes=1)) -typed_super_flaky = cast(Callable[[C], C], flaky(max_runs=10, min_passes=10)) @pytest.fixture(scope="module") @@ -159,7 +163,8 @@ def subscription_sync( yield subscription.name typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=300), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=300), ) @typed_backoff @@ -230,12 +235,39 @@ def subscription_dlq( @pytest.fixture(scope="module") -def subscription_eod( +def subscription_eod_for_receive( subscriber_client: pubsub_v1.SubscriberClient, exactly_once_delivery_topic: str ) -> Generator[str, None, None]: subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_EOD + PROJECT_ID, SUBSCRIPTION_EOD_FOR_RECEIVE + ) + + try: + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + subscription = subscriber_client.create_subscription( + request={ + "name": subscription_path, + "topic": exactly_once_delivery_topic, + "enable_exactly_once_delivery": True, + } + ) + + yield subscription.name + + subscriber_client.delete_subscription(request={"subscription": subscription.name}) + + +@pytest.fixture(scope="module") +def subscription_eod_for_create( + subscriber_client: pubsub_v1.SubscriberClient, exactly_once_delivery_topic: str +) -> Generator[str, None, None]: + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, SUBSCRIPTION_EOD_FOR_CREATE ) try: @@ -469,11 +501,11 @@ def test_create_subscription_with_filtering( def test_create_subscription_with_exactly_once_delivery( subscriber_client: pubsub_v1.SubscriberClient, - subscription_eod: str, + subscription_eod_for_create: str, capsys: CaptureFixture[str], ) -> None: subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_EOD + PROJECT_ID, SUBSCRIPTION_EOD_FOR_CREATE ) try: subscriber_client.delete_subscription( @@ -483,12 +515,12 @@ def test_create_subscription_with_exactly_once_delivery( pass subscriber.create_subscription_with_exactly_once_delivery( - PROJECT_ID, EOD_TOPIC, SUBSCRIPTION_EOD + PROJECT_ID, EOD_TOPIC, SUBSCRIPTION_EOD_FOR_CREATE ) out, _ = capsys.readouterr() assert "Created subscription with exactly once delivery enabled" in out - assert f"{subscription_eod}" in out + assert f"{subscription_eod_for_create}" in out assert "enable_exactly_once_delivery: true" in out @@ -498,7 +530,8 @@ def test_create_push_subscription( capsys: CaptureFixture[str], ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) # The scope of `subscription_path` is limited to this function. @@ -525,11 +558,13 @@ def eventually_consistent_test() -> None: def test_update_push_suscription( - subscription_admin: str, capsys: CaptureFixture[str], + subscription_admin: str, + capsys: CaptureFixture[str], ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -551,7 +586,8 @@ def test_delete_subscription( subscriber.delete_subscription(PROJECT_ID, SUBSCRIPTION_ADMIN) typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -572,7 +608,8 @@ def test_receive( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -597,7 +634,8 @@ def test_receive_with_custom_attributes( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -625,7 +663,8 @@ def test_receive_with_flow_control( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=300), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=300), ) @typed_backoff @@ -655,7 +694,8 @@ def test_receive_with_blocking_shutdown( _shut_down = re.compile(r".*done waiting.*stream shutdown.*", flags=re.IGNORECASE) typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=300), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=300), ) @typed_backoff @@ -705,11 +745,10 @@ def eventually_consistent_test() -> None: eventually_consistent_test() -@typed_super_flaky def test_receive_messages_with_exactly_once_delivery_enabled( regional_publisher_client: pubsub_v1.PublisherClient, exactly_once_delivery_topic: str, - subscription_eod: str, + subscription_eod_for_receive: str, capsys: CaptureFixture[str], ) -> None: @@ -718,11 +757,11 @@ def test_receive_messages_with_exactly_once_delivery_enabled( ) subscriber.receive_messages_with_exactly_once_delivery_enabled( - PROJECT_ID, SUBSCRIPTION_EOD, 200 + PROJECT_ID, SUBSCRIPTION_EOD_FOR_RECEIVE, 30 ) out, _ = capsys.readouterr() - assert subscription_eod in out + assert subscription_eod_for_receive in out for message_id in message_ids: assert message_id in out @@ -735,7 +774,8 @@ def test_listen_for_errors( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=60), ) @typed_backoff @@ -776,7 +816,8 @@ def test_receive_synchronously_with_lease( ) -> None: typed_backoff = cast( - Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=300), + Callable[[C], C], + backoff.on_exception(backoff.expo, Unknown, max_time=300), ) @typed_backoff