@@ -1755,7 +1755,7 @@ void maybeRecordPCollectionWithAutoSharding(PCollection<?> pcol) {
options.isEnableStreamingEngine(),
"Runner determined sharding not available in Dataflow for GroupIntoBatches for"
+ " non-Streaming-Engine jobs. In order to use runner determined sharding, please use"
+ " --streaming --enable_streaming_engine");
+ " --streaming --experiments=enable_streaming_engine");
pCollectionsPreservedKeys.add(pcol);
pcollectionsRequiringAutoSharding.add(pcol);
}
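The corrected message above points users at --experiments=enable_streaming_engine for runner-determined sharding of GroupIntoBatches. As a hedged sketch (the class name and pipeline contents are illustrative and not part of this change), Dataflow options matching that guidance could be built like this:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StreamingEngineOptionsSketch {
  public static void main(String[] args) {
    // Enable Streaming Engine via the experiments flag, as the updated
    // precondition message suggests.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=DataflowRunner",
                "--streaming",
                "--experiments=enable_streaming_engine")
            .as(DataflowPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms that rely on runner-determined sharding, then:
    // pipeline.run();
  }
}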
@@ -17,11 +17,15 @@
*/
package org.apache.beam.sdk.transforms;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

@@ -34,28 +38,58 @@
*/
public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {

Instant startTimestamp = Instant.now();
Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
Duration fireInterval = Duration.standardMinutes(1);
Instant startTimestamp;
Instant stopTimestamp;
@Nullable Duration stopDuration;
Duration fireInterval;
boolean applyWindowing = false;
boolean catchUpToNow = true;

private PeriodicImpulse() {}
private PeriodicImpulse() {
this.startTimestamp = Instant.now();
this.stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
this.fireInterval = Duration.standardMinutes(1);
}

public static PeriodicImpulse create() {
return new PeriodicImpulse();
}

/**
* Assign a timestamp when the pipeline starts to produce data.
*
* <p>Cannot be used along with {@link #stopAfter}.
*/
public PeriodicImpulse startAt(Instant startTime) {
checkArgument(stopDuration == null, "startAt and stopAfter cannot be set at the same time");
this.startTimestamp = startTime;
return this;
}

/**
* Assign a timestamp when the pipeline stops producing data.
*
* <p>Cannot be used along with {@link #stopAfter}.
*/
public PeriodicImpulse stopAt(Instant stopTime) {
checkArgument(stopDuration == null, "stopAt and stopAfter cannot be set at the same time");
this.stopTimestamp = stopTime;
return this;
}

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Assign a duration for which the pipeline produces data. This is different from setting
* {@link #startAt} and {@link #stopAt}, as the first timestamp is determined at run time (when
* the pipeline starts processing).
*/
@Internal
public PeriodicImpulse stopAfter(Duration duration) {
this.stopDuration = duration;
return this;
}

public PeriodicImpulse withInterval(Duration interval) {
this.fireInterval = interval;
return this;
@@ -67,31 +101,65 @@ public PeriodicImpulse applyWindowing() {
}

/**
* The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
* starts firing at the specified interval. If this is set to false, the PeriodicImpulse will
* perform the interval wait before firing each instant.
*/
@Internal
public PeriodicImpulse catchUpToNow(boolean catchUpToNow) {
this.catchUpToNow = catchUpToNow;
return this;
}

@Override
public PCollection<Instant> expand(PBegin input) {
PCollection<Instant> result =
input
.apply(
Create.<PeriodicSequence.SequenceDefinition>of(
new PeriodicSequence.SequenceDefinition(
startTimestamp, stopTimestamp, fireInterval, catchUpToNow)))
.apply(PeriodicSequence.create());
PCollection<PeriodicSequence.SequenceDefinition> seqDef;
if (stopDuration != null) {
// stopDuration is checked non-null above; copy it to a local for the nullness checker
Duration d = stopDuration;
seqDef =
input
.apply(Impulse.create())
.apply(ParDo.of(new RuntimeSequenceFn(d, fireInterval, catchUpToNow)));
} else {
seqDef =
input.apply(
Create.of(
new PeriodicSequence.SequenceDefinition(
startTimestamp, stopTimestamp, fireInterval, catchUpToNow)));
}
PCollection<Instant> result = seqDef.apply(PeriodicSequence.create());

if (this.applyWindowing) {
result =
result.apply(
Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
result.apply(Window.into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
}

return result;
}

/**
* A DoFn that generates a SequenceDefinition at run time. This allows the first element's
* timestamp to be set when the pipeline starts processing data.
*/
private static class RuntimeSequenceFn extends DoFn<byte[], PeriodicSequence.SequenceDefinition> {
Duration stopDuration;
Duration fireInterval;
boolean catchUpToNow;

RuntimeSequenceFn(Duration stopDuration, Duration fireInterval, boolean catchUpToNow) {
this.stopDuration = stopDuration;
this.fireInterval = fireInterval;
this.catchUpToNow = catchUpToNow;
}

@ProcessElement
public void process(ProcessContext c) {
Instant now = Instant.now();
c.output(
new PeriodicSequence.SequenceDefinition(
now, now.plus(stopDuration), fireInterval, catchUpToNow));
}
}
}
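To illustrate the API surface this diff adds, here is a hedged usage sketch. The transform names, intervals, and runner setup are assumptions, and stopAfter/catchUpToNow are marked @Internal above, so they carry no compatibility guarantees: startAt/stopAt fix absolute timestamps at graph construction time, while stopAfter derives them when the pipeline starts processing.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PeriodicImpulseSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Absolute range: both endpoints are fixed when the pipeline graph is built.
    PCollection<Instant> absoluteTicks =
        pipeline.apply(
            "AbsoluteRange",
            PeriodicImpulse.create()
                .startAt(Instant.now())
                .stopAt(Instant.now().plus(Duration.standardMinutes(10)))
                .withInterval(Duration.standardSeconds(1)));

    // Relative range (new in this diff): the first timestamp is taken when the
    // pipeline starts processing, and emission stops after the given duration.
    // catchUpToNow(false) waits out the interval before each element instead of
    // first catching up to Instant.now().
    PCollection<Instant> relativeTicks =
        pipeline.apply(
            "RelativeRange",
            PeriodicImpulse.create()
                .stopAfter(Duration.standardMinutes(10))
                .withInterval(Duration.standardSeconds(1))
                .catchUpToNow(false));

    // On a real runner this would tick for roughly ten minutes.
    pipeline.run();
  }
}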
@@ -22,6 +22,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.util.Objects;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -67,6 +68,8 @@ public SequenceDefinition(Instant first, Instant last, Duration duration) {
this.catchUpToNow = true;
}

/** <b><i>catchUpToNow is experimental; no backwards-compatibility guarantees.</i></b> */
@Internal
public SequenceDefinition(
Instant first, Instant last, Duration duration, boolean catchUpToNow) {
this.first = first;
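For completeness, a hedged sketch of the two SequenceDefinition constructors touched here; the values are placeholders, and the four-argument form is the one the diff marks @Internal, which RuntimeSequenceFn above uses to pass the configured catchUpToNow flag through.

import org.apache.beam.sdk.transforms.PeriodicSequence;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class SequenceDefinitionSketch {
  public static void main(String[] args) {
    Instant start = Instant.now();
    Instant stop = start.plus(Duration.standardHours(1));
    Duration interval = Duration.standardSeconds(5);

    // Three-argument constructor: keeps the default catchUpToNow = true.
    PeriodicSequence.SequenceDefinition catchingUp =
        new PeriodicSequence.SequenceDefinition(start, stop, interval);

    // Four-argument constructor, annotated @Internal in this diff: callers opt
    // in or out of catch-up behavior explicitly.
    PeriodicSequence.SequenceDefinition paced =
        new PeriodicSequence.SequenceDefinition(start, stop, interval, false);

    System.out.println(catchingUp + " vs " + paced);
  }
}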
@@ -33,9 +33,16 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -81,11 +88,32 @@ public void processElement(ProcessContext c) {
}
}

private GenerateSequence stream(int rowCount) {
int timestampIntervalInMilliseconds = 10;
return GenerateSequence.from(0)
.to(rowCount)
.withRate(1, Duration.millis(timestampIntervalInMilliseconds));
static class UnboundedStream extends PTransform<PBegin, PCollection<Long>> {

private final int rowCount;

public UnboundedStream(int rowCount) {
this.rowCount = rowCount;
}

@Override
public PCollection<Long> expand(PBegin input) {
int timestampIntervalInMillis = 10;
PeriodicImpulse impulse =
PeriodicImpulse.create()
.stopAfter(Duration.millis((long) timestampIntervalInMillis * rowCount - 1))
.withInterval(Duration.millis(timestampIntervalInMillis));
return input
.apply(impulse)
.apply(
MapElements.via(
new SimpleFunction<Instant, Long>() {
@Override
public Long apply(Instant input) {
return input.getMillis();
}
}));
}
}

private void runBigQueryIOStorageWritePipeline(
@@ -102,7 +130,9 @@ private void runBigQueryIOStorageWritePipeline(
new TableFieldSchema().setName("str").setType("STRING")));

Pipeline p = Pipeline.create(bqOptions);
p.apply("Input", isStreaming ? stream(rowCount) : GenerateSequence.from(0).to(rowCount))
p.apply(
"Input",
isStreaming ? new UnboundedStream(rowCount) : GenerateSequence.from(0).to(rowCount))
.apply("GenerateMessage", ParDo.of(new FillRowFn()))
.apply(
"WriteToBQ",