@mshields822
Contributor

R: @dhalperi @tgroh

The PubsubUnboundedSource implementation has an assertion to confirm that the checkpoint from which a fresh reader is instantiated was obtained by deserializing an earlier finalized checkpoint. The in-process runner was reusing the checkpoint object directly, so the assertion failed. This adds a serialize/deserialize round trip to the in-process runner, which I believe is the best solution since other UnboundedSources may be caught by the same issue. It also forces the user to exercise their checkpoint coder.
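For illustration only, here is a minimal sketch of what a serialize/deserialize round trip of a checkpoint looks like. It is not the code in this PR: `DemoCheckpoint` and `CheckpointRoundTrip` are hypothetical names, and plain Java serialization stands in for the user's checkpoint coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical checkpoint type; a stand-in, not Beam's CheckpointMark. */
class DemoCheckpoint implements Serializable {
  private static final long serialVersionUID = 1L;
  final ArrayList<String> pendingAckIds;

  DemoCheckpoint(List<String> pendingAckIds) {
    this.pendingAckIds = new ArrayList<>(pendingAckIds);
  }
}

class CheckpointRoundTrip {
  /** Encodes the checkpoint to bytes and decodes a fresh copy from them. */
  static DemoCheckpoint roundTrip(DemoCheckpoint original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      // Assumption: Java serialization plays the role of the checkpoint coder.
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (DemoCheckpoint) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("checkpoint could not be round-tripped", e);
    }
  }
}
```

The reader created after the round trip receives a distinct object with the same encoded state, so a broken coder would surface here rather than in production.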

@tgroh
Member

tgroh commented May 25, 2016

LGTM

@kennknowles
Member

kennknowles commented May 25, 2016

Why not just fix PubsubIO to relinquish ownership? (and on the other side, accept any valid checkpoint)

@mshields822
Contributor Author

Because

  • this change exercises more of the user's UnboundedSource machinery,
    helping them debug
  • it automatically prevents any lag due to holding on to more state than
    necessary in the checkpoint
  • the assertion in PubsubUnboundedSource is deliberate since 'checkpoints
    waiting for finalize' need to hold on to their runner instance, which thus
    shows up as a field which makes no sense when the checkpoint is restored
    into a new runner. It gave me the creeps so I added the assert.
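To make the last point concrete, here is a rough sketch of the pattern, under assumptions: `OwnedCheckpoint`, its `owner` field, and `OwnershipCheck` are hypothetical names, not the actual PubsubUnboundedSource code, and Java serialization stands in for the checkpoint coder. The back-reference is `transient`, so it is dropped by the round trip and a restored checkpoint can be told apart from a live one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical checkpoint that points back at whatever produced it. */
class OwnedCheckpoint implements Serializable {
  private static final long serialVersionUID = 1L;
  // Needed only while waiting for finalize; meaningless once restored elsewhere.
  transient Object owner;
  final long offset;

  OwnedCheckpoint(Object owner, long offset) {
    this.owner = owner;
    this.offset = offset;
  }
}

class OwnershipCheck {
  /** True if the checkpoint no longer carries a back-reference. */
  static boolean isRestored(OwnedCheckpoint checkpoint) {
    return checkpoint.owner == null;
  }

  /** Round trip via Java serialization (assumed stand-in for the coder). */
  static OwnedCheckpoint roundTrip(OwnedCheckpoint original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (OwnedCheckpoint) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("checkpoint could not be round-tripped", e);
    }
  }
}
```

An assertion along the lines of `isRestored(checkpoint)` at reader creation is what fails when a runner hands the original object straight back.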


@kennknowles
Member

kennknowles commented May 25, 2016

Requiring that the checkpoint is finalized makes sense. Which sort of implies that finalize should return the true checkpoint...

@kennknowles
Member

I should more fully state what I mean here: I agree with the change to the InProcessRunner temporarily, but also believe there is a bug in checkpoint finalization that this change will mask.

Longer term, the direct runner should do all of these:

  1. Reuse the reader
  2. Grab a checkpoint, finalize it, and use it to get a new reader
  3. Grab a checkpoint, finalize it, serialize/deserialize, and use it to get a new reader

If we temporarily have to choose between 2 and 3, then 3 is better. It exercises the important deserialization code path. If deserialization is correct, then any divergence between 2 and 3 is a bug in the source, and missing those bugs is just the short-term cost of switching to 3.
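The three behaviours listed above can be sketched roughly as follows. This is a toy model, not the direct runner's code: `ToyReader`, `ToyCheckpoint`, `RestartStrategy`, and `RestartDemo` are all hypothetical, and the coder round trip is passed in as a function.

```java
import java.util.function.UnaryOperator;

/** Hypothetical checkpoint holding only a read position. */
class ToyCheckpoint {
  final long position;
  boolean finalized;

  ToyCheckpoint(long position) {
    this.position = position;
  }

  void finalizeCheckpoint() {
    finalized = true;
  }
}

/** Hypothetical reader that can snapshot its position. */
class ToyReader {
  final long position;

  ToyReader(long position) {
    this.position = position;
  }

  ToyCheckpoint checkpoint() {
    return new ToyCheckpoint(position);
  }
}

enum RestartStrategy {
  REUSE_READER,                 // option 1
  FROM_CHECKPOINT,              // option 2
  FROM_ROUND_TRIPPED_CHECKPOINT // option 3
}

class RestartDemo {
  /** Picks the reader for the next bundle according to the strategy. */
  static ToyReader next(
      ToyReader current, RestartStrategy strategy, UnaryOperator<ToyCheckpoint> roundTrip) {
    if (strategy == RestartStrategy.REUSE_READER) {
      return current;
    }
    ToyCheckpoint checkpoint = current.checkpoint();
    checkpoint.finalizeCheckpoint();
    if (strategy == RestartStrategy.FROM_ROUND_TRIPPED_CHECKPOINT) {
      // Only option 3 exercises the encode/decode path.
      checkpoint = roundTrip.apply(checkpoint);
    }
    return new ToyReader(checkpoint.position);
  }
}
```

For a correct source all three strategies should resume from the same position; a difference between options 2 and 3 would point at the source or its coder.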

…restore. Allows same checkpoint object to be passed to new reader without a serialize/deserialize step.
@mshields822
Contributor Author

PTAL
I've reverted the original fix and replaced it with a fix within PubsubUnboundedSource.

@mshields822 mshields822 changed the title from "[BEAM-306] Serialize/Deserialize checkpoints" to "[BEAM-306] Make sure PubsubUnboundedSource works with the InProcessPipelineRunner" May 25, 2016
@kennknowles
Member

LGTM. Will merge when green.

@asfgit asfgit merged commit 521bfff into apache:master May 25, 2016
asfgit pushed a commit that referenced this pull request May 25, 2016