Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public long identity() {
static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
/**
* If the checkpoint is for persisting: the reader who's snapshotted state we are persisting.
* If the checkpoint is for restoring: initially {@literal null}, then explicitly set.
* If the checkpoint is for restoring: {@literal null}.
* Not persisted in durable checkpoint.
* CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called
* the 'true' active reader may have changed.
Expand All @@ -248,7 +248,7 @@ static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
* Not persisted in durable checkpoint.
*/
@Nullable
private final List<String> safeToAckIds;
private List<String> safeToAckIds;

/**
* If the checkpoint is for persisting: The ACK ids of messages which have been received
Expand Down Expand Up @@ -299,6 +299,8 @@ public void finalizeCheckpoint() throws IOException {
} finally {
checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0,
"Miscounted in-flight checkpoints");
reader = null;
safeToAckIds = null;
}
}

Expand Down