Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31761": "noting that PR #31761 should run this test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testTeamScoresOnTime() {
.addElements(
event(TestUser.RED_ONE, 1, Duration.standardMinutes(4)),
event(TestUser.BLUE_ONE, 2, Duration.standardSeconds(270)))
// The window should close and emit an ON_TIME pane
// The window should close and emit an ON_TIME paneInfo
.advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores =
Expand Down Expand Up @@ -137,7 +137,8 @@ public void testTeamScoresSpeculative() {
// Some additional time passes and we get a speculative pane for the red team
.advanceProcessingTime(Duration.standardMinutes(12))
.addElements(event(TestUser.BLUE_TWO, 3, Duration.standardSeconds(22)))
// More time passes and a speculative pane containing a refined value for the blue pane
// More time passes and a speculative pane containing a refined value for the blue
// paneInfo
// is
// emitted
.advanceProcessingTime(Duration.standardMinutes(10))
Expand All @@ -155,7 +156,7 @@ public void testTeamScoresSpeculative() {
String blueTeam = TestUser.BLUE_ONE.getTeam();
String redTeam = TestUser.RED_ONE.getTeam();
IntervalWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
// The window contains speculative panes alongside the on-time pane
// The window contains speculative panes alongside the on-time paneInfo
PAssert.that(teamScores)
.inWindow(window)
.containsInAnyOrder(
Expand Down Expand Up @@ -190,7 +191,7 @@ public void testTeamScoresUnobservablyLate() {
.advanceWatermarkTo(
baseTime.plus(TEAM_WINDOW_DURATION).minus(Duration.standardMinutes(1)))
// These events are late, but the window hasn't closed yet, so the elements are in the
// on-time pane
// on-time paneInfo
.addElements(
event(TestUser.RED_TWO, 2, Duration.ZERO),
event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
Expand Down Expand Up @@ -235,7 +236,7 @@ public void testTeamScoresObservablyLate() {
event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5)))
.advanceWatermarkTo(firstWindowCloses.minus(Duration.standardMinutes(1)))
// These events are late but should still appear in a late pane
// These events are late but should still appear in a late paneInfo
.addElements(
event(TestUser.RED_TWO, 2, Duration.ZERO),
event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
Expand All @@ -244,7 +245,7 @@ public void testTeamScoresObservablyLate() {
// has
// not yet closed because the watermark has not advanced
.advanceProcessingTime(Duration.standardMinutes(12))
// These elements should appear in the final pane
// These elements should appear in the final paneInfo
.addElements(
event(TestUser.RED_TWO, 9, Duration.standardMinutes(1)),
event(TestUser.RED_TWO, 1, Duration.standardMinutes(3)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
} else {
nonLateElements.add(
WindowedValues.of(
element.getValue(), element.getTimestamp(), window, element.getPane()));
element.getValue(), element.getTimestamp(), window, element.getPaneInfo()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public Instant timestamp() {

@Override
public PaneInfo pane() {
return element.getPane();
return element.getPaneInfo();
}

@Override
Expand All @@ -395,7 +395,7 @@ public void output(OutputT output) {

@Override
public void outputWithTimestamp(OutputT value, Instant timestamp) {
outputWindowedValue(value, timestamp, element.getWindows(), element.getPane());
outputWindowedValue(value, timestamp, element.getWindows(), element.getPaneInfo());
}

@Override
Expand All @@ -419,7 +419,7 @@ public <T> void output(TupleTag<T> tag, T value) {
@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
outputReceiver.output(
tag, WindowedValues.of(value, timestamp, element.getWindows(), element.getPane()));
tag, WindowedValues.of(value, timestamp, element.getWindows(), element.getPaneInfo()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public PaneInfoTracker(TimerInternals timerInternals) {

@VisibleForTesting
static final StateTag<ValueState<PaneInfo>> PANE_INFO_TAG =
StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
StateTags.makeSystemTagInternal(StateTags.value("paneInfo", PaneInfoCoder.INSTANCE));

public void clear(StateAccessor<?> state) {
state.access(PANE_INFO_TAG).clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
}

public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(
W window, PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
W window, PaneInfo paneInfo, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
return new OnTriggerContextImpl(stateAccessor(window, style), paneInfo, callbacks);
}

public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
Expand Down Expand Up @@ -402,15 +402,15 @@ public Timers timers() {

private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
private final StateAccessorImpl<K, W> state;
private final PaneInfo pane;
private final PaneInfo paneInfo;
private final OnTriggerCallbacks<OutputT> callbacks;
private final TimersImpl timers;

private OnTriggerContextImpl(
StateAccessorImpl<K, W> state, PaneInfo pane, OnTriggerCallbacks<OutputT> callbacks) {
StateAccessorImpl<K, W> state, PaneInfo paneInfo, OnTriggerCallbacks<OutputT> callbacks) {
reduceFn.super();
this.state = state;
this.pane = pane;
this.paneInfo = paneInfo;
this.callbacks = callbacks;
this.timers = new TimersImpl(state.namespace());
}
Expand All @@ -437,7 +437,7 @@ public StateAccessor<K> state() {

@Override
public PaneInfo paneInfo() {
return pane;
return paneInfo;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,28 +1037,28 @@ private void prefetchOnTrigger(
}

// Calculate the pane info.
final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
final PaneInfo paneInfo = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();

// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
if (needToEmit(isEmpty, isFinished, paneInfo.getTiming())) {
// Run reduceFn.onTrigger method.
final List<W> windows = Collections.singletonList(directContext.window());
ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
contextFactory.forTrigger(
directContext.window(),
pane,
paneInfo,
StateStyle.RENAMED,
toOutput -> {
// We're going to output panes, so commit the (now used) PaneInfo.
// This is unnecessary if the trigger isFinished since the saved
// state will be immediately deleted.
if (!isFinished) {
paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
paneInfoTracker.storeCurrentPaneInfo(directContext, paneInfo);
}

// Output the actual value.
outputter.output(
WindowedValues.of(KV.of(key, toOutput), outputTimestamp, windows, pane));
WindowedValues.of(KV.of(key, toOutput), outputTimestamp, windows, paneInfo));
});

reduceFn.onTrigger(renamedTriggerContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public <T> T sideInput(PCollectionView<T> view) {

@Override
public PaneInfo pane() {
return elem.getPane();
return elem.getPaneInfo();
}

@Override
Expand Down Expand Up @@ -437,7 +437,7 @@ public <T> void output(TupleTag<T> tag, T output) {
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
checkTimestamp(elem.getTimestamp(), timestamp);
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPane());
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ public PipelineOptions pipelineOptions() {

@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getPane();
return elementAndRestriction.getKey().getPaneInfo();
}

@Override
Expand Down Expand Up @@ -491,7 +491,7 @@ public PipelineOptions pipelineOptions() {

@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getPane();
return elementAndRestriction.getKey().getPaneInfo();
}

@Override
Expand Down Expand Up @@ -545,7 +545,7 @@ public PipelineOptions pipelineOptions() {

@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getPane();
return elementAndRestriction.getKey().getPaneInfo();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane);
PaneInfo paneInfo);

/** Output the value to a tagged output at the specified timestamp in the listed windows. */
<AdditionalOutputT> void outputWindowedValue(
TupleTag<AdditionalOutputT> tag,
AdditionalOutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane);
PaneInfo paneInfo);

/**
* Return the timer manager provided by the underlying system, or null if Timers need to be
Expand All @@ -67,7 +67,7 @@ <AdditionalOutputT> void outputWindowedValue(
Collection<? extends BoundedWindow> windows();

/** Access the pane of the current window(s). */
PaneInfo pane();
PaneInfo paneInfo();

/** Return the value of the side input for a particular side input window. */
<T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow);
Expand Down
Loading
Loading