From 3eb4bdde2f64694e5832a1f349dd563c45a0883b Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 3 Jun 2025 13:05:39 -0400 Subject: [PATCH] rename WindowedValue.getPane() to WindowedValue.getPaneInfo() in preparation for making it public --- .../core/LateDataDroppingDoFnRunner.java | 2 +- ...oundedSplittableProcessElementInvoker.java | 6 +- .../beam/runners/core/SimpleDoFnRunner.java | 4 +- .../SplittableParDoViaKeyedWorkItems.java | 6 +- .../beam/runners/core/ReduceFnRunnerTest.java | 50 ++++++++------ .../beam/runners/core/WindowMatchers.java | 6 +- .../runners/direct/SideInputContainer.java | 4 +- .../direct/WindowEvaluatorFactory.java | 2 +- .../direct/WindowEvaluatorFactoryTest.java | 18 ++--- .../FlinkStreamingTransformTranslators.java | 4 +- .../functions/FlinkAssignWindows.java | 2 +- .../functions/FlinkDoFnFunction.java | 4 +- .../functions/SortingFlinkCombineRunner.java | 4 +- .../runners/dataflow/BatchViewOverrides.java | 2 +- .../worker/AssignWindowsParDoFnFactory.java | 2 +- .../worker/PartialGroupByKeyParDoFns.java | 6 +- ...eifyTimestampAndWindowsParDoFnFactory.java | 4 +- ...StreamingGroupAlsoByWindowReshuffleFn.java | 2 +- .../runners/dataflow/worker/WindmillSink.java | 2 +- .../BatchGroupAlsoByWindowReshuffleFn.java | 2 +- .../worker/util/ValueInEmptyWindows.java | 4 +- ...TimestampAndWindowsParDoFnFactoryTest.java | 2 +- .../control/BundleCheckpointHandlers.java | 2 +- .../runners/jet/processors/AssignWindowP.java | 2 +- .../beam/runners/jet/processors/ViewP.java | 2 +- .../runners/jet/processors/WindowGroupP.java | 2 +- .../beam/runners/samza/runtime/DoFnOp.java | 2 +- .../runners/samza/runtime/WindowAssignOp.java | 2 +- .../batch/WindowAssignTranslatorBatch.java | 2 +- .../GroupNonMergingWindowsFunctions.java | 3 +- .../ReifyTimestampsAndWindowsFunction.java | 5 +- .../translation/TransformTranslator.java | 7 +- .../spark/translation/TranslationUtils.java | 2 +- .../functions/AssignWindowsFunction.java | 2 +- .../ByteToWindowFunctionPrimitive.java | 2 +- .../functions/MapToTupleFunction.java | 4 +- .../apache/beam/sdk/transforms/Create.java | 5 +- .../apache/beam/sdk/util/WindowedValue.java | 51 +++++++------- .../beam/sdk/transforms/CreateTest.java | 2 +- .../beam/fn/harness/AssignWindowsRunner.java | 2 +- .../beam/fn/harness/FnApiDoFnRunner.java | 41 ++++++------ ...littablePairWithRestrictionDoFnRunner.java | 4 +- ...bleSplitAndSizeRestrictionsDoFnRunner.java | 4 +- .../fn/harness/AssignWindowsRunnerTest.java | 4 +- .../beam/fn/harness/FnApiDoFnRunnerTest.java | 66 +++++++++---------- ...ablePairWithRestrictionDoFnRunnerTest.java | 12 ++-- ...plitAndSizeRestrictionsDoFnRunnerTest.java | 24 +++---- 47 files changed, 206 insertions(+), 186 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index ca1a0393b95d..32019683ce93 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -141,7 +141,7 @@ public Iterable> filter( } else { nonLateElements.add( WindowedValue.of( - element.getValue(), element.getTimestamp(), window, element.getPane())); + element.getValue(), element.getTimestamp(), window, element.getPaneInfo())); } } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 51cd8c690aee..597b51ebeb06 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -375,7 +375,7 @@ public Instant timestamp() { @Override public PaneInfo pane() { - return element.getPane(); + return element.getPaneInfo(); } @Override @@ -390,7 +390,7 @@ public void output(OutputT output) { @Override public void outputWithTimestamp(OutputT value, Instant timestamp) { - outputWindowedValue(value, timestamp, element.getWindows(), element.getPane()); + outputWindowedValue(value, timestamp, element.getWindows(), element.getPaneInfo()); } @Override @@ -413,7 +413,7 @@ public void output(TupleTag tag, T value) { @Override public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) { - outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane()); + outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPaneInfo()); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index b375d38c5a98..4c6343c8b1ac 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -404,7 +404,7 @@ public T sideInput(PCollectionView view) { @Override public PaneInfo pane() { - return elem.getPane(); + return elem.getPaneInfo(); } @Override @@ -436,7 +436,7 @@ public void output(TupleTag tag, T output) { public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); checkTimestamp(elem.getTimestamp(), timestamp); - outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPane()); + outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo()); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index caa5565541a4..ffe9bcd35538 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -430,7 +430,7 @@ public PipelineOptions pipelineOptions() { @Override public PaneInfo paneInfo(DoFn doFn) { - return elementAndRestriction.getKey().getPane(); + return elementAndRestriction.getKey().getPaneInfo(); } @Override @@ -490,7 +490,7 @@ public PipelineOptions pipelineOptions() { @Override public PaneInfo paneInfo(DoFn doFn) { - return elementAndRestriction.getKey().getPane(); + return elementAndRestriction.getKey().getPaneInfo(); } @Override @@ -544,7 +544,7 @@ public PipelineOptions pipelineOptions() { @Override public PaneInfo paneInfo(DoFn doFn) { - return elementAndRestriction.getKey().getPane(); + return elementAndRestriction.getKey().getPaneInfo(); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index ea8bb406b60d..c32d18b0aa4f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -759,7 +759,8 @@ public void testWatermarkHoldAndLateData() throws Exception { equalTo(new Instant(1)), equalTo((BoundedWindow) expectedWindow)))); assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); + output.get(0).getPaneInfo(), + equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); // There is no end-of-window hold, but the timer set by the trigger holds the watermark assertThat(tester.getWatermarkHold(), nullValue()); @@ -805,7 +806,8 @@ public void testWatermarkHoldAndLateData() throws Exception { 0, // window start 10))); // window end assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))); + output.get(0).getPaneInfo(), + equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))); // Since the element hold is cleared, there is no hold remaining assertThat(tester.getWatermarkHold(), nullValue()); @@ -846,7 +848,8 @@ public void testWatermarkHoldAndLateData() throws Exception { 0, // window start 10))); // window end assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))); + output.get(0).getPaneInfo(), + equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))); tester.setAutoAdvanceOutputWatermark(true); @@ -879,7 +882,7 @@ public void testWatermarkHoldAndLateData() throws Exception { 0, // window start 10))); // window end assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1))); + output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1))); assertEquals(new Instant(50), tester.getOutputWatermark()); assertEquals(null, tester.getWatermarkHold()); @@ -1486,7 +1489,8 @@ public void testMergeBeforeFinalizing() throws Exception { 1, // window start 20)); // window end assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); + output.get(0).getPaneInfo(), + equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); } /** @@ -1527,7 +1531,8 @@ public void testMergingWithCloseBeforeGC() throws Exception { 1, // window start 20)); // window end assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); + output.get(0).getPaneInfo(), + equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); } /** Ensure a closed trigger has its state recorded in the merge result window. */ @@ -1614,7 +1619,8 @@ public void testMergingWithReusedWindow() throws Exception { equalTo((BoundedWindow) mergedWindow))); assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); + output.get(0).getPaneInfo(), + equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); } /** @@ -1661,7 +1667,7 @@ public void testMergingWithClosedRepresentative() throws Exception { 1, // window start 18)); // window end assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0))); + output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0))); } /** @@ -1702,7 +1708,7 @@ public void testMergingWithClosedDoesNotPoison() throws Exception { 2, // window start 12)); // window end assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0))); + output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0))); assertThat( output.get(1), isSingleWindowedValue( @@ -1711,7 +1717,8 @@ public void testMergingWithClosedDoesNotPoison() throws Exception { 1, // window start 13)); // window end assertThat( - output.get(1).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); + output.get(1).getPaneInfo(), + equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); } /** @@ -1811,7 +1818,7 @@ public void testIdempotentEmptyPanesDiscarding() throws Exception { // The late pane has the correct indices. assertThat(output.get(1).getValue(), contains(3)); assertThat( - output.get(1).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1))); + output.get(1).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1))); assertTrue(tester.isMarkedFinished(firstWindow)); tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); @@ -1850,7 +1857,8 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception { assertThat(output.size(), equalTo(1)); assertThat(output.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)); assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0))); + output.get(0).getPaneInfo(), + equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0))); // Fire another timer with no data; the empty pane should not be output even though the // trigger is ready to fire @@ -1868,7 +1876,7 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception { // The late pane has the correct indices. assertThat(output.get(0).getValue(), containsInAnyOrder(1, 2, 3)); assertThat( - output.get(0).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1))); + output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1))); assertTrue(tester.isMarkedFinished(firstWindow)); tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); @@ -2193,8 +2201,8 @@ public void fireNonEmptyOnDrainInGlobalWindow() throws Exception { List>> output = tester.extractOutput(); assertEquals(n / 3, output.size()); for (int i = 0; i < output.size(); i++) { - assertEquals(Timing.EARLY, output.get(i).getPane().getTiming()); - assertEquals(i, output.get(i).getPane().getIndex()); + assertEquals(Timing.EARLY, output.get(i).getPaneInfo().getTiming()); + assertEquals(i, output.get(i).getPaneInfo().getIndex()); assertEquals(3, Iterables.size(output.get(i).getValue())); } @@ -2202,8 +2210,8 @@ public void fireNonEmptyOnDrainInGlobalWindow() throws Exception { output = tester.extractOutput(); assertEquals(1, output.size()); - assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming()); - assertEquals(n / 3, output.get(0).getPane().getIndex()); + assertEquals(Timing.ON_TIME, output.get(0).getPaneInfo().getTiming()); + assertEquals(n / 3, output.get(0).getPaneInfo().getIndex()); assertEquals(n - ((n / 3) * 3), Iterables.size(output.get(0).getValue())); } @@ -2231,8 +2239,8 @@ public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception { List>> output = tester.extractOutput(); assertEquals((n + 3) / 4, output.size()); for (int i = 0; i < output.size(); i++) { - assertEquals(Timing.EARLY, output.get(i).getPane().getTiming()); - assertEquals(i, output.get(i).getPane().getIndex()); + assertEquals(Timing.EARLY, output.get(i).getPaneInfo().getTiming()); + assertEquals(i, output.get(i).getPaneInfo().getIndex()); assertEquals(4, Iterables.size(output.get(i).getValue())); } @@ -2240,8 +2248,8 @@ public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception { output = tester.extractOutput(); assertEquals(1, output.size()); - assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming()); - assertEquals((n + 3) / 4, output.get(0).getPane().getIndex()); + assertEquals(Timing.ON_TIME, output.get(0).getPaneInfo().getTiming()); + assertEquals((n + 3) / 4, output.get(0).getPaneInfo().getIndex()); assertEquals(0, Iterables.size(output.get(0).getValue())); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java index ee33eb28f69d..dacd826d1e48 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java @@ -155,13 +155,13 @@ public void describeTo(Description description) { @Override protected boolean matchesSafely(WindowedValue item) { - return Objects.equals(item.getPane(), paneInfo); + return Objects.equals(item.getPaneInfo(), paneInfo); } @Override protected void describeMismatchSafely( WindowedValue item, Description mismatchDescription) { - mismatchDescription.appendValue(item.getPane()); + mismatchDescription.appendValue(item.getPaneInfo()); } }; } @@ -212,7 +212,7 @@ protected boolean matchesSafely(WindowedValue windowedValue) { return valueMatcher.matches(windowedValue.getValue()) && timestampMatcher.matches(windowedValue.getTimestamp()) && windowsMatcher.matches(windowedValue.getWindows()) - && paneInfoMatcher.matches(windowedValue.getPane()); + && paneInfoMatcher.matches(windowedValue.getPaneInfo()); } } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java index 62b0a06ddff1..17e77d05c5f8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java @@ -159,7 +159,7 @@ private void updatePCollectionViewWindowValues( // the value had never been set, so we set it and are done. return; } - PaneInfo newPane = windowValues.iterator().next().getPane(); + PaneInfo newPane = windowValues.iterator().next().getPaneInfo(); Iterable> existingValues; long existingPane; @@ -168,7 +168,7 @@ private void updatePCollectionViewWindowValues( existingPane = Iterables.isEmpty(existingValues) ? -1L - : existingValues.iterator().next().getPane().getIndex(); + : existingValues.iterator().next().getPaneInfo().getIndex(); } while (newPane.getIndex() > existingPane && !contents.compareAndSet(existingValues, windowValues)); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index 1f5292b4e7c9..a89b22516eca 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -91,7 +91,7 @@ public void processElement(WindowedValue compressedElement) throws Excep Collection windows = assignWindows(windowFn, element); outputBundle.add( WindowedValue.of( - element.getValue(), element.getTimestamp(), windows, element.getPane())); + element.getValue(), element.getTimestamp(), windows, element.getPaneInfo())); } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index b334c8431683..e586e14ad6ee 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -176,24 +176,24 @@ public void multipleWindowsWindowFnSucceeds() throws Exception { valueInIntervalWindow.getValue(), valueInIntervalWindow.getTimestamp(), ImmutableSet.of(wMinus1, wMinusSlide), - valueInIntervalWindow.getPane()), + valueInIntervalWindow.getPaneInfo()), // Value in three windows mapped to three windowed values in the same multiple windows isWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), ImmutableSet.of(w1, w2), - valueInGlobalAndTwoIntervalWindows.getPane()), + valueInGlobalAndTwoIntervalWindows.getPaneInfo()), isWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), ImmutableSet.of(w1, w2), - valueInGlobalAndTwoIntervalWindows.getPane()), + valueInGlobalAndTwoIntervalWindows.getPaneInfo()), isWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), ImmutableSet.of(w1, w2), - valueInGlobalAndTwoIntervalWindows.getPane()))); + valueInGlobalAndTwoIntervalWindows.getPaneInfo()))); } @Test @@ -219,14 +219,14 @@ public void referencesEarlierWindowsSucceeds() throws Exception { new IntervalWindow( valueInGlobalWindow.getTimestamp(), valueInGlobalWindow.getTimestamp().plus(Duration.millis(1L))), - valueInGlobalWindow.getPane()), + valueInGlobalWindow.getPaneInfo()), // Value in interval window mapped to the same window isWindowedValue( valueInIntervalWindow.getValue(), valueInIntervalWindow.getTimestamp(), valueInIntervalWindow.getWindows(), - valueInIntervalWindow.getPane()), + valueInIntervalWindow.getPaneInfo()), // Value in global window and two interval windows exploded and mapped in both ways isSingleWindowedValue( @@ -235,17 +235,17 @@ public void referencesEarlierWindowsSucceeds() throws Exception { new IntervalWindow( valueInGlobalAndTwoIntervalWindows.getTimestamp(), valueInGlobalAndTwoIntervalWindows.getTimestamp().plus(Duration.millis(1L))), - valueInGlobalAndTwoIntervalWindows.getPane()), + valueInGlobalAndTwoIntervalWindows.getPaneInfo()), isSingleWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), intervalWindow1, - valueInGlobalAndTwoIntervalWindows.getPane()), + valueInGlobalAndTwoIntervalWindows.getPaneInfo()), isSingleWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), intervalWindow2, - valueInGlobalAndTwoIntervalWindows.getPane()))); + valueInGlobalAndTwoIntervalWindows.getPaneInfo()))); } private CommittedBundle createInputBundle() { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 36cf035a33be..7ac9fa9e70d5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -1404,7 +1404,7 @@ public void collect(WindowedValue> element) { OutputT originalValue = element.getValue().getValue(); WindowedValue output = WindowedValue.of( - originalValue, element.getTimestamp(), element.getWindows(), element.getPane()); + originalValue, element.getTimestamp(), element.getWindows(), element.getPaneInfo()); ctx.collect(output); } @@ -1414,7 +1414,7 @@ public void collectWithTimestamp( OutputT originalValue = element.getValue().getValue(); WindowedValue output = WindowedValue.of( - originalValue, element.getTimestamp(), element.getWindows(), element.getPane()); + originalValue, element.getTimestamp(), element.getWindows(), element.getPaneInfo()); ctx.collectWithTimestamp(output, timestamp); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java index d8c7219bdb63..7477f7882047 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java @@ -43,7 +43,7 @@ public void flatMap(WindowedValue input, Collector> collecto Collection windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input)); for (W window : windows) { collector.collect( - WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane())); + WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPaneInfo())); } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index f437daf86e71..729afc8eb8ef 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -223,7 +223,7 @@ public void output(TupleTag tag, WindowedValue output) { new RawUnionValue(0 /* single output */, output.getValue()), output.getTimestamp(), output.getWindows(), - output.getPane())); + output.getPaneInfo())); } } @@ -256,7 +256,7 @@ public void output(TupleTag tag, WindowedValue output) { new RawUnionValue(outputMap.get(tag), output.getValue()), output.getTimestamp(), output.getWindows(), - output.getPane())); + output.getPaneInfo())); } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java index 149c3e284032..c0ffd4ffb096 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java @@ -162,7 +162,7 @@ private void mergeWindow(List>> elements) { elements.set( j, WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + value.getValue(), value.getTimestamp(), currentWindow, value.getPaneInfo())); } currentStart = i; currentWindow = nextWindow; @@ -175,7 +175,7 @@ private void mergeWindow(List>> elements) { elements.set( j, WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + value.getValue(), value.getTimestamp(), currentWindow, value.getPaneInfo())); } } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 6af6c499cfad..79c597f5964e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1390,7 +1390,7 @@ public Collection getWindows() { } @Override - public PaneInfo getPane() { + public PaneInfo getPaneInfo() { return PaneInfo.NO_FIRING; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java index 7697271792a8..87383e6a8967 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java @@ -111,7 +111,7 @@ public BoundedWindow window() { }); WindowedValue res = - WindowedValue.of(elem.getValue(), elem.getTimestamp(), windows, elem.getPane()); + WindowedValue.of(elem.getValue(), elem.getTimestamp(), windows, elem.getPaneInfo()); receiver.process(res); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java index ca709de7effb..6ea65d141090 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java @@ -243,7 +243,7 @@ public Object createGroupingKey(WindowedValue key) throws Exception { // Ignore timestamp for grouping purposes. // The PGBK output will inherit the timestamp of one of its inputs. return WindowedValue.of( - coder.structuralValue(key.getValue()), ignored, key.getWindows(), key.getPane()); + coder.structuralValue(key.getValue()), ignored, key.getWindows(), key.getPaneInfo()); } } @@ -304,7 +304,7 @@ public void processElement(Object elem) throws Exception { WindowedValue> input = (WindowedValue>) elem; for (BoundedWindow w : input.getWindows()) { WindowedValue> windowsExpandedInput = - WindowedValue.of(input.getValue(), input.getTimestamp(), w, input.getPane()); + WindowedValue.of(input.getValue(), input.getTimestamp(), w, input.getPaneInfo()); groupingTable.put(windowsExpandedInput, receiver); } } @@ -361,7 +361,7 @@ public void processElement(Object elem) throws Exception { WindowedValue> input = (WindowedValue>) elem; for (BoundedWindow w : input.getWindows()) { WindowedValue> windowsExpandedInput = - WindowedValue.of(input.getValue(), input.getTimestamp(), w, input.getPane()); + WindowedValue.of(input.getValue(), input.getTimestamp(), w, input.getPaneInfo()); if (!sideInputFetcher.storeIfBlocked(windowsExpandedInput)) { groupingTable.put(windowsExpandedInput, receiver); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java index a5971bb6bdaf..0de4cb73d5b0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java @@ -77,10 +77,10 @@ public void processElement(Object untypedElem) throws Exception { typedElem.getValue().getValue(), typedElem.getTimestamp(), typedElem.getWindows(), - typedElem.getPane())), + typedElem.getPaneInfo())), typedElem.getTimestamp(), typedElem.getWindows(), - typedElem.getPane())); + typedElem.getPaneInfo())); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowReshuffleFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowReshuffleFn.java index be14e7cc3bcf..f94ee33ad9b1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowReshuffleFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowReshuffleFn.java @@ -58,7 +58,7 @@ public void processElement( KV.of(key, Collections.singletonList(item.getValue())), item.getTimestamp(), item.getWindows(), - item.getPane()); + item.getPaneInfo()); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index abdbeb9a469f..4096e1c4087c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -155,7 +155,7 @@ private ByteString encode(Coder coder, EncodeT object) throws public long add(WindowedValue data) throws IOException { ByteString key, value; ByteString id = ByteString.EMPTY; - ByteString metadata = encodeMetadata(windowsCoder, data.getWindows(), data.getPane()); + ByteString metadata = encodeMetadata(windowsCoder, data.getWindows(), data.getPaneInfo()); if (valueCoder instanceof KvCoder) { KvCoder kvCoder = (KvCoder) valueCoder; KV kv = (KV) data.getValue(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleFn.java index 815ac1f822e7..8f3e831123a6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowReshuffleFn.java @@ -57,7 +57,7 @@ public void processElement( KV.>of(key, Collections.singletonList(item.getValue())), item.getTimestamp(), item.getWindows(), - item.getPane()); + item.getPaneInfo()); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 5084af0d187e..86097f1c4cc5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -45,7 +45,7 @@ public ValueInEmptyWindows(T value) { } @Override - public PaneInfo getPane() { + public PaneInfo getPaneInfo() { return PaneInfo.NO_FIRING; } @@ -88,7 +88,7 @@ public int hashCode() { public String toString() { return MoreObjects.toStringHelper(getClass()) .add("value", getValue()) - .add("pane", getPane()) + .add("pane", getPaneInfo()) .toString(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactoryTest.java index 55b8d62261c7..b7253865ae46 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactoryTest.java @@ -55,7 +55,7 @@ private void verifyReifiedIsInTheSameWindows(WindowedValue> elem receiver.reified.getValue().getValue().getValue(), equalTo(elem.getValue().getValue())); assertThat(receiver.reified.getValue().getValue().getTimestamp(), equalTo(elem.getTimestamp())); assertThat(receiver.reified.getValue().getValue().getWindows(), equalTo(elem.getWindows())); - assertThat(receiver.reified.getValue().getValue().getPane(), equalTo(elem.getPane())); + assertThat(receiver.reified.getValue().getValue().getPaneInfo(), equalTo(elem.getPaneInfo())); } @Test diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java index c5ca5ae54361..8907ee107a7c 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java @@ -131,7 +131,7 @@ public void onCheckpoint(ProcessBundleResponse response) { stateValue.getValue(), stateValue.getTimestamp(), ImmutableList.of(window), - stateValue.getPane())); + stateValue.getPaneInfo())); } } catch (Exception e) { throw new RuntimeException("Failed to set timer/state for the residual", e); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AssignWindowP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AssignWindowP.java index 82151a812d63..0fcd0e651f62 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AssignWindowP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/AssignWindowP.java @@ -77,7 +77,7 @@ private AssignWindowP( inputValue.getValue(), inputValue.getTimestamp(), windows, - inputValue.getPane()); + inputValue.getPaneInfo()); traverser.accept(Utils.encode(outputValue, outputCoder)); return traverser; }); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/ViewP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/ViewP.java index 1106585501ff..e708eef44022 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/ViewP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/ViewP.java @@ -74,7 +74,7 @@ protected boolean tryProcess(int ordinal, @Nonnull Object item) { values.merge( window, new TimestampAndValues( - windowedValue.getPane(), windowedValue.getTimestamp(), windowedValue.getValue()), + windowedValue.getPaneInfo(), windowedValue.getTimestamp(), windowedValue.getValue()), (o, n) -> o.merge(timestampCombiner, n)); } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java index 81a9e8562fe0..a222bf2f670b 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/WindowGroupP.java @@ -134,7 +134,7 @@ private WindowGroupP( value, windowedValue.getTimestamp(), windowedValue.getWindows(), - windowedValue.getPane()); + windowedValue.getPaneInfo()); keyManagers .computeIfAbsent(keyBytes, x -> new KeyManager(key)) .processElement(updatedWindowedValue); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index bb396ae0456b..86398d4f36bd 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -485,7 +485,7 @@ static CompletionStage> createOutputFuture( valueMapper.apply(res), windowedValue.getTimestamp(), windowedValue.getWindows(), - windowedValue.getPane())); + windowedValue.getPaneInfo())); } /** diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java index 48fb96917cb3..747c10fd4800 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java @@ -46,7 +46,7 @@ public void processElement(WindowedValue inputElement, OpEmitter emitter) inputElement.getValue(), inputElement.getTimestamp(), window, - inputElement.getPane())) + inputElement.getPaneInfo())) .forEach(outputElement -> emitter.emitElement(outputElement)); } } diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java index 140f2f88d8cd..ea55a517376e 100644 --- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java +++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -93,7 +93,7 @@ public T element() { return window; } }); - return WindowedValue.of(element, timestamp, windows, value.getPane()); + return WindowedValue.of(element, timestamp, windows, value.getPaneInfo()); }; } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java index 14630fbb0a1f..12eeee75c6b1 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupNonMergingWindowsFunctions.java @@ -131,7 +131,8 @@ static JavaPairRDD final W window = (W) Iterables.getOnlyElement(item.getWindows()); final byte[] windowBytes = CoderHelpers.toByteArray(window, windowCoder); WindowedValue> valueOut = - WindowedValue.of(item.getValue(), item.getTimestamp(), window, item.getPane()); + WindowedValue.of( + item.getValue(), item.getTimestamp(), window, item.getPaneInfo()); final ByteArray windowedKey = new ByteArray(Bytes.concat(keyBytes, windowBytes)); return new Tuple2<>(windowedKey, mappingFn.apply(valueOut)); }); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java index dfb58c5c4c53..18bb8aa821ff 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java @@ -33,6 +33,9 @@ public KV> call(WindowedValue> elem) throws Excepti return KV.of( elem.getValue().getKey(), WindowedValue.of( - elem.getValue().getValue(), elem.getTimestamp(), elem.getWindows(), elem.getPane())); + elem.getValue().getValue(), + elem.getTimestamp(), + elem.getWindows(), + elem.getPaneInfo())); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 1fea8b9329c6..498c394beb67 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -231,7 +231,7 @@ public void evaluate( in.getValue().getValue(), sparkCombineFn.ctxtForValue(in))), in.getTimestamp(), in.getWindows(), - in.getPane())); + in.getPaneInfo())); context.putDataset(transform, new BoundedDataset<>(outRDD)); } @@ -698,7 +698,10 @@ protected WindowedValue> computeNext() { WindowedValue wv = CoderHelpers.fromByteArray(read._2(), wvCoder); consumed(); return WindowedValue.of( - KV.of(key, wv.getValue()), wv.getTimestamp(), wv.getWindows(), wv.getPane()); + KV.of(key, wv.getValue()), + wv.getTimestamp(), + wv.getWindows(), + wv.getPaneInfo()); } } return endOfData(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index f2455e64b956..3a3cc79b14e4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -109,7 +109,7 @@ public WindowedValue> call(WindowedValue>> .apply(windowedKv.getValue().getValue(), fn.ctxtForValue(windowedKv))), windowedKv.getTimestamp(), windowedKv.getWindows(), - windowedKv.getPane()); + windowedKv.getPaneInfo()); } } diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/AssignWindowsFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/AssignWindowsFunction.java index cc7b9d9a5373..9f1d4af64f9a 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/AssignWindowsFunction.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/AssignWindowsFunction.java @@ -70,7 +70,7 @@ public void compute(Iterator> input, RecordCollector> map(Tuple input) { KV.of(key, value.getValue()), value.getTimestamp(), value.getWindows(), - value.getPane()); + value.getPaneInfo()); return element; } diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/MapToTupleFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/MapToTupleFunction.java index d96fa224ddac..e3d4b840106d 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/MapToTupleFunction.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/MapToTupleFunction.java @@ -68,10 +68,10 @@ public Tuple map(WindowedValue> input) { input.getValue().getValue(), input.getTimestamp(), input.getWindows(), - input.getPane())), + input.getPaneInfo())), input.getTimestamp(), input.getWindows(), - input.getPane()); + input.getPaneInfo()); try { element = new Tuple<>( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index dc81e23b10ee..2bd951ae308e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -913,7 +913,10 @@ private static class ConvertWindowedValues extends DoFn, T> @ProcessElement public void processElement(@Element WindowedValue element, OutputReceiver r) { r.outputWindowedValue( - element.getValue(), element.getTimestamp(), element.getWindows(), element.getPane()); + element.getValue(), + element.getTimestamp(), + element.getWindows(), + element.getPaneInfo()); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index d11166001f05..176bc5b07fca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -155,7 +155,7 @@ public static WindowedValue timestampedValueInGlobalWindow( public abstract Collection getWindows(); /** Returns the pane of this {@code WindowedValue} in its window. */ - public abstract PaneInfo getPane(); + public abstract PaneInfo getPaneInfo(); /** Returns {@code true} if this WindowedValue has exactly one window. */ public boolean isSingleWindowedValue() { @@ -172,7 +172,7 @@ public Iterable> explodeWindows() { } ImmutableList.Builder> windowedValues = ImmutableList.builder(); for (BoundedWindow w : getWindows()) { - windowedValues.add(of(getValue(), getTimestamp(), w, getPane())); + windowedValues.add(of(getValue(), getTimestamp(), w, getPaneInfo())); } return windowedValues.build(); } @@ -190,14 +190,14 @@ public boolean equals(@Nullable Object other) { return this.getTimestamp().isEqual(that.getTimestamp()) && Objects.equals(this.getValue(), that.getValue()) && Objects.equals(this.getWindows(), that.getWindows()) - && Objects.equals(this.getPane(), that.getPane()); + && Objects.equals(this.getPaneInfo(), that.getPaneInfo()); } } @Override public int hashCode() { // Hash only the millis of the timestamp to be consistent with equals - return Objects.hash(getValue(), getTimestamp().getMillis(), getWindows(), getPane()); + return Objects.hash(getValue(), getTimestamp().getMillis(), getWindows(), getPaneInfo()); } @Override @@ -228,7 +228,7 @@ protected SimpleWindowedValue(T value, PaneInfo pane) { } @Override - public PaneInfo getPane() { + public PaneInfo getPaneInfo() { return pane; } @@ -260,7 +260,7 @@ public ValueInGlobalWindow(T value, PaneInfo pane) { @Override public WindowedValue withValue(NewT newValue) { - return new ValueInGlobalWindow<>(newValue, getPane()); + return new ValueInGlobalWindow<>(newValue, getPaneInfo()); } @Override @@ -282,7 +282,7 @@ public BoundedWindow getWindow() { public boolean equals(@Nullable Object o) { if (o instanceof ValueInGlobalWindow) { ValueInGlobalWindow that = (ValueInGlobalWindow) o; - return Objects.equals(that.getPane(), this.getPane()) + return Objects.equals(that.getPaneInfo(), this.getPaneInfo()) && Objects.equals(that.getValue(), this.getValue()); } else { return super.equals(o); @@ -291,14 +291,14 @@ public boolean equals(@Nullable Object o) { @Override public int hashCode() { - return Objects.hash(getValue(), getPane()); + return Objects.hash(getValue(), getPaneInfo()); } @Override public String toString() { return MoreObjects.toStringHelper(getClass()) .add("value", getValue()) - .add("pane", getPane()) + .add("pane", getPaneInfo()) .toString(); } } @@ -331,7 +331,7 @@ public TimestampedValueInGlobalWindow(T value, Instant timestamp, PaneInfo pane) @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPane()); + return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPaneInfo()); } @Override @@ -357,7 +357,7 @@ public boolean equals(@Nullable Object o) { // Also compare timestamps according to millis-since-epoch because otherwise expensive // comparisons are made on their Chronology objects. return this.getTimestamp().isEqual(that.getTimestamp()) - && Objects.equals(that.getPane(), this.getPane()) + && Objects.equals(that.getPaneInfo(), this.getPaneInfo()) && Objects.equals(that.getValue(), this.getValue()); } else { return super.equals(o); @@ -367,7 +367,7 @@ public boolean equals(@Nullable Object o) { @Override public int hashCode() { // Hash only the millis of the timestamp to be consistent with equals - return Objects.hash(getValue(), getPane(), getTimestamp().getMillis()); + return Objects.hash(getValue(), getPaneInfo(), getTimestamp().getMillis()); } @Override @@ -375,7 +375,7 @@ public String toString() { return MoreObjects.toStringHelper(getClass()) .add("value", getValue()) .add("timestamp", getTimestamp()) - .add("pane", getPane()) + .add("pane", getPaneInfo()) .toString(); } } @@ -397,7 +397,7 @@ public TimestampedValueInSingleWindow( @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPane()); + return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPaneInfo()); } @Override @@ -424,7 +424,7 @@ public boolean equals(@Nullable Object o) { // comparisons are made on their Chronology objects. return this.getTimestamp().isEqual(that.getTimestamp()) && Objects.equals(that.getValue(), this.getValue()) - && Objects.equals(that.getPane(), this.getPane()) + && Objects.equals(that.getPaneInfo(), this.getPaneInfo()) && Objects.equals(that.window, this.window); } else { return super.equals(o); @@ -434,7 +434,7 @@ public boolean equals(@Nullable Object o) { @Override public int hashCode() { // Hash only the millis of the timestamp to be consistent with equals - return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), window); + return Objects.hash(getValue(), getTimestamp().getMillis(), getPaneInfo(), window); } @Override @@ -443,7 +443,7 @@ public String toString() { .add("value", getValue()) .add("timestamp", getTimestamp()) .add("window", window) - .add("pane", getPane()) + .add("pane", getPaneInfo()) .toString(); } } @@ -460,7 +460,8 @@ public TimestampedValueInMultipleWindows( @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInMultipleWindows<>(newValue, getTimestamp(), windows, getPane()); + return new TimestampedValueInMultipleWindows<>( + newValue, getTimestamp(), windows, getPaneInfo()); } @Override @@ -477,7 +478,7 @@ public boolean equals(@Nullable Object o) { // comparisons are made on their Chronology objects. if (this.getTimestamp().isEqual(that.getTimestamp()) && Objects.equals(that.getValue(), this.getValue()) - && Objects.equals(that.getPane(), this.getPane())) { + && Objects.equals(that.getPaneInfo(), this.getPaneInfo())) { ensureWindowsAreASet(); that.ensureWindowsAreASet(); return that.windows.equals(this.windows); @@ -493,7 +494,7 @@ public boolean equals(@Nullable Object o) { public int hashCode() { // Hash only the millis of the timestamp to be consistent with equals ensureWindowsAreASet(); - return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), windows); + return Objects.hash(getValue(), getTimestamp().getMillis(), getPaneInfo(), windows); } @Override @@ -502,7 +503,7 @@ public String toString() { .add("value", getValue()) .add("timestamp", getTimestamp()) .add("windows", windows) - .add("pane", getPane()) + .add("pane", getPaneInfo()) .toString(); } @@ -603,7 +604,7 @@ public void encode(WindowedValue windowedElem, OutputStream outStream, Contex throws CoderException, IOException { InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); windowsCoder.encode(windowedElem.getWindows(), outStream); - PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); + PaneInfoCoder.INSTANCE.encode(windowedElem.getPaneInfo(), outStream); valueCoder.encode(windowedElem.getValue(), outStream, context); } @@ -638,7 +639,7 @@ public void registerByteSizeObserver(WindowedValue value, ElementByteSizeObse throws Exception { InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer); windowsCoder.registerByteSizeObserver(value.getWindows(), observer); - PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer); + PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPaneInfo(), observer); valueCoder.registerByteSizeObserver(value.getValue(), observer); } @@ -840,7 +841,7 @@ public Collection getWindows() { } public PaneInfo getPane() { - return windowedValuePrototype.getPane(); + return windowedValuePrototype.getPaneInfo(); } /** Returns the serialized payload that will be provided when deserializing this coder. */ @@ -877,7 +878,7 @@ public static WindowedValue.ParamWindowedValueCoder fromComponents( windowCoder, windowedValue.getTimestamp(), windowedValue.getWindows(), - windowedValue.getPane()); + windowedValue.getPaneInfo()); } catch (IOException e) { throw new RuntimeException( "Unable to decode constant members from payload for ParamWindowedValueCoder: ", e); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 85c8d0d04ede..4a0388baf52e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -366,7 +366,7 @@ public void testCreateWindowedValues() { windowedValue.getValue(), windowedValue.getTimestamp(), w, - windowedValue.getPane()))) + windowedValue.getPaneInfo()))) .collect(Collectors.toList()); PCollection output = diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java index 8e4f6a2b2036..7ae7c55d1ca1 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/AssignWindowsRunner.java @@ -109,6 +109,6 @@ public BoundedWindow window() { } }; Collection windows = windowFn.assignWindows(ctxt); - return WindowedValue.of(input.getValue(), input.getTimestamp(), windows, input.getPane()); + return WindowedValue.of(input.getValue(), input.getTimestamp(), windows, input.getPaneInfo()); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index e264fa14788a..05f01e92c38e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1015,28 +1015,28 @@ public Object restriction() { KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize), splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(), splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(), - splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()), + splitResult.getPrimaryInFullyProcessedWindowsRoot().getPaneInfo()), splitResult.getPrimarySplitRoot() == null ? null : WindowedValue.of( KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize), splitResult.getPrimarySplitRoot().getTimestamp(), splitResult.getPrimarySplitRoot().getWindows(), - splitResult.getPrimarySplitRoot().getPane()), + splitResult.getPrimarySplitRoot().getPaneInfo()), splitResult.getResidualSplitRoot() == null ? null : WindowedValue.of( KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize), splitResult.getResidualSplitRoot().getTimestamp(), splitResult.getResidualSplitRoot().getWindows(), - splitResult.getResidualSplitRoot().getPane()), + splitResult.getResidualSplitRoot().getPaneInfo()), splitResult.getResidualInUnprocessedWindowsRoot() == null ? null : WindowedValue.of( KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize), splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(), splitResult.getResidualInUnprocessedWindowsRoot().getWindows(), - splitResult.getResidualInUnprocessedWindowsRoot().getPane())); + splitResult.getResidualInUnprocessedWindowsRoot().getPaneInfo())); } private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction( @@ -1119,7 +1119,7 @@ private static WindowedSplitResult computeWindowSplit KV.of(currentRestriction, currentWatermarkEstimatorState)), currentElement.getTimestamp(), primaryFullyProcessedWindows, - currentElement.getPane()), + currentElement.getPaneInfo()), splitResult == null ? null : WindowedValue.of( @@ -1128,7 +1128,7 @@ private static WindowedSplitResult computeWindowSplit KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), currentElement.getTimestamp(), currentWindow, - currentElement.getPane()), + currentElement.getPaneInfo()), splitResult == null ? null : WindowedValue.of( @@ -1137,7 +1137,7 @@ private static WindowedSplitResult computeWindowSplit KV.of(splitResult.getResidual(), watermarkAndState.getValue())), currentElement.getTimestamp(), currentWindow, - currentElement.getPane()), + currentElement.getPaneInfo()), residualUnprocessedWindows.isEmpty() ? null : WindowedValue.of( @@ -1146,7 +1146,7 @@ private static WindowedSplitResult computeWindowSplit KV.of(currentRestriction, currentWatermarkEstimatorState)), currentElement.getTimestamp(), residualUnprocessedWindows, - currentElement.getPane())); + currentElement.getPaneInfo())); return windowedSplitResult; } @@ -1979,7 +1979,7 @@ public void output(OutputT output) { outputTo( mainOutputConsumer, WindowedValue.of( - output, currentElement.getTimestamp(), currentWindow, currentElement.getPane())); + output, currentElement.getTimestamp(), currentWindow, currentElement.getPaneInfo())); } @Override @@ -1993,7 +1993,7 @@ public void output(TupleTag tag, T output) { outputTo( consumer, WindowedValue.of( - output, currentElement.getTimestamp(), currentWindow, currentElement.getPane())); + output, currentElement.getTimestamp(), currentWindow, currentElement.getPaneInfo())); } @Override @@ -2002,7 +2002,7 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { // runners can provide proper timestamps. outputTo( mainOutputConsumer, - WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); + WindowedValue.of(output, timestamp, currentWindow, currentElement.getPaneInfo())); } @Override @@ -2026,7 +2026,8 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } outputTo( - consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); + consumer, + WindowedValue.of(output, timestamp, currentWindow, currentElement.getPaneInfo())); } @Override @@ -2083,7 +2084,7 @@ public org.apache.beam.sdk.state.Timer timer(String timerId) { currentWindow, currentElement.getTimestamp(), currentElement.getTimestamp(), - currentElement.getPane(), + currentElement.getPaneInfo(), timeDomain); } @@ -2095,7 +2096,7 @@ public TimerMap timerFamily(String timerFamilyId) { currentWindow, currentElement.getTimestamp(), currentElement.getTimestamp(), - currentElement.getPane()); + currentElement.getPaneInfo()); } } @@ -2142,7 +2143,7 @@ public Instant timestamp(DoFn doFn) { size), currentElement.getTimestamp(), currentWindow, - currentElement.getPane())); + currentElement.getPaneInfo())); } @Override @@ -2187,7 +2188,7 @@ public Instant timestamp(DoFn doFn) { size), timestamp, currentWindow, - currentElement.getPane())); + currentElement.getPaneInfo())); } @Override @@ -2355,7 +2356,7 @@ public Instant timestamp(DoFn doFn) { size), timestamp, currentElement.getWindows(), - currentElement.getPane())); + currentElement.getPaneInfo())); } @Override @@ -2451,7 +2452,7 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputTo( mainOutputConsumer, WindowedValue.of( - output, timestamp, currentElement.getWindows(), currentElement.getPane())); + output, timestamp, currentElement.getWindows(), currentElement.getPaneInfo())); } @Override @@ -2475,7 +2476,7 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp outputTo( consumer, WindowedValue.of( - output, timestamp, currentElement.getWindows(), currentElement.getPane())); + output, timestamp, currentElement.getWindows(), currentElement.getPaneInfo())); } @Override @@ -2785,7 +2786,7 @@ public Instant timestamp() { @Override public PaneInfo pane() { - return currentElement.getPane(); + return currentElement.getPaneInfo(); } @Override diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java index f19e98ee9e73..1214a754891c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java @@ -207,7 +207,7 @@ private void processElementForWindowObservingPairWithRestriction(WindowedValue doFn) { @Override public PaneInfo paneInfo(DoFn doFn) { - return getCurrentElementOrFail().getPane(); + return getCurrentElementOrFail().getPaneInfo(); } @Override diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java index 9c13e2ae495d..1a18647d16fe 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java @@ -352,7 +352,7 @@ public void output(RestrictionT subrestriction) { size), getCurrentElement().getTimestamp(), getCurrentWindow(), - getCurrentElement().getPane())); + getCurrentElement().getPaneInfo())); } } @@ -411,7 +411,7 @@ public Instant timestamp(DoFn doFn) { @Override public PaneInfo paneInfo(DoFn doFn) { - return getCurrentElement().getPane(); + return getCurrentElement().getPaneInfo(); } @Override diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java index 838715d1beda..82d7dab7d439 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java @@ -120,7 +120,7 @@ public void singleInputMultipleOutputSucceeds() throws Exception { -3, new Instant(-12), ImmutableSet.of(firstWindow, secondWindow), - firstValue.getPane()))); + firstValue.getPaneInfo()))); WindowedValue secondValue = WindowedValue.of( 3, @@ -135,7 +135,7 @@ public void singleInputMultipleOutputSucceeds() throws Exception { 3, new Instant(12), ImmutableSet.of(secondWindow, thirdWindow), - secondValue.getPane()))); + secondValue.getPaneInfo()))); } @Test diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 6ca085495a3d..4fea6246bdb3 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -2266,7 +2266,7 @@ public void testProcessElementForWindowedSizedElementAndRestriction() throws Exc 3.0), firstValue.getTimestamp(), window1, - firstValue.getPane())); + firstValue.getPaneInfo())); assertEquals( decode(inputCoder, residualRoot.getApplication().getElement()), WindowedValue.of( @@ -2279,7 +2279,7 @@ public void testProcessElementForWindowedSizedElementAndRestriction() throws Exc 2.0), firstValue.getTimestamp(), window1, - firstValue.getPane())); + firstValue.getPaneInfo())); assertEquals( decode(inputCoder, residualRootForUnprocessedWindows.getApplication().getElement()), WindowedValue.of( @@ -2292,7 +2292,7 @@ public void testProcessElementForWindowedSizedElementAndRestriction() throws Exc 5.0), firstValue.getTimestamp(), window2, - firstValue.getPane())); + firstValue.getPaneInfo())); splitListener.clear(); // Check that before processing an element we don't report progress @@ -2319,37 +2319,37 @@ public void testProcessElementForWindowedSizedElementAndRestriction() throws Exc "5:5", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(5)), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( "5:6", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(6)), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( "5:7", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(7)), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( "2:0", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(0)), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( "2:1", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( "2:0", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(0)), window2, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( "2:1", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)), window2, - firstValue.getPane()))); + firstValue.getPaneInfo()))); assertTrue(splitListener.getPrimaryRoots().isEmpty()); assertTrue(splitListener.getResidualRoots().isEmpty()); mainOutputValues.clear(); @@ -2411,22 +2411,22 @@ public void testProcessElementForWindowedSizedElementAndRestriction() throws Exc "7:0", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(0)), window1, - splitValue.getPane()), + splitValue.getPaneInfo()), WindowedValue.of( "7:1", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)), window1, - splitValue.getPane()), + splitValue.getPaneInfo()), WindowedValue.of( "7:2", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)), window1, - splitValue.getPane()), + splitValue.getPaneInfo()), WindowedValue.of( "7:3", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(3)), window1, - splitValue.getPane()))); + splitValue.getPaneInfo()))); BundleApplication primaryRoot = Iterables.getOnlyElement(trySplitResult.getPrimaryRoots()); assertEquals(2, trySplitResult.getResidualRoots().size()); @@ -2500,7 +2500,7 @@ public void testProcessElementForWindowedSizedElementAndRestriction() throws Exc 5.0), splitValue.getTimestamp(), window2, - splitValue.getPane()), + splitValue.getPaneInfo()), inputCoder.decode( residualRootInUnprocessedWindows.getApplication().getElement().newInput())); @@ -2716,14 +2716,14 @@ public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObser 3.0), splitValue.getTimestamp(), window1, - splitValue.getPane()), + splitValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("7", KV.of(new OffsetRange(0, 3), GlobalWindow.TIMESTAMP_MIN_VALUE)), 3.0), splitValue.getTimestamp(), window2, - splitValue.getPane()))); + splitValue.getPaneInfo()))); SplitResult expectedElementSplit = createSplitResult(0); BundleApplication expectedElementSplitPrimary = @@ -2735,7 +2735,7 @@ public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObser KV.of("7", KV.of(new OffsetRange(0, 6), GlobalWindow.TIMESTAMP_MIN_VALUE)), 6.0), splitValue.getTimestamp(), window1, - splitValue.getPane()), + splitValue.getPaneInfo()), primaryBytes); BundleApplication expectedWindowedPrimary = BundleApplication.newBuilder() @@ -2752,7 +2752,7 @@ public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObser KV.of("7", KV.of(new OffsetRange(0, 6), GlobalWindow.TIMESTAMP_MIN_VALUE)), 6.0), splitValue.getTimestamp(), window3, - splitValue.getPane()), + splitValue.getPaneInfo()), residualBytes); DelayedBundleApplication expectedWindowedResidual = DelayedBundleApplication.newBuilder() @@ -3026,28 +3026,28 @@ public void testProcessElementForWindowedTruncateAndSizeRestriction() throws Exc 2.0), firstValue.getTimestamp(), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("5", KV.of(new OffsetRange(0, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 2.0), firstValue.getTimestamp(), window2, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), firstValue.getTimestamp(), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), firstValue.getTimestamp(), window2, - firstValue.getPane()))); + firstValue.getPaneInfo()))); mainOutputValues.clear(); assertTrue(context.getFinishBundleFunctions().isEmpty()); @@ -3149,14 +3149,14 @@ public void testProcessElementForWindowedTruncateAndSizeRestriction() throws Exc 2.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), - firstValue.getPane()))); + firstValue.getPaneInfo()))); mainOutputValues.clear(); assertTrue(context.getFinishBundleFunctions().isEmpty()); @@ -3339,14 +3339,14 @@ private KV createSplitInWindow( KV.of(primaryRestriction, currentWatermarkEstimatorState)), currentElement.getTimestamp(), window, - currentElement.getPane()), + currentElement.getPaneInfo()), WindowedValue.of( KV.of( currentElement.getValue(), KV.of(residualRestriction, watermarkAndState.getValue())), currentElement.getTimestamp(), window, - currentElement.getPane())); + currentElement.getPaneInfo())); } private KV createSplitAcrossWindows( @@ -3360,7 +3360,7 @@ private KV createSplitAcrossWindows( KV.of(currentRestriction, currentWatermarkEstimatorState)), currentElement.getTimestamp(), primaryWindows, - currentElement.getPane()), + currentElement.getPaneInfo()), residualWindows.isEmpty() ? null : WindowedValue.of( @@ -3369,7 +3369,7 @@ private KV createSplitAcrossWindows( KV.of(currentRestriction, currentWatermarkEstimatorState)), currentElement.getTimestamp(), residualWindows, - currentElement.getPane())); + currentElement.getPaneInfo())); } private KV createSplitWithSizeInWindow( @@ -3383,7 +3383,7 @@ private KV createSplitWithSizeInWindow( (double) (primaryRestriction.getTo() - primaryRestriction.getFrom())), currentElement.getTimestamp(), window, - currentElement.getPane()), + currentElement.getPaneInfo()), WindowedValue.of( KV.of( KV.of( @@ -3392,7 +3392,7 @@ private KV createSplitWithSizeInWindow( (double) (residualRestriction.getTo() - residualRestriction.getFrom())), currentElement.getTimestamp(), window, - currentElement.getPane())); + currentElement.getPaneInfo())); } private KV createSplitWithSizeAcrossWindows( @@ -3408,7 +3408,7 @@ private KV createSplitWithSizeAcrossWindows( (double) (currentRestriction.getTo() - currentRestriction.getFrom())), currentElement.getTimestamp(), primaryWindows, - currentElement.getPane()), + currentElement.getPaneInfo()), residualWindows.isEmpty() ? null : WindowedValue.of( @@ -3419,7 +3419,7 @@ private KV createSplitWithSizeAcrossWindows( (double) (currentRestriction.getTo() - currentRestriction.getFrom())), currentElement.getTimestamp(), residualWindows, - currentElement.getPane())); + currentElement.getPaneInfo())); } @Before diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java index 7c52da950882..7fc7cb1cb5a2 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java @@ -361,7 +361,7 @@ public void testProcessElementForWindowedPairWithRestriction() throws Exception GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)))), firstValue.getTimestamp(), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( "5", @@ -370,7 +370,7 @@ public void testProcessElementForWindowedPairWithRestriction() throws Exception GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)))), firstValue.getTimestamp(), window2, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( "2", @@ -379,7 +379,7 @@ public void testProcessElementForWindowedPairWithRestriction() throws Exception GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)))), secondValue.getTimestamp(), window1, - secondValue.getPane()), + secondValue.getPaneInfo()), WindowedValue.of( KV.of( "2", @@ -388,7 +388,7 @@ public void testProcessElementForWindowedPairWithRestriction() throws Exception GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)))), secondValue.getTimestamp(), window2, - secondValue.getPane()))); + secondValue.getPaneInfo()))); mainOutputValues.clear(); assertTrue(context.getFinishBundleFunctions().isEmpty()); @@ -476,7 +476,7 @@ public void testProcessElementForWindowedPairWithRestrictionWithNonWindowObservi GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)))), firstValue.getTimestamp(), ImmutableList.of(window1, window2), - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( "2", @@ -485,7 +485,7 @@ public void testProcessElementForWindowedPairWithRestrictionWithNonWindowObservi GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)))), secondValue.getTimestamp(), ImmutableList.of(window1, window2), - secondValue.getPane()))); + secondValue.getPaneInfo()))); mainOutputValues.clear(); assertTrue(context.getFinishBundleFunctions().isEmpty()); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunnerTest.java index 95b9129eb763..bbfdc13a0ce0 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunnerTest.java @@ -333,56 +333,56 @@ public void testProcessElementForWindowedSplitAndSizeRestriction() throws Except 2.0), firstValue.getTimestamp(), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("5", KV.of(new OffsetRange(2, 5), GlobalWindow.TIMESTAMP_MIN_VALUE)), 3.0), firstValue.getTimestamp(), window1, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("5", KV.of(new OffsetRange(0, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 2.0), firstValue.getTimestamp(), window2, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("5", KV.of(new OffsetRange(2, 5), GlobalWindow.TIMESTAMP_MIN_VALUE)), 3.0), firstValue.getTimestamp(), window2, - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), secondValue.getTimestamp(), window1, - secondValue.getPane()), + secondValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(1, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), secondValue.getTimestamp(), window1, - secondValue.getPane()), + secondValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), secondValue.getTimestamp(), window2, - secondValue.getPane()), + secondValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(1, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), secondValue.getTimestamp(), window2, - secondValue.getPane()))); + secondValue.getPaneInfo()))); mainOutputValues.clear(); assertTrue(context.getFinishBundleFunctions().isEmpty()); @@ -476,28 +476,28 @@ public void testProcessElementForWindowedSplitAndSizeRestriction() throws Except 2.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("5", KV.of(new OffsetRange(2, 5), GlobalWindow.TIMESTAMP_MIN_VALUE)), 3.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(0, 1), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), - firstValue.getPane()), + firstValue.getPaneInfo()), WindowedValue.of( KV.of( KV.of("2", KV.of(new OffsetRange(1, 2), GlobalWindow.TIMESTAMP_MIN_VALUE)), 1.0), firstValue.getTimestamp(), ImmutableList.of(window1, window2), - firstValue.getPane()))); + firstValue.getPaneInfo()))); mainOutputValues.clear(); assertTrue(context.getFinishBundleFunctions().isEmpty());