From 399043bd05e350e877b1c9aeeb2e1f904d7ba687 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 25 Apr 2016 09:08:09 -0700 Subject: [PATCH 1/3] Refactor CommittedBundle in InProcessBundleFactory Move to a static nested class. --- .../inprocess/InProcessBundleFactory.java | 82 +++++++++++-------- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java index e39d02ea4fff..788fde194080 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java @@ -100,38 +100,56 @@ public CommittedBundle commit(final Instant synchronizedCompletionTime) { checkState(!committed, "Can't commit already committed bundle %s", this); committed = true; final Iterable> committedElements = elements.build(); - return new CommittedBundle() { - @Override - @Nullable - public Object getKey() { - return key; - } - - @Override - public Iterable> getElements() { - return committedElements; - } - - @Override - public PCollection getPCollection() { - return pcollection; - } - - @Override - public Instant getSynchronizedProcessingOutputWatermark() { - return synchronizedCompletionTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .omitNullValues() - .add("pcollection", pcollection) - .add("key", key) - .add("elements", committedElements) - .toString(); - } - }; + return new CommittedInProcessBundle<>( + pcollection, key, committedElements, synchronizedCompletionTime); + } + } + private static class CommittedInProcessBundle implements CommittedBundle { + public CommittedInProcessBundle( + PCollection pcollection, + Object key, + Iterable> committedElements, + Instant synchronizedCompletionTime) { + this.pcollection = pcollection; + this.key = key; + this.committedElements = committedElements; + this.synchronizedCompletionTime = synchronizedCompletionTime; + } + + private final PCollection pcollection; + private final Object key; + private final Iterable> committedElements; + private final Instant synchronizedCompletionTime; + + @Override + @Nullable + public Object getKey() { + return key; + } + + @Override + public Iterable> getElements() { + return committedElements; + } + + @Override + public PCollection getPCollection() { + return pcollection; + } + + @Override + public Instant getSynchronizedProcessingOutputWatermark() { + return synchronizedCompletionTime; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); } } } From 2df419fdcfc86dcc6fdcf81d05afd9e814b137e1 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 25 Apr 2016 09:09:15 -0700 Subject: [PATCH 2/3] Add withElements to CommittedBundle The unprocessed elements of a partially completed bundle must be placed in a new bundle to be processed at a later time. The bundle in which they are processed should also have identical properties to the bundle which the elements were initially present in. withElements provides a simple way to create a "copy" of a bundle that contains different elements. --- .../inprocess/InProcessBundleFactory.java | 7 ++++ .../inprocess/InProcessPipelineRunner.java | 12 ++++++ .../inprocess/InProcessBundleFactoryTest.java | 40 ++++++++++++++++++- 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java index 788fde194080..58bc4fb75ff0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactory.java @@ -104,6 +104,7 @@ public CommittedBundle commit(final Instant synchronizedCompletionTime) { pcollection, key, committedElements, synchronizedCompletionTime); } } + private static class CommittedInProcessBundle implements CommittedBundle { public CommittedInProcessBundle( PCollection pcollection, @@ -151,5 +152,11 @@ public String toString() { .add("elements", committedElements) .toString(); } + + @Override + public CommittedBundle withElements(Iterable> elements) { + return new CommittedInProcessBundle<>( + pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime); + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java index 7c28238d0dad..877f6e9bbe1e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -154,6 +154,18 @@ public static interface CommittedBundle { * timers that fired to produce this bundle. */ Instant getSynchronizedProcessingOutputWatermark(); + + /** + * Return a new {@link CommittedBundle} that is like this one, except calls to + * {@link #getElements()} will return the provided elements. This bundle is unchanged. + * + *

+ * The value of {@link #getSynchronizedProcessingOutputWatermark()} of the returned + * {@link CommittedBundle} is equal to the value returned from this one. This is used to ensure + * a {@link PTransform} that could not complete processing on input elements properly holds the + * synchronized processing time to the appropriate value. + */ + CommittedBundle withElements(Iterable> elements); } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java index 9adb6f9b0f7d..3c7c2c6b0cb5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleFactoryTest.java @@ -27,6 +27,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -97,7 +99,8 @@ public void keyedWithKeyShouldCreateKeyedBundle() { createKeyedBundle(new Object()); } - private void afterCommitGetElementsShouldHaveAddedElements(Iterable> elems) { + private CommittedBundle + afterCommitGetElementsShouldHaveAddedElements(Iterable> elems) { PCollection pcollection = TestPipeline.create().apply(Create.of()); UncommittedBundle bundle = bundleFactory.createRootBundle(pcollection); @@ -108,7 +111,10 @@ private void afterCommitGetElementsShouldHaveAddedElements(Iterable>> containsMatcher = Matchers.>containsInAnyOrder(expectations); - assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher); + CommittedBundle committed = bundle.commit(Instant.now()); + assertThat(committed.getElements(), containsMatcher); + + return committed; } @Test @@ -125,6 +131,36 @@ public void getElementsAfterAddShouldReturnAddedElements() { afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue)); } + @SuppressWarnings("unchecked") + @Test + public void withElementsShouldReturnIndependentBundle() { + WindowedValue firstValue = WindowedValue.valueInGlobalWindow(1); + WindowedValue secondValue = + WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L)); + + CommittedBundle committed = + afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue)); + + WindowedValue firstReplacement = + WindowedValue.of( + 9, + new Instant(2048L), + new IntervalWindow(new Instant(2044L), Instant.now()), + PaneInfo.NO_FIRING); + WindowedValue secondReplacement = + WindowedValue.timestampedValueInGlobalWindow(-1, Instant.now()); + CommittedBundle withed = + committed.withElements(ImmutableList.of(firstReplacement, secondReplacement)); + + assertThat(withed.getElements(), containsInAnyOrder(firstReplacement, secondReplacement)); + assertThat(committed.getElements(), containsInAnyOrder(firstValue, secondValue)); + assertThat(withed.getKey(), equalTo(committed.getKey())); + assertThat(withed.getPCollection(), equalTo(committed.getPCollection())); + assertThat( + withed.getSynchronizedProcessingOutputWatermark(), + equalTo(committed.getSynchronizedProcessingOutputWatermark())); + } + @Test public void addAfterCommitShouldThrowException() { PCollection pcollection = TestPipeline.create().apply(Create.of()); From fbda57d8f232cfad47adedf4420956e73b7d4ee5 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 25 Apr 2016 12:53:48 -0700 Subject: [PATCH 3/3] fixup! Add withElements to CommittedBundle --- .../sdk/runners/inprocess/InProcessPipelineRunner.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java index 877f6e9bbe1e..7897f2e31caf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -160,10 +160,11 @@ public static interface CommittedBundle { * {@link #getElements()} will return the provided elements. This bundle is unchanged. * *

- * The value of {@link #getSynchronizedProcessingOutputWatermark()} of the returned - * {@link CommittedBundle} is equal to the value returned from this one. This is used to ensure - * a {@link PTransform} that could not complete processing on input elements properly holds the - * synchronized processing time to the appropriate value. + * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing + * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from + * the current bundle. This is used to ensure a {@link PTransform} that could not complete + * processing on input elements properly holds the synchronized processing time to the + * appropriate value. */ CommittedBundle withElements(Iterable> elements); }