From f123b2b82f257fed312912fea8776a8059230c8e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 10 May 2016 11:39:35 -0700 Subject: [PATCH] Remove WindowedValue.valueInEmptyWindows A value in empty windows expands to no values, so it can be dropped at any time, perhaps unintentionally. This has bitten runner authors, including Spark & Dataflow. While creating such a thing in memory is not automatically problematic, it is also not really useful. So this change removes it. --- .../ExecutorServiceParallelExecutor.java | 2 +- .../direct/FlattenEvaluatorFactoryTest.java | 8 +++--- .../GroupByKeyEvaluatorFactoryTest.java | 12 ++++----- .../dataflow/DataflowPipelineRunner.java | 10 +++---- .../sdk/runners/DirectPipelineRunner.java | 2 +- .../apache/beam/sdk/util/WindowedValue.java | 27 ++++--------------- .../util/GroupAlsoByWindowsProperties.java | 2 +- .../beam/sdk/util/WindowedValueTest.java | 10 ------- 8 files changed, 23 insertions(+), 50 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 367c19084092..0aa31feb5af6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -417,7 +417,7 @@ private boolean fireTimers() throws Exception { evaluationContext .createKeyedBundle( null, keyTimers.getKey(), (PCollection) transform.getInput()) - .add(WindowedValue.valueInEmptyWindows(work)) + .add(WindowedValue.valueInGlobalWindow(work)) .commit(Instant.now()); scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); firedTimers = true; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 66a510685c99..ccd06848a0cb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -81,9 +81,9 @@ public void testFlattenInMemoryEvaluator() throws Exception { rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1)); leftSideEvaluator.processElement( WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024))); - leftSideEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING)); + leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING)); rightSideEvaluator.processElement( - WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING)); + WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING)); rightSideEvaluator.processElement( WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096))); @@ -107,12 +107,12 @@ public void testFlattenInMemoryEvaluator() throws Exception { flattenedLeftBundle.commit(Instant.now()).getElements(), containsInAnyOrder( WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)), - WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING), + WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING), WindowedValue.valueInGlobalWindow(1))); assertThat( flattenedRightBundle.commit(Instant.now()).getElements(), containsInAnyOrder( - WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), + WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)), WindowedValue.valueInGlobalWindow(-1))); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index 267266d3b891..364183f87c11 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -93,12 +93,12 @@ public void testInMemoryEvaluator() throws Exception { .forApplication( groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz))); + evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(firstFoo))); + evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(secondFoo))); + evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(thirdFoo))); + evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(firstBar))); + evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(secondBar))); + evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(firstBaz))); evaluator.finishBundle(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java index 407680215183..e36d566aabcf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java @@ -19,7 +19,7 @@ import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName; import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; -import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows; +import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -1196,7 +1196,7 @@ public void processElement(ProcessContext c) // are at a window boundary. c.output(IsmRecord.of( ImmutableList.of(previousWindow.get()), - valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.of(), map)))); + valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.of(), map)))); map = new HashMap<>(); } @@ -1217,7 +1217,7 @@ public void processElement(ProcessContext c) // window boundary. c.output(IsmRecord.of( ImmutableList.of(previousWindow.get()), - valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.of(), map)))); + valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.of(), map)))); } } @@ -1685,7 +1685,7 @@ public void processElement(ProcessContext c) Iterable>, Iterable>>>of( ImmutableList.of(previousWindow.get()), - valueInEmptyWindows( + valueInGlobalWindow( new TransformedMap<>( IterableWithWindowedValuesToIterable.of(), resultMap)))); multimap = HashMultimap.create(); @@ -1706,7 +1706,7 @@ public void processElement(ProcessContext c) Iterable>, Iterable>>>of( ImmutableList.of(previousWindow.get()), - valueInEmptyWindows( + valueInGlobalWindow( new TransformedMap<>(IterableWithWindowedValuesToIterable.of(), resultMap)))); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java index 590ce6fa8139..b0f116a53589 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java @@ -1269,7 +1269,7 @@ public static void evaluateGroupByKeyOnly( List values = entry.getValue(); values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */); outputElems.add(ValueWithMetadata - .of(WindowedValue.valueInEmptyWindows(KV.>of(key, values))) + .of(WindowedValue.valueInGlobalWindow(KV.>of(key, values))) .withKey(key)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 1bbdbd96f29f..8e3b6627c01b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -64,19 +64,18 @@ public abstract class WindowedValue { protected final PaneInfo pane; /** - * Returns a {@code WindowedValue} with the given value, timestamp, - * and windows. + * Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo pane) { - Preconditions.checkNotNull(pane); + checkNotNull(pane); + checkArgument( + windows.size() > 0, "Cannot create %s in no windows", WindowedValue.class.getName()); - if (windows.size() == 0 && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { - return valueInEmptyWindows(value, pane); - } else if (windows.size() == 1) { + if (windows.size() == 1) { return of(value, timestamp, windows.iterator().next(), pane); } else { return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane); @@ -131,22 +130,6 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta } } - /** - * Returns a {@code WindowedValue} with the given value in no windows, and the default timestamp - * and pane. - */ - public static WindowedValue valueInEmptyWindows(T value) { - return new ValueInEmptyWindows(value, PaneInfo.NO_FIRING); - } - - /** - * Returns a {@code WindowedValue} with the given value in no windows, and the default timestamp - * and the specified pane. - */ - public static WindowedValue valueInEmptyWindows(T value, PaneInfo pane) { - return new ValueInEmptyWindows(value, pane); - } - private WindowedValue(T value, PaneInfo pane) { this.value = value; this.pane = checkNotNull(pane); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java index 4518f9f7a8b5..aa759c042386 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java @@ -627,7 +627,7 @@ List>> runGABW( runner.startBundle(); if (values.size() > 0) { - runner.processElement(WindowedValue.valueInEmptyWindows( + runner.processElement(WindowedValue.valueInGlobalWindow( KV.of(key, (Iterable>) values))); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 90969b784106..40647b8101a1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.util; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; @@ -66,15 +65,6 @@ public void testWindowedValueCoder() throws CoderException { Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); } - @Test - public void testExplodeWindowsInNoWindowsEmptyIterable() { - WindowedValue value = - WindowedValue.of( - "foo", Instant.now(), ImmutableList.of(), PaneInfo.NO_FIRING); - - assertThat(value.explodeWindows(), emptyIterable()); - } - @Test public void testExplodeWindowsInOneWindowEquals() { Instant now = Instant.now();