From 831b2c3e0bd47f2e2977529fe4d4d16219e40c79 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 2 May 2016 10:03:43 -0700 Subject: [PATCH 1/2] Move ReadyCheckingSideInputReader to util This SideInputReader allows callers to check for a side input being available before attempting to read the contents --- .../inprocess/InProcessEvaluationContext.java | 12 +------ .../InProcessSideInputContainer.java | 2 +- .../util/ReadyCheckingSideInputReader.java | 31 +++++++++++++++++++ .../InProcessSideInputContainerTest.java | 2 +- 4 files changed, 34 insertions(+), 13 deletions(-) create mode 100644 sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReadyCheckingSideInputReader.java diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 2b21cc8584..420c1a97a6 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -29,6 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; import com.google.cloud.dataflow.sdk.util.ExecutionContext; +import com.google.cloud.dataflow.sdk.util.ReadyCheckingSideInputReader; import com.google.cloud.dataflow.sdk.util.SideInputReader; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowedValue; @@ -330,17 +331,6 @@ public ReadyCheckingSideInputReader createSideInputReader( return sideInputContainer.createReaderForViews(sideInputs); } - /** - * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has - * had its contents set in a window. 
- */ - interface ReadyCheckingSideInputReader extends SideInputReader { - /** - * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. - */ - boolean isReady(PCollectionView view, BoundedWindow window); - } - /** * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent * of all other {@link CounterSet CounterSets} created by this call. diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java index 697348039a..e6fd49f750 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -17,10 +17,10 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow; +import com.google.cloud.dataflow.sdk.util.ReadyCheckingSideInputReader; import com.google.cloud.dataflow.sdk.util.SideInputReader; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReadyCheckingSideInputReader.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReadyCheckingSideInputReader.java new file mode 100644 index 0000000000..718c9438de --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReadyCheckingSideInputReader.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2016 Google Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.values.PCollectionView; + +/** + * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has + * had its contents set in a window. + * + * Callers can use the readiness check to confirm that a side input is available before + * attempting to read its contents. + */ +public interface ReadyCheckingSideInputReader extends SideInputReader { + /** + * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. 
+ * + * @param view the {@link PCollectionView} whose readiness is being checked + * @param window the {@link BoundedWindow} in which to check the view + * @return true if the view is ready in the window, false otherwise + */ + boolean isReady(PCollectionView view, BoundedWindow window); +} + diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java index c4d855bf17..9073a4715e 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.Mean; @@ -35,6 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing; import com.google.cloud.dataflow.sdk.util.PCollectionViews; +import com.google.cloud.dataflow.sdk.util.ReadyCheckingSideInputReader; import com.google.cloud.dataflow.sdk.util.SideInputReader; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; From d5604f8cf8c5babae5ffb4da748697fff8e95fae Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 2 May 2016 10:04:20 -0700 Subject: [PATCH 2/2] Add PushbackSideInputDoFnRunner This DoFnRunner wraps a DoFnRunner and provides an additional method to process an element in all the windows where all side inputs are ready, returning any elements that it could not process.
--- .../sdk/util/PushbackSideInputDoFnRunner.java | 115 +++++++++ .../sdk/util/IdentitySideInputWindowFn.java | 54 ++++ .../util/PushbackSideInputDoFnRunnerTest.java | 234 ++++++++++++++++++ 3 files changed, 403 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java new file mode 100644 index 0000000000..4eeedf65a1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A {@link DoFnRunner} that wraps an underlying {@link DoFnRunner} and can refuse to process + * elements whose side inputs are not ready, instead returning them from + * {@link #processElementInReadyWindows(WindowedValue)}. + * + * Bundle lifecycle calls and {@link #processElement(WindowedValue)} are forwarded to the + * underlying runner. + */ +class PushbackSideInputDoFnRunner implements DoFnRunner { + private final DoFnRunner underlying; + private final Collection> views; + private final ReadyCheckingSideInputReader sideInputReader; + + /** + * Main-input windows that have been found not ready so far. Created in {@link #startBundle()} + * and set to null in {@link #finishBundle()}. + */ + private Set notReadyWindows; + + /** + * Creates a runner that wraps {@code underlying} and uses {@code sideInputReader} to check + * whether each of {@code views} is ready. + */ + public static PushbackSideInputDoFnRunner create( + DoFnRunner underlying, + Collection> views, + ReadyCheckingSideInputReader sideInputReader) { + return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader); + } + + private PushbackSideInputDoFnRunner( + DoFnRunner underlying, + Collection> views, + ReadyCheckingSideInputReader sideInputReader) { + this.underlying = underlying; + this.views = views; + this.sideInputReader = sideInputReader; + } + + /** + * Starts with an empty set of not-ready windows, then calls the underlying + * {@link DoFnRunner#startBundle()}. + */ + @Override + public void startBundle() { + notReadyWindows = new HashSet<>(); + underlying.startBundle(); + } + + /** + * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element + * for each window the element is in that is ready. + * + * If there are no views, the element is passed to {@link #processElement(WindowedValue)} as is + * and nothing is pushed back. Otherwise the element is exploded into one value per window. A + * window is ready when it has not already been recorded as not ready and the reader reports + * every view as ready in the side input window that the view's window function maps the main + * input window to. A window recorded as not ready stays not ready until + * {@link #finishBundle()}. + * + * When there are views this must be called between {@link #startBundle()} and + * {@link #finishBundle()}, because the set of not-ready windows is null outside a bundle. + * + * @param elem the element to process in all ready windows + * @return each element that could not be processed because it requires a side input window + * that is not ready. 
+ */ + public Iterable> processElementInReadyWindows(WindowedValue elem) { + if (views.isEmpty()) { + processElement(elem); + return Collections.emptyList(); + } + ImmutableList.Builder> pushedBack = ImmutableList.builder(); + for (WindowedValue windowElem : elem.explodeWindows()) { + BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); + /* A window already recorded as not ready is never treated as ready again here. */ + boolean isReady = !notReadyWindows.contains(mainInputWindow); + for (PCollectionView view : views) { + BoundedWindow sideInputWindow = + view.getWindowingStrategyInternal() + .getWindowFn() + .getSideInputWindow(mainInputWindow); + if (!sideInputReader.isReady(view, sideInputWindow)) { + isReady = false; + break; + } + } + if (isReady) { + processElement(windowElem); + } else { + notReadyWindows.add(mainInputWindow); + pushedBack.add(windowElem); + } + } + return pushedBack.build(); + } + + /** + * Passes the element to the underlying {@link DoFnRunner#processElement(WindowedValue)} + * without checking whether any side input is ready. + */ + @Override + public void processElement(WindowedValue elem) { + underlying.processElement(elem); + } + + /** + * Discards the set of not-ready windows, then calls the underlying + * {@link DoFnRunner#finishBundle()}. + */ + @Override + public void finishBundle() { + notReadyWindows = null; + underlying.finishBundle(); + } +} + diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java new file mode 100644 index 0000000000..ecab6f8eb6 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowFn; + +import java.util.Collection; + +/** + * A {@link WindowFn} for use during tests that returns the input window for calls to + * {@link #getSideInputWindow(BoundedWindow)}, so the side input window is always the same as + * the main input window. + */ +class IdentitySideInputWindowFn extends NonMergingWindowFn { + /** Returns the windows already carried by the assign context, unchanged. */ + @Override + public Collection assignWindows(WindowFn.AssignContext c) + throws Exception { + return (Collection) c.windows(); + } + + /** Always returns true, whatever the other {@link WindowFn} is. */ + @Override + public boolean isCompatible(WindowFn other) { + return true; + } + + /** Returns the {@link GlobalWindow} coder instance as a placeholder. */ + @Override + public Coder windowCoder() { + // not used + return (Coder) GlobalWindow.Coder.INSTANCE; + } + + @Override + public BoundedWindow getSideInputWindow(BoundedWindow window) { + return window; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java new file mode 100644 index 0000000000..88851187bb --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for {@link 
PushbackSideInputDoFnRunner}. + */ +@RunWith(JUnit4.class) +public class PushbackSideInputDoFnRunnerTest { + @Mock private ReadyCheckingSideInputReader reader; + private TestDoFnRunner underlying; + private PCollectionView singletonView; + + /** + * Initializes the mock reader, builds a singleton view windowed by + * {@link IdentitySideInputWindowFn}, and creates the recording underlying runner. + */ + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + TestPipeline p = TestPipeline.create(); + PCollection created = p.apply(Create.of(1, 2, 3)); + singletonView = + created + .apply(Window.into(new IdentitySideInputWindowFn())) + .apply(Sum.integersGlobally().asSingletonView()); + + underlying = new TestDoFnRunner<>(); + } + + /** + * Creates a runner for the given views over the recording underlying runner and the mock + * reader, and starts its bundle before returning it. + */ + private PushbackSideInputDoFnRunner createRunner( + ImmutableList> views) { + PushbackSideInputDoFnRunner runner = + PushbackSideInputDoFnRunner.create(underlying, views, reader); + runner.startBundle(); + return runner; + } + + /** Starting and finishing a bundle are forwarded to the underlying runner. */ + @Test + public void startFinishBundleDelegates() { + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of(singletonView)); + + assertThat(underlying.started, is(true)); + assertThat(underlying.finished, is(false)); + runner.finishBundle(); + assertThat(underlying.finished, is(true)); + } + + /** + * When the view is not ready in any window, a single-window element is pushed back and the + * underlying runner receives nothing. + */ + @Test + public void processElementSideInputNotReady() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(false); + + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of(singletonView)); + + WindowedValue oneWindow = + WindowedValue.of( + 2, + new Instant(-2), + new IntervalWindow(new Instant(-500L), new Instant(0L)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> oneWindowPushback = + runner.processElementInReadyWindows(oneWindow); + assertThat(oneWindowPushback, containsInAnyOrder(oneWindow)); + assertThat(underlying.inputElems, Matchers.>emptyIterable()); + } + + /** + * When the view is not ready in any window, every exploded window of a multi-window element + * is pushed back and the underlying runner receives nothing. + */ + @Test + public void processElementSideInputNotReadyMultipleWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(false); + + PushbackSideInputDoFnRunner runner = 
createRunner(ImmutableList.>of(singletonView)); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); + assertThat(underlying.inputElems, Matchers.>emptyIterable()); + } + + /** + * When the view is not ready only in the global window, the global-window value is pushed + * back and the values for the two interval windows reach the underlying runner. + */ + @Test + public void processElementSideInputNotReadySomeWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE))) + .thenReturn(false); + when( + reader.isReady( + Mockito.eq(singletonView), + org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE)))) + .thenReturn(true); + + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of(singletonView)); + + IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L)); + IntervalWindow bigWindow = + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)); + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE), + PaneInfo.NO_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat( + multiWindowPushback, + containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L)))); + assertThat(underlying.inputElems, + containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING), + WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING))); + } + + /** + * When the view is ready in every window, nothing is pushed back and the underlying runner + * receives one value per exploded window. + */ + @Test + public void processElementSideInputReadyAllWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(true); + + ImmutableList> views = 
ImmutableList.>of(singletonView); + PushbackSideInputDoFnRunner runner = createRunner(views); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, emptyIterable()); + assertThat(underlying.inputElems, + containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray())); + } + + /** + * With no views, nothing is pushed back and the underlying runner receives the original + * multi-window element rather than its exploded values. + */ + @Test + public void processElementNoSideInputs() { + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.>of()); + + WindowedValue multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, emptyIterable()); + assertThat(underlying.inputElems, containsInAnyOrder(multiWindow)); + } + + /** + * A {@link DoFnRunner} that records whether its bundle was started and finished and collects + * every element passed to it. The element list is created when the bundle starts. + */ + private static class TestDoFnRunner implements DoFnRunner { + List> inputElems; + private boolean started = false; + private boolean finished = false; + + @Override + public void startBundle() { + started = true; + inputElems = new ArrayList<>(); + } + + @Override + public void processElement(WindowedValue elem) { + inputElems.add(elem); + } + + @Override + public void finishBundle() { + finished = true; + } + } +}