From 4c6e7038c8907a4fb87e0d50ab9238e5ccae78ba Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 14 Jan 2025 11:51:26 -0500 Subject: [PATCH 1/4] Change Reify.asIterable to GBK in BigQueryIO File loads --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 +- .../io/gcp/bigquery/CombineAsIterable.java | 44 +++++++++++++++++++ .../io/gcp/bigquery/BigQueryIOWriteTest.java | 2 +- 3 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CombineAsIterable.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 56bd14318be4..41fbf8e9dfee 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -485,7 +485,7 @@ public WriteResult expandUntriggered(PCollection> inp // loading. PCollectionTuple partitions = results - .apply("ReifyResults", new ReifyAsIterable<>()) + .apply("ReifyResults", new CombineAsIterable<>()) .setCoder(IterableCoder.of(WriteBundlesToFiles.ResultCoder.of(destinationCoder))) .apply( "WritePartitionUntriggered", diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CombineAsIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CombineAsIterable.java new file mode 100644 index 000000000000..ffa474b1fda8 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CombineAsIterable.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +public class CombineAsIterable extends PTransform, PCollection>> { + @Override + public PCollection> expand(PCollection input) { + return input + .apply( + "assign single key", + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(@Element T element, OutputReceiver> o) { + o.output(KV.of(null, element)); + } + })) + .apply(GroupByKey.create()) + .apply(Values.create()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 57c71c023fcb..bca4f8ee1201 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -332,7 +332,7 @@ public void testWriteEmptyPCollection() throws Exception { .withoutValidation()); p.run(); - checkNotNull( + assertNull( fakeDatasetService.getTable( BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"))); } From bfea4d41dd296e5e4200f36aaa8ddc20ce758699 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 14 Jan 2025 12:46:53 -0500 Subject: [PATCH 2/4] trigger postcommit --- .github/trigger_files/beam_PostCommit_Java_DataflowV1.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index a03c067d2c4e..1efc8e9e4405 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -1,3 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 } From 6b7e9a6ba5f780c6af645fe4e976af2492890e55 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 22 Jan 2025 17:55:46 -0500 Subject: [PATCH 3/4] the change is only effective when explicitly enabled --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 11 +++++++++- .../sdk/io/gcp/bigquery/BigQueryOptions.java | 16 ++++++++++++++ .../io/gcp/bigquery/BigQueryIOWriteTest.java | 21 ++++++++++++++++++- 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 41fbf8e9dfee..8f97ef1f8c50 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -480,12 +480,21 @@ public WriteResult expandUntriggered(PCollection> inp TupleTag, WritePartition.Result>> singlePartitionTag = new TupleTag, WritePartition.Result>>("singlePartitionTag") {}; + PTransform< + PCollection>, + PCollection>>> + reifyTransform; + if (p.getOptions().as(BigQueryOptions.class).getGroupFilesFileLoad()) { + reifyTransform = new CombineAsIterable<>(); + } else { + reifyTransform = new ReifyAsIterable<>(); + } // This transform will look at the set of files written for each table, and if any table has // too many files or bytes, will partition that table's files into multiple partitions for // loading. PCollectionTuple partitions = results - .apply("ReifyResults", new CombineAsIterable<>()) + .apply("ReifyResults", reifyTransform) .setCoder(IterableCoder.of(WriteBundlesToFiles.ResultCoder.of(destinationCoder))) .apply( "WritePartitionUntriggered", diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 5ddede38aa78..e02a150a4de6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -229,4 +229,20 @@ public interface BigQueryOptions String getBigQueryEndpoint(); void setBigQueryEndpoint(String value); + + /** + * Choose to use a GBK when gathering a list of files in batch FILE_LOAD. + * + *

The purpose of this option is to accommodate the runner compatibility, for example, some + * runners having known issues on large side input, turning on this option avoids trigger side + * input related issues. + * + *

This is an experimental pipeline option, no backward compatibility guaranteed. + */ + @Hidden + @Description("Whether involves side input when gathering a list of files in batch FILE_LOAD.") + @Default.Boolean(false) + Boolean getGroupFilesFileLoad(); + + void setGroupFilesFileLoad(Boolean value); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index bca4f8ee1201..dab65a945aac 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -316,11 +316,17 @@ abstract static class StringLongDestinations extends DynamicDestinations Date: Tue, 28 Jan 2025 16:38:53 -0500 Subject: [PATCH 4/4] update CHANGES.md --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 2c00cd02e7d1..03ddb805dfc9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,8 @@ * Support gcs-connector 3.x+ in GcsUtil ([#33368](https://github.com/apache/beam/pull/33368)) * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Introduced `--groupFilesFileLoad` pipeline option to mitigate side-input related issues in BigQueryIO + batch FILE_LOAD on certain runners (including Dataflow Runner V2) (Java) ([#33587](https://github.com/apache/beam/pull/33587)). ## New Features / Improvements