From e2785b78097bf61b45c4fc21439ec53a30272b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=ED=95=98=EC=A4=80?= Date: Sun, 2 Nov 2025 20:42:16 +0900 Subject: [PATCH 1/4] Add flatten_structure parameter to GCSToS3Operator --- .../amazon/aws/transfers/gcs_to_s3.py | 46 ++++++- .../amazon/aws/transfers/test_gcs_to_s3.py | 127 ++++++++++++++++++ 2 files changed, 169 insertions(+), 4 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index 92289a89638b4..675bc40cf9276 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -79,6 +79,9 @@ class GCSToS3Operator(BaseOperator): object to be uploaded in S3 :param keep_directory_structure: (Optional) When set to False the path of the file on the bucket is recreated within path passed in dest_s3_key. + :param flatten_structure: (Optional) When set to True, places all files directly + in the dest_s3_key directory without preserving subdirectory structure. + Overrides keep_directory_structure when enabled. :param match_glob: (Optional) filters objects based on the glob pattern given by the string (e.g, ``'**/*/.json'``) :param gcp_user_project: (Optional) The identifier of the Google Cloud project to bill for this request. @@ -108,6 +111,7 @@ def __init__( dest_s3_extra_args: dict | None = None, s3_acl_policy: str | None = None, keep_directory_structure: bool = True, + flatten_structure: bool = False, match_glob: str | None = None, gcp_user_project: str | None = None, **kwargs, @@ -124,6 +128,10 @@ def __init__( self.dest_s3_extra_args = dest_s3_extra_args or {} self.s3_acl_policy = s3_acl_policy self.keep_directory_structure = keep_directory_structure + self.flatten_structure = flatten_structure + + if self.flatten_structure and self.keep_directory_structure: + self.log.warning("flatten_structure=True overrides keep_directory_structure=True") try: from airflow.providers.google import __version__ as _GOOGLE_PROVIDER_VERSION @@ -140,6 +148,17 @@ def __init__( self.match_glob = match_glob self.gcp_user_project = gcp_user_project + def _transform_file_path(self, file_path: str) -> str: + """ + Transform the GCS file path according to the specified options. 
+ + :param file_path: The original GCS file path + :return: The transformed file path for S3 destination + """ + if self.flatten_structure: + return os.path.basename(file_path) + return file_path + def execute(self, context: Context) -> list[str]: # list all files in an Google Cloud Storage bucket gcs_hook = GCSHook( @@ -167,7 +186,7 @@ def execute(self, context: Context) -> list[str]: aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify, extra_args=self.dest_s3_extra_args ) - if not self.keep_directory_structure and self.prefix: + if not self.keep_directory_structure and self.prefix and not self.flatten_structure: self.dest_s3_key = os.path.join(self.dest_s3_key, self.prefix) if not self.replace: @@ -187,15 +206,34 @@ def execute(self, context: Context) -> list[str]: existing_files = existing_files or [] # remove the prefix for the existing files to allow the match existing_files = [file.replace(prefix, "", 1) for file in existing_files] - gcs_files = list(set(gcs_files) - set(existing_files)) + + # Transform GCS files for comparison and filter out existing ones + existing_files_set = set(existing_files) + filtered_files = [] + seen_transformed = set() + + for file in gcs_files: + transformed = self._transform_file_path(file) + if transformed not in existing_files_set and transformed not in seen_transformed: + filtered_files.append(file) + seen_transformed.add(transformed) + elif transformed in seen_transformed: + self.log.warning( + "Skipping duplicate file %s (transforms to %s)", + file, + transformed, + ) + + gcs_files = filtered_files if gcs_files: for file in gcs_files: with gcs_hook.provide_file( object_name=file, bucket_name=str(self.gcs_bucket), user_project=self.gcp_user_project ) as local_tmp_file: - dest_key = os.path.join(self.dest_s3_key, file) - self.log.info("Saving file to %s", dest_key) + transformed_path = self._transform_file_path(file) + dest_key = os.path.join(self.dest_s3_key, transformed_path) + self.log.info("Saving file from %s to %s", file, dest_key) s3_hook.load_file( filename=local_tmp_file.name, key=dest_key, diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py index c64df03bc4fae..f07d16eec646f 100644 --- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py @@ -330,6 +330,133 @@ def test_execute_without_keep_director_structure(self, mock_hook): assert sorted(MOCK_FILES) == sorted(uploaded_files) assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True + @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") + def test_execute_with_flatten_structure(self, mock_hook): + """Test that flatten_structure parameter flattens directory structure.""" + mock_files_with_paths = ["dir1/subdir1/file1.csv", "dir2/subdir2/file2.csv", "dir3/file3.csv"] + mock_hook.return_value.list.return_value = mock_files_with_paths + + with NamedTemporaryFile() as f: + gcs_provide_file = mock_hook.return_value.provide_file + gcs_provide_file.return_value.__enter__.return_value.name = f.name + + operator = GCSToS3Operator( + task_id=TASK_ID, + gcs_bucket=GCS_BUCKET, + prefix=PREFIX, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=False, + flatten_structure=True, + ) + hook, _ = _create_test_bucket() + + uploaded_files = operator.execute(None) + + # Verify all files were uploaded + assert sorted(mock_files_with_paths) == 
sorted(uploaded_files) + + # Verify files are stored with flattened structure (only filenames) + expected_s3_keys = ["file1.csv", "file2.csv", "file3.csv"] + actual_keys = hook.list_keys("bucket", delimiter="/") + assert sorted(expected_s3_keys) == sorted(actual_keys) + + @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") + def test_execute_with_flatten_structure_duplicate_filenames(self, mock_hook): + """Test that flatten_structure handles duplicate filenames correctly.""" + mock_files_with_duplicates = [ + "dir1/file.csv", + "dir2/file.csv", # Same filename as above + "dir3/other.csv", + ] + mock_hook.return_value.list.return_value = mock_files_with_duplicates + + with NamedTemporaryFile() as f: + gcs_provide_file = mock_hook.return_value.provide_file + gcs_provide_file.return_value.__enter__.return_value.name = f.name + + operator = GCSToS3Operator( + task_id=TASK_ID, + gcs_bucket=GCS_BUCKET, + prefix=PREFIX, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=False, + flatten_structure=True, + ) + _, _ = _create_test_bucket() + + # Mock the logging to verify warning is logged + with mock.patch.object(operator, "log") as mock_log: + uploaded_files = operator.execute(None) + + # Only one of the duplicate files should be uploaded + assert len(uploaded_files) == 2 + assert "dir3/other.csv" in uploaded_files + first_or_second = "dir1/file.csv" in uploaded_files or "dir2/file.csv" in uploaded_files + assert first_or_second + + # Verify warning was logged for duplicate + mock_log.warning.assert_called() + + def test_execute_with_flatten_structure_and_keep_directory_structure_warning(self): + """Test warning when both flatten_structure and keep_directory_structure are True.""" + mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log" + with mock.patch(mock_path) as mock_log: + GCSToS3Operator( + task_id=TASK_ID, + gcs_bucket=GCS_BUCKET, + prefix=PREFIX, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + flatten_structure=True, + keep_directory_structure=True, # This should trigger warning + ) + + # Verify warning was logged during initialization + expected_warning = "flatten_structure=True overrides keep_directory_structure=True" + mock_log.warning.assert_called_once_with(expected_warning) + + def test_transform_file_path_with_flatten_structure(self): + """Test _transform_file_path method with flatten_structure=True.""" + operator = GCSToS3Operator( + task_id=TASK_ID, + gcs_bucket=GCS_BUCKET, + dest_s3_key=S3_BUCKET, + flatten_structure=True, + ) + + # Test various file paths + result1 = operator._transform_file_path("dir1/subdir1/file.csv") + assert result1 == "file.csv" + + result2 = operator._transform_file_path("path/to/deep/nested/file.txt") + assert result2 == "file.txt" + + result3 = operator._transform_file_path("simple.txt") + assert result3 == "simple.txt" + + result4 = operator._transform_file_path("") + assert result4 == "" + + def test_transform_file_path_without_flatten_structure(self): + """Test _transform_file_path method with flatten_structure=False (default).""" + operator = GCSToS3Operator( + task_id=TASK_ID, + gcs_bucket=GCS_BUCKET, + dest_s3_key=S3_BUCKET, + flatten_structure=False, + ) + + # Test that paths are preserved + test_path = "dir1/subdir1/file.csv" + result1 = operator._transform_file_path(test_path) + assert result1 == test_path + + test_path2 = "path/to/deep/nested/file.txt" + result2 = operator._transform_file_path(test_path2) + assert result2 == test_path2 + @pytest.mark.parametrize( 
("gcs_prefix", "dest_s3_key", "expected_input", "expected_output"), [ From 08e811cad7c3f47df21fe33931398f0f03556456 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=ED=95=98=EC=A4=80?= Date: Mon, 3 Nov 2025 23:54:47 +0900 Subject: [PATCH 2/4] Enhance GCSToS3Operator docs clarity and refactor tests to use pytest.mark.parametrize for better maintainability --- .../amazon/aws/transfers/gcs_to_s3.py | 5 +- .../amazon/aws/transfers/test_gcs_to_s3.py | 53 +++++++------------ 2 files changed, 24 insertions(+), 34 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index 675bc40cf9276..5768513f9d6d2 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -39,6 +39,9 @@ class GCSToS3Operator(BaseOperator): """ Synchronizes a Google Cloud Storage bucket with an S3 bucket. + .. note:: + When flatten_structure=True, it takes precedence over keep_directory_structure. + .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:GCSToS3Operator` @@ -81,7 +84,7 @@ class GCSToS3Operator(BaseOperator): on the bucket is recreated within path passed in dest_s3_key. :param flatten_structure: (Optional) When set to True, places all files directly in the dest_s3_key directory without preserving subdirectory structure. - Overrides keep_directory_structure when enabled. + Takes precedence over keep_directory_structure when enabled. :param match_glob: (Optional) filters objects based on the glob pattern given by the string (e.g, ``'**/*/.json'``) :param gcp_user_project: (Optional) The identifier of the Google Cloud project to bill for this request. 
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py index f07d16eec646f..0a94a954f1216 100644 --- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py @@ -417,45 +417,32 @@ def test_execute_with_flatten_structure_and_keep_directory_structure_warning(sel expected_warning = "flatten_structure=True overrides keep_directory_structure=True" mock_log.warning.assert_called_once_with(expected_warning) - def test_transform_file_path_with_flatten_structure(self): - """Test _transform_file_path method with flatten_structure=True.""" - operator = GCSToS3Operator( - task_id=TASK_ID, - gcs_bucket=GCS_BUCKET, - dest_s3_key=S3_BUCKET, - flatten_structure=True, - ) - - # Test various file paths - result1 = operator._transform_file_path("dir1/subdir1/file.csv") - assert result1 == "file.csv" - - result2 = operator._transform_file_path("path/to/deep/nested/file.txt") - assert result2 == "file.txt" - - result3 = operator._transform_file_path("simple.txt") - assert result3 == "simple.txt" - - result4 = operator._transform_file_path("") - assert result4 == "" - - def test_transform_file_path_without_flatten_structure(self): - """Test _transform_file_path method with flatten_structure=False (default).""" + @pytest.mark.parametrize( + ("flatten_structure", "input_path", "expected_output"), + [ + # Tests with flatten_structure=True + (True, "dir1/subdir1/file.csv", "file.csv"), + (True, "path/to/deep/nested/file.txt", "file.txt"), + (True, "simple.txt", "simple.txt"), + (True, "", ""), + # Tests with flatten_structure=False (preserves original paths) + (False, "dir1/subdir1/file.csv", "dir1/subdir1/file.csv"), + (False, "path/to/deep/nested/file.txt", "path/to/deep/nested/file.txt"), + (False, "simple.txt", "simple.txt"), + (False, "", ""), + ], + ) + def test_transform_file_path(self, flatten_structure, input_path, expected_output): + """Test _transform_file_path method with various flatten_structure settings.""" operator = GCSToS3Operator( task_id=TASK_ID, gcs_bucket=GCS_BUCKET, dest_s3_key=S3_BUCKET, - flatten_structure=False, + flatten_structure=flatten_structure, ) - # Test that paths are preserved - test_path = "dir1/subdir1/file.csv" - result1 = operator._transform_file_path(test_path) - assert result1 == test_path - - test_path2 = "path/to/deep/nested/file.txt" - result2 = operator._transform_file_path(test_path2) - assert result2 == test_path2 + result = operator._transform_file_path(input_path) + assert result == expected_output @pytest.mark.parametrize( ("gcs_prefix", "dest_s3_key", "expected_input", "expected_output"), From 701aba695edf11ef6bbf5991d61431c7a559c133 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=ED=95=98=EC=A4=80?= Date: Wed, 5 Nov 2025 02:20:05 +0900 Subject: [PATCH 3/4] Improve GCSToS3Operator documentation consistency and add concrete examples - Use consistent "takes precedence over" terminology in warning messages - Add specific file path transformation example in class docstring --- .../src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py | 4 +++- .../amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index 5768513f9d6d2..4a0f078f95206 100644 --- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -41,6 +41,8 @@ class GCSToS3Operator(BaseOperator): .. note:: When flatten_structure=True, it takes precedence over keep_directory_structure. + For example, with flatten_structure=True, "folder/subfolder/file.txt" becomes "file.txt" + regardless of the keep_directory_structure setting. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -134,7 +136,7 @@ def __init__( self.flatten_structure = flatten_structure if self.flatten_structure and self.keep_directory_structure: - self.log.warning("flatten_structure=True overrides keep_directory_structure=True") + self.log.warning("flatten_structure=True takes precedence over keep_directory_structure=True") try: from airflow.providers.google import __version__ as _GOOGLE_PROVIDER_VERSION diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py index 0a94a954f1216..4a8ab5ca29390 100644 --- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py @@ -414,7 +414,7 @@ def test_execute_with_flatten_structure_and_keep_directory_structure_warning(sel ) # Verify warning was logged during initialization - expected_warning = "flatten_structure=True overrides keep_directory_structure=True" + expected_warning = "flatten_structure=True takes precedence over keep_directory_structure=True" mock_log.warning.assert_called_once_with(expected_warning) @pytest.mark.parametrize( From 44b56fa7ae27aad0c63b5692b108b5e81364b9aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=ED=95=98=EC=A4=80?= Date: Tue, 11 Nov 2025 23:37:14 +0900 Subject: [PATCH 4/4] Fix GCSToS3Operator test log mocking for Airflow SDK compatibility - Replace mock.patch.object() with mock.patch() string path to handle - read-only log property in new Airflow SDK. --- .../amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py index 4a8ab5ca29390..014f26f494348 100644 --- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py @@ -387,7 +387,8 @@ def test_execute_with_flatten_structure_duplicate_filenames(self, mock_hook): _, _ = _create_test_bucket() # Mock the logging to verify warning is logged - with mock.patch.object(operator, "log") as mock_log: + mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log" + with mock.patch(mock_path) as mock_log: uploaded_files = operator.execute(None) # Only one of the duplicate files should be uploaded
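
For reviewers who want to try the new option end to end, the following is a minimal DAG sketch, assuming placeholder bucket names, connection IDs, and DAG id that are not part of this patch series. Only the flatten_structure=True argument and the resulting key layout follow the behaviour added here: an object such as data/2025/report.csv would land at <dest_s3_key>/report.csv.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator

    with DAG(
        dag_id="example_gcs_to_s3_flatten",  # hypothetical DAG id
        start_date=datetime(2025, 1, 1),
        schedule=None,
        catchup=False,
    ):
        # Copies every object under gs://example-gcs-bucket/data/ to
        # s3://example-s3-bucket/landing/<basename>, dropping subdirectory paths.
        GCSToS3Operator(
            task_id="gcs_to_s3_flat",
            gcs_bucket="example-gcs-bucket",  # hypothetical source bucket
            prefix="data/",
            dest_s3_key="s3://example-s3-bucket/landing/",  # hypothetical destination
            dest_aws_conn_id="aws_default",
            replace=False,
            flatten_structure=True,  # new parameter introduced in this series
        )
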