Skip to content

Conversation

@chadrik
Copy link
Contributor

@chadrik chadrik commented Aug 24, 2019

As far as I can tell Python does not care about boundedness of PCollections even in streaming mode, but external transforms do.  In my ongoing effort to get PubsubIO external transforms working I discovered that I could not generate an unbounded write using the expansion service from python: it always came back as a bounded write. 

My pipeline looks like this:

(
pipe
  | external.pubsub.ReadFromPubSub(subscription=subscription, with_attributes=True)
  | external.pubsub.WriteToPubSub(topic=OUTPUT_TOPIC, with_attributes=True)
)

The PCollections returned from the external Read are Unbounded, as expected, but python is responsible for creating the intermediate PCollection, which is always Bounded, and thus external Write generated by Java is always Bounded.

If I'm on the right track here I'll make some tests. I've manually tested it enough to confirm that it gets my external xform tests working.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status --- --- Build Status
XLang --- --- --- Build Status --- --- ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@chadrik
Copy link
Contributor Author

chadrik commented Aug 24, 2019

R: @robertwb
R: @mxm

@mxm
Copy link
Contributor

mxm commented Aug 30, 2019

Just to clarify, this is to have an unbounded PCollection during the expansion of the WriteToPubSub write?

@chadrik
Copy link
Contributor Author

chadrik commented Aug 30, 2019

Just to clarify, this is to have an unbounded PCollection during the expansion of the WriteToPubSub write?

Correct.

PubSubIO.Write.expand looks like this:

    @Override
    public PDone expand(PCollection<T> input) {
      if (getTopicProvider() == null) {
        throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
      }

      switch (input.isBounded()) {
        case BOUNDED:
          input.apply(
              ParDo.of(
                  new PubsubBoundedWriter(
                      MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE),
                      MoreObjects.firstNonNull(
                          getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT))));
          return PDone.in(input.getPipeline());
        case UNBOUNDED:
          return input
              .apply(MapElements.via(getFormatFn()))
              .apply(
                  new PubsubUnboundedSink(
                      Optional.ofNullable(getPubsubClientFactory()).orElse(FACTORY),
                      NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
                      getTimestampAttribute(),
                      getIdAttribute(),
                      100 /* numShards */,
                      MoreObjects.firstNonNull(
                          getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
                      MoreObjects.firstNonNull(
                          getMaxBatchBytesSize(),
                          PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES)));
      }
      throw new RuntimeException(); // cases are exhaustive.
    }

The ReadFromPubsub was expanded by Java, but Python was not properly handling the unbounded PCollections that it returned, so when WriteToPubsub was subsequently expanded, it always produced a PubsubBoundedWriter.

Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems fair. The boundedness is part of the Proto and thus should be available also from within Python.

Should we add a test for this in pipeline_test.py?

@chadrik
Copy link
Contributor Author

chadrik commented Sep 4, 2019

Should we add a test for this in pipeline_test.py?

Done!

@mxm mxm merged commit 9983115 into apache:master Sep 5, 2019
@mxm
Copy link
Contributor

mxm commented Sep 5, 2019

Thanks!

@chadrik chadrik deleted the py_track_unbounded branch September 5, 2019 21:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants