Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.SortedMultiset;
Expand All @@ -39,6 +38,7 @@
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
Expand Down Expand Up @@ -143,7 +143,7 @@ class WatermarkManager {
* timestamp which indicates we have received all of the data and there will be no more on-time or
* late data. This value is represented by {@link WatermarkManager#THE_END_OF_TIME}.
*/
private interface Watermark {
@VisibleForTesting interface Watermark {
/**
* Returns the current value of this watermark.
*/
Expand Down Expand Up @@ -211,13 +211,13 @@ public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTim
*
* <p>See {@link #refresh()} for more information.
*/
private static class AppliedPTransformInputWatermark implements Watermark {
@VisibleForTesting static class AppliedPTransformInputWatermark implements Watermark {
private final Collection<? extends Watermark> inputWatermarks;
private final SortedMultiset<CommittedBundle<?>> pendingElements;

// This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key
// minimum
private final SortedMultiset<Instant> pendingTimers;
private final SortedMultiset<TimerData> pendingTimers;

// Entries in this table represent the authoritative timestamp for which
// a per-key-and-StateNamespace timer is set.
Expand Down Expand Up @@ -290,15 +290,15 @@ private synchronized void removePending(CommittedBundle<?> completed) {
pendingElements.remove(completed);
}

private synchronized Instant getEarliestTimerTimestamp() {
@VisibleForTesting synchronized Instant getEarliestTimerTimestamp() {
if (pendingTimers.isEmpty()) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
} else {
return pendingTimers.firstEntry().getElement();
return pendingTimers.firstEntry().getElement().getTimestamp();
}
}

private synchronized void updateTimers(TimerUpdate update) {
@VisibleForTesting synchronized void updateTimers(TimerUpdate update) {
NavigableSet<TimerData> keyTimers =
objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>());
Table<StateNamespace, String, TimerData> existingTimersForKey =
Expand All @@ -311,10 +311,12 @@ private synchronized void updateTimers(TimerUpdate update) {
existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());

if (existingTimer == null) {
pendingTimers.add(timer.getTimestamp());
pendingTimers.add(timer);
keyTimers.add(timer);
} else if (!existingTimer.equals(timer)) {
pendingTimers.remove(existingTimer);
keyTimers.remove(existingTimer);
pendingTimers.add(timer);
keyTimers.add(timer);
} // else the timer is already set identically, so noop

Expand All @@ -329,7 +331,7 @@ private synchronized void updateTimers(TimerUpdate update) {
existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());

if (existingTimer != null) {
pendingTimers.remove(existingTimer.getTimestamp());
pendingTimers.remove(existingTimer);
keyTimers.remove(existingTimer);
existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
}
Expand All @@ -338,12 +340,14 @@ private synchronized void updateTimers(TimerUpdate update) {

for (TimerData timer : update.getCompletedTimers()) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
pendingTimers.remove(timer.getTimestamp());
keyTimers.remove(timer);
pendingTimers.remove(timer);
}
}
}

private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
@VisibleForTesting
synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
return extractFiredTimers(currentWatermark.get(), objectTimers);
}

Expand Down Expand Up @@ -1363,9 +1367,9 @@ public static final class TimerUpdateBuilder {

private TimerUpdateBuilder(StructuralKey<?> key) {
this.key = key;
this.completedTimers = new HashSet<>();
this.setTimers = new HashSet<>();
this.deletedTimers = new HashSet<>();
this.completedTimers = new LinkedHashSet<>();
this.setTimers = new LinkedHashSet<>();
this.deletedTimers = new LinkedHashSet<>();
}

/**
Expand Down Expand Up @@ -1409,9 +1413,9 @@ public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
public TimerUpdate build() {
return new TimerUpdate(
key,
ImmutableSet.copyOf(completedTimers),
ImmutableSet.copyOf(setTimers),
ImmutableSet.copyOf(deletedTimers));
ImmutableList.copyOf(completedTimers),
ImmutableList.copyOf(setTimers),
ImmutableList.copyOf(deletedTimers));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
Expand All @@ -32,15 +36,18 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
import org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
import org.apache.beam.runners.direct.WatermarkManager.Watermark;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand Down Expand Up @@ -76,6 +83,7 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

/**
* Tests for {@link WatermarkManager}.
Expand Down Expand Up @@ -1475,6 +1483,52 @@ public void eventTimeTimersCanBeReset() {
assertThat(firstFired.getTimers(), contains(overridingTimer));
}

@Test
public void inputWatermarkDuplicates() {
Watermark mockWatermark = Mockito.mock(Watermark.class);

AppliedPTransformInputWatermark underTest =
new AppliedPTransformInputWatermark(ImmutableList.of(mockWatermark));

// Refresh
when(mockWatermark.get()).thenReturn(new Instant(0));
underTest.refresh();
assertEquals(new Instant(0), underTest.get());

// Apply a timer update
StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
TimerData timer1 = TimerData
.of("a", StateNamespaces.global(), new Instant(100), TimeDomain.EVENT_TIME);
TimerData timer2 = TimerData
.of("a", StateNamespaces.global(), new Instant(200), TimeDomain.EVENT_TIME);
underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build());

// Only the last timer update should be observable
assertEquals(timer2.getTimestamp(), underTest.getEarliestTimerTimestamp());

// Advance the input watermark
when(mockWatermark.get()).thenReturn(new Instant(1000));
underTest.refresh();
assertEquals(new Instant(1000), underTest.get()); // input watermark is not held by timers

// Examine the fired event time timers
Map<StructuralKey<?>, List<TimerData>> fired = underTest.extractFiredEventTimeTimers();
List<TimerData> timers = fired.get(key);
assertNotNull(timers);
assertThat(timers, contains(timer2));

// Update based on timer firings
underTest.updateTimers(TimerUpdate.builder(key)
.withCompletedTimers(timers).build());

// Now we should be able to advance
assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, underTest.getEarliestTimerTimestamp());

// Nothing left to fire
fired = underTest.extractFiredEventTimeTimers();
assertThat(fired.entrySet(), empty());
}

@Test
public void timerUpdateBuilderBuildAddsAllAddedTimers() {
TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3353,4 +3353,37 @@ public void onTimer(OnTimerContext c, PipelineOptions options) {

pipeline.run();
}

@Test
@Category(ValidatesRunner.class)
public void duplicateTimerSetting() {
TestStream<KV<String, String>> stream = TestStream
.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.addElements(KV.of("key1", "v1"))
.advanceWatermarkToInfinity();

PCollection<String> result = pipeline
.apply(stream)
.apply(ParDo.of(new TwoTimerDoFn()));
PAssert.that(result).containsInAnyOrder("It works");

pipeline.run().waitUntilFinish();
}

private static class TwoTimerDoFn extends DoFn<KV<String, String>, String> {
@TimerId("timer")
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void process(ProcessContext c,
@TimerId("timer") Timer timer) {
timer.offset(Duration.standardMinutes(10)).setRelative();
timer.offset(Duration.standardMinutes(30)).setRelative();
}

@OnTimer("timer")
public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
c.output("It works");
}
}
}