Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ private boolean fireTimers() throws Exception {
evaluationContext
.createKeyedBundle(
null, keyTimers.getKey(), (PCollection) transform.getInput())
.add(WindowedValue.valueInEmptyWindows(work))
.add(WindowedValue.valueInGlobalWindow(work))
.commit(Instant.now());
scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
firedTimers = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ public void testFlattenInMemoryEvaluator() throws Exception {
rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
leftSideEvaluator.processElement(
WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)));
leftSideEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING));
leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING));
rightSideEvaluator.processElement(
WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
rightSideEvaluator.processElement(
WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)));

Expand All @@ -107,12 +107,12 @@ public void testFlattenInMemoryEvaluator() throws Exception {
flattenedLeftBundle.commit(Instant.now()).getElements(),
containsInAnyOrder(
WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)),
WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING),
WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING),
WindowedValue.valueInGlobalWindow(1)));
assertThat(
flattenedRightBundle.commit(Instant.now()).getElements(),
containsInAnyOrder(
WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING),
WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING),
WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)),
WindowedValue.valueInGlobalWindow(-1)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ public void testInMemoryEvaluator() throws Exception {
.forApplication(
groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext);

evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(firstFoo)));
evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(secondFoo)));
evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(thirdFoo)));
evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(firstBar)));
evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(secondBar)));
evaluator.processElement(WindowedValue.valueInGlobalWindow(gwValue(firstBaz)));

evaluator.finishBundle();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName;
import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -1196,7 +1196,7 @@ public void processElement(ProcessContext c)
// are at a window boundary.
c.output(IsmRecord.of(
ImmutableList.of(previousWindow.get()),
valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(), map))));
valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.<V>of(), map))));
map = new HashMap<>();
}

Expand All @@ -1217,7 +1217,7 @@ public void processElement(ProcessContext c)
// window boundary.
c.output(IsmRecord.of(
ImmutableList.of(previousWindow.get()),
valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(), map))));
valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.<V>of(), map))));
}
}

Expand Down Expand Up @@ -1685,7 +1685,7 @@ public void processElement(ProcessContext c)
Iterable<WindowedValue<V>>,
Iterable<V>>>>of(
ImmutableList.of(previousWindow.get()),
valueInEmptyWindows(
valueInGlobalWindow(
new TransformedMap<>(
IterableWithWindowedValuesToIterable.<V>of(), resultMap))));
multimap = HashMultimap.create();
Expand All @@ -1706,7 +1706,7 @@ public void processElement(ProcessContext c)
Iterable<WindowedValue<V>>,
Iterable<V>>>>of(
ImmutableList.of(previousWindow.get()),
valueInEmptyWindows(
valueInGlobalWindow(
new TransformedMap<>(IterableWithWindowedValuesToIterable.<V>of(), resultMap))));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1269,7 +1269,7 @@ public static <K, V> void evaluateGroupByKeyOnly(
List<V> values = entry.getValue();
values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
outputElems.add(ValueWithMetadata
.of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
.of(WindowedValue.valueInGlobalWindow(KV.<K, Iterable<V>>of(key, values)))
.withKey(key));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,18 @@ public abstract class WindowedValue<T> {
protected final PaneInfo pane;

/**
* Returns a {@code WindowedValue} with the given value, timestamp,
* and windows.
* Returns a {@code WindowedValue} with the given value, timestamp, and windows.
*/
public static <T> WindowedValue<T> of(
T value,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
Preconditions.checkNotNull(pane);
checkNotNull(pane);
checkArgument(
windows.size() > 0, "Cannot create %s in no windows", WindowedValue.class.getName());

if (windows.size() == 0 && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
return valueInEmptyWindows(value, pane);
} else if (windows.size() == 1) {
if (windows.size() == 1) {
return of(value, timestamp, windows.iterator().next(), pane);
} else {
return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
Expand Down Expand Up @@ -131,22 +130,6 @@ public static <T> WindowedValue<T> timestampedValueInGlobalWindow(T value, Insta
}
}

/**
* Returns a {@code WindowedValue} with the given value in no windows, and the default timestamp
* and pane.
*/
public static <T> WindowedValue<T> valueInEmptyWindows(T value) {
return new ValueInEmptyWindows<T>(value, PaneInfo.NO_FIRING);
}

/**
* Returns a {@code WindowedValue} with the given value in no windows, and the default timestamp
* and the specified pane.
*/
public static <T> WindowedValue<T> valueInEmptyWindows(T value, PaneInfo pane) {
return new ValueInEmptyWindows<T>(value, pane);
}

private WindowedValue(T value, PaneInfo pane) {
this.value = value;
this.pane = checkNotNull(pane);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ List<WindowedValue<KV<K, OutputT>>> runGABW(
runner.startBundle();

if (values.size() > 0) {
runner.processElement(WindowedValue.valueInEmptyWindows(
runner.processElement(WindowedValue.valueInGlobalWindow(
KV.of(key, (Iterable<WindowedValue<InputT>>) values)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.sdk.util;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -66,15 +65,6 @@ public void testWindowedValueCoder() throws CoderException {
Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray());
}

@Test
public void testExplodeWindowsInNoWindowsEmptyIterable() {
WindowedValue<String> value =
WindowedValue.of(
"foo", Instant.now(), ImmutableList.<BoundedWindow>of(), PaneInfo.NO_FIRING);

assertThat(value.explodeWindows(), emptyIterable());
}

@Test
public void testExplodeWindowsInOneWindowEquals() {
Instant now = Instant.now();
Expand Down