diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index 0492c7623677..e7634eca1696 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -233,7 +233,7 @@ public long identity() { static class PubsubCheckpoint implements UnboundedSource.CheckpointMark { /** * If the checkpoint is for persisting: the reader who's snapshotted state we are persisting. - * If the checkpoint is for restoring: initially {@literal null}, then explicitly set. + * If the checkpoint is for restoring: {@literal null}. * Not persisted in durable checkpoint. * CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called * the 'true' active reader may have changed. @@ -248,7 +248,7 @@ static class PubsubCheckpoint implements UnboundedSource.CheckpointMark { * Not persisted in durable checkpoint. */ @Nullable - private final List safeToAckIds; + private List safeToAckIds; /** * If the checkpoint is for persisting: The ACK ids of messages which have been received @@ -299,6 +299,8 @@ public void finalizeCheckpoint() throws IOException { } finally { checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0, "Miscounted in-flight checkpoints"); + reader = null; + safeToAckIds = null; } }