Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,16 @@ public PaneInfo pane() {
return element.getPaneInfo();
}

@Override
public String currentRecordId() {
return element.getCurrentRecordId();
}

@Override
public Long currentRecordOffset() {
return element.getCurrentRecordOffset();
}

@Override
public PipelineOptions getPipelineOptions() {
return pipelineOptions;
Expand Down Expand Up @@ -411,6 +421,24 @@ public void outputWindowedValue(
outputReceiver.output(mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo));
}

@Override
public void outputWindowedValue(
OutputT value,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
outputReceiver.output(
mainOutputTag,
WindowedValues.of(
value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
}

@Override
public <T> void output(TupleTag<T> tag, T value) {
outputWithTimestamp(tag, value, element.getTimestamp());
Expand All @@ -429,11 +457,26 @@ public <T> void outputWindowedValue(
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outputWindowedValue(tag, value, timestamp, windows, paneInfo, null, null);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T value,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo));
outputReceiver.output(
tag,
WindowedValues.of(
value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
}

private void noteOutput() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,35 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) {
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
outputWindowedValue(tag, WindowedValues.of(output, timestamp, window, PaneInfo.NO_FIRING));
}

@Override
public void output(
OutputT output,
Instant timestamp,
BoundedWindow window,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
output(mainOutputTag, output, timestamp, window, currentRecordId, currentRecordOffset);
}

@Override
public <T> void output(
TupleTag<T> tag,
T output,
Instant timestamp,
BoundedWindow window,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
outputWindowedValue(
tag,
WindowedValues.of(
output,
timestamp,
Collections.singletonList(window),
PaneInfo.NO_FIRING,
currentRecordId,
currentRecordOffset));
}
}

private final DoFnFinishBundleArgumentProvider.Context context =
Expand Down Expand Up @@ -427,6 +456,24 @@ public void outputWindowedValue(
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
outputWindowedValue(
mainOutputTag,
output,
timestamp,
windows,
paneInfo,
currentRecordId,
currentRecordOffset);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkNotNull(tag, "Tag passed to output cannot be null");
Expand All @@ -451,11 +498,36 @@ public <T> void outputWindowedValue(
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
SimpleDoFnRunner.this.outputWindowedValue(
tag,
WindowedValues.of(
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
}

@Override
public Instant timestamp() {
return elem.getTimestamp();
}

@Override
public String currentRecordId() {
return elem.getCurrentRecordId();
}

@Override
public Long currentRecordOffset() {
return elem.getCurrentRecordOffset();
}

public Collection<? extends BoundedWindow> windows() {
return elem.getWindows();
}
Expand Down Expand Up @@ -867,6 +939,24 @@ public void outputWindowedValue(
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
outputWindowedValue(
mainOutputTag,
output,
timestamp,
windows,
paneInfo,
currentRecordId,
currentRecordOffset);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkTimestamp(timestamp(), timestamp);
Expand All @@ -892,6 +982,22 @@ public <T> void outputWindowedValue(
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
checkTimestamp(timestamp(), timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag,
WindowedValues.of(
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -1096,6 +1202,24 @@ public void outputWindowedValue(
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
outputWindowedValue(
mainOutputTag,
output,
timestamp,
windows,
paneInfo,
currentRecordId,
currentRecordOffset);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkTimestamp(this.timestamp, timestamp);
Expand All @@ -1121,6 +1245,22 @@ public <T> void outputWindowedValue(
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
checkTimestamp(this.timestamp, timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag,
WindowedValues.of(
output, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset));
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,27 @@ public <T> void output(
throwUnsupportedOutput();
}

@Override
public void output(
OutputT output,
Instant timestamp,
BoundedWindow window,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
throwUnsupportedOutput();
}

@Override
public <T> void output(
TupleTag<T> tag,
T output,
Instant timestamp,
BoundedWindow window,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
throwUnsupportedOutput();
}

@Override
public PipelineOptions getPipelineOptions() {
return baseContext.getPipelineOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,16 @@ public PaneInfo getPaneInfo() {
return PaneInfo.NO_FIRING;
}

@Override
public @Nullable String getCurrentRecordId() {
return null;
}

@Override
public @Nullable Long getCurrentRecordOffset() {
return null;
}

@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -43,6 +44,7 @@
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -221,15 +223,26 @@ public long add(WindowedValue<T> data) throws IOException {
throw new RuntimeException(
"Unexpected record ID via ValueWithRecordIdCoder while offset-based deduplication enabled.");
}
byte[] rawId = context.getCurrentRecordId();
if (rawId.length == 0) {
byte[] rawId = null;

if (data.getCurrentRecordId() != null) {
rawId = data.getCurrentRecordId().getBytes(StandardCharsets.UTF_8);
} else {
rawId = context.getCurrentRecordId();
}
if (rawId == null || rawId.length == 0) {
throw new RuntimeException(
"Unexpected empty record ID while offset-based deduplication enabled.");
}
id = ByteString.copyFrom(rawId);

byte[] rawOffset = context.getCurrentRecordOffset();
if (rawOffset.length == 0) {
byte[] rawOffset = null;
if (data.getCurrentRecordOffset() != null) {
rawOffset = Longs.toByteArray(data.getCurrentRecordOffset());
} else {
rawOffset = context.getCurrentRecordOffset();
}
if (rawOffset == null || rawOffset.length == 0) {
throw new RuntimeException(
"Unexpected empty record offset while offset-based deduplication enabled.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public PaneInfo getPaneInfo() {
return PaneInfo.NO_FIRING;
}

@Override
public @Nullable String getCurrentRecordId() {
return null;
}

@Override
public @Nullable Long getCurrentRecordOffset() {
return null;
}

@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ public PaneInfo getPaneInfo() {
return PaneInfo.NO_FIRING;
}

@Override
public @Nullable String getCurrentRecordId() {
return null;
}

@Override
public @Nullable Long getCurrentRecordOffset() {
return null;
}

@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
Expand Down
Loading
Loading