[BEAM-53] Wire PubsubUnbounded{Source,Sink} into PubsubIO #346
Conversation
```java
context.addStep(transform, "ParallelRead");
context.addInput(PropertyNames.FORMAT, "pubsub");
if (transform.getTopic() != null) {
  context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
```
is this no longer supported? is this a regression of some sort?
Only the Dataflow service has a notion of 'clean up when the job is done', and so only it can delete the random subscriptions created when PubsubIO.Read is given only a topic. So PubsubUnboundedSource requires a subscription, and PubsubIO.Read requires it to be specified.
But then this translation approach won't work, since the topic has not been captured by the PubsubUnboundedSource. Dang, missed that, thanks for noticing. Perhaps I can look at the parent PubsubIO.Read PTransform?
I think that we should leave in the ability to create a subscription on demand, but record an ugly log message that there will be a leftover subscription, and proceed. We have finalizers/fault tolerance on the pipeline roadmap, but it's not here yet.
I feel like there's a JIRA issue for this general problem, but I cannot find it.
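Something like the following sketch, perhaps. The `SubscriptionCreator` interface stands in for whatever client the source actually uses to create subscriptions; none of these names are the real PubsubIO internals.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the log-and-proceed idea: create a one-off subscription when only
// a topic was given, and warn loudly that nothing will ever clean it up.
class LeakySubscriptions {
  private static final Logger LOG = LoggerFactory.getLogger(LeakySubscriptions.class);

  // Placeholder for the actual Pubsub client call.
  interface SubscriptionCreator {
    String createRandomSubscription(String project, String topic, int ackDeadlineSeconds);
  }

  static String subscriptionFor(SubscriptionCreator client, String project, String topic) {
    String subscription = client.createRandomSubscription(project, topic, 60);
    // The "ugly log message": this subscription outlives the pipeline.
    LOG.warn(
        "Created subscription {} to topic {}. This subscription will NOT be deleted when the "
            + "pipeline terminates; delete it manually to avoid charges.",
        subscription, topic);
    return subscription;
  }
}
```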
Ugly, but I now capture both the subscription and topic, and defer the check that we're given a subscription to PubsubUnboundedSource.apply.
I really don't like it.
The batch direct runner creates the subscription on one worker when the pipeline begins data processing, and cleans it up when done.
The streaming Google Cloud Dataflow runner creates the subscription on dax when the job is set up, and cleans it up when the job is terminated.
The streaming Java-only runner would create it when the pipeline graph is constructed, and would never clean it up.
The lack of cleanup, plus the differences in when creation happens, all feels wrong...
... but I'll do it anyway.
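For reference, a minimal sketch of the capture-both-and-defer shape being discussed, using the 2016-era `apply()` override. The class layout and helper are assumptions, not the actual PubsubIO code:

```java
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Sketch: keep both the topic and the (possibly absent) subscription on the
// composite transform, so the Dataflow translator can still read the topic
// while the subscription check is deferred to apply().
class Read extends PTransform<PBegin, PCollection<String>> {
  @Nullable private final String topic;
  @Nullable private final String subscription;

  Read(@Nullable String topic, @Nullable String subscription) {
    this.topic = topic;
    this.subscription = subscription;
  }

  @Nullable String getTopic() {
    return topic;  // read by the Dataflow translator for the native override
  }

  @Override
  public PCollection<String> apply(PBegin input) {
    // Deferred check (or on-demand creation, with the warning shown earlier):
    // construction never fails, so a runner that overrides this transform
    // never needs the subscription at graph-construction time.
    if (subscription == null) {
      throw new IllegalStateException("No subscription given for topic " + topic);
    }
    return readFrom(input, subscription);
  }

  // Stand-in for wiring up the actual PubsubUnboundedSource.
  private PCollection<String> readFrom(PBegin input, String subscription) {
    throw new UnsupportedOperationException("sketch only");
  }
}
```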
Keep pinging me or @peihe as you get tests going and if you have trouble with the translator.
No luck running against apache_beam-on-google runner (NoClassDefFound exceptions).
Ok I'm back after getting the Google Cloud Dataflow runner rewrites working.
```java
// In streaming mode must use either the custom Pubsub unbounded source/sink or
// defer to Windmill's built-in implementation.
builder.put(PubsubIO.Read.Bound.PubsubBoundedReader.class, UnsupportedIO.class);
builder.put(PubsubIO.Write.Bound.PubsubBoundedWriter.class, UnsupportedIO.class);
```
These are both DoFns, not PTransforms, so I do not think this will have any effect.
@kennknowles any suggestions?
I added support for this in UnsupportedIO below.
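For context, a sketch of what DoFn-aware rejection could look like. The `ParDo.Bound` accessor and the set name are assumptions about that era's SDK, not the actual change:

```java
import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PTransform;

class UnsupportedIOCheck {
  // Sketch: when deciding whether a transform is unsupported, also consider
  // the class of the DoFn wrapped by a ParDo, since the override map may now
  // be keyed by DoFn classes (e.g. PubsubIO.Read.Bound.PubsubBoundedReader).
  static void rejectIfUnsupported(
      PTransform<?, ?> transform, Set<Class<?>> unsupportedClasses, boolean streaming) {
    Class<?> key = transform.getClass();
    if (transform instanceof ParDo.Bound) {
      key = ((ParDo.Bound<?, ?>) transform).getFn().getClass();
    }
    if (unsupportedClasses.contains(key)) {
      throw new UnsupportedOperationException(String.format(
          "The Dataflow runner does not support %s in %s mode",
          key.getSimpleName(), streaming ? "streaming" : "batch"));
    }
  }
}
```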
ack
PTAL
Confirmed working with beam-on-dataflow + beam-worker for both java-only and internal pubsub sources/sinks.
| ? "streaming" : "batch"; | ||
| String name = | ||
| transform == null ? approximateSimpleName(doFn.getClass()) : | ||
| approximatePTransformName(transform.getClass()); |
Fix wrapping here? The `:` should be at the start of the line.
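i.e., presumably something like:

```java
String name = transform == null
    ? approximateSimpleName(doFn.getClass())
    : approximatePTransformName(transform.getClass());
```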
LGTM. I can fix the `:` and then merge.
Thanks!
This also refines the handling of record ids in the sink to be random-but-reused-on-failure, using the same trick as we do for the BigQuery sink.
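A rough sketch of that idea (illustrative names and the 2016-era DoFn style, not the actual sink code): assign the random id in a step of its own before the publish, so that a retry of the publish step replays the same, already-checkpointed ids and duplicates can be dropped by id.

```java
import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch: tag each outgoing message with a random id in a separate stage.
// The runner checkpoints this stage's output, so if the downstream publish
// stage fails and retries, it sees the same ids rather than fresh ones.
class TagWithRecordId extends DoFn<String, KV<String, String>> {
  @Override
  public void processElement(ProcessContext c) {
    c.output(KV.of(UUID.randomUUID().toString(), c.element()));
  }
}
```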
Still need to re-do the load tests I did a few weeks back with the actual change.
Note that the last time I tested, the DataflowPipelineTranslator did not kick in and replace the new transforms with the correct native transforms. I need to dig deeper.
R: @dhalperi