diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 2acc30455e22..f6e2b9b147c5 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -612,9 +612,6 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
   exclude '**/FhirIOLROIT.class'
   exclude '**/FhirIOSearchIT.class'
   exclude '**/FhirIOPatientEverythingIT.class'
-  // failing due to pane index not incrementing after Reshuffle:
-  // https://github.com/apache/beam/issues/28219
-  exclude '**/FileLoadsStreamingIT.class'
 
   maxParallelForks 4
   classpath = configurations.googleCloudPlatformIntegrationTest
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 02f6f9acd7a6..17aea34045ff 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1755,7 +1755,7 @@ void maybeRecordPCollectionWithAutoSharding(PCollection<?> pcol) {
         options.isEnableStreamingEngine(),
         "Runner determined sharding not available in Dataflow for GroupIntoBatches for"
             + " non-Streaming-Engine jobs. In order to use runner determined sharding, please use"
-            + " --streaming --enable_streaming_engine");
+            + " --streaming --experiments=enable_streaming_engine");
     pCollectionsPreservedKeys.add(pcol);
     pcollectionsRequiringAutoSharding.add(pcol);
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
index 3679c3eb10f5..db4f141ee624 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
@@ -17,11 +17,15 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -34,28 +38,58 @@
  */
 public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
 
-  Instant startTimestamp = Instant.now();
-  Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
-  Duration fireInterval = Duration.standardMinutes(1);
+  Instant startTimestamp;
+  Instant stopTimestamp;
+  @Nullable Duration stopDuration;
+  Duration fireInterval;
   boolean applyWindowing = false;
   boolean catchUpToNow = true;
 
-  private PeriodicImpulse() {}
+  private PeriodicImpulse() {
+    this.startTimestamp = Instant.now();
+    this.stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    this.fireInterval = Duration.standardMinutes(1);
+  }
 
   public static PeriodicImpulse create() {
     return new PeriodicImpulse();
   }
 
+  /**
+   * Assign the timestamp at which the pipeline starts to produce data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse startAt(Instant startTime) {
+    checkArgument(stopDuration == null, "startAt and stopAfter cannot be set at the same time");
     this.startTimestamp = startTime;
     return this;
   }
 
+  /**
+   * Assign the timestamp at which the pipeline stops producing data.
+   *
+   * <p>Cannot be used along with {@link #stopAfter}.
+   */
   public PeriodicImpulse stopAt(Instant stopTime) {
+    checkArgument(stopDuration == null, "stopAt and stopAfter cannot be set at the same time");
     this.stopTimestamp = stopTime;
     return this;
   }
 
+  /**
+   * For internal use only; no backwards-compatibility guarantees.
+   *
+   * <p>Assign a duration during which the pipeline produces data. This differs from setting
+   * {@link #startAt} and {@link #stopAt}, as the first timestamp is determined at run time (when
+   * the pipeline starts processing).
+   */
+  @Internal
+  public PeriodicImpulse stopAfter(Duration duration) {
+    this.stopDuration = duration;
+    return this;
+  }
+
   public PeriodicImpulse withInterval(Duration interval) {
     this.fireInterval = interval;
     return this;
@@ -67,10 +101,13 @@ public PeriodicImpulse applyWindowing() {
   }
 
   /**
-   * The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
+   * For internal use only; no backwards-compatibility guarantees.
+   *
+   * <p>The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
    * starts firing at the specified interval. If this is set to false, the PeriodicImpulse will
    * perform the interval wait before firing each instant.
    */
+  @Internal
   public PeriodicImpulse catchUpToNow(boolean catchUpToNow) {
     this.catchUpToNow = catchUpToNow;
     return this;
@@ -78,20 +115,51 @@ public PeriodicImpulse catchUpToNow(boolean catchUpToNow) {
 
   @Override
   public PCollection<Instant> expand(PBegin input) {
-    PCollection<Instant> result =
-        input
-            .apply(
-                Create.of(
-                    new PeriodicSequence.SequenceDefinition(
-                        startTimestamp, stopTimestamp, fireInterval, catchUpToNow)))
-            .apply(PeriodicSequence.create());
+    PCollection<PeriodicSequence.SequenceDefinition> seqDef;
+    if (stopDuration != null) {
+      // stopDuration is non-null in this branch; copy it to a local for the nullness checker
+      Duration d = stopDuration;
+      seqDef =
+          input
+              .apply(Impulse.create())
+              .apply(ParDo.of(new RuntimeSequenceFn(d, fireInterval, catchUpToNow)));
+    } else {
+      seqDef =
+          input.apply(
+              Create.of(
+                  new PeriodicSequence.SequenceDefinition(
+                      startTimestamp, stopTimestamp, fireInterval, catchUpToNow)));
+    }
+    PCollection<Instant> result = seqDef.apply(PeriodicSequence.create());
     if (this.applyWindowing) {
       result =
-          result.apply(
-              Window.into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
+          result.apply(Window.into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
     }
-
     return result;
   }
+
+  /**
+   * A DoFn that generates a SequenceDefinition at run time. This allows the timestamp of the first
+   * element to be set when the pipeline starts processing data.
+   */
+  private static class RuntimeSequenceFn extends DoFn<byte[], PeriodicSequence.SequenceDefinition> {
+    Duration stopDuration;
+    Duration fireInterval;
+    boolean catchUpToNow;
+
+    RuntimeSequenceFn(Duration stopDuration, Duration fireInterval, boolean catchUpToNow) {
+      this.stopDuration = stopDuration;
+      this.fireInterval = fireInterval;
+      this.catchUpToNow = catchUpToNow;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      Instant now = Instant.now();
+      c.output(
+          new PeriodicSequence.SequenceDefinition(
+              now, now.plus(stopDuration), fireInterval, catchUpToNow));
+    }
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
index b3cd2afde697..12cbecd04b02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -22,6 +22,7 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import java.util.Objects;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -67,6 +68,8 @@ public SequenceDefinition(Instant first, Instant last, Duration duration) {
       this.catchUpToNow = true;
     }
 
+    /** catchUpToNow is experimental; no backwards-compatibility guarantees. */
+    @Internal
     public SequenceDefinition(
         Instant first, Instant last, Duration duration, boolean catchUpToNow) {
       this.first = first;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 31036c58f9dc..be732550d875 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -28,6 +28,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
@@ -48,25 +49,31 @@ class WritePartition
   @AutoValue
   abstract static class Result {
     public abstract List<String> getFilenames();
-
+    // Downstream operations may rely on pane info, which would otherwise be lost after a Reshuffle
     abstract Boolean isFirstPane();
+
+    abstract Long getPaneIndex();
   }
 
   static class ResultCoder extends AtomicCoder<Result> {
     private static final Coder<List<String>> FILENAMES_CODER = ListCoder.of(StringUtf8Coder.of());
     private static final Coder<Boolean> FIRST_PANE_CODER = BooleanCoder.of();
+    private static final Coder<Long> PANE_INDEX_CODER = VarLongCoder.of();
     static final ResultCoder INSTANCE = new ResultCoder();
 
     @Override
     public void encode(Result value, OutputStream outStream) throws IOException {
       FILENAMES_CODER.encode(value.getFilenames(), outStream);
       FIRST_PANE_CODER.encode(value.isFirstPane(), outStream);
+      PANE_INDEX_CODER.encode(value.getPaneIndex(), outStream);
     }
 
     @Override
     public Result decode(InputStream inStream) throws IOException {
       return new AutoValue_WritePartition_Result(
-          FILENAMES_CODER.decode(inStream), FIRST_PANE_CODER.decode(inStream));
+          FILENAMES_CODER.decode(inStream),
+          FIRST_PANE_CODER.decode(inStream),
+          PANE_INDEX_CODER.decode(inStream));
     }
   }
 
@@ -234,7 +241,7 @@ public void processElement(ProcessContext c) throws Exception {
               KV.of(
                   ShardedKey.of(destination, i + 1),
                   new AutoValue_WritePartition_Result(
-                      partitionData.getFilenames(), c.pane().isFirst())));
+                      partitionData.getFilenames(), c.pane().isFirst(), c.pane().getIndex())));
     }
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 9d798b397070..4add4228bdc5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -208,6 +208,20 @@ private PendingJobData startWriteRename(
         BigQueryResourceNaming.createJobIdWithDestination(
             c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());
 
+    if (isFirstPane) {
+      LOG.info(
+          "Setting up write disposition {}, create disposition {} for first-pane BigQuery job {}",
+          writeDisposition,
+          createDisposition,
+          jobIdPrefix);
+    } else {
+      LOG.debug(
+          "Setting up write disposition {}, create disposition {} for BigQuery job {}",
+          writeDisposition,
+          createDisposition,
+          jobIdPrefix);
+    }
+
     BigQueryHelpers.PendingJob retryJob =
         startCopy(
            bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c6a7d32e2486..a44244d05588 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -36,7 +36,6 @@
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BooleanCoder;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -75,10 +74,8 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
-import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,7 +98,7 @@ class WriteTables
   @AutoValue
   abstract static class Result {
     abstract String getTableName();
-
+    // Downstream operations may rely on pane info, which would otherwise be lost after a Reshuffle
     abstract Boolean isFirstPane();
   }
 
@@ -109,17 +106,13 @@ static class ResultCoder extends AtomicCoder<Result> {
     static final ResultCoder INSTANCE = new ResultCoder();
 
     @Override
-    public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
-        throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
-            @Initialized IOException {
+    public void encode(Result value, OutputStream outStream) throws IOException {
       StringUtf8Coder.of().encode(value.getTableName(), outStream);
       BooleanCoder.of().encode(value.isFirstPane(), outStream);
     }
 
     @Override
-    public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
-        throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
-            @Initialized IOException {
+    public Result decode(InputStream inStream) throws IOException {
       return new AutoValue_WriteTables_Result(
           StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream));
     }
@@ -156,27 +149,36 @@ private class WriteTablesDoFn
     private class PendingJobData {
       final BoundedWindow window;
       final BigQueryHelpers.PendingJob retryJob;
-      final List<String> partitionFiles;
+      final WritePartition.Result partitionResult;
      final TableDestination tableDestination;
      final TableReference tableReference;
      final DestinationT destinationT;
-      final boolean isFirstPane;

      public PendingJobData(
          BoundedWindow window,
          BigQueryHelpers.PendingJob retryJob,
-          List<String> partitionFiles,
+          WritePartition.Result partitionResult,
          TableDestination tableDestination,
          TableReference tableReference,
-          DestinationT destinationT,
-          boolean isFirstPane) {
+          DestinationT destinationT) {
        this.window = window;
        this.retryJob = retryJob;
-        this.partitionFiles = partitionFiles;
+        this.partitionResult = partitionResult;
        this.tableDestination = tableDestination;
        this.tableReference = tableReference;
        this.destinationT = destinationT;
-        this.isFirstPane = isFirstPane;
+      }
+
+      public List<String> partitionFiles() {
+        return partitionResult.getFilenames();
+      }
+
+      public boolean isFirstPane() {
+        return partitionResult.isFirstPane();
+      }
+
+      public long paneIndex() {
+        return partitionResult.getPaneIndex();
+      }
      }
    // All pending load jobs.
@@ -251,7 +253,10 @@ public void processElement(
      List<String> partitionFiles = Lists.newArrayList(element.getValue().getFilenames());
      String jobIdPrefix =
          BigQueryResourceNaming.createJobIdWithDestination(
-              c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex());
+              c.sideInput(loadJobIdPrefixView),
+              tableDestination,
+              partition,
+              element.getValue().getPaneIndex());

      if (tempTable) {
        if (tempDataset != null) {
@@ -291,13 +296,7 @@ public void processElement(

      pendingJobs.add(
          new PendingJobData(
-              window,
-              retryJob,
-              partitionFiles,
-              tableDestination,
-              tableReference,
-              destination,
-              element.getValue().isFirstPane()));
+              window, retryJob, element.getValue(), tableDestination, tableReference, destination));
    }

    @Teardown
@@ -361,13 +360,13 @@ public void finishBundle(FinishBundleContext c) throws Exception {
          Result result =
              new AutoValue_WriteTables_Result(
                  BigQueryHelpers.toJsonString(pendingJob.tableReference),
-                  pendingJob.isFirstPane);
+                  pendingJob.isFirstPane());
          c.output(
              mainOutputTag,
              KV.of(pendingJob.destinationT, result),
              pendingJob.window.maxTimestamp(),
              pendingJob.window);
-          for (String file : pendingJob.partitionFiles) {
+          for (String file : pendingJob.partitionFiles()) {
            c.output(
                temporaryFilesTag,
                file,
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
index 81de67f38502..fc3ce0be4b69 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
@@ -33,9 +33,16 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -81,11 +88,32 @@ public void processElement(ProcessContext c) {
     }
   }
 
-  private GenerateSequence stream(int rowCount) {
-    int timestampIntervalInMilliseconds = 10;
-    return GenerateSequence.from(0)
-        .to(rowCount)
-        .withRate(1, Duration.millis(timestampIntervalInMilliseconds));
+  static class UnboundedStream extends PTransform<PBegin, PCollection<Long>> {
+
+    private final int rowCount;
+
+    public UnboundedStream(int rowCount) {
+      this.rowCount = rowCount;
+    }
+
+    @Override
+    public PCollection<Long> expand(PBegin input) {
+      int timestampIntervalInMillis = 10;
+      PeriodicImpulse impulse =
+          PeriodicImpulse.create()
+              .stopAfter(Duration.millis((long) timestampIntervalInMillis * rowCount - 1))
+              .withInterval(Duration.millis(timestampIntervalInMillis));
+      return input
+          .apply(impulse)
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<Instant, Long>() {
+                    @Override
+                    public Long apply(Instant input) {
+                      return input.getMillis();
+                    }
+                  }));
+    }
   }
 
   private void runBigQueryIOStorageWritePipeline(
@@ -102,7 +130,9 @@ private void runBigQueryIOStorageWritePipeline(
             new TableFieldSchema().setName("str").setType("STRING")));
 
     Pipeline p = Pipeline.create(bqOptions);
-    p.apply("Input", isStreaming ? stream(rowCount) : GenerateSequence.from(0).to(rowCount))
+    p.apply(
+            "Input",
+            isStreaming ? new UnboundedStream(rowCount) : GenerateSequence.from(0).to(rowCount))
         .apply("GenerateMessage", ParDo.of(new FillRowFn()))
         .apply(
             "WriteToBQ",
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 720419f2227a..adc3867ca7e3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -2444,6 +2444,7 @@ public TableSchema getSchema(String destination) {
 
   @Test
   public void testWriteTables() throws Exception {
+    assumeTrue(!useStorageApi);
     long numTables = 3;
     long numPartitions = 3;
     long numFilesPerPartition = 10;
@@ -2475,7 +2476,7 @@ public void testWriteTables() throws Exception {
           partitions.add(
               KV.of(
                   ShardedKey.of(tableDestination.getTableSpec(), j),
-                  new AutoValue_WritePartition_Result(filesPerPartition, true)));
+                  new AutoValue_WritePartition_Result(filesPerPartition, true, 0L)));
 
           String json =
               String.format(
@@ -2554,6 +2555,7 @@ public void processElement(
 
   @Test
   public void testRemoveTemporaryFiles() throws Exception {
+    assumeTrue(!useStorageApi);
     int numFiles = 10;
     List<String> fileNames = Lists.newArrayList();
     String tempFilePrefix = options.getTempLocation() + "/";
@@ -2573,6 +2575,7 @@ public void testRemoveTemporaryFiles() throws Exception {
 
   @Test
   public void testWriteRename() throws Exception {
+    assumeTrue(!useStorageApi);
     p.enableAbandonedNodeEnforcement(false);
 
     final int numFinalTables = 3;
@@ -2652,6 +2655,7 @@ public void testWriteRename() throws Exception {
 
   @Test
   public void testRemoveTemporaryTables() throws Exception {
+    assumeTrue(!useStorageApi);
     FakeDatasetService datasetService = new FakeDatasetService();
     String projectId = "project";
     String datasetId = "dataset";
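
Usage sketch for the new stopAfter option introduced above (illustrative only; the class
name and pipeline wiring are assumptions, not part of this diff). stopAfter is annotated
@Internal, so it carries no backwards-compatibility guarantees. Unlike startAt/stopAt,
which fix absolute timestamps at graph-construction time, the emission window here is
anchored when the pipeline starts processing:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class StopAfterExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // Emits one element every five seconds for one minute, starting at run time.
    PCollection<Instant> ticks =
        p.apply(
            "PeriodicTicks",
            PeriodicImpulse.create()
                .stopAfter(Duration.standardMinutes(1))
                .withInterval(Duration.standardSeconds(5)));
    p.run().waitUntilFinish();
  }
}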
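
The pane-index plumbing added to WritePartition and WriteTables follows a general
pattern: PaneInfo observed downstream of a Reshuffle no longer reflects the original
firing (see https://github.com/apache/beam/issues/28219), so a transform that needs the
pane index must capture it into its own payload before the shuffle boundary. A minimal
sketch of that pattern with hypothetical names, independent of BigQueryIO:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Capture the pane index into the element itself before a Reshuffle; the
// PaneInfo attached to elements after the shuffle is not the original one.
class CapturePaneIndexFn extends DoFn<String, KV<String, Long>> {
  @ProcessElement
  public void process(ProcessContext c) {
    c.output(KV.of(c.element(), c.pane().getIndex()));
  }
}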