diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index b01f166b625f..747a66719413 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -26,7 +26,6 @@
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.SortedMultiset;
@@ -39,6 +38,7 @@
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
@@ -143,7 +143,7 @@ class WatermarkManager {
* timestamp which indicates we have received all of the data and there will be no more on-time or
* late data. This value is represented by {@link WatermarkManager#THE_END_OF_TIME}.
*/
- private interface Watermark {
+ @VisibleForTesting interface Watermark {
/**
* Returns the current value of this watermark.
*/
@@ -211,13 +211,13 @@ public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTim
*
*
See {@link #refresh()} for more information.
*/
- private static class AppliedPTransformInputWatermark implements Watermark {
+ @VisibleForTesting static class AppliedPTransformInputWatermark implements Watermark {
private final Collection extends Watermark> inputWatermarks;
private final SortedMultiset> pendingElements;
// This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key
// minimum
- private final SortedMultiset pendingTimers;
+ private final SortedMultiset pendingTimers;
// Entries in this table represent the authoritative timestamp for which
// a per-key-and-StateNamespace timer is set.
@@ -290,15 +290,15 @@ private synchronized void removePending(CommittedBundle> completed) {
pendingElements.remove(completed);
}
- private synchronized Instant getEarliestTimerTimestamp() {
+ @VisibleForTesting synchronized Instant getEarliestTimerTimestamp() {
if (pendingTimers.isEmpty()) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
} else {
- return pendingTimers.firstEntry().getElement();
+ return pendingTimers.firstEntry().getElement().getTimestamp();
}
}
- private synchronized void updateTimers(TimerUpdate update) {
+ @VisibleForTesting synchronized void updateTimers(TimerUpdate update) {
NavigableSet keyTimers =
objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>());
Table existingTimersForKey =
@@ -311,10 +311,12 @@ private synchronized void updateTimers(TimerUpdate update) {
existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
if (existingTimer == null) {
- pendingTimers.add(timer.getTimestamp());
+ pendingTimers.add(timer);
keyTimers.add(timer);
} else if (!existingTimer.equals(timer)) {
+ pendingTimers.remove(existingTimer);
keyTimers.remove(existingTimer);
+ pendingTimers.add(timer);
keyTimers.add(timer);
} // else the timer is already set identically, so noop
@@ -329,7 +331,7 @@ private synchronized void updateTimers(TimerUpdate update) {
existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
if (existingTimer != null) {
- pendingTimers.remove(existingTimer.getTimestamp());
+ pendingTimers.remove(existingTimer);
keyTimers.remove(existingTimer);
existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
}
@@ -338,12 +340,14 @@ private synchronized void updateTimers(TimerUpdate update) {
for (TimerData timer : update.getCompletedTimers()) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
- pendingTimers.remove(timer.getTimestamp());
+ keyTimers.remove(timer);
+ pendingTimers.remove(timer);
}
}
}
- private synchronized Map, List> extractFiredEventTimeTimers() {
+ @VisibleForTesting
+ synchronized Map, List> extractFiredEventTimeTimers() {
return extractFiredTimers(currentWatermark.get(), objectTimers);
}
@@ -1363,9 +1367,9 @@ public static final class TimerUpdateBuilder {
private TimerUpdateBuilder(StructuralKey> key) {
this.key = key;
- this.completedTimers = new HashSet<>();
- this.setTimers = new HashSet<>();
- this.deletedTimers = new HashSet<>();
+ this.completedTimers = new LinkedHashSet<>();
+ this.setTimers = new LinkedHashSet<>();
+ this.deletedTimers = new LinkedHashSet<>();
}
/**
@@ -1409,9 +1413,9 @@ public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
public TimerUpdate build() {
return new TimerUpdate(
key,
- ImmutableSet.copyOf(completedTimers),
- ImmutableSet.copyOf(setTimers),
- ImmutableSet.copyOf(deletedTimers));
+ ImmutableList.copyOf(completedTimers),
+ ImmutableList.copyOf(setTimers),
+ ImmutableList.copyOf(deletedTimers));
}
}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index c0ad157e3c9a..4f6d4da174f5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -19,10 +19,14 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -32,15 +36,18 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
+import org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.WatermarkManager.Watermark;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -76,6 +83,7 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
/**
* Tests for {@link WatermarkManager}.
@@ -1475,6 +1483,52 @@ public void eventTimeTimersCanBeReset() {
assertThat(firstFired.getTimers(), contains(overridingTimer));
}
+ @Test
+ public void inputWatermarkDuplicates() {
+ Watermark mockWatermark = Mockito.mock(Watermark.class);
+
+ AppliedPTransformInputWatermark underTest =
+ new AppliedPTransformInputWatermark(ImmutableList.of(mockWatermark));
+
+ // Refresh
+ when(mockWatermark.get()).thenReturn(new Instant(0));
+ underTest.refresh();
+ assertEquals(new Instant(0), underTest.get());
+
+ // Apply a timer update
+ StructuralKey key = StructuralKey.of("key", StringUtf8Coder.of());
+ TimerData timer1 = TimerData
+ .of("a", StateNamespaces.global(), new Instant(100), TimeDomain.EVENT_TIME);
+ TimerData timer2 = TimerData
+ .of("a", StateNamespaces.global(), new Instant(200), TimeDomain.EVENT_TIME);
+ underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build());
+
+ // Only the last timer update should be observable
+ assertEquals(timer2.getTimestamp(), underTest.getEarliestTimerTimestamp());
+
+ // Advance the input watermark
+ when(mockWatermark.get()).thenReturn(new Instant(1000));
+ underTest.refresh();
+ assertEquals(new Instant(1000), underTest.get()); // input watermark is not held by timers
+
+ // Examine the fired event time timers
+ Map, List> fired = underTest.extractFiredEventTimeTimers();
+ List timers = fired.get(key);
+ assertNotNull(timers);
+ assertThat(timers, contains(timer2));
+
+ // Update based on timer firings
+ underTest.updateTimers(TimerUpdate.builder(key)
+ .withCompletedTimers(timers).build());
+
+ // Now we should be able to advance
+ assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, underTest.getEarliestTimerTimestamp());
+
+ // Nothing left to fire
+ fired = underTest.extractFiredEventTimeTimers();
+ assertThat(fired.entrySet(), empty());
+ }
+
@Test
public void timerUpdateBuilderBuildAddsAllAddedTimers() {
TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index f6f05ad03a8f..b5026b05a503 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3353,4 +3353,37 @@ public void onTimer(OnTimerContext c, PipelineOptions options) {
pipeline.run();
}
+
+ @Test
+ @Category(ValidatesRunner.class)
+ public void duplicateTimerSetting() {
+ TestStream> stream = TestStream
+ .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+ .addElements(KV.of("key1", "v1"))
+ .advanceWatermarkToInfinity();
+
+ PCollection result = pipeline
+ .apply(stream)
+ .apply(ParDo.of(new TwoTimerDoFn()));
+ PAssert.that(result).containsInAnyOrder("It works");
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ private static class TwoTimerDoFn extends DoFn, String> {
+ @TimerId("timer")
+ private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void process(ProcessContext c,
+ @TimerId("timer") Timer timer) {
+ timer.offset(Duration.standardMinutes(10)).setRelative();
+ timer.offset(Duration.standardMinutes(30)).setRelative();
+ }
+
+ @OnTimer("timer")
+ public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
+ c.output("It works");
+ }
+ }
}