From 521bfff45273eeccd858e838f6748dc96607275f Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Wed, 25 May 2016 14:21:31 -0700 Subject: [PATCH] Clear PubsubUnboundedSource checkpoint state which is not needed for restore. Allows same checkpoint object to be passed to new reader without a serialize/deserialize step. --- .../java/org/apache/beam/sdk/io/PubsubUnboundedSource.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index 0492c7623677..e7634eca1696 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -233,7 +233,7 @@ public long identity() { static class PubsubCheckpoint implements UnboundedSource.CheckpointMark { /** * If the checkpoint is for persisting: the reader who's snapshotted state we are persisting. - * If the checkpoint is for restoring: initially {@literal null}, then explicitly set. + * If the checkpoint is for restoring: {@literal null}. * Not persisted in durable checkpoint. * CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called * the 'true' active reader may have changed. @@ -248,7 +248,7 @@ static class PubsubCheckpoint implements UnboundedSource.CheckpointMark { * Not persisted in durable checkpoint. */ @Nullable - private final List safeToAckIds; + private List safeToAckIds; /** * If the checkpoint is for persisting: The ACK ids of messages which have been received @@ -299,6 +299,8 @@ public void finalizeCheckpoint() throws IOException { } finally { checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0, "Miscounted in-flight checkpoints"); + reader = null; + safeToAckIds = null; } }