From c89fbfbe8f012176623fd375c3d99dedc105d63b Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 30 Jan 2024 19:57:31 +0100 Subject: [PATCH 01/25] feat: Pass proxies config when using ClientSecretCredential in AzureDataLakeStorageV2Hook and added DataToADLSOperator which allows uploading data (e.g. from an XCOM) to a remote file without the need to have a local file created first --- .../microsoft/azure/hooks/data_lake.py | 31 ++++++++-- .../azure/transfers/local_to_adls.py | 56 ++++++++++++++++++- .../transfer/local_to_adls.rst | 17 ++++++ .../microsoft/azure/hooks/test_data_lake.py | 34 +++++++++++ .../azure/transfers/test_local_to_adls.py | 21 ++++++- .../microsoft/azure/example_local_to_adls.py | 15 ++++- 6 files changed, 163 insertions(+), 11 deletions(-) diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py b/airflow/providers/microsoft/azure/hooks/data_lake.py index 3777e0e5823a4..ea1a634709755 100644 --- a/airflow/providers/microsoft/azure/hooks/data_lake.py +++ b/airflow/providers/microsoft/azure/hooks/data_lake.py @@ -332,7 +332,7 @@ def service_client(self) -> DataLakeServiceClient: """Return the DataLakeServiceClient object (cached).""" return self.get_conn() - def get_conn(self) -> DataLakeServiceClient: # type: ignore[override] + def get_conn(self) -> Union[DataLakeFileClient, DataLakeServiceClient]: # type: ignore[override] """Return the DataLakeServiceClient object.""" conn = self.get_connection(self.conn_id) extra = conn.extra_dejson or {} @@ -340,7 +340,9 @@ def get_conn(self) -> DataLakeServiceClient: # type: ignore[override] connection_string = self._get_field(extra, "connection_string") if connection_string: # connection_string auth takes priority - return DataLakeServiceClient.from_connection_string(connection_string, **extra) + return DataLakeServiceClient.from_connection_string( + connection_string, **extra + ) credential: Credentials tenant = self._get_field(extra, "tenant_id") @@ -348,19 +350,36 @@ def get_conn(self) -> DataLakeServiceClient: # type: ignore[override] # use Active Directory auth app_id = conn.login app_secret = conn.password - credential = ClientSecretCredential(tenant, app_id, app_secret) + proxies = extra.get("proxies", {}) + + credential = ClientSecretCredential( + tenant_id=tenant, + client_id=app_id, + client_secret=app_secret, + proxies=proxies + ) elif conn.password: credential = conn.password else: - managed_identity_client_id = self._get_field(extra, "managed_identity_client_id") - workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id") + managed_identity_client_id = self._get_field( + extra, "managed_identity_client_id" + ) + workload_identity_tenant_id = self._get_field( + extra, "workload_identity_tenant_id" + ) credential = AzureIdentityCredentialAdapter( managed_identity_client_id=managed_identity_client_id, workload_identity_tenant_id=workload_identity_tenant_id, ) + account_url = extra.pop( + "account_url", f"https://{conn.host}.dfs.core.windows.net" + ) + + self.log.info("account_url: %s", account_url) + return DataLakeServiceClient( - account_url=f"https://{conn.host}.dfs.core.windows.net", + account_url=account_url, credential=credential, # type: ignore[arg-type] **extra, ) diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 032742fc6497d..4835533d01497 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ 
-16,11 +16,11 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any, Sequence +from typing import TYPE_CHECKING, Any, Sequence, Union, Iterable, AnyStr, IO, Optional from airflow.exceptions import AirflowException from airflow.models import BaseOperator -from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook +from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook, AzureDataLakeStorageV2Hook if TYPE_CHECKING: from airflow.utils.context import Context @@ -96,3 +96,55 @@ def execute(self, context: Context) -> None: blocksize=self.blocksize, **self.extra_upload_options, ) + + +class DataToADLSOperator(BaseOperator): + """ + Upload data to Azure Data Lake. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataToADLSOperator` + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param file_name: Name of the file which needs to be created in the file system. + :param data: Number of threads to use. If None, uses the number of cores. + :param length: Number of threads to use. If None, uses the number of cores. + :param overwrite: Whether to forcibly overwrite existing files/directories. + If False and remote path is a directory, will quit regardless if any files + would be overwritten or not. If True, only matching filenames are actually + overwritten + :param azure_data_lake_conn_id: Reference to the Azure Data Lake connection + """ + + template_fields: Sequence[str] = ("file_system_name", "file_name", "data") + ui_color = "#e4f0e8" + + def __init__( + self, + *, + file_system_name: str, + file_name: str, + data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]], + length: Optional[int] = None, + overwrite: bool = False, + azure_data_lake_conn_id: str = "azure_data_lake_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.file_system_name = file_system_name + self.file_name = file_name + self.overwrite = overwrite + self.data = data + self.length = length + self.azure_data_lake_conn_id = azure_data_lake_conn_id + + def execute(self, context: Context) -> None: + self.log.info("Uploading %s to %s", self.data, self.file_name) + hook = AzureDataLakeStorageV2Hook( + adls_conn_id=self.azure_data_lake_conn_id + ) + return hook.create_file( + file_system_name=self.file_system_name, file_name=self.file_name + ).upload_data(data=self.data, length=self.length, overwrite=self.overwrite) diff --git a/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst b/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst index 76a06133d21db..892d5aaf8654c 100644 --- a/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst @@ -44,6 +44,23 @@ Below is an example of using this operator to upload a file to ADL. :start-after: [START howto_operator_local_to_adls] :end-before: [END howto_operator_local_to_adls] +.. _howto/operator:LocalFilesystemToADLSOperator: + +DataToADLSOperator +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:class:`~airflow.providers.microsoft.azure.transfers.local_to_adls.DataToADLSOperator` allows you to +upload data as a file to ADL. + + +Below is an example of using this operator to upload data as a file to ADL. + +.. 
exampleinclude:: /../../tests/system/providers/microsoft/azure/example_local_to_adls.py + :language: python + :dedent: 0 + :start-after: [START howto_operator_data_to_adls] + :end-before: [END howto_operator_data_to_adls] + Reference --------- diff --git a/tests/providers/microsoft/azure/hooks/test_data_lake.py b/tests/providers/microsoft/azure/hooks/test_data_lake.py index 84a0a9420b0d9..e8cb977fc8c54 100644 --- a/tests/providers/microsoft/azure/hooks/test_data_lake.py +++ b/tests/providers/microsoft/azure/hooks/test_data_lake.py @@ -21,6 +21,8 @@ from unittest.mock import PropertyMock import pytest +from azure.core.pipeline.policies._universal import ProxyPolicy +from azure.storage.filedatalake import DataLakeServiceClient from azure.storage.filedatalake._models import FileSystemProperties from airflow.models import Connection @@ -297,3 +299,35 @@ def test_connection_failure(self, mock_conn): assert status is False assert msg == "Authentication failed." + + @mock.patch(f"{MODULE}.AzureDataLakeStorageV2Hook.get_connection") + def test_proxies_passed_to_credentials(self, mock_conn): + hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id) + mock_conn.return_value = Connection( + conn_id=self.conn_id, + login="client_id", + password="secret", + extra={ + "tenant_id": "tenant-id", + "proxies": {"https": "https://proxy:80"}, + "account_url": "https://onelake.dfs.fabric.microsoft.com" + }, + ) + conn: DataLakeServiceClient = hook.get_conn() + + assert conn is not None + assert conn.primary_endpoint == "https://onelake.dfs.fabric.microsoft.com/" + assert conn.primary_hostname == "onelake.dfs.fabric.microsoft.com" + assert conn.scheme == "https" + assert conn.url == "https://onelake.dfs.fabric.microsoft.com/" + assert conn.credential._client_id == "client_id" + assert conn.credential._client_credential == "secret" + assert self.find_policy(conn, ProxyPolicy) is not None + assert self.find_policy(conn, ProxyPolicy).proxies["https"] == "https://proxy:80" + + def find_policy(self, conn, policy_type): + policies = conn.credential._client._pipeline._impl_policies + return next( + map(lambda policy: policy._policy, + filter(lambda policy: isinstance(policy._policy, policy_type), policies)) + ) diff --git a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py index 1ef388eef7597..fdde338fdf74a 100644 --- a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py @@ -22,12 +22,15 @@ import pytest from airflow.exceptions import AirflowException -from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator +from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator, \ + DataToADLSOperator TASK_ID = "test-adls-upload-operator" +FILE_SYSTEM_NAME = "Fabric" LOCAL_PATH = "test/*" BAD_LOCAL_PATH = "test/**" REMOTE_PATH = "TEST-DIR" +DATA = {"name": "David", "surname": "Blain", "gender": "M"} class TestADLSUploadOperator: @@ -73,3 +76,19 @@ def test_extra_options_is_passed(self, mock_hook): blocksize=4194304, run=False, # extra upload options ) + + @mock.patch("airflow.providers.microsoft.azure.transfers.local_to_adls.AzureDataLakeStorageV2Hook") + def test_execute_success_when_local_data(self, mock_hook): + operator = DataToADLSOperator( + task_id=TASK_ID, + file_system_name=FILE_SYSTEM_NAME, + file_name=REMOTE_PATH, + data=DATA, + overwrite=True, + ) + 
operator.execute(None) + data_lake_file_client_mock = mock_hook.return_value.create_file + data_lake_file_client_mock.assert_called_once_with( + file_system_name=FILE_SYSTEM_NAME, file_name=REMOTE_PATH + ) + data_lake_file_client_mock.upload_data(data=DATA, length=None, overwrite=True) diff --git a/tests/system/providers/microsoft/azure/example_local_to_adls.py b/tests/system/providers/microsoft/azure/example_local_to_adls.py index f5a75e7ce414b..18460bf3958a4 100644 --- a/tests/system/providers/microsoft/azure/example_local_to_adls.py +++ b/tests/system/providers/microsoft/azure/example_local_to_adls.py @@ -21,7 +21,8 @@ from airflow import models from airflow.providers.microsoft.azure.operators.adls import ADLSDeleteOperator -from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator +from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator, \ + DataToADLSOperator LOCAL_FILE_PATH = os.environ.get("LOCAL_FILE_PATH", "localfile.txt") REMOTE_FILE_PATH = os.environ.get("REMOTE_LOCAL_PATH", "remote.txt") @@ -43,9 +44,19 @@ ) # [END howto_operator_local_to_adls] + # [START howto_operator_data_to_adls] + upload_data = DataToADLSOperator( + task_id="upload_data", + file_system_name="Fabric", + file_name=REMOTE_FILE_PATH, + data=upload_file.output, + overwrite=True, + ) + # [END howto_operator_data_to_adls] + delete_file = ADLSDeleteOperator(task_id="remove_task", path=REMOTE_FILE_PATH, recursive=True) - upload_file >> delete_file + upload_file >> upload_data >> delete_file from tests.system.utils.watcher import watcher From 754a5f1ad208cbd12205e586ee6ecf95436526c2 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 08:11:54 +0100 Subject: [PATCH 02/25] refactor: Try fixing typing issues --- airflow/providers/microsoft/azure/hooks/data_lake.py | 2 +- airflow/providers/microsoft/azure/transfers/local_to_adls.py | 2 +- tests/providers/microsoft/azure/hooks/test_data_lake.py | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py b/airflow/providers/microsoft/azure/hooks/data_lake.py index ea1a634709755..7aa3c04686196 100644 --- a/airflow/providers/microsoft/azure/hooks/data_lake.py +++ b/airflow/providers/microsoft/azure/hooks/data_lake.py @@ -332,7 +332,7 @@ def service_client(self) -> DataLakeServiceClient: """Return the DataLakeServiceClient object (cached).""" return self.get_conn() - def get_conn(self) -> Union[DataLakeFileClient, DataLakeServiceClient]: # type: ignore[override] + def get_conn(self) -> DataLakeServiceClient: # type: ignore[override] """Return the DataLakeServiceClient object.""" conn = self.get_connection(self.conn_id) extra = conn.extra_dejson or {} diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 4835533d01497..9797338615de3 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -125,7 +125,7 @@ def __init__( *, file_system_name: str, file_name: str, - data: Union[bytes, str, Iterable[AnyStr], IO[AnyStr]], + data: bytes | str | Iterable[AnyStr] | IO[AnyStr], length: Optional[int] = None, overwrite: bool = False, azure_data_lake_conn_id: str = "azure_data_lake_default", diff --git a/tests/providers/microsoft/azure/hooks/test_data_lake.py b/tests/providers/microsoft/azure/hooks/test_data_lake.py index e8cb977fc8c54..f9beac25e00a3 
100644 --- a/tests/providers/microsoft/azure/hooks/test_data_lake.py +++ b/tests/providers/microsoft/azure/hooks/test_data_lake.py @@ -17,17 +17,20 @@ # under the License. from __future__ import annotations +from typing import TYPE_CHECKING from unittest import mock from unittest.mock import PropertyMock import pytest from azure.core.pipeline.policies._universal import ProxyPolicy -from azure.storage.filedatalake import DataLakeServiceClient from azure.storage.filedatalake._models import FileSystemProperties from airflow.models import Connection from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook +if TYPE_CHECKING: + from azure.storage.filedatalake import DataLakeServiceClient + MODULE = "airflow.providers.microsoft.azure.hooks.data_lake" From 0469dd9f7912ed1faf268b73a28e78c1cc2ebc52 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 08:20:23 +0100 Subject: [PATCH 03/25] docs: Updated provider docs --- .../transfer/local_to_adls.rst | 4 +- .../microsoft/azure/example_data_to_adls.py | 59 +++++++++++++++++++ .../microsoft/azure/example_local_to_adls.py | 15 +---- 3 files changed, 63 insertions(+), 15 deletions(-) create mode 100644 tests/system/providers/microsoft/azure/example_data_to_adls.py diff --git a/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst b/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst index 892d5aaf8654c..62b1e35ca39f7 100644 --- a/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst @@ -44,7 +44,7 @@ Below is an example of using this operator to upload a file to ADL. :start-after: [START howto_operator_local_to_adls] :end-before: [END howto_operator_local_to_adls] -.. _howto/operator:LocalFilesystemToADLSOperator: +.. _howto/operator:DataToADLSOperator: DataToADLSOperator ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -55,7 +55,7 @@ upload data as a file to ADL. Below is an example of using this operator to upload data as a file to ADL. -.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_local_to_adls.py +.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_data_to_adls.py :language: python :dedent: 0 :start-after: [START howto_operator_data_to_adls] diff --git a/tests/system/providers/microsoft/azure/example_data_to_adls.py b/tests/system/providers/microsoft/azure/example_data_to_adls.py new file mode 100644 index 0000000000000..e497434a880b3 --- /dev/null +++ b/tests/system/providers/microsoft/azure/example_data_to_adls.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import os +from datetime import datetime + +from airflow import models +from airflow.providers.microsoft.azure.operators.adls import ADLSDeleteOperator +from airflow.providers.microsoft.azure.transfers.local_to_adls import DataToADLSOperator + +REMOTE_FILE_PATH = os.environ.get("REMOTE_LOCAL_PATH", "remote.txt") +DAG_ID = "example_local_to_adls" + +with models.DAG( + DAG_ID, + start_date=datetime(2021, 1, 1), + catchup=False, + schedule=None, + tags=["example"], +) as dag: + # [START howto_operator_data_to_adls] + upload_data = DataToADLSOperator( + task_id="upload_data", + file_system_name="Fabric", + file_name=REMOTE_FILE_PATH, + data="Hello world", + overwrite=True, + ) + # [END howto_operator_data_to_adls] + + delete_file = ADLSDeleteOperator(task_id="remove_task", path=REMOTE_FILE_PATH, recursive=True) + + upload_data >> delete_file + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/microsoft/azure/example_local_to_adls.py b/tests/system/providers/microsoft/azure/example_local_to_adls.py index 18460bf3958a4..f5a75e7ce414b 100644 --- a/tests/system/providers/microsoft/azure/example_local_to_adls.py +++ b/tests/system/providers/microsoft/azure/example_local_to_adls.py @@ -21,8 +21,7 @@ from airflow import models from airflow.providers.microsoft.azure.operators.adls import ADLSDeleteOperator -from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator, \ - DataToADLSOperator +from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator LOCAL_FILE_PATH = os.environ.get("LOCAL_FILE_PATH", "localfile.txt") REMOTE_FILE_PATH = os.environ.get("REMOTE_LOCAL_PATH", "remote.txt") @@ -44,19 +43,9 @@ ) # [END howto_operator_local_to_adls] - # [START howto_operator_data_to_adls] - upload_data = DataToADLSOperator( - task_id="upload_data", - file_system_name="Fabric", - file_name=REMOTE_FILE_PATH, - data=upload_file.output, - overwrite=True, - ) - # [END howto_operator_data_to_adls] - delete_file = ADLSDeleteOperator(task_id="remove_task", path=REMOTE_FILE_PATH, recursive=True) - upload_file >> upload_data >> delete_file + upload_file >> delete_file from tests.system.utils.watcher import watcher From 78e7fa28f91eb43816cac16b414506c4973d6dba Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 09:00:11 +0100 Subject: [PATCH 04/25] refactor: Reformatted files --- .../microsoft/azure/hooks/data_lake.py | 21 +++++-------------- .../azure/transfers/local_to_adls.py | 6 ++---- .../microsoft/azure/hooks/test_data_lake.py | 8 ++++--- .../azure/transfers/test_local_to_adls.py | 6 ++++-- 4 files changed, 16 insertions(+), 25 deletions(-) diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py b/airflow/providers/microsoft/azure/hooks/data_lake.py index 7aa3c04686196..054eda087e434 100644 --- a/airflow/providers/microsoft/azure/hooks/data_lake.py +++ b/airflow/providers/microsoft/azure/hooks/data_lake.py @@ -340,9 +340,7 @@ def get_conn(self) -> DataLakeServiceClient: # type: ignore[override] connection_string = self._get_field(extra, "connection_string") if connection_string: # connection_string 
auth takes priority - return DataLakeServiceClient.from_connection_string( - connection_string, **extra - ) + return DataLakeServiceClient.from_connection_string(connection_string, **extra) credential: Credentials tenant = self._get_field(extra, "tenant_id") @@ -353,28 +351,19 @@ def get_conn(self) -> DataLakeServiceClient: # type: ignore[override] proxies = extra.get("proxies", {}) credential = ClientSecretCredential( - tenant_id=tenant, - client_id=app_id, - client_secret=app_secret, - proxies=proxies + tenant_id=tenant, client_id=app_id, client_secret=app_secret, proxies=proxies ) elif conn.password: credential = conn.password else: - managed_identity_client_id = self._get_field( - extra, "managed_identity_client_id" - ) - workload_identity_tenant_id = self._get_field( - extra, "workload_identity_tenant_id" - ) + managed_identity_client_id = self._get_field(extra, "managed_identity_client_id") + workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id") credential = AzureIdentityCredentialAdapter( managed_identity_client_id=managed_identity_client_id, workload_identity_tenant_id=workload_identity_tenant_id, ) - account_url = extra.pop( - "account_url", f"https://{conn.host}.dfs.core.windows.net" - ) + account_url = extra.pop("account_url", f"https://{conn.host}.dfs.core.windows.net") self.log.info("account_url: %s", account_url) diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 9797338615de3..88e92723cfe14 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any, Sequence, Union, Iterable, AnyStr, IO, Optional +from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Optional, Sequence from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -142,9 +142,7 @@ def __init__( def execute(self, context: Context) -> None: self.log.info("Uploading %s to %s", self.data, self.file_name) - hook = AzureDataLakeStorageV2Hook( - adls_conn_id=self.azure_data_lake_conn_id - ) + hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id) return hook.create_file( file_system_name=self.file_system_name, file_name=self.file_name ).upload_data(data=self.data, length=self.length, overwrite=self.overwrite) diff --git a/tests/providers/microsoft/azure/hooks/test_data_lake.py b/tests/providers/microsoft/azure/hooks/test_data_lake.py index f9beac25e00a3..ba62a8972e6c6 100644 --- a/tests/providers/microsoft/azure/hooks/test_data_lake.py +++ b/tests/providers/microsoft/azure/hooks/test_data_lake.py @@ -313,7 +313,7 @@ def test_proxies_passed_to_credentials(self, mock_conn): extra={ "tenant_id": "tenant-id", "proxies": {"https": "https://proxy:80"}, - "account_url": "https://onelake.dfs.fabric.microsoft.com" + "account_url": "https://onelake.dfs.fabric.microsoft.com", }, ) conn: DataLakeServiceClient = hook.get_conn() @@ -331,6 +331,8 @@ def test_proxies_passed_to_credentials(self, mock_conn): def find_policy(self, conn, policy_type): policies = conn.credential._client._pipeline._impl_policies return next( - map(lambda policy: policy._policy, - filter(lambda policy: isinstance(policy._policy, policy_type), policies)) + map( + lambda policy: policy._policy, + filter(lambda policy: isinstance(policy._policy, policy_type), policies) + ) ) diff --git 
a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py index fdde338fdf74a..31ec1ff61da8b 100644 --- a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py @@ -22,8 +22,10 @@ import pytest from airflow.exceptions import AirflowException -from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator, \ - DataToADLSOperator +from airflow.providers.microsoft.azure.transfers.local_to_adls import ( + LocalFilesystemToADLSOperator, + DataToADLSOperator, +) TASK_ID = "test-adls-upload-operator" FILE_SYSTEM_NAME = "Fabric" From e6f142cd5767bbe5f8cafe15344846fb68aa234c Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 09:04:36 +0100 Subject: [PATCH 05/25] refactor: Make sure upload_data method is verified and asserted --- .../microsoft/azure/transfers/test_local_to_adls.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py index 31ec1ff61da8b..7f443b6f6446c 100644 --- a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import json from unittest import mock import pytest @@ -32,7 +33,7 @@ LOCAL_PATH = "test/*" BAD_LOCAL_PATH = "test/**" REMOTE_PATH = "TEST-DIR" -DATA = {"name": "David", "surname": "Blain", "gender": "M"} +DATA = json.dumps({"name": "David", "surname": "Blain", "gender": "M"}).encode("utf-8") class TestADLSUploadOperator: @@ -93,4 +94,7 @@ def test_execute_success_when_local_data(self, mock_hook): data_lake_file_client_mock.assert_called_once_with( file_system_name=FILE_SYSTEM_NAME, file_name=REMOTE_PATH ) - data_lake_file_client_mock.upload_data(data=DATA, length=None, overwrite=True) + upload_data_mock = data_lake_file_client_mock.return_value.upload_data + upload_data_mock.assert_called_once_with( + data=DATA, length=None, overwrite=True + ) From f50dfd13c8df028e5edb075078af39a5f687c631 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 09:37:27 +0100 Subject: [PATCH 06/25] refactor: Reformatted some files --- .../providers/microsoft/azure/transfers/local_to_adls.py | 6 +++--- tests/providers/microsoft/azure/hooks/test_data_lake.py | 2 +- .../microsoft/azure/transfers/test_local_to_adls.py | 6 ++---- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 88e92723cfe14..bc28e101edf46 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -143,6 +143,6 @@ def __init__( def execute(self, context: Context) -> None: self.log.info("Uploading %s to %s", self.data, self.file_name) hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id) - return hook.create_file( - file_system_name=self.file_system_name, file_name=self.file_name - ).upload_data(data=self.data, length=self.length, overwrite=self.overwrite) + return hook.create_file(file_system_name=self.file_system_name, file_name=self.file_name).upload_data( + data=self.data, length=self.length, overwrite=self.overwrite + ) diff --git 
a/tests/providers/microsoft/azure/hooks/test_data_lake.py b/tests/providers/microsoft/azure/hooks/test_data_lake.py index ba62a8972e6c6..f1e10c510076c 100644 --- a/tests/providers/microsoft/azure/hooks/test_data_lake.py +++ b/tests/providers/microsoft/azure/hooks/test_data_lake.py @@ -333,6 +333,6 @@ def find_policy(self, conn, policy_type): return next( map( lambda policy: policy._policy, - filter(lambda policy: isinstance(policy._policy, policy_type), policies) + filter(lambda policy: isinstance(policy._policy, policy_type), policies), ) ) diff --git a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py index 7f443b6f6446c..950db94da01b7 100644 --- a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py @@ -24,8 +24,8 @@ from airflow.exceptions import AirflowException from airflow.providers.microsoft.azure.transfers.local_to_adls import ( - LocalFilesystemToADLSOperator, DataToADLSOperator, + LocalFilesystemToADLSOperator, ) TASK_ID = "test-adls-upload-operator" @@ -95,6 +95,4 @@ def test_execute_success_when_local_data(self, mock_hook): file_system_name=FILE_SYSTEM_NAME, file_name=REMOTE_PATH ) upload_data_mock = data_lake_file_client_mock.return_value.upload_data - upload_data_mock.assert_called_once_with( - data=DATA, length=None, overwrite=True - ) + upload_data_mock.assert_called_once_with(data=DATA, length=None, overwrite=True) From 62753a49292451c5027223d10eb298b19aeac3a0 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 09:44:30 +0100 Subject: [PATCH 07/25] refactor: Try fixing mypy issues --- .../providers/microsoft/azure/transfers/local_to_adls.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index bc28e101edf46..b3555ab0f4778 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -16,7 +16,7 @@ # under the License. 
from __future__ import annotations -from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Optional, Sequence +from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Optional, Sequence, Dict from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -136,11 +136,11 @@ def __init__( self.file_system_name = file_system_name self.file_name = file_name self.overwrite = overwrite - self.data = data + self.data = data # type: ignore[arg-type] self.length = length self.azure_data_lake_conn_id = azure_data_lake_conn_id - def execute(self, context: Context) -> None: + def execute(self, context: Context) -> Dict[str, Any]: self.log.info("Uploading %s to %s", self.data, self.file_name) hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id) return hook.create_file(file_system_name=self.file_system_name, file_name=self.file_name).upload_data( From 55f3ba312160e21f9238c33b7d0ecee53321d27d Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 10:25:11 +0100 Subject: [PATCH 08/25] refactor: Fixed additional static checks --- .../providers/microsoft/azure/transfers/local_to_adls.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index b3555ab0f4778..6070872aab007 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Optional, Sequence, Dict +from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Optional, Sequence from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -136,11 +136,11 @@ def __init__( self.file_system_name = file_system_name self.file_name = file_name self.overwrite = overwrite - self.data = data # type: ignore[arg-type] + self.data = data # type: ignore[var-annotated] self.length = length self.azure_data_lake_conn_id = azure_data_lake_conn_id - def execute(self, context: Context) -> Dict[str, Any]: + def execute(self, context: Context) -> dict[str, Any]: self.log.info("Uploading %s to %s", self.data, self.file_name) hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id) return hook.create_file(file_system_name=self.file_system_name, file_name=self.file_name).upload_data( From 91276319cbd209331b6c9e79232838dfd808f144 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 11:03:14 +0100 Subject: [PATCH 09/25] refactor: Fixed length parameter type definition in operator --- airflow/providers/microsoft/azure/transfers/local_to_adls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 6070872aab007..09854fc10c954 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -126,7 +126,7 @@ def __init__( file_system_name: str, file_name: str, data: bytes | str | Iterable[AnyStr] | IO[AnyStr], - length: Optional[int] = None, + length: int | None = None, overwrite: bool = False, azure_data_lake_conn_id: str = "azure_data_lake_default", **kwargs, From 7374c473105a4378cca833883dfacb95d386ac2c Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 31 Jan 2024 
13:07:57 +0100 Subject: [PATCH 10/25] refactor: Removed unused import --- airflow/providers/microsoft/azure/transfers/local_to_adls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 09854fc10c954..41b8d253f9723 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Optional, Sequence +from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Sequence from airflow.exceptions import AirflowException from airflow.models import BaseOperator From 51b319dfeb3f0c4e056b9be9c82f0c316ed296d2 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 8 Feb 2024 09:26:47 +0100 Subject: [PATCH 11/25] docs: Updated docstring for DataToADLSOperator --- airflow/providers/microsoft/azure/transfers/local_to_adls.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 41b8d253f9723..1c8ba8d30a7ff 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -108,8 +108,8 @@ class DataToADLSOperator(BaseOperator): :param file_system_name: Name of the file system or instance of FileSystemProperties. :param file_name: Name of the file which needs to be created in the file system. - :param data: Number of threads to use. If None, uses the number of cores. - :param length: Number of threads to use. If None, uses the number of cores. + :param data: The data that will be uploaded + :param length: Size of the data in bytes (optional). :param overwrite: Whether to forcibly overwrite existing files/directories. If False and remote path is a directory, will quit regardless if any files would be overwritten or not. 
If True, only matching filenames are actually From 7517de97a6ef989a2aa33fd498cf13420fab150b Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 4 Mar 2024 10:19:33 +0100 Subject: [PATCH 12/25] refactor: Renamed DataToADLSOperator to ADLSCreateObjectOperator and moved it under adls operator module --- airflow/cli/commands/connection_command.py | 2 +- .../microsoft/azure/operators/adls.py | 60 +++++++++++++++++-- .../azure/transfers/local_to_adls.py | 54 +---------------- .../operators/adls.rst | 17 ++++++ .../transfer/local_to_adls.rst | 17 ------ .../azure/operators/test_adls_create.py | 47 +++++++++++++++ .../azure/transfers/test_local_to_adls.py | 40 ------------- ...data_to_adls.py => example_adls_create.py} | 13 ++-- 8 files changed, 129 insertions(+), 121 deletions(-) create mode 100644 tests/providers/microsoft/azure/operators/test_adls_create.py rename tests/system/providers/microsoft/azure/{example_data_to_adls.py => example_adls_create.py} (86%) diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py index e279faf48cef5..a165439375529 100644 --- a/airflow/cli/commands/connection_command.py +++ b/airflow/cli/commands/connection_command.py @@ -314,7 +314,7 @@ def connections_delete(args): def connections_import(args): """Import connections from a file.""" if os.path.exists(args.file): - _import_helper(args.file, args.overwrite) + _import_helper(args.file, args.replace) else: raise SystemExit("Missing connections file.") diff --git a/airflow/providers/microsoft/azure/operators/adls.py b/airflow/providers/microsoft/azure/operators/adls.py index 345336b2c4cb2..26e147d786ec4 100644 --- a/airflow/providers/microsoft/azure/operators/adls.py +++ b/airflow/providers/microsoft/azure/operators/adls.py @@ -16,14 +16,66 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any, Sequence +from typing import TYPE_CHECKING, Any, Sequence, Iterable, AnyStr, IO from airflow.models import BaseOperator -from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook +from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook, AzureDataLakeStorageV2Hook if TYPE_CHECKING: from airflow.utils.context import Context +DEFAULT_AZURE_DATA_LAKE_CONN_ID = "azure_data_lake_default" + + +class ADLSCreateObjectOperator(BaseOperator): + """ + Creates a new object from`data`to Azure Data Lake on specified file. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ADLSCreateObjectOperator` + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param file_name: Name of the file which needs to be created in the file system. + :param data: The data that will be uploaded. + :param length: Size of the data in bytes (optional). + :param replace: Whether to forcibly overwrite existing files/directories. + If False and remote path is a directory, will quit regardless if any files + would be overwritten or not. If True, only matching filenames are actually + overwritten. + :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection . 
+ """ + + template_fields: Sequence[str] = ("file_system_name", "file_name", "data") + ui_color = "#e4f0e8" + + def __init__( + self, + *, + file_system_name: str, + file_name: str, + data: bytes | str | Iterable[AnyStr] | IO[AnyStr], + length: int | None = None, + replace: bool = False, + azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.file_system_name = file_system_name + self.file_name = file_name + self.replace = replace + self.data = data # type: ignore[var-annotated] + self.length = length + self.azure_data_lake_conn_id = azure_data_lake_conn_id + + def execute(self, context: Context) -> dict[str, Any]: + self.log.debug("Uploading %s to %s", self.data, self.file_name) + hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id) + return hook.create_file(file_system_name=self.file_system_name, file_name=self.file_name).upload_data( + data=self.data, length=self.length, overwrite=self.replace + ) + class ADLSDeleteOperator(BaseOperator): """ @@ -48,7 +100,7 @@ def __init__( path: str, recursive: bool = False, ignore_not_found: bool = True, - azure_data_lake_conn_id: str = "azure_data_lake_default", + azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID, **kwargs, ) -> None: super().__init__(**kwargs) @@ -88,7 +140,7 @@ class ADLSListOperator(BaseOperator): ui_color = "#901dd2" def __init__( - self, *, path: str, azure_data_lake_conn_id: str = "azure_data_lake_default", **kwargs + self, *, path: str, azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID, **kwargs ) -> None: super().__init__(**kwargs) self.path = path diff --git a/airflow/providers/microsoft/azure/transfers/local_to_adls.py b/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 1c8ba8d30a7ff..032742fc6497d 100644 --- a/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -16,11 +16,11 @@ # under the License. from __future__ import annotations -from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Sequence +from typing import TYPE_CHECKING, Any, Sequence from airflow.exceptions import AirflowException from airflow.models import BaseOperator -from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook, AzureDataLakeStorageV2Hook +from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook if TYPE_CHECKING: from airflow.utils.context import Context @@ -96,53 +96,3 @@ def execute(self, context: Context) -> None: blocksize=self.blocksize, **self.extra_upload_options, ) - - -class DataToADLSOperator(BaseOperator): - """ - Upload data to Azure Data Lake. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:DataToADLSOperator` - - :param file_system_name: Name of the file system or instance of FileSystemProperties. - :param file_name: Name of the file which needs to be created in the file system. - :param data: The data that will be uploaded - :param length: Size of the data in bytes (optional). - :param overwrite: Whether to forcibly overwrite existing files/directories. - If False and remote path is a directory, will quit regardless if any files - would be overwritten or not. 
If True, only matching filenames are actually - overwritten - :param azure_data_lake_conn_id: Reference to the Azure Data Lake connection - """ - - template_fields: Sequence[str] = ("file_system_name", "file_name", "data") - ui_color = "#e4f0e8" - - def __init__( - self, - *, - file_system_name: str, - file_name: str, - data: bytes | str | Iterable[AnyStr] | IO[AnyStr], - length: int | None = None, - overwrite: bool = False, - azure_data_lake_conn_id: str = "azure_data_lake_default", - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.file_system_name = file_system_name - self.file_name = file_name - self.overwrite = overwrite - self.data = data # type: ignore[var-annotated] - self.length = length - self.azure_data_lake_conn_id = azure_data_lake_conn_id - - def execute(self, context: Context) -> dict[str, Any]: - self.log.info("Uploading %s to %s", self.data, self.file_name) - hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id) - return hook.create_file(file_system_name=self.file_system_name, file_name=self.file_name).upload_data( - data=self.data, length=self.length, overwrite=self.overwrite - ) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst index 54578991458d0..d91e80d72b084 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst @@ -24,6 +24,23 @@ Prerequisite Tasks .. include:: /operators/_partials/prerequisite_tasks.rst +.. _howto/operator:ADLSCreateObjectOperator: + +ADLSCreateObjectOperator +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:class:`~airflow.providers.microsoft.azure.operators.adls.ADLSCreateObjectOperator` allows you to +upload data to ADL. + + +Below is an example of using this operator to upload data to ADL. + +.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_adls_create.py + :language: python + :dedent: 0 + :start-after: [START howto_operator_adls_create] + :end-before: [END howto_operator_adls_create] + .. _howto/operator:ADLSDeleteOperator: ADLSDeleteOperator diff --git a/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst b/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst index 62b1e35ca39f7..76a06133d21db 100644 --- a/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/transfer/local_to_adls.rst @@ -44,23 +44,6 @@ Below is an example of using this operator to upload a file to ADL. :start-after: [START howto_operator_local_to_adls] :end-before: [END howto_operator_local_to_adls] -.. _howto/operator:DataToADLSOperator: - -DataToADLSOperator -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -:class:`~airflow.providers.microsoft.azure.transfers.local_to_adls.DataToADLSOperator` allows you to -upload data as a file to ADL. - - -Below is an example of using this operator to upload data as a file to ADL. - -.. 
exampleinclude:: /../../tests/system/providers/microsoft/azure/example_data_to_adls.py - :language: python - :dedent: 0 - :start-after: [START howto_operator_data_to_adls] - :end-before: [END howto_operator_data_to_adls] - Reference --------- diff --git a/tests/providers/microsoft/azure/operators/test_adls_create.py b/tests/providers/microsoft/azure/operators/test_adls_create.py new file mode 100644 index 0000000000000..e286ddf6b702f --- /dev/null +++ b/tests/providers/microsoft/azure/operators/test_adls_create.py @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +from unittest import mock + +from airflow.providers.microsoft.azure.operators.adls import ADLSCreateObjectOperator + +TASK_ID = "test-adls-upload-operator" +FILE_SYSTEM_NAME = "Fabric" +REMOTE_PATH = "TEST-DIR" +DATA = json.dumps({"name": "David", "surname": "Blain", "gender": "M"}).encode("utf-8") + + +class TestADLSUploadOperator: + @mock.patch("airflow.providers.microsoft.azure.transfers.local_to_adls.AzureDataLakeStorageV2Hook") + def test_execute_success_when_local_data(self, mock_hook): + operator = ADLSCreateObjectOperator( + task_id=TASK_ID, + file_system_name=FILE_SYSTEM_NAME, + file_name=REMOTE_PATH, + data=DATA, + replace=True, + ) + operator.execute(None) + data_lake_file_client_mock = mock_hook.return_value.create_file + data_lake_file_client_mock.assert_called_once_with( + file_system_name=FILE_SYSTEM_NAME, file_name=REMOTE_PATH + ) + upload_data_mock = data_lake_file_client_mock.return_value.upload_data + upload_data_mock.assert_called_once_with(data=DATA, length=None, overwrite=True) diff --git a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py index 950db94da01b7..2ada81acd026e 100644 --- a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py @@ -17,23 +17,19 @@ # under the License. 
from __future__ import annotations -import json from unittest import mock import pytest from airflow.exceptions import AirflowException from airflow.providers.microsoft.azure.transfers.local_to_adls import ( - DataToADLSOperator, LocalFilesystemToADLSOperator, ) TASK_ID = "test-adls-upload-operator" -FILE_SYSTEM_NAME = "Fabric" LOCAL_PATH = "test/*" BAD_LOCAL_PATH = "test/**" REMOTE_PATH = "TEST-DIR" -DATA = json.dumps({"name": "David", "surname": "Blain", "gender": "M"}).encode("utf-8") class TestADLSUploadOperator: @@ -60,39 +56,3 @@ def test_execute_raises_for_bad_glob_val(self, mock_hook): with pytest.raises(AirflowException) as ctx: operator.execute(None) assert str(ctx.value) == "Recursive glob patterns using `**` are not supported" - - @mock.patch("airflow.providers.microsoft.azure.transfers.local_to_adls.AzureDataLakeHook") - def test_extra_options_is_passed(self, mock_hook): - operator = LocalFilesystemToADLSOperator( - task_id=TASK_ID, - local_path=LOCAL_PATH, - remote_path=REMOTE_PATH, - extra_upload_options={"run": False}, - ) - operator.execute(None) - mock_hook.return_value.upload_file.assert_called_once_with( - local_path=LOCAL_PATH, - remote_path=REMOTE_PATH, - nthreads=64, - overwrite=True, - buffersize=4194304, - blocksize=4194304, - run=False, # extra upload options - ) - - @mock.patch("airflow.providers.microsoft.azure.transfers.local_to_adls.AzureDataLakeStorageV2Hook") - def test_execute_success_when_local_data(self, mock_hook): - operator = DataToADLSOperator( - task_id=TASK_ID, - file_system_name=FILE_SYSTEM_NAME, - file_name=REMOTE_PATH, - data=DATA, - overwrite=True, - ) - operator.execute(None) - data_lake_file_client_mock = mock_hook.return_value.create_file - data_lake_file_client_mock.assert_called_once_with( - file_system_name=FILE_SYSTEM_NAME, file_name=REMOTE_PATH - ) - upload_data_mock = data_lake_file_client_mock.return_value.upload_data - upload_data_mock.assert_called_once_with(data=DATA, length=None, overwrite=True) diff --git a/tests/system/providers/microsoft/azure/example_data_to_adls.py b/tests/system/providers/microsoft/azure/example_adls_create.py similarity index 86% rename from tests/system/providers/microsoft/azure/example_data_to_adls.py rename to tests/system/providers/microsoft/azure/example_adls_create.py index e497434a880b3..726e9eba76ae8 100644 --- a/tests/system/providers/microsoft/azure/example_data_to_adls.py +++ b/tests/system/providers/microsoft/azure/example_adls_create.py @@ -20,11 +20,10 @@ from datetime import datetime from airflow import models -from airflow.providers.microsoft.azure.operators.adls import ADLSDeleteOperator -from airflow.providers.microsoft.azure.transfers.local_to_adls import DataToADLSOperator +from airflow.providers.microsoft.azure.operators.adls import ADLSCreateObjectOperator, ADLSDeleteOperator REMOTE_FILE_PATH = os.environ.get("REMOTE_LOCAL_PATH", "remote.txt") -DAG_ID = "example_local_to_adls" +DAG_ID = "example_adls_create" with models.DAG( DAG_ID, @@ -33,15 +32,15 @@ schedule=None, tags=["example"], ) as dag: - # [START howto_operator_data_to_adls] - upload_data = DataToADLSOperator( + # [START howto_operator_adls_create] + upload_data = ADLSCreateObjectOperator( task_id="upload_data", file_system_name="Fabric", file_name=REMOTE_FILE_PATH, data="Hello world", - overwrite=True, + replace=True, ) - # [END howto_operator_data_to_adls] + # [END howto_operator_adls_create] delete_file = ADLSDeleteOperator(task_id="remove_task", path=REMOTE_FILE_PATH, recursive=True) From 
90d5dd75af1b90256c853c5f24f5a7d5f18c4047 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 4 Mar 2024 13:51:03 +0100 Subject: [PATCH 13/25] fix: Re-added wrongly removed unit test --- .../azure/transfers/test_local_to_adls.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py index 2ada81acd026e..6ce82b3f20940 100644 --- a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py @@ -56,3 +56,22 @@ def test_execute_raises_for_bad_glob_val(self, mock_hook): with pytest.raises(AirflowException) as ctx: operator.execute(None) assert str(ctx.value) == "Recursive glob patterns using `**` are not supported" + + @mock.patch("airflow.providers.microsoft.azure.transfers.local_to_adls.AzureDataLakeHook") + def test_extra_options_is_passed(self, mock_hook): + operator = LocalFilesystemToADLSOperator( + task_id=TASK_ID, + local_path=LOCAL_PATH, + remote_path=REMOTE_PATH, + extra_upload_options={"run": False}, + ) + operator.execute(None) + mock_hook.return_value.upload_file.assert_called_once_with( + local_path=LOCAL_PATH, + remote_path=REMOTE_PATH, + nthreads=64, + overwrite=True, + buffersize=4194304, + blocksize=4194304, + run=False, # extra upload options + ) From 490ef76909ea1f744e315a9a1d7fedcf967bfa0d Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 4 Mar 2024 13:51:24 +0100 Subject: [PATCH 14/25] refactor: Re-ordered typing imports in adls module --- airflow/providers/microsoft/azure/operators/adls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/operators/adls.py b/airflow/providers/microsoft/azure/operators/adls.py index 26e147d786ec4..47c8adf05ef51 100644 --- a/airflow/providers/microsoft/azure/operators/adls.py +++ b/airflow/providers/microsoft/azure/operators/adls.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any, Sequence, Iterable, AnyStr, IO +from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable, Sequence from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook, AzureDataLakeStorageV2Hook From 6dca24891d21e699f421bc662cd99e1fa281ad36 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 4 Mar 2024 14:35:54 +0100 Subject: [PATCH 15/25] docs: Replaced tiles with dashes --- .../apache-airflow-providers-microsoft-azure/operators/adls.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst index d91e80d72b084..ea75a3f8e5265 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst @@ -27,7 +27,7 @@ Prerequisite Tasks .. _howto/operator:ADLSCreateObjectOperator: ADLSCreateObjectOperator -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +---------------------------------- :class:`~airflow.providers.microsoft.azure.operators.adls.ADLSCreateObjectOperator` allows you to upload data to ADL. 
From c983f8a5aa36bea6bdc4d4ea9ebe44c74cdd2211 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 4 Mar 2024 14:37:54 +0100 Subject: [PATCH 16/25] docs: Fixed class reference to ADLSDeleteOperator --- .../apache-airflow-providers-microsoft-azure/operators/adls.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst index ea75a3f8e5265..cbd12dd631acb 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst @@ -46,7 +46,7 @@ Below is an example of using this operator to upload data to ADL. ADLSDeleteOperator ---------------------------------- Use the -:class:`~airflow.providers.microsoft.azure.operators.adls_delete.ADLSDeleteOperator` to remove +:class:`~airflow.providers.microsoft.azure.operators.adls.ADLSDeleteOperator` to remove file(s) from Azure DataLake Storage From 7a0003ec29b13a63b737c4203661b8fcc6f04b65 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 4 Mar 2024 18:21:08 +0100 Subject: [PATCH 17/25] docs: Updated comment near class reference for ADLSCreateObjectOperator --- .../apache-airflow-providers-microsoft-azure/operators/adls.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst index cbd12dd631acb..29c435c02f357 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst @@ -30,7 +30,7 @@ ADLSCreateObjectOperator ---------------------------------- :class:`~airflow.providers.microsoft.azure.operators.adls.ADLSCreateObjectOperator` allows you to -upload data to ADL. +upload data to Azure DataLake Storage Below is an example of using this operator to upload data to ADL. From 3eaeb6e27c8674be9014ccdec1afbc38983cdd8f Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 4 Mar 2024 18:48:29 +0100 Subject: [PATCH 18/25] docs: Added example of ADLSListOperator --- .../microsoft/azure/operators/adls.py | 25 ++++----- .../operators/adls.rst | 17 ++++++ .../microsoft/azure/example_adls_list.py | 54 +++++++++++++++++++ 3 files changed, 80 insertions(+), 16 deletions(-) create mode 100644 tests/system/providers/microsoft/azure/example_adls_list.py diff --git a/airflow/providers/microsoft/azure/operators/adls.py b/airflow/providers/microsoft/azure/operators/adls.py index 47c8adf05ef51..7f363c300ea86 100644 --- a/airflow/providers/microsoft/azure/operators/adls.py +++ b/airflow/providers/microsoft/azure/operators/adls.py @@ -43,7 +43,7 @@ class ADLSCreateObjectOperator(BaseOperator): If False and remote path is a directory, will quit regardless if any files would be overwritten or not. If True, only matching filenames are actually overwritten. - :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection . + :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection . """ template_fields: Sequence[str] = ("file_system_name", "file_name", "data") @@ -81,9 +81,9 @@ class ADLSDeleteOperator(BaseOperator): """ Delete files in the specified path. - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:ADLSDeleteOperator` + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ADLSDeleteOperator` :param path: A directory or file to remove :param recursive: Whether to loop into directories in the location and remove the files @@ -121,19 +121,12 @@ class ADLSListOperator(BaseOperator): This operator returns a python list with the names of files which can be used by `xcom` in the downstream tasks. - :param path: The Azure Data Lake path to find the objects. Supports glob - strings (templated) - :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection`. - - **Example**: - The following Operator would list all the Parquet files from ``folder/output/`` - folder in the specified ADLS account :: + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ADLSListOperator` - adls_files = ADLSListOperator( - task_id="adls_files", - path="folder/output/*.parquet", - azure_data_lake_conn_id="azure_data_lake_default", - ) + :param path: The Azure Data Lake path to find the objects. Supports glob strings (templated) + :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection`. """ template_fields: Sequence[str] = ("path",) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst index 29c435c02f357..24b6a3ac5603e 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst @@ -58,6 +58,23 @@ Below is an example of using this operator to delete a file from ADL. :start-after: [START howto_operator_adls_delete] :end-before: [END howto_operator_adls_delete] +.. _howto/operator:ADLSListOperator: + +ADLSListOperator +---------------------------------- +Use the +:class:`~airflow.providers.microsoft.azure.operators.adls.ADLSListOperator` to list all +file(s) from Azure DataLake Storage + + +Below is an example of using this operator to list files from ADL. + +.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_adls_list.py + :language: python + :dedent: 0 + :start-after: [START howto_operator_adls_list] + :end-before: [END howto_operator_adls_list] + Reference --------- diff --git a/tests/system/providers/microsoft/azure/example_adls_list.py b/tests/system/providers/microsoft/azure/example_adls_list.py new file mode 100644 index 0000000000000..86eb4eb77e371 --- /dev/null +++ b/tests/system/providers/microsoft/azure/example_adls_list.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import os +from datetime import datetime + +from airflow import models +from airflow.providers.microsoft.azure.operators.adls import ADLSListOperator + +LOCAL_FILE_PATH = os.environ.get("LOCAL_FILE_PATH", "localfile.txt") +REMOTE_FILE_PATH = os.environ.get("REMOTE_LOCAL_PATH", "remote.txt") + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_adls_delete" + +with models.DAG( + DAG_ID, + start_date=datetime(2021, 1, 1), + schedule=None, + tags=["example"], +) as dag: + # [START howto_operator_adls_list] + adls_files = ADLSListOperator( + task_id="adls_files", + path="folder/output/*.parquet", + azure_data_lake_conn_id="azure_data_lake_default", + ) + # [END howto_operator_adls_list] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From 65b2f741ae85835c3d79fad9d482faac9ff37af5 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 5 Mar 2024 14:43:05 +0100 Subject: [PATCH 19/25] fix: Reverted wrong rename of replace to overwrite due to refactoring with PyCharm --- airflow/cli/commands/connection_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py index a165439375529..e279faf48cef5 100644 --- a/airflow/cli/commands/connection_command.py +++ b/airflow/cli/commands/connection_command.py @@ -314,7 +314,7 @@ def connections_delete(args): def connections_import(args): """Import connections from a file.""" if os.path.exists(args.file): - _import_helper(args.file, args.replace) + _import_helper(args.file, args.overwrite) else: raise SystemExit("Missing connections file.") From 31c494b80e13d15744aaf6f5aaf63b588b6c1e91 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 6 Mar 2024 13:58:55 +0100 Subject: [PATCH 20/25] refactor: Reformatted import of LocalFilesystemToADLSOperator as in main branch --- .../providers/microsoft/azure/transfers/test_local_to_adls.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py index 6ce82b3f20940..1ef388eef7597 100644 --- a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py @@ -22,9 +22,7 @@ import pytest from airflow.exceptions import AirflowException -from airflow.providers.microsoft.azure.transfers.local_to_adls import ( - LocalFilesystemToADLSOperator, -) +from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator TASK_ID = "test-adls-upload-operator" LOCAL_PATH = "test/*" From de9d65474482e78f13a59102d501c2b823412255 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 6 Mar 2024 21:26:24 +0100 Subject: [PATCH 21/25] fix: Fixed patch of AzureDataLakeStorageV2Hook in TestADLSUploadOperator --- tests/providers/microsoft/azure/operators/test_adls_create.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/microsoft/azure/operators/test_adls_create.py b/tests/providers/microsoft/azure/operators/test_adls_create.py index e286ddf6b702f..73d2f98de759f 100644 --- 
a/tests/providers/microsoft/azure/operators/test_adls_create.py +++ b/tests/providers/microsoft/azure/operators/test_adls_create.py @@ -29,7 +29,7 @@ class TestADLSUploadOperator: - @mock.patch("airflow.providers.microsoft.azure.transfers.local_to_adls.AzureDataLakeStorageV2Hook") + @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook") def test_execute_success_when_local_data(self, mock_hook): operator = ADLSCreateObjectOperator( task_id=TASK_ID, From 1c45e9c6d03f0de492a9da8af2edb16601dfba79 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 7 Mar 2024 12:04:20 +0100 Subject: [PATCH 22/25] Update tests/system/providers/microsoft/azure/example_adls_list.py Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com> --- tests/system/providers/microsoft/azure/example_adls_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/providers/microsoft/azure/example_adls_list.py b/tests/system/providers/microsoft/azure/example_adls_list.py index 86eb4eb77e371..a6bd2d7bd6652 100644 --- a/tests/system/providers/microsoft/azure/example_adls_list.py +++ b/tests/system/providers/microsoft/azure/example_adls_list.py @@ -26,7 +26,7 @@ REMOTE_FILE_PATH = os.environ.get("REMOTE_LOCAL_PATH", "remote.txt") ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") -DAG_ID = "example_adls_delete" +DAG_ID = "example_adls_list" with models.DAG( DAG_ID, From a3753e96184f39e8716b9f3ae96a4feb5841c281 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 11 Mar 2024 11:01:10 +0100 Subject: [PATCH 23/25] docs: Temporary removed docs for ADLSListOperator --- .../providers/microsoft/azure/operators/adls.py | 17 ++++++++++++----- .../operators/adls.rst | 17 ----------------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/airflow/providers/microsoft/azure/operators/adls.py b/airflow/providers/microsoft/azure/operators/adls.py index 7f363c300ea86..73d53bc17f6f5 100644 --- a/airflow/providers/microsoft/azure/operators/adls.py +++ b/airflow/providers/microsoft/azure/operators/adls.py @@ -121,12 +121,19 @@ class ADLSListOperator(BaseOperator): This operator returns a python list with the names of files which can be used by `xcom` in the downstream tasks. - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:ADLSListOperator` - - :param path: The Azure Data Lake path to find the objects. Supports glob strings (templated) + :param path: The Azure Data Lake path to find the objects. Supports glob + strings (templated) :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection`. + + **Example**: + The following Operator would list all the Parquet files from ``folder/output/`` + folder in the specified ADLS account :: + + adls_files = ADLSListOperator( + task_id="adls_files", + path="folder/output/*.parquet", + azure_data_lake_conn_id="azure_data_lake_default", + ) """ template_fields: Sequence[str] = ("path",) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst index 24b6a3ac5603e..29c435c02f357 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst @@ -58,23 +58,6 @@ Below is an example of using this operator to delete a file from ADL. :start-after: [START howto_operator_adls_delete] :end-before: [END howto_operator_adls_delete] -.. 
_howto/operator:ADLSListOperator: - -ADLSListOperator ----------------------------------- -Use the -:class:`~airflow.providers.microsoft.azure.operators.adls.ADLSListOperator` to list all -file(s) from Azure DataLake Storage - - -Below is an example of using this operator to list files from ADL. - -.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_adls_list.py - :language: python - :dedent: 0 - :start-after: [START howto_operator_adls_list] - :end-before: [END howto_operator_adls_list] - Reference --------- From a3de8fb3352ac9c449b6a210e66c5579fc1d041a Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 11 Mar 2024 19:24:57 +0100 Subject: [PATCH 24/25] Revert "docs: Temporary removed docs for ADLSListOperator" This reverts commit a3753e96184f39e8716b9f3ae96a4feb5841c281. --- .../microsoft/azure/operators/adls.py | 21 +++++++------------ .../operators/adls.rst | 17 +++++++++++++++ 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/airflow/providers/microsoft/azure/operators/adls.py b/airflow/providers/microsoft/azure/operators/adls.py index 73d53bc17f6f5..6afb495077963 100644 --- a/airflow/providers/microsoft/azure/operators/adls.py +++ b/airflow/providers/microsoft/azure/operators/adls.py @@ -29,7 +29,7 @@ class ADLSCreateObjectOperator(BaseOperator): """ - Creates a new object from`data`to Azure Data Lake on specified file. + Creates a new object from passed data to Azure Data Lake on specified file. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -43,7 +43,7 @@ class ADLSCreateObjectOperator(BaseOperator): If False and remote path is a directory, will quit regardless if any files would be overwritten or not. If True, only matching filenames are actually overwritten. - :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection . + :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection`. """ template_fields: Sequence[str] = ("file_system_name", "file_name", "data") @@ -121,19 +121,12 @@ class ADLSListOperator(BaseOperator): This operator returns a python list with the names of files which can be used by `xcom` in the downstream tasks. - :param path: The Azure Data Lake path to find the objects. Supports glob - strings (templated) - :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection`. - - **Example**: - The following Operator would list all the Parquet files from ``folder/output/`` - folder in the specified ADLS account :: + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ADLSListOperator` - adls_files = ADLSListOperator( - task_id="adls_files", - path="folder/output/*.parquet", - azure_data_lake_conn_id="azure_data_lake_default", - ) + :param path: The Azure Data Lake path to find the objects. Supports glob strings (templated) + :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection`. """ template_fields: Sequence[str] = ("path",) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst index 29c435c02f357..24b6a3ac5603e 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst @@ -58,6 +58,23 @@ Below is an example of using this operator to delete a file from ADL. :start-after: [START howto_operator_adls_delete] :end-before: [END howto_operator_adls_delete] +.. 
_howto/operator:ADLSListOperator: + +ADLSListOperator +---------------------------------- +Use the +:class:`~airflow.providers.microsoft.azure.operators.adls.ADLSListOperator` to list all +file(s) from Azure DataLake Storage + + +Below is an example of using this operator to list files from ADL. + +.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_adls_list.py + :language: python + :dedent: 0 + :start-after: [START howto_operator_adls_list] + :end-before: [END howto_operator_adls_list] + Reference --------- From 77f6caefdb09b7bd8e5f58eac1f34710fbdced74 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 12 Mar 2024 17:04:07 +0100 Subject: [PATCH 25/25] fix: Fixed unit tests for TestADLSUploadOperator --- tests/providers/microsoft/azure/operators/test_adls_create.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/microsoft/azure/operators/test_adls_create.py b/tests/providers/microsoft/azure/operators/test_adls_create.py index 73d2f98de759f..90d14b229fc51 100644 --- a/tests/providers/microsoft/azure/operators/test_adls_create.py +++ b/tests/providers/microsoft/azure/operators/test_adls_create.py @@ -29,7 +29,7 @@ class TestADLSUploadOperator: - @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook") + @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook") def test_execute_success_when_local_data(self, mock_hook): operator = ADLSCreateObjectOperator( task_id=TASK_ID,
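
The final patch above moves the mock target to the module where the hook is looked up (``airflow.providers.microsoft.azure.operators.adls``) rather than the module where it is defined, which is what ``unittest.mock`` requires for the operator under test to actually receive the stub. A minimal sketch of that idea, assuming the operator arguments shown earlier in this series; the argument values and the assertion are purely illustrative::

    from unittest import mock

    from airflow.providers.microsoft.azure.operators.adls import ADLSCreateObjectOperator


    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
    def test_execute_success_when_local_data(mock_hook):
        operator = ADLSCreateObjectOperator(
            task_id="test-adls-create-object-operator",
            file_system_name="test-filesystem",  # placeholder file system name
            file_name="test.json",               # placeholder remote file name
            data=b'{"key": "value"}',            # placeholder payload
        )
        operator.execute(None)
        # Because the patch targets the operator module, the mock above is the object the
        # operator instantiates during execute(); asserting it was used at all is enough
        # to show the patch target is correct.
        assert mock_hook.called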