diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index f8041316bc7e..1828192a2b46 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -47,9 +47,12 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -265,31 +268,44 @@ public void validate(PipelineOptions options) {

   // Expand the pipeline when the user has requested periodically-triggered file writes.
   private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
-    checkArgument(numFileShards > 0);
     Pipeline p = input.getPipeline();
     final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
     final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
-    // The user-supplied triggeringDuration is often chosen to control how many BigQuery load
-    // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
-    // is set to a large value, currently we have to buffer all the data until the trigger fires.
-    // Instead we ensure that the files are written if a threshold number of records are ready.
-    // We use only the user-supplied trigger on the actual BigQuery load. This allows us to
-    // offload the data to the filesystem.
-    PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
-        input.apply(
-            "rewindowIntoGlobal",
-            Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
-                .triggering(
-                    Repeatedly.forever(
-                        AfterFirst.of(
-                            AfterProcessingTime.pastFirstElementInPane()
-                                .plusDelayOf(triggeringFrequency),
-                            AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
-                .discardingFiredPanes());
-    PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
-        writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+    PCollection<WriteBundlesToFiles.Result<DestinationT>> results;
+    if (numFileShards > 0) {
+      // The user-supplied triggeringFrequency is often chosen to control how many BigQuery load
+      // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
+      // is set to a large value, currently we have to buffer all the data until the trigger fires.
+      // Instead we ensure that the files are written if a threshold number of records are ready.
+      // We use only the user-supplied trigger on the actual BigQuery load. This allows us to
+      // offload the data to the filesystem.
+      PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
+          input.apply(
+              "rewindowIntoGlobal",
+              Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterFirst.of(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(triggeringFrequency),
+                              AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
+                  .discardingFiredPanes());
+      results = writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+    } else {
+      // In the case of dynamic sharding, however, we use the default trigger and instead apply
+      // the user-supplied triggeringFrequency to the sharding transform. See
+      // writeDynamicallyShardedFilesTriggered.
+      PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
+          input.apply(
+              "rewindowIntoGlobal",
+              Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+      results = writeDynamicallyShardedFilesTriggered(inputInGlobalWindow, tempFilePrefixView);
+    }
+    // Apply the user's trigger before we start generating BigQuery load jobs.
     results =
         results.apply(
@@ -307,7 +323,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
           new TupleTag<>("singlePartitionTag");

     // If we have non-default triggered output, we can't use the side-input technique used in
-    // expandUntriggered . Instead make the result list a main input. Apply a GroupByKey first for
+    // expandUntriggered. Instead make the result list a main input. Apply a GroupByKey first for
     // determinism.
     PCollectionTuple partitions =
         results
@@ -371,8 +387,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
                 .discardingFiredPanes());
     PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
         (numFileShards == 0)
-            ? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView)
-            : writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+            ? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, tempFilePrefixView)
+            : writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);

     TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
         new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
@@ -470,9 +486,10 @@ public void getTempFilePrefix(ProcessContext c) {
         .apply("TempFilePrefixView", View.asSingleton());
   }

-  // Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename,
-  // file byte size, and table destination.
-  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles(
+  // Writes input data to dynamically-sharded per-bundle files without triggering. Input records
+  // are spilled to new files if memory is constrained. Returns a PCollection of filename, file
+  // byte size, and table destination.
+  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesUntriggered(
       PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
     TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
         new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {};
@@ -513,9 +530,9 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
         .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
   }

-  // Writes input data to statically-sharded files. Returns a PCollection of filename,
-  // file byte size, and table destination.
-  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
+  // Writes input data to statically-sharded files. Returns a PCollection of filename, file byte
+  // size, and table destination.
+  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeStaticallyShardedFiles(
       PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
     checkState(numFileShards > 0);
     PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords =
@@ -547,11 +564,66 @@ public void processElement(
     return writeShardedRecords(shardedRecords, tempFilePrefix);
   }

+  // Writes input data to dynamically-sharded files with triggering. The input data is sharded by
+  // table destinations and each destination may be sub-sharded dynamically. Returns a PCollection
+  // of filename, file byte size, and table destination.
+  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesTriggered(
+      PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
+    // In contrast to fixed sharding with triggering, here we use a global window with the default
+    // trigger and apply the user-supplied triggeringFrequency in the subsequent GroupIntoBatches
+    // transform. We also ensure that the files are written if a threshold number of records are
+    // ready. Dynamic sharding is achieved via the withShardedKey() option provided by
+    // GroupIntoBatches.
+    return input
+        .apply(
+            GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                .withMaxBufferingDuration(triggeringFrequency)
+                .withShardedKey())
+        .setCoder(
+            KvCoder.of(
+                org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+                IterableCoder.of(elementCoder)))
+        .apply(
+            "StripShardId",
+            MapElements.via(
+                new SimpleFunction<
+                    KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
+                    KV<DestinationT, Iterable<ElementT>>>() {
+                  @Override
+                  public KV<DestinationT, Iterable<ElementT>> apply(
+                      KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
+                          input) {
+                    return KV.of(input.getKey().getKey(), input.getValue());
+                  }
+                }))
+        .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+        .apply(
+            "WriteGroupedRecords",
+            ParDo.of(
+                    new WriteGroupedRecordsToFiles<>(
+                        tempFilePrefix, maxFileSize, rowWriterFactory))
+                .withSideInputs(tempFilePrefix))
+        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+  }
+
   private PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedRecords(
       PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
       PCollectionView<String> tempFilePrefix) {
     return shardedRecords
         .apply("GroupByDestination", GroupByKey.create())
+        .apply(
+            "StripShardId",
+            MapElements.via(
+                new SimpleFunction<
+                    KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
+                    KV<DestinationT, Iterable<ElementT>>>() {
+                  @Override
+                  public KV<DestinationT, Iterable<ElementT>> apply(
+                      KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
+                    return KV.of(input.getKey().getKey(), input.getValue());
+                  }
+                }))
+        .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
         .apply(
             "WriteGroupedRecords",
             ParDo.of(
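For illustration (not part of the patch), a minimal standalone sketch of the GroupIntoBatches mechanism the new dynamic-sharding path relies on. The class name, keys, batch size, and duration below are hypothetical stand-ins for the destination keys, FILE_TRIGGERING_RECORD_COUNT, and the user-supplied triggeringFrequency: a batch is emitted when either the size threshold or the buffering deadline is reached, withShardedKey() lets the runner sub-shard a hot destination, and the shard id is stripped afterwards, mirroring the "StripShardId" step added above.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class GroupIntoBatchesShardedKeySketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Records keyed by a (hypothetical) table destination.
    PCollection<KV<String, String>> keyedRecords =
        p.apply(
            Create.of(
                KV.of("dataset.tableA", "row1"),
                KV.of("dataset.tableA", "row2"),
                KV.of("dataset.tableB", "row3")));

    PCollection<KV<String, Iterable<String>>> batchesPerDestination =
        keyedRecords
            .apply(
                // Stand-ins: 500 for FILE_TRIGGERING_RECORD_COUNT, 5 minutes for triggeringFrequency.
                GroupIntoBatches.<String, String>ofSize(500)
                    .withMaxBufferingDuration(Duration.standardMinutes(5))
                    .withShardedKey())
            // Drop the runner-assigned shard id, as the "StripShardId" step does in BatchLoads.
            .apply(
                MapElements.via(
                    new SimpleFunction<
                        KV<ShardedKey<String>, Iterable<String>>, KV<String, Iterable<String>>>() {
                      @Override
                      public KV<String, Iterable<String>> apply(
                          KV<ShardedKey<String>, Iterable<String>> batch) {
                        return KV.of(batch.getKey().getKey(), batch.getValue());
                      }
                    }));

    p.run().waitUntilFinish();
  }
}

Within BatchLoads, the per-destination iterables produced this way are what WriteGroupedRecordsToFiles consumes.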
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 1869c91f62af..1e789c1d5f8b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2267,7 +2267,8 @@ public Write<T> withTriggeringFrequency(Duration triggeringFrequency) {
     /**
      * Control how many file shards are written when using BigQuery load jobs. Applicable only when
-     * also setting {@link #withTriggeringFrequency}.
+     * also setting {@link #withTriggeringFrequency}. To let the runner determine the sharding at
+     * runtime, use {@link #withAutoSharding()} instead.
      */
     public Write<T> withNumFileShards(int numFileShards) {
       checkArgument(numFileShards > 0, "numFileShards must be > 0, but was: %s", numFileShards);
@@ -2350,10 +2351,10 @@ public Write<T> useBeamSchema() {
     }

     /**
-     * If true, enables dynamically determined number of shards to write to BigQuery. Only
-     * applicable to unbounded data with STREAMING_INSERTS.
-     *
-     * <p>TODO(BEAM-11408): Also integrate this option to FILE_LOADS.
+     * If true, enables using a dynamically determined number of shards to write to BigQuery. This
+     * can be used for both {@link Method#FILE_LOADS} and {@link Method#STREAMING_INSERTS}. Only
+     * applicable to unbounded data. If using {@link Method#FILE_LOADS}, the number of file shards
+     * set via {@link #withNumFileShards} will be ignored.
      */
     @Experimental
     public Write<T> withAutoSharding() {
@@ -2751,7 +2752,11 @@ private WriteResult continueExpandTyped(
           batchLoads.setMaxRetryJobs(1000);
         }
         batchLoads.setTriggeringFrequency(getTriggeringFrequency());
-        batchLoads.setNumFileShards(getNumFileShards());
+        if (getAutoSharding()) {
+          batchLoads.setNumFileShards(0);
+        } else {
+          batchLoads.setNumFileShards(getNumFileShards());
+        }
         return input.apply(batchLoads);
       }
     }
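As a usage illustration of the API change above (not part of the patch; the class name, table spec, and schema are made up): with FILE_LOADS on an unbounded input, withTriggeringFrequency() is still required, but withAutoSharding() can now replace an explicit withNumFileShards().

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class AutoShardingFileLoadsExample {
  // Writes an unbounded PCollection of TableRows via load jobs with runner-determined sharding.
  static void write(PCollection<TableRow> rows) {
    TableSchema schema =
        new TableSchema()
            .setFields(
                Collections.singletonList(
                    new TableFieldSchema().setName("number").setType("INTEGER")));
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // hypothetical table spec
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardMinutes(5))
            .withAutoSharding() // replaces an explicit .withNumFileShards(n)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  }
}

Under the hood this takes the setNumFileShards(0) branch added to continueExpandTyped above, which is what selects the GroupIntoBatches-based file writing in BatchLoads.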
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
index 6db179bfeb8f..8c6366d0183b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
@@ -20,17 +20,14 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;

 /**
- * Receives elements grouped by their (sharded) destination, and writes them out to a file. Since
- * all the elements in the {@link Iterable} are destined to the same table, they are all written to
- * the same file. Ensures that only one {@link TableRowWriter} is active per bundle.
+ * Receives elements grouped by their destination, and writes them out to a file. Since all the
+ * elements in the {@link Iterable} are destined to the same table, they are all written to the same
+ * file. Ensures that only one {@link TableRowWriter} is active per bundle.
  */
 class WriteGroupedRecordsToFiles<DestinationT, ElementT>
-    extends DoFn<
-        KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
-        WriteBundlesToFiles.Result<DestinationT>> {
+    extends DoFn<KV<DestinationT, Iterable<ElementT>>, WriteBundlesToFiles.Result<DestinationT>> {

   private final PCollectionView<String> tempFilePrefix;
   private final long maxFileSize;
@@ -48,24 +45,24 @@ class WriteGroupedRecordsToFiles<DestinationT, ElementT>
   @ProcessElement
   public void processElement(
       ProcessContext c,
-      @Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
+      @Element KV<DestinationT, Iterable<ElementT>> element,
       OutputReceiver<WriteBundlesToFiles.Result<DestinationT>> o)
       throws Exception {
     String tempFilePrefix = c.sideInput(this.tempFilePrefix);
     BigQueryRowWriter<ElementT> writer =
-        rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey().getKey());
+        rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey());
     try {
       for (ElementT tableRow : element.getValue()) {
         if (writer.getByteSize() > maxFileSize) {
           writer.close();
-          writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey().getKey());
+          writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey());
           BigQueryRowWriter.Result result = writer.getResult();
           o.output(
               new WriteBundlesToFiles.Result<>(
-                  result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
+                  result.resourceId.toString(), result.byteSize, c.element().getKey()));
         }
         writer.write(tableRow);
       }
@@ -76,6 +73,6 @@ public void processElement(
     BigQueryRowWriter.Result result = writer.getResult();
     o.output(
         new WriteBundlesToFiles.Result<>(
-            result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
+            result.resourceId.toString(), result.byteSize, c.element().getKey()));
   }
 }
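The DoFn above rolls over to a new file whenever the current one exceeds maxFileSize. A simplified, self-contained sketch of that rollover pattern follows; all names here are hypothetical, and plain java.nio files stand in for Beam's internal BigQueryRowWriter.

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class SizeBoundedFileRoller {
  private final long maxFileSize;
  private final Path tempDir;

  SizeBoundedFileRoller(Path tempDir, long maxFileSize) {
    this.tempDir = tempDir;
    this.maxFileSize = maxFileSize;
  }

  /** Writes all rows for one destination, rolling to a new file whenever the budget is hit. */
  List<Path> writeRows(Iterable<String> rows) throws IOException {
    List<Path> finishedFiles = new ArrayList<>();
    Path current = Files.createTempFile(tempDir, "rows", ".json");
    BufferedWriter writer = Files.newBufferedWriter(current, StandardCharsets.UTF_8);
    long bytesWritten = 0;
    for (String row : rows) {
      // Like the byte-size check in processElement(): roll over before writing the next row.
      if (bytesWritten > maxFileSize) {
        writer.close();
        finishedFiles.add(current); // report the finished file, analogous to o.output(...) above
        current = Files.createTempFile(tempDir, "rows", ".json");
        writer = Files.newBufferedWriter(current, StandardCharsets.UTF_8);
        bytesWritten = 0;
      }
      writer.write(row);
      writer.newLine();
      bytesWritten += row.getBytes(StandardCharsets.UTF_8).length + 1;
    }
    writer.close();
    finishedFiles.add(current); // the last, possibly partial, file
    return finishedFiles;
  }
}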
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 7534a86ed1ee..bfd6418f2165 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -62,8 +63,12 @@ public class FakeDatasetService implements DatasetService, Serializable {
   Map> insertErrors = Maps.newHashMap();

+  // The number of insertAll calls performed, incremented once per call.
+  static AtomicInteger insertCount;
+
   public static void setUp() {
     tables = HashBasedTable.create();
+    insertCount = new AtomicInteger(0);
     FakeJobService.setUp();
   }

@@ -217,6 +222,10 @@ public void deleteDataset(String projectId, String datasetId)
     }
   }

+  public int getInsertCount() {
+    return insertCount.get();
+  }
+
   public long insertAll(
       TableReference ref, List<ValueInSingleWindow<TableRow>> rowList, @Nullable List<String> insertIdList)
       throws IOException, InterruptedException {
@@ -292,6 +301,7 @@ public long insertAll(
             failedInserts, allErrors.get(allErrors.size() - 1), ref, rowList.get(i));
       }
     }
+    insertCount.addAndGet(1);
     return dataSize;
   }
 }
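A minimal sketch of how the new counter is intended to be read from a test; it assumes only the methods added above (setUp() resets the shared counter, getInsertCount() reads it), and the pipeline under test is elided.

import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.junit.Test;

public class FakeDatasetServiceInsertCountSketch {
  @Test
  public void insertCountStartsAtZeroAfterSetUp() throws Exception {
    // setUp() resets the fake's shared static state, including the new insert counter.
    FakeDatasetService.setUp();
    FakeDatasetService fakeDatasetService = new FakeDatasetService();
    // ... run a pipeline whose BigQuery writes are routed through fakeDatasetService ...
    // Each insertAll() call bumps the counter exactly once, so after a FILE_LOADS pipeline the
    // value is a proxy for how many load jobs were issued (see the test added below).
    assertEquals(0, fakeDatasetService.getInsertCount());
  }
}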
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index d327eced3a1b..8d2f07a96680 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -57,6 +57,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -604,6 +605,74 @@ public void testTriggeredFileLoadsWithTempTablesDefaultProject() throws Exceptio
     testTriggeredFileLoadsWithTempTables("dataset-id.table-id");
   }

+  @Test
+  public void testTriggeredFileLoadsWithAutoSharding() throws Exception {
+    List<TableRow> elements = Lists.newArrayList();
+    for (int i = 0; i < 30; ++i) {
+      elements.add(new TableRow().set("number", i));
+    }
+
+    Instant startInstant = new Instant(0L);
+    TestStream<TableRow> testStream =
+        TestStream.create(TableRowJsonCoder.of())
+            // Initialize watermark for timer to be triggered correctly.
+            .advanceWatermarkTo(startInstant)
+            .addElements(
+                elements.get(0), Iterables.toArray(elements.subList(1, 10), TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .advanceWatermarkTo(startInstant.plus(Duration.standardSeconds(10)))
+            .addElements(
+                elements.get(10), Iterables.toArray(elements.subList(11, 20), TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .advanceWatermarkTo(startInstant.plus(Duration.standardSeconds(30)))
+            .addElements(
+                elements.get(20), Iterables.toArray(elements.subList(21, 30), TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(2))
+            .advanceWatermarkToInfinity();
+
+    int numTables = 3;
+    p.apply(testStream)
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to(
+                    (ValueInSingleWindow<TableRow> vsw) -> {
+                      String tableSpec =
+                          "project-id:dataset-id.table-"
+                              + ((int) vsw.getValue().get("number") % numTables);
+                      return new TableDestination(tableSpec, null);
+                    })
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("number").setType("INTEGER"))))
+                .withTestServices(fakeBqServices)
+                // Set a triggering frequency without needing to also specify numFileShards when
+                // using autoSharding.
+                .withTriggeringFrequency(Duration.standardSeconds(100))
+                .withAutoSharding()
+                .withMaxBytesPerPartition(1000)
+                .withMaxFilesPerPartition(10)
+                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+                .withoutValidation());
+    p.run();
+
+    Map<Integer, List<TableRow>> elementsByTableIdx = new HashMap<>();
+    for (int i = 0; i < elements.size(); i++) {
+      elementsByTableIdx
+          .computeIfAbsent(i % numTables, k -> new ArrayList<>())
+          .add(elements.get(i));
+    }
+    for (Map.Entry<Integer, List<TableRow>> entry : elementsByTableIdx.entrySet()) {
+      assertThat(
+          fakeDatasetService.getAllRows("project-id", "dataset-id", "table-" + entry.getKey()),
+          containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class)));
+    }
+    // For each table destination, two load jobs are expected, given the triggering frequency and
+    // the processing-time advances in the TestStream.
+    assertEquals(2 * numTables, fakeDatasetService.getInsertCount());
+  }
+
   @Test
   public void testFailuresNoRetryPolicy() throws Exception {
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");