From 56258f37e5fcd9b2e30306f0ef3589da17658da9 Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 12 Sep 2025 17:21:10 -0400 Subject: [PATCH 01/11] fix(pubsub): handle pubsub message attributes correctly in write operation Modify the PubSub write operation to properly deserialize protobuf messages and handle attributes when publishing. This ensures messages with attributes are published correctly rather than being treated as raw bytes. --- .../trigger_files/beam_PostCommit_Python.json | 2 +- sdks/python/apache_beam/io/gcp/pubsub.py | 20 ++++++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 8675e9535061..1fa29a890c2f 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 28 + "modification": 29 } diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 281827db034b..c3e5251cd0d0 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -593,11 +593,21 @@ def _flush(self): import time - # The elements in buffer are already serialized bytes from the previous - # transforms - futures = [ - self._pub_client.publish(self._topic, elem) for elem in self._buffer - ] + # The elements in buffer are serialized protobuf bytes from the previous + # transforms. We need to deserialize them to extract data and attributes. + futures = [] + for elem in self._buffer: + # Deserialize the protobuf to get the original PubsubMessage + pubsub_msg = PubsubMessage._from_proto_str(elem) + + # Publish with the correct data and attributes + if self.with_attributes and pubsub_msg.attributes: + future = self._pub_client.publish( + self._topic, pubsub_msg.data, **pubsub_msg.attributes) + else: + future = self._pub_client.publish(self._topic, pubsub_msg.data) + + futures.append(future) timer_start = time.time() for future in futures: From c4e6bc592755126ebb65790a80c2b5116d17b07d Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 12 Sep 2025 20:47:15 -0400 Subject: [PATCH 02/11] fix(pubsub): replace NotImplementedError with warnings for unsupported features Change raising NotImplementedError to logging warnings when id_label or timestamp_attribute are used in PubSub writes, as these features are not yet supported. Update tests to verify warning messages instead of exception handling. --- sdks/python/apache_beam/io/gcp/pubsub.py | 11 ++++++++--- sdks/python/apache_beam/io/gcp/pubsub_test.py | 14 ++++++++++---- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index c3e5251cd0d0..25f3dcb33f38 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -32,6 +32,7 @@ # pytype: skip-file +import logging import re from typing import Any from typing import List @@ -57,6 +58,8 @@ except ImportError: pubsub = None +_LOGGER = logging.getLogger(__name__) + __all__ = [ 'MultipleReadFromPubSub', 'PubsubMessage', @@ -565,10 +568,12 @@ def __init__(self, transform): # TODO(https://github.com/apache/beam/issues/18939): Add support for # id_label and timestamp_attribute. if transform.id_label: - raise NotImplementedError('id_label is not supported for PubSub writes') + _LOGGER.warning( + 'id_label is not supported for PubSub writes and will be ignored') if transform.timestamp_attribute: - raise NotImplementedError( - 'timestamp_attribute is not supported for PubSub writes') + _LOGGER.warning( + 'timestamp_attribute is not supported for PubSub writes and will be ' + 'ignored') def setup(self): from google.cloud import pubsub diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 5650e920e635..091746528596 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -973,10 +973,10 @@ def test_write_messages_unsupported_features(self, mock_pubsub): attributes = {'key': 'value'} payloads = [PubsubMessage(data, attributes)] + # Test that id_label logs a warning instead of raising an exception options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True - with self.assertRaisesRegex(NotImplementedError, - r'id_label is not supported'): + with self.assertLogs(level='WARNING') as log: with TestPipeline(options=options) as p: _ = ( p @@ -985,11 +985,14 @@ def test_write_messages_unsupported_features(self, mock_pubsub): 'projects/fakeprj/topics/a_topic', id_label='a_label', with_attributes=True)) + self.assertIn( + 'id_label is not supported for PubSub writes and will be ignored', + str(log.output)) + # Test that timestamp_attribute logs a warning instead of raising an exception options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True - with self.assertRaisesRegex(NotImplementedError, - r'timestamp_attribute is not supported'): + with self.assertLogs(level='WARNING') as log: with TestPipeline(options=options) as p: _ = ( p @@ -998,6 +1001,9 @@ def test_write_messages_unsupported_features(self, mock_pubsub): 'projects/fakeprj/topics/a_topic', timestamp_attribute='timestamp', with_attributes=True)) + self.assertIn( + 'timestamp_attribute is not supported for PubSub writes and will be ignored', + str(log.output)) def test_runner_api_transformation(self, unused_mock_pubsub): sink = _PubSubSink( From 6800fc7a272f58e53f8f73ca54be03531decb461 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 04:29:58 -0400 Subject: [PATCH 03/11] check pipelines when raising errors --- sdks/python/apache_beam/io/gcp/pubsub.py | 44 ++++++++++++++----- sdks/python/apache_beam/io/gcp/pubsub_test.py | 14 ++---- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 25f3dcb33f38..6f5ee23f6433 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -32,7 +32,6 @@ # pytype: skip-file -import logging import re from typing import Any from typing import List @@ -58,8 +57,6 @@ except ImportError: pubsub = None -_LOGGER = logging.getLogger(__name__) - __all__ = [ 'MultipleReadFromPubSub', 'PubsubMessage', @@ -417,6 +414,7 @@ def __init__( self.project, self.topic_name = parse_topic(topic) self.full_topic = topic self._sink = _PubSubSink(topic, id_label, timestamp_attribute) + self.pipeline_options = None # Will be set during expand() @staticmethod def message_to_proto_str(element: PubsubMessage) -> bytes: @@ -432,6 +430,9 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes: return msg._to_proto_str(for_publish=True) def expand(self, pcoll): + # Store pipeline options for use in DoFn + self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None + if self.with_attributes: pcoll = pcoll | 'ToProtobufX' >> ParDo( _AddMetricsAndMap( @@ -567,13 +568,36 @@ def __init__(self, transform): # TODO(https://github.com/apache/beam/issues/18939): Add support for # id_label and timestamp_attribute. - if transform.id_label: - _LOGGER.warning( - 'id_label is not supported for PubSub writes and will be ignored') - if transform.timestamp_attribute: - _LOGGER.warning( - 'timestamp_attribute is not supported for PubSub writes and will be ' - 'ignored') + # Only raise errors for DirectRunner or batch pipelines + pipeline_options = transform.pipeline_options + should_raise_error = False + + if pipeline_options: + from apache_beam.options.pipeline_options import StandardOptions + from apache_beam.runners.direct.direct_runner import DirectRunner + + # Check if using DirectRunner + runner_name = getattr(pipeline_options, 'runner', None) + if runner_name and 'DirectRunner' in str(runner_name): + should_raise_error = True + + # Check if in batch mode (not streaming) + standard_options = pipeline_options.view_as(StandardOptions) + if not standard_options.streaming: + should_raise_error = True + else: + # If no pipeline options available, fall back to original behavior + should_raise_error = True + + if should_raise_error: + if transform.id_label: + raise NotImplementedError( + 'id_label is not supported for PubSub writes with DirectRunner ' + 'or in batch mode') + if transform.timestamp_attribute: + raise NotImplementedError( + 'timestamp_attribute is not supported for PubSub writes with ' + 'DirectRunner or in batch mode') def setup(self): from google.cloud import pubsub diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 091746528596..5650e920e635 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -973,10 +973,10 @@ def test_write_messages_unsupported_features(self, mock_pubsub): attributes = {'key': 'value'} payloads = [PubsubMessage(data, attributes)] - # Test that id_label logs a warning instead of raising an exception options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True - with self.assertLogs(level='WARNING') as log: + with self.assertRaisesRegex(NotImplementedError, + r'id_label is not supported'): with TestPipeline(options=options) as p: _ = ( p @@ -985,14 +985,11 @@ def test_write_messages_unsupported_features(self, mock_pubsub): 'projects/fakeprj/topics/a_topic', id_label='a_label', with_attributes=True)) - self.assertIn( - 'id_label is not supported for PubSub writes and will be ignored', - str(log.output)) - # Test that timestamp_attribute logs a warning instead of raising an exception options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True - with self.assertLogs(level='WARNING') as log: + with self.assertRaisesRegex(NotImplementedError, + r'timestamp_attribute is not supported'): with TestPipeline(options=options) as p: _ = ( p @@ -1001,9 +998,6 @@ def test_write_messages_unsupported_features(self, mock_pubsub): 'projects/fakeprj/topics/a_topic', timestamp_attribute='timestamp', with_attributes=True)) - self.assertIn( - 'timestamp_attribute is not supported for PubSub writes and will be ignored', - str(log.output)) def test_runner_api_transformation(self, unused_mock_pubsub): sink = _PubSubSink( From 6c045bef39042b62e6971bdb3050b191f19d1e90 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 04:30:29 -0400 Subject: [PATCH 04/11] lint --- sdks/python/apache_beam/io/gcp/pubsub.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 6f5ee23f6433..99a710a64320 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -574,8 +574,7 @@ def __init__(self, transform): if pipeline_options: from apache_beam.options.pipeline_options import StandardOptions - from apache_beam.runners.direct.direct_runner import DirectRunner - + # Check if using DirectRunner runner_name = getattr(pipeline_options, 'runner', None) if runner_name and 'DirectRunner' in str(runner_name): From 49c3af0020376ba36b64b6ab23a80708391251bc Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 04:30:50 -0400 Subject: [PATCH 05/11] lint --- sdks/python/apache_beam/io/gcp/pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 99a710a64320..9f6d38db37c5 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -574,7 +574,7 @@ def __init__(self, transform): if pipeline_options: from apache_beam.options.pipeline_options import StandardOptions - + # Check if using DirectRunner runner_name = getattr(pipeline_options, 'runner', None) if runner_name and 'DirectRunner' in str(runner_name): From df78be0f8ccf1dc3b90f3c42a47a7557f1a77760 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 08:24:41 -0400 Subject: [PATCH 06/11] fix tests --- sdks/python/apache_beam/io/gcp/pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 9f6d38db37c5..c180b16c906b 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -577,7 +577,7 @@ def __init__(self, transform): # Check if using DirectRunner runner_name = getattr(pipeline_options, 'runner', None) - if runner_name and 'DirectRunner' in str(runner_name): + if runner_name is None or 'DirectRunner' in str(runner_name): should_raise_error = True # Check if in batch mode (not streaming) From 5223779b69dcaad270f85b2dd9d14827194ae1e8 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 10:59:48 -0400 Subject: [PATCH 07/11] fix(pubsub): improve runner detection and error messaging Add more robust runner detection logic to handle DirectRunner variants and test runners. Include detailed debug logging and error messages to help troubleshoot unsupported PubSub write scenarios. --- sdks/python/apache_beam/io/gcp/pubsub.py | 40 ++++++++++++++++++++---- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index c180b16c906b..f08361656bb8 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -576,8 +576,18 @@ def __init__(self, transform): from apache_beam.options.pipeline_options import StandardOptions # Check if using DirectRunner - runner_name = getattr(pipeline_options, 'runner', None) - if runner_name is None or 'DirectRunner' in str(runner_name): + try: + # Get runner from pipeline options + all_options = pipeline_options.get_all_options() + runner_name = all_options.get('runner', StandardOptions.DEFAULT_RUNNER) + + # Check if it's a DirectRunner variant + if (runner_name in StandardOptions.LOCAL_RUNNERS or + 'DirectRunner' in str(runner_name) or + 'TestDirectRunner' in str(runner_name)): + should_raise_error = True + except Exception: + # If we can't determine runner, assume DirectRunner for safety should_raise_error = True # Check if in batch mode (not streaming) @@ -589,14 +599,32 @@ def __init__(self, transform): should_raise_error = True if should_raise_error: + # Log debug information for troubleshooting + import logging + runner_info = getattr( + pipeline_options, 'runner', + 'None') if pipeline_options else 'No options' + streaming_info = 'Unknown' + if pipeline_options: + try: + standard_options = pipeline_options.view_as(StandardOptions) + streaming_info = f'streaming={standard_options.streaming}' + except: + streaming_info = 'streaming=unknown' + + logging.warning( + f'PubSub unsupported feature check: ' + f'runner={runner_info}, {streaming_info}') + if transform.id_label: raise NotImplementedError( - 'id_label is not supported for PubSub writes with DirectRunner ' - 'or in batch mode') + f'id_label is not supported for PubSub writes with DirectRunner ' + f'or in batch mode (runner={runner_info}, {streaming_info})') if transform.timestamp_attribute: raise NotImplementedError( - 'timestamp_attribute is not supported for PubSub writes with ' - 'DirectRunner or in batch mode') + f'timestamp_attribute is not supported for PubSub writes with ' + f'DirectRunner or in batch mode ' + f'(runner={runner_info}, {streaming_info})') def setup(self): from google.cloud import pubsub From 396b3abcb98436297c24e3b93a4a0e7fbd3f28a9 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 11:03:55 -0400 Subject: [PATCH 08/11] test(pubsub): increase test timeout durations for reliability Increase TEST_PIPELINE_DURATION_MS from 8 to 10 minutes and MESSAGE_MATCHER_TIMEOUT_S from 5 to 10 minutes to account for potential delays in test environment --- sdks/python/apache_beam/io/gcp/pubsub_integration_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index c88f4af2016d..8387fe734fc1 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -44,10 +44,10 @@ # How long TestXXXRunner will wait for pubsub_it_pipeline to run before # cancelling it. -TEST_PIPELINE_DURATION_MS = 8 * 60 * 1000 +TEST_PIPELINE_DURATION_MS = 10 * 60 * 1000 # How long PubSubMessageMatcher will wait for the correct set of messages to # appear. -MESSAGE_MATCHER_TIMEOUT_S = 5 * 60 +MESSAGE_MATCHER_TIMEOUT_S = 10 * 60 class PubSubIntegrationTest(unittest.TestCase): From 4433ce6ca5919b1b1c5280f5005060a5bd01b584 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 11:32:51 -0400 Subject: [PATCH 09/11] fix lint --- sdks/python/apache_beam/io/gcp/pubsub.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index f08361656bb8..91480b864514 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -608,13 +608,14 @@ def __init__(self, transform): if pipeline_options: try: standard_options = pipeline_options.view_as(StandardOptions) - streaming_info = f'streaming={standard_options.streaming}' - except: + streaming_info = 'streaming=%s' % standard_options.streaming + except Exception: streaming_info = 'streaming=unknown' logging.warning( - f'PubSub unsupported feature check: ' - f'runner={runner_info}, {streaming_info}') + 'PubSub unsupported feature check: runner=%s, %s', + runner_info, + streaming_info) if transform.id_label: raise NotImplementedError( From 33a597c28816fefde25810b374af24edba4c88f8 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 12:31:49 -0400 Subject: [PATCH 10/11] fix(pubsub): handle None runner case and improve debug logging Move debug logging outside error condition and log at debug level instead of warning --- sdks/python/apache_beam/io/gcp/pubsub.py | 41 ++++++++++++------------ 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 91480b864514..bfa1fa152f07 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -582,9 +582,9 @@ def __init__(self, transform): runner_name = all_options.get('runner', StandardOptions.DEFAULT_RUNNER) # Check if it's a DirectRunner variant - if (runner_name in StandardOptions.LOCAL_RUNNERS or - 'DirectRunner' in str(runner_name) or - 'TestDirectRunner' in str(runner_name)): + if (runner_name is None or + (runner_name in StandardOptions.LOCAL_RUNNERS or 'DirectRunner' + in str(runner_name) or 'TestDirectRunner' in str(runner_name))): should_raise_error = True except Exception: # If we can't determine runner, assume DirectRunner for safety @@ -598,24 +598,25 @@ def __init__(self, transform): # If no pipeline options available, fall back to original behavior should_raise_error = True + # Log debug information for troubleshooting + import logging + runner_info = getattr( + pipeline_options, 'runner', + 'None') if pipeline_options else 'No options' + streaming_info = 'Unknown' + if pipeline_options: + try: + standard_options = pipeline_options.view_as(StandardOptions) + streaming_info = 'streaming=%s' % standard_options.streaming + except Exception: + streaming_info = 'streaming=unknown' + + logging.debug( + 'PubSub unsupported feature check: runner=%s, %s', + runner_info, + streaming_info) + if should_raise_error: - # Log debug information for troubleshooting - import logging - runner_info = getattr( - pipeline_options, 'runner', - 'None') if pipeline_options else 'No options' - streaming_info = 'Unknown' - if pipeline_options: - try: - standard_options = pipeline_options.view_as(StandardOptions) - streaming_info = 'streaming=%s' % standard_options.streaming - except Exception: - streaming_info = 'streaming=unknown' - - logging.warning( - 'PubSub unsupported feature check: runner=%s, %s', - runner_info, - streaming_info) if transform.id_label: raise NotImplementedError( From a712ac8e137cb9cabb121e69395cf589edee0707 Mon Sep 17 00:00:00 2001 From: liferoad Date: Tue, 16 Sep 2025 11:22:19 -0400 Subject: [PATCH 11/11] use output_labels_supported --- sdks/python/apache_beam/io/gcp/pubsub.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index bfa1fa152f07..59eadee5538e 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -570,7 +570,7 @@ def __init__(self, transform): # id_label and timestamp_attribute. # Only raise errors for DirectRunner or batch pipelines pipeline_options = transform.pipeline_options - should_raise_error = False + output_labels_supported = True if pipeline_options: from apache_beam.options.pipeline_options import StandardOptions @@ -585,18 +585,18 @@ def __init__(self, transform): if (runner_name is None or (runner_name in StandardOptions.LOCAL_RUNNERS or 'DirectRunner' in str(runner_name) or 'TestDirectRunner' in str(runner_name))): - should_raise_error = True + output_labels_supported = False except Exception: # If we can't determine runner, assume DirectRunner for safety - should_raise_error = True + output_labels_supported = False # Check if in batch mode (not streaming) standard_options = pipeline_options.view_as(StandardOptions) if not standard_options.streaming: - should_raise_error = True + output_labels_supported = False else: # If no pipeline options available, fall back to original behavior - should_raise_error = True + output_labels_supported = False # Log debug information for troubleshooting import logging @@ -616,7 +616,7 @@ def __init__(self, transform): runner_info, streaming_info) - if should_raise_error: + if not output_labels_supported: if transform.id_label: raise NotImplementedError(