From 07b0e04b07a902c29326978670a2d345516772d5 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 25 Apr 2016 09:08:09 -0700 Subject: [PATCH 1/2] Refactor CommittedBundle in InProcessBundleFactory Move to a static nested class. --- .../inprocess/InProcessBundleFactory.java | 82 +++++++++++-------- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java index 99b5c1dfea..da5f97c2d5 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java @@ -97,38 +97,56 @@ public CommittedBundle commit(final Instant synchronizedCompletionTime) { checkState(!committed, "Can't commit already committed bundle %s", this); committed = true; final Iterable> committedElements = elements.build(); - return new CommittedBundle() { - @Override - @Nullable - public Object getKey() { - return key; - } - - @Override - public Iterable> getElements() { - return committedElements; - } - - @Override - public PCollection getPCollection() { - return pcollection; - } - - @Override - public Instant getSynchronizedProcessingOutputWatermark() { - return synchronizedCompletionTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .omitNullValues() - .add("pcollection", pcollection) - .add("key", key) - .add("elements", committedElements) - .toString(); - } - }; + return new CommittedInProcessBundle<>( + pcollection, key, committedElements, synchronizedCompletionTime); + } + } + private static class CommittedInProcessBundle implements CommittedBundle { + public CommittedInProcessBundle( + PCollection pcollection, + Object key, + Iterable> committedElements, + Instant synchronizedCompletionTime) { + this.pcollection = 
pcollection; + this.key = key; + this.committedElements = committedElements; + this.synchronizedCompletionTime = synchronizedCompletionTime; + } + + private final PCollection pcollection; + private final Object key; + private final Iterable> committedElements; + private final Instant synchronizedCompletionTime; + + @Override + @Nullable + public Object getKey() { + return key; + } + + @Override + public Iterable> getElements() { + return committedElements; + } + + @Override + public PCollection getPCollection() { + return pcollection; + } + + @Override + public Instant getSynchronizedProcessingOutputWatermark() { + return synchronizedCompletionTime; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); } } } From 4f669811729b6733d3f14bf105ea89e4ce5181d6 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 25 Apr 2016 09:09:15 -0700 Subject: [PATCH 2/2] Add withElements to CommittedBundle The unprocessed elements of a partially completed bundle must be placed in a new bundle to be processed at a later time. The bundle in which they are processed should also have identical properties to the bundle which the elements were initially present in. withElements provides a simple way to create a "copy" of a bundle that contains different elements. 
--- .../inprocess/InProcessBundleFactory.java | 7 ++++ .../inprocess/InProcessPipelineRunner.java | 13 ++++++ .../inprocess/InProcessBundleFactoryTest.java | 42 +++++++++++++++++-- 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java index da5f97c2d5..73a18dbf33 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java @@ -101,6 +101,7 @@ public CommittedBundle commit(final Instant synchronizedCompletionTime) { pcollection, key, committedElements, synchronizedCompletionTime); } } + private static class CommittedInProcessBundle implements CommittedBundle { public CommittedInProcessBundle( PCollection pcollection, @@ -148,6 +149,12 @@ public String toString() { .add("elements", committedElements) .toString(); } + + @Override + public CommittedBundle withElements(Iterable> elements) { + return new CommittedInProcessBundle<>( + pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime); + } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 3e333399a4..297e9e8ed9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -149,6 +149,19 @@ public static interface CommittedBundle { * timers that fired to produce this bundle. 
*/ Instant getSynchronizedProcessingOutputWatermark(); + + /** + * Return a new {@link CommittedBundle} that is like this one, except calls to + * {@link #getElements()} will return the provided elements. This bundle is unchanged. + * + * <p>

+ * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing + * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from + * the current bundle. This is used to ensure a {@link PTransform} that could not complete + * processing on input elements properly holds the synchronized processing time to the + * appropriate value. + */ + CommittedBundle withElements(Iterable> elements); } /** diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java index 9b4e630042..a88f6b3ddc 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java @@ -1,4 +1,4 @@ - /* +/* * Copyright (C) 2016 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not @@ -25,6 +25,8 @@ import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -94,7 +96,8 @@ public void keyedWithKeyShouldCreateKeyedBundle() { createKeyedBundle(new Object()); } - private void afterCommitGetElementsShouldHaveAddedElements(Iterable> elems) { + private CommittedBundle + afterCommitGetElementsShouldHaveAddedElements(Iterable> elems) { PCollection pcollection = TestPipeline.create().apply(Create.of()); UncommittedBundle bundle = bundleFactory.createRootBundle(pcollection); @@ -105,7 +108,10 @@ private 
void afterCommitGetElementsShouldHaveAddedElements(Iterable>> containsMatcher = Matchers.>containsInAnyOrder(expectations); - assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher); + CommittedBundle committed = bundle.commit(Instant.now()); + assertThat(committed.getElements(), containsMatcher); + + return committed; } @Test @@ -122,6 +128,36 @@ public void getElementsAfterAddShouldReturnAddedElements() { afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue)); } + @SuppressWarnings("unchecked") + @Test + public void withElementsShouldReturnIndependentBundle() { + WindowedValue firstValue = WindowedValue.valueInGlobalWindow(1); + WindowedValue secondValue = + WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L)); + + CommittedBundle committed = + afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue)); + + WindowedValue firstReplacement = + WindowedValue.of( + 9, + new Instant(2048L), + new IntervalWindow(new Instant(2044L), Instant.now()), + PaneInfo.NO_FIRING); + WindowedValue secondReplacement = + WindowedValue.timestampedValueInGlobalWindow(-1, Instant.now()); + CommittedBundle withed = + committed.withElements(ImmutableList.of(firstReplacement, secondReplacement)); + + assertThat(withed.getElements(), containsInAnyOrder(firstReplacement, secondReplacement)); + assertThat(committed.getElements(), containsInAnyOrder(firstValue, secondValue)); + assertThat(withed.getKey(), equalTo(committed.getKey())); + assertThat(withed.getPCollection(), equalTo(committed.getPCollection())); + assertThat( + withed.getSynchronizedProcessingOutputWatermark(), + equalTo(committed.getSynchronizedProcessingOutputWatermark())); + } + @Test public void addAfterCommitShouldThrowException() { PCollection pcollection = TestPipeline.create().apply(Create.of());