Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 2 additions & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 3
"modification": 3,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 2,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
3 changes: 2 additions & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV1.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 1,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
3 changes: 2 additions & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 3,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2,
"https://github.com/apache/beam/pull/34294": "noting that PR #34294 should run this test"
"https://github.com/apache/beam/pull/34294": "noting that PR #34294 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test"
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test"
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31761": "noting that PR #31761 should run this test"
"https://github.com/apache/beam/pull/31761": "noting that PR #31761 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
"runFor": "#33606"
"runFor": "#33606",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute"
"https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
"https://github.com/apache/beam/pull/34123": "noting that PR #34123 should run this test",
"https://github.com/apache/beam/pull/34080": "noting that PR #34080 should run this test",
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test",
"https://github.com/apache/beam/pull/34560": "noting that PR #34560 should run this test"
"https://github.com/apache/beam/pull/34560": "noting that PR #34560 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test",
"https://github.com/apache/beam/pull/34123": "noting that PR #34123 should run this test",
"https://github.com/apache/beam/pull/34080": "noting that PR #34080 should run this test",
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test"
"https://github.com/apache/beam/pull/34155": "noting that PR #34155 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test"
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.TriggerTranslation;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

Expand Down Expand Up @@ -99,7 +99,7 @@ public void outputWindowedValue(
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
outputManager.output(mainTag, WindowedValue.of(output, timestamp, windows, pane));
outputManager.output(mainTag, WindowedValues.of(output, timestamp, windows, pane));
}

@Override
Expand All @@ -109,7 +109,7 @@ public <AdditionalOutputT> void outputWindowedValue(
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
outputManager.output(tag, WindowedValues.of(output, timestamp, windows, pane));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.beam.runners.core;

import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;

/**
* Interface that contains all the timers and elements associated with a specific work item.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/** A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Collections;
import java.util.Objects;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
Expand Down Expand Up @@ -140,7 +141,7 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
timerInternals.currentOutputWatermarkTime());
} else {
nonLateElements.add(
WindowedValue.of(
WindowedValues.of(
element.getValue(), element.getTimestamp(), window, element.getPane()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
Expand Down Expand Up @@ -331,7 +332,7 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) {

@Override
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
outputWindowedValue(tag, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING));
}
}

Expand Down Expand Up @@ -447,7 +448,7 @@ public <T> void outputWindowedValue(
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, windows, paneInfo));
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
}

@Override
Expand Down Expand Up @@ -888,7 +889,7 @@ public <T> void outputWindowedValue(
PaneInfo paneInfo) {
checkTimestamp(timestamp(), timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, windows, paneInfo));
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
}

@Override
Expand Down Expand Up @@ -1117,7 +1118,7 @@ public <T> void outputWindowedValue(
PaneInfo paneInfo) {
checkTimestamp(this.timestamp, timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, windows, paneInfo));
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.PTransformReplacements;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.PTransformTranslation.RawPTransform;
Expand All @@ -59,6 +58,8 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -295,7 +296,7 @@ public ProcessFn(
this.elementTag =
StateTags.value(
"element",
WindowedValue.getFullCoder(
WindowedValues.getFullCoder(
elementCoder, inputWindowingStrategy.getWindowFn().windowCoder()));
this.restrictionTag = StateTags.value("restriction", restrictionCoder);
this.watermarkEstimatorStateTag =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowedValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.joda.time.Duration;
Expand Down Expand Up @@ -98,7 +99,7 @@ public StatefulDoFnRunner(

this.sortBufferTag =
StateTags.makeSystemTagInternal(
StateTags.bag(SORT_BUFFER_STATE, WindowedValue.getFullCoder(inputCoder, windowCoder)));
StateTags.bag(SORT_BUFFER_STATE, WindowedValues.getFullCoder(inputCoder, windowCoder)));

rejectMergingWindowFn(windowFn);
}
Expand Down
Loading
Loading