From abf8a88b5cabc58f9e59fb0850ff61530010860d Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 9 Feb 2025 20:55:29 +0100 Subject: [PATCH 01/18] Implement S3 dag bundle --- .../providers/amazon/aws/bundles/__init__.py | 16 ++ .../providers/amazon/aws/bundles/s3.py | 203 +++++++++++++++ .../tests/unit/amazon/aws/bundles/__init__.py | 16 ++ .../tests/unit/amazon/aws/bundles/test_s3.py | 246 ++++++++++++++++++ 4 files changed, 481 insertions(+) create mode 100644 providers/amazon/src/airflow/providers/amazon/aws/bundles/__init__.py create mode 100644 providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py create mode 100644 providers/amazon/tests/unit/amazon/aws/bundles/__init__.py create mode 100644 providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/__init__.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py new file mode 100644 index 0000000000000..ed073fe28e0fe --- /dev/null +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -0,0 +1,203 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import os +from pathlib import Path +from typing import TYPE_CHECKING + +from airflow.dag_processing.bundles.base import BaseDagBundle +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils.log.logging_mixin import LoggingMixin + +if TYPE_CHECKING: + from airflow.utils.types import ArgNotSet + + +class S3DagBundle(BaseDagBundle, LoggingMixin): + """ + S3 DAG bundle - exposes a directory in S3 as a DAG bundle. + + This allows Airflow to load DAGs directly from an S3 bucket. + + :param aws_conn_id: Airflow connection ID for AWS. Defaults to AwsBaseHook.default_conn_name. + :param bucket_name: The name of the S3 bucket containing the DAG files. + :param prefix: Optional subdirectory within the S3 bucket where the DAGs are stored. + If None, DAGs are assumed to be at the root of the bucket (Optional). + """ + + supports_versioning = False + + def __init__( + self, + *, + aws_conn_id: str | None | ArgNotSet = AwsBaseHook.default_conn_name, + bucket_name: str, + prefix: str | None = "", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.aws_conn_id = aws_conn_id + self.bucket_name = bucket_name + self.prefix = prefix + # Local path where S3 DAGs are downloaded. + self.s3_dags_root_dir: Path = self.base_dir.joinpath("s3") + # Local path where S3 DAGs are downloaded for current config. + self.s3_dags_dir: Path = self.s3_dags_root_dir.joinpath(self.name) + + try: + self.s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id, extra_args={}) # Initialize S3 hook. + except AirflowException as e: + self.log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e) + + def _initialize(self): + with self.lock(): + if not self.s3_dags_dir.exists(): + self.log.info("Creating local DAGs directory: %s", self.s3_dags_dir) + os.makedirs(self.s3_dags_dir, exist_ok=True) + + if not self.s3_dags_dir.is_dir(): + raise AirflowException(f"Local DAGs path: {self.s3_dags_dir} is not a directory.") + + if not self.s3_hook.check_for_bucket(bucket_name=self.bucket_name): + raise AirflowException(f"S3 bucket '{self.bucket_name}' does not exist.") + + if not self.s3_hook.check_for_prefix( + bucket_name=self.bucket_name, prefix=self.prefix, delimiter="/" + ): + raise AirflowException(f"S3 prefix 's3://{self.bucket_name}/{self.prefix}' does not exist.") + + self._download_s3_dags() + self.refresh() + + def initialize(self) -> None: + self._initialize() + super().initialize() + + def _delete_stale_local_files(self, current_s3_objects: list[Path]): + current_s3_keys = {key for key in current_s3_objects} + + for item in self.s3_dags_dir.iterdir(): + item: Path # type: ignore[no-redef] + absolute_item_path = item.resolve() + + if absolute_item_path not in current_s3_keys: + try: + if item.is_file(): + item.unlink(missing_ok=True) + self.log.debug("Deleted stale local file: %s", item) + elif item.is_dir(): + # delete only when the folder is empty + if not os.listdir(item): + item.rmdir() + self.log.debug("Deleted stale empty directory: %s", item) + else: + self.log.debug("Skipping stale item of unknown type: %s", item) + except OSError as e: + self.log.error("Error deleting stale item %s: %s", item, e) + raise e + + def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path: Path): + should_download = False + download_msg = "" + if not local_target_path.exists(): + should_download = True + download_msg 
= f"Local file {local_target_path} does not exist." + else: + local_stats = local_target_path.stat() + + if s3_object.size != local_stats.st_size: + should_download = True + download_msg = ( + f"S3 object size ({s3_object.size}) and local file size ({local_stats.st_size}) differ." + ) + + s3_last_modified = s3_object.last_modified + if local_stats.st_mtime < s3_last_modified.microsecond: + should_download = True + download_msg = f"S3 object last modified ({s3_last_modified.microsecond}) and local file last modified ({local_stats.st_mtime}) differ." + + if should_download: + s3_bucket.download_file(s3_object.key, local_target_path) + self.log.debug( + "%s Downloaded %s to %s", download_msg, s3_object.key, local_target_path.as_posix() + ) + else: + self.log.debug( + "Local file %s is up-to-date with S3 object %s. Skipping download.", + local_target_path.as_posix(), + s3_object.key, + ) + + def _download_s3_dags(self): + """Download DAG files from the S3 bucket to the local directory.""" + self.log.debug( + "Downloading DAGs from s3://%s/%s to %s", self.bucket_name, self.prefix, self.s3_dags_dir + ) + local_s3_objects = [] + s3_bucket = self.s3_hook.get_bucket(self.bucket_name) + for obj in s3_bucket.objects.filter(Prefix=self.prefix): + obj_path = Path(obj.key) + local_target_path = self.s3_dags_dir.joinpath(obj_path.relative_to(self.prefix)) + if not local_target_path.parent.exists(): + local_target_path.parent.mkdir(parents=True, exist_ok=True) + self.log.debug("Created local directory: %s", local_target_path.parent) + self._download_s3_object_if_changed( + s3_bucket=s3_bucket, s3_object=obj, local_target_path=local_target_path + ) + local_s3_objects.append(local_target_path) + + self._delete_stale_local_files(current_s3_objects=local_s3_objects) + + def __repr__(self): + return ( + f"" + ) + + def get_current_version(self) -> str | None: + """Return the current version of the DAG bundle. Currently not supported.""" + return None + + @property + def path(self) -> Path: + """Return the local path to the DAG files.""" + return self.s3_dags_dir # Path where DAGs are downloaded. + + def refresh(self) -> None: + """Refresh the DAG bundle by re-downloading the DAGs from S3.""" + if self.version: + raise AirflowException("Refreshing a specific version is not supported") + + with self.lock(): + self._download_s3_dags() + + def view_url(self, version: str | None = None) -> str | None: + """Return a URL for viewing the DAGs in S3. Currently, versioning is not supported.""" + if self.version: + raise AirflowException("S3 url with version is not supported") + + presigned_url = self.s3_hook.generate_presigned_url( + client_method="get_object", params={"Bucket": self.bucket_name, "Key": self.prefix} + ) + return presigned_url diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/__init__.py b/providers/amazon/tests/unit/amazon/aws/bundles/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/amazon/tests/unit/amazon/aws/bundles/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py new file mode 100644 index 0000000000000..07395c37c1307 --- /dev/null +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -0,0 +1,246 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import os +import re + +import boto3 +import pytest +from moto import mock_aws + +import airflow.version +from airflow.exceptions import AirflowException +from airflow.models import Connection +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils import db + +from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.db import clear_db_connections + +AWS_CONN_ID_WITH_REGION = "s3_dags_connection" +AWS_CONN_ID_REGION = "eu-central-1" +AWS_CONN_ID_DEFAULT = "aws_default" +S3_BUCKET_NAME = "my-airflow-dags-bucket" +S3_BUCKET_PREFIX = "project1/dags" + +if airflow.version.version.strip().startswith("3"): + from airflow.providers.amazon.aws.bundles.s3 import S3DagBundle + + +@pytest.fixture +def mocked_s3_resource(): + with mock_aws(): + yield boto3.resource("s3") + + +@pytest.fixture +def s3_client(): + with mock_aws(): + yield boto3.client("s3") + + +@pytest.fixture +def s3_bucket(mocked_s3_resource, s3_client): + bucket = mocked_s3_resource.create_bucket(Bucket=S3_BUCKET_NAME) + + s3_client.put_object(Bucket=bucket.name, Key=S3_BUCKET_PREFIX + "/dag_01.py", Body=b"test data") + s3_client.put_object(Bucket=bucket.name, Key=S3_BUCKET_PREFIX + "/dag_02.py", Body=b"test data") + s3_client.put_object( + Bucket=bucket.name, Key=S3_BUCKET_PREFIX + "/subproject1/dag_a.py", Body=b"test data" + ) + s3_client.put_object( + Bucket=bucket.name, Key=S3_BUCKET_PREFIX + "/subproject1/dag_b.py", Body=b"test data" + ) + + return bucket + + +@pytest.fixture(autouse=True) +def bundle_temp_dir(tmp_path): + with conf_vars({("dag_processor", "dag_bundle_storage_path"): str(tmp_path)}): + yield tmp_path + + +@pytest.mark.skipif(not airflow.version.version.strip().startswith("3"), reason="Airflow >=3.0.0 test") +class TestS3DagBundle: + @classmethod + def teardown_class(cls) -> None: + clear_db_connections() + + @classmethod + def setup_class(cls) -> None: + db.merge_conn( + Connection( + conn_id=AWS_CONN_ID_DEFAULT, + conn_type="aws", + extra={ + 
"config_kwargs": {"s3": {"bucket_name": S3_BUCKET_NAME}}, + }, + ) + ) + db.merge_conn( + conn=Connection( + conn_id=AWS_CONN_ID_WITH_REGION, + conn_type="aws", + extra={ + "config_kwargs": {"s3": {"bucket_name": S3_BUCKET_NAME}}, + "region_name": AWS_CONN_ID_REGION, + }, + ) + ) + + @pytest.mark.db_test + def test_view_url_generates_presigned_url(self): + bundle = S3DagBundle( + name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1/dags", bucket_name=S3_BUCKET_NAME + ) + url: str = bundle.view_url("test_version") + assert url.startswith("https://my-airflow-dags-bucket.s3.amazonaws.com/project1/dags") + assert "AWSAccessKeyId=" in url + assert "Signature=" in url + assert "Expires=" in url + + @pytest.mark.db_test + def test_supports_versioning(self): + bundle = S3DagBundle( + name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1/dags", bucket_name=S3_BUCKET_NAME + ) + assert S3DagBundle.supports_versioning is False + + # set version, it's not supported + bundle.version = "test_version" + + with pytest.raises(AirflowException, match="Refreshing a specific version is not supported"): + bundle.refresh() + with pytest.raises(AirflowException, match="S3 url with version is not supported"): + bundle.view_url("test_version") + + @pytest.mark.db_test + def test_correct_bundle_path_used(self): + bundle = S3DagBundle( + name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1_dags", bucket_name="aiflow_dags" + ) + assert str(bundle.path) == str(bundle.base_dir) + "/s3/test" + + @pytest.mark.db_test + def test_s3_bucket_and_prefix_validated(self, s3_bucket): + hook = S3Hook(aws_conn_id=AWS_CONN_ID_DEFAULT) + assert hook.check_for_bucket(s3_bucket.name) is True + + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix="project1_dags", + bucket_name="non-existing-bucket", + ) + with pytest.raises(AirflowException, match="S3 bucket.*non-existing-bucket.*does not exist.*"): + bundle.initialize() + + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix="non-existing-prefix", + bucket_name=S3_BUCKET_NAME, + ) + with pytest.raises(AirflowException, match="S3 prefix.*non-existing-prefix.*does not exist.*"): + bundle.initialize() + + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix="project1_dags", + bucket_name=S3_BUCKET_NAME, + ) + assert bundle.s3_hook.region_name == AWS_CONN_ID_REGION + + def _upload_fixtures(self, bucket: str, fixtures_dir: str) -> None: + client = boto3.client("s3") + fixtures_paths = [ + os.path.join(path, filename) for path, _, files in os.walk(fixtures_dir) for filename in files + ] + for path in fixtures_paths: + key = os.path.relpath(path, fixtures_dir) + client.upload_file(Filename=path, Bucket=bucket, Key=key) + + @pytest.mark.db_test + def test_refresh(self, s3_bucket, s3_client, caplog): + caplog.set_level(logging.ERROR) + caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix=S3_BUCKET_PREFIX, + bucket_name=S3_BUCKET_NAME, + ) + + bundle.initialize() + # dags are downloaded once by initialize and once with refresh called post initialize + assert caplog.text.count("Downloading DAGs from s3") == 2 + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*{bundle.s3_dags_dir.as_posix()}.*subproject1/dag_a.py.*", + ) + + 
s3_client.put_object(Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data") + bundle.refresh() + assert caplog.text.count("Downloading DAGs from s3") == 3 + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=rf"Local file.*/s3/{bundle.name}/subproject1/dag_a.py.*is up-to-date with S3 object.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*", + ) + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*dag_03.py.*/s3/{bundle.name}/dag_03.py", + ) + assert bundle.s3_dags_dir.joinpath("dag_03.py").read_text() == "test data" + bundle.s3_dags_dir.joinpath("dag_should_be_deleted.py").write_text("test dag") + bundle.s3_dags_dir.joinpath("dag_should_be_deleted_folder").mkdir(exist_ok=True) + s3_client.put_object( + Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data-changed" + ) + bundle.refresh() + assert caplog.text.count("Downloading DAGs from s3") == 4 + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=r"S3 object size.*and local file size.*differ.*Downloaded.*dag_03.py.*", + ) + assert bundle.s3_dags_dir.joinpath("dag_03.py").read_text() == "test data-changed" + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=r"Deleted stale empty directory.*dag_should_be_deleted_folder.*", + ) + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=r"Deleted stale local file.*dag_should_be_deleted.py.*", + ) + + def assert_log_matches_regex(self, caplog, level, regex): + """Helper function to assert if a log message matches a regex.""" + matched = False + for record in caplog.records: + if record.levelname == level and re.search(regex, record.message): + matched = True + break # Stop searching once a match is found + assert matched, f"No log message at level {level} matching regex '{regex}' found." 
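For orientation, a minimal usage sketch of the bundle added in this patch (hypothetical, not part of the diff itself): it constructs S3DagBundle directly, assuming an existing "aws_default" connection and the same bucket layout the tests above create.

    from airflow.providers.amazon.aws.bundles.s3 import S3DagBundle

    # Hypothetical names; bucket and prefix mirror the test fixtures above.
    bundle = S3DagBundle(
        name="project1_dags",
        aws_conn_id="aws_default",
        bucket_name="my-airflow-dags-bucket",
        prefix="project1/dags",
    )
    bundle.initialize()   # validates bucket/prefix and downloads the DAG files locally
    print(bundle.path)    # local directory the DAG processor will parse
    bundle.refresh()      # re-syncs the local copy with the current S3 contents
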
From faed04e89dade1be0e1db6ade509ee5fa5334171 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Fri, 16 May 2025 13:51:31 +0200 Subject: [PATCH 02/18] Update providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py Co-authored-by: Ephraim Anierobi --- providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index ed073fe28e0fe..63df29289dda2 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -70,7 +70,7 @@ def _initialize(self): with self.lock(): if not self.s3_dags_dir.exists(): self.log.info("Creating local DAGs directory: %s", self.s3_dags_dir) - os.makedirs(self.s3_dags_dir, exist_ok=True) + os.makedirs(self.s3_dags_dir) if not self.s3_dags_dir.is_dir(): raise AirflowException(f"Local DAGs path: {self.s3_dags_dir} is not a directory.") From 7004538dd3467994bb6f1bbe7ed2846734be365c Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Fri, 16 May 2025 13:52:55 +0200 Subject: [PATCH 03/18] remove empty arg --- providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index 63df29289dda2..91c33e0308ff1 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -62,7 +62,7 @@ def __init__( self.s3_dags_dir: Path = self.s3_dags_root_dir.joinpath(self.name) try: - self.s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id, extra_args={}) # Initialize S3 hook. + self.s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id) # Initialize S3 hook. except AirflowException as e: self.log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e) From 3a5892fb32ab6f1bd29192157d7b7a695f3f0412 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Fri, 16 May 2025 14:14:00 +0200 Subject: [PATCH 04/18] test empty prefix --- .../providers/amazon/aws/bundles/s3.py | 12 +++-- .../tests/unit/amazon/aws/bundles/test_s3.py | 49 ++++++++++++++++++- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index 91c33e0308ff1..eeb9f37c76a7e 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -78,10 +78,14 @@ def _initialize(self): if not self.s3_hook.check_for_bucket(bucket_name=self.bucket_name): raise AirflowException(f"S3 bucket '{self.bucket_name}' does not exist.") - if not self.s3_hook.check_for_prefix( - bucket_name=self.bucket_name, prefix=self.prefix, delimiter="/" - ): - raise AirflowException(f"S3 prefix 's3://{self.bucket_name}/{self.prefix}' does not exist.") + if self.prefix: + # don't check when prefix is "" or None + if not self.s3_hook.check_for_prefix( + bucket_name=self.bucket_name, prefix=self.prefix, delimiter="/" + ): + raise AirflowException( + f"S3 prefix 's3://{self.bucket_name}/{self.prefix}' does not exist." 
+ ) self._download_s3_dags() self.refresh() diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py index 07395c37c1307..3e69c08396dc5 100644 --- a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -164,9 +164,21 @@ def test_s3_bucket_and_prefix_validated(self, s3_bucket): bundle = S3DagBundle( name="test", aws_conn_id=AWS_CONN_ID_WITH_REGION, - prefix="project1_dags", + prefix=S3_BUCKET_PREFIX, + bucket_name=S3_BUCKET_NAME, + ) + # initialize succeeds, with correct prefix and bucket + bundle.initialize() + assert bundle.s3_hook.region_name == AWS_CONN_ID_REGION + + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix="", bucket_name=S3_BUCKET_NAME, ) + # initialize succeeds, with empty prefix + bundle.initialize() assert bundle.s3_hook.region_name == AWS_CONN_ID_REGION def _upload_fixtures(self, bucket: str, fixtures_dir: str) -> None: @@ -236,6 +248,41 @@ def test_refresh(self, s3_bucket, s3_client, caplog): regex=r"Deleted stale local file.*dag_should_be_deleted.py.*", ) + @pytest.mark.db_test + def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog): + caplog.set_level(logging.ERROR) + caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + bucket_name=S3_BUCKET_NAME, + ) + assert bundle.prefix == "" + bundle.initialize() + + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=rf"Downloaded.*subproject1/dag_a.py.*{bundle.s3_dags_dir.as_posix()}.*subproject1/dag_a.py.*", + ) + s3_client.put_object(Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data") + bundle.refresh() + assert caplog.text.count("Downloading DAGs from s3") == 3 + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=rf"Local file.*/s3/{bundle.name}.*/dag_a.py.*is up-to-date with S3 object.*dag_a.py.*", + ) + self.assert_log_matches_regex( + caplog=caplog, + level="DEBUG", + regex=rf"Downloaded.*dag_03.py.*/s3/{bundle.name}.*/dag_03.py", + ) + # we are using s3 bucket rood but the dag file is in sub folder, project1/dags/dag_03.py + assert bundle.s3_dags_dir.joinpath("project1/dags/dag_03.py").read_text() == "test data" + bundle.s3_dags_dir.joinpath("dag_should_be_deleted.py").write_text("test dag") + bundle.s3_dags_dir.joinpath("dag_should_be_deleted_folder").mkdir(exist_ok=True) + def assert_log_matches_regex(self, caplog, level, regex): """Helper function to assert if a log message matches a regex.""" matched = False From 2bb8633fc9844e565c9da92f84b0c583b5a5370f Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 20 May 2025 10:45:10 +0200 Subject: [PATCH 05/18] change logging --- .../providers/amazon/aws/bundles/s3.py | 45 +++++++++++-------- .../tests/unit/amazon/aws/bundles/test_s3.py | 34 +++++++------- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index eeb9f37c76a7e..188caedadb9a8 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -18,19 +18,18 @@ import os from pathlib import Path -from typing import TYPE_CHECKING + +import structlog + +log = 
structlog.get_logger(__name__) from airflow.dag_processing.bundles.base import BaseDagBundle from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.utils.log.logging_mixin import LoggingMixin - -if TYPE_CHECKING: - from airflow.utils.types import ArgNotSet -class S3DagBundle(BaseDagBundle, LoggingMixin): +class S3DagBundle(BaseDagBundle): """ S3 DAG bundle - exposes a directory in S3 as a DAG bundle. @@ -47,9 +46,9 @@ class S3DagBundle(BaseDagBundle, LoggingMixin): def __init__( self, *, - aws_conn_id: str | None | ArgNotSet = AwsBaseHook.default_conn_name, + aws_conn_id: str = AwsBaseHook.default_conn_name, bucket_name: str, - prefix: str | None = "", + prefix: str = "", **kwargs, ) -> None: super().__init__(**kwargs) @@ -61,15 +60,23 @@ def __init__( # Local path where S3 DAGs are downloaded for current config. self.s3_dags_dir: Path = self.s3_dags_root_dir.joinpath(self.name) + self._log = log.bind( + bundle_name=self.name, + version=self.version, + bucket_name=self.bucket_name, + prefix=self.prefix, + aws_conn_id=self.aws_conn_id, + ) + try: self.s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id) # Initialize S3 hook. except AirflowException as e: - self.log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e) + self._log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e) def _initialize(self): with self.lock(): if not self.s3_dags_dir.exists(): - self.log.info("Creating local DAGs directory: %s", self.s3_dags_dir) + self._log.info("Creating local DAGs directory: %s", self.s3_dags_dir) os.makedirs(self.s3_dags_dir) if not self.s3_dags_dir.is_dir(): @@ -79,7 +86,7 @@ def _initialize(self): raise AirflowException(f"S3 bucket '{self.bucket_name}' does not exist.") if self.prefix: - # don't check when prefix is "" or None + # don't check when prefix is "" if not self.s3_hook.check_for_prefix( bucket_name=self.bucket_name, prefix=self.prefix, delimiter="/" ): @@ -105,16 +112,16 @@ def _delete_stale_local_files(self, current_s3_objects: list[Path]): try: if item.is_file(): item.unlink(missing_ok=True) - self.log.debug("Deleted stale local file: %s", item) + self._log.debug("Deleted stale local file: %s", item) elif item.is_dir(): # delete only when the folder is empty if not os.listdir(item): item.rmdir() - self.log.debug("Deleted stale empty directory: %s", item) + self._log.debug("Deleted stale empty directory: %s", item) else: - self.log.debug("Skipping stale item of unknown type: %s", item) + self._log.debug("Skipping stale item of unknown type: %s", item) except OSError as e: - self.log.error("Error deleting stale item %s: %s", item, e) + self._log.error("Error deleting stale item %s: %s", item, e) raise e def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path: Path): @@ -139,11 +146,11 @@ def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path if should_download: s3_bucket.download_file(s3_object.key, local_target_path) - self.log.debug( + self._log.debug( "%s Downloaded %s to %s", download_msg, s3_object.key, local_target_path.as_posix() ) else: - self.log.debug( + self._log.debug( "Local file %s is up-to-date with S3 object %s. 
Skipping download.", local_target_path.as_posix(), s3_object.key, @@ -151,7 +158,7 @@ def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path def _download_s3_dags(self): """Download DAG files from the S3 bucket to the local directory.""" - self.log.debug( + self._log.debug( "Downloading DAGs from s3://%s/%s to %s", self.bucket_name, self.prefix, self.s3_dags_dir ) local_s3_objects = [] @@ -161,7 +168,7 @@ def _download_s3_dags(self): local_target_path = self.s3_dags_dir.joinpath(obj_path.relative_to(self.prefix)) if not local_target_path.parent.exists(): local_target_path.parent.mkdir(parents=True, exist_ok=True) - self.log.debug("Created local directory: %s", local_target_path.parent) + self._log.debug("Created local directory: %s", local_target_path.parent) self._download_s3_object_if_changed( s3_bucket=s3_bucket, s3_object=obj, local_target_path=local_target_path ) diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py index 3e69c08396dc5..7a6960f88f6c6 100644 --- a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -191,7 +191,7 @@ def _upload_fixtures(self, bucket: str, fixtures_dir: str) -> None: client.upload_file(Filename=path, Bucket=bucket, Key=key) @pytest.mark.db_test - def test_refresh(self, s3_bucket, s3_client, caplog): + def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): caplog.set_level(logging.ERROR) caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") bundle = S3DagBundle( @@ -203,23 +203,23 @@ def test_refresh(self, s3_bucket, s3_client, caplog): bundle.initialize() # dags are downloaded once by initialize and once with refresh called post initialize - assert caplog.text.count("Downloading DAGs from s3") == 2 + assert cap_structlog.text.count("Downloading DAGs from s3") == 2 self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*{bundle.s3_dags_dir.as_posix()}.*subproject1/dag_a.py.*", ) s3_client.put_object(Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data") bundle.refresh() - assert caplog.text.count("Downloading DAGs from s3") == 3 + assert cap_structlog.text.count("Downloading DAGs from s3") == 3 self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=rf"Local file.*/s3/{bundle.name}/subproject1/dag_a.py.*is up-to-date with S3 object.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*", ) self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*dag_03.py.*/s3/{bundle.name}/dag_03.py", ) @@ -230,26 +230,26 @@ def test_refresh(self, s3_bucket, s3_client, caplog): Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data-changed" ) bundle.refresh() - assert caplog.text.count("Downloading DAGs from s3") == 4 + assert cap_structlog.text.count("Downloading DAGs from s3") == 4 self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=r"S3 object size.*and local file size.*differ.*Downloaded.*dag_03.py.*", ) assert bundle.s3_dags_dir.joinpath("dag_03.py").read_text() == "test data-changed" self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=r"Deleted stale empty directory.*dag_should_be_deleted_folder.*", ) 
self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=r"Deleted stale local file.*dag_should_be_deleted.py.*", ) @pytest.mark.db_test - def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog): + def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlog): caplog.set_level(logging.ERROR) caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") bundle = S3DagBundle( @@ -261,20 +261,20 @@ def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog): bundle.initialize() self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=rf"Downloaded.*subproject1/dag_a.py.*{bundle.s3_dags_dir.as_posix()}.*subproject1/dag_a.py.*", ) s3_client.put_object(Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data") bundle.refresh() - assert caplog.text.count("Downloading DAGs from s3") == 3 + assert cap_structlog.text.count("Downloading DAGs from s3") == 3 self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=rf"Local file.*/s3/{bundle.name}.*/dag_a.py.*is up-to-date with S3 object.*dag_a.py.*", ) self.assert_log_matches_regex( - caplog=caplog, + caplog=cap_structlog, level="DEBUG", regex=rf"Downloaded.*dag_03.py.*/s3/{bundle.name}.*/dag_03.py", ) @@ -286,8 +286,8 @@ def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog): def assert_log_matches_regex(self, caplog, level, regex): """Helper function to assert if a log message matches a regex.""" matched = False - for record in caplog.records: - if record.levelname == level and re.search(regex, record.message): + for record in caplog.entries: + if record.get("log_level", None) == level.lower() and re.search(regex, record.get("event", "")): matched = True break # Stop searching once a match is found assert matched, f"No log message at level {level} matching regex '{regex}' found." 
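The logging change above swaps the LoggingMixin logger for a module-level structlog logger bound with bundle context, so every emitted event carries the bundle name, bucket and prefix. Roughly, the bind pattern works like this (illustrative values only, assuming Airflow's structlog configuration):

    import structlog

    log = structlog.get_logger(__name__)
    # bind() returns a logger whose events all include these key/value pairs
    bundle_log = log.bind(
        bundle_name="test",
        bucket_name="my-airflow-dags-bucket",
        prefix="project1/dags",
    )
    bundle_log.debug("Downloading DAGs from s3://%s/%s", "my-airflow-dags-bucket", "project1/dags")
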
From 87cc5225c94a0b5f61d2e2a51846da05a274c52e Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 22 May 2025 11:30:09 +0200 Subject: [PATCH 06/18] Update providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index 188caedadb9a8..c793a3147202e 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -95,7 +95,6 @@ def _initialize(self): ) self._download_s3_dags() - self.refresh() def initialize(self) -> None: self._initialize() From 8450d3f0a30c4117d45b8028cb729c0c3fd21ef6 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 22 May 2025 11:30:41 +0200 Subject: [PATCH 07/18] Update providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- .../amazon/src/airflow/providers/amazon/aws/bundles/s3.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index c793a3147202e..6873205ec1bfc 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -55,10 +55,8 @@ def __init__( self.aws_conn_id = aws_conn_id self.bucket_name = bucket_name self.prefix = prefix - # Local path where S3 DAGs are downloaded. - self.s3_dags_root_dir: Path = self.base_dir.joinpath("s3") - # Local path where S3 DAGs are downloaded for current config. - self.s3_dags_dir: Path = self.s3_dags_root_dir.joinpath(self.name) + # Local path where S3 DAGs are downloaded + self.s3_dags_dir: Path = self.base_dir self._log = log.bind( bundle_name=self.name, From 3f222b1c6ff06269fb77ed503c3abcbe640c96f0 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 22 May 2025 20:47:43 +0200 Subject: [PATCH 08/18] Add review changes --- .../providers/amazon/aws/bundles/s3.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index 6873205ec1bfc..7316fcf18752b 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -65,11 +65,7 @@ def __init__( prefix=self.prefix, aws_conn_id=self.aws_conn_id, ) - - try: - self.s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id) # Initialize S3 hook. - except AirflowException as e: - self._log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e) + self.s3_hook: S3Hook = None def _initialize(self): with self.lock(): @@ -95,6 +91,12 @@ def _initialize(self): self._download_s3_dags() def initialize(self) -> None: + if self.s3_hook is None: + try: + self.s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id) # Initialize S3 hook. 
+ except AirflowException as e: + self._log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e) + self._initialize() super().initialize() @@ -205,7 +207,7 @@ def view_url(self, version: str | None = None) -> str | None: if self.version: raise AirflowException("S3 url with version is not supported") - presigned_url = self.s3_hook.generate_presigned_url( - client_method="get_object", params={"Bucket": self.bucket_name, "Key": self.prefix} - ) - return presigned_url + if self.prefix: + return f"https://s3.{self.s3_hook.region_name}.amazonaws.com/{self.bucket_name}/{self.prefix}/" + else: + return f"https://s3.{self.s3_hook.region_name}.amazonaws.com/{self.bucket_name}/" From c0d018ebca4f42942f51adb17a8c26806ca9bb6d Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 22 May 2025 21:02:41 +0200 Subject: [PATCH 09/18] Add review changes --- .../providers/amazon/aws/bundles/s3.py | 25 +++++++++++++------ .../tests/unit/amazon/aws/bundles/test_s3.py | 21 +++++++--------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index 7316fcf18752b..3d1dac6f03dcf 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -65,7 +65,7 @@ def __init__( prefix=self.prefix, aws_conn_id=self.aws_conn_id, ) - self.s3_hook: S3Hook = None + self._s3_hook: S3Hook = None def _initialize(self): with self.lock(): @@ -91,14 +91,18 @@ def _initialize(self): self._download_s3_dags() def initialize(self) -> None: - if self.s3_hook is None: + self._initialize() + super().initialize() + + @property + def s3_hook(self): + if self._s3_hook is None: try: - self.s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id) # Initialize S3 hook. + self._s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id) # Initialize S3 hook. 
except AirflowException as e: self._log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e) + return self._s3_hook - self._initialize() - super().initialize() def _delete_stale_local_files(self, current_s3_objects: list[Path]): current_s3_keys = {key for key in current_s3_objects} @@ -207,7 +211,12 @@ def view_url(self, version: str | None = None) -> str | None: if self.version: raise AirflowException("S3 url with version is not supported") + # https://.s3..amazonaws.com/ + url = f"https://{self.bucket_name}.s3" + if self.s3_hook.region_name: + url += f".{self.s3_hook.region_name}" + url += f".amazonaws.com" if self.prefix: - return f"https://s3.{self.s3_hook.region_name}.amazonaws.com/{self.bucket_name}/{self.prefix}/" - else: - return f"https://s3.{self.s3_hook.region_name}.amazonaws.com/{self.bucket_name}/" + url += f"/{self.prefix}" + + return url diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py index 7a6960f88f6c6..c4535b251ba96 100644 --- a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -112,9 +112,6 @@ def test_view_url_generates_presigned_url(self): ) url: str = bundle.view_url("test_version") assert url.startswith("https://my-airflow-dags-bucket.s3.amazonaws.com/project1/dags") - assert "AWSAccessKeyId=" in url - assert "Signature=" in url - assert "Expires=" in url @pytest.mark.db_test def test_supports_versioning(self): @@ -136,7 +133,7 @@ def test_correct_bundle_path_used(self): bundle = S3DagBundle( name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1_dags", bucket_name="aiflow_dags" ) - assert str(bundle.path) == str(bundle.base_dir) + "/s3/test" + assert str(bundle.base_dir) == str(bundle.s3_dags_dir) @pytest.mark.db_test def test_s3_bucket_and_prefix_validated(self, s3_bucket): @@ -203,7 +200,7 @@ def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): bundle.initialize() # dags are downloaded once by initialize and once with refresh called post initialize - assert cap_structlog.text.count("Downloading DAGs from s3") == 2 + assert cap_structlog.text.count("Downloading DAGs from s3") == 1 self.assert_log_matches_regex( caplog=cap_structlog, level="DEBUG", @@ -212,16 +209,16 @@ def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): s3_client.put_object(Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data") bundle.refresh() - assert cap_structlog.text.count("Downloading DAGs from s3") == 3 + assert cap_structlog.text.count("Downloading DAGs from s3") == 2 self.assert_log_matches_regex( caplog=cap_structlog, level="DEBUG", - regex=rf"Local file.*/s3/{bundle.name}/subproject1/dag_a.py.*is up-to-date with S3 object.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*", + regex=rf"Local file.*/{bundle.name}/subproject1/dag_a.py.*is up-to-date with S3 object.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*", ) self.assert_log_matches_regex( caplog=cap_structlog, level="DEBUG", - regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*dag_03.py.*/s3/{bundle.name}/dag_03.py", + regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*dag_03.py.*/{bundle.name}/dag_03.py", ) assert bundle.s3_dags_dir.joinpath("dag_03.py").read_text() == "test data" bundle.s3_dags_dir.joinpath("dag_should_be_deleted.py").write_text("test dag") @@ -230,7 +227,7 @@ def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test 
data-changed" ) bundle.refresh() - assert cap_structlog.text.count("Downloading DAGs from s3") == 4 + assert cap_structlog.text.count("Downloading DAGs from s3") == 3 self.assert_log_matches_regex( caplog=cap_structlog, level="DEBUG", @@ -267,16 +264,16 @@ def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlo ) s3_client.put_object(Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data") bundle.refresh() - assert cap_structlog.text.count("Downloading DAGs from s3") == 3 + assert cap_structlog.text.count("Downloading DAGs from s3") == 2 self.assert_log_matches_regex( caplog=cap_structlog, level="DEBUG", - regex=rf"Local file.*/s3/{bundle.name}.*/dag_a.py.*is up-to-date with S3 object.*dag_a.py.*", + regex=rf"Local file.*/{bundle.name}.*/dag_a.py.*is up-to-date with S3 object.*dag_a.py.*", ) self.assert_log_matches_regex( caplog=cap_structlog, level="DEBUG", - regex=rf"Downloaded.*dag_03.py.*/s3/{bundle.name}.*/dag_03.py", + regex=rf"Downloaded.*dag_03.py.*/{bundle.name}.*/dag_03.py", ) # we are using s3 bucket rood but the dag file is in sub folder, project1/dags/dag_03.py assert bundle.s3_dags_dir.joinpath("project1/dags/dag_03.py").read_text() == "test data" From 5eeeee15c78cabef807bd759b7904ce6560e83dd Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 22 May 2025 21:06:52 +0200 Subject: [PATCH 10/18] Add review changes --- .../providers/amazon/aws/bundles/s3.py | 91 +++---------------- .../airflow/providers/amazon/aws/hooks/s3.py | 77 ++++++++++++++++ .../tests/unit/amazon/aws/bundles/test_s3.py | 41 ++++++--- 3 files changed, 116 insertions(+), 93 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index 3d1dac6f03dcf..bdb2ebf437cf5 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -87,8 +87,7 @@ def _initialize(self): raise AirflowException( f"S3 prefix 's3://{self.bucket_name}/{self.prefix}' does not exist." ) - - self._download_s3_dags() + self.refresh() def initialize(self) -> None: self._initialize() @@ -103,82 +102,6 @@ def s3_hook(self): self._log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e) return self._s3_hook - - def _delete_stale_local_files(self, current_s3_objects: list[Path]): - current_s3_keys = {key for key in current_s3_objects} - - for item in self.s3_dags_dir.iterdir(): - item: Path # type: ignore[no-redef] - absolute_item_path = item.resolve() - - if absolute_item_path not in current_s3_keys: - try: - if item.is_file(): - item.unlink(missing_ok=True) - self._log.debug("Deleted stale local file: %s", item) - elif item.is_dir(): - # delete only when the folder is empty - if not os.listdir(item): - item.rmdir() - self._log.debug("Deleted stale empty directory: %s", item) - else: - self._log.debug("Skipping stale item of unknown type: %s", item) - except OSError as e: - self._log.error("Error deleting stale item %s: %s", item, e) - raise e - - def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path: Path): - should_download = False - download_msg = "" - if not local_target_path.exists(): - should_download = True - download_msg = f"Local file {local_target_path} does not exist." 
- else: - local_stats = local_target_path.stat() - - if s3_object.size != local_stats.st_size: - should_download = True - download_msg = ( - f"S3 object size ({s3_object.size}) and local file size ({local_stats.st_size}) differ." - ) - - s3_last_modified = s3_object.last_modified - if local_stats.st_mtime < s3_last_modified.microsecond: - should_download = True - download_msg = f"S3 object last modified ({s3_last_modified.microsecond}) and local file last modified ({local_stats.st_mtime}) differ." - - if should_download: - s3_bucket.download_file(s3_object.key, local_target_path) - self._log.debug( - "%s Downloaded %s to %s", download_msg, s3_object.key, local_target_path.as_posix() - ) - else: - self._log.debug( - "Local file %s is up-to-date with S3 object %s. Skipping download.", - local_target_path.as_posix(), - s3_object.key, - ) - - def _download_s3_dags(self): - """Download DAG files from the S3 bucket to the local directory.""" - self._log.debug( - "Downloading DAGs from s3://%s/%s to %s", self.bucket_name, self.prefix, self.s3_dags_dir - ) - local_s3_objects = [] - s3_bucket = self.s3_hook.get_bucket(self.bucket_name) - for obj in s3_bucket.objects.filter(Prefix=self.prefix): - obj_path = Path(obj.key) - local_target_path = self.s3_dags_dir.joinpath(obj_path.relative_to(self.prefix)) - if not local_target_path.parent.exists(): - local_target_path.parent.mkdir(parents=True, exist_ok=True) - self._log.debug("Created local directory: %s", local_target_path.parent) - self._download_s3_object_if_changed( - s3_bucket=s3_bucket, s3_object=obj, local_target_path=local_target_path - ) - local_s3_objects.append(local_target_path) - - self._delete_stale_local_files(current_s3_objects=local_s3_objects) - def __repr__(self): return ( f" None: raise AirflowException("Refreshing a specific version is not supported") with self.lock(): - self._download_s3_dags() + self._log.debug( + "Downloading DAGs from s3://%s/%s to %s", self.bucket_name, self.prefix, self.s3_dags_dir + ) + self.s3_hook.download_s3( + bucket_name=self.bucket_name, + s3_prefix=self.prefix, + local_dir=self.s3_dags_dir, + delete_stale=True, + ) def view_url(self, version: str | None = None) -> str | None: """Return a URL for viewing the DAGs in S3. 
Currently, versioning is not supported.""" @@ -215,7 +146,7 @@ def view_url(self, version: str | None = None) -> str | None: url = f"https://{self.bucket_name}.s3" if self.s3_hook.region_name: url += f".{self.s3_hook.region_name}" - url += f".amazonaws.com" + url += ".amazonaws.com" if self.prefix: url += f"/{self.prefix}" diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py index 578ba2a2c4646..933a0b185054d 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py @@ -1683,3 +1683,80 @@ def delete_bucket_tagging(self, bucket_name: str | None = None) -> None: """ s3_client = self.get_conn() s3_client.delete_bucket_tagging(Bucket=bucket_name) + + def _download_s3_delete_stale_local_files(self, current_s3_objects: list[Path], local_dir: Path): + current_s3_keys = {key for key in current_s3_objects} + + for item in local_dir.iterdir(): + item: Path # type: ignore[no-redef] + absolute_item_path = item.resolve() + + if absolute_item_path not in current_s3_keys: + try: + if item.is_file(): + item.unlink(missing_ok=True) + self.log.debug("Deleted stale local file: %s", item) + elif item.is_dir(): + # delete only when the folder is empty + if not os.listdir(item): + item.rmdir() + self.log.debug("Deleted stale empty directory: %s", item) + else: + self.log.debug("Skipping stale item of unknown type: %s", item) + except OSError as e: + self.log.error("Error deleting stale item %s: %s", item, e) + raise e + + def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path: Path): + should_download = False + download_msg = "" + if not local_target_path.exists(): + should_download = True + download_msg = f"Local file {local_target_path} does not exist." + else: + local_stats = local_target_path.stat() + + if s3_object.size != local_stats.st_size: + should_download = True + download_msg = ( + f"S3 object size ({s3_object.size}) and local file size ({local_stats.st_size}) differ." + ) + + s3_last_modified = s3_object.last_modified + if local_stats.st_mtime < s3_last_modified.microsecond: + should_download = True + download_msg = f"S3 object last modified ({s3_last_modified.microsecond}) and local file last modified ({local_stats.st_mtime}) differ." + + if should_download: + s3_bucket.download_file(s3_object.key, local_target_path) + self.log.debug( + "%s Downloaded %s to %s", download_msg, s3_object.key, local_target_path.as_posix() + ) + else: + self.log.debug( + "Local file %s is up-to-date with S3 object %s. 
Skipping download.", + local_target_path.as_posix(), + s3_object.key, + ) + + def download_s3(self, bucket_name: str, local_dir: Path, s3_prefix="", delete_stale: bool = True): + """Download S3 files from the S3 bucket to the local directory.""" + self.log.debug("Downloading data from s3://%s/%s to %s", bucket_name, s3_prefix, local_dir) + + local_s3_objects = [] + s3_bucket = self.get_bucket(bucket_name) + for obj in s3_bucket.objects.filter(Prefix=s3_prefix): + obj_path = Path(obj.key) + local_target_path = local_dir.joinpath(obj_path.relative_to(s3_prefix)) + if not local_target_path.parent.exists(): + local_target_path.parent.mkdir(parents=True, exist_ok=True) + self.log.debug("Created local directory: %s", local_target_path.parent) + self._download_s3_object_if_changed( + s3_bucket=s3_bucket, s3_object=obj, local_target_path=local_target_path + ) + local_s3_objects.append(local_target_path) + + if delete_stale: + self._download_s3_delete_stale_local_files( + current_s3_objects=local_s3_objects, local_dir=local_dir + ) diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py index c4535b251ba96..5245afaeca1e3 100644 --- a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -191,6 +191,9 @@ def _upload_fixtures(self, bucket: str, fixtures_dir: str) -> None: def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): caplog.set_level(logging.ERROR) caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") + caplog.set_level( + logging.DEBUG, logger="airflow.task.hooks.airflow.providers.amazon.aws.hooks.s3.S3Hook" + ) bundle = S3DagBundle( name="test", aws_conn_id=AWS_CONN_ID_WITH_REGION, @@ -202,7 +205,7 @@ def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): # dags are downloaded once by initialize and once with refresh called post initialize assert cap_structlog.text.count("Downloading DAGs from s3") == 1 self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*{bundle.s3_dags_dir.as_posix()}.*subproject1/dag_a.py.*", ) @@ -211,12 +214,12 @@ def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): bundle.refresh() assert cap_structlog.text.count("Downloading DAGs from s3") == 2 self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=rf"Local file.*/{bundle.name}/subproject1/dag_a.py.*is up-to-date with S3 object.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*", ) self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*dag_03.py.*/{bundle.name}/dag_03.py", ) @@ -229,18 +232,18 @@ def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): bundle.refresh() assert cap_structlog.text.count("Downloading DAGs from s3") == 3 self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=r"S3 object size.*and local file size.*differ.*Downloaded.*dag_03.py.*", ) assert bundle.s3_dags_dir.joinpath("dag_03.py").read_text() == "test data-changed" self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=r"Deleted stale empty directory.*dag_should_be_deleted_folder.*", ) self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=r"Deleted stale local 
file.*dag_should_be_deleted.py.*", ) @@ -249,6 +252,9 @@ def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlog): caplog.set_level(logging.ERROR) caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") + caplog.set_level( + logging.DEBUG, logger="airflow.task.hooks.airflow.providers.amazon.aws.hooks.s3.S3Hook" + ) bundle = S3DagBundle( name="test", aws_conn_id=AWS_CONN_ID_WITH_REGION, @@ -258,7 +264,7 @@ def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlo bundle.initialize() self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=rf"Downloaded.*subproject1/dag_a.py.*{bundle.s3_dags_dir.as_posix()}.*subproject1/dag_a.py.*", ) @@ -266,12 +272,12 @@ def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlo bundle.refresh() assert cap_structlog.text.count("Downloading DAGs from s3") == 2 self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=rf"Local file.*/{bundle.name}.*/dag_a.py.*is up-to-date with S3 object.*dag_a.py.*", ) self.assert_log_matches_regex( - caplog=cap_structlog, + caplog=caplog, level="DEBUG", regex=rf"Downloaded.*dag_03.py.*/{bundle.name}.*/dag_03.py", ) @@ -283,8 +289,17 @@ def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlo def assert_log_matches_regex(self, caplog, level, regex): """Helper function to assert if a log message matches a regex.""" matched = False - for record in caplog.entries: - if record.get("log_level", None) == level.lower() and re.search(regex, record.get("event", "")): - matched = True - break # Stop searching once a match is found + if hasattr(caplog, "entries"): + for record in caplog.entries: + if record.get("log_level", None) == level.lower() and re.search( + regex, record.get("event", "") + ): + matched = True + break # Stop searching once a match is found + else: + for record in caplog.records: + if record.levelname == level and re.search(regex, record.message): + matched = True + break # Stop searching once a match is found + assert matched, f"No log message at level {level} matching regex '{regex}' found." 
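The refactor above moves the download and stale-cleanup logic into a reusable S3Hook helper, so the bundle's refresh() simply delegates to it. A rough standalone sketch (hypothetical connection id and paths; the method is renamed to sync_to_local_dir later in the series):

    from pathlib import Path

    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    hook = S3Hook(aws_conn_id="aws_default")
    # Download changed objects under the prefix; delete_stale=True removes local
    # files and empty directories that no longer exist in S3.
    hook.download_s3(
        bucket_name="my-airflow-dags-bucket",
        local_dir=Path("/tmp/s3-dags"),
        s3_prefix="project1/dags",
        delete_stale=True,
    )
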
From d0c1a1774c100f329c245997369021d24bde1e3e Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 27 May 2025 13:22:04 +0200 Subject: [PATCH 11/18] fix linting --- .../amazon/src/airflow/providers/amazon/aws/bundles/s3.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index bdb2ebf437cf5..3871dbc69d59b 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -21,8 +21,6 @@ import structlog -log = structlog.get_logger(__name__) - from airflow.dag_processing.bundles.base import BaseDagBundle from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook @@ -58,6 +56,7 @@ def __init__( # Local path where S3 DAGs are downloaded self.s3_dags_dir: Path = self.base_dir + log = structlog.get_logger(__name__) self._log = log.bind( bundle_name=self.name, version=self.version, From f134a6b9891f8d6646da972aabfcf4a8e7594c06 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 14 Jun 2025 23:17:30 +0200 Subject: [PATCH 12/18] review change, improve naming --- .../amazon/src/airflow/providers/amazon/aws/bundles/s3.py | 2 +- .../amazon/src/airflow/providers/amazon/aws/hooks/s3.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index 3871dbc69d59b..b7947527d8af0 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -129,7 +129,7 @@ def refresh(self) -> None: self._log.debug( "Downloading DAGs from s3://%s/%s to %s", self.bucket_name, self.prefix, self.s3_dags_dir ) - self.s3_hook.download_s3( + self.s3_hook.sync_to_local_dir( bucket_name=self.bucket_name, s3_prefix=self.prefix, local_dir=self.s3_dags_dir, diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py index 933a0b185054d..f95998963d211 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py @@ -1684,7 +1684,7 @@ def delete_bucket_tagging(self, bucket_name: str | None = None) -> None: s3_client = self.get_conn() s3_client.delete_bucket_tagging(Bucket=bucket_name) - def _download_s3_delete_stale_local_files(self, current_s3_objects: list[Path], local_dir: Path): + def _sync_to_local_dir_delete_stale_local_files(self, current_s3_objects: list[Path], local_dir: Path): current_s3_keys = {key for key in current_s3_objects} for item in local_dir.iterdir(): @@ -1707,7 +1707,7 @@ def _download_s3_delete_stale_local_files(self, current_s3_objects: list[Path], self.log.error("Error deleting stale item %s: %s", item, e) raise e - def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path: Path): + def _sync_to_local_dir_if_changed(self, s3_bucket, s3_object, local_target_path: Path): should_download = False download_msg = "" if not local_target_path.exists(): @@ -1739,7 +1739,7 @@ def _download_s3_object_if_changed(self, s3_bucket, s3_object, local_target_path s3_object.key, ) - def download_s3(self, bucket_name: str, local_dir: 
Path, s3_prefix="", delete_stale: bool = True): + def sync_to_local_dir(self, bucket_name: str, local_dir: Path, s3_prefix="", delete_stale: bool = True): """Download S3 files from the S3 bucket to the local directory.""" self.log.debug("Downloading data from s3://%s/%s to %s", bucket_name, s3_prefix, local_dir) From 340d251acd3fc66c8630af01b767b466f3b34442 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 15 Jun 2025 00:08:12 +0200 Subject: [PATCH 13/18] review change, add test to hook --- .../airflow/providers/amazon/aws/hooks/s3.py | 4 +- .../tests/unit/amazon/aws/bundles/test_s3.py | 2 - .../tests/unit/amazon/aws/hooks/test_s3.py | 62 +++++++++++++++++++ 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py index f95998963d211..dd28e410b1730 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py @@ -1751,12 +1751,12 @@ def sync_to_local_dir(self, bucket_name: str, local_dir: Path, s3_prefix="", del if not local_target_path.parent.exists(): local_target_path.parent.mkdir(parents=True, exist_ok=True) self.log.debug("Created local directory: %s", local_target_path.parent) - self._download_s3_object_if_changed( + self._sync_to_local_dir_if_changed( s3_bucket=s3_bucket, s3_object=obj, local_target_path=local_target_path ) local_s3_objects.append(local_target_path) if delete_stale: - self._download_s3_delete_stale_local_files( + self._sync_to_local_dir_delete_stale_local_files( current_s3_objects=local_s3_objects, local_dir=local_dir ) diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py index 5245afaeca1e3..f89aef8bd6d68 100644 --- a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -283,8 +283,6 @@ def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlo ) # we are using s3 bucket rood but the dag file is in sub folder, project1/dags/dag_03.py assert bundle.s3_dags_dir.joinpath("project1/dags/dag_03.py").read_text() == "test data" - bundle.s3_dags_dir.joinpath("dag_should_be_deleted.py").write_text("test dag") - bundle.s3_dags_dir.joinpath("dag_should_be_deleted_folder").mkdir(exist_ok=True) def assert_log_matches_regex(self, caplog, level, regex): """Helper function to assert if a log message matches a regex.""" diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py index 31b3a625a3971..0e5e47c5149f5 100644 --- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py @@ -19,9 +19,11 @@ import gzip as gz import inspect +import logging import os import re from datetime import datetime as std_datetime, timezone +from pathlib import Path from unittest import mock, mock as async_mock from unittest.mock import AsyncMock, MagicMock, Mock, patch from urllib.parse import parse_qs @@ -50,6 +52,12 @@ def mocked_s3_res(): yield boto3.resource("s3") +@pytest.fixture +def s3_client(): + with mock_aws(): + yield boto3.client("s3") + + @pytest.fixture def s3_bucket(mocked_s3_res): bucket = "airflow-test-s3-bucket" @@ -1726,6 +1734,60 @@ def test_delete_bucket_tagging_with_no_tags(self): with pytest.raises(ClientError, 
match=r".*NoSuchTagSet.*"): hook.get_bucket_tagging(bucket_name="new_bucket") + def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, caplog, cap_structlog, tmp_path): + caplog.set_level(logging.ERROR) + caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") + caplog.set_level( + logging.DEBUG, logger="airflow.task.hooks.airflow.providers.amazon.aws.hooks.s3.S3Hook" + ) + + s3_client.put_object(Bucket=s3_bucket, Key="dag_01.py", Body=b"test data") + s3_client.put_object(Bucket=s3_bucket, Key="dag_02.py", Body=b"test data") + s3_client.put_object(Bucket=s3_bucket, Key="subproject1/dag_a.py", Body=b"test data") + s3_client.put_object(Bucket=s3_bucket, Key="subproject1/dag_b.py", Body=b"test data") + + sync_local_dir = tmp_path / "s3_sync_dir" + hook = S3Hook() + hook.sync_to_local_dir( + bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True + ) + assert caplog.text.count(f"Downloading data from s3://{s3_bucket}") == 1 + # + assert caplog.text.count(f"does not exist. Downloaded dag_01.py to {sync_local_dir}/dag_01.py") == 1 + assert caplog.text.count("does not exist. Downloaded dag_01.py to") == 1 + assert caplog.text.count(f"does not exist. Downloaded subproject1/dag_a.py to {sync_local_dir}") == 1 + # add new file to bucket and sync + s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test data") + hook.sync_to_local_dir( + bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True + ) + print(caplog) + assert ( + caplog.text.count( + "subproject1/dag_b.py is up-to-date with S3 object subproject1/dag_b.py. Skipping download" + ) + == 1 + ) + assert caplog.text.count(f"does not exist. Downloaded dag_03.py to {sync_local_dir}/dag_03.py") == 1 + # read that file is downloaded and has same content + assert Path(sync_local_dir).joinpath("dag_03.py").read_text() == "test data" + + local_file_that_should_be_deleted = Path(sync_local_dir).joinpath("file_that_should_be_deleted.py") + local_file_that_should_be_deleted.write_text("test dag") + hook.sync_to_local_dir( + bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True + ) + assert ( + caplog.text.count(f"Deleted stale local file: {local_file_that_should_be_deleted.as_posix()}") + == 1 + ) + s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test data-changed") + hook.sync_to_local_dir( + bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True + ) + assert caplog.text.count("S3 object size") == 1 + assert caplog.text.count("differ.
Downloaded dag_03.py to") == 1 + @pytest.mark.parametrize( "key_kind, has_conn, has_bucket, precedence, expected", From 0ab61decc0c0681f5218e22d0da3edb2c13cd5f0 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 15 Jun 2025 00:33:38 +0200 Subject: [PATCH 14/18] review change, add test to hook --- .../tests/unit/amazon/aws/bundles/test_s3.py | 62 ------------------- .../tests/unit/amazon/aws/hooks/test_s3.py | 4 ++ 2 files changed, 4 insertions(+), 62 deletions(-) diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py index f89aef8bd6d68..e3b73c2b828b1 100644 --- a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -191,62 +191,19 @@ def _upload_fixtures(self, bucket: str, fixtures_dir: str) -> None: def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): caplog.set_level(logging.ERROR) caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") - caplog.set_level( - logging.DEBUG, logger="airflow.task.hooks.airflow.providers.amazon.aws.hooks.s3.S3Hook" - ) bundle = S3DagBundle( name="test", aws_conn_id=AWS_CONN_ID_WITH_REGION, prefix=S3_BUCKET_PREFIX, bucket_name=S3_BUCKET_NAME, ) - bundle.initialize() # dags are downloaded once by initialize and once with refresh called post initialize assert cap_structlog.text.count("Downloading DAGs from s3") == 1 - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*{bundle.s3_dags_dir.as_posix()}.*subproject1/dag_a.py.*", - ) - - s3_client.put_object(Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data") bundle.refresh() assert cap_structlog.text.count("Downloading DAGs from s3") == 2 - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - regex=rf"Local file.*/{bundle.name}/subproject1/dag_a.py.*is up-to-date with S3 object.*{S3_BUCKET_PREFIX}.*subproject1/dag_a.py.*", - ) - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - regex=rf"Downloaded.*{S3_BUCKET_PREFIX}.*dag_03.py.*/{bundle.name}/dag_03.py", - ) - assert bundle.s3_dags_dir.joinpath("dag_03.py").read_text() == "test data" - bundle.s3_dags_dir.joinpath("dag_should_be_deleted.py").write_text("test dag") - bundle.s3_dags_dir.joinpath("dag_should_be_deleted_folder").mkdir(exist_ok=True) - s3_client.put_object( - Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data-changed" - ) bundle.refresh() assert cap_structlog.text.count("Downloading DAGs from s3") == 3 - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - regex=r"S3 object size.*and local file size.*differ.*Downloaded.*dag_03.py.*", - ) - assert bundle.s3_dags_dir.joinpath("dag_03.py").read_text() == "test data-changed" - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - regex=r"Deleted stale empty directory.*dag_should_be_deleted_folder.*", - ) - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - regex=r"Deleted stale local file.*dag_should_be_deleted.py.*", - ) @pytest.mark.db_test def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlog): @@ -262,27 +219,8 @@ def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlo ) assert bundle.prefix == "" bundle.initialize() - - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - 
regex=rf"Downloaded.*subproject1/dag_a.py.*{bundle.s3_dags_dir.as_posix()}.*subproject1/dag_a.py.*", - ) - s3_client.put_object(Bucket=s3_bucket.name, Key=S3_BUCKET_PREFIX + "/dag_03.py", Body=b"test data") bundle.refresh() assert cap_structlog.text.count("Downloading DAGs from s3") == 2 - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - regex=rf"Local file.*/{bundle.name}.*/dag_a.py.*is up-to-date with S3 object.*dag_a.py.*", - ) - self.assert_log_matches_regex( - caplog=caplog, - level="DEBUG", - regex=rf"Downloaded.*dag_03.py.*/{bundle.name}.*/dag_03.py", - ) - # we are using s3 bucket rood but the dag file is in sub folder, project1/dags/dag_03.py - assert bundle.s3_dags_dir.joinpath("project1/dags/dag_03.py").read_text() == "test data" def assert_log_matches_regex(self, caplog, level, regex): """Helper function to assert if a log message matches a regex.""" diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py index 0e5e47c5149f5..5a97f5ed13d6b 100644 --- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py @@ -1774,6 +1774,8 @@ def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, caplog, cap_str local_file_that_should_be_deleted = Path(sync_local_dir).joinpath("file_that_should_be_deleted.py") local_file_that_should_be_deleted.write_text("test dag") + local_folder_should_be_deleted = Path(sync_local_dir).joinpath("local_folder_should_be_deleted") + local_folder_should_be_deleted.mkdir(exist_ok=True) hook.sync_to_local_dir( bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True ) @@ -1781,6 +1783,8 @@ def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, caplog, cap_str caplog.text.count(f"Deleted stale local file: {local_file_that_should_be_deleted.as_posix()}") == 1 ) + assert caplog.text.count(f"Deleted stale empty directory: {local_folder_should_be_deleted.as_posix()}") == 1 + s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test data-changed") hook.sync_to_local_dir( bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True From 23532bf561842868bc3e67ed9a49ee281bb00aa6 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sun, 15 Jun 2025 00:38:52 +0200 Subject: [PATCH 15/18] review change, add test to hook --- providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py index 5a97f5ed13d6b..c948ef84a6d24 100644 --- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py @@ -1783,7 +1783,10 @@ def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, caplog, cap_str caplog.text.count(f"Deleted stale local file: {local_file_that_should_be_deleted.as_posix()}") == 1 ) - assert caplog.text.count(f"Deleted stale empty directory: {local_folder_should_be_deleted.as_posix()}") == 1 + assert ( + caplog.text.count(f"Deleted stale empty directory: {local_folder_should_be_deleted.as_posix()}") + == 1 + ) s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test data-changed") hook.sync_to_local_dir( From 02505bba30dfa77dde97f2035aa920460e651958 Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Thu, 3 Jul 2025 17:22:11 -0700 Subject: [PATCH 16/18] 
Fixes/Updates to get PR passing --- providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index b7947527d8af0..3752705c7aac2 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -64,7 +64,7 @@ def __init__( prefix=self.prefix, aws_conn_id=self.aws_conn_id, ) - self._s3_hook: S3Hook = None + self._s3_hook: S3Hook | None = None From 9fd52989ca91590e5d658a9f81acc36fc48552bb Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Thu, 3 Jul 2025 17:23:30 -0700 Subject: [PATCH 17/18] Fixes/Updates to get PR passing --- .../tests/unit/amazon/aws/bundles/test_s3.py | 54 +++++++------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py index e3b73c2b828b1..e801a6a9259b1 100644 --- a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -16,9 +16,8 @@ # under the License. from __future__ import annotations -import logging import os -import re +from unittest.mock import MagicMock, call import boto3 import pytest @@ -188,54 +187,41 @@ def _upload_fixtures(self, bucket: str, fixtures_dir: str) -> None: client.upload_file(Filename=path, Bucket=bucket, Key=key) @pytest.mark.db_test - def test_refresh(self, s3_bucket, s3_client, caplog, cap_structlog): - caplog.set_level(logging.ERROR) - caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") + def test_refresh(self, s3_bucket, s3_client): bundle = S3DagBundle( name="test", aws_conn_id=AWS_CONN_ID_WITH_REGION, prefix=S3_BUCKET_PREFIX, bucket_name=S3_BUCKET_NAME, ) + bundle._log.debug = MagicMock() + # Create a unittest.mock call object to compare against the call_args_list of the _log.debug mock + download_log_call = call( + "Downloading DAGs from s3://%s/%s to %s", S3_BUCKET_NAME, S3_BUCKET_PREFIX, bundle.s3_dags_dir + ) bundle.initialize() - # dags are downloaded once by initialize and once with refresh called post initialize - assert cap_structlog.text.count("Downloading DAGs from s3") == 1 + assert bundle._log.debug.call_count == 1 + assert bundle._log.debug.call_args_list == [download_log_call] bundle.refresh() - assert cap_structlog.text.count("Downloading DAGs from s3") == 2 + assert bundle._log.debug.call_count == 2 + assert bundle._log.debug.call_args_list == [download_log_call, download_log_call] bundle.refresh() - assert cap_structlog.text.count("Downloading DAGs from s3") == 3 + assert bundle._log.debug.call_count == 3 + assert bundle._log.debug.call_args_list == [download_log_call, download_log_call, download_log_call] @pytest.mark.db_test - def test_refresh_without_prefix(self, s3_bucket, s3_client, caplog, cap_structlog): - caplog.set_level(logging.ERROR) - caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") - caplog.set_level( - logging.DEBUG, logger="airflow.task.hooks.airflow.providers.amazon.aws.hooks.s3.S3Hook" - ) + def test_refresh_without_prefix(self, s3_bucket, s3_client): bundle = S3DagBundle( name="test", aws_conn_id=AWS_CONN_ID_WITH_REGION, bucket_name=S3_BUCKET_NAME, ) + bundle._log.debug = MagicMock() +
download_log_call = call( + "Downloading DAGs from s3://%s/%s to %s", S3_BUCKET_NAME, "", bundle.s3_dags_dir + ) assert bundle.prefix == "" bundle.initialize() bundle.refresh() - assert cap_structlog.text.count("Downloading DAGs from s3") == 2 - - def assert_log_matches_regex(self, caplog, level, regex): - """Helper function to assert if a log message matches a regex.""" - matched = False - if hasattr(caplog, "entries"): - for record in caplog.entries: - if record.get("log_level", None) == level.lower() and re.search( - regex, record.get("event", "") - ): - matched = True - break # Stop searching once a match is found - else: - for record in caplog.records: - if record.levelname == level and re.search(regex, record.message): - matched = True - break # Stop searching once a match is found - - assert matched, f"No log message at level {level} matching regex '{regex}' found." + assert bundle._log.debug.call_count == 2 + assert bundle._log.debug.call_args_list == [download_log_call, download_log_call] From 17eb628591b76e4b5884dee98c6e64fb3673d49b Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Thu, 3 Jul 2025 17:24:46 -0700 Subject: [PATCH 18/18] Fixes/Updates to get PR passing --- .../tests/unit/amazon/aws/hooks/test_s3.py | 66 ++++++++++--------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py index c948ef84a6d24..2d7552df75f49 100644 --- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py @@ -19,7 +19,6 @@ import gzip as gz import inspect -import logging import os import re from datetime import datetime as std_datetime, timezone @@ -45,6 +44,16 @@ ) from airflow.utils.timezone import datetime +try: + import importlib.util + + if not importlib.util.find_spec("airflow.sdk.bases.hook"): + raise ImportError + + BASEHOOK_PATCH_PATH = "airflow.sdk.bases.hook.BaseHook" +except ImportError: + BASEHOOK_PATCH_PATH = "airflow.hooks.base.BaseHook" + @pytest.fixture def mocked_s3_res(): @@ -1734,12 +1743,9 @@ def test_delete_bucket_tagging_with_no_tags(self): with pytest.raises(ClientError, match=r".*NoSuchTagSet.*"): hook.get_bucket_tagging(bucket_name="new_bucket") - def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, caplog, cap_structlog, tmp_path): - caplog.set_level(logging.ERROR) - caplog.set_level(logging.DEBUG, logger="airflow.providers.amazon.aws.bundles.s3.S3DagBundle") - caplog.set_level( - logging.DEBUG, logger="airflow.task.hooks.airflow.providers.amazon.aws.hooks.s3.S3Hook" - ) + def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, tmp_path): + def get_logs_string(call_args_list): + return "".join([args[0][0] % args[0][1:] for args in call_args_list]) s3_client.put_object(Bucket=s3_bucket, Key="dag_01.py", Body=b"test data") s3_client.put_object(Bucket=s3_bucket, Key="dag_02.py", Body=b"test data") @@ -1748,27 +1754,28 @@ def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, caplog, cap_str sync_local_dir = tmp_path / "s3_sync_dir" hook = S3Hook() + hook.log.debug = MagicMock() hook.sync_to_local_dir( bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True ) - assert caplog.text.count(f"Downloading data from s3://{s3_bucket}") == 1 - # - assert caplog.text.count(f"does not exist. Downloaded dag_01.py to {sync_local_dir}/dag_01.py") == 1 - assert caplog.text.count("does not exist. 
Downloaded dag_01.py to") == 1 - assert caplog.text.count(f"does not exist. Downloaded subproject1/dag_a.py to {sync_local_dir}") == 1 + logs_string = get_logs_string(hook.log.debug.call_args_list) + assert f"Downloading data from s3://{s3_bucket}" in logs_string + assert f"does not exist. Downloaded dag_01.py to {sync_local_dir}/dag_01.py" in logs_string + assert "does not exist. Downloaded dag_01.py to" in logs_string + assert f"does not exist. Downloaded subproject1/dag_a.py to {sync_local_dir}" in logs_string + # add new file to bucket and sync + hook.log.debug = MagicMock() s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test data") hook.sync_to_local_dir( bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True ) - print(caplog) + logs_string = get_logs_string(hook.log.debug.call_args_list) assert ( - caplog.text.count( - "subproject1/dag_b.py is up-to-date with S3 object subproject1/dag_b.py. Skipping download" - ) - == 1 + "subproject1/dag_b.py is up-to-date with S3 object subproject1/dag_b.py. Skipping download" + in logs_string ) - assert caplog.text.count(f"does not exist. Downloaded dag_03.py to {sync_local_dir}/dag_03.py") == 1 + assert f"does not exist. Downloaded dag_03.py to {sync_local_dir}/dag_03.py" in logs_string # read that file is donloaded and has same content assert Path(sync_local_dir).joinpath("dag_03.py").read_text() == "test data" @@ -1776,24 +1783,23 @@ def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, caplog, cap_str local_file_that_should_be_deleted.write_text("test dag") local_folder_should_be_deleted = Path(sync_local_dir).joinpath("local_folder_should_be_deleted") local_folder_should_be_deleted.mkdir(exist_ok=True) + hook.log.debug = MagicMock() hook.sync_to_local_dir( bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True ) - assert ( - caplog.text.count(f"Deleted stale local file: {local_file_that_should_be_deleted.as_posix()}") - == 1 - ) - assert ( - caplog.text.count(f"Deleted stale empty directory: {local_folder_should_be_deleted.as_posix()}") - == 1 - ) + logs_string = get_logs_string(hook.log.debug.call_args_list) + assert f"Deleted stale local file: {local_file_that_should_be_deleted.as_posix()}" in logs_string + + assert f"Deleted stale empty directory: {local_folder_should_be_deleted.as_posix()}" in logs_string s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test data-changed") + hook.log.debug = MagicMock() hook.sync_to_local_dir( bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True ) - assert caplog.text.count("S3 object size") == 1 - assert caplog.text.count("differ. Downloaded dag_03.py to") == 1 + logs_string = get_logs_string(hook.log.debug.call_args_list) + assert "S3 object size" in logs_string + assert "differ. 
Downloaded dag_03.py to" in logs_string @pytest.mark.parametrize( @@ -1817,7 +1823,7 @@ def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, caplog, cap_str ("rel_key", "with_conn", "with_bucket", "provide", ["kwargs_bucket", "key.txt"]), ], ) -@patch("airflow.hooks.base.BaseHook.get_connection") +@patch(f"{BASEHOOK_PATCH_PATH}.get_connection") def test_unify_and_provide_bucket_name_combination( mock_base, key_kind, has_conn, has_bucket, precedence, expected, caplog ): @@ -1880,7 +1886,7 @@ def do_something(self, bucket_name=None, key=None): ("rel_key", "with_conn", "with_bucket", ["kwargs_bucket", "key.txt"]), ], ) -@patch("airflow.hooks.base.BaseHook.get_connection") +@patch(f"{BASEHOOK_PATCH_PATH}.get_connection") def test_s3_head_object_decorated_behavior(mock_conn, has_conn, has_bucket, key_kind, expected): if has_conn == "with_conn": c = Connection(extra={"service_config": {"s3": {"bucket_name": "conn_bucket"}}})