diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java index 99b5c1dfea..73a18dbf33 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java @@ -97,38 +97,63 @@ public CommittedBundle commit(final Instant synchronizedCompletionTime) { checkState(!committed, "Can't commit already committed bundle %s", this); committed = true; final Iterable> committedElements = elements.build(); - return new CommittedBundle() { - @Override - @Nullable - public Object getKey() { - return key; - } - - @Override - public Iterable> getElements() { - return committedElements; - } - - @Override - public PCollection getPCollection() { - return pcollection; - } - - @Override - public Instant getSynchronizedProcessingOutputWatermark() { - return synchronizedCompletionTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .omitNullValues() - .add("pcollection", pcollection) - .add("key", key) - .add("elements", committedElements) - .toString(); - } - }; + return new CommittedInProcessBundle<>( + pcollection, key, committedElements, synchronizedCompletionTime); + } + } + + private static class CommittedInProcessBundle implements CommittedBundle { + public CommittedInProcessBundle( + PCollection pcollection, + Object key, + Iterable> committedElements, + Instant synchronizedCompletionTime) { + this.pcollection = pcollection; + this.key = key; + this.committedElements = committedElements; + this.synchronizedCompletionTime = synchronizedCompletionTime; + } + + private final PCollection pcollection; + private final Object key; + private final Iterable> committedElements; + private final Instant synchronizedCompletionTime; + + @Override + @Nullable + public Object getKey() { + return key; + } + + @Override + public Iterable> getElements() { + return committedElements; + } + + @Override + public PCollection getPCollection() { + return pcollection; + } + + @Override + public Instant getSynchronizedProcessingOutputWatermark() { + return synchronizedCompletionTime; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); + } + + @Override + public CommittedBundle withElements(Iterable> elements) { + return new CommittedInProcessBundle<>( + pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime); } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 3e333399a4..297e9e8ed9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -149,6 +149,19 @@ public static interface CommittedBundle { * timers that fired to produce this bundle. */ Instant getSynchronizedProcessingOutputWatermark(); + + /** + * Return a new {@link CommittedBundle} that is like this one, except calls to + * {@link #getElements()} will return the provided elements. This bundle is unchanged. + * + *

+ * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing + * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from + * the current bundle. This is used to ensure a {@link PTransform} that could not complete + * processing on input elements properly holds the synchronized processing time to the + * appropriate value. + */ + CommittedBundle withElements(Iterable> elements); } /** diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java index 9b4e630042..a88f6b3ddc 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java @@ -1,4 +1,4 @@ - /* +/* * Copyright (C) 2016 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not @@ -25,6 +25,8 @@ import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -94,7 +96,8 @@ public void keyedWithKeyShouldCreateKeyedBundle() { createKeyedBundle(new Object()); } - private void afterCommitGetElementsShouldHaveAddedElements(Iterable> elems) { + private CommittedBundle + afterCommitGetElementsShouldHaveAddedElements(Iterable> elems) { PCollection pcollection = TestPipeline.create().apply(Create.of()); UncommittedBundle bundle = bundleFactory.createRootBundle(pcollection); @@ -105,7 +108,10 @@ private void afterCommitGetElementsShouldHaveAddedElements(Iterable>> containsMatcher = Matchers.>containsInAnyOrder(expectations); - assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher); + CommittedBundle committed = bundle.commit(Instant.now()); + assertThat(committed.getElements(), containsMatcher); + + return committed; } @Test @@ -122,6 +128,36 @@ public void getElementsAfterAddShouldReturnAddedElements() { afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue)); } + @SuppressWarnings("unchecked") + @Test + public void withElementsShouldReturnIndependentBundle() { + WindowedValue firstValue = WindowedValue.valueInGlobalWindow(1); + WindowedValue secondValue = + WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L)); + + CommittedBundle committed = + afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue)); + + WindowedValue firstReplacement = + WindowedValue.of( + 9, + new Instant(2048L), + new IntervalWindow(new Instant(2044L), Instant.now()), + PaneInfo.NO_FIRING); + WindowedValue secondReplacement = + WindowedValue.timestampedValueInGlobalWindow(-1, Instant.now()); + CommittedBundle withed = + committed.withElements(ImmutableList.of(firstReplacement, secondReplacement)); + + assertThat(withed.getElements(), containsInAnyOrder(firstReplacement, secondReplacement)); + assertThat(committed.getElements(), containsInAnyOrder(firstValue, secondValue)); + assertThat(withed.getKey(), equalTo(committed.getKey())); + assertThat(withed.getPCollection(), equalTo(committed.getPCollection())); + assertThat( + withed.getSynchronizedProcessingOutputWatermark(), + equalTo(committed.getSynchronizedProcessingOutputWatermark())); + } + @Test public void addAfterCommitShouldThrowException() { PCollection pcollection = TestPipeline.create().apply(Create.of());