Conversation

@liferoad (Contributor) commented Aug 28, 2025

Implement WriteToPubSub batch mode support by reusing the DirectRunner implementation. Add a PTransform override for batch mode, plus integration tests.

Fixes #35990

Internal bug: b/441584693

Will update CHANGES.md when the PR is good to go.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch): build Python source distribution and wheels, Python tests, Java tests, Go tests.
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@liferoad liferoad changed the title [TEST-ONLY] feat(pubsub): add batch mode support for WriteToPubSub in DataflowRunner feat(pubsub): add batch mode support for WriteToPubSub in DataflowRunner Aug 29, 2025
# use BundleBasedDirectRunner
# since Prism does not support transform overrides
transform_overrides = _get_transform_overrides(options)
if transform_overrides:
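The fallback in the snippet above can be illustrated with a toy sketch. The function and runner names mirror the snippet, but the logic here is a simplified stand-in, not Beam's actual runner-selection code.

```python
# Toy stand-in for the fallback: if a pipeline needs any transform
# overrides, choose the bundle-based DirectRunner instead of Prism,
# since Prism does not apply transform overrides.

def get_transform_overrides(options):
    # Hypothetical: only a Pub/Sub write needs an override in this sketch.
    return ["WriteToPubSubOverride"] if options.get("uses_pubsub_write") else []

def choose_runner(options):
    if get_transform_overrides(options):
        return "BundleBasedDirectRunner"
    return "PrismRunner"

print(choose_runner({"uses_pubsub_write": True}))  # BundleBasedDirectRunner
print(choose_runner({}))                           # PrismRunner
```

This is the blanket behavior questioned later in the thread: any override at all forces the fallback, rather than checking which specific feature is missing.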
Contributor Author:

@shunping @damccorm Falling back to the DirectRunner for now when transform overrides are detected.

Contributor Author:

Tracked under #36011

Contributor:

Why do we need to apply these overrides in Prism?

If we're applying an override, it is usually because some feature the transform needs is missing. It seems like it would be better to exclude Prism only when the feature itself is needed, rather than assuming that all transforms with overrides don't work.

Contributor Author:

I can scope this to Pub/Sub only if we can confirm the other transform overrides are not needed for Prism. I did it this way simply because transform overrides are not applied in Prism right now.

Contributor:

Why do we need the pub/sub transform override? I assume there is a missing feature here?

Contributor Author:

Without this PR, batch mode does not work: WriteToPubSub does not perform any writes. The DirectRunner uses the overrides to make it work, and I applied the same idea to the DataflowRunner.

Please check the internal bug b/441584693
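The override idea described above (the DirectRunner swaps WriteToPubSub for an implementation that actually publishes) can be illustrated with a toy sketch. The class and function names below are simplified stand-ins, not Beam's actual APIs; Beam's real mechanism is a PTransformOverride with a matcher and a replacement transform.

```python
# Toy illustration of the transform-override pattern: the stock batch
# WriteToPubSub writes nothing, and a runner-applied override swaps in a
# replacement that performs the publish. Names are illustrative only.

class WriteToPubSub:
    def expand(self, messages):
        return []  # In batch mode the stock transform performs no writes.

class BatchWriteToPubSub(WriteToPubSub):
    def __init__(self, published):
        self.published = published

    def expand(self, messages):
        self.published.extend(messages)  # The replacement actually publishes.
        return []

def apply_overrides(transform, published):
    # Matcher: swap in the batch implementation for WriteToPubSub.
    if type(transform) is WriteToPubSub:
        return BatchWriteToPubSub(published)
    return transform

published = []
t = apply_overrides(WriteToPubSub(), published)
t.expand([b"hello", b"world"])
print(published)  # [b'hello', b'world']
```

Without the override step, `expand` returns without touching `published`, which mirrors the silent no-op behavior the bug report describes.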

Contributor:

For Java, instead of using overrides to implement Dataflow batch, direct runner, etc., we only override for Dataflow streaming (which specializes the write by publishing internally). I think this is a better approach because the basic Pub/Sub write transform then works as is, and overriding is reserved for specialization.

See
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1624
and the override (which also has an experiment to disable):
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L659

Can we do that for Python too by:

  • implementing PubsubSink in pubsub.py along the lines of the existing DirectRunner implementation. We may need to add some pushback if the publisher client itself doesn't provide it; otherwise we may pull all the messages into memory, have them pile up in publishing, and OOM.
  • removing all the DirectRunner-specific override machinery
  • adding a transform override for Dataflow streaming and removing it for Dataflow batch
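The pushback concern in the first bullet can be sketched as a publisher wrapper that bounds the number of in-flight publish futures, blocking before issuing more. This is a toy stand-in, assuming nothing about the real Pub/Sub client beyond it returning futures; the actual `google-cloud-pubsub` client has its own flow-control settings.

```python
# Sketch of "pushback": cap in-flight publishes so a batch worker does not
# buffer every message in memory while waiting on the publish backend.
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

class BoundedPublisher:
    def __init__(self, publish_fn, max_in_flight=4):
        self._executor = ThreadPoolExecutor(max_workers=max_in_flight)
        self._futures = set()
        self._max = max_in_flight
        self._publish_fn = publish_fn  # stand-in for a real publisher client

    def publish(self, message):
        if len(self._futures) >= self._max:
            # Block until at least one outstanding publish completes.
            done, self._futures = wait(self._futures, return_when=FIRST_COMPLETED)
            for f in done:
                f.result()  # surface publish errors instead of dropping them
        self._futures.add(self._executor.submit(self._publish_fn, message))

    def flush(self):
        # Drain remaining in-flight publishes before the bundle finishes.
        done, self._futures = wait(self._futures)
        for f in done:
            f.result()
        self._executor.shutdown()

sent = []
pub = BoundedPublisher(sent.append, max_in_flight=2)
for i in range(10):
    pub.publish(i)
pub.flush()
print(sorted(sent))  # all ten messages published, never more than 2 in flight
```

A real PubsubSink would call `flush` from the DoFn's `finish_bundle` so every message is acknowledged before the bundle commits.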

Contributor Author:

My PR keeps the changes minimal and avoids breaking any potential update compatibility. That is why I override the batch implementation instead of touching any of the streaming path.

return pcoll | Write(self._sink)

Regarding this line: are you sure we can easily change this to match the Java implementation? Or is it worth matching the Java one, given that this issue has existed for a while and my current PR (not perfect) solves it for now?

Contributor:

> I think this is a better approach because then the basic pubsub write transform works as is and overriding is just for specialization.

I think this is the key thing: ideally you would not need to implement an override for this to work on arbitrary runners; overrides should be the exceptional case.

@liferoad liferoad marked this pull request as ready for review August 29, 2025 14:36
@liferoad liferoad requested a review from scwhittle August 29, 2025 14:37
@github-actions:
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers


@liferoad (Contributor Author) commented Sep 3, 2025

Closing this; #36027 is the recommended way.

@liferoad liferoad closed this Sep 3, 2025


Successfully merging this pull request may close these issues.

[Bug]: WriteToPubSub Sink breaks in batch mode
