Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,38 +100,63 @@ public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
checkState(!committed, "Can't commit already committed bundle %s", this);
committed = true;
final Iterable<WindowedValue<T>> committedElements = elements.build();
return new CommittedBundle<T>() {
@Override
@Nullable
public Object getKey() {
return key;
}

@Override
public Iterable<WindowedValue<T>> getElements() {
return committedElements;
}

@Override
public PCollection<T> getPCollection() {
return pcollection;
}

@Override
public Instant getSynchronizedProcessingOutputWatermark() {
return synchronizedCompletionTime;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("pcollection", pcollection)
.add("key", key)
.add("elements", committedElements)
.toString();
}
};
return new CommittedInProcessBundle<>(
pcollection, key, committedElements, synchronizedCompletionTime);
}
}

private static class CommittedInProcessBundle<T> implements CommittedBundle<T> {
public CommittedInProcessBundle(
PCollection<T> pcollection,
Object key,
Iterable<WindowedValue<T>> committedElements,
Instant synchronizedCompletionTime) {
this.pcollection = pcollection;
this.key = key;
this.committedElements = committedElements;
this.synchronizedCompletionTime = synchronizedCompletionTime;
}

private final PCollection<T> pcollection;
private final Object key;
private final Iterable<WindowedValue<T>> committedElements;
private final Instant synchronizedCompletionTime;

@Override
@Nullable
public Object getKey() {
return key;
}

@Override
public Iterable<WindowedValue<T>> getElements() {
return committedElements;
}

@Override
public PCollection<T> getPCollection() {
return pcollection;
}

@Override
public Instant getSynchronizedProcessingOutputWatermark() {
return synchronizedCompletionTime;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("pcollection", pcollection)
.add("key", key)
.add("elements", committedElements)
.toString();
}

@Override
public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this may need to inherit the synchronizedCompletionTime from the elements as well. Is that not the case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted why the current value is propagated in the CommittedBundle javadoc.

return new CommittedInProcessBundle<>(
pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ public static interface CommittedBundle<T> {
* timers that fired to produce this bundle.
*/
Instant getSynchronizedProcessingOutputWatermark();

/**
* Return a new {@link CommittedBundle} that is like this one, except calls to
* {@link #getElements()} will return the provided elements. This bundle is unchanged.
*
* <p>
* The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing
* output watermark} of the returned {@link CommittedBundle} is equal to the value returned from
* the current bundle. This is used to ensure a {@link PTransform} that could not complete
* processing on input elements properly holds the synchronized processing time to the
* appropriate value.
*/
CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -97,7 +99,8 @@ public void keyedWithKeyShouldCreateKeyedBundle() {
createKeyedBundle(new Object());
}

private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
private <T> CommittedBundle<T>
afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());

UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pcollection);
Expand All @@ -108,7 +111,10 @@ private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<Windowed
}
Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher);
CommittedBundle<T> committed = bundle.commit(Instant.now());
assertThat(committed.getElements(), containsMatcher);

return committed;
}

@Test
Expand All @@ -125,6 +131,36 @@ public void getElementsAfterAddShouldReturnAddedElements() {
afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));
}

@SuppressWarnings("unchecked")
@Test
public void withElementsShouldReturnIndependentBundle() {
WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
WindowedValue<Integer> secondValue =
WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));

CommittedBundle<Integer> committed =
afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));

WindowedValue<Integer> firstReplacement =
WindowedValue.of(
9,
new Instant(2048L),
new IntervalWindow(new Instant(2044L), Instant.now()),
PaneInfo.NO_FIRING);
WindowedValue<Integer> secondReplacement =
WindowedValue.timestampedValueInGlobalWindow(-1, Instant.now());
CommittedBundle<Integer> withed =
committed.withElements(ImmutableList.of(firstReplacement, secondReplacement));

assertThat(withed.getElements(), containsInAnyOrder(firstReplacement, secondReplacement));
assertThat(committed.getElements(), containsInAnyOrder(firstValue, secondValue));
assertThat(withed.getKey(), equalTo(committed.getKey()));
assertThat(withed.getPCollection(), equalTo(committed.getPCollection()));
assertThat(
withed.getSynchronizedProcessingOutputWatermark(),
equalTo(committed.getSynchronizedProcessingOutputWatermark()));
}

@Test
public void addAfterCommitShouldThrowException() {
PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
Expand Down