From 98563326294dd570ada84becd4c4dc4d0f8b8e9b Mon Sep 17 00:00:00 2001 From: Ben Chambers Date: Thu, 8 Mar 2018 10:01:47 -0800 Subject: [PATCH 1/3] Stateful ParDo test that sets a timer twice --- .../apache/beam/sdk/transforms/ParDoTest.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index f6f05ad03a8f..b5026b05a503 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -3353,4 +3353,37 @@ public void onTimer(OnTimerContext c, PipelineOptions options) { pipeline.run(); } + + @Test + @Category(ValidatesRunner.class) + public void duplicateTimerSetting() { + TestStream> stream = TestStream + .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) + .addElements(KV.of("key1", "v1")) + .advanceWatermarkToInfinity(); + + PCollection result = pipeline + .apply(stream) + .apply(ParDo.of(new TwoTimerDoFn())); + PAssert.that(result).containsInAnyOrder("It works"); + + pipeline.run().waitUntilFinish(); + } + + private static class TwoTimerDoFn extends DoFn, String> { + @TimerId("timer") + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process(ProcessContext c, + @TimerId("timer") Timer timer) { + timer.offset(Duration.standardMinutes(10)).setRelative(); + timer.offset(Duration.standardMinutes(30)).setRelative(); + } + + @OnTimer("timer") + public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) { + c.output("It works"); + } + } } From 0a6116ad5ea4b8621146f460178d28d4eb454499 Mon Sep 17 00:00:00 2001 From: Ben Chambers Date: Thu, 8 Mar 2018 10:03:06 -0800 Subject: [PATCH 2/3] Fix handling of timers in WatermarkManager 1. TimerUpdates should use an ordered collection so that the most recent timestamp set by user code is the one taken throughout the system. 2. Application of TimerUpdates needs to keep all three data structures in sync. 3. Add unit tests that the data structures are consistent. --- .../beam/runners/direct/WatermarkManager.java | 38 +++++++------ .../runners/direct/WatermarkManagerTest.java | 56 +++++++++++++++++++ 2 files changed, 77 insertions(+), 17 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index b01f166b625f..747a66719413 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -26,7 +26,6 @@ import com.google.common.collect.ComparisonChain; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.collect.SortedMultiset; @@ -39,6 +38,7 @@ import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -143,7 +143,7 @@ class WatermarkManager { * timestamp which indicates we have received all of the data and there will be no more on-time or * late data. This value is represented by {@link WatermarkManager#THE_END_OF_TIME}. */ - private interface Watermark { + @VisibleForTesting interface Watermark { /** * Returns the current value of this watermark. */ @@ -211,13 +211,13 @@ public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTim * *

See {@link #refresh()} for more information. */ - private static class AppliedPTransformInputWatermark implements Watermark { + @VisibleForTesting static class AppliedPTransformInputWatermark implements Watermark { private final Collection inputWatermarks; private final SortedMultiset> pendingElements; // This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key // minimum - private final SortedMultiset pendingTimers; + private final SortedMultiset pendingTimers; // Entries in this table represent the authoritative timestamp for which // a per-key-and-StateNamespace timer is set. @@ -290,15 +290,15 @@ private synchronized void removePending(CommittedBundle completed) { pendingElements.remove(completed); } - private synchronized Instant getEarliestTimerTimestamp() { + @VisibleForTesting synchronized Instant getEarliestTimerTimestamp() { if (pendingTimers.isEmpty()) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } else { - return pendingTimers.firstEntry().getElement(); + return pendingTimers.firstEntry().getElement().getTimestamp(); } } - private synchronized void updateTimers(TimerUpdate update) { + @VisibleForTesting synchronized void updateTimers(TimerUpdate update) { NavigableSet keyTimers = objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>()); Table existingTimersForKey = @@ -311,10 +311,12 @@ private synchronized void updateTimers(TimerUpdate update) { existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); if (existingTimer == null) { - pendingTimers.add(timer.getTimestamp()); + pendingTimers.add(timer); keyTimers.add(timer); } else if (!existingTimer.equals(timer)) { + pendingTimers.remove(existingTimer); keyTimers.remove(existingTimer); + pendingTimers.add(timer); keyTimers.add(timer); } // else the timer is already set identically, so noop @@ -329,7 +331,7 @@ private synchronized void updateTimers(TimerUpdate update) { existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); if (existingTimer != null) { - pendingTimers.remove(existingTimer.getTimestamp()); + pendingTimers.remove(existingTimer); keyTimers.remove(existingTimer); existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId()); } @@ -338,12 +340,14 @@ private synchronized void updateTimers(TimerUpdate update) { for (TimerData timer : update.getCompletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - pendingTimers.remove(timer.getTimestamp()); + keyTimers.remove(timer); + pendingTimers.remove(timer); } } } - private synchronized Map, List> extractFiredEventTimeTimers() { + @VisibleForTesting + synchronized Map, List> extractFiredEventTimeTimers() { return extractFiredTimers(currentWatermark.get(), objectTimers); } @@ -1363,9 +1367,9 @@ public static final class TimerUpdateBuilder { private TimerUpdateBuilder(StructuralKey key) { this.key = key; - this.completedTimers = new HashSet<>(); - this.setTimers = new HashSet<>(); - this.deletedTimers = new HashSet<>(); + this.completedTimers = new LinkedHashSet<>(); + this.setTimers = new LinkedHashSet<>(); + this.deletedTimers = new LinkedHashSet<>(); } /** @@ -1409,9 +1413,9 @@ public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) { public TimerUpdate build() { return new TimerUpdate( key, - ImmutableSet.copyOf(completedTimers), - ImmutableSet.copyOf(setTimers), - ImmutableSet.copyOf(deletedTimers)); + ImmutableList.copyOf(completedTimers), + ImmutableList.copyOf(setTimers), + ImmutableList.copyOf(deletedTimers)); } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index c0ad157e3c9a..a9896e6eaf36 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -19,10 +19,14 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -32,15 +36,19 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.CommittedResult.OutputType; +import org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; +import org.apache.beam.runners.direct.WatermarkManager.Watermark; import org.apache.beam.runners.local.StructuralKey; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -64,6 +72,7 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.BaseMatcher; +import org.hamcrest.CoreMatchers; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.Matchers; @@ -76,6 +85,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; /** * Tests for {@link WatermarkManager}. @@ -1475,6 +1485,52 @@ public void eventTimeTimersCanBeReset() { assertThat(firstFired.getTimers(), contains(overridingTimer)); } + @Test + public void inputWatermarkDuplicates() { + Watermark mockWatermark = Mockito.mock(Watermark.class); + + AppliedPTransformInputWatermark underTest = + new AppliedPTransformInputWatermark(ImmutableList.of(mockWatermark)); + + // Refresh + when(mockWatermark.get()).thenReturn(new Instant(0)); + underTest.refresh(); + assertEquals(new Instant(0), underTest.get()); + + // Apply a timer update + StructuralKey key = StructuralKey.of("key", StringUtf8Coder.of()); + TimerData timer1 = TimerData + .of("a", StateNamespaces.global(), new Instant(100), TimeDomain.EVENT_TIME); + TimerData timer2 = TimerData + .of("a", StateNamespaces.global(), new Instant(200), TimeDomain.EVENT_TIME); + underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build()); + + // Only the last timer update should be observable + assertEquals(timer2.getTimestamp(), underTest.getEarliestTimerTimestamp()); + + // Advance the input watermark + when(mockWatermark.get()).thenReturn(new Instant(1000)); + underTest.refresh(); + assertEquals(new Instant(1000), underTest.get()); // input watermark is not held by timers + + // Examine the fired event time timers + Map, List> fired = underTest.extractFiredEventTimeTimers(); + List timers = fired.get(key); + assertNotNull(timers); + assertThat(timers, contains(timer2)); + + // Update based on timer firings + underTest.updateTimers(TimerUpdate.builder(key) + .withCompletedTimers(timers).build()); + + // Now we should be able to advance + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, underTest.getEarliestTimerTimestamp()); + + // Nothing left to fire + fired = underTest.extractFiredEventTimeTimers(); + assertThat(fired.entrySet(), empty()); + } + @Test public void timerUpdateBuilderBuildAddsAllAddedTimers() { TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME); From 9a71a30ee55e371a89270545b30cbc7218d64fb6 Mon Sep 17 00:00:00 2001 From: Ben Chambers Date: Fri, 9 Mar 2018 10:22:19 -0800 Subject: [PATCH 3/3] fixup! Unused imports --- .../org/apache/beam/runners/direct/WatermarkManagerTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index a9896e6eaf36..4f6d4da174f5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.CommittedResult.OutputType; @@ -72,7 +71,6 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.BaseMatcher; -import org.hamcrest.CoreMatchers; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.Matchers;