From 33316d14f4ae848f2ffb423d8faf5806359eb474 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 16 Feb 2024 15:14:49 -0500 Subject: [PATCH] Attempt fix Jms watermark --- .../beam/sdk/io/jms/JmsCheckpointMark.java | 38 ++++++++++++++++--- .../org/apache/beam/sdk/io/jms/JmsIO.java | 2 +- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java index e213561917d6..25eb676a676b 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java @@ -128,7 +128,9 @@ static Preparer newPreparer() { * A class preparing the immutable checkpoint. It is mutable so that new messages can be added. */ static class Preparer { + private Instant newestPrevCheckpointTimestamp = Instant.now(); private Instant oldestMessageTimestamp = Instant.now(); + private Instant newestMessageTimestamp = Instant.now(); private transient @Nullable Message lastMessage = null; @VisibleForTesting transient boolean discarded = false; @@ -148,6 +150,8 @@ void add(Message message) throws Exception { Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp()); if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) { oldestMessageTimestamp = currentMessageTimestamp; + } else if (currentMessageTimestamp.isAfter(newestMessageTimestamp)) { + newestMessageTimestamp = currentMessageTimestamp; } lastMessage = message; } finally { @@ -155,10 +159,24 @@ void add(Message message) throws Exception { } } - Instant getOldestMessageTimestamp() { + /** Internal method for watermark. This method should be called inside read or write lock. */ + private Instant getWatermarkUnlock() { + return oldestMessageTimestamp.isBefore(newestPrevCheckpointTimestamp) + ? 
oldestMessageTimestamp + : newestPrevCheckpointTimestamp; + } + + /** + * The oldest timestamp is used to estimate the watermark. It is determined by these rules. + * + *

<p>1) at initialization, it is set to now. This value may be used if reader.start() returns + * false 2) at data read, it is set to the earlier of (i) current oldest message timestamp of + * the pending checkpoint, and (ii) the newest message timestamp of the previous checkpoint + */ + Instant getWatermark() { lock.readLock().lock(); try { - return this.oldestMessageTimestamp; + return getWatermarkUnlock(); } finally { lock.readLock().unlock(); } @@ -184,11 +202,12 @@ JmsCheckpointMark newCheckpoint(@Nullable MessageConsumer consumer, @Nullable Se try { if (discarded) { lastMessage = null; - checkpointMark = this.emptyCheckpoint(); + checkpointMark = new JmsCheckpointMark(getWatermarkUnlock(), null, null, null); } else { checkpointMark = - new JmsCheckpointMark(oldestMessageTimestamp, lastMessage, consumer, session); + new JmsCheckpointMark(getWatermarkUnlock(), lastMessage, consumer, session); lastMessage = null; + newestPrevCheckpointTimestamp = newestMessageTimestamp; oldestMessageTimestamp = Instant.now(); } } finally { @@ -198,7 +217,16 @@ JmsCheckpointMark newCheckpoint(@Nullable MessageConsumer consumer, @Nullable Se } JmsCheckpointMark emptyCheckpoint() { - return new JmsCheckpointMark(oldestMessageTimestamp, null, null, null); + lock.writeLock().lock(); + try { + JmsCheckpointMark checkpointMark = + new JmsCheckpointMark(getWatermarkUnlock(), null, null, null); + newestPrevCheckpointTimestamp = newestMessageTimestamp; + oldestMessageTimestamp = Instant.now(); + return checkpointMark; + } finally { + lock.writeLock().unlock(); + } } boolean isEmpty() { diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index cbb5738d303f..83e48cf22d8c 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -630,7 +630,7 @@ public T getCurrent() throws NoSuchElementException 
{ @Override public Instant getWatermark() { - return checkpointMarkPreparer.getOldestMessageTimestamp(); + return checkpointMarkPreparer.getWatermark(); } @Override