diff --git a/CHANGES.md b/CHANGES.md
index 93353245042a..8e94058dd7b4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -77,6 +77,15 @@
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* [Python] Reshuffle now preserves PaneInfo, which was previously lost
+after a reshuffle. To opt out of this change, set
+update_compatibility_version to a previous Beam version, e.g. "2.64.0".
+([#34348](https://github.com/apache/beam/pull/34348))
+* [Python] BigQueryFileLoads now adds a Reshuffle before triggering load jobs.
+This fixes a bug where data could be lost in a streaming pipeline if a
+load job is pending during autoscaling. To opt out of this change, set
+update_compatibility_version to a previous Beam version, e.g. "2.64.0".
+([#34657](https://github.com/apache/beam/pull/34657))
 
 ## Deprecations
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 3145fb511068..2f0375859235 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -44,8 +44,8 @@
 from apache_beam.options import value_provider as vp
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.transforms import trigger
+from apache_beam.transforms import util
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.transforms.util import GroupIntoBatches
 from apache_beam.transforms.window import GlobalWindows
 
 # Protect against environments where bigquery library is not available.
@@ -1062,7 +1062,7 @@ def _write_files_with_auto_sharding(
         destination_data_kv_pc
         | 'ToHashableTableRef' >> beam.Map(bigquery_tools.to_hashable_table_ref)
-        | 'WithAutoSharding' >> GroupIntoBatches.WithShardedKey(
+        | 'WithAutoSharding' >> util.GroupIntoBatches.WithShardedKey(
             batch_size=_FILE_TRIGGERING_RECORD_COUNT,
             max_buffering_duration_secs=_FILE_TRIGGERING_BATCHING_DURATION_SECS,
             clock=clock)
@@ -1101,6 +1101,18 @@ def _load_data(
     of the load jobs would fail but not other. If any of them fails, then
     copy jobs are not triggered.
     """
+    self.reshuffle_before_load = not util.is_compat_version_prior_to(
+        p.options, "2.65.0")
+    if self.reshuffle_before_load:
+      # Ensure that TriggerLoadJob retry inputs are deterministic by breaking
+      # fusion for inputs.
+      partitions_using_temp_tables = (
+          partitions_using_temp_tables
+          | "ReshuffleBeforeLoadWithTempTables" >> beam.Reshuffle())
+      partitions_direct_to_destination = (
+          partitions_direct_to_destination
+          | "ReshuffleBeforeLoadWithoutTempTables" >> beam.Reshuffle())
+
     # Load data using temp tables
     trigger_loads_outputs = (
         partitions_using_temp_tables
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 10453d9c8baf..6908a0fb0392 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -478,6 +478,44 @@ def test_records_traverse_transform_with_mocks(self):
 
     assert_that(jobs, equal_to([job_reference]), label='CheckJobs')
 
+  @parameterized.expand([
+      param(compat_version=None),
+      param(compat_version="2.64.0"),
+  ])
+  def test_reshuffle_before_load(self, compat_version):
+    destination = 'project1:dataset1.table1'
+
+    job_reference = bigquery_api.JobReference()
+    job_reference.projectId = 'project1'
+    job_reference.jobId = 'job_name1'
+    result_job = bigquery_api.Job()
+    result_job.jobReference = job_reference
+
+    mock_job = mock.Mock()
+    mock_job.status.state = 'DONE'
+    mock_job.status.errorResult = None
+    mock_job.jobReference = job_reference
+
+    bq_client = mock.Mock()
+    bq_client.jobs.Get.return_value = mock_job
+
+    bq_client.jobs.Insert.return_value = result_job
+
+    transform = bqfl.BigQueryBatchFileLoads(
+        destination,
+        custom_gcs_temp_location=self._new_tempdir(),
+        test_client=bq_client,
+        validate=False,
+        temp_file_format=bigquery_tools.FileFormat.JSON)
+
+    options = PipelineOptions(update_compatibility_version=compat_version)
+    # Need to test this with the DirectRunner to avoid serializing mocks
+    with TestPipeline('DirectRunner', options=options) as p:
+      _ = p | beam.Create(_ELEMENTS) | transform
+
+    reshuffle_before_load = compat_version is None
+    assert transform.reshuffle_before_load == reshuffle_before_load
+
   def test_load_job_id_used(self):
     job_reference = bigquery_api.JobReference()
     job_reference.projectId = 'loadJobProject'
@@ -774,11 +812,16 @@ def test_multiple_partition_files_write_dispositions(
     self.assertEqual(mock_call_process.call_count, 1)
 
   @parameterized.expand([
-      param(is_streaming=False, with_auto_sharding=False),
-      param(is_streaming=True, with_auto_sharding=False),
-      param(is_streaming=True, with_auto_sharding=True),
+      param(is_streaming=False, with_auto_sharding=False, compat_version=None),
+      param(is_streaming=True, with_auto_sharding=False, compat_version=None),
+      param(is_streaming=True, with_auto_sharding=True, compat_version=None),
+      param(
+          is_streaming=True, with_auto_sharding=False, compat_version="2.64.0"),
+      param(
+          is_streaming=True, with_auto_sharding=True, compat_version="2.64.0"),
   ])
-  def test_triggering_frequency(self, is_streaming, with_auto_sharding):
+  def test_triggering_frequency(
+      self, is_streaming, with_auto_sharding, compat_version):
     destination = 'project1:dataset1.table1'
 
     job_reference = bigquery_api.JobReference()
@@ -820,7 +863,9 @@ def __call__(self):
         with_auto_sharding=with_auto_sharding)
 
     # Need to test this with the DirectRunner to avoid serializing mocks
-    test_options = PipelineOptions(flags=['--allow_unsafe_triggers'])
+    test_options = PipelineOptions(
+        flags=['--allow_unsafe_triggers'],
+        update_compatibility_version=compat_version)
     test_options.view_as(StandardOptions).streaming = is_streaming
     with TestPipeline(runner='BundleBasedDirectRunner',
                       options=test_options) as p:
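
For context, a minimal usage sketch of the opt-out path described in the CHANGES.md entries above. This is illustrative and not part of the diff: the table name, schema, and input rows are made up. Setting update_compatibility_version pins the pre-2.65.0 behavior, so BigQueryFileLoads skips the new pre-load Reshuffle (and Reshuffle goes back to not preserving PaneInfo):

```python
# Illustrative opt-out sketch (not part of this PR): pinning a pipeline to
# pre-2.65.0 behavior via update_compatibility_version, as documented in
# the CHANGES.md entries above.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# With "2.64.0", BigQueryFileLoads does not insert the new Reshuffle before
# triggering load jobs, and Reshuffle does not preserve PaneInfo.
options = PipelineOptions(update_compatibility_version='2.64.0')

with beam.Pipeline(options=options) as p:
  _ = (
      p
      | beam.Create([{'name': 'a'}, {'name': 'b'}])  # made-up rows
      | beam.io.WriteToBigQuery(
          'project1:dataset1.table1',  # made-up table
          schema='name:STRING',
          method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
```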
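
And a small sketch of the Reshuffle/PaneInfo change itself, again illustrative rather than taken from the diff: beam.DoFn.PaneInfoParam exposes each element's pane, so a DoFn placed after a Reshuffle can observe whether pane information survived. On 2.65.0+ without the compat flag, the pane seen downstream reflects the firing that produced the element; per the changelog, it was previously lost after the reshuffle:

```python
# Illustrative sketch (not from this PR): observing each element's PaneInfo
# downstream of a Reshuffle via beam.DoFn.PaneInfoParam.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class LogPane(beam.DoFn):
  def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
    # On Beam >= 2.65.0 (without the compat flag) this is the pane of the
    # firing that produced the element; previously it was lost after the
    # reshuffle.
    yield element, pane_info.timing, pane_info.index

with beam.Pipeline(options=PipelineOptions()) as p:
  _ = (
      p
      | beam.Create([1, 2, 3])
      | beam.Reshuffle()
      | beam.ParDo(LogPane())
      | beam.Map(print))
```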