From 3422fbae1a172030e04e8b78019df535907ce495 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 17 Apr 2025 10:23:53 -0400 Subject: [PATCH 1/4] Add reshuffle before triggering load jobs. --- sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 3145fb511068..f9c99ae64d37 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -37,6 +37,7 @@ import apache_beam as beam from apache_beam import pvalue +from apache_beam.transforms import util from apache_beam.io import filesystems as fs from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata @@ -1101,6 +1102,16 @@ def _load_data( of the load jobs would fail but not other. If any of them fails, then copy jobs are not triggered. """ + # Ensure that TriggerLoadJob retry inputs are deterministic by breaking + # fusion for inputs. + if not util.is_compat_version_prior_to(p.options, "2.65.0"): + partitions_using_temp_tables = ( + partitions_using_temp_tables + | "ReshuffleBeforeLoadWithTempTables" >> beam.Reshuffle()) + partitions_direct_to_destination = ( + partitions_direct_to_destination + | "ReshuffleBeforeLoadWithoutTempTables" >> beam.Reshuffle()) + # Load data using temp tables trigger_loads_outputs = ( partitions_using_temp_tables From 56b286cefabcaefe551785a048ff4413e79722a8 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 17 Apr 2025 11:24:52 -0400 Subject: [PATCH 2/4] Add tests. --- .../apache_beam/io/gcp/bigquery_file_loads.py | 13 ++++--- .../io/gcp/bigquery_file_loads_test.py | 38 +++++++++++++++++++ 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index f9c99ae64d37..2f0375859235 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -37,7 +37,6 @@ import apache_beam as beam from apache_beam import pvalue -from apache_beam.transforms import util from apache_beam.io import filesystems as fs from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata @@ -45,8 +44,8 @@ from apache_beam.options import value_provider as vp from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.transforms import trigger +from apache_beam.transforms import util from apache_beam.transforms.display import DisplayDataItem -from apache_beam.transforms.util import GroupIntoBatches from apache_beam.transforms.window import GlobalWindows # Protect against environments where bigquery library is not available. @@ -1063,7 +1062,7 @@ def _write_files_with_auto_sharding( destination_data_kv_pc | 'ToHashableTableRef' >> beam.Map(bigquery_tools.to_hashable_table_ref) - | 'WithAutoSharding' >> GroupIntoBatches.WithShardedKey( + | 'WithAutoSharding' >> util.GroupIntoBatches.WithShardedKey( batch_size=_FILE_TRIGGERING_RECORD_COUNT, max_buffering_duration_secs=_FILE_TRIGGERING_BATCHING_DURATION_SECS, clock=clock) @@ -1102,9 +1101,11 @@ def _load_data( of the load jobs would fail but not other. If any of them fails, then copy jobs are not triggered. """ - # Ensure that TriggerLoadJob retry inputs are deterministic by breaking - # fusion for inputs. - if not util.is_compat_version_prior_to(p.options, "2.65.0"): + self.reshuffle_before_load = not util.is_compat_version_prior_to( + p.options, "2.65.0") + if self.reshuffle_before_load: + # Ensure that TriggerLoadJob retry inputs are deterministic by breaking + # fusion for inputs. partitions_using_temp_tables = ( partitions_using_temp_tables | "ReshuffleBeforeLoadWithTempTables" >> beam.Reshuffle()) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 10453d9c8baf..843fa3c25ba1 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -478,6 +478,44 @@ def test_records_traverse_transform_with_mocks(self): assert_that(jobs, equal_to([job_reference]), label='CheckJobs') + @parameterized.expand([ + param(compat_version=None), + param(compat_version="2.64.0"), + ]) + def test_reshuffle_before_load(self, compat_version): + destination = 'project1:dataset1.table1' + + job_reference = bigquery_api.JobReference() + job_reference.projectId = 'project1' + job_reference.jobId = 'job_name1' + result_job = bigquery_api.Job() + result_job.jobReference = job_reference + + mock_job = mock.Mock() + mock_job.status.state = 'DONE' + mock_job.status.errorResult = None + mock_job.jobReference = job_reference + + bq_client = mock.Mock() + bq_client.jobs.Get.return_value = mock_job + + bq_client.jobs.Insert.return_value = result_job + + transform = bqfl.BigQueryBatchFileLoads( + destination, + custom_gcs_temp_location=self._new_tempdir(), + test_client=bq_client, + validate=False, + temp_file_format=bigquery_tools.FileFormat.JSON) + + options = PipelineOptions(update_compatibility_version=compat_version) + # Need to test this with the DirectRunner to avoid serializing mocks + with TestPipeline('DirectRunner', options=options) as p: + _ = p | beam.Create(_ELEMENTS) | transform + + reshuffle_before_load = compat_version is None + assert transform.reshuffle_before_load == reshuffle_before_load + def test_load_job_id_used(self): job_reference = bigquery_api.JobReference() job_reference.projectId = 'loadJobProject' From 0d72cac6de385f635ec258e8a89ec786acfaf203 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 28 Apr 2025 17:42:53 -0400 Subject: [PATCH 3/4] Testing. --- .../io/gcp/bigquery_file_loads_test.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 843fa3c25ba1..6908a0fb0392 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -812,11 +812,16 @@ def test_multiple_partition_files_write_dispositions( self.assertEqual(mock_call_process.call_count, 1) @parameterized.expand([ - param(is_streaming=False, with_auto_sharding=False), - param(is_streaming=True, with_auto_sharding=False), - param(is_streaming=True, with_auto_sharding=True), + param(is_streaming=False, with_auto_sharding=False, compat_version=None), + param(is_streaming=True, with_auto_sharding=False, compat_version=None), + param(is_streaming=True, with_auto_sharding=True, compat_version=None), + param( + is_streaming=True, with_auto_sharding=False, compat_version="2.64.0"), + param( + is_streaming=True, with_auto_sharding=True, compat_version="2.64.0"), ]) - def test_triggering_frequency(self, is_streaming, with_auto_sharding): + def test_triggering_frequency( + self, is_streaming, with_auto_sharding, compat_version): destination = 'project1:dataset1.table1' job_reference = bigquery_api.JobReference() @@ -858,7 +863,9 @@ def __call__(self): with_auto_sharding=with_auto_sharding) # Need to test this with the DirectRunner to avoid serializing mocks - test_options = PipelineOptions(flags=['--allow_unsafe_triggers']) + test_options = PipelineOptions( + flags=['--allow_unsafe_triggers'], + update_compatibility_version=compat_version) test_options.view_as(StandardOptions).streaming = is_streaming with TestPipeline(runner='BundleBasedDirectRunner', options=test_options) as p: From bef4184aba5ed33e22e3c2fe7c633f8eb7a1ea82 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 28 Apr 2025 19:09:50 -0400 Subject: [PATCH 4/4] Add to changes.md. --- CHANGES.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 93353245042a..8e94058dd7b4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -77,6 +77,15 @@ ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* [Python] Reshuffle now preserves PaneInfo, where previously PaneInfo was lost +after reshuffle. To opt out of this change, set the +update_compatibility_version to a previous Beam version e.g. "2.64.0". +([#34348](https://github.com/apache/beam/pull/34348)) +* [Python] BigQueryFileLoads now adds a Reshuffle before triggering load jobs. +This fixes a bug where there can be data loss in a streaming pipeline if there +is a pending load job during autoscaling. To opt out of this change, set the +update_compatibility_version to a previous Beam version e.g. "2.64.0". +([#34657](https://github.com/apache/beam/pull/34657)) ## Deprecations