From 02d088e5e56c734e3536ad8f3c7b0b46f0a3213d Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 30 Jul 2025 16:09:15 +0200 Subject: [PATCH 1/7] Add element metadata to propagate offset and unique id from source as part of offset deduplication --- ...oundedSplittableProcessElementInvoker.java | 37 +++++ .../beam/runners/core/SimpleDoFnRunner.java | 94 ++++++++++++ .../SplittableParDoViaKeyedWorkItems.java | 20 +++ .../runners/dataflow/BatchViewOverrides.java | 21 +++ .../runners/dataflow/worker/WindmillSink.java | 17 ++- .../worker/util/ValueInEmptyWindows.java | 21 +++ .../org/apache/beam/sdk/transforms/DoFn.java | 28 ++++ .../beam/sdk/transforms/DoFnTester.java | 64 +++++++- .../SplittableParDoNaiveBounded.java | 48 ++++++ .../beam/sdk/values/ElementMetadata.java | 31 ++++ .../beam/sdk/values/ValueInSingleWindow.java | 15 +- .../apache/beam/sdk/values/WindowedValue.java | 13 ++ .../beam/sdk/values/WindowedValues.java | 139 +++++++++++++---- .../beam/fn/harness/FnApiDoFnRunner.java | 140 ++++++++++++++++++ 14 files changed, 652 insertions(+), 36 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/values/ElementMetadata.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index f8dbfd61e836..0f14aa47681b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.ElementMetadata; import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; @@ -378,6 +379,11 @@ public Instant timestamp() { return element.getTimestamp(); } + @Override + public ElementMetadata elementMetadata() { + return element.getElementMetadata(); + } + @Override public PaneInfo pane() { return element.getPaneInfo(); @@ -411,6 +417,21 @@ public void outputWindowedValue( outputReceiver.output(mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); + } + outputReceiver.output( + mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public void output(TupleTag tag, T value) { outputWithTimestamp(tag, value, element.getTimestamp()); @@ -436,6 +457,22 @@ public void outputWindowedValue( outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); + } + outputReceiver.output( + tag, WindowedValues.of(value, timestamp, windows, paneInfo, elementMetadata)); + } + private void noteOutput() { checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()"); checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()"); diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 840245edf7ad..a653c32bde26 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; @@ -334,6 +335,26 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) { public void output(TupleTag tag, T output, Instant timestamp, BoundedWindow window) { outputWindowedValue(tag, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING)); } + + @Override + public void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + output(mainOutputTag, output, timestamp, window, elementMetadata); + } + + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + outputWindowedValue( + tag, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING, elementMetadata)); + } } private final DoFnFinishBundleArgumentProvider.Context context = @@ -427,6 +448,16 @@ public void outputWindowedValue( outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, elementMetadata); + } + @Override public void 
output(TupleTag tag, T output) { checkNotNull(tag, "Tag passed to output cannot be null"); @@ -451,11 +482,28 @@ public void outputWindowedValue( tag, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + SimpleDoFnRunner.this.outputWindowedValue( + tag, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public Instant timestamp() { return elem.getTimestamp(); } + @Override + public ElementMetadata elementMetadata() { + return elem.getElementMetadata(); + } + public Collection windows() { return elem.getWindows(); } @@ -867,6 +915,16 @@ public void outputWindowedValue( outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, elementMetadata); + } + @Override public void output(TupleTag tag, T output) { checkTimestamp(timestamp(), timestamp); @@ -892,6 +950,19 @@ public void outputWindowedValue( tag, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + checkTimestamp(timestamp(), timestamp); + SimpleDoFnRunner.this.outputWindowedValue( + tag, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( @@ -1096,6 +1167,16 @@ public void outputWindowedValue( outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + 
Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, elementMetadata); + } + @Override public void output(TupleTag tag, T output) { checkTimestamp(this.timestamp, timestamp); @@ -1121,6 +1202,19 @@ public void outputWindowedValue( tag, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + checkTimestamp(this.timestamp, timestamp); + SimpleDoFnRunner.this.outputWindowedValue( + tag, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index fafb02f9dd0b..7fbedb5babc0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.util.construction.SplittableParDo.ProcessKeyedElements; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -663,6 +664,25 @@ public void output( throwUnsupportedOutput(); } + @Override + public void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + throwUnsupportedOutput(); + } + 
+ @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + throwUnsupportedOutput(); + } + @Override public PipelineOptions getPipelineOptions() { return baseContext.getPipelineOptions(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 537a2d855921..8bcc3a8db2d7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -64,6 +64,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -1375,6 +1376,11 @@ public WindowedValue withValue(NewT value) { return new ValueInEmptyWindows<>(value); } + @Override + public WindowedValue withElementMetadata(ElementMetadata elementMetadata) { + return this; + } + @Override public T getValue() { return value; @@ -1395,6 +1401,21 @@ public PaneInfo getPaneInfo() { return PaneInfo.NO_FIRING; } + @Override + public @Nullable String getCurrentRecordId() { + return null; + } + + @Override + public @Nullable Long getCurrentRecordOffset() { + return null; + } + + @Override + public @Nullable ElementMetadata getElementMetadata() { + return null; + } + @Override public Iterable> explodeWindows() { return Collections.emptyList(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index b156ff45caf6..02d3111ec81b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -23,6 +23,7 @@ import com.google.auto.service.AutoService; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -43,6 +44,7 @@ import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -221,14 +223,25 @@ public long add(WindowedValue data) throws IOException { throw new RuntimeException( "Unexpected record ID via ValueWithRecordIdCoder while offset-based deduplication enabled."); } - byte[] rawId = context.getCurrentRecordId(); + byte[] rawId = null; + + if (data.getCurrentRecordId() != null) { + rawId = data.getCurrentRecordId().getBytes(StandardCharsets.UTF_8); + } else { + rawId = context.getCurrentRecordId(); + } if (rawId.length == 0) { throw new RuntimeException( "Unexpected empty record ID while offset-based deduplication enabled."); } id = ByteString.copyFrom(rawId); - byte[] rawOffset = context.getCurrentRecordOffset(); + byte[] rawOffset = null; + if (data.getCurrentRecordOffset() != null) { + rawOffset = Longs.toByteArray(data.getCurrentRecordOffset()); + } else { + rawOffset = context.getCurrentRecordOffset(); + } if (rawOffset.length == 0) { throw new 
RuntimeException( "Unexpected empty record offset while offset-based deduplication enabled."); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 42174629b3b8..a7199e834224 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -22,6 +22,7 @@ import java.util.Objects; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -49,6 +50,21 @@ public PaneInfo getPaneInfo() { return PaneInfo.NO_FIRING; } + @Override + public @Nullable String getCurrentRecordId() { + return null; + } + + @Override + public @Nullable Long getCurrentRecordOffset() { + return null; + } + + @Override + public @Nullable ElementMetadata getElementMetadata() { + return null; + } + @Override public Iterable> explodeWindows() { return Collections.emptyList(); @@ -64,6 +80,11 @@ public WindowedValue withValue(NewT newValue) { return new ValueInEmptyWindows<>(newValue); } + @Override + public WindowedValue withElementMetadata(ElementMetadata elementMetadata) { + return this; + } + @Override public Instant getTimestamp() { return BoundedWindow.TIMESTAMP_MIN_VALUE; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 0961c8512523..df8429a474f6 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; @@ -122,6 +123,8 @@ public abstract class FinishBundleContext { */ public abstract void output(OutputT output, Instant timestamp, BoundedWindow window); + public abstract void output( + OutputT output, Instant timestamp, BoundedWindow window, ElementMetadata elementMetadata); /** * Adds the given element to the output {@code PCollection} with the given tag at the given * timestamp in the given window. @@ -133,6 +136,13 @@ public abstract class FinishBundleContext { */ public abstract void output( TupleTag tag, T output, Instant timestamp, BoundedWindow window); + + public abstract void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata); } /** @@ -211,6 +221,13 @@ public abstract void outputWindowedValue( Collection windows, PaneInfo paneInfo); + public abstract void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata); + /** * Adds the given element to the output {@code PCollection} with the given tag. * @@ -283,6 +300,14 @@ public abstract void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo); + + public abstract void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata); } /** Information accessible when running a {@link DoFn.ProcessElement} method. 
*/ @@ -323,6 +348,9 @@ public abstract class ProcessContext extends WindowedContext { */ @Pure public abstract PaneInfo pane(); + + @Pure + public abstract ElementMetadata elementMetadata(); } /** Information accessible when running a {@link DoFn.OnTimer} method. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index fb1947ad5ba3..85132afd9e48 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; @@ -213,7 +214,7 @@ public void processWindowedElement(InputT element, Instant timestamp, final Boun try { final DoFn.ProcessContext processContext = createProcessContext( - ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING)); + ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING, null)); fnInvoker.invokeProcessElement( new DoFnInvoker.BaseArgumentProvider() { @@ -478,7 +479,29 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) { @Override public void output(TupleTag tag, T output, Instant timestamp, BoundedWindow window) { getMutableOutput(tag) - .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING)); + .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING, null)); + } + + @Override + public void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + output(mainOutputTag, output, timestamp, window, 
elementMetadata); + } + + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + output, timestamp, window, PaneInfo.NO_FIRING, elementMetadata)); } }; } @@ -562,6 +585,11 @@ public Instant timestamp() { return element.getTimestamp(); } + @Override + public ElementMetadata elementMetadata() { + return element.getElementMetadata(); + } + @Override public PaneInfo pane() { return element.getPaneInfo(); @@ -591,6 +619,16 @@ public void outputWindowedValue( outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, elementMetadata); + } + @Override public void output(TupleTag tag, T output) { outputWithTimestamp(tag, output, element.getTimestamp()); @@ -601,7 +639,11 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp getMutableOutput(tag) .add( ValueInSingleWindow.of( - output, timestamp, element.getWindow(), element.getPaneInfo())); + output, + timestamp, + element.getWindow(), + element.getPaneInfo(), + element.getElementMetadata())); } @Override @@ -612,7 +654,21 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo) { for (BoundedWindow w : windows) { - getMutableOutput(tag).add(ValueInSingleWindow.of(output, timestamp, w, paneInfo)); + getMutableOutput(tag).add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null)); + } + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + for (BoundedWindow w : windows) { + getMutableOutput(tag) + .add(ValueInSingleWindow.of(output, timestamp, w, 
paneInfo, elementMetadata)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index edae34fbecf9..c087ed235347 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -397,6 +398,27 @@ public void output( "Output from FinishBundle for SDF is not supported in naive implementation"); } + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + throw new UnsupportedOperationException( + "Output from FinishBundle for SDF is not supported in naive implementation"); + } + + @Override + public void output( + @Nullable OutputT output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + throw new UnsupportedOperationException( + "Output from FinishBundle for SDF is not supported in naive implementation"); + } + @Override public void output( TupleTag tag, T output, Instant timestamp, BoundedWindow window) { @@ -617,6 +639,16 @@ public void outputWindowedValue( outerContext.outputWindowedValue(output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + 
outerContext.outputWindowedValue(output, timestamp, windows, paneInfo, elementMetadata); + } + @Override public void output(TupleTag tag, T output) { outerContext.output(tag, output); @@ -637,6 +669,17 @@ public void outputWindowedValue( outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo, elementMetadata); + } + @Override public InputT element() { return element; @@ -657,6 +700,11 @@ public PaneInfo pane() { return outerContext.pane(); } + @Override + public ElementMetadata elementMetadata() { + return outerContext.elementMetadata(); + } + @Override public Object watermarkEstimatorState() { throw new UnsupportedOperationException( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ElementMetadata.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ElementMetadata.java new file mode 100644 index 000000000000..0802c7c23135 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ElementMetadata.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import com.google.auto.value.AutoValue; + +@AutoValue +public abstract class ElementMetadata { + public abstract String getCurrentRecordId(); + + public abstract long getCurrentRecordOffset(); + + public static ElementMetadata create(String currentRecordId, long currentRecordOffset) { + return new AutoValue_ElementMetadata(currentRecordId, currentRecordOffset); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 74717fc606b2..ae4622d7007a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -57,12 +57,23 @@ public T getValue() { /** Returns the window of this {@code ValueInSingleWindow}. */ public abstract BoundedWindow getWindow(); + public abstract @Nullable ElementMetadata getElementMetadata(); + /** Returns the pane of this {@code ValueInSingleWindow} in its window. */ public abstract PaneInfo getPaneInfo(); + public static ValueInSingleWindow of( + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + @Nullable ElementMetadata elementMetadata) { + return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, elementMetadata, paneInfo); + } + public static ValueInSingleWindow of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo); + return of(value, timestamp, window, paneInfo, null); } /** A coder for {@link ValueInSingleWindow}. 
*/ @@ -110,7 +121,7 @@ public ValueInSingleWindow decode(InputStream inStream, Context context) thro BoundedWindow window = windowCoder.decode(inStream); PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream); T value = valueCoder.decode(inStream, context); - return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo); + return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, null, paneInfo); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java index d2c4d1f07da7..f9a42e8e6435 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java @@ -20,6 +20,7 @@ import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; import org.joda.time.Instant; @@ -45,6 +46,15 @@ public interface WindowedValue { @Pure PaneInfo getPaneInfo(); + @Nullable + String getCurrentRecordId(); + + @Nullable + Long getCurrentRecordOffset(); + + @Nullable + ElementMetadata getElementMetadata(); + /** * A representation of each of the actual values represented by this compressed {@link * WindowedValue}, one per window. 
@@ -58,4 +68,7 @@ public interface WindowedValue { */ @Pure WindowedValue withValue(OtherT value); + + @Pure + WindowedValue withElementMetadata(ElementMetadata elementMetadata); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index 9616fd845fa7..ddeb7405eefb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -61,42 +61,67 @@ public class WindowedValues { private WindowedValues() {} // non-instantiable utility class - /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { + return of(value, timestamp, windows, paneInfo, null); + } + + /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ + public static WindowedValue of( + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable ElementMetadata elementMetadata) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo); + return of(value, timestamp, windows.iterator().next(), paneInfo, elementMetadata); } else { - return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, paneInfo); + return new TimestampedValueInMultipleWindows<>( + value, timestamp, windows, paneInfo, elementMetadata); } } /** @deprecated for use only in compatibility with old broken code */ @Deprecated static WindowedValue createWithoutValidation( - T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + 
@Nullable ElementMetadata elementMetadata) { if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo); + return of(value, timestamp, windows.iterator().next(), paneInfo, elementMetadata); } else { - return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, paneInfo); + return new TimestampedValueInMultipleWindows<>( + value, timestamp, windows, paneInfo, elementMetadata); } } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { + return of(value, timestamp, window, paneInfo, null); + } + + public static WindowedValue of( + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + @Nullable ElementMetadata elementMetadata) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { return valueInGlobalWindow(value, paneInfo); } else if (isGlobal) { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo); + return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, elementMetadata); } else { - return new TimestampedValueInSingleWindow<>(value, timestamp, window, paneInfo); + return new TimestampedValueInSingleWindow<>( + value, timestamp, window, paneInfo, elementMetadata); } } @@ -124,7 +149,7 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta if (BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { return valueInGlobalWindow(value); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, PaneInfo.NO_FIRING); + return new TimestampedValueInGlobalWindow<>(value, timestamp, PaneInfo.NO_FIRING, null); } } @@ -137,7 +162,7 @@ public static WindowedValue timestampedValueInGlobalWindow( if (paneInfo.equals(PaneInfo.NO_FIRING)) { return 
timestampedValueInGlobalWindow(value, timestamp); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo); + return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null); } } @@ -151,7 +176,8 @@ public static WindowedValue withValue( newValue, windowedValue.getTimestamp(), windowedValue.getWindows(), - windowedValue.getPaneInfo()); + windowedValue.getPaneInfo(), + windowedValue.getElementMetadata()); } public static boolean equals( @@ -200,10 +226,28 @@ private abstract static class SimpleWindowedValue implements WindowedValue private final T value; private final PaneInfo paneInfo; + private final @Nullable ElementMetadata elementMetadata; + + @Override + public @Nullable String getCurrentRecordId() { + return elementMetadata != null ? elementMetadata.getCurrentRecordId() : null; + } - protected SimpleWindowedValue(T value, PaneInfo paneInfo) { + @Override + public @Nullable Long getCurrentRecordOffset() { + return elementMetadata != null ? elementMetadata.getCurrentRecordOffset() : null; + } + + protected SimpleWindowedValue( + T value, PaneInfo paneInfo, @Nullable ElementMetadata elementMetadata) { this.value = value; this.paneInfo = checkNotNull(paneInfo); + this.elementMetadata = elementMetadata; + } + + @Override + public @Nullable ElementMetadata getElementMetadata() { + return elementMetadata; } @Override @@ -232,8 +276,10 @@ public Iterable> explodeWindows() { /** The abstract superclass of WindowedValue representations where timestamp == MIN. 
*/ private abstract static class MinTimestampWindowedValue extends SimpleWindowedValue { - public MinTimestampWindowedValue(T value, PaneInfo paneInfo) { - super(value, paneInfo); + + public MinTimestampWindowedValue( + T value, PaneInfo pane, @Nullable ElementMetadata elementMetadata) { + super(value, pane, elementMetadata); } @Override @@ -247,7 +293,12 @@ private static class ValueInGlobalWindow extends MinTimestampWindowedValue implements SingleWindowedValue { public ValueInGlobalWindow(T value, PaneInfo paneInfo) { - super(value, paneInfo); + this(value, paneInfo, null); + } + + public ValueInGlobalWindow( + T value, PaneInfo paneInfo, @Nullable ElementMetadata elementMetadata) { + super(value, paneInfo, elementMetadata); } @Override @@ -262,7 +313,12 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { - return new ValueInGlobalWindow<>(newValue, getPaneInfo()); + return new ValueInGlobalWindow<>(newValue, getPaneInfo(), getElementMetadata()); + } + + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new ValueInGlobalWindow<>(getValue(), getPaneInfo(), elementMetadata); } @Override @@ -294,8 +350,9 @@ public String toString() { private abstract static class TimestampedWindowedValue extends SimpleWindowedValue { private final Instant timestamp; - public TimestampedWindowedValue(T value, Instant timestamp, PaneInfo paneInfo) { - super(value, paneInfo); + public TimestampedWindowedValue( + T value, Instant timestamp, PaneInfo paneInfo, @Nullable ElementMetadata elementMetadata) { + super(value, paneInfo, elementMetadata); this.timestamp = checkNotNull(timestamp); } @@ -312,8 +369,9 @@ public Instant getTimestamp() { private static class TimestampedValueInGlobalWindow extends TimestampedWindowedValue implements SingleWindowedValue { - public TimestampedValueInGlobalWindow(T value, Instant timestamp, PaneInfo paneInfo) { - super(value, timestamp, paneInfo); + public 
TimestampedValueInGlobalWindow( + T value, Instant timestamp, PaneInfo paneInfo, @Nullable ElementMetadata elementMetadata) { + super(value, timestamp, paneInfo, elementMetadata); } @Override @@ -328,7 +386,14 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPaneInfo()); + return new TimestampedValueInGlobalWindow<>( + newValue, getTimestamp(), getPaneInfo(), getElementMetadata()); + } + + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new TimestampedValueInGlobalWindow<>( + getValue(), getTimestamp(), getPaneInfo(), elementMetadata); } @Override @@ -372,14 +437,25 @@ private static class TimestampedValueInSingleWindow extends TimestampedWindow private final BoundedWindow window; public TimestampedValueInSingleWindow( - T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - super(value, timestamp, paneInfo); + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + @Nullable ElementMetadata elementMetadata) { + super(value, timestamp, paneInfo, elementMetadata); this.window = checkNotNull(window); } @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPaneInfo()); + return new TimestampedValueInSingleWindow<>( + newValue, getTimestamp(), window, getPaneInfo(), getElementMetadata()); + } + + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new TimestampedValueInSingleWindow<>( + getValue(), getTimestamp(), getWindow(), getPaneInfo(), elementMetadata); } @Override @@ -433,8 +509,9 @@ public TimestampedValueInMultipleWindows( T value, Instant timestamp, Collection windows, - PaneInfo paneInfo) { - super(value, timestamp, paneInfo); + PaneInfo paneInfo, + @Nullable ElementMetadata elementMetadata) { + 
super(value, timestamp, paneInfo, elementMetadata); this.windows = checkNotNull(windows); } @@ -446,7 +523,13 @@ public Collection getWindows() { @Override public WindowedValue withValue(NewT newValue) { return new TimestampedValueInMultipleWindows<>( - newValue, getTimestamp(), getWindows(), getPaneInfo()); + newValue, getTimestamp(), getWindows(), getPaneInfo(), getElementMetadata()); + } + + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new TimestampedValueInMultipleWindows<>( + getValue(), getTimestamp(), getWindows(), getPaneInfo(), elementMetadata); } @Override @@ -603,7 +686,7 @@ public WindowedValue decode(InputStream inStream, Context context) // Because there are some remaining (incorrect) uses of WindowedValue with no windows, // we call this deprecated no-validation path when decoding - return WindowedValues.createWithoutValidation(value, timestamp, windows, paneInfo); + return WindowedValues.createWithoutValidation(value, timestamp, windows, paneInfo, null); } @Override diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index cc7c971e10bc..9cc36d09b473 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -102,6 +102,7 @@ import org.apache.beam.sdk.util.construction.ParDoTranslation; import org.apache.beam.sdk.util.construction.RehydratedComponents; import org.apache.beam.sdk.util.construction.Timer; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; @@ -1667,6 +1668,34 @@ public void output(TupleTag tag, T output, Instant timestamp, BoundedWind } outputTo(consumer, WindowedValues.of(output, timestamp, window, 
PaneInfo.NO_FIRING)); } + + @Override + public void output( + OutputT output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + outputTo( + mainOutputConsumer, + WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING, elementMetadata)); + } + + @Override + public void output( + TupleTag tag, + T output, + Instant timestamp, + BoundedWindow window, + ElementMetadata elementMetadata) { + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING, elementMetadata)); + } } private final FinishBundleArgumentProvider.Context context = @@ -1758,6 +1787,20 @@ public void outputWindowedValue( outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all + // runners can provide proper timestamps. 
+ outputTo( + mainOutputConsumer, + WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all @@ -1789,6 +1832,22 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public State state(String stateId, boolean alwaysFetched) { StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId); @@ -1886,6 +1945,19 @@ public void outputWindowedValue( outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + checkTimestamp(timestamp); + outputTo( + mainOutputConsumer, + WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkTimestamp(timestamp); @@ -1915,6 +1987,23 @@ public void outputWindowedValue( } outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + checkTimestamp(timestamp); + FnDataReceiver> 
consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } } /** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */ @@ -2205,6 +2294,11 @@ public Instant timestamp() { return currentElement.getTimestamp(); } + @Override + public ElementMetadata elementMetadata() { + return currentElement.getElementMetadata(); + } + @Override public PaneInfo pane() { return currentElement.getPaneInfo(); @@ -2271,6 +2365,19 @@ public void outputWindowedValue( outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + checkOnWindowExpirationTimestamp(timestamp); + outputTo( + mainOutputConsumer, + WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public void output(TupleTag tag, T output) { FnDataReceiver> consumer = @@ -2313,6 +2420,17 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) { + // todo + } + @SuppressWarnings( "deprecation") // Allowed Skew is deprecated for users, but must be respected private void checkOnWindowExpirationTimestamp(Instant timestamp) { @@ -2574,6 +2692,19 @@ public void outputWindowedValue( outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata 
elementMetadata) { + checkTimerTimestamp(timestamp); + outputTo( + mainOutputConsumer, + WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + } + @Override public void output(TupleTag tag, T output) { checkTimerTimestamp(currentTimer.getHoldTimestamp()); @@ -2612,6 +2743,15 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo) {} + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ElementMetadata elementMetadata) {} + @Override public TimeDomain timeDomain() { return currentTimeDomain; From c2f820f9167f6893c060b880043616c93dbdcfe3 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 30 Jul 2025 16:35:29 +0200 Subject: [PATCH 2/7] Insert offset metadata. --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 42 ++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 338ee59c3800..c8de19ad632b 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -92,10 +92,12 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.construction.PTransformMatchers; import org.apache.beam.sdk.util.construction.ReplacementOutputs; import org.apache.beam.sdk.util.construction.TransformUpgrader; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -109,6 +111,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -1617,10 +1620,14 @@ private void checkRedistributeConfiguration() { isRedistributed(), "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform."); } - if (getOffsetDeduplication() != null && getOffsetDeduplication()) { + if (getOffsetDeduplication() != null && getOffsetDeduplication() && isRedistributed()) { checkState( - isRedistributed() && !isAllowDuplicates(), - "withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false)."); + !isAllowDuplicates(), + "withOffsetDeduplication and withRedistribute are set but withAllowDuplicates is set to false."); + } + if (getOffsetDeduplication() != null && getOffsetDeduplication() && !isRedistributed()) { + LOG.warn( + "Offsets used for deduplication are available in WindowedValue's metadata. 
Combining, aggregating, mutating them may risk with data loss."); } } @@ -1765,13 +1772,18 @@ public PCollection> expand(PBegin input) { .withMaxReadTime(kafkaRead.getMaxReadTime()) .withMaxNumRecords(kafkaRead.getMaxNumRecords()); } - + PCollection> output = input.getPipeline().apply(transform); + if (kafkaRead.getOffsetDeduplication() != null && kafkaRead.getOffsetDeduplication()) { + output = + output.apply( + "Insert Offset for offset deduplication", + ParDo.of(new OffsetDeduplicationIdExtractor<>())); + } if (kafkaRead.isRedistributed()) { if (kafkaRead.isCommitOffsetsInFinalizeEnabled() && kafkaRead.isAllowDuplicates()) { LOG.warn( "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled"); } - PCollection> output = input.getPipeline().apply(transform); if (kafkaRead.getRedistributeNumKeys() == 0) { return output.apply( @@ -1786,7 +1798,7 @@ public PCollection> expand(PBegin input) { .withNumBuckets((int) kafkaRead.getRedistributeNumKeys())); } } - return input.getPipeline().apply(transform); + return output; } } @@ -1895,6 +1907,24 @@ public PCollection> expand(PBegin input) { } } + static class OffsetDeduplicationIdExtractor + extends DoFn, KafkaRecord> { + + @ProcessElement + public void processElement(ProcessContext pc) { + KafkaRecord element = pc.element(); + ElementMetadata em = null; + if (element != null) { + long offset = element.getOffset(); + String uniqueId = + (String.format("%s-%d-%d", element.getTopic(), element.getPartition(), offset)); + em = ElementMetadata.create(uniqueId, offset); + } + pc.outputWindowedValue( + element, pc.timestamp(), Lists.newArrayList(GlobalWindow.INSTANCE), pc.pane(), em); + } + } + /** * A DoFn which generates {@link KafkaSourceDescriptor} based on the configuration of {@link * Read}. 
From 39c3f5f278bbc98bbc9f1bb9d1a8c4232670fbcc Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 30 Jul 2025 17:00:59 +0200 Subject: [PATCH 3/7] review --- ...tAndTimeBoundedSplittableProcessElementInvoker.java | 6 +----- .../beam/runners/dataflow/worker/WindmillSink.java | 4 ++-- .../main/java/org/apache/beam/sdk/transforms/DoFn.java | 2 +- .../org/apache/beam/fn/harness/FnApiDoFnRunner.java | 10 +++++----- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- 5 files changed, 10 insertions(+), 14 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 0f14aa47681b..3f4630f09994 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -450,11 +450,7 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - noteOutput(); - if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { - ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); - } - outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo)); + outputWindowedValue(tag, value, timestamp, windows, paneInfo, null); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index 02d3111ec81b..31782634ce4f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -230,7 +230,7 @@ public long add(WindowedValue data) throws IOException { } else { rawId = context.getCurrentRecordId(); } - if (rawId.length == 0) { + if (rawId == null || rawId.length == 0) { throw new RuntimeException( "Unexpected empty record ID while offset-based deduplication enabled."); } @@ -242,7 +242,7 @@ public long add(WindowedValue data) throws IOException { } else { rawOffset = context.getCurrentRecordOffset(); } - if (rawOffset.length == 0) { + if (rawOffset == null || rawOffset.length == 0) { throw new RuntimeException( "Unexpected empty record offset while offset-based deduplication enabled."); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index df8429a474f6..642387c8545d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -226,7 +226,7 @@ public abstract void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata); + @Nullable ElementMetadata elementMetadata); /** * Adds the given element to the output {@code PCollection} with the given tag. 
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 9cc36d09b473..435be1fc0c2e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -2414,10 +2414,7 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - checkOnWindowExpirationTimestamp(timestamp); - FnDataReceiver> consumer = - (FnDataReceiver) localNameToConsumer.get(tag.getId()); - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); + outputWindowedValue(tag, output, timestamp, windows, paneInfo, null); } @Override @@ -2428,7 +2425,10 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo, ElementMetadata elementMetadata) { - // todo + checkOnWindowExpirationTimestamp(timestamp); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); } @SuppressWarnings( diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index c8de19ad632b..08c3f2182e27 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1623,7 +1623,7 @@ private void checkRedistributeConfiguration() { if (getOffsetDeduplication() != null && getOffsetDeduplication() && isRedistributed()) { checkState( !isAllowDuplicates(), - "withOffsetDeduplication and withRedistribute are set but withAllowDuplicates is set to false."); + "withOffsetDeduplication and withRedistribute can only be used when withAllowDuplicates is set to false."); } if (getOffsetDeduplication() != null 
&& getOffsetDeduplication() && !isRedistributed()) { LOG.warn( From 94ac0010023544a94824d659ff7939f07b0dc31c Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 30 Jul 2025 17:17:59 +0200 Subject: [PATCH 4/7] fixes --- .../beam/runners/spark/util/TimerUtils.java | 21 +++++++++++++++++++ .../SplittableParDoNaiveBounded.java | 3 ++- .../beam/fn/harness/FnApiDoFnRunner.java | 5 +++-- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java index 30a6577d3e97..4fe5441c3455 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; @@ -110,6 +111,21 @@ public PaneInfo getPaneInfo() { return PaneInfo.NO_FIRING; } + @Override + public @Nullable String getCurrentRecordId() { + return null; + } + + @Override + public @Nullable Long getCurrentRecordOffset() { + return null; + } + + @Override + public @Nullable ElementMetadata getElementMetadata() { + return null; + } + @Override public Iterable> explodeWindows() { return Collections.emptyList(); @@ -125,6 +141,11 @@ public WindowedValue withValue(NewT newValue) { return new WindowedValueForTimerMarker<>(newValue); } + @Override + public WindowedValue withElementMetadata(ElementMetadata elementMetadata) { + return this; + } + @Override public Instant getTimestamp() { return BoundedWindow.TIMESTAMP_MIN_VALUE; diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index c087ed235347..660ff9f09271 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -677,7 +677,8 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo, ElementMetadata elementMetadata) { - outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo, elementMetadata); + outerContext.outputWindowedValue( + tag, output, timestamp, windows, paneInfo, elementMetadata); } @Override diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 435be1fc0c2e..498e05041c6c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -2427,8 +2427,9 @@ public void outputWindowedValue( ElementMetadata elementMetadata) { checkOnWindowExpirationTimestamp(timestamp); FnDataReceiver> consumer = - (FnDataReceiver) localNameToConsumer.get(tag.getId()); - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + outputTo( + consumer, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); } @SuppressWarnings( From d1af070ec57c095e05d6f78d8d9830636f32af42 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 18 Aug 2025 12:06:20 +0200 Subject: [PATCH 5/7] remove ElementMetadata.java --- ...oundedSplittableProcessElementInvoker.java | 30 ++-- .../SplittableParDoViaKeyedWorkItems.java | 7 +- 
.../runners/dataflow/BatchViewOverrides.java | 11 -- .../worker/util/ValueInEmptyWindows.java | 11 -- .../beam/runners/spark/util/TimerUtils.java | 11 -- .../org/apache/beam/sdk/transforms/DoFn.java | 21 ++- .../beam/sdk/transforms/DoFnTester.java | 63 ++++--- .../SplittableParDoNaiveBounded.java | 27 ++- .../beam/sdk/values/ElementMetadata.java | 31 ---- .../beam/sdk/values/ValueInSingleWindow.java | 16 +- .../apache/beam/sdk/values/WindowedValue.java | 6 - .../beam/sdk/values/WindowedValues.java | 156 ++++++++---------- .../beam/fn/harness/FnApiDoFnRunner.java | 60 ++++--- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 +- 14 files changed, 218 insertions(+), 242 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/values/ElementMetadata.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 3f4630f09994..40ea89cf0d22 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; @@ -379,17 +378,22 @@ public Instant timestamp() { return element.getTimestamp(); } - @Override - public ElementMetadata elementMetadata() { - return element.getElementMetadata(); - } - @Override public PaneInfo pane() { return element.getPaneInfo(); } - @Override + @Override + public String 
currentRecordId() { + return element.getCurrentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return element.getCurrentRecordOffset(); + } + + @Override public PipelineOptions getPipelineOptions() { return pipelineOptions; } @@ -423,13 +427,14 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { noteOutput(); if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); } outputReceiver.output( - mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo, elementMetadata)); + mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -450,7 +455,7 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - outputWindowedValue(tag, value, timestamp, windows, paneInfo, null); + outputWindowedValue(tag, value, timestamp, windows, paneInfo, null, null); } @Override @@ -460,13 +465,14 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { noteOutput(); if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); } outputReceiver.output( - tag, WindowedValues.of(value, timestamp, windows, paneInfo, elementMetadata)); + tag, WindowedValues.of(value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } private void noteOutput() { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 7fbedb5babc0..dd4f3d54cbaf 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -52,7 +52,6 @@ import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.util.construction.SplittableParDo.ProcessKeyedElements; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -669,7 +668,8 @@ public void output( OutputT output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { throwUnsupportedOutput(); } @@ -679,7 +679,8 @@ public void output( T output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { throwUnsupportedOutput(); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 8bcc3a8db2d7..51c1642f848c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.values.ElementMetadata; import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -1376,11 +1375,6 @@ public WindowedValue withValue(NewT value) { return new ValueInEmptyWindows<>(value); } - @Override - public WindowedValue withElementMetadata(ElementMetadata elementMetadata) { - return this; - } - @Override public T getValue() { return value; @@ -1411,11 +1405,6 @@ public PaneInfo getPaneInfo() { return null; } - @Override - public @Nullable ElementMetadata getElementMetadata() { - return null; - } - @Override public Iterable> explodeWindows() { return Collections.emptyList(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index a7199e834224..1119617a068e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -22,7 +22,6 @@ import java.util.Objects; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -60,11 +59,6 @@ public PaneInfo getPaneInfo() { return null; } - @Override - public @Nullable ElementMetadata getElementMetadata() { - return null; - } - @Override public Iterable> explodeWindows() { return Collections.emptyList(); @@ -80,11 +74,6 @@ public WindowedValue withValue(NewT newValue) { return new ValueInEmptyWindows<>(newValue); } - @Override - public 
WindowedValue withElementMetadata(ElementMetadata elementMetadata) { - return this; - } - @Override public Instant getTimestamp() { return BoundedWindow.TIMESTAMP_MIN_VALUE; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java index 4fe5441c3455..d03914a256ca 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; @@ -121,11 +120,6 @@ public PaneInfo getPaneInfo() { return null; } - @Override - public @Nullable ElementMetadata getElementMetadata() { - return null; - } - @Override public Iterable> explodeWindows() { return Collections.emptyList(); @@ -141,11 +135,6 @@ public WindowedValue withValue(NewT newValue) { return new WindowedValueForTimerMarker<>(newValue); } - @Override - public WindowedValue withElementMetadata(ElementMetadata elementMetadata) { - return this; - } - @Override public Instant getTimestamp() { return BoundedWindow.TIMESTAMP_MIN_VALUE; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 642387c8545d..10904b2aa393 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -45,7 +45,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import 
org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; @@ -124,7 +123,11 @@ public abstract class FinishBundleContext { public abstract void output(OutputT output, Instant timestamp, BoundedWindow window); public abstract void output( - OutputT output, Instant timestamp, BoundedWindow window, ElementMetadata elementMetadata); + OutputT output, + Instant timestamp, + BoundedWindow window, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset); /** * Adds the given element to the output {@code PCollection} with the given tag at the given * timestamp in the given window. @@ -142,7 +145,8 @@ public abstract void output( T output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset); } /** @@ -226,7 +230,8 @@ public abstract void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - @Nullable ElementMetadata elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset); /** * Adds the given element to the output {@code PCollection} with the given tag. @@ -307,7 +312,8 @@ public abstract void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ @@ -350,7 +356,10 @@ public abstract class ProcessContext extends WindowedContext { public abstract PaneInfo pane(); @Pure - public abstract ElementMetadata elementMetadata(); + public abstract String currentRecordId(); + + @Pure + public abstract Long currentRecordOffset(); } /** Information accessible when running a {@link DoFn.OnTimer} method. 
*/ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 85132afd9e48..f4670a4d0e94 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -49,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; @@ -214,7 +213,7 @@ public void processWindowedElement(InputT element, Instant timestamp, final Boun try { final DoFn.ProcessContext processContext = createProcessContext( - ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING, null)); + ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING, null, null)); fnInvoker.invokeProcessElement( new DoFnInvoker.BaseArgumentProvider() { @@ -479,7 +478,9 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) { @Override public void output(TupleTag tag, T output, Instant timestamp, BoundedWindow window) { getMutableOutput(tag) - .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING, null)); + .add( + ValueInSingleWindow.of( + output, timestamp, window, PaneInfo.NO_FIRING, null, null)); } @Override @@ -487,8 +488,9 @@ public void output( OutputT output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { - output(mainOutputTag, output, timestamp, window, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + output(mainOutputTag, output, timestamp, window, currentRecordId, currentRecordOffset); } @Override @@ -497,11 +499,17 @@ public void output( T 
output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { getMutableOutput(tag) .add( ValueInSingleWindow.of( - output, timestamp, window, PaneInfo.NO_FIRING, elementMetadata)); + output, + timestamp, + window, + PaneInfo.NO_FIRING, + currentRecordId, + currentRecordOffset)); } }; } @@ -586,13 +594,18 @@ public Instant timestamp() { } @Override - public ElementMetadata elementMetadata() { - return element.getElementMetadata(); + public PaneInfo pane() { + return element.getPaneInfo(); } @Override - public PaneInfo pane() { - return element.getPaneInfo(); + public String currentRecordId() { + return element.getCurrentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return element.getCurrentRecordOffset(); } @Override @@ -625,8 +638,16 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { - outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + mainOutputTag, + output, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset); } @Override @@ -639,11 +660,7 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp getMutableOutput(tag) .add( ValueInSingleWindow.of( - output, - timestamp, - element.getWindow(), - element.getPaneInfo(), - element.getElementMetadata())); + output, timestamp, element.getWindow(), element.getPaneInfo(), null, null)); } @Override @@ -654,7 +671,8 @@ public void outputWindowedValue( Collection windows, PaneInfo paneInfo) { for (BoundedWindow w : windows) { - getMutableOutput(tag).add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null)); + getMutableOutput(tag) + .add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null)); } } @@ -665,10 +683,13 @@ 
public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { for (BoundedWindow w : windows) { getMutableOutput(tag) - .add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, elementMetadata)); + .add( + ValueInSingleWindow.of( + output, timestamp, w, paneInfo, currentRecordId, currentRecordOffset)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 660ff9f09271..d462d422446c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -404,7 +403,8 @@ public void output( T output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { throw new UnsupportedOperationException( "Output from FinishBundle for SDF is not supported in naive implementation"); } @@ -414,7 +414,8 @@ public void output( @Nullable OutputT output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { throw new UnsupportedOperationException( "Output from FinishBundle for SDF is not supported in naive implementation"); } @@ 
-645,8 +646,10 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { - outerContext.outputWindowedValue(output, timestamp, windows, paneInfo, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outerContext.outputWindowedValue( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset); } @Override @@ -676,9 +679,10 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { outerContext.outputWindowedValue( - tag, output, timestamp, windows, paneInfo, elementMetadata); + tag, output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset); } @Override @@ -702,8 +706,13 @@ public PaneInfo pane() { } @Override - public ElementMetadata elementMetadata() { - return outerContext.elementMetadata(); + public String currentRecordId() { + return outerContext.currentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return outerContext.currentRecordOffset(); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ElementMetadata.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ElementMetadata.java deleted file mode 100644 index 0802c7c23135..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ElementMetadata.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.values; - -import com.google.auto.value.AutoValue; - -@AutoValue -public abstract class ElementMetadata { - public abstract String getCurrentRecordId(); - - public abstract long getCurrentRecordOffset(); - - public static ElementMetadata create(String currentRecordId, long currentRecordOffset) { - return new AutoValue_ElementMetadata(currentRecordId, currentRecordOffset); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index ae4622d7007a..7dc5fef52ecb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -57,23 +57,27 @@ public T getValue() { /** Returns the window of this {@code ValueInSingleWindow}. */ public abstract BoundedWindow getWindow(); - public abstract @Nullable ElementMetadata getElementMetadata(); - /** Returns the pane of this {@code ValueInSingleWindow} in its window. 
*/ public abstract PaneInfo getPaneInfo(); + public abstract @Nullable String getCurrentRecordId(); + + public abstract @Nullable Long getCurrentRecordOffset(); + public static ValueInSingleWindow of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo, - @Nullable ElementMetadata elementMetadata) { - return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, elementMetadata, paneInfo); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + return new AutoValue_ValueInSingleWindow<>( + value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset); } public static ValueInSingleWindow of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - return of(value, timestamp, window, paneInfo, null); + return of(value, timestamp, window, paneInfo, null, null); } /** A coder for {@link ValueInSingleWindow}. */ @@ -121,7 +125,7 @@ public ValueInSingleWindow decode(InputStream inStream, Context context) thro BoundedWindow window = windowCoder.decode(inStream); PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream); T value = valueCoder.decode(inStream, context); - return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, null, paneInfo); + return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo, null, null); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java index f9a42e8e6435..0512be524b91 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java @@ -52,9 +52,6 @@ public interface WindowedValue { @Nullable Long getCurrentRecordOffset(); - @Nullable - ElementMetadata getElementMetadata(); - /** * A representation of each of the actual values represented by this compressed {@link * WindowedValue}, one per window. 
@@ -68,7 +65,4 @@ public interface WindowedValue { */ @Pure WindowedValue withValue(OtherT value); - - @Pure - WindowedValue withElementMetadata(ElementMetadata elementMetadata); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index ddeb7405eefb..4b4f5fca5658 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -63,7 +63,7 @@ private WindowedValues() {} // non-instantiable utility class public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { - return of(value, timestamp, windows, paneInfo, null); + return of(value, timestamp, windows, paneInfo, null, null); } /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ @@ -72,56 +72,43 @@ public static WindowedValue of( Instant timestamp, Collection windows, PaneInfo paneInfo, - @Nullable ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, elementMetadata); + return of(value, timestamp, windows.iterator().next(), paneInfo); } else { return new TimestampedValueInMultipleWindows<>( - value, timestamp, windows, paneInfo, elementMetadata); + value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset); } } /** @deprecated for use only in compatibility with old broken code */ @Deprecated static WindowedValue createWithoutValidation( - T value, - Instant timestamp, - Collection windows, - PaneInfo paneInfo, - @Nullable ElementMetadata elementMetadata) { + T value, Instant 
timestamp, Collection windows, PaneInfo paneInfo) { if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, elementMetadata); + return of(value, timestamp, windows.iterator().next(), paneInfo); } else { return new TimestampedValueInMultipleWindows<>( - value, timestamp, windows, paneInfo, elementMetadata); + value, timestamp, windows, paneInfo, null, null); } } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - return of(value, timestamp, window, paneInfo, null); - } - - public static WindowedValue of( - T value, - Instant timestamp, - BoundedWindow window, - PaneInfo paneInfo, - @Nullable ElementMetadata elementMetadata) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { return valueInGlobalWindow(value, paneInfo); } else if (isGlobal) { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, elementMetadata); + return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null, null); } else { - return new TimestampedValueInSingleWindow<>( - value, timestamp, window, paneInfo, elementMetadata); + return new TimestampedValueInSingleWindow<>(value, timestamp, window, paneInfo, null, null); } } @@ -130,7 +117,7 @@ public static WindowedValue of( * default timestamp and pane. */ public static WindowedValue valueInGlobalWindow(T value) { - return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING); + return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null); } /** @@ -138,7 +125,7 @@ public static WindowedValue valueInGlobalWindow(T value) { * default timestamp and the specified pane. 
*/ public static WindowedValue valueInGlobalWindow(T value, PaneInfo paneInfo) { - return new ValueInGlobalWindow<>(value, paneInfo); + return new ValueInGlobalWindow<>(value, paneInfo, null, null); } /** @@ -149,7 +136,7 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta if (BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { return valueInGlobalWindow(value); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, PaneInfo.NO_FIRING, null); + return new TimestampedValueInGlobalWindow<>(value, timestamp, PaneInfo.NO_FIRING, null, null); } } @@ -162,7 +149,7 @@ public static WindowedValue timestampedValueInGlobalWindow( if (paneInfo.equals(PaneInfo.NO_FIRING)) { return timestampedValueInGlobalWindow(value, timestamp); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null); + return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null, null); } } @@ -172,12 +159,7 @@ public static WindowedValue timestampedValueInGlobalWindow( */ public static WindowedValue withValue( WindowedValue windowedValue, NewT newValue) { - return WindowedValues.of( - newValue, - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo(), - windowedValue.getElementMetadata()); + return windowedValue.withValue(newValue); } public static boolean equals( @@ -226,28 +208,28 @@ private abstract static class SimpleWindowedValue implements WindowedValue private final T value; private final PaneInfo paneInfo; - private final @Nullable ElementMetadata elementMetadata; + private final @Nullable String currentRecordId; + private final @Nullable Long currentRecordOffset; @Override public @Nullable String getCurrentRecordId() { - return elementMetadata != null ? elementMetadata.getCurrentRecordId() : null; + return currentRecordId; } @Override public @Nullable Long getCurrentRecordOffset() { - return elementMetadata != null ? 
elementMetadata.getCurrentRecordOffset() : null; + return currentRecordOffset; } protected SimpleWindowedValue( - T value, PaneInfo paneInfo, @Nullable ElementMetadata elementMetadata) { + T value, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { this.value = value; this.paneInfo = checkNotNull(paneInfo); - this.elementMetadata = elementMetadata; - } - - @Override - public @Nullable ElementMetadata getElementMetadata() { - return elementMetadata; + this.currentRecordId = currentRecordId; + this.currentRecordOffset = currentRecordOffset; } @Override @@ -278,8 +260,11 @@ public Iterable> explodeWindows() { private abstract static class MinTimestampWindowedValue extends SimpleWindowedValue { public MinTimestampWindowedValue( - T value, PaneInfo pane, @Nullable ElementMetadata elementMetadata) { - super(value, pane, elementMetadata); + T value, + PaneInfo pane, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, pane, currentRecordId, currentRecordOffset); } @Override @@ -292,13 +277,12 @@ public Instant getTimestamp() { private static class ValueInGlobalWindow extends MinTimestampWindowedValue implements SingleWindowedValue { - public ValueInGlobalWindow(T value, PaneInfo paneInfo) { - this(value, paneInfo, null); - } - public ValueInGlobalWindow( - T value, PaneInfo paneInfo, @Nullable ElementMetadata elementMetadata) { - super(value, paneInfo, elementMetadata); + T value, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, paneInfo, currentRecordId, currentRecordOffset); } @Override @@ -313,12 +297,8 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { - return new ValueInGlobalWindow<>(newValue, getPaneInfo(), getElementMetadata()); - } - - @Override - public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { - return new 
ValueInGlobalWindow<>(getValue(), getPaneInfo(), elementMetadata); + return new ValueInGlobalWindow<>( + newValue, getPaneInfo(), getCurrentRecordId(), getCurrentRecordOffset()); } @Override @@ -351,8 +331,12 @@ private abstract static class TimestampedWindowedValue extends SimpleWindowed private final Instant timestamp; public TimestampedWindowedValue( - T value, Instant timestamp, PaneInfo paneInfo, @Nullable ElementMetadata elementMetadata) { - super(value, paneInfo, elementMetadata); + T value, + Instant timestamp, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, paneInfo, currentRecordId, currentRecordOffset); this.timestamp = checkNotNull(timestamp); } @@ -370,8 +354,12 @@ private static class TimestampedValueInGlobalWindow extends TimestampedWindow implements SingleWindowedValue { public TimestampedValueInGlobalWindow( - T value, Instant timestamp, PaneInfo paneInfo, @Nullable ElementMetadata elementMetadata) { - super(value, timestamp, paneInfo, elementMetadata); + T value, + Instant timestamp, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset); } @Override @@ -387,13 +375,7 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { return new TimestampedValueInGlobalWindow<>( - newValue, getTimestamp(), getPaneInfo(), getElementMetadata()); - } - - @Override - public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { - return new TimestampedValueInGlobalWindow<>( - getValue(), getTimestamp(), getPaneInfo(), elementMetadata); + newValue, getTimestamp(), getPaneInfo(), getCurrentRecordId(), getCurrentRecordOffset()); } @Override @@ -441,21 +423,21 @@ public TimestampedValueInSingleWindow( Instant timestamp, BoundedWindow window, PaneInfo paneInfo, - @Nullable ElementMetadata elementMetadata) { - super(value, 
timestamp, paneInfo, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset); this.window = checkNotNull(window); } @Override public WindowedValue withValue(NewT newValue) { return new TimestampedValueInSingleWindow<>( - newValue, getTimestamp(), window, getPaneInfo(), getElementMetadata()); - } - - @Override - public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { - return new TimestampedValueInSingleWindow<>( - getValue(), getTimestamp(), getWindow(), getPaneInfo(), elementMetadata); + newValue, + getTimestamp(), + window, + getPaneInfo(), + getCurrentRecordId(), + getCurrentRecordOffset()); } @Override @@ -510,8 +492,9 @@ public TimestampedValueInMultipleWindows( Instant timestamp, Collection windows, PaneInfo paneInfo, - @Nullable ElementMetadata elementMetadata) { - super(value, timestamp, paneInfo, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset); this.windows = checkNotNull(windows); } @@ -523,13 +506,12 @@ public Collection getWindows() { @Override public WindowedValue withValue(NewT newValue) { return new TimestampedValueInMultipleWindows<>( - newValue, getTimestamp(), getWindows(), getPaneInfo(), getElementMetadata()); - } - - @Override - public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { - return new TimestampedValueInMultipleWindows<>( - getValue(), getTimestamp(), getWindows(), getPaneInfo(), elementMetadata); + newValue, + getTimestamp(), + getWindows(), + getPaneInfo(), + getCurrentRecordId(), + getCurrentRecordOffset()); } @Override @@ -686,7 +668,7 @@ public WindowedValue decode(InputStream inStream, Context context) // Because there are some remaining (incorrect) uses of WindowedValue with no windows, // we call this deprecated no-validation path when 
decoding - return WindowedValues.createWithoutValidation(value, timestamp, windows, paneInfo, null); + return WindowedValues.createWithoutValidation(value, timestamp, windows, paneInfo); } @Override diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 498e05041c6c..b47cd5b824be 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -102,7 +102,6 @@ import org.apache.beam.sdk.util.construction.ParDoTranslation; import org.apache.beam.sdk.util.construction.RehydratedComponents; import org.apache.beam.sdk.util.construction.Timer; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; @@ -1674,10 +1673,11 @@ public void output( OutputT output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING, elementMetadata)); + WindowedValues.of(output, timestamp, Collections.singletonList(window), PaneInfo.NO_FIRING, currentRecordId, currentRecordOffset)); } @Override @@ -1686,7 +1686,8 @@ public void output( T output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); if (consumer == null) { @@ -1694,7 +1695,7 @@ public void output( } outputTo( consumer, - WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING, elementMetadata)); + WindowedValues.of(output, timestamp, Collections.singletonList(window), PaneInfo.NO_FIRING, 
currentRecordId, currentRecordOffset)); } } @@ -1793,12 +1794,13 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all // runners can provide proper timestamps. outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -1839,13 +1841,14 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -1951,11 +1954,12 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkTimestamp(timestamp); outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -1995,14 +1999,15 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkTimestamp(timestamp); 
FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } } @@ -2295,8 +2300,13 @@ public Instant timestamp() { } @Override - public ElementMetadata elementMetadata() { - return currentElement.getElementMetadata(); + public String currentRecordId() { + return currentElement.getCurrentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return currentElement.getCurrentRecordOffset(); } @Override @@ -2371,11 +2381,12 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkOnWindowExpirationTimestamp(timestamp); outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -2414,7 +2425,7 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - outputWindowedValue(tag, output, timestamp, windows, paneInfo, null); + outputWindowedValue(tag, output, timestamp, windows, paneInfo, null, null); } @Override @@ -2424,12 +2435,13 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkOnWindowExpirationTimestamp(timestamp); FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); outputTo( - consumer, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + consumer, 
WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @SuppressWarnings( @@ -2699,11 +2711,12 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkTimerTimestamp(timestamp); outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -2751,7 +2764,8 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) {} + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) {} @Override public TimeDomain timeDomain() { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 08c3f2182e27..f17e5b4c5227 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1913,15 +1913,15 @@ static class OffsetDeduplicationIdExtractor @ProcessElement public void processElement(ProcessContext pc) { KafkaRecord element = pc.element(); - ElementMetadata em = null; + Long offset = null; + String uniqueId = null; if (element != null) { - long offset = element.getOffset(); - String uniqueId = + offset = element.getOffset(); + uniqueId = (String.format("%s-%d-%d", element.getTopic(), element.getPartition(), offset)); - em = ElementMetadata.create(uniqueId, offset); } pc.outputWindowedValue( - element, pc.timestamp(), Lists.newArrayList(GlobalWindow.INSTANCE), pc.pane(), em); + element, pc.timestamp(), Lists.newArrayList(GlobalWindow.INSTANCE), pc.pane(), uniqueId, offset); } } From a02e0fc00ac827d6cdf4cc724a22feacf0753c6d Mon 
Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 18 Aug 2025 12:15:18 +0200 Subject: [PATCH 6/7] spotless --- ...oundedSplittableProcessElementInvoker.java | 26 +++--- .../beam/runners/core/SimpleDoFnRunner.java | 84 ++++++++++++++----- .../beam/fn/harness/FnApiDoFnRunner.java | 42 ++++++++-- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 14 ++-- 4 files changed, 122 insertions(+), 44 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 40ea89cf0d22..b16dad86df18 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -383,17 +383,17 @@ public PaneInfo pane() { return element.getPaneInfo(); } - @Override - public String currentRecordId() { - return element.getCurrentRecordId(); - } + @Override + public String currentRecordId() { + return element.getCurrentRecordId(); + } - @Override - public Long currentRecordOffset() { - return element.getCurrentRecordOffset(); - } + @Override + public Long currentRecordOffset() { + return element.getCurrentRecordOffset(); + } - @Override + @Override public PipelineOptions getPipelineOptions() { return pipelineOptions; } @@ -434,7 +434,9 @@ public void outputWindowedValue( ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); } outputReceiver.output( - mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + mainOutputTag, + WindowedValues.of( + value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -472,7 +474,9 @@ public void outputWindowedValue( ((TimestampObservingWatermarkEstimator) 
watermarkEstimator).observeTimestamp(timestamp); } outputReceiver.output( - tag, WindowedValues.of(value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + tag, + WindowedValues.of( + value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } private void noteOutput() { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index a653c32bde26..217c06c56fe5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -54,7 +54,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; @@ -341,8 +340,9 @@ public void output( OutputT output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { - output(mainOutputTag, output, timestamp, window, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + output(mainOutputTag, output, timestamp, window, currentRecordId, currentRecordOffset); } @Override @@ -351,9 +351,17 @@ public void output( T output, Instant timestamp, BoundedWindow window, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { outputWindowedValue( - tag, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING, elementMetadata)); + tag, + WindowedValues.of( + output, + timestamp, + Collections.singletonList(window), + PaneInfo.NO_FIRING, + currentRecordId, + currentRecordOffset)); } } @@ -454,8 +462,16 @@ public void outputWindowedValue( Instant 
timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { - outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + mainOutputTag, + output, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset); } @Override @@ -489,9 +505,12 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { SimpleDoFnRunner.this.outputWindowedValue( - tag, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + tag, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -500,8 +519,13 @@ public Instant timestamp() { } @Override - public ElementMetadata elementMetadata() { - return elem.getElementMetadata(); + public String currentRecordId() { + return elem.getCurrentRecordId(); + } + + @Override + public Long currentRecordOffset() { + return elem.getCurrentRecordOffset(); } public Collection windows() { @@ -921,8 +945,16 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { - outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + mainOutputTag, + output, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset); } @Override @@ -957,10 +989,13 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkTimestamp(timestamp(), timestamp); SimpleDoFnRunner.this.outputWindowedValue( - tag, 
WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + tag, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -1173,8 +1208,16 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { - outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, elementMetadata); + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { + outputWindowedValue( + mainOutputTag, + output, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset); } @Override @@ -1209,10 +1252,13 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo, - ElementMetadata elementMetadata) { + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset) { checkTimestamp(this.timestamp, timestamp); SimpleDoFnRunner.this.outputWindowedValue( - tag, WindowedValues.of(output, timestamp, windows, paneInfo, elementMetadata)); + tag, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index b47cd5b824be..fbadc984c3d6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1677,7 +1677,13 @@ public void output( @Nullable Long currentRecordOffset) { outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, Collections.singletonList(window), PaneInfo.NO_FIRING, currentRecordId, currentRecordOffset)); + WindowedValues.of( + output, + timestamp, + Collections.singletonList(window), + PaneInfo.NO_FIRING, + currentRecordId, + currentRecordOffset)); } @Override @@ -1695,7 +1701,13 @@ public void 
output( } outputTo( consumer, - WindowedValues.of(output, timestamp, Collections.singletonList(window), PaneInfo.NO_FIRING, currentRecordId, currentRecordOffset)); + WindowedValues.of( + output, + timestamp, + Collections.singletonList(window), + PaneInfo.NO_FIRING, + currentRecordId, + currentRecordOffset)); } } @@ -1800,7 +1812,8 @@ public void outputWindowedValue( // runners can provide proper timestamps. outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -1848,7 +1861,10 @@ public void outputWindowedValue( if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + outputTo( + consumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -1959,7 +1975,8 @@ public void outputWindowedValue( checkTimestamp(timestamp); outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -2007,7 +2024,10 @@ public void outputWindowedValue( if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + outputTo( + consumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } } @@ -2386,7 +2406,8 @@ public void outputWindowedValue( checkOnWindowExpirationTimestamp(timestamp); outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, windows, paneInfo, 
currentRecordId, currentRecordOffset)); + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override @@ -2441,7 +2462,9 @@ public void outputWindowedValue( FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); outputTo( - consumer, WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + consumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @SuppressWarnings( @@ -2716,7 +2739,8 @@ public void outputWindowedValue( checkTimerTimestamp(timestamp); outputTo( mainOutputConsumer, - WindowedValues.of(output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); + WindowedValues.of( + output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset)); } @Override diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index f17e5b4c5227..92860b4ecf9e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -97,7 +97,6 @@ import org.apache.beam.sdk.util.construction.PTransformMatchers; import org.apache.beam.sdk.util.construction.ReplacementOutputs; import org.apache.beam.sdk.util.construction.TransformUpgrader; -import org.apache.beam.sdk.values.ElementMetadata; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -1914,14 +1913,19 @@ static class OffsetDeduplicationIdExtractor public void processElement(ProcessContext pc) { KafkaRecord element = pc.element(); Long offset = null; - String uniqueId = null; + String uniqueId = null; if (element != null) { - offset = element.getOffset(); - uniqueId = + offset = element.getOffset(); + uniqueId = (String.format("%s-%d-%d", 
element.getTopic(), element.getPartition(), offset)); } pc.outputWindowedValue( - element, pc.timestamp(), Lists.newArrayList(GlobalWindow.INSTANCE), pc.pane(), uniqueId, offset); + element, + pc.timestamp(), + Lists.newArrayList(GlobalWindow.INSTANCE), + pc.pane(), + uniqueId, + offset); } } From 4b04b5d660135025972b54ec5371d4b1cbc11316 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 18 Aug 2025 12:18:42 +0200 Subject: [PATCH 7/7] spotless --- .../java/org/apache/beam/sdk/values/WindowedValues.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index 4b4f5fca5658..4bbab33a8936 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -159,7 +159,13 @@ public static WindowedValue timestampedValueInGlobalWindow( */ public static WindowedValue withValue( WindowedValue windowedValue, NewT newValue) { - return windowedValue.withValue(newValue); + return WindowedValues.of( + newValue, + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPaneInfo(), + windowedValue.getCurrentRecordId(), + windowedValue.getCurrentRecordOffset()); } public static boolean equals(