Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ static Preparer newPreparer() {
* A class preparing the immutable checkpoint. It is mutable so that new messages can be added.
*/
static class Preparer {
private Instant newestPrevCheckpointTimestamp = Instant.now();
private Instant oldestMessageTimestamp = Instant.now();
private Instant newestMessageTimestamp = Instant.now();
private transient @Nullable Message lastMessage = null;

@VisibleForTesting transient boolean discarded = false;
Expand All @@ -148,17 +150,33 @@ void add(Message message) throws Exception {
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
} else if (currentMessageTimestamp.isAfter(newestMessageTimestamp)) {
newestMessageTimestamp = currentMessageTimestamp;
}
lastMessage = message;
} finally {
lock.writeLock().unlock();
}
}

Instant getOldestMessageTimestamp() {
/** Internal method for watermark. This method should be called inside read or write lock. */
private Instant getWatermarkUnlock() {
return oldestMessageTimestamp.isBefore(newestPrevCheckpointTimestamp)
? oldestMessageTimestamp
: newestPrevCheckpointTimestamp;
}

/**
* The oldest timestamp is used to estimate the watermark. It is determined by these rules.
*
* <p>1) at initialization, it sets to now. This value may be used if reader.start() returns
* false 2) at data read, it sets to the earlier one of (i) current oldest message timestamp of
* the pending checkpoint, and (ii) the newest message timestamp of the previous checkpoint
*/
Instant getWatermark() {
lock.readLock().lock();
try {
return this.oldestMessageTimestamp;
return getWatermarkUnlock();
} finally {
lock.readLock().unlock();
}
Expand All @@ -184,11 +202,12 @@ JmsCheckpointMark newCheckpoint(@Nullable MessageConsumer consumer, @Nullable Se
try {
if (discarded) {
lastMessage = null;
checkpointMark = this.emptyCheckpoint();
checkpointMark = new JmsCheckpointMark(getWatermarkUnlock(), null, null, null);
} else {
checkpointMark =
new JmsCheckpointMark(oldestMessageTimestamp, lastMessage, consumer, session);
new JmsCheckpointMark(getWatermarkUnlock(), lastMessage, consumer, session);
lastMessage = null;
newestPrevCheckpointTimestamp = newestMessageTimestamp;
oldestMessageTimestamp = Instant.now();
}
} finally {
Expand All @@ -198,7 +217,16 @@ JmsCheckpointMark newCheckpoint(@Nullable MessageConsumer consumer, @Nullable Se
}

JmsCheckpointMark emptyCheckpoint() {
return new JmsCheckpointMark(oldestMessageTimestamp, null, null, null);
lock.writeLock().lock();
try {
JmsCheckpointMark checkpointMark =
new JmsCheckpointMark(getWatermarkUnlock(), null, null, null);
newestPrevCheckpointTimestamp = newestMessageTimestamp;
oldestMessageTimestamp = Instant.now();
return checkpointMark;
} finally {
lock.writeLock().unlock();
}
}

boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ public T getCurrent() throws NoSuchElementException {

@Override
public Instant getWatermark() {
return checkpointMarkPreparer.getOldestMessageTimestamp();
return checkpointMarkPreparer.getWatermark();
}

@Override
Expand Down