@liferoad
Contributor

@liferoad liferoad commented Sep 12, 2025

Modify the PubSub write operation to properly deserialize protobuf messages and handle attributes when publishing. This ensures messages with attributes are published correctly rather than being treated as raw bytes.

TESTED:

./scripts/run_integration_test.sh --test_opts "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest::test_streaming_data_only" --runner TestDataflowRunner 

Post-commit:

https://productionresultssa2.blob.core.windows.net/actions-results/9cb12a0b-c070-4e0e-b45d-96c2727adebf/workflow-job-run-d3aedbda-7bdc-58eb-a3a2-f314b6192bfc/logs/job/job-logs.txt?rsct=text%2Fplain&se=2025-09-13T18%3A50%3A01Z&sig=MfRJuGs6N3FuJ1f%2B0xIJXBZHZiUOC42khjuIonnEISk%3D&ske=2025-09-14T05%3A40%3A50Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2025-09-13T17%3A40%3A50Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2025-05-05&sp=r&spr=https&sr=b&st=2025-09-13T18%3A39%3A56Z&sv=2025-05-05

2025-09-13T16:47:05.0820603Z [gw1] PASSED apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest::test_batch_write_data_only

2025-09-13T16:47:48.7819588Z [gw1] PASSED apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest::test_batch_write_with_attributes



…ation

Modify the PubSub write operation to properly deserialize protobuf messages and handle attributes when publishing. This ensures messages with attributes are published correctly rather than being treated as raw bytes.
…d features

Change raising NotImplementedError to logging warnings when id_label or timestamp_attribute are used in PubSub writes, as these features are not yet supported. Update tests to verify warning messages instead of exception handling.
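
The behavior change described in this commit message can be sketched as follows. This is a minimal illustration and not the code in the PR: `warn_unsupported_write_options` is a hypothetical helper, and only the parameter names `id_label` and `timestamp_attribute` come from the commit message.

```python
import logging

_LOGGER = logging.getLogger(__name__)


def warn_unsupported_write_options(id_label=None, timestamp_attribute=None):
  """Hypothetical helper: warn (instead of raising) for unsupported options.

  Returns the list of option names that triggered a warning.
  """
  unsupported = []
  if id_label:
    unsupported.append('id_label')
  if timestamp_attribute:
    unsupported.append('timestamp_attribute')
  for name in unsupported:
    # Before the change, this situation raised NotImplementedError.
    _LOGGER.warning('%s is not yet supported for PubSub writes', name)
  return unsupported
```

A test for this style of check would assert on the logged warning (for example with `unittest.TestCase.assertLogs`) rather than on a raised exception.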
@codecov

codecov bot commented Sep 13, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 56.77%. Comparing base (1cd03d3) to head (c4e6bc5).
⚠️ Report is 59 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #36140      +/-   ##
============================================
+ Coverage     56.75%   56.77%   +0.02%     
  Complexity     3385     3385              
============================================
  Files          1220     1220              
  Lines        185124   185524     +400     
  Branches       3508     3520      +12     
============================================
+ Hits         105068   105336     +268     
- Misses        76731    76851     +120     
- Partials       3325     3337      +12     
Flag Coverage Δ
python 80.91% <100.00%> (+0.01%) ⬆️


@liferoad
Contributor Author

This should fix #30513

Add more robust runner detection logic to handle DirectRunner variants and test runners. Include detailed debug logging and error messages to help troubleshoot unsupported PubSub write scenarios.
Increase TEST_PIPELINE_DURATION_MS from 8 to 10 minutes and MESSAGE_MATCHER_TIMEOUT_S from 5 to 10 minutes to account for potential delays in the test environment.
Move debug logging outside error condition and log at debug level instead of warning
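
A rough sketch of the kind of runner check these commits describe, under stated assumptions: the function below is hypothetical, it assumes (per the review thread) that only the Dataflow runner in streaming mode supports `id_label` and `timestamp_attribute`, and matching on the runner's class name is an illustrative simplification rather than the PR's actual detection logic.

```python
import logging

_LOGGER = logging.getLogger(__name__)


def should_raise_error(runner_name, streaming):
  """Hypothetical check: True when id_label/timestamp_attribute are unsupported.

  Assumes support is limited to Dataflow runners in streaming mode.
  """
  # Substring match so that test variants such as TestDataflowRunner count too.
  is_dataflow = 'DataflowRunner' in runner_name
  supported = bool(is_dataflow and streaming)
  # Logged unconditionally at debug level, not only on the error path.
  _LOGGER.debug(
      'runner=%s streaming=%s supported=%s', runner_name, streaming, supported)
  return not supported
```

With this sketch, `should_raise_error('DirectRunner', True)` and `should_raise_error('DataflowRunner', False)` both report the options as unsupported, while `should_raise_error('TestDataflowRunner', True)` does not.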
@liferoad liferoad marked this pull request as ready for review September 13, 2025 18:42
@liferoad liferoad requested a review from scwhittle September 13, 2025 18:42

if should_raise_error:

if transform.id_label:
Contributor Author

Only the Dataflow runner in streaming mode supports these two parameters.

Contributor

Is there some method that is invoked during pickling that this could be moved to instead? It seems that you need this runner check because this class is init'ed even when Dataflow streaming will swap it out and not use it.

If we have it in the pickle method perhaps we can remove all the runner checking and just unconditionally throw the error.

Alternatively, perhaps it's simpler to just implement these; it shouldn't be too complex. The id can just be a random string and the timestamp should be the element data timestamp.
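
The alternative proposed here could look roughly like the sketch below. It is an illustration under assumptions, not an implementation from this PR: `add_id_and_timestamp` is a hypothetical helper, attributes are modeled as a plain dict, and the timestamp is written as milliseconds since the Unix epoch on the assumption that this matches one of the formats Beam accepts for `timestamp_attribute` when reading.

```python
import uuid


def add_id_and_timestamp(
    attributes, element_timestamp_s, id_label=None, timestamp_attribute=None):
  """Hypothetical helper: returns a copy of attributes with id/timestamp set.

  element_timestamp_s is the element's event time in seconds since the epoch.
  """
  result = dict(attributes)
  if id_label:
    # A random string serves as the unique record id.
    result[id_label] = uuid.uuid4().hex
  if timestamp_attribute:
    # Milliseconds since the Unix epoch, stored as a string attribute value.
    result[timestamp_attribute] = str(int(element_timestamp_s * 1000))
  return result
```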

Contributor Author

The Dataflow runner in streaming mode supports these two fields. We have to check them based on the runner and the pipeline type. Implementing them is tracked by #18939. Since this fix is blocking several postcommit workflows, I think we should merge it first.

futures = []
for elem in self._buffer:
  # Deserialize the protobuf to get the original PubsubMessage
  pubsub_msg = PubsubMessage._from_proto_str(elem)
Contributor Author

The Pub/Sub client does not accept a PubsubMessage when publishing messages.
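
A sketch of what this implies for the write path: each buffered element is deserialized, and its payload and attributes are then passed to the client separately. Everything below is a stand-in so the example runs without Google Cloud dependencies: `SketchMessage` uses JSON in place of the protobuf encoding, and `RecordingPublisher.publish(topic, data, **attrs)` only mimics the call shape of the Pub/Sub publisher client.

```python
import base64
import json


class SketchMessage:
  """Stand-in for PubsubMessage; JSON replaces the protobuf encoding."""

  def __init__(self, data, attributes):
    self.data = data  # bytes payload
    self.attributes = attributes  # dict of str -> str

  def to_wire(self):
    payload = base64.b64encode(self.data).decode('ascii')
    return json.dumps({'data': payload, 'attributes': self.attributes}).encode()

  @staticmethod
  def from_wire(raw):
    obj = json.loads(raw)
    return SketchMessage(base64.b64decode(obj['data']), obj['attributes'])


class RecordingPublisher:
  """Fake client that records calls instead of contacting Pub/Sub."""

  def __init__(self):
    self.calls = []

  def publish(self, topic, data, **attrs):
    self.calls.append((topic, data, attrs))


def flush(buffer, publisher, topic):
  """Publishes each buffered element with its data and attributes."""
  for elem in buffer:
    msg = SketchMessage.from_wire(elem)
    # Pass payload bytes and attributes separately, not the serialized blob.
    publisher.publish(topic, msg.data, **(msg.attributes or {}))
```

Publishing the serialized element directly would instead send the whole encoded message as the payload and lose the attributes, which is the behavior the PR description sets out to fix.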

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@liferoad
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request correctly fixes an issue in the Pub/Sub write operation where message attributes were not being handled properly. The main change involves deserializing the protobuf message before publishing to correctly handle data and attributes. Additionally, it introduces logic to conditionally disable id_label and timestamp_attribute for runners or modes that do not support them. My feedback focuses on refactoring this new conditional logic to improve its clarity, robustness, and maintainability.


@liferoad liferoad requested a review from scwhittle September 16, 2025 15:25
@github-actions
Contributor

Assigning reviewers:

R: @damccorm for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Contributor

@damccorm damccorm left a comment

Thanks, this LGTM and should help get us green on our workflows

@liferoad
Contributor Author

I am going to merge this for now to unblock some post-commit workflows and will create new PRs if needed.

@liferoad liferoad merged commit 3cad999 into apache:master Sep 18, 2025
119 of 129 checks passed