From e0034314ad196d2274cef9831ed63e090bf4d4c1 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 24 Jan 2018 14:13:30 -0800 Subject: [PATCH 1/6] Adds PositionT and claim callback to RestrictionTracker --- .../apex/translation/ParDoTranslator.java | 2 +- .../operators/ApexParDoOperator.java | 2 +- .../construction/PTransformMatchersTest.java | 2 +- .../construction/ParDoTranslationTest.java | 4 +- .../construction/SplittableParDoTest.java | 7 +- ...oundedSplittableProcessElementInvoker.java | 4 +- .../beam/runners/core/SimpleDoFnRunner.java | 8 +- .../SplittableParDoViaKeyedWorkItems.java | 4 +- .../core/SplittableProcessElementInvoker.java | 2 +- .../core/SplittableParDoProcessFnTest.java | 9 +- ...ttableProcessElementsEvaluatorFactory.java | 2 +- .../FlinkStreamingTransformTranslators.java | 2 +- .../streaming/SplittableDoFnOperator.java | 2 +- .../beam/sdk/transforms/DoFnTester.java | 2 +- .../org/apache/beam/sdk/transforms/Watch.java | 118 +++++++++++------- .../sdk/transforms/reflect/DoFnInvoker.java | 6 +- .../transforms/reflect/DoFnSignatures.java | 6 +- .../splittabledofn/HasDefaultTracker.java | 2 +- .../splittabledofn/OffsetRangeTracker.java | 5 +- .../splittabledofn/RestrictionTracker.java | 35 +++++- .../sdk/transforms/SplittableDoFnTest.java | 6 +- .../apache/beam/sdk/transforms/WatchTest.java | 100 ++++++++------- .../transforms/reflect/DoFnInvokersTest.java | 14 ++- .../DoFnSignaturesProcessElementTest.java | 2 +- .../DoFnSignaturesSplittableDoFnTest.java | 6 +- .../OffsetRangeTrackerTest.java | 62 ++++----- .../beam/fn/harness/FnApiDoFnRunner.java | 6 +- 27 files changed, 246 insertions(+), 174 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index 7210692b6f81..f02633d4592e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -117,7 +117,7 @@ public void translate(ParDo.MultiOutput transform, TranslationC } static class SplittableProcessElementsTranslator< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> implements TransformTranslator> { @Override diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index c410ca083260..9e05c889ded9 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -474,7 +474,7 @@ public TimerInternals timerInternals() { (StateInternalsFactory) this.currentKeyStateInternals.getFactory(); @SuppressWarnings({ "rawtypes", "unchecked" }) - ProcessFn> + ProcessFn> splittableDoFn = (ProcessFn) doFn; splittableDoFn.setStateInternalsFactory(stateInternalsFactory); TimerInternalsFactory timerInternalsFactory = key -> currentKeyTimerInternals; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index ca57e92cc32e..9ca6d9408d02 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -160,7 +160,7 @@ public void simpleProcess(ProcessContext ctxt) { ctxt.output(ctxt.element().getValue() + 1); } }; - private abstract static class SomeTracker implements RestrictionTracker {} + private abstract static class SomeTracker extends RestrictionTracker {} private DoFn, Integer> splittableDoFn = new DoFn, Integer>() { @ProcessElement diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java index a945574b6f50..9bd7fe493b56 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java @@ -231,7 +231,7 @@ public int hashCode() { private static class SplittableDropElementsFn extends DoFn, Void> { @ProcessElement - public void proc(ProcessContext context, RestrictionTracker restriction) { + public void proc(ProcessContext context, RestrictionTracker restriction) { context.output(null); } @@ -241,7 +241,7 @@ public Integer restriction(KV elem) { } @NewTracker - public RestrictionTracker newTracker(Integer restriction) { + public RestrictionTracker newTracker(Integer restriction) { throw new UnsupportedOperationException("Should never be called; only to test translation"); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java index 05c471dd2da6..68365c85bc97 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java @@ -50,13 +50,18 @@ public SomeRestrictionTracker newTracker() { } } - private static class SomeRestrictionTracker implements RestrictionTracker { + private static class SomeRestrictionTracker extends RestrictionTracker { private final SomeRestriction someRestriction; public SomeRestrictionTracker(SomeRestriction someRestriction) { this.someRestriction = someRestriction; } + @Override + protected boolean tryClaimImpl(Void position) { + return false; + } + @Override public SomeRestriction currentRestriction() { return someRestriction; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index c53efcc23b72..d43e783d6b94 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -50,7 +50,7 @@ * outputs), or runs for the given duration. */ public class OutputAndTimeBoundedSplittableProcessElementInvoker< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> extends SplittableProcessElementInvoker { private final DoFn fn; private final PipelineOptions pipelineOptions; @@ -107,7 +107,7 @@ public DoFn.ProcessContext processContext( } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { return tracker; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 6ae6754812db..d4c5775464b0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -262,7 +262,7 @@ public DoFn.OnTimerContext onTimerContext(DoFn } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( "Cannot access RestrictionTracker outside of @ProcessElement method."); } @@ -332,7 +332,7 @@ public DoFn.OnTimerContext onTimerContext(DoFn } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( "Cannot access RestrictionTracker outside of @ProcessElement method."); } @@ -504,7 +504,7 @@ public DoFn.OnTimerContext onTimerContext(DoFn } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); } @@ -615,7 +615,7 @@ public DoFn.OnTimerContext onTimerContext(DoFn } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 4e490e27c267..ff238bee2f73 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -150,7 +150,7 @@ public PCollectionTuple expand(PCollection>> /** A primitive transform wrapping around {@link ProcessFn}. */ public static class ProcessElements< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> extends PTransform< PCollection>>, PCollectionTuple> { private final ProcessKeyedElements original; @@ -211,7 +211,7 @@ public PCollectionTuple expand( */ @VisibleForTesting public static class ProcessFn< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> extends DoFn>, OutputT> { /** * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java index 5b9cbf2bfe71..9d5475a8a33a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java @@ -31,7 +31,7 @@ * DoFn}, in particular, allowing the runner to access the {@link RestrictionTracker}. */ public abstract class SplittableProcessElementInvoker< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> { + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> { /** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */ public class Result { @Nullable diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index c7bee2556fbe..29efc1bc49a9 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -83,13 +83,18 @@ public SomeRestrictionTracker newTracker() { } } - private static class SomeRestrictionTracker implements RestrictionTracker { + private static class SomeRestrictionTracker extends RestrictionTracker { private final SomeRestriction someRestriction; public SomeRestrictionTracker(SomeRestriction someRestriction) { this.someRestriction = someRestriction; } + @Override + protected boolean tryClaimImpl(Void position) { + return false; + } + @Override public SomeRestriction currentRestriction() { return someRestriction; @@ -112,7 +117,7 @@ public void checkDone() {} * possibly over multiple {@link DoFn.ProcessElement} calls). */ private static class ProcessFnTester< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> implements AutoCloseable { private final DoFnTester>, OutputT> tester; private Instant currentProcessingTime; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 44df845e908b..016ec3263a5a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -42,7 +42,7 @@ import org.joda.time.Instant; class SplittableProcessElementsEvaluatorFactory< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> implements TransformEvaluatorFactory { private final ParDoEvaluatorFactory>, OutputT> delegateFactory; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index a2923a97cc4f..b82fb4ca0bff 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -624,7 +624,7 @@ public void translateNode( } private static class SplittableProcessElementsStreamingTranslator< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< SplittableParDoViaKeyedWorkItems.ProcessElements> { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index d0d283044ee1..1a418a036986 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -55,7 +55,7 @@ * the {@code @ProcessElement} method of a splittable {@link DoFn}. */ public class SplittableDoFnOperator< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> extends DoFnOperator>, OutputT> { private transient ScheduledExecutorService executorService; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index db72ee8da4e7..159bcef75e88 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -319,7 +319,7 @@ public OnTimerContext onTimerContext(DoFn doFn) { } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester"); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index a71d9bbe7340..bbe54b046cb9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -27,6 +27,7 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -39,8 +40,6 @@ import java.io.OutputStream; import java.io.Serializable; import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -52,7 +51,6 @@ import org.apache.beam.sdk.coders.DurationCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -737,11 +735,11 @@ public ProcessContinuation process( } while (tracker.hasPending()) { c.updateWatermark(tracker.getWatermark()); - - TimestampedValue nextPending = tracker.tryClaimNextPending(); - if (nextPending == null) { + Map.Entry> entry = tracker.getNextPending(); + if (!tracker.tryClaim(entry.getKey())) { return stop(); } + TimestampedValue nextPending = entry.getValue(); c.outputWithTimestamp( KV.of(c.element(), nextPending.getValue()), nextPending.getTimestamp()); } @@ -792,10 +790,10 @@ static class GrowthState { // timestamp is more than X behind the watermark. // As of writing, we don't do this, but preserve the information for forward compatibility // in case of pipeline update. TODO: do this. - private final Map completed; + private final ImmutableMap completed; // Outputs that are known to be present in a poll result, but have not yet been returned // from a ProcessElement call, sorted by timestamp to help smooth watermark progress. - private final List> pending; + private final ImmutableMap> pending; // If true, processing of this restriction should only output "pending". Otherwise, it should // also continue polling. private final boolean isOutputComplete; @@ -805,24 +803,24 @@ static class GrowthState { @Nullable private final Instant pollWatermark; GrowthState(TerminationStateT terminationState) { - this.completed = Collections.emptyMap(); - this.pending = Collections.emptyList(); + this.completed = ImmutableMap.of(); + this.pending = ImmutableMap.of(); this.isOutputComplete = false; this.terminationState = checkNotNull(terminationState); this.pollWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; } GrowthState( - Map completed, - List> pending, + ImmutableMap completed, + ImmutableMap> pending, boolean isOutputComplete, @Nullable TerminationStateT terminationState, @Nullable Instant pollWatermark) { if (!isOutputComplete) { checkNotNull(terminationState); } - this.completed = Collections.unmodifiableMap(completed); - this.pending = Collections.unmodifiableList(pending); + this.completed = completed; + this.pending = pending; this.isOutputComplete = isOutputComplete; this.terminationState = terminationState; this.pollWatermark = pollWatermark; @@ -848,7 +846,7 @@ public String toString(Growth.TerminationCondition termina @VisibleForTesting static class GrowthTracker - implements RestrictionTracker> { + extends RestrictionTracker, HashCode> { private final Funnel coderFunnel; private final Growth.TerminationCondition terminationCondition; @@ -861,9 +859,9 @@ static class GrowthTracker // Remaining pending outputs; initialized from state.pending (if non-empty) or in // addNewAsPending(); drained via tryClaimNextPending(). - private LinkedList> pending; + private Map> pending; // Outputs that have been claimed in the current ProcessElement call. A prefix of "pending". - private List> claimed = Lists.newArrayList(); + private Map> claimed = Maps.newLinkedHashMap(); private boolean isOutputComplete; @Nullable private TerminationStateT terminationState; @Nullable private Instant pollWatermark; @@ -889,7 +887,10 @@ static class GrowthTracker this.isOutputComplete = state.isOutputComplete; this.pollWatermark = state.pollWatermark; this.terminationState = state.terminationState; - this.pending = Lists.newLinkedList(state.pending); + this.pending = Maps.newLinkedHashMapWithExpectedSize(state.pending.size()); + for (Map.Entry> entry : state.pending.entrySet()) { + this.pending.put(entry.getKey(), entry.getValue()); + } } @Override @@ -904,22 +905,23 @@ public synchronized GrowthState checkpoint() { GrowthState primary = new GrowthState<>( state.completed /* completed */, - claimed /* pending */, + ImmutableMap.copyOf(claimed) /* pending */, true /* isOutputComplete */, null /* terminationState */, BoundedWindow.TIMESTAMP_MAX_VALUE /* pollWatermark */); // residual should contain exactly the work *not* claimed in the current ProcessElement call - // unclaimed pending outputs plus future polling outputs. - Map newCompleted = Maps.newHashMap(state.completed); - for (TimestampedValue claimedOutput : claimed) { + ImmutableMap.Builder newCompleted = ImmutableMap.builder(); + newCompleted.putAll(state.completed); + for (Map.Entry> claimedOutput : claimed.entrySet()) { newCompleted.put( - hash128(claimedOutput.getValue()), claimedOutput.getTimestamp()); + claimedOutput.getKey(), claimedOutput.getValue().getTimestamp()); } GrowthState residual = new GrowthState<>( - newCompleted /* completed */, - pending /* pending */, + newCompleted.build() /* completed */, + ImmutableMap.copyOf(pending) /* pending */, isOutputComplete /* isOutputComplete */, terminationState, pollWatermark); @@ -930,7 +932,7 @@ public synchronized GrowthState checkpoint() { this.isOutputComplete = primary.isOutputComplete; this.pollWatermark = primary.pollWatermark; this.terminationState = null; - this.pending = Lists.newLinkedList(); + this.pending = Maps.newLinkedHashMap(); this.shouldStop = true; return residual; @@ -955,15 +957,21 @@ synchronized boolean hasPending() { } @VisibleForTesting - @Nullable - synchronized TimestampedValue tryClaimNextPending() { + synchronized Map.Entry> getNextPending() { + checkState (!pending.isEmpty(), "Pending set is empty"); + return pending.entrySet().iterator().next(); + } + + @Override + protected synchronized boolean tryClaimImpl(HashCode hash) { if (shouldStop) { - return null; + return false; } checkState(!pending.isEmpty(), "No more unclaimed pending outputs"); - TimestampedValue value = pending.removeFirst(); - claimed.add(value); - return value; + TimestampedValue value = pending.remove(hash); + checkArgument(value != null, "Attempted to claim unknown hash %s", hash); + claimed.put(hash, value); + return true; } @VisibleForTesting @@ -999,19 +1007,23 @@ synchronized int addNewAsPending(Growth.PollResult pollResult) { if (!newPending.isEmpty()) { terminationState = terminationCondition.onSeenNewOutput(Instant.now(), terminationState); } - this.pending = - Lists.newLinkedList( - Ordering.natural() - .onResultOf( - (Function, Comparable>) - TimestampedValue::getTimestamp) - .sortedCopy(newPending.values())); + + List>> sortedPending = + Ordering.natural() + .onResultOf( + (Map.Entry> entry) -> + entry.getValue().getTimestamp()) + .sortedCopy(newPending.entrySet()); + this.pending = Maps.newLinkedHashMap(); + for (Map.Entry> entry : sortedPending) { + this.pending.put(entry.getKey(), entry.getValue()); + } // If poll result doesn't provide a watermark, assume that future new outputs may // arrive with about the same timestamps as the current new outputs. if (pollResult.getWatermark() != null) { this.pollWatermark = pollResult.getWatermark(); } else if (!pending.isEmpty()) { - this.pollWatermark = pending.getFirst().getTimestamp(); + this.pollWatermark = pending.values().iterator().next().getTimestamp(); } if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(pollWatermark)) { isOutputComplete = true; @@ -1026,7 +1038,9 @@ synchronized Instant getWatermark() { // min(watermark for future polling, earliest remaining pending element) return Ordering.natural() .nullsLast() - .min(pollWatermark, pending.isEmpty() ? null : pending.getFirst().getTimestamp()); + .min( + pollWatermark, + pending.isEmpty() ? null : pending.values().iterator().next().getTimestamp()); } @Override @@ -1091,7 +1105,7 @@ GrowthStateCoder of( private final Coder outputCoder; private final Coder> completedCoder; - private final Coder>> pendingCoder; + private final Coder> timestampedOutputCoder; private final Coder terminationStateCoder; private GrowthStateCoder( @@ -1099,14 +1113,18 @@ private GrowthStateCoder( this.outputCoder = outputCoder; this.terminationStateCoder = terminationStateCoder; this.completedCoder = MapCoder.of(HASH_CODE_CODER, INSTANT_CODER); - this.pendingCoder = ListCoder.of(TimestampedValue.TimestampedValueCoder.of(outputCoder)); + this.timestampedOutputCoder = TimestampedValue.TimestampedValueCoder.of(outputCoder); } @Override public void encode(GrowthState value, OutputStream os) throws IOException { completedCoder.encode(value.completed, os); - pendingCoder.encode(value.pending, os); + VarIntCoder.of().encode(value.pending.size(), os); + for (Map.Entry> entry : value.pending.entrySet()) { + HASH_CODE_CODER.encode(entry.getKey(), os); + timestampedOutputCoder.encode(entry.getValue(), os); + } BOOLEAN_CODER.encode(value.isOutputComplete, os); terminationStateCoder.encode(value.terminationState, os); INSTANT_CODER.encode(value.pollWatermark, os); @@ -1115,12 +1133,22 @@ public void encode(GrowthState value, OutputSt @Override public GrowthState decode(InputStream is) throws IOException { Map completed = completedCoder.decode(is); - List> pending = pendingCoder.decode(is); + int numPending = VarIntCoder.of().decode(is); + ImmutableMap.Builder> pending = ImmutableMap.builder(); + for (int i = 0; i < numPending; ++i) { + HashCode hash = HASH_CODE_CODER.decode(is); + TimestampedValue output = timestampedOutputCoder.decode(is); + pending.put(hash, output); + } boolean isOutputComplete = BOOLEAN_CODER.decode(is); TerminationStateT terminationState = terminationStateCoder.decode(is); Instant pollWatermark = INSTANT_CODER.decode(is); return new GrowthState<>( - completed, pending, isOutputComplete, terminationState, pollWatermark); + ImmutableMap.copyOf(completed), + pending.build(), + isOutputComplete, + terminationState, + pollWatermark); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index ec2bf342a590..ddd2c3f1d0a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -78,7 +78,7 @@ void invokeSplitRestriction( DoFn.OutputReceiver restrictionReceiver); /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */ - > TrackerT invokeNewTracker( + > TrackerT invokeNewTracker( RestrictionT restriction); /** Get the bound {@link DoFn}. */ @@ -124,7 +124,7 @@ interface ArgumentProvider { * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with * the current {@link ProcessElement} call. */ - RestrictionTracker restrictionTracker(); + RestrictionTracker restrictionTracker(); /** Returns the state cell for the given {@link StateId}. */ State state(String stateId); @@ -203,7 +203,7 @@ public Timer timer(String timerId) { FakeArgumentProvider.class.getSimpleName())); } - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( String.format( "Should never call non-overridden methods of %s", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 6d6ed8a4186b..de3acfd67003 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -908,7 +908,7 @@ private static Parameter analyzeExtraParameter( List allowedParamTypes = Arrays.asList( formatType(new TypeDescriptor() {}), - formatType(new TypeDescriptor>() {})); + formatType(new TypeDescriptor>() {})); paramErrors.throwIllegalArgument( "%s is not a valid context parameter. Should be one of %s", formatType(paramT), allowedParamTypes); @@ -1131,9 +1131,9 @@ static DoFnSignature.GetRestrictionCoderMethod analyzeGetRestrictionCoderMethod( * RestrictionT}. */ private static - TypeDescriptor> restrictionTrackerTypeOf( + TypeDescriptor> restrictionTrackerTypeOf( TypeDescriptor restrictionT) { - return new TypeDescriptor>() {}.where( + return new TypeDescriptor>() {}.where( new TypeParameter() {}, restrictionT); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java index 3366dfecaacd..8badd5cc2a37 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java @@ -24,7 +24,7 @@ */ public interface HasDefaultTracker< RestrictionT extends HasDefaultTracker, - TrackerT extends RestrictionTracker> { + TrackerT extends RestrictionTracker> { /** Creates a new tracker for {@code this}. */ TrackerT newTracker(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java index 8ec2c6b6a875..6722a23b8276 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java @@ -30,7 +30,7 @@ * A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange} in a monotonically * increasing fashion. */ -public class OffsetRangeTracker implements RestrictionTracker { +public class OffsetRangeTracker extends RestrictionTracker { private OffsetRange range; @Nullable private Long lastClaimedOffset = null; @Nullable private Long lastAttemptedOffset = null; @@ -64,7 +64,8 @@ public synchronized OffsetRange checkpoint() { * @return {@code true} if the offset was successfully claimed, {@code false} if it is outside the * current {@link OffsetRange} of this tracker (in that case this operation is a no-op). */ - public synchronized boolean tryClaim(long i) { + @Override + protected synchronized boolean tryClaimImpl(Long i) { checkArgument( lastAttemptedOffset == null || i > lastAttemptedOffset, "Trying to claim offset %s while last attempted was %s", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java index 8cb0a6bd4baa..0c37a776023b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java @@ -23,12 +23,39 @@ * Manages concurrent access to the restriction and keeps track of its claimed part for a splittable {@link DoFn}. */ -public interface RestrictionTracker { +public abstract class RestrictionTracker { + interface ClaimObserver { + void onClaimed(PositionT position); + void onClaimFailed(PositionT position); + } + + private ClaimObserver claimObserver; + + void setClaimObserver(ClaimObserver claimObserver) { + this.claimObserver = claimObserver; + } + + public final boolean tryClaim(PositionT position) { + if (tryClaimImpl(position)) { + if (claimObserver != null) { + claimObserver.onClaimed(position); + } + return true; + } else { + if (claimObserver != null) { + claimObserver.onClaimFailed(position); + } + return false; + } + } + + protected abstract boolean tryClaimImpl(PositionT position); + /** * Returns a restriction accurately describing the full range of work the current {@link * DoFn.ProcessElement} call will do, including already completed work. */ - RestrictionT currentRestriction(); + public abstract RestrictionT currentRestriction(); /** * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible: @@ -39,7 +66,7 @@ public interface RestrictionTracker { * work: the old value of {@link #currentRestriction} is equivalent to the new value and the * return value of this method combined. Must be called at most once on a given object. */ - RestrictionT checkpoint(); + public abstract RestrictionT checkpoint(); /** * Called by the runner after {@link DoFn.ProcessElement} returns. @@ -47,7 +74,7 @@ public interface RestrictionTracker { *

Must throw an exception with an informative error message, if there is still any unclaimed * work remaining in the restriction. */ - void checkDone() throws IllegalStateException; + public abstract void checkDone() throws IllegalStateException; // TODO: Add the more general splitRemainderAfterFraction() and other methods. } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 596a335cb484..5c2281891df4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -223,7 +223,7 @@ public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker t int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); for (int i = trueStart, numIterations = 1; - tracker.tryClaim(blockStarts[i]); + tracker.tryClaim((long) blockStarts[i]); ++i, ++numIterations) { for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) { c.output(index); @@ -351,7 +351,7 @@ public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker t int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); for (int i = trueStart, numIterations = 1; - tracker.tryClaim(blockStarts[i]); + tracker.tryClaim((long) blockStarts[i]); ++i, ++numIterations) { for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) { c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index)); @@ -516,7 +516,7 @@ private enum State { @ProcessElement public void processElement(ProcessContext c, OffsetRangeTracker tracker) { assertEquals(State.INSIDE_BUNDLE, state); - assertTrue(tracker.tryClaim(0)); + assertTrue(tracker.tryClaim(0L)); c.output(c.element()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java index 2d0e6e32f098..a1c1da2c27c6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java @@ -27,19 +27,21 @@ import static org.joda.time.Duration.standardSeconds; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.hash.HashCode; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -506,8 +508,8 @@ public void testGrowthTrackerCheckpointEmpty() { assertEquals( primary.toString(condition), new GrowthState<>( - Collections.emptyMap() /* completed */, - Collections.>emptyList() /* pending */, + ImmutableMap.of() /* completed */, + ImmutableMap.of() /* pending */, true /* isOutputFinal */, (Integer) null /* terminationState */, BoundedWindow.TIMESTAMP_MAX_VALUE /* pollWatermark */) @@ -515,14 +517,21 @@ public void testGrowthTrackerCheckpointEmpty() { assertEquals( residual.toString(condition), new GrowthState<>( - Collections.emptyMap() /* completed */, - Collections.>emptyList() /* pending */, + ImmutableMap.of() /* completed */, + ImmutableMap.of() /* pending */, false /* isOutputFinal */, 0 /* terminationState */, BoundedWindow.TIMESTAMP_MIN_VALUE /* pollWatermark */) .toString(condition)); } + private String tryClaimNextPending(GrowthTracker tracker) { + assertTrue(tracker.hasPending()); + Map.Entry> entry = tracker.getNextPending(); + tracker.tryClaim(entry.getKey()); + return entry.getValue().getValue(); + } + @Test public void testGrowthTrackerCheckpointNonEmpty() { Instant now = Instant.now(); @@ -537,10 +546,8 @@ public void testGrowthTrackerCheckpointNonEmpty() { .withWatermark(now.plus(standardSeconds(7)))); assertEquals(now.plus(standardSeconds(1)), tracker.getWatermark()); - assertTrue(tracker.hasPending()); - assertEquals("a", tracker.tryClaimNextPending().getValue()); - assertTrue(tracker.hasPending()); - assertEquals("b", tracker.tryClaimNextPending().getValue()); + assertEquals("a", tryClaimNextPending(tracker)); + assertEquals("b", tryClaimNextPending(tracker)); assertTrue(tracker.hasPending()); assertEquals(now.plus(standardSeconds(3)), tracker.getWatermark()); @@ -550,10 +557,8 @@ public void testGrowthTrackerCheckpointNonEmpty() { // Verify primary: should contain what the current tracker claimed, and nothing else. assertEquals(now.plus(standardSeconds(1)), primaryTracker.getWatermark()); - assertTrue(primaryTracker.hasPending()); - assertEquals("a", primaryTracker.tryClaimNextPending().getValue()); - assertTrue(primaryTracker.hasPending()); - assertEquals("b", primaryTracker.tryClaimNextPending().getValue()); + assertEquals("a", tryClaimNextPending(primaryTracker)); + assertEquals("b", tryClaimNextPending(primaryTracker)); assertFalse(primaryTracker.hasPending()); assertFalse(primaryTracker.shouldPollMore()); // No more pending elements in primary restriction, and no polling. @@ -562,19 +567,16 @@ public void testGrowthTrackerCheckpointNonEmpty() { // Verify residual: should contain what the current tracker didn't claim. assertEquals(now.plus(standardSeconds(3)), residualTracker.getWatermark()); - assertTrue(residualTracker.hasPending()); - assertEquals("c", residualTracker.tryClaimNextPending().getValue()); - assertTrue(residualTracker.hasPending()); - assertEquals("d", residualTracker.tryClaimNextPending().getValue()); + assertEquals("c", tryClaimNextPending(residualTracker)); + assertEquals("d", tryClaimNextPending(residualTracker)); assertFalse(residualTracker.hasPending()); assertTrue(residualTracker.shouldPollMore()); // No more pending elements in residual restriction, but poll watermark still holds. assertEquals(now.plus(standardSeconds(7)), residualTracker.getWatermark()); // Verify current tracker: it was checkpointed, so should contain nothing else. - assertNull(tracker.tryClaimNextPending()); - tracker.checkDone(); assertFalse(tracker.hasPending()); + tracker.checkDone(); assertFalse(tracker.shouldPollMore()); assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, tracker.getWatermark()); } @@ -592,10 +594,10 @@ public void testGrowthTrackerOutputFullyBeforeCheckpointIncomplete() { TimestampedValue.of("b", now.plus(standardSeconds(2))))) .withWatermark(now.plus(standardSeconds(7)))); - assertEquals("a", tracker.tryClaimNextPending().getValue()); - assertEquals("b", tracker.tryClaimNextPending().getValue()); - assertEquals("c", tracker.tryClaimNextPending().getValue()); - assertEquals("d", tracker.tryClaimNextPending().getValue()); + assertEquals("a", tryClaimNextPending(tracker)); + assertEquals("b", tryClaimNextPending(tracker)); + assertEquals("c", tryClaimNextPending(tracker)); + assertEquals("d", tryClaimNextPending(tracker)); assertFalse(tracker.hasPending()); assertEquals(now.plus(standardSeconds(7)), tracker.getWatermark()); @@ -605,14 +607,10 @@ public void testGrowthTrackerOutputFullyBeforeCheckpointIncomplete() { // Verify primary: should contain what the current tracker claimed, and nothing else. assertEquals(now.plus(standardSeconds(1)), primaryTracker.getWatermark()); - assertTrue(primaryTracker.hasPending()); - assertEquals("a", primaryTracker.tryClaimNextPending().getValue()); - assertTrue(primaryTracker.hasPending()); - assertEquals("b", primaryTracker.tryClaimNextPending().getValue()); - assertTrue(primaryTracker.hasPending()); - assertEquals("c", primaryTracker.tryClaimNextPending().getValue()); - assertTrue(primaryTracker.hasPending()); - assertEquals("d", primaryTracker.tryClaimNextPending().getValue()); + assertEquals("a", tryClaimNextPending(primaryTracker)); + assertEquals("b", tryClaimNextPending(primaryTracker)); + assertEquals("c", tryClaimNextPending(primaryTracker)); + assertEquals("d", tryClaimNextPending(primaryTracker)); assertFalse(primaryTracker.hasPending()); assertFalse(primaryTracker.shouldPollMore()); // No more pending elements in primary restriction, and no polling. @@ -645,10 +643,10 @@ public void testGrowthTrackerPollAfterCheckpointIncompleteWithNewOutputs() { TimestampedValue.of("b", now.plus(standardSeconds(2))))) .withWatermark(now.plus(standardSeconds(7)))); - assertEquals("a", tracker.tryClaimNextPending().getValue()); - assertEquals("b", tracker.tryClaimNextPending().getValue()); - assertEquals("c", tracker.tryClaimNextPending().getValue()); - assertEquals("d", tracker.tryClaimNextPending().getValue()); + assertEquals("a", tryClaimNextPending(tracker)); + assertEquals("b", tryClaimNextPending(tracker)); + assertEquals("c", tryClaimNextPending(tracker)); + assertEquals("d", tryClaimNextPending(tracker)); GrowthState checkpoint = tracker.checkpoint(); // Simulate resuming from the checkpoint and adding more elements. @@ -666,9 +664,9 @@ public void testGrowthTrackerPollAfterCheckpointIncompleteWithNewOutputs() { .withWatermark(now.plus(standardSeconds(12)))); assertEquals(now.plus(standardSeconds(5)), residualTracker.getWatermark()); - assertEquals("e", residualTracker.tryClaimNextPending().getValue()); + assertEquals("e", tryClaimNextPending(residualTracker)); assertEquals(now.plus(standardSeconds(8)), residualTracker.getWatermark()); - assertEquals("f", residualTracker.tryClaimNextPending().getValue()); + assertEquals("f", tryClaimNextPending(residualTracker)); assertFalse(residualTracker.hasPending()); assertTrue(residualTracker.shouldPollMore()); @@ -688,9 +686,9 @@ public void testGrowthTrackerPollAfterCheckpointIncompleteWithNewOutputs() { TimestampedValue.of("f", now.plus(standardSeconds(8)))))); assertEquals(now.plus(standardSeconds(5)), residualTracker.getWatermark()); - assertEquals("e", residualTracker.tryClaimNextPending().getValue()); + assertEquals("e", tryClaimNextPending(residualTracker)); assertEquals(now.plus(standardSeconds(5)), residualTracker.getWatermark()); - assertEquals("f", residualTracker.tryClaimNextPending().getValue()); + assertEquals("f", tryClaimNextPending(residualTracker)); assertFalse(residualTracker.hasPending()); assertTrue(residualTracker.shouldPollMore()); @@ -711,10 +709,10 @@ public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputs() { TimestampedValue.of("b", now.plus(standardSeconds(2))))) .withWatermark(now.plus(standardSeconds(7)))); - assertEquals("a", tracker.tryClaimNextPending().getValue()); - assertEquals("b", tracker.tryClaimNextPending().getValue()); - assertEquals("c", tracker.tryClaimNextPending().getValue()); - assertEquals("d", tracker.tryClaimNextPending().getValue()); + assertEquals("a", tryClaimNextPending(tracker)); + assertEquals("b", tryClaimNextPending(tracker)); + assertEquals("c", tryClaimNextPending(tracker)); + assertEquals("d", tryClaimNextPending(tracker)); // Simulate resuming from the checkpoint but there are no new elements. GrowthState checkpoint = tracker.checkpoint(); @@ -759,10 +757,10 @@ public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputsNoWatermark() { TimestampedValue.of("c", now.plus(standardSeconds(3))), TimestampedValue.of("a", now.plus(standardSeconds(1))), TimestampedValue.of("b", now.plus(standardSeconds(2)))))); - assertEquals("a", tracker.tryClaimNextPending().getValue()); - assertEquals("b", tracker.tryClaimNextPending().getValue()); - assertEquals("c", tracker.tryClaimNextPending().getValue()); - assertEquals("d", tracker.tryClaimNextPending().getValue()); + assertEquals("a", tryClaimNextPending(tracker)); + assertEquals("b", tryClaimNextPending(tracker)); + assertEquals("c", tryClaimNextPending(tracker)); + assertEquals("d", tryClaimNextPending(tracker)); assertEquals(now.plus(standardSeconds(1)), tracker.getWatermark()); // Simulate resuming from the checkpoint but there are no new elements. @@ -822,10 +820,10 @@ public void testGrowthTrackerOutputFullyBeforeCheckpointComplete() { TimestampedValue.of("a", now.plus(standardSeconds(1))), TimestampedValue.of("b", now.plus(standardSeconds(2)))))); - assertEquals("a", tracker.tryClaimNextPending().getValue()); - assertEquals("b", tracker.tryClaimNextPending().getValue()); - assertEquals("c", tracker.tryClaimNextPending().getValue()); - assertEquals("d", tracker.tryClaimNextPending().getValue()); + assertEquals("a", tryClaimNextPending(tracker)); + assertEquals("b", tryClaimNextPending(tracker)); + assertEquals("c", tryClaimNextPending(tracker)); + assertEquals("d", tryClaimNextPending(tracker)); assertFalse(tracker.hasPending()); assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, tracker.getWatermark()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index cdd7f6007ff5..15c8bbb19a64 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -295,7 +295,7 @@ public void after() {} private static class SomeRestriction {} private abstract static class SomeRestrictionTracker - implements RestrictionTracker {} + extends RestrictionTracker {} private static class SomeRestrictionCoder extends AtomicCoder { public static SomeRestrictionCoder of() { @@ -385,7 +385,7 @@ public DoFn.ProcessContext processContext(DoFn f } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { return tracker; } })); @@ -399,7 +399,13 @@ public DefaultTracker newTracker() { } } - private static class DefaultTracker implements RestrictionTracker { + private static class DefaultTracker + extends RestrictionTracker { + @Override + protected boolean tryClaimImpl(Void position) { + throw new UnsupportedOperationException(); + } + @Override public RestrictionWithDefaultTracker currentRestriction() { throw new UnsupportedOperationException(); @@ -653,7 +659,7 @@ public DoFn.ProcessContext processContext(DoFn doFn) { } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { return null; // will not be touched } }); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java index 44ae5c4f2425..239cb2e2c257 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java @@ -39,7 +39,7 @@ public void testBadExtraProcessContextType() throws Exception { thrown.expect(IllegalArgumentException.class); thrown.expectMessage( "Integer is not a valid context parameter. " - + "Should be one of [BoundedWindow, RestrictionTracker]"); + + "Should be one of [BoundedWindow, RestrictionTracker]"); analyzeProcessElementMethod( new AnonymousMethod() { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index 50621c1bca83..7a9b209007f6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -55,7 +55,7 @@ private abstract static class SomeRestriction implements HasDefaultTracker {} private abstract static class SomeRestrictionTracker - implements RestrictionTracker {} + extends RestrictionTracker {} private abstract static class SomeRestrictionCoder extends StructuredCoder {} @@ -332,7 +332,9 @@ public void process(ProcessContext context, SomeRestrictionTracker tracker) {} DoFnSignatures.getSignature(BadFn.class); } - abstract class SomeDefaultTracker implements RestrictionTracker {} + abstract class SomeDefaultTracker + extends RestrictionTracker {} + abstract class RestrictionWithDefaultTracker implements HasDefaultTracker {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java index 8aed6b9c01ca..2d122fe699f4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java @@ -38,10 +38,10 @@ public void testTryClaim() throws Exception { OffsetRange range = new OffsetRange(100, 200); OffsetRangeTracker tracker = new OffsetRangeTracker(range); assertEquals(range, tracker.currentRestriction()); - assertTrue(tracker.tryClaim(100)); - assertTrue(tracker.tryClaim(150)); - assertTrue(tracker.tryClaim(199)); - assertFalse(tracker.tryClaim(200)); + assertTrue(tracker.tryClaim(100L)); + assertTrue(tracker.tryClaim(150L)); + assertTrue(tracker.tryClaim(199L)); + assertFalse(tracker.tryClaim(200L)); } @Test @@ -55,7 +55,7 @@ public void testCheckpointUnstarted() throws Exception { @Test public void testCheckpointJustStarted() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(100)); + assertTrue(tracker.tryClaim(100L)); OffsetRange checkpoint = tracker.checkpoint(); assertEquals(new OffsetRange(100, 101), tracker.currentRestriction()); assertEquals(new OffsetRange(101, 200), checkpoint); @@ -64,8 +64,8 @@ public void testCheckpointJustStarted() throws Exception { @Test public void testCheckpointRegular() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(105)); - assertTrue(tracker.tryClaim(110)); + assertTrue(tracker.tryClaim(105L)); + assertTrue(tracker.tryClaim(110L)); OffsetRange checkpoint = tracker.checkpoint(); assertEquals(new OffsetRange(100, 111), tracker.currentRestriction()); assertEquals(new OffsetRange(111, 200), checkpoint); @@ -74,9 +74,9 @@ public void testCheckpointRegular() throws Exception { @Test public void testCheckpointClaimedLast() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(105)); - assertTrue(tracker.tryClaim(110)); - assertTrue(tracker.tryClaim(199)); + assertTrue(tracker.tryClaim(105L)); + assertTrue(tracker.tryClaim(110L)); + assertTrue(tracker.tryClaim(199L)); OffsetRange checkpoint = tracker.checkpoint(); assertEquals(new OffsetRange(100, 200), tracker.currentRestriction()); assertEquals(new OffsetRange(200, 200), checkpoint); @@ -85,10 +85,10 @@ public void testCheckpointClaimedLast() throws Exception { @Test public void testCheckpointAfterFailedClaim() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(105)); - assertTrue(tracker.tryClaim(110)); - assertTrue(tracker.tryClaim(160)); - assertFalse(tracker.tryClaim(240)); + assertTrue(tracker.tryClaim(105L)); + assertTrue(tracker.tryClaim(110L)); + assertTrue(tracker.tryClaim(160L)); + assertFalse(tracker.tryClaim(240L)); OffsetRange checkpoint = tracker.checkpoint(); assertEquals(new OffsetRange(100, 161), tracker.currentRestriction()); assertEquals(new OffsetRange(161, 200), checkpoint); @@ -98,50 +98,50 @@ public void testCheckpointAfterFailedClaim() throws Exception { public void testNonMonotonicClaim() throws Exception { expected.expectMessage("Trying to claim offset 103 while last attempted was 110"); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(105)); - assertTrue(tracker.tryClaim(110)); - tracker.tryClaim(103); + assertTrue(tracker.tryClaim(105L)); + assertTrue(tracker.tryClaim(110L)); + tracker.tryClaim(103L); } @Test public void testClaimBeforeStartOfRange() throws Exception { expected.expectMessage("Trying to claim offset 90 before start of the range [100, 200)"); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - tracker.tryClaim(90); + tracker.tryClaim(90L); } @Test public void testCheckDoneAfterTryClaimPastEndOfRange() { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(150)); - assertTrue(tracker.tryClaim(175)); - assertFalse(tracker.tryClaim(220)); + assertTrue(tracker.tryClaim(150L)); + assertTrue(tracker.tryClaim(175L)); + assertFalse(tracker.tryClaim(220L)); tracker.checkDone(); } @Test public void testCheckDoneAfterTryClaimAtEndOfRange() { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(150)); - assertTrue(tracker.tryClaim(175)); - assertFalse(tracker.tryClaim(200)); + assertTrue(tracker.tryClaim(150L)); + assertTrue(tracker.tryClaim(175L)); + assertFalse(tracker.tryClaim(200L)); tracker.checkDone(); } @Test public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(150)); - assertTrue(tracker.tryClaim(175)); - assertTrue(tracker.tryClaim(199)); + assertTrue(tracker.tryClaim(150L)); + assertTrue(tracker.tryClaim(175L)); + assertTrue(tracker.tryClaim(199L)); tracker.checkDone(); } @Test public void testCheckDoneWhenNotDone() { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(150)); - assertTrue(tracker.tryClaim(175)); + assertTrue(tracker.tryClaim(150L)); + assertTrue(tracker.tryClaim(175L)); expected.expectMessage( "Last attempted offset was 175 in range [100, 200), " + "claiming work in [176, 200) was not attempted"); @@ -151,8 +151,8 @@ public void testCheckDoneWhenNotDone() { @Test public void testCheckDoneWhenExplicitlyMarkedDone() { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); - assertTrue(tracker.tryClaim(150)); - assertTrue(tracker.tryClaim(175)); + assertTrue(tracker.tryClaim(150L)); + assertTrue(tracker.tryClaim(175L)); tracker.markDone(); tracker.checkDone(); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index cf3a227d49be..f7dcb650b4b0 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -512,7 +512,7 @@ public DoFn.OnTimerContext onTimerContext(DoFn } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( "Cannot access RestrictionTracker outside of @ProcessElement method."); } @@ -569,7 +569,7 @@ public OnTimerContext onTimerContext(DoFn doFn) { } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); } @@ -728,7 +728,7 @@ public DoFn.OnTimerContext onTimerContext(DoFn } @Override - public RestrictionTracker restrictionTracker() { + public RestrictionTracker restrictionTracker() { throw new UnsupportedOperationException( "Cannot access RestrictionTracker outside of @ProcessElement method."); } From eca41b9db9bc45c1df52b468ea10601061f5a63d Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 24 Jan 2018 15:48:14 -0800 Subject: [PATCH 2/6] Changes OutputAndTimeBounded invoker to start checkpoint timer after first claim, and verifies more invariants --- .../operators/ApexParDoOperator.java | 2 +- ...oundedSplittableProcessElementInvoker.java | 82 ++++++++++--- ...edSplittableProcessElementInvokerTest.java | 111 ++++++++++++++---- .../core/SplittableParDoProcessFnTest.java | 29 +++-- ...ttableProcessElementsEvaluatorFactory.java | 6 +- .../org/apache/beam/sdk/transforms/Watch.java | 38 ++++-- .../splittabledofn/OffsetRangeTracker.java | 7 +- .../splittabledofn/RestrictionTracker.java | 20 +++- .../apache/beam/sdk/transforms/WatchTest.java | 40 ------- .../DoFnSignaturesSplittableDoFnTest.java | 5 +- .../OffsetRangeTrackerTest.java | 11 +- 11 files changed, 238 insertions(+), 113 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 9e05c889ded9..8db73df43bc2 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -474,7 +474,7 @@ public TimerInternals timerInternals() { (StateInternalsFactory) this.currentKeyStateInternals.getFactory(); @SuppressWarnings({ "rawtypes", "unchecked" }) - ProcessFn> + ProcessFn> splittableDoFn = (ProcessFn) doFn; splittableDoFn.setStateInternalsFactory(stateInternalsFactory); TimerInternalsFactory timerInternalsFactory = key -> currentKeyTimerInternals; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index d43e783d6b94..e6965d48c98d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -50,7 +50,11 @@ * outputs), or runs for the given duration. */ public class OutputAndTimeBoundedSplittableProcessElementInvoker< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, + OutputT, + RestrictionT, + PositionT, + TrackerT extends RestrictionTracker> extends SplittableProcessElementInvoker { private final DoFn fn; private final PipelineOptions pipelineOptions; @@ -71,9 +75,10 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< * @param maxNumOutputs Maximum number of outputs, in total over all output tags, after which a * checkpoint will be requested. This is a best-effort request - the {@link DoFn} may output * more after receiving the request. - * @param maxDuration Maximum duration of the {@link DoFn.ProcessElement} call after which a - * checkpoint will be requested. This is a best-effort request - the {@link DoFn} may run for - * longer after receiving the request. + * @param maxDuration Maximum duration of the {@link DoFn.ProcessElement} call (counted from the + * first successful {@link RestrictionTracker#tryClaim} call) after which a checkpoint will be + * requested. This is a best-effort request - the {@link DoFn} may run for longer after + * receiving the request. */ public OutputAndTimeBoundedSplittableProcessElementInvoker( DoFn fn, @@ -98,6 +103,7 @@ public Result invokeProcessElement( final WindowedValue element, final TrackerT tracker) { final ProcessContext processContext = new ProcessContext(element, tracker); + tracker.setClaimObserver(processContext); DoFn.ProcessContinuation cont = invoker.invokeProcessElement( new DoFnInvoker.ArgumentProvider() { @Override @@ -157,19 +163,30 @@ public Timer timer(String timerId) { "Access to timers not supported in Splittable DoFn"); } }); - // TODO: verify that if there was a failed tryClaim() call, then cont.shouldResume() is false. - // Currently we can't verify this because there are no hooks into tryClaim(). - // See https://issues.apache.org/jira/browse/BEAM-2607 processContext.cancelScheduledCheckpoint(); @Nullable KV residual = processContext.getTakenCheckpoint(); if (cont.shouldResume()) { + checkState( + !processContext.hasClaimFailed, + "After tryClaim() returned false, @ProcessElement must return stop(), " + + "but returned resume()"); if (residual == null) { // No checkpoint had been taken by the runner while the ProcessElement call ran, however // the call says that not the whole restriction has been processed. So we need to take // a checkpoint now: checkpoint() guarantees that the primary restriction describes exactly // the work that was done in the current ProcessElement call, and returns a residual // restriction that describes exactly the work that wasn't done in the current call. - residual = checkNotNull(processContext.takeCheckpointNow()); + if (processContext.numClaimedBlocks > 0) { + residual = checkNotNull(processContext.takeCheckpointNow()); + tracker.checkDone(); + } else { + // The call returned resume() without trying to claim any blocks, i.e. it is unaware + // of any work to be done at the moment, but more might emerge later. In this case, + // we must simply reschedule the original restriction - checkpointing a tracker that + // hasn't claimed any work is not allowed. + residual = KV.of(tracker.currentRestriction(), processContext.lastReportedWatermark); + // Don't call tracker.checkDone() - it's not done. + } } else { // A checkpoint was taken by the runner, and then the ProcessElement call returned resume() // without making more tryClaim() calls (since no tryClaim() calls can succeed after @@ -180,14 +197,15 @@ public Timer timer(String timerId) { // ProcessElement call. // In other words, if we took a checkpoint *after* ProcessElement completed (like in the // branch above), it would have been equivalent to this one. + tracker.checkDone(); } } else { // The ProcessElement call returned stop() - that means the tracker's current restriction // has been fully processed by the call. A checkpoint may or may not have been taken in // "residual"; if it was, then we'll need to process it; if no, then we don't - nothing // special needs to be done. + tracker.checkDone(); } - tracker.checkDone(); if (residual == null) { // Can only be true if cont.shouldResume() is false and no checkpoint was taken. // This means the restriction has been fully processed. @@ -197,9 +215,12 @@ public Timer timer(String timerId) { return new Result(residual.getKey(), cont, residual.getValue()); } - private class ProcessContext extends DoFn.ProcessContext { + private class ProcessContext extends DoFn.ProcessContext + implements RestrictionTracker.ClaimObserver { private final WindowedValue element; private final TrackerT tracker; + private int numClaimedBlocks; + private boolean hasClaimFailed; private int numOutputs; // Checkpoint may be initiated either when the given number of outputs is reached, @@ -212,20 +233,46 @@ private class ProcessContext extends DoFn.ProcessContext { // on the output from "checkpoint". private @Nullable Instant residualWatermark; // A handle on the scheduled action to take a checkpoint. - private Future scheduledCheckpoint; + private @Nullable Future scheduledCheckpoint; private @Nullable Instant lastReportedWatermark; public ProcessContext(WindowedValue element, TrackerT tracker) { fn.super(); this.element = element; this.tracker = tracker; + } - this.scheduledCheckpoint = - executor.schedule( - (Runnable) this::takeCheckpointNow, maxDuration.getMillis(), TimeUnit.MILLISECONDS); + void checkClaimHasNotFailed() { + checkState( + !hasClaimFailed, + "Must not call tryClaim() after it has previously returned false"); + } + + @Override + public void onClaimed(PositionT position) { + checkClaimHasNotFailed(); + if (numClaimedBlocks == 0) { + // Claiming first block: can schedule the checkpoint now. + // We don't schedule it right away to prevent checkpointing before any blocks are claimed, + // in a state where no work has been done yet - because such a checkpoint is equivalent to + // the original restriction, i.e. pointless. + this.scheduledCheckpoint = + executor.schedule( + (Runnable) this::takeCheckpointNow, maxDuration.getMillis(), TimeUnit.MILLISECONDS); + } + ++numClaimedBlocks; + } + + @Override + public void onClaimFailed(PositionT position) { + checkClaimHasNotFailed(); + hasClaimFailed = true; } void cancelScheduledCheckpoint() { + if (scheduledCheckpoint == null) { + return; + } scheduledCheckpoint.cancel(true); try { Futures.getUnchecked(scheduledCheckpoint); @@ -275,6 +322,7 @@ public PaneInfo pane() { @Override public synchronized void updateWatermark(Instant watermark) { + // Updating the watermark without any claimed blocks is allowed. lastReportedWatermark = watermark; } @@ -290,8 +338,8 @@ public void output(OutputT output) { @Override public void outputWithTimestamp(OutputT value, Instant timestamp) { - output.outputWindowedValue(value, timestamp, element.getWindows(), element.getPane()); noteOutput(); + output.outputWindowedValue(value, timestamp, element.getWindows(), element.getPane()); } @Override @@ -301,12 +349,14 @@ public void output(TupleTag tag, T value) { @Override public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) { + noteOutput(); output.outputWindowedValue( tag, value, timestamp, element.getWindows(), element.getPane()); - noteOutput(); } private void noteOutput() { + checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()"); + checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()"); ++numOutputs; if (numOutputs >= maxNumOutputs) { takeCheckpointNow(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index 959909e6690e..991b9299fb8f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -27,8 +27,10 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collection; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; @@ -41,26 +43,38 @@ import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; /** Tests for {@link OutputAndTimeBoundedSplittableProcessElementInvoker}. */ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest { - private static class SomeFn extends DoFn { + @Rule + public transient ExpectedException e = ExpectedException.none(); + + private static class SomeFn extends DoFn { + private final Duration sleepBeforeFirstClaim; private final int numOutputsPerProcessCall; private final Duration sleepBeforeEachOutput; - private SomeFn(int numOutputsPerProcessCall, Duration sleepBeforeEachOutput) { + private SomeFn( + Duration sleepBeforeFirstClaim, + int numOutputsPerProcessCall, + Duration sleepBeforeEachOutput) { + this.sleepBeforeFirstClaim = sleepBeforeFirstClaim; this.numOutputsPerProcessCall = numOutputsPerProcessCall; this.sleepBeforeEachOutput = sleepBeforeEachOutput; } @ProcessElement - public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker) - throws Exception { + public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker) { + Uninterruptibles.sleepUninterruptibly( + sleepBeforeFirstClaim.getMillis(), TimeUnit.MILLISECONDS); for (long i = tracker.currentRestriction().getFrom(), numIterations = 1; tracker.tryClaim(i); ++i, ++numIterations) { - Thread.sleep(sleepBeforeEachOutput.getMillis()); + Uninterruptibles.sleepUninterruptibly( + sleepBeforeEachOutput.getMillis(), TimeUnit.MILLISECONDS); context.output("" + i); if (numIterations == numOutputsPerProcessCall) { return resume(); @@ -70,15 +84,25 @@ public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tr } @GetInitialRestriction - public OffsetRange getInitialRestriction(Integer element) { + public OffsetRange getInitialRestriction(Void element) { throw new UnsupportedOperationException("Should not be called in this test"); } } - private SplittableProcessElementInvoker.Result - runTest(int totalNumOutputs, int numOutputsPerProcessCall, Duration sleepPerElement) { - SomeFn fn = new SomeFn(numOutputsPerProcessCall, sleepPerElement); - SplittableProcessElementInvoker invoker = + private SplittableProcessElementInvoker.Result + runTest( + int totalNumOutputs, + Duration sleepBeforeFirstClaim, + int numOutputsPerProcessCall, + Duration sleepBeforeEachOutput) { + SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall, sleepBeforeEachOutput); + OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs); + return runTest(fn, initialRestriction); + } + + private SplittableProcessElementInvoker.Result + runTest(DoFn fn, OffsetRange initialRestriction) { + SplittableProcessElementInvoker invoker = new OutputAndTimeBoundedSplittableProcessElementInvoker<>( fn, PipelineOptionsFactory.create(), @@ -105,14 +129,14 @@ public void outputWindowedValue( return invoker.invokeProcessElement( DoFnInvokers.invokerFor(fn), - WindowedValue.of(totalNumOutputs, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), - new OffsetRangeTracker(new OffsetRange(0, totalNumOutputs))); + WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + new OffsetRangeTracker(initialRestriction)); } @Test public void testInvokeProcessElementOutputBounded() throws Exception { - SplittableProcessElementInvoker.Result res = - runTest(10000, Integer.MAX_VALUE, Duration.ZERO); + SplittableProcessElementInvoker.Result res = + runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.ZERO); assertFalse(res.getContinuation().shouldResume()); OffsetRange residualRange = res.getResidualRestriction(); // Should process the first 100 elements. @@ -122,8 +146,8 @@ public void testInvokeProcessElementOutputBounded() throws Exception { @Test public void testInvokeProcessElementTimeBounded() throws Exception { - SplittableProcessElementInvoker.Result res = - runTest(10000, Integer.MAX_VALUE, Duration.millis(100)); + SplittableProcessElementInvoker.Result res = + runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100)); assertFalse(res.getContinuation().shouldResume()); OffsetRange residualRange = res.getResidualRestriction(); // Should process ideally around 30 elements - but due to timing flakiness, we can't enforce @@ -133,19 +157,66 @@ public void testInvokeProcessElementTimeBounded() throws Exception { assertEquals(10000, residualRange.getTo()); } + @Test + public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws Exception { + SplittableProcessElementInvoker.Result res = + runTest(10000, Duration.standardSeconds(3), Integer.MAX_VALUE, Duration.millis(100)); + assertFalse(res.getContinuation().shouldResume()); + OffsetRange residualRange = res.getResidualRestriction(); + // Same as above, but this time it counts from the time of the first tryClaim() call + assertThat(residualRange.getFrom(), greaterThan(10L)); + assertThat(residualRange.getFrom(), lessThan(100L)); + assertEquals(10000, residualRange.getTo()); + } + @Test public void testInvokeProcessElementVoluntaryReturnStop() throws Exception { - SplittableProcessElementInvoker.Result res = - runTest(5, Integer.MAX_VALUE, Duration.millis(100)); + SplittableProcessElementInvoker.Result res = + runTest(5, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100)); assertFalse(res.getContinuation().shouldResume()); assertNull(res.getResidualRestriction()); } @Test public void testInvokeProcessElementVoluntaryReturnResume() throws Exception { - SplittableProcessElementInvoker.Result res = - runTest(10, 5, Duration.millis(100)); + SplittableProcessElementInvoker.Result res = + runTest(10, Duration.ZERO, 5, Duration.millis(100)); assertTrue(res.getContinuation().shouldResume()); assertEquals(new OffsetRange(5, 10), res.getResidualRestriction()); } + + @Test + public void testInvokeProcessElementOutputDisallowedBeforeTryClaim() throws Exception { + DoFn brokenFn = new DoFn() { + @ProcessElement + public void process(ProcessContext c, OffsetRangeTracker tracker) { + c.output("foo"); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(Void element) { + throw new UnsupportedOperationException("Should not be called in this test"); + } + }; + e.expectMessage("Output is not allowed before tryClaim()"); + runTest(brokenFn, new OffsetRange(0, 5)); + } + + @Test + public void testInvokeProcessElementOutputDisallowedAfterFailedTryClaim() throws Exception { + DoFn brokenFn = new DoFn() { + @ProcessElement + public void process(ProcessContext c, OffsetRangeTracker tracker) { + assertFalse(tracker.tryClaim(6L)); + c.output("foo"); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(Void element) { + throw new UnsupportedOperationException("Should not be called in this test"); + } + }; + e.expectMessage("Output is not allowed after a failed tryClaim()"); + runTest(brokenFn, new OffsetRange(0, 5)); + } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index 29efc1bc49a9..b9fd0ab1f41d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core; +import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.hamcrest.Matchers.contains; @@ -92,7 +93,7 @@ public SomeRestrictionTracker(SomeRestriction someRestriction) { @Override protected boolean tryClaimImpl(Void position) { - return false; + return true; } @Override @@ -113,11 +114,15 @@ public void checkDone() {} public TestPipeline pipeline = TestPipeline.create(); /** - * A helper for testing {@link ProcessFn} on 1 element (but - * possibly over multiple {@link DoFn.ProcessElement} calls). + * A helper for testing {@link ProcessFn} on 1 element (but possibly over multiple {@link + * DoFn.ProcessElement} calls). */ private static class ProcessFnTester< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, + OutputT, + RestrictionT, + PositionT, + TrackerT extends RestrictionTracker> implements AutoCloseable { private final DoFnTester>, OutputT> tester; private Instant currentProcessingTime; @@ -265,6 +270,7 @@ public void outputWindowedValue( private static class ToStringFn extends DoFn { @ProcessElement public void process(ProcessContext c, SomeRestrictionTracker tracker) { + checkState(tracker.tryClaim(null)); c.output(c.element().toString() + "a"); c.output(c.element().toString() + "b"); c.output(c.element().toString() + "c"); @@ -289,7 +295,7 @@ public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exce new IntervalWindow( base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1))); - ProcessFnTester tester = + ProcessFnTester tester = new ProcessFnTester<>( base, fn, @@ -337,7 +343,7 @@ public void testUpdatesWatermark() throws Exception { DoFn fn = new WatermarkUpdateFn(); Instant base = Instant.now(); - ProcessFnTester tester = + ProcessFnTester tester = new ProcessFnTester<>( base, fn, @@ -363,6 +369,7 @@ public void testUpdatesWatermark() throws Exception { private static class SelfInitiatedResumeFn extends DoFn { @ProcessElement public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) { + checkState(tracker.tryClaim(null)); c.output(c.element().toString()); return resume().withResumeDelay(Duration.standardSeconds(5)); } @@ -377,7 +384,7 @@ public SomeRestriction getInitialRestriction(Integer elem) { public void testResumeSetsTimer() throws Exception { DoFn fn = new SelfInitiatedResumeFn(); Instant base = Instant.now(); - ProcessFnTester tester = + ProcessFnTester tester = new ProcessFnTester<>( base, fn, @@ -435,7 +442,7 @@ public OffsetRange getInitialRestriction(Integer elem) { public void testResumeCarriesOverState() throws Exception { DoFn fn = new CounterFn(1); Instant base = Instant.now(); - ProcessFnTester tester = + ProcessFnTester tester = new ProcessFnTester<>( base, fn, @@ -464,7 +471,7 @@ public void testCheckpointsAfterNumOutputs() throws Exception { Instant base = Instant.now(); int baseIndex = 42; - ProcessFnTester tester = + ProcessFnTester tester = new ProcessFnTester<>( base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class), max, MAX_BUNDLE_DURATION); @@ -506,7 +513,7 @@ public void testCheckpointsAfterDuration() throws Exception { Instant base = Instant.now(); int baseIndex = 42; - ProcessFnTester tester = + ProcessFnTester tester = new ProcessFnTester<>( base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class), max, maxBundleDuration); @@ -571,7 +578,7 @@ public void finishBundle() { @Test public void testInvokesLifecycleMethods() throws Exception { DoFn fn = new LifecycleVerifyingFn(); - try (ProcessFnTester tester = + try (ProcessFnTester tester = new ProcessFnTester<>( Instant.now(), fn, diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 016ec3263a5a..f4c489544b24 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -42,7 +42,11 @@ import org.joda.time.Instant; class SplittableProcessElementsEvaluatorFactory< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + InputT, + OutputT, + RestrictionT, + PositionT, + TrackerT extends RestrictionTracker> implements TransformEvaluatorFactory { private final ParDoEvaluatorFactory>, OutputT> delegateFactory; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index bbe54b046cb9..8da887e34fbd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -713,7 +713,7 @@ public ProcessContinuation process( ProcessContext c, final GrowthTracker tracker) throws Exception { if (!tracker.hasPending() && !tracker.currentRestriction().isOutputComplete) { - LOG.debug("{} - polling input", c.element()); + Instant now = Instant.now(); Growth.PollResult res = spec.getPollFn().getClosure().apply(c.element(), wrapProcessContext(c)); // TODO (https://issues.apache.org/jira/browse/BEAM-2680): @@ -724,25 +724,30 @@ public ProcessContinuation process( int numPending = tracker.addNewAsPending(res); if (numPending > 0) { LOG.info( - "{} - polling returned {} results, of which {} were new. The output is {}.", + "{} - current round of polling took {} ms and returned {} results, " + + "of which {} were new. The output is {}.", c.element(), + new Duration(now, Instant.now()).getMillis(), res.getOutputs().size(), numPending, BoundedWindow.TIMESTAMP_MAX_VALUE.equals(res.getWatermark()) - ? "complete" - : "incomplete"); + ? "final" + : "not yet final"); } } - while (tracker.hasPending()) { + int numEmitted = 0; + while (true) { c.updateWatermark(tracker.getWatermark()); Map.Entry> entry = tracker.getNextPending(); - if (!tracker.tryClaim(entry.getKey())) { - return stop(); + if (entry == null || !tracker.tryClaim(entry.getKey())) { + break; } TimestampedValue nextPending = entry.getValue(); c.outputWithTimestamp( KV.of(c.element(), nextPending.getValue()), nextPending.getTimestamp()); + ++numEmitted; } + LOG.debug("{} - emitted {} new results.", c.element(), numEmitted); Instant watermark = tracker.getWatermark(); if (watermark != null) { // Null means the poll result did not provide a watermark and there were no new elements, @@ -752,14 +757,17 @@ public ProcessContinuation process( // No more pending outputs - future output will come from more polling, // unless output is complete or termination condition is reached. if (tracker.shouldPollMore()) { + LOG.info( + "{} - emitted all known results so far; will resume polling in {} ms", + c.element(), + spec.getPollInterval().getMillis()); return resume().withResumeDelay(spec.getPollInterval()); } return stop(); } private Growth.TerminationCondition getTerminationCondition() { - return ((Growth.TerminationCondition) - spec.getTerminationPerInput()); + return (Growth.TerminationCondition) spec.getTerminationPerInput(); } @GetInitialRestriction @@ -833,7 +841,7 @@ public String toString(Growth.TerminationCondition termina + " elements>, pending=<" + pending.size() + " elements" - + (pending.isEmpty() ? "" : (", earliest " + pending.get(0))) + + (pending.isEmpty() ? "" : (", earliest " + pending.values().iterator().next())) + ">, isOutputComplete=" + isOutputComplete + ", terminationState=" @@ -900,6 +908,9 @@ public synchronized GrowthState currentRestric @Override public synchronized GrowthState checkpoint() { + checkState( + !claimed.isEmpty(), "Can't checkpoint before any element was successfully claimed"); + // primary should contain exactly the work claimed in the current ProcessElement call - i.e. // claimed outputs become pending, and it shouldn't poll again. GrowthState primary = @@ -957,8 +968,11 @@ synchronized boolean hasPending() { } @VisibleForTesting + @Nullable synchronized Map.Entry> getNextPending() { - checkState (!pending.isEmpty(), "Pending set is empty"); + if (pending.isEmpty()) { + return null; + } return pending.entrySet().iterator().next(); } @@ -1051,7 +1065,7 @@ public synchronized String toString() { + ", pending=<" + pending.size() + " elements" - + (pending.isEmpty() ? "" : (", earliest " + pending.get(0))) + + (pending.isEmpty() ? "" : (", earliest " + pending.values().iterator().next())) + ">, claimed=<" + claimed.size() + " elements>, isOutputComplete=" diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java index 6722a23b8276..f2d9e5cfd819 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java @@ -46,11 +46,8 @@ public synchronized OffsetRange currentRestriction() { @Override public synchronized OffsetRange checkpoint() { - if (lastClaimedOffset == null) { - OffsetRange res = range; - range = new OffsetRange(range.getFrom(), range.getFrom()); - return res; - } + checkState( + lastClaimedOffset != null, "Can't checkpoint before any offset was successfully claimed"); OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo()); this.range = new OffsetRange(range.getFrom(), lastClaimedOffset + 1); return res; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java index 0c37a776023b..ccbbe25b3d89 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java @@ -17,6 +17,11 @@ */ package org.apache.beam.sdk.transforms.splittabledofn; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.DoFn; /** @@ -24,14 +29,19 @@ * href="https://s.apache.org/splittable-do-fn">splittable {@link DoFn}. */ public abstract class RestrictionTracker { - interface ClaimObserver { + @Internal + public interface ClaimObserver { void onClaimed(PositionT position); void onClaimFailed(PositionT position); } + @Nullable private ClaimObserver claimObserver; - void setClaimObserver(ClaimObserver claimObserver) { + @Internal + public void setClaimObserver(ClaimObserver claimObserver) { + checkNotNull(claimObserver, "claimObserver"); + checkState(this.claimObserver == null, "A claim observer has already been set"); this.claimObserver = claimObserver; } @@ -49,6 +59,7 @@ public final boolean tryClaim(PositionT position) { } } + @Internal protected abstract boolean tryClaimImpl(PositionT position); /** @@ -64,7 +75,10 @@ public final boolean tryClaim(PositionT position) { * *

Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the * work: the old value of {@link #currentRestriction} is equivalent to the new value and the - * return value of this method combined. Must be called at most once on a given object. + * return value of this method combined. + * + *

Must be called at most once on a given object. Must not be called before the first + * successful {@link #tryClaim} call. */ public abstract RestrictionT checkpoint(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java index a1c1da2c27c6..fcece90dece3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java @@ -32,7 +32,6 @@ import static org.junit.Assert.fail; import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -498,33 +497,6 @@ private static GrowthTracker newTracker() { return newTracker(new GrowthState<>(never().forNewInput(Instant.now(), null))); } - @Test - public void testGrowthTrackerCheckpointEmpty() { - // Checkpoint an empty tracker. - GrowthTracker tracker = newTracker(); - GrowthState residual = tracker.checkpoint(); - GrowthState primary = tracker.currentRestriction(); - Watch.Growth.Never condition = never(); - assertEquals( - primary.toString(condition), - new GrowthState<>( - ImmutableMap.of() /* completed */, - ImmutableMap.of() /* pending */, - true /* isOutputFinal */, - (Integer) null /* terminationState */, - BoundedWindow.TIMESTAMP_MAX_VALUE /* pollWatermark */) - .toString(condition)); - assertEquals( - residual.toString(condition), - new GrowthState<>( - ImmutableMap.of() /* completed */, - ImmutableMap.of() /* pending */, - false /* isOutputFinal */, - 0 /* terminationState */, - BoundedWindow.TIMESTAMP_MIN_VALUE /* pollWatermark */) - .toString(condition)); - } - private String tryClaimNextPending(GrowthTracker tracker) { assertTrue(tracker.hasPending()); Map.Entry> entry = tracker.getNextPending(); @@ -784,12 +756,6 @@ public void testGrowthTrackerRepeatedEmptyPollWatermark() { GrowthTracker tracker = newTracker(); tracker.addNewAsPending(PollResult.incomplete(Collections.emptyList())); assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, tracker.getWatermark()); - - // Simulate resuming from the checkpoint but there are still no new elements. - GrowthTracker residualTracker = newTracker(tracker.checkpoint()); - tracker.addNewAsPending(PollResult.incomplete(Collections.emptyList())); - // No new elements and no explicit watermark supplied - still no watermark. - assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, residualTracker.getWatermark()); } // Empty poll result with watermark { @@ -799,12 +765,6 @@ public void testGrowthTrackerRepeatedEmptyPollWatermark() { PollResult.incomplete(Collections.>emptyList()) .withWatermark(now)); assertEquals(now, tracker.getWatermark()); - - // Simulate resuming from the checkpoint but there are still no new elements. - GrowthTracker residualTracker = newTracker(tracker.checkpoint()); - tracker.addNewAsPending(PollResult.incomplete(Collections.emptyList())); - // No new elements and no explicit watermark supplied - should keep old watermark. - assertEquals(now, residualTracker.getWatermark()); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index 7a9b209007f6..ce29ecb5f72e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -392,7 +392,7 @@ public SomeRestriction getInitialRestriction(Integer element) { } thrown.expectMessage( - "Returns void, but must return a subtype of RestrictionTracker"); + "Returns void, but must return a subtype of RestrictionTracker"); DoFnSignatures.getSignature(BadFn.class); } @@ -580,7 +580,8 @@ private SomeRestrictionTracker method(SomeRestriction restriction, Object extra) @Test public void testNewTrackerInconsistent() throws Exception { thrown.expectMessage( - "Returns SomeRestrictionTracker, but must return a subtype of RestrictionTracker"); + "Returns SomeRestrictionTracker, " + + "but must return a subtype of RestrictionTracker"); DoFnSignatures.analyzeNewTrackerMethod( errors(), TypeDescriptor.of(FakeDoFn.class), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java index 2d122fe699f4..b723dd886603 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java @@ -47,9 +47,16 @@ public void testTryClaim() throws Exception { @Test public void testCheckpointUnstarted() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); + expected.expect(IllegalStateException.class); + tracker.checkpoint(); + } + + @Test + public void testCheckpointOnlyFailedClaim() throws Exception { + OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200)); + assertFalse(tracker.tryClaim(250L)); + expected.expect(IllegalStateException.class); OffsetRange checkpoint = tracker.checkpoint(); - assertEquals(new OffsetRange(100, 100), tracker.currentRestriction()); - assertEquals(new OffsetRange(100, 200), checkpoint); } @Test From 037184851b315a8f135f5b22f5c7258d67d8cbc2 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 24 Jan 2018 17:42:43 -0800 Subject: [PATCH 3/6] Compresses encoded GrowthState with Snappy - about 2x-3x more compact --- .../org/apache/beam/sdk/transforms/Watch.java | 44 +++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index 8da887e34fbd..1accd1baf07a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -26,7 +26,7 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -46,8 +46,10 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.DurationCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -58,6 +60,7 @@ import org.apache.beam.sdk.transforms.Contextful.Fn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; @@ -69,6 +72,7 @@ import org.joda.time.ReadableDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xerial.snappy.Snappy; /** * Given a "poll function" that produces a potentially growing set of outputs for an input, this @@ -785,8 +789,8 @@ public GrowthTracker newTracker( @GetRestrictionCoder @SuppressWarnings({"unchecked", "rawtypes"}) public Coder> getRestrictionCoder() { - return GrowthStateCoder.of( - outputCoder, (Coder) spec.getTerminationPerInput().getStateCoder()); + return SnappyCoder.of(GrowthStateCoder.of( + outputCoder, (Coder) spec.getTerminationPerInput().getStateCoder())); } } @@ -1105,6 +1109,40 @@ public HashCode decode(InputStream is) throws IOException { } } + private static class SnappyCoder extends StructuredCoder { + private final Coder innerCoder; + + public static SnappyCoder of(Coder innerCoder) { + return new SnappyCoder<>(innerCoder); + } + + private SnappyCoder(Coder innerCoder) { + this.innerCoder = innerCoder; + } + + @Override + public void encode(T value, OutputStream os) throws IOException { + ByteArrayCoder.of() + .encode(Snappy.compress(CoderUtils.encodeToByteArray(innerCoder, value)), os); + } + + @Override + public T decode(InputStream is) throws CoderException, IOException { + return CoderUtils.decodeFromByteArray( + innerCoder, Snappy.uncompress(ByteArrayCoder.of().decode(is))); + } + + @Override + public List> getCoderArguments() { + return ImmutableList.of(innerCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + innerCoder.verifyDeterministic(); + } + } + private static class GrowthStateCoder extends StructuredCoder> { public static From 32a427c0ac9b8f4da58504d0d27545a406df00c8 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 24 Jan 2018 17:43:14 -0800 Subject: [PATCH 4/6] InMemoryStateInternals.copy clones the values using the coder --- .../runners/core/InMemoryStateInternals.java | 74 +++++++++++++++---- .../CopyOnAccessInMemoryStateInternals.java | 10 +-- 2 files changed, 64 insertions(+), 20 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 32c561e48134..8f6d0820317d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.MapState; @@ -49,6 +50,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.CombineFnUtil; import org.joda.time.Instant; @@ -126,25 +128,25 @@ public InMemoryStateBinder(StateContext c) { @Override public ValueState bindValue( StateTag> address, Coder coder) { - return new InMemoryValue<>(); + return new InMemoryValue<>(coder); } @Override public BagState bindBag( final StateTag> address, Coder elemCoder) { - return new InMemoryBag<>(); + return new InMemoryBag<>(elemCoder); } @Override public SetState bindSet(StateTag> spec, Coder elemCoder) { - return new InMemorySet<>(); + return new InMemorySet<>(elemCoder); } @Override public MapState bindMap( StateTag> spec, Coder mapKeyCoder, Coder mapValueCoder) { - return new InMemoryMap<>(); + return new InMemoryMap<>(mapKeyCoder, mapValueCoder); } @Override @@ -153,7 +155,7 @@ public MapState bindMap( StateTag> address, Coder accumCoder, final CombineFn combineFn) { - return new InMemoryCombiningState<>(combineFn); + return new InMemoryCombiningState<>(combineFn, accumCoder); } @Override @@ -178,9 +180,15 @@ public WatermarkHoldState bindWatermark( */ public static final class InMemoryValue implements ValueState, InMemoryState> { + private final Coder coder; + private boolean isCleared = true; private @Nullable T value = null; + public InMemoryValue(Coder coder) { + this.coder = coder; + } + @Override public void clear() { // Even though we're clearing we can't remove this from the in-memory state map, since @@ -207,10 +215,10 @@ public void write(T input) { @Override public InMemoryValue copy() { - InMemoryValue that = new InMemoryValue<>(); + InMemoryValue that = new InMemoryValue<>(coder); if (!this.isCleared) { that.isCleared = this.isCleared; - that.value = this.value; + that.value = unsafeClone(coder, this.value); } return that; } @@ -305,14 +313,16 @@ public InMemoryWatermarkHold copy() { public static final class InMemoryCombiningState implements CombiningState, InMemoryState> { - private boolean isCleared = true; private final CombineFn combineFn; + private final Coder accumCoder; + private boolean isCleared = true; private AccumT accum; public InMemoryCombiningState( - CombineFn combineFn) { + CombineFn combineFn, Coder accumCoder) { this.combineFn = combineFn; accum = combineFn.createAccumulator(); + this.accumCoder = accumCoder; } @Override @@ -378,7 +388,7 @@ public boolean isCleared() { @Override public InMemoryCombiningState copy() { InMemoryCombiningState that = - new InMemoryCombiningState<>(combineFn); + new InMemoryCombiningState<>(combineFn, accumCoder); if (!this.isCleared) { that.isCleared = this.isCleared; that.addAccum(accum); @@ -391,8 +401,13 @@ public InMemoryCombiningState copy() { * An {@link InMemoryState} implementation of {@link BagState}. */ public static final class InMemoryBag implements BagState, InMemoryState> { + private final Coder elemCoder; private List contents = new ArrayList<>(); + public InMemoryBag(Coder elemCoder) { + this.elemCoder = elemCoder; + } + @Override public void clear() { // Even though we're clearing we can't remove this from the in-memory state map, since @@ -442,8 +457,10 @@ public Boolean read() { @Override public InMemoryBag copy() { - InMemoryBag that = new InMemoryBag<>(); - that.contents.addAll(this.contents); + InMemoryBag that = new InMemoryBag<>(elemCoder); + for (T elem : this.contents) { + that.contents.add(unsafeClone(elemCoder, elem)); + } return that; } } @@ -452,8 +469,13 @@ public InMemoryBag copy() { * An {@link InMemoryState} implementation of {@link SetState}. */ public static final class InMemorySet implements SetState, InMemoryState> { + private final Coder elemCoder; private Set contents = new HashSet<>(); + public InMemorySet(Coder elemCoder) { + this.elemCoder = elemCoder; + } + @Override public void clear() { contents = new HashSet<>(); @@ -513,8 +535,10 @@ public Boolean read() { @Override public InMemorySet copy() { - InMemorySet that = new InMemorySet<>(); - that.contents.addAll(this.contents); + InMemorySet that = new InMemorySet<>(elemCoder); + for (T elem : this.contents) { + that.contents.add(unsafeClone(elemCoder, elem)); + } return that; } } @@ -524,8 +548,16 @@ public InMemorySet copy() { */ public static final class InMemoryMap implements MapState, InMemoryState> { + private final Coder keyCoder; + private final Coder valueCoder; + private Map contents = new HashMap<>(); + public InMemoryMap(Coder keyCoder, Coder valueCoder) { + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + @Override public void clear() { contents = new HashMap<>(); @@ -600,9 +632,21 @@ public boolean isCleared() { @Override public InMemoryMap copy() { - InMemoryMap that = new InMemoryMap<>(); + InMemoryMap that = new InMemoryMap<>(keyCoder, valueCoder); + for (Map.Entry entry : this.contents.entrySet()) { + that.contents.put( + unsafeClone(keyCoder, entry.getKey()), unsafeClone(valueCoder, entry.getValue())); + } that.contents.putAll(this.contents); return that; } } + + private static T unsafeClone(Coder coder, T value) { + try { + return CoderUtils.clone(coder, value); + } catch (CoderException e) { + throw new RuntimeException(e); + } + } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index 848bf712da80..1747a5372ce4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -300,7 +300,7 @@ public ValueState bindValue( underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryValue<>(); + return new InMemoryValue<>(coder); } } @@ -317,7 +317,7 @@ CombiningState bindCombiningValue( underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryCombiningState<>(combineFn); + return new InMemoryCombiningState<>(combineFn, accumCoder); } } @@ -331,7 +331,7 @@ public BagState bindBag( underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryBag<>(); + return new InMemoryBag<>(elemCoder); } } @@ -345,7 +345,7 @@ public SetState bindSet( underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemorySet<>(); + return new InMemorySet<>(elemCoder); } } @@ -361,7 +361,7 @@ public MapState bindMap( underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryMap<>(); + return new InMemoryMap<>(mapKeyCoder, mapValueCoder); } } From 8151d82510fa4a2bd5816ee6123e1cf54d292ba3 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 24 Jan 2018 18:18:58 -0800 Subject: [PATCH 5/6] Final fixups --- .../runners/core/InMemoryStateInternals.java | 13 ++-- ...oundedSplittableProcessElementInvoker.java | 38 ++++++++--- .../apache/beam/sdk/coders/SnappyCoder.java | 65 ++++++++++++++++++ .../org/apache/beam/sdk/transforms/Watch.java | 67 ++++++------------- .../splittabledofn/RestrictionTracker.java | 29 +++++++- 5 files changed, 146 insertions(+), 66 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SnappyCoder.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 8f6d0820317d..ebd2a8873e0b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -218,7 +218,7 @@ public InMemoryValue copy() { InMemoryValue that = new InMemoryValue<>(coder); if (!this.isCleared) { that.isCleared = this.isCleared; - that.value = unsafeClone(coder, this.value); + that.value = uncheckedClone(coder, this.value); } return that; } @@ -391,7 +391,7 @@ public InMemoryCombiningState copy() { new InMemoryCombiningState<>(combineFn, accumCoder); if (!this.isCleared) { that.isCleared = this.isCleared; - that.addAccum(accum); + that.addAccum(uncheckedClone(accumCoder, accum)); } return that; } @@ -459,7 +459,7 @@ public Boolean read() { public InMemoryBag copy() { InMemoryBag that = new InMemoryBag<>(elemCoder); for (T elem : this.contents) { - that.contents.add(unsafeClone(elemCoder, elem)); + that.contents.add(uncheckedClone(elemCoder, elem)); } return that; } @@ -537,7 +537,7 @@ public Boolean read() { public InMemorySet copy() { InMemorySet that = new InMemorySet<>(elemCoder); for (T elem : this.contents) { - that.contents.add(unsafeClone(elemCoder, elem)); + that.contents.add(uncheckedClone(elemCoder, elem)); } return that; } @@ -635,14 +635,15 @@ public InMemoryMap copy() { InMemoryMap that = new InMemoryMap<>(keyCoder, valueCoder); for (Map.Entry entry : this.contents.entrySet()) { that.contents.put( - unsafeClone(keyCoder, entry.getKey()), unsafeClone(valueCoder, entry.getValue())); + uncheckedClone(keyCoder, entry.getKey()), uncheckedClone(valueCoder, entry.getValue())); } that.contents.putAll(this.contents); return that; } } - private static T unsafeClone(Coder coder, T value) { + /** Like {@link CoderUtils#clone} but without a checked exception. */ + private static T uncheckedClone(Coder coder, T value) { try { return CoderUtils.clone(coder, value); } catch (CoderException e) { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index e6965d48c98d..b1a3f3bdb62c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -181,10 +181,19 @@ public Timer timer(String timerId) { tracker.checkDone(); } else { // The call returned resume() without trying to claim any blocks, i.e. it is unaware - // of any work to be done at the moment, but more might emerge later. In this case, - // we must simply reschedule the original restriction - checkpointing a tracker that - // hasn't claimed any work is not allowed. - residual = KV.of(tracker.currentRestriction(), processContext.lastReportedWatermark); + // of any work to be done at the moment, but more might emerge later. This is a valid + // use case: e.g. a DoFn reading from a streaming source might see that there are + // currently no new elements (hence not claim anything) and return resume() with a delay + // to check again later. + // In this case, we must simply reschedule the original restriction - checkpointing a + // tracker that hasn't claimed any work is not allowed. + // + // Note that the situation "a DoFn repeatedly says that it doesn't have any work to claim + // and asks to try again later with the same restriction" is different from the situation + // "a runner repeatedly checkpoints the DoFn before it has a chance to even attempt + // claiming work": the former is valid, and the latter would be a bug, and is addressed + // by not checkpointing the tracker until it attempts to claim some work. + residual = KV.of(tracker.currentRestriction(), processContext.getLastReportedWatermark()); // Don't call tracker.checkDone() - it's not done. } } else { @@ -242,15 +251,11 @@ public ProcessContext(WindowedValue element, TrackerT tracker) { this.tracker = tracker; } - void checkClaimHasNotFailed() { + @Override + public void onClaimed(PositionT position) { checkState( !hasClaimFailed, "Must not call tryClaim() after it has previously returned false"); - } - - @Override - public void onClaimed(PositionT position) { - checkClaimHasNotFailed(); if (numClaimedBlocks == 0) { // Claiming first block: can schedule the checkpoint now. // We don't schedule it right away to prevent checkpointing before any blocks are claimed, @@ -265,7 +270,9 @@ public void onClaimed(PositionT position) { @Override public void onClaimFailed(PositionT position) { - checkClaimHasNotFailed(); + checkState( + !hasClaimFailed, + "Must not call tryClaim() after it has previously returned false"); hasClaimFailed = true; } @@ -323,9 +330,18 @@ public PaneInfo pane() { @Override public synchronized void updateWatermark(Instant watermark) { // Updating the watermark without any claimed blocks is allowed. + // The watermark is a promise about the timestamps of output from future claimed blocks. + // Such a promise can be made even if there are no claimed blocks. E.g. imagine reading + // from a streaming source that currently has no new data: there are no blocks to claim, but + // we may still want to advance the watermark if we have information about what timestamps + // of future elements in the source will be like. lastReportedWatermark = watermark; } + synchronized Instant getLastReportedWatermark() { + return lastReportedWatermark; + } + @Override public PipelineOptions getPipelineOptions() { return pipelineOptions; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SnappyCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SnappyCoder.java new file mode 100644 index 000000000000..b3e4698f5151 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SnappyCoder.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.coders; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import org.apache.beam.sdk.util.CoderUtils; +import org.xerial.snappy.Snappy; + +/** + * Wraps an existing coder with Snappy compression. It makes sense to use this coder only when it's + * likely that the encoded value is quite large and compressible. + */ +public class SnappyCoder extends StructuredCoder { + private final Coder innerCoder; + + /** Wraps the given coder into a {@link SnappyCoder}. */ + public static SnappyCoder of(Coder innerCoder) { + return new SnappyCoder<>(innerCoder); + } + + private SnappyCoder(Coder innerCoder) { + this.innerCoder = innerCoder; + } + + @Override + public void encode(T value, OutputStream os) throws IOException { + ByteArrayCoder.of() + .encode(Snappy.compress(CoderUtils.encodeToByteArray(innerCoder, value)), os); + } + + @Override + public T decode(InputStream is) throws IOException { + return CoderUtils.decodeFromByteArray( + innerCoder, Snappy.uncompress(ByteArrayCoder.of().decode(is))); + } + + @Override + public List> getCoderArguments() { + return ImmutableList.of(innerCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + innerCoder.verifyDeterministic(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index 1accd1baf07a..9eca32f1efc4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -26,7 +26,6 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -46,21 +45,19 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BooleanCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.DurationCoder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.SnappyCoder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Contextful.Fn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; @@ -72,7 +69,6 @@ import org.joda.time.ReadableDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.xerial.snappy.Snappy; /** * Given a "poll function" that produces a potentially growing set of outputs for an input, this @@ -739,7 +735,10 @@ public ProcessContinuation process( : "not yet final"); } } - int numEmitted = 0; + int numEmittedInThisRound = 0; + int numTotalPending = tracker.getNumPending(); + int numPreviouslyEmitted = tracker.currentRestriction().completed.size(); + int numTotalKnown = numPreviouslyEmitted + numTotalPending; while (true) { c.updateWatermark(tracker.getWatermark()); Map.Entry> entry = tracker.getNextPending(); @@ -749,9 +748,15 @@ public ProcessContinuation process( TimestampedValue nextPending = entry.getValue(); c.outputWithTimestamp( KV.of(c.element(), nextPending.getValue()), nextPending.getTimestamp()); - ++numEmitted; + ++numEmittedInThisRound; } - LOG.debug("{} - emitted {} new results.", c.element(), numEmitted); + LOG.info( + "{} - emitted {} new results (of {} total known: {} emitted so far, {} more to emit).", + c.element(), + numEmittedInThisRound, + numTotalKnown, + numEmittedInThisRound + numPreviouslyEmitted, + numTotalPending - numEmittedInThisRound); Instant watermark = tracker.getWatermark(); if (watermark != null) { // Null means the poll result did not provide a watermark and there were no new elements, @@ -762,8 +767,9 @@ public ProcessContinuation process( // unless output is complete or termination condition is reached. if (tracker.shouldPollMore()) { LOG.info( - "{} - emitted all known results so far; will resume polling in {} ms", + "{} - emitted all {} known results so far; will resume polling in {} ms", c.element(), + numTotalKnown, spec.getPollInterval().getMillis()); return resume().withResumeDelay(spec.getPollInterval()); } @@ -899,10 +905,7 @@ static class GrowthTracker this.isOutputComplete = state.isOutputComplete; this.pollWatermark = state.pollWatermark; this.terminationState = state.terminationState; - this.pending = Maps.newLinkedHashMapWithExpectedSize(state.pending.size()); - for (Map.Entry> entry : state.pending.entrySet()) { - this.pending.put(entry.getKey(), entry.getValue()); - } + this.pending = Maps.newLinkedHashMap(state.pending); } @Override @@ -971,6 +974,10 @@ synchronized boolean hasPending() { return !pending.isEmpty(); } + private synchronized int getNumPending() { + return pending.size(); + } + @VisibleForTesting @Nullable synchronized Map.Entry> getNextPending() { @@ -1109,40 +1116,6 @@ public HashCode decode(InputStream is) throws IOException { } } - private static class SnappyCoder extends StructuredCoder { - private final Coder innerCoder; - - public static SnappyCoder of(Coder innerCoder) { - return new SnappyCoder<>(innerCoder); - } - - private SnappyCoder(Coder innerCoder) { - this.innerCoder = innerCoder; - } - - @Override - public void encode(T value, OutputStream os) throws IOException { - ByteArrayCoder.of() - .encode(Snappy.compress(CoderUtils.encodeToByteArray(innerCoder, value)), os); - } - - @Override - public T decode(InputStream is) throws CoderException, IOException { - return CoderUtils.decodeFromByteArray( - innerCoder, Snappy.uncompress(ByteArrayCoder.of().decode(is))); - } - - @Override - public List> getCoderArguments() { - return ImmutableList.of(innerCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - innerCoder.verifyDeterministic(); - } - } - private static class GrowthStateCoder extends StructuredCoder> { public static diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java index ccbbe25b3d89..8b59f054b96e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java @@ -29,15 +29,22 @@ * href="https://s.apache.org/splittable-do-fn">splittable {@link DoFn}. */ public abstract class RestrictionTracker { + /** Internal interface allowing a runner to observe the calls to {@link #tryClaim}. */ @Internal public interface ClaimObserver { + /** Called when {@link #tryClaim} returns true. */ void onClaimed(PositionT position); + + /** Called when {@link #tryClaim} returns false. */ void onClaimFailed(PositionT position); } - @Nullable - private ClaimObserver claimObserver; + @Nullable private ClaimObserver claimObserver; + /** + * Sets a {@link ClaimObserver} to be invoked on every call to {@link #tryClaim}. Internal: + * intended only for runner authors. + */ @Internal public void setClaimObserver(ClaimObserver claimObserver) { checkNotNull(claimObserver, "claimObserver"); @@ -45,6 +52,23 @@ public void setClaimObserver(ClaimObserver claimObserver) { this.claimObserver = claimObserver; } + /** + * Attempts to claim the block of work in the current restriction identified by the given + * position. + * + *

If this succeeds, the DoFn MUST execute the entire block of work. If this fails: + * + *

    + *
  • {@link DoFn.ProcessElement} MUST return {@link DoFn.ProcessContinuation#stop} without + * performing any additional work or emitting output (note that emitting output or + * performing work from {@link DoFn.ProcessElement} is also not allowed before the first + * call to this method). + *
  • {@link RestrictionTracker#checkDone} MUST succeed. + *
+ * + *

Under the hood, calls {@link #tryClaimImpl} and notifies {@link ClaimObserver} of the + * result. + */ public final boolean tryClaim(PositionT position) { if (tryClaimImpl(position)) { if (claimObserver != null) { @@ -59,6 +83,7 @@ public final boolean tryClaim(PositionT position) { } } + /** Tracker-specific implementation of {@link #tryClaim}. */ @Internal protected abstract boolean tryClaimImpl(PositionT position); From 6857cb95af93f2749fcb9b099eaf7608200be9db Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Mon, 5 Feb 2018 18:42:11 -0800 Subject: [PATCH 6/6] Bump worker to 20180205 --- runners/google-cloud-dataflow-java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 7c427b205fb3..7367a8410a13 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ jar - beam-master-20180122 + beam-master-20180205 1 7