From 3057f154c96b7eb7c2f43a7fbb39909a0c2e60af Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Fri, 4 Feb 2022 15:11:16 -0800 Subject: [PATCH] [BEAM-11971] Revert "Fix timer consistency in direct runner" This reverts commit 8ea77f98376c396fa1b7002c98e26187bfe2d239. --- .../beam/runners/core/TimerInternals.java | 17 +- .../runners/direct/DirectTimerInternals.java | 72 +++-- .../ExecutorServiceParallelExecutor.java | 2 +- .../beam/runners/direct/QuiescenceDriver.java | 78 +++--- .../direct/StatefulParDoEvaluatorFactory.java | 83 +++--- .../beam/runners/direct/WatermarkManager.java | 250 +++++++++++------- .../beam/runners/local/ExecutionDriver.java | 2 +- .../apache/beam/sdk/transforms/ParDoTest.java | 10 +- 8 files changed, 282 insertions(+), 232 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 143254a5b37b..965be822e0d3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -262,22 +262,11 @@ public int compareTo(TimerData that) { .compare(this.getDomain(), that.getDomain()) .compare(this.getTimerId(), that.getTimerId()) .compare(this.getTimerFamilyId(), that.getTimerFamilyId()); - int compResult = chain.result(); - if (compResult == 0 && !this.getNamespace().equals(that.getNamespace())) { + if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) { // Obtaining the stringKey may be expensive; only do so if required - compResult = this.getNamespace().stringKey().compareTo(that.getNamespace().stringKey()); + chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey()); } - return compResult; - } - - public String stringKey() { - return getNamespace().stringKey() - + "/" - + getDomain().toString() - + "/" - + getTimerFamilyId() - + ":" - + getTimerId(); + return chain.result(); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 23d011ff0c8b..5a477bb86c5e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.direct; -import java.util.Map; -import java.util.NavigableSet; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -26,8 +24,6 @@ import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -39,8 +35,6 @@ class DirectTimerInternals implements TimerInternals { private final Clock processingTimeClock; private final TransformWatermarks watermarks; private final TimerUpdateBuilder timerUpdateBuilder; - private final Map> modifiedTimers; - private final Map modifiedTimerIds; public static DirectTimerInternals create( Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) { @@ -52,11 +46,6 @@ private DirectTimerInternals( this.processingTimeClock = clock; this.watermarks = watermarks; this.timerUpdateBuilder = timerUpdateBuilder; - this.modifiedTimers = Maps.newHashMap(); - this.modifiedTimers.put(TimeDomain.EVENT_TIME, Sets.newTreeSet()); - this.modifiedTimers.put(TimeDomain.PROCESSING_TIME, Sets.newTreeSet()); - this.modifiedTimers.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, Sets.newTreeSet()); - this.modifiedTimerIds = Maps.newHashMap(); } @Override @@ -67,7 +56,8 @@ public void setTimer( Instant target, Instant outputTimestamp, TimeDomain timeDomain) { - setTimer(TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain)); + timerUpdateBuilder.setTimer( + TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain)); } /** @@ -78,8 +68,6 @@ public void setTimer( @Override public void setTimer(TimerData timerData) { timerUpdateBuilder.setTimer(timerData); - getModifiedTimersOrdered(timerData.getDomain()).add(timerData); - modifiedTimerIds.put(timerData.stringKey(), timerData); } @Override @@ -105,25 +93,27 @@ public void deleteTimer(StateNamespace namespace, String timerId, String timerFa /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */ @Deprecated @Override - public void deleteTimer(TimerData timerData) { - timerUpdateBuilder.deletedTimer(timerData); - modifiedTimerIds.put(timerData.stringKey(), timerData); + public void deleteTimer(TimerData timerKey) { + timerUpdateBuilder.deletedTimer(timerKey); } public TimerUpdate getTimerUpdate() { return timerUpdateBuilder.build(); } - public NavigableSet getModifiedTimersOrdered(TimeDomain timeDomain) { - NavigableSet modified = modifiedTimers.get(timeDomain); - if (modified == null) { - throw new IllegalStateException("Unexpected time domain " + timeDomain); - } - return modified; - } - - public Map getModifiedTimerIds() { - return modifiedTimerIds; + public boolean containsUpdateForTimeBefore( + Instant maxWatermarkTime, Instant maxProcessingTime, Instant maxSynchronizedProcessingTime) { + TimerUpdate update = timerUpdateBuilder.build(); + return hasTimeBefore( + update.getSetTimers(), + maxWatermarkTime, + maxProcessingTime, + maxSynchronizedProcessingTime) + || hasTimeBefore( + update.getDeletedTimers(), + maxWatermarkTime, + maxProcessingTime, + maxSynchronizedProcessingTime); } @Override @@ -145,4 +135,32 @@ public Instant currentInputWatermarkTime() { public @Nullable Instant currentOutputWatermarkTime() { return watermarks.getOutputWatermark(); } + + private boolean hasTimeBefore( + Iterable timers, + Instant maxWatermarkTime, + Instant maxProcessingTime, + Instant maxSynchronizedProcessingTime) { + for (TimerData timerData : timers) { + Instant currentTime; + switch (timerData.getDomain()) { + case EVENT_TIME: + currentTime = maxWatermarkTime; + break; + case PROCESSING_TIME: + currentTime = maxProcessingTime; + break; + case SYNCHRONIZED_PROCESSING_TIME: + currentTime = maxSynchronizedProcessingTime; + break; + default: + throw new RuntimeException("Unexpected timeDomain " + timerData.getDomain()); + } + if (timerData.getTimestamp().isBefore(currentTime) + || timerData.getTimestamp().isEqual(currentTime)) { + return true; + } + } + return false; + } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index a43afe6118c4..a537d30f4529 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -172,7 +172,7 @@ public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry) @Override public void run() { DriverState drive = executionDriver.drive(); - if (drive.isTerminal()) { + if (drive.isTermainal()) { State newPipelineState = State.UNKNOWN; switch (drive) { case FAILED: diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java index dfd3ee486cd1..1d4fa87b8e68 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -38,7 +39,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +81,7 @@ public static ExecutionDriver create( // watermark of a PTransform before enqueuing the resulting bundle to pendingUpdates of downstream // PTransform, which can lead to watermark being updated past the emitted elements. private final Map, Collection>> inflightBundles = - Maps.newHashMap(); + new ConcurrentHashMap<>(); private final AtomicReference state = new AtomicReference<>(ExecutorState.QUIESCENT); @@ -166,17 +166,15 @@ private void processBundle(CommittedBundle bundle, AppliedPTransform private void processBundle( CommittedBundle bundle, AppliedPTransform consumer, CompletionCallback callback) { - synchronized (inflightBundles) { - inflightBundles.compute( - consumer, - (k, v) -> { - if (v == null) { - v = new ArrayList<>(); - } - v.add(bundle); - return v; - }); - } + inflightBundles.compute( + consumer, + (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + v.add(bundle); + return v; + }); outstandingWork.incrementAndGet(); bundleProcessor.process(bundle, consumer, callback); } @@ -184,28 +182,24 @@ private void processBundle( /** Fires any available timers. */ private void fireTimers() { try { - synchronized (inflightBundles) { - for (FiredTimers> transformTimers : - evaluationContext.extractFiredTimers(inflightBundles.keySet())) { - Collection delivery = transformTimers.getTimers(); - KeyedWorkItem work = - KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery); - @SuppressWarnings({"unchecked", "rawtypes"}) - CommittedBundle bundle = - evaluationContext - .createKeyedBundle( - transformTimers.getKey(), - (PCollection) - Iterables.getOnlyElement( - transformTimers.getExecutable().getMainInputs().values())) - .add(WindowedValue.valueInGlobalWindow(work)) - .commit(evaluationContext.now()); - processBundle( - bundle, - transformTimers.getExecutable(), - new TimerIterableCompletionCallback(delivery)); - state.set(ExecutorState.ACTIVE); - } + for (FiredTimers> transformTimers : + evaluationContext.extractFiredTimers(inflightBundles.keySet())) { + Collection delivery = transformTimers.getTimers(); + KeyedWorkItem work = + KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery); + @SuppressWarnings({"unchecked", "rawtypes"}) + CommittedBundle bundle = + evaluationContext + .createKeyedBundle( + transformTimers.getKey(), + (PCollection) + Iterables.getOnlyElement( + transformTimers.getExecutable().getMainInputs().values())) + .add(WindowedValue.valueInGlobalWindow(work)) + .commit(evaluationContext.now()); + processBundle( + bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery)); + state.set(ExecutorState.ACTIVE); } } catch (Exception e) { LOG.error("Internal Error while delivering timers", e); @@ -317,14 +311,12 @@ public final CommittedResult handleResult( state.set(ExecutorState.ACTIVE); } outstandingWork.decrementAndGet(); - synchronized (inflightBundles) { - inflightBundles.compute( - result.getTransform(), - (k, v) -> { - v.remove(inputBundle); - return v.isEmpty() ? null : v; - }); - } + inflightBundles.compute( + result.getTransform(), + (k, v) -> { + v.remove(inputBundle); + return v.isEmpty() ? null : v; + }); return committedResult; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 054b22d7d7b9..ebd305f4f365 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -17,10 +17,14 @@ */ package org.apache.beam.runners.direct; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + import com.google.auto.value.AutoValue; +import java.util.ArrayList; import java.util.Collections; -import java.util.NavigableSet; -import javax.annotation.Nullable; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; @@ -43,6 +47,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering; import org.joda.time.Instant; /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */ @@ -150,6 +155,7 @@ private static class StatefulParDoEvaluator implements TransformEvaluator>> { private final DoFnLifecycleManagerRemovingTransformEvaluator> delegateEvaluator; + private final List pushedBackTimers = new ArrayList<>(); private final DirectTimerInternals timerInternals; DirectStepContext stepContext; @@ -168,46 +174,45 @@ public void processElement(WindowedValue>> gbkRes for (WindowedValue> windowedValue : gbkResult.getValue().elementsIterable()) { delegateEvaluator.processElement(windowedValue); } + PriorityQueue toBeFiredTimers = + new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp)); + Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; for (TimerData timerData : gbkResult.getValue().timersIterable()) { - // Get any new or modified timers that are earlier than the current one. In order to - // maintain timer ordering, - // we need to fire these timers first. - NavigableSet earlierTimers = - timerInternals.getModifiedTimersOrdered(timerData.getDomain()).headSet(timerData, true); - while (!earlierTimers.isEmpty()) { - TimerData insertedTimer = earlierTimers.pollFirst(); - if (timerModified(insertedTimer)) { - continue; - } - // Make sure to register this timer as deleted. This could be a timer that was originally - // set for the future - // and not in the bundle but was reset to an earlier time in this bundle. If we don't - // explicity delete the - // future timer, then it will still fire. - timerInternals.deleteTimer(insertedTimer); - processTimer(insertedTimer, gbkResult.getValue().key()); - } - - // As long as the timer hasn't been modified or deleted earlier in the bundle, fire it. - if (!timerModified(timerData)) { - processTimer(timerData, gbkResult.getValue().key()); + toBeFiredTimers.add(timerData); + switch (timerData.getDomain()) { + case EVENT_TIME: + maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp()); + break; + case PROCESSING_TIME: + maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp()); + break; + case SYNCHRONIZED_PROCESSING_TIME: + maxSynchronizedProcessingTime = + Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp()); } } - } - // Check to see if a timer has been modified inside this bundle. - private boolean timerModified(TimerData timerData) { - @Nullable - TimerData modifiedTimer = timerInternals.getModifiedTimerIds().get(timerData.stringKey()); - return modifiedTimer != null && !modifiedTimer.equals(timerData); - } - - private void processTimer(TimerData timerData, K key) throws Exception { - WindowNamespace windowNamespace = (WindowNamespace) timerData.getNamespace(); - BoundedWindow timerWindow = windowNamespace.getWindow(); - delegateEvaluator.onTimer(timerData, key, timerWindow); - clearWatermarkHold(timerData); + while (!timerInternals.containsUpdateForTimeBefore( + maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime) + && !toBeFiredTimers.isEmpty()) { + + TimerData timer = toBeFiredTimers.poll(); + checkState( + timer.getNamespace() instanceof WindowNamespace, + "Expected Timer %s to be in a %s, but got %s", + timer, + WindowNamespace.class.getSimpleName(), + timer.getNamespace().getClass().getName()); + WindowNamespace windowNamespace = (WindowNamespace) timer.getNamespace(); + BoundedWindow timerWindow = windowNamespace.getWindow(); + + delegateEvaluator.onTimer(timer, gbkResult.getValue().key(), timerWindow); + clearWatermarkHold(timer); + } + pushedBackTimers.addAll(toBeFiredTimers); } private void clearWatermarkHold(TimerData timer) { @@ -251,7 +256,9 @@ public TransformResult>> finishBundle() throws Ex watermarkHold = delegateResult.getWatermarkHold(); } - TimerUpdate timerUpdate = delegateResult.getTimerUpdate(); + TimerUpdate timerUpdate = + delegateResult.getTimerUpdate().withPushedBackTimers(pushedBackTimers); + pushedBackTimers.clear(); StepTransformResult.Builder>> regroupedResult = StepTransformResult.>>withHold( delegateResult.getTransform(), watermarkHold) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index a68021e05581..e6626d0f2589 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -26,15 +26,17 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Objects; -import java.util.Queue; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +45,7 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.local.Bundle; @@ -57,14 +60,16 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Queues; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SortedMultiset; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TreeMultiset; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -234,7 +239,7 @@ static class AppliedPTransformInputWatermark implements Watermark { // Entries in this table represent the authoritative timestamp for which // a per-key-and-StateNamespace timer is set. - private final Map, Map> existingTimers; + private final Map, Table> existingTimers; // This per-key sorted set allows quick retrieval of timers that should fire for a key private final Map, NavigableSet> objectTimers; @@ -338,14 +343,15 @@ private Instant getMinimumOutputTimestamp(SortedMultiset timers) { synchronized void updateTimers(TimerUpdate update) { NavigableSet keyTimers = objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>()); - Map existingTimersForKey = - existingTimers.computeIfAbsent(update.key, k -> Maps.newHashMap()); + Table existingTimersForKey = + existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create()); - HashSet newSetTimers = Sets.newHashSet(); for (TimerData timer : update.getSetTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - newSetTimers.add(timer.stringKey()); - @Nullable TimerData existingTimer = existingTimersForKey.get(timer.stringKey()); + @Nullable + TimerData existingTimer = + existingTimersForKey.get( + timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId()); if (existingTimer == null) { pendingTimers.add(timer); @@ -360,29 +366,32 @@ synchronized void updateTimers(TimerUpdate update) { keyTimers.add(timer); } - existingTimersForKey.put(timer.stringKey(), timer); + existingTimersForKey.put( + timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId(), timer); } } for (TimerData timer : update.getDeletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - @Nullable TimerData existingTimer = existingTimersForKey.get(timer.stringKey()); + @Nullable + TimerData existingTimer = + existingTimersForKey.get( + timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId()); if (existingTimer != null) { pendingTimers.remove(existingTimer); keyTimers.remove(existingTimer); - existingTimersForKey.remove(existingTimer.stringKey()); + existingTimersForKey.remove( + existingTimer.getNamespace(), + existingTimer.getTimerId() + '+' + existingTimer.getTimerFamilyId()); } } } for (TimerData timer : update.getCompletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - if (!newSetTimers.contains(timer.stringKey())) { - keyTimers.remove(timer); - pendingTimers.remove(timer); - existingTimersForKey.remove(timer.stringKey()); - } + keyTimers.remove(timer); + pendingTimers.remove(timer); } } @@ -510,7 +519,7 @@ private static class SynchronizedProcessingTimeInputWatermark implements Waterma private final Collection> pendingBundles; private final Map, NavigableSet> processingTimers; private final Map, NavigableSet> synchronizedProcessingTimers; - private final Map, Map> existingTimers; + private final Map, Table> existingTimers; private final NavigableSet pendingTimers; @@ -620,18 +629,21 @@ private Instant getMinimumOutputTimestamp(NavigableSet timers) { } private synchronized void updateTimers(TimerUpdate update) { - Map existingTimersForKey = - existingTimers.computeIfAbsent(update.key, k -> Maps.newHashMap()); + Map> timerMap = timerMap(update.key); + Table existingTimersForKey = + existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create()); - HashSet newSetTimers = Sets.newHashSet(); for (TimerData addedTimer : update.setTimers.values()) { - NavigableSet timerQueue = - processQueueForDomain(update.key, addedTimer.getDomain()); + NavigableSet timerQueue = timerMap.get(addedTimer.getDomain()); if (timerQueue == null) { continue; } - newSetTimers.add(addedTimer.stringKey()); - @Nullable TimerData existingTimer = existingTimersForKey.get(addedTimer.stringKey()); + + @Nullable + TimerData existingTimer = + existingTimersForKey.get( + addedTimer.getNamespace(), + addedTimer.getTimerId() + '+' + addedTimer.getTimerFamilyId()); if (existingTimer == null) { timerQueue.add(addedTimer); } else if (!existingTimer.equals(addedTimer)) { @@ -639,30 +651,34 @@ private synchronized void updateTimers(TimerUpdate update) { timerQueue.add(addedTimer); } // else the timer is already set identically, so noop. - existingTimersForKey.put(addedTimer.stringKey(), addedTimer); + existingTimersForKey.put( + addedTimer.getNamespace(), + addedTimer.getTimerId() + '+' + addedTimer.getTimerFamilyId(), + addedTimer); } for (TimerData deletedTimer : update.deletedTimers.values()) { - NavigableSet timerQueue = - processQueueForDomain(update.key, deletedTimer.getDomain()); + NavigableSet timerQueue = timerMap.get(deletedTimer.getDomain()); if (timerQueue == null) { continue; } - String timerKey = deletedTimer.stringKey(); - @Nullable TimerData existingTimer = existingTimersForKey.get(timerKey); + + @Nullable + TimerData existingTimer = + existingTimersForKey.get( + deletedTimer.getNamespace(), + deletedTimer.getTimerId() + '+' + deletedTimer.getTimerFamilyId()); if (existingTimer != null) { pendingTimers.remove(deletedTimer); timerQueue.remove(deletedTimer); - existingTimersForKey.remove(timerKey); + existingTimersForKey.remove( + existingTimer.getNamespace(), + existingTimer.getTimerId() + '+' + existingTimer.getTimerFamilyId()); } } for (TimerData completedTimer : update.completedTimers) { - String timerKey = completedTimer.stringKey(); - if (!newSetTimers.contains(timerKey)) { - pendingTimers.remove(completedTimer); - existingTimersForKey.remove(timerKey); - } + pendingTimers.remove(completedTimer); } // notify of TimerData update @@ -697,16 +713,15 @@ private synchronized Map, List> extractFiredDomainTi return firedTimers; } - private @Nullable NavigableSet processQueueForDomain( - StructuralKey key, TimeDomain timeDomain) { - switch (timeDomain) { - case PROCESSING_TIME: - return processingTimers.computeIfAbsent(key, k -> new TreeSet<>()); - case SYNCHRONIZED_PROCESSING_TIME: - return synchronizedProcessingTimers.computeIfAbsent(key, k -> new TreeSet<>()); - default: - return null; - } + private Map> timerMap(StructuralKey key) { + NavigableSet processingQueue = + processingTimers.computeIfAbsent(key, k -> new TreeSet<>()); + NavigableSet synchronizedProcessingQueue = + synchronizedProcessingTimers.computeIfAbsent(key, k -> new TreeSet<>()); + EnumMap> result = new EnumMap<>(TimeDomain.class); + result.put(TimeDomain.PROCESSING_TIME, processingQueue); + result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue); + return result; } @Override @@ -838,7 +853,7 @@ public Instant get() { * *

The result collection retains ordering of timers (from earliest to latest). */ - private static synchronized Map, List> extractFiredTimers( + private static Map, List> extractFiredTimers( Instant latestTime, Map, NavigableSet> objectTimers) { Map, List> result = new HashMap<>(); Set> emptyKeys = new HashSet<>(); @@ -880,7 +895,8 @@ private static synchronized Map, List> extractFiredT private final Map transformToWatermarks; /** A queue of pending updates to the state of this {@link WatermarkManager}. */ - private final Queue> pendingUpdates; + private final ConcurrentLinkedQueue> + pendingUpdates; /** A lock used to control concurrency for updating pending values. */ private final Lock refreshLock; @@ -898,7 +914,7 @@ private static synchronized Map, List> extractFiredT * bundle processor at a time. */ private final Map> transformsWithAlreadyExtractedTimers = - Maps.newHashMap(); + new ConcurrentHashMap<>(); /** * Creates a new {@link WatermarkManager}. All watermarks within the newly created {@link @@ -926,7 +942,7 @@ private WatermarkManager( this.graph = graph; this.getName = getName; - this.pendingUpdates = Queues.newArrayDeque(); + this.pendingUpdates = new ConcurrentLinkedQueue<>(); this.refreshLock = new ReentrantLock(); this.pendingRefreshes = new HashSet<>(); @@ -984,20 +1000,18 @@ private static Consumer timerUpdateConsumer( Map> transformsWithAlreadyExtractedTimers, ExecutableT executable) { return update -> { - String timerIdWithNs = update.stringKey(); - synchronized (transformsWithAlreadyExtractedTimers) { - transformsWithAlreadyExtractedTimers.compute( - executable, - (k, v) -> { - if (v != null) { - v.remove(timerIdWithNs); - if (v.isEmpty()) { - v = null; - } + String timerIdWithNs = TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(update); + transformsWithAlreadyExtractedTimers.compute( + executable, + (k, v) -> { + if (v != null) { + v.remove(timerIdWithNs); + if (v.isEmpty()) { + v = null; } - return v; - }); - } + } + return v; + }); }; } @@ -1094,11 +1108,9 @@ public void updateWatermarks( @Nullable Bundle unprocessedInputs, Iterable> outputs, Instant earliestHold) { - synchronized (pendingUpdates) { - pendingUpdates.offer( - PendingWatermarkUpdate.create( - executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold)); - } + pendingUpdates.offer( + PendingWatermarkUpdate.create( + executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold)); tryApplyPendingUpdates(); } @@ -1128,12 +1140,10 @@ private void applyAllPendingUpdates() { /** Applies up to {@code numUpdates}, or all available updates if numUpdates is non-positive. */ @GuardedBy("refreshLock") private void applyNUpdates(int numUpdates) { - synchronized (pendingUpdates) { - for (int i = 0; !pendingUpdates.isEmpty() && ((i < numUpdates) || (numUpdates <= 0)); i++) { - PendingWatermarkUpdate pending = pendingUpdates.poll(); - applyPendingUpdate(pending); - pendingRefreshes.add(pending.getExecutable()); - } + for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates <= 0); i++) { + PendingWatermarkUpdate pending = pendingUpdates.poll(); + applyPendingUpdate(pending); + pendingRefreshes.add(pending.getExecutable()); } } @@ -1259,27 +1269,26 @@ public Collection> extractFiredTimers( if (ignoredExecutables.contains(transform)) { continue; } - synchronized (transformsWithAlreadyExtractedTimers) { - if (!transformsWithAlreadyExtractedTimers.containsKey(transform)) { - TransformWatermarks watermarks = watermarksEntry.getValue(); - Collection> firedTimers = watermarks.extractFiredTimers(); - if (!firedTimers.isEmpty()) { - List newTimers = - firedTimers.stream() - .flatMap(f -> f.getTimers().stream()) - .collect(Collectors.toList()); - transformsWithAlreadyExtractedTimers.compute( - transform, - (k, v) -> { - if (v == null) { - v = new HashSet<>(); - } - final Set toUpdate = v; - newTimers.forEach(td -> toUpdate.add(td.stringKey())); - return v; - }); - allTimers.addAll(firedTimers); - } + if (!transformsWithAlreadyExtractedTimers.containsKey(transform)) { + TransformWatermarks watermarks = watermarksEntry.getValue(); + Collection> firedTimers = watermarks.extractFiredTimers(); + if (!firedTimers.isEmpty()) { + List newTimers = + firedTimers.stream() + .flatMap(f -> f.getTimers().stream()) + .collect(Collectors.toList()); + transformsWithAlreadyExtractedTimers.compute( + transform, + (k, v) -> { + if (v == null) { + v = new HashSet<>(); + } + final Set toUpdate = v; + newTimers.forEach( + td -> toUpdate.add(TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(td))); + return v; + }); + allTimers.addAll(firedTimers); } } } @@ -1554,18 +1563,24 @@ static TimerKey of(TimerData timerData) { * *

setTimers and deletedTimers are collections of {@link TimerData} that have been added to the * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as - * the input to the executed step. + * the input to the executed step. pushedBackTimers are timers that were in completedTimers at the + * input, but were pushed back due to processing constraints. */ public static class TimerUpdate { private final StructuralKey key; private final Iterable completedTimers; private final Map setTimers; private final Map deletedTimers; + private final Iterable pushedBackTimers; /** Returns a TimerUpdate for a null key with no timers. */ public static TimerUpdate empty() { return new TimerUpdate( - null, Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap()); + null, + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList()); } /** @@ -1636,19 +1651,26 @@ public TimerUpdate build() { key, ImmutableList.copyOf(completedTimers), ImmutableMap.copyOf(setTimers), - ImmutableMap.copyOf(deletedTimers)); + ImmutableMap.copyOf(deletedTimers), + Collections.emptyList()); } } + private static String getTimerIdAndTimerFamilyIdWithNamespace(TimerData td) { + return td.getNamespace() + td.getTimerId() + td.getTimerFamilyId(); + } + private TimerUpdate( StructuralKey key, Iterable completedTimers, Map setTimers, - Map deletedTimers) { + Map deletedTimers, + Iterable pushedBackTimers) { this.key = key; this.completedTimers = completedTimers; this.setTimers = setTimers; this.deletedTimers = deletedTimers; + this.pushedBackTimers = pushedBackTimers; } @VisibleForTesting @@ -1671,21 +1693,46 @@ public Iterable getDeletedTimers() { return deletedTimers.values(); } + Iterable getPushedBackTimers() { + return pushedBackTimers; + } + boolean isEmpty() { - return Iterables.isEmpty(completedTimers) && setTimers.isEmpty() && deletedTimers.isEmpty(); + return Iterables.isEmpty(completedTimers) + && setTimers.isEmpty() + && deletedTimers.isEmpty() + && Iterables.isEmpty(pushedBackTimers); } /** * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers. + * Note that if any of the completed timers is in pushedBackTimers, then it is set instead. The + * pushedBackTimers are cleared afterwards. */ public TimerUpdate withCompletedTimers(Iterable completedTimers) { List timersToComplete = new ArrayList<>(); + Set pushedBack = Sets.newHashSet(pushedBackTimers); Map newSetTimers = Maps.newLinkedHashMap(); newSetTimers.putAll(setTimers); for (TimerData td : completedTimers) { - timersToComplete.add(td); + TimerKey timerKey = TimerKey.of(td); + if (!pushedBack.contains(td)) { + timersToComplete.add(td); + } else if (!newSetTimers.containsKey(timerKey)) { + newSetTimers.put(timerKey, td); + } } - return new TimerUpdate(key, timersToComplete, newSetTimers, deletedTimers); + return new TimerUpdate( + key, timersToComplete, newSetTimers, deletedTimers, Collections.emptyList()); + } + + /** + * Returns a {@link TimerUpdate} that is like this one, but with the pushedBackTimersare removed + * set by provided pushedBackTimers. + */ + public TimerUpdate withPushedBackTimers(Iterable pushedBackTimers) { + return new TimerUpdate( + key, completedTimers, setTimers, deletedTimers, Lists.newArrayList(pushedBackTimers)); } @Override @@ -1712,6 +1759,7 @@ public String toString() { .add("setTimers", setTimers) .add("completedTimers", completedTimers) .add("deletedTimers", deletedTimers) + .add("pushedBackTimers", pushedBackTimers) .toString(); } } diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java index 3638e4e2878c..c17dc5479a96 100644 --- a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java +++ b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java @@ -33,7 +33,7 @@ enum DriverState { this.terminal = terminal; } - public boolean isTerminal() { + public boolean isTermainal() { return terminal; } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 5f8d173d8ad1..f5ce5e20c4c0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -4085,8 +4085,7 @@ public void onTimer(OutputReceiver r) { ValidatesRunner.class, UsesStatefulParDo.class, UsesTimersInParDo.class, - UsesLoopingTimer.class, - UsesStrictTimerOrdering.class + UsesLoopingTimer.class }) public void testEventTimeTimerLoop() { final String stateId = "count"; @@ -4106,7 +4105,6 @@ public void testEventTimeTimerLoop() { public void processElement( @StateId(stateId) ValueState countState, @TimerId(timerId) Timer loopTimer) { - countState.write(0); loopTimer.offset(Duration.millis(1)).setRelative(); } @@ -4115,7 +4113,7 @@ public void onLoopTimer( @StateId(stateId) ValueState countState, @TimerId(timerId) Timer loopTimer, OutputReceiver r) { - int count = Preconditions.checkNotNull(countState.read()); + int count = MoreObjects.firstNonNull(countState.read(), 0); if (count < loopCount) { r.output(count); countState.write(count + 1); @@ -4125,9 +4123,7 @@ public void onLoopTimer( }; PCollection output = - pipeline - .apply(Create.of(KV.of("hello1", 42), KV.of("hello2", 42), KV.of("hello3", 42))) - .apply(ParDo.of(fn)); + pipeline.apply(Create.of(KV.of("hello", 42))).apply(ParDo.of(fn)); PAssert.that(output).containsInAnyOrder(0, 1, 2, 3, 4); pipeline.run();