diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index f8dbfd61e836..b16dad86df18 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -383,6 +383,16 @@ public PaneInfo pane() { return element.getPaneInfo(); } + @Override + public String currentRecordId() { + return element.getCurrentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return element.getCurrentRecordOffset(); + } + @Override public PipelineOptions getPipelineOptions() { return pipelineOptions; @@ -411,6 +421,24 @@ public void outputWindowedValue( outputReceiver.output(mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); + } + outputReceiver.output( + mainOutputTag, + WindowedValues.of( + value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public void output(TupleTag tag, T value) { outputWithTimestamp(tag, value, element.getTimestamp()); @@ -429,11 +457,26 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { + outputWindowedValue(tag, value, timestamp, windows, paneInfo, null, null); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { noteOutput(); if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); } - outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo)); + outputReceiver.output( + tag, + WindowedValues.of( + value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } private void noteOutput() { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 840245edf7ad..217c06c56fe5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -334,6 +334,35 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) { public void output(TupleTag tag, T output, Instant timestamp, BoundedWindow window) { outputWindowedValue(tag, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING)); } + + @Override + public void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + output(mainOutputTag, output, timestamp, window, currentRecordId, currentRecordOffset); + } + + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + tag, + WindowedValues.of( + output, + timestamp, + Collections.singletonList(window), + PaneInfo.NO_FIRING, + currentRecordId, + currentRecordOffset)); + } } private final DoFnFinishBundleArgumentProvider.Context context = @@ -427,6 +456,24 @@ public void outputWindowedValue( outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + mainOutputTag, + output, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset); + } + @Override public void output(TupleTag tag, T output) { checkNotNull(tag, "Tag passed to output cannot be null"); @@ -451,11 +498,36 @@ public void outputWindowedValue( tag, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + SimpleDoFnRunner.this.outputWindowedValue( + tag, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public Instant timestamp() { return elem.getTimestamp(); } + @Override + public String currentRecordId() { + return elem.getCurrentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return elem.getCurrentRecordOffset(); + } + public Collection windows() { return elem.getWindows(); } @@ -867,6 +939,24 @@ public void outputWindowedValue( outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + mainOutputTag, + output, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset); + } + @Override public void output(TupleTag tag, T output) { checkTimestamp(timestamp(), timestamp); @@ -892,6 +982,22 @@ public void outputWindowedValue( tag, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + checkTimestamp(timestamp(), timestamp); + SimpleDoFnRunner.this.outputWindowedValue( + tag, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( @@ -1096,6 +1202,24 @@ public void outputWindowedValue( outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + mainOutputTag, + output, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset); + } + @Override public void output(TupleTag tag, T output) { checkTimestamp(this.timestamp, timestamp); @@ -1121,6 +1245,22 @@ public void outputWindowedValue( tag, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + checkTimestamp(this.timestamp, timestamp); + SimpleDoFnRunner.this.outputWindowedValue( + tag, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index fafb02f9dd0b..dd4f3d54cbaf 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -663,6 +663,27 @@ public void output( throwUnsupportedOutput(); } + @Override + public void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + throwUnsupportedOutput(); + } + + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + throwUnsupportedOutput(); + } + @Override public PipelineOptions getPipelineOptions() { return baseContext.getPipelineOptions(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 537a2d855921..51c1642f848c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -1395,6 +1395,16 @@ public PaneInfo getPaneInfo() { return PaneInfo.NO_FIRING; } + @Override + public @Nullable String getCurrentRecordId() { + return null; + } + + @Override + public @Nullable Long getCurrentRecordOffset() { + return null; + } + @Override public Iterable> explodeWindows() { return Collections.emptyList(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index b156ff45caf6..31782634ce4f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -23,6 +23,7 @@ import com.google.auto.service.AutoService; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -43,6 +44,7 @@ import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -221,15 +223,26 @@ public long add(WindowedValue data) throws IOException { throw new RuntimeException( "Unexpected record ID via ValueWithRecordIdCoder while offset-based deduplication enabled."); } - byte[] rawId = context.getCurrentRecordId(); - if (rawId.length == 0) { + byte[] rawId = null; + + if (data.getCurrentRecordId() != null) { + rawId = data.getCurrentRecordId().getBytes(StandardCharsets.UTF_8); + } else { + rawId = context.getCurrentRecordId(); + } + if (rawId == null || rawId.length == 0) { throw new RuntimeException( "Unexpected empty record ID while offset-based deduplication enabled."); } id = ByteString.copyFrom(rawId); - byte[] rawOffset = context.getCurrentRecordOffset(); - if (rawOffset.length == 0) { + byte[] rawOffset = null; + if (data.getCurrentRecordOffset() != null) { + rawOffset = Longs.toByteArray(data.getCurrentRecordOffset()); + } else { + rawOffset = context.getCurrentRecordOffset(); + } + if (rawOffset == null || rawOffset.length == 0) { throw new RuntimeException( "Unexpected empty record offset while offset-based deduplication enabled."); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 42174629b3b8..1119617a068e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -49,6 +49,16 @@ public PaneInfo getPaneInfo() { return PaneInfo.NO_FIRING; } + @Override + public @Nullable String getCurrentRecordId() { + return null; + } + + @Override + public @Nullable Long getCurrentRecordOffset() { + return null; + } + @Override public Iterable> explodeWindows() { return Collections.emptyList(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java index 30a6577d3e97..d03914a256ca 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java @@ -110,6 +110,16 @@ public PaneInfo getPaneInfo() { return PaneInfo.NO_FIRING; } + @Override + public @Nullable String getCurrentRecordId() { + return null; + } + + @Override + public @Nullable Long getCurrentRecordOffset() { + return null; + } + @Override public Iterable> explodeWindows() { return Collections.emptyList(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 0961c8512523..10904b2aa393 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -122,6 +122,12 @@ public abstract class FinishBundleContext { */ public abstract void output(OutputT output, Instant timestamp, BoundedWindow window); + public abstract void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset); /** * Adds the given element to the output {@code PCollection} with the given tag at the given * timestamp in the given window. @@ -133,6 +139,14 @@ public abstract class FinishBundleContext { */ public abstract void output( TupleTag tag, T output, Instant timestamp, BoundedWindow window); + + public abstract void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset); } /** @@ -211,6 +225,14 @@ public abstract void outputWindowedValue( Collection windows, PaneInfo paneInfo); + public abstract void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset); + /** * Adds the given element to the output {@code PCollection} with the given tag. * @@ -283,6 +305,15 @@ public abstract void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo); + + public abstract void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ @@ -323,6 +354,12 @@ public abstract class ProcessContext extends WindowedContext { */ @Pure public abstract PaneInfo pane(); + + @Pure + public abstract String currentRecordId(); + + @Pure + public abstract Long currentRecordOffset(); } /** Information accessible when running a {@link DoFn.OnTimer} method. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index fb1947ad5ba3..f4670a4d0e94 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -213,7 +213,7 @@ public void processWindowedElement(InputT element, Instant timestamp, final Boun try { final DoFn.ProcessContext processContext = createProcessContext( - ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING)); + ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING, null, null)); fnInvoker.invokeProcessElement( new DoFnInvoker.BaseArgumentProvider() { @@ -478,7 +478,38 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) { @Override public void output(TupleTag tag, T output, Instant timestamp, BoundedWindow window) { getMutableOutput(tag) - .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING)); + .add( + ValueInSingleWindow.of( + output, timestamp, window, PaneInfo.NO_FIRING, null, null)); + } + + @Override + public void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + output(mainOutputTag, output, timestamp, window, currentRecordId, currentRecordOffset); + } + + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + output, + timestamp, + window, + PaneInfo.NO_FIRING, + currentRecordId, + currentRecordOffset)); } }; } @@ -567,6 +598,16 @@ public PaneInfo pane() { return element.getPaneInfo(); } + @Override + public String currentRecordId() { + return element.getCurrentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return element.getCurrentRecordOffset(); + } + @Override public PipelineOptions getPipelineOptions() { return options; @@ -591,6 +632,24 @@ public void outputWindowedValue( outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + mainOutputTag, + output, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset); + } + @Override public void output(TupleTag tag, T output) { outputWithTimestamp(tag, output, element.getTimestamp()); @@ -601,7 +660,7 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp getMutableOutput(tag) .add( ValueInSingleWindow.of( - output, timestamp, element.getWindow(), element.getPaneInfo())); + output, timestamp, element.getWindow(), element.getPaneInfo(), null, null)); } @Override @@ -612,7 +671,25 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo) { for (BoundedWindow w : windows) { - getMutableOutput(tag).add(ValueInSingleWindow.of(output, timestamp, w, paneInfo)); + getMutableOutput(tag) + .add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null)); + } + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + for (BoundedWindow w : windows) { + getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + output, timestamp, w, paneInfo, currentRecordId, currentRecordOffset)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index edae34fbecf9..d462d422446c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -397,6 +397,29 @@ public void output( "Output from FinishBundle for SDF is not supported in naive implementation"); } + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + throw new UnsupportedOperationException( + "Output from FinishBundle for SDF is not supported in naive implementation"); + } + + @Override + public void output( + @Nullable OutputT output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + throw new UnsupportedOperationException( + "Output from FinishBundle for SDF is not supported in naive implementation"); + } + @Override public void output( TupleTag tag, T output, Instant timestamp, BoundedWindow window) { @@ -617,6 +640,18 @@ public void outputWindowedValue( outerContext.outputWindowedValue(output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outerContext.outputWindowedValue( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset); + } + @Override public void output(TupleTag tag, T output) { outerContext.output(tag, output); @@ -637,6 +672,19 @@ public void outputWindowedValue( outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outerContext.outputWindowedValue( + tag, output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset); + } + @Override public InputT element() { return element; @@ -657,6 +705,16 @@ public PaneInfo pane() { return outerContext.pane(); } + @Override + public String currentRecordId() { + return outerContext.currentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return outerContext.currentRecordOffset(); + } + @Override public Object watermarkEstimatorState() { throw new UnsupportedOperationException( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 74717fc606b2..7dc5fef52ecb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -60,9 +60,24 @@ public T getValue() { /** Returns the pane of this {@code ValueInSingleWindow} in its window. */ public abstract PaneInfo getPaneInfo(); + public abstract @Nullable String getCurrentRecordId(); + + public abstract @Nullable Long getCurrentRecordOffset(); + + public static ValueInSingleWindow of( + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + return new AutoValue_ValueInSingleWindow<>( + value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset); + } + public static ValueInSingleWindow of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo); + return of(value, timestamp, window, paneInfo, null, null); } /** A coder for {@link ValueInSingleWindow}. */ @@ -110,7 +125,7 @@ public ValueInSingleWindow decode(InputStream inStream, Context context) thro BoundedWindow window = windowCoder.decode(inStream); PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream); T value = valueCoder.decode(inStream, context); - return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo); + return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo, null, null); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java index d2c4d1f07da7..0512be524b91 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java @@ -20,6 +20,7 @@ import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; import org.joda.time.Instant; @@ -45,6 +46,12 @@ public interface WindowedValue { @Pure PaneInfo getPaneInfo(); + @Nullable + String getCurrentRecordId(); + + @Nullable + Long getCurrentRecordOffset(); + /** * A representation of each of the actual values represented by this compressed {@link * WindowedValue}, one per window. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index 9616fd845fa7..4bbab33a8936 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -61,16 +61,27 @@ public class WindowedValues { private WindowedValues() {} // non-instantiable utility class - /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { + return of(value, timestamp, windows, paneInfo, null, null); + } + + /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ + public static WindowedValue of( + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); if (windows.size() == 1) { return of(value, timestamp, windows.iterator().next(), paneInfo); } else { - return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, paneInfo); + return new TimestampedValueInMultipleWindows<>( + value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset); } } @@ -81,7 +92,8 @@ static WindowedValue createWithoutValidation( if (windows.size() == 1) { return of(value, timestamp, windows.iterator().next(), paneInfo); } else { - return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, paneInfo); + return new TimestampedValueInMultipleWindows<>( + value, timestamp, windows, paneInfo, null, null); } } @@ -94,9 +106,9 @@ public static WindowedValue of( if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { return valueInGlobalWindow(value, paneInfo); } else if (isGlobal) { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo); + return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null, null); } else { - return new TimestampedValueInSingleWindow<>(value, timestamp, window, paneInfo); + return new TimestampedValueInSingleWindow<>(value, timestamp, window, paneInfo, null, null); } } @@ -105,7 +117,7 @@ public static WindowedValue of( * default timestamp and pane. */ public static WindowedValue valueInGlobalWindow(T value) { - return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING); + return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null); } /** @@ -113,7 +125,7 @@ public static WindowedValue valueInGlobalWindow(T value) { * default timestamp and the specified pane. */ public static WindowedValue valueInGlobalWindow(T value, PaneInfo paneInfo) { - return new ValueInGlobalWindow<>(value, paneInfo); + return new ValueInGlobalWindow<>(value, paneInfo, null, null); } /** @@ -124,7 +136,7 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta if (BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { return valueInGlobalWindow(value); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, PaneInfo.NO_FIRING); + return new TimestampedValueInGlobalWindow<>(value, timestamp, PaneInfo.NO_FIRING, null, null); } } @@ -137,7 +149,7 @@ public static WindowedValue timestampedValueInGlobalWindow( if (paneInfo.equals(PaneInfo.NO_FIRING)) { return timestampedValueInGlobalWindow(value, timestamp); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo); + return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null, null); } } @@ -151,7 +163,9 @@ public static WindowedValue withValue( newValue, windowedValue.getTimestamp(), windowedValue.getWindows(), - windowedValue.getPaneInfo()); + windowedValue.getPaneInfo(), + windowedValue.getCurrentRecordId(), + windowedValue.getCurrentRecordOffset()); } public static boolean equals( @@ -200,10 +214,28 @@ private abstract static class SimpleWindowedValue implements WindowedValue private final T value; private final PaneInfo paneInfo; + private final @Nullable String currentRecordId; + private final @Nullable Long currentRecordOffset; + + @Override + public @Nullable String getCurrentRecordId() { + return currentRecordId; + } - protected SimpleWindowedValue(T value, PaneInfo paneInfo) { + @Override + public @Nullable Long getCurrentRecordOffset() { + return currentRecordOffset; + } + + protected SimpleWindowedValue( + T value, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { this.value = value; this.paneInfo = checkNotNull(paneInfo); + this.currentRecordId = currentRecordId; + this.currentRecordOffset = currentRecordOffset; } @Override @@ -232,8 +264,13 @@ public Iterable> explodeWindows() { /** The abstract superclass of WindowedValue representations where timestamp == MIN. */ private abstract static class MinTimestampWindowedValue extends SimpleWindowedValue { - public MinTimestampWindowedValue(T value, PaneInfo paneInfo) { - super(value, paneInfo); + + public MinTimestampWindowedValue( + T value, + PaneInfo pane, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, pane, currentRecordId, currentRecordOffset); } @Override @@ -246,8 +283,12 @@ public Instant getTimestamp() { private static class ValueInGlobalWindow extends MinTimestampWindowedValue implements SingleWindowedValue { - public ValueInGlobalWindow(T value, PaneInfo paneInfo) { - super(value, paneInfo); + public ValueInGlobalWindow( + T value, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, paneInfo, currentRecordId, currentRecordOffset); } @Override @@ -262,7 +303,8 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { - return new ValueInGlobalWindow<>(newValue, getPaneInfo()); + return new ValueInGlobalWindow<>( + newValue, getPaneInfo(), getCurrentRecordId(), getCurrentRecordOffset()); } @Override @@ -294,8 +336,13 @@ public String toString() { private abstract static class TimestampedWindowedValue extends SimpleWindowedValue { private final Instant timestamp; - public TimestampedWindowedValue(T value, Instant timestamp, PaneInfo paneInfo) { - super(value, paneInfo); + public TimestampedWindowedValue( + T value, + Instant timestamp, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, paneInfo, currentRecordId, currentRecordOffset); this.timestamp = checkNotNull(timestamp); } @@ -312,8 +359,13 @@ public Instant getTimestamp() { private static class TimestampedValueInGlobalWindow extends TimestampedWindowedValue implements SingleWindowedValue { - public TimestampedValueInGlobalWindow(T value, Instant timestamp, PaneInfo paneInfo) { - super(value, timestamp, paneInfo); + public TimestampedValueInGlobalWindow( + T value, + Instant timestamp, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset); } @Override @@ -328,7 +380,8 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPaneInfo()); + return new TimestampedValueInGlobalWindow<>( + newValue, getTimestamp(), getPaneInfo(), getCurrentRecordId(), getCurrentRecordOffset()); } @Override @@ -372,14 +425,25 @@ private static class TimestampedValueInSingleWindow extends TimestampedWindow private final BoundedWindow window; public TimestampedValueInSingleWindow( - T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - super(value, timestamp, paneInfo); + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset); this.window = checkNotNull(window); } @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPaneInfo()); + return new TimestampedValueInSingleWindow<>( + newValue, + getTimestamp(), + window, + getPaneInfo(), + getCurrentRecordId(), + getCurrentRecordOffset()); } @Override @@ -433,8 +497,10 @@ public TimestampedValueInMultipleWindows( T value, Instant timestamp, Collection windows, - PaneInfo paneInfo) { - super(value, timestamp, paneInfo); + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset); this.windows = checkNotNull(windows); } @@ -446,7 +512,12 @@ public Collection getWindows() { @Override public WindowedValue withValue(NewT newValue) { return new TimestampedValueInMultipleWindows<>( - newValue, getTimestamp(), getWindows(), getPaneInfo()); + newValue, + getTimestamp(), + getWindows(), + getPaneInfo(), + getCurrentRecordId(), + getCurrentRecordOffset()); } @Override diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index cc7c971e10bc..fbadc984c3d6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1667,6 +1667,48 @@ public void output(TupleTag tag, T output, Instant timestamp, BoundedWind } outputTo(consumer, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING)); } + + @Override + public void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputTo( + mainOutputConsumer, + WindowedValues.of( + output, + timestamp, + Collections.singletonList(window), + PaneInfo.NO_FIRING, + currentRecordId, + currentRecordOffset)); + } + + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, + timestamp, + Collections.singletonList(window), + PaneInfo.NO_FIRING, + currentRecordId, + currentRecordOffset)); + } } private final FinishBundleArgumentProvider.Context context = @@ -1758,6 +1800,22 @@ public void outputWindowedValue( outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all + // runners can provide proper timestamps. + outputTo( + mainOutputConsumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all @@ -1789,6 +1847,26 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public State state(String stateId, boolean alwaysFetched) { StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId); @@ -1886,6 +1964,21 @@ public void outputWindowedValue( outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + checkTimestamp(timestamp); + outputTo( + mainOutputConsumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkTimestamp(timestamp); @@ -1915,6 +2008,27 @@ public void outputWindowedValue( } outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + checkTimestamp(timestamp); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } } /** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */ @@ -2205,6 +2319,16 @@ public Instant timestamp() { return currentElement.getTimestamp(); } + @Override + public String currentRecordId() { + return currentElement.getCurrentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return currentElement.getCurrentRecordOffset(); + } + @Override public PaneInfo pane() { return currentElement.getPaneInfo(); @@ -2271,6 +2395,21 @@ public void outputWindowedValue( outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + checkOnWindowExpirationTimestamp(timestamp); + outputTo( + mainOutputConsumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public void output(TupleTag tag, T output) { FnDataReceiver> consumer = @@ -2307,10 +2446,25 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { + outputWindowedValue(tag, output, timestamp, windows, paneInfo, null, null); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkOnWindowExpirationTimestamp(timestamp); FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); + outputTo( + consumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @SuppressWarnings( @@ -2574,6 +2728,21 @@ public void outputWindowedValue( outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + checkTimerTimestamp(timestamp); + outputTo( + mainOutputConsumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + } + @Override public void output(TupleTag tag, T output) { checkTimerTimestamp(currentTimer.getHoldTimestamp()); @@ -2612,6 +2781,16 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo) {} + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) {} + @Override public TimeDomain timeDomain() { return currentTimeDomain; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 338ee59c3800..92860b4ecf9e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -92,6 +92,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.construction.PTransformMatchers; import org.apache.beam.sdk.util.construction.ReplacementOutputs; @@ -109,6 +110,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -1617,10 +1619,14 @@ private void checkRedistributeConfiguration() { isRedistributed(), "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform."); } - if (getOffsetDeduplication() != null && getOffsetDeduplication()) { + if (getOffsetDeduplication() != null && getOffsetDeduplication() && isRedistributed()) { checkState( - isRedistributed() && !isAllowDuplicates(), - "withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false)."); + !isAllowDuplicates(), + "withOffsetDeduplication and withRedistribute can only be used when withAllowDuplicates is set to false."); + } + if (getOffsetDeduplication() != null && getOffsetDeduplication() && !isRedistributed()) { + LOG.warn( + "Offsets used for deduplication are available in WindowedValue's metadata. Combining, aggregating, mutating them may risk with data loss."); } } @@ -1765,13 +1771,18 @@ public PCollection> expand(PBegin input) { .withMaxReadTime(kafkaRead.getMaxReadTime()) .withMaxNumRecords(kafkaRead.getMaxNumRecords()); } - + PCollection> output = input.getPipeline().apply(transform); + if (kafkaRead.getOffsetDeduplication() != null && kafkaRead.getOffsetDeduplication()) { + output = + output.apply( + "Insert Offset for offset deduplication", + ParDo.of(new OffsetDeduplicationIdExtractor<>())); + } if (kafkaRead.isRedistributed()) { if (kafkaRead.isCommitOffsetsInFinalizeEnabled() && kafkaRead.isAllowDuplicates()) { LOG.warn( "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled"); } - PCollection> output = input.getPipeline().apply(transform); if (kafkaRead.getRedistributeNumKeys() == 0) { return output.apply( @@ -1786,7 +1797,7 @@ public PCollection> expand(PBegin input) { .withNumBuckets((int) kafkaRead.getRedistributeNumKeys())); } } - return input.getPipeline().apply(transform); + return output; } } @@ -1895,6 +1906,29 @@ public PCollection> expand(PBegin input) { } } + static class OffsetDeduplicationIdExtractor + extends DoFn, KafkaRecord> { + + @ProcessElement + public void processElement(ProcessContext pc) { + KafkaRecord element = pc.element(); + Long offset = null; + String uniqueId = null; + if (element != null) { + offset = element.getOffset(); + uniqueId = + (String.format("%s-%d-%d", element.getTopic(), element.getPartition(), offset)); + } + pc.outputWindowedValue( + element, + pc.timestamp(), + Lists.newArrayList(GlobalWindow.INSTANCE), + pc.pane(), + uniqueId, + offset); + } + } + /** * A DoFn which generates {@link KafkaSourceDescriptor} based on the configuration of {@link * Read}.