Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,38 +97,63 @@ public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
checkState(!committed, "Can't commit already committed bundle %s", this);
committed = true;
final Iterable<WindowedValue<T>> committedElements = elements.build();
return new CommittedBundle<T>() {
@Override
@Nullable
public Object getKey() {
return key;
}

@Override
public Iterable<WindowedValue<T>> getElements() {
return committedElements;
}

@Override
public PCollection<T> getPCollection() {
return pcollection;
}

@Override
public Instant getSynchronizedProcessingOutputWatermark() {
return synchronizedCompletionTime;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("pcollection", pcollection)
.add("key", key)
.add("elements", committedElements)
.toString();
}
};
return new CommittedInProcessBundle<>(
pcollection, key, committedElements, synchronizedCompletionTime);
}
}

private static class CommittedInProcessBundle<T> implements CommittedBundle<T> {
public CommittedInProcessBundle(
PCollection<T> pcollection,
Object key,
Iterable<WindowedValue<T>> committedElements,
Instant synchronizedCompletionTime) {
this.pcollection = pcollection;
this.key = key;
this.committedElements = committedElements;
this.synchronizedCompletionTime = synchronizedCompletionTime;
}

private final PCollection<T> pcollection;
private final Object key;
private final Iterable<WindowedValue<T>> committedElements;
private final Instant synchronizedCompletionTime;

@Override
@Nullable
public Object getKey() {
return key;
}

@Override
public Iterable<WindowedValue<T>> getElements() {
return committedElements;
}

@Override
public PCollection<T> getPCollection() {
return pcollection;
}

@Override
public Instant getSynchronizedProcessingOutputWatermark() {
return synchronizedCompletionTime;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("pcollection", pcollection)
.add("key", key)
.add("elements", committedElements)
.toString();
}

@Override
public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) {
return new CommittedInProcessBundle<>(
pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,19 @@ public static interface CommittedBundle<T> {
* timers that fired to produce this bundle.
*/
Instant getSynchronizedProcessingOutputWatermark();

/**
* Return a new {@link CommittedBundle} that is like this one, except calls to
* {@link #getElements()} will return the provided elements. This bundle is unchanged.
*
* <p>
* The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing
* output watermark} of the returned {@link CommittedBundle} is equal to the value returned from
* the current bundle. This is used to ensure a {@link PTransform} that could not complete
* processing on input elements properly holds the synchronized processing time to the
* appropriate value.
*/
CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
Expand All @@ -25,6 +25,8 @@
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.WithKeys;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
Expand Down Expand Up @@ -94,7 +96,8 @@ public void keyedWithKeyShouldCreateKeyedBundle() {
createKeyedBundle(new Object());
}

private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
private <T> CommittedBundle<T>
afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());

UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pcollection);
Expand All @@ -105,7 +108,10 @@ private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<Windowed
}
Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher);
CommittedBundle<T> committed = bundle.commit(Instant.now());
assertThat(committed.getElements(), containsMatcher);

return committed;
}

@Test
Expand All @@ -122,6 +128,36 @@ public void getElementsAfterAddShouldReturnAddedElements() {
afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));
}

@SuppressWarnings("unchecked")
@Test
public void withElementsShouldReturnIndependentBundle() {
WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
WindowedValue<Integer> secondValue =
WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));

CommittedBundle<Integer> committed =
afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));

WindowedValue<Integer> firstReplacement =
WindowedValue.of(
9,
new Instant(2048L),
new IntervalWindow(new Instant(2044L), Instant.now()),
PaneInfo.NO_FIRING);
WindowedValue<Integer> secondReplacement =
WindowedValue.timestampedValueInGlobalWindow(-1, Instant.now());
CommittedBundle<Integer> withed =
committed.withElements(ImmutableList.of(firstReplacement, secondReplacement));

assertThat(withed.getElements(), containsInAnyOrder(firstReplacement, secondReplacement));
assertThat(committed.getElements(), containsInAnyOrder(firstValue, secondValue));
assertThat(withed.getKey(), equalTo(committed.getKey()));
assertThat(withed.getPCollection(), equalTo(committed.getPCollection()));
assertThat(
withed.getSynchronizedProcessingOutputWatermark(),
equalTo(committed.getSynchronizedProcessingOutputWatermark()));
}

@Test
public void addAfterCommitShouldThrowException() {
PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
Expand Down