diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 34a42cac4..fd5d8d768 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -16,7 +16,7 @@ import re import sys import time -from typing import Any, Callable, cast, Generator, TypeVar +from typing import Any, Callable, cast, Generator, List, TypeVar import uuid from _pytest.capture import CaptureFixture @@ -35,10 +35,12 @@ PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] TOPIC = f"subscription-test-topic-{PY_VERSION}-{UUID}" DEAD_LETTER_TOPIC = f"subscription-test-dead-letter-topic-{PY_VERSION}-{UUID}" +EOD_TOPIC = f"subscription-test-eod-topic-{PY_VERSION}-{UUID}" SUBSCRIPTION_ADMIN = f"subscription-test-subscription-admin-{PY_VERSION}-{UUID}" SUBSCRIPTION_ASYNC = f"subscription-test-subscription-async-{PY_VERSION}-{UUID}" SUBSCRIPTION_SYNC = f"subscription-test-subscription-sync-{PY_VERSION}-{UUID}" SUBSCRIPTION_DLQ = f"subscription-test-subscription-dlq-{PY_VERSION}-{UUID}" +SUBSCRIPTION_EOD = f"subscription-test-subscription-eod-{PY_VERSION}-{UUID}" ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push" NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2" DEFAULT_MAX_DELIVERY_ATTEMPTS = 5 @@ -85,6 +87,22 @@ def dead_letter_topic( publisher_client.delete_topic(request={"topic": dead_letter_topic.name}) +@pytest.fixture(scope="module") +def exactly_once_delivery_topic( + publisher_client: pubsub_v1.PublisherClient, +) -> Generator[str, None, None]: + topic_path = publisher_client.topic_path(PROJECT_ID, EOD_TOPIC) + + try: + topic = publisher_client.get_topic(request={"topic": topic_path}) + except NotFound: + topic = publisher_client.create_topic(request={"name": topic_path}) + + yield topic.name + + publisher_client.delete_topic(request={"topic": topic.name}) + + @pytest.fixture(scope="module") def subscriber_client() -> Generator[pubsub_v1.SubscriberClient, None, None]: subscriber_client = pubsub_v1.SubscriberClient() @@ -202,16 +220,45 @@ def subscription_dlq( subscriber_client.delete_subscription(request={"subscription": subscription.name}) +@pytest.fixture(scope="module") +def subscription_eod( + subscriber_client: pubsub_v1.SubscriberClient, exactly_once_delivery_topic: str +) -> Generator[str, None, None]: + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, SUBSCRIPTION_EOD + ) + + try: + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + subscription = subscriber_client.create_subscription( + request={ + "name": subscription_path, + "topic": exactly_once_delivery_topic, + "enable_exactly_once_delivery": True + } + ) + + yield subscription.name + + subscriber_client.delete_subscription(request={"subscription": subscription.name}) + + def _publish_messages( publisher_client: pubsub_v1.PublisherClient, topic: str, message_num: int = 5, **attrs: Any, -) -> None: +) -> List[str]: + message_ids = [] for n in range(message_num): data = f"message {n}".encode("utf-8") publish_future = publisher_client.publish(topic, data, **attrs) - publish_future.result() + message_ids.append(publish_future.result()) + return message_ids def test_list_in_topic(subscription_admin: str, capsys: CaptureFixture[str]) -> None: @@ -307,7 +354,7 @@ def test_receive_with_delivery_attempts( # We keep retrying up to 10 minutes for mitigating the flakiness. @typed_backoff def run_sample() -> None: - _publish_messages(publisher_client, topic) + _ = _publish_messages(publisher_client, topic) subscriber.receive_messages_with_delivery_attempts( PROJECT_ID, SUBSCRIPTION_DLQ, 90 @@ -413,11 +460,11 @@ def test_create_subscription_with_filtering( def test_create_subscription_with_exactly_once_delivery( subscriber_client: pubsub_v1.SubscriberClient, - subscription_admin: str, + subscription_eod: str, capsys: CaptureFixture[str], ) -> None: subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_ADMIN + PROJECT_ID, SUBSCRIPTION_EOD ) try: subscriber_client.delete_subscription( @@ -427,12 +474,12 @@ def test_create_subscription_with_exactly_once_delivery( pass subscriber.create_subscription_with_exactly_once_delivery( - PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN + PROJECT_ID, EOD_TOPIC, SUBSCRIPTION_EOD ) out, _ = capsys.readouterr() assert "Created subscription with exactly once delivery enabled" in out - assert f"{subscription_admin}" in out + assert f"{subscription_eod}" in out assert "enable_exactly_once_delivery: true" in out @@ -521,7 +568,7 @@ def test_receive( @typed_backoff def eventually_consistent_test() -> None: - _publish_messages(publisher_client, topic) + _ = _publish_messages(publisher_client, topic) subscriber.receive_messages(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) @@ -546,7 +593,7 @@ def test_receive_with_custom_attributes( @typed_backoff def eventually_consistent_test() -> None: - _publish_messages(publisher_client, topic, origin="python-sample") + _ = _publish_messages(publisher_client, topic, origin="python-sample") subscriber.receive_messages_with_custom_attributes( PROJECT_ID, SUBSCRIPTION_ASYNC, 5 @@ -574,7 +621,7 @@ def test_receive_with_flow_control( @typed_backoff def eventually_consistent_test() -> None: - _publish_messages(publisher_client, topic) + _ = _publish_messages(publisher_client, topic) subscriber.receive_messages_with_flow_control(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) @@ -604,7 +651,7 @@ def test_receive_with_blocking_shutdown( @typed_backoff def eventually_consistent_test() -> None: - _publish_messages(publisher_client, topic, message_num=3) + _ = _publish_messages(publisher_client, topic, message_num=3) subscriber.receive_messages_with_blocking_shutdown( PROJECT_ID, SUBSCRIPTION_ASYNC, timeout=5.0 @@ -651,8 +698,8 @@ def eventually_consistent_test() -> None: def test_receive_messages_with_exactly_once_delivery_enabled( publisher_client: pubsub_v1.PublisherClient, - topic: str, - subscription_async: str, + exactly_once_delivery_topic: str, + subscription_eod: str, capsys: CaptureFixture[str], ) -> None: @@ -662,17 +709,19 @@ def test_receive_messages_with_exactly_once_delivery_enabled( @typed_backoff def eventually_consistent_test() -> None: - _publish_messages(publisher_client, topic) + message_ids = _publish_messages(publisher_client, exactly_once_delivery_topic) subscriber.receive_messages_with_exactly_once_delivery_enabled( - PROJECT_ID, SUBSCRIPTION_ASYNC, 10 + PROJECT_ID, SUBSCRIPTION_EOD, 10 ) out, _ = capsys.readouterr() assert "Listening" in out - assert subscription_async in out + assert subscription_eod in out assert "Received" in out assert "Ack" in out + for message_id in message_ids: + assert message_id in out eventually_consistent_test() @@ -690,7 +739,7 @@ def test_listen_for_errors( @typed_backoff def eventually_consistent_test() -> None: - _publish_messages(publisher_client, topic) + _ = _publish_messages(publisher_client, topic) subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) @@ -707,7 +756,7 @@ def test_receive_synchronously( subscription_sync: str, capsys: CaptureFixture[str], ) -> None: - _publish_messages(publisher_client, topic) + _ = _publish_messages(publisher_client, topic) subscriber.synchronous_pull(PROJECT_ID, SUBSCRIPTION_SYNC) @@ -731,7 +780,7 @@ def test_receive_synchronously_with_lease( @typed_backoff def run_sample() -> None: - _publish_messages(publisher_client, topic, message_num=10) + _ = _publish_messages(publisher_client, topic, message_num=10) # Pausing 10s to allow the subscriber to establish the connection # because sync pull often returns fewer messages than requested. # The intention is to fix flaky tests reporting errors like