From 44fb07c8b56513bdeb2217890c2104b929c3271a Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Sun, 2 Jul 2023 01:28:25 +0530 Subject: [PATCH 1/7] moved AzureBlobStorageToGCSOperator to google provider --- .../cloud/transfers/azure_blob_to_gcs.py | 118 ++++++++++++++++++ airflow/providers/google/provider.yaml | 5 +- .../providers/microsoft/azure/provider.yaml | 1 - .../azure/transfers/azure_blob_to_gcs.py | 108 +++------------- .../operators}/transfer/azure_blob_to_gcs.rst | 44 +++---- .../cloud/transfers/test_azure_blob_to_gcs.py | 93 ++++++++++++++ .../cloud}/azure/example_azure_blob_to_gcs.py | 31 ++--- 7 files changed, 257 insertions(+), 143 deletions(-) create mode 100644 airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py rename docs/{apache-airflow-providers-microsoft-azure => apache-airflow-providers-google/operators}/transfer/azure_blob_to_gcs.rst (50%) create mode 100644 tests/providers/google/cloud/transfers/test_azure_blob_to_gcs.py rename tests/system/providers/{microsoft => google/cloud}/azure/example_azure_blob_to_gcs.py (73%) diff --git a/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py b/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py new file mode 100644 index 0000000000000..8ba6f2d6eb079 --- /dev/null +++ b/airflow/providers/google/cloud/transfers/azure_blob_to_gcs.py @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import tempfile +from typing import TYPE_CHECKING, Sequence + +from airflow.models import BaseOperator +from airflow.providers.google.cloud.hooks.gcs import GCSHook +from airflow.providers.microsoft.azure.hooks.wasb import WasbHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AzureBlobStorageToGCSOperator(BaseOperator): + """ + Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AzureBlobStorageToGCSOperator` + + :param wasb_conn_id: Reference to the wasb connection. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param blob_name: Name of the blob + :param container_name: Name of the container + :param bucket_name: The bucket to upload to + :param object_name: The object name to set when uploading the file + :param filename: The local file path to the file to be uploaded + :param gzip: Option to compress local file or file data for upload + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account. + """ + + def __init__( + self, + *, + wasb_conn_id="wasb_default", + gcp_conn_id: str = "google_cloud_default", + blob_name: str, + container_name: str, + bucket_name: str, + object_name: str, + filename: str, + gzip: bool, + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.wasb_conn_id = wasb_conn_id + self.gcp_conn_id = gcp_conn_id + self.blob_name = blob_name + self.container_name = container_name + self.bucket_name = bucket_name + self.object_name = object_name + self.filename = filename + self.gzip = gzip + self.impersonation_chain = impersonation_chain + + template_fields: Sequence[str] = ( + "blob_name", + "container_name", + "bucket_name", + "object_name", + "filename", + ) + + def execute(self, context: Context) -> str: + azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id) + gcs_hook = GCSHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + + with tempfile.NamedTemporaryFile() as temp_file: + self.log.info("Downloading data from blob: %s", self.blob_name) + azure_hook.get_file( + file_path=temp_file.name, + container_name=self.container_name, + blob_name=self.blob_name, + ) + self.log.info( + "Uploading data from blob's: %s into GCP bucket: %s", self.object_name, self.bucket_name + ) + gcs_hook.upload( + bucket_name=self.bucket_name, + object_name=self.object_name, + filename=temp_file.name, + gzip=self.gzip, + ) + self.log.info( + "Resources have been uploaded from blob: %s to GCS bucket:%s", + self.blob_name, + self.bucket_name, + ) + return f"gs://{self.bucket_name}/{self.object_name}" diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 02168a12c9535..c240940dc9f9b 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -1007,7 +1007,10 @@ transfers: target-integration-name: Google Cloud Storage (GCS) python-module: airflow.providers.google.cloud.transfers.mssql_to_gcs how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/mssql_to_gcs.rst - + - source-integration-name: Microsoft Azure Blob Storage + target-integration-name: Google Cloud Storage (GCS) + python-module: airflow.providers.google.cloud.transfers.azure_blob_to_gcs + how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst connection-types: - hook-class-name: airflow.providers.google.common.hooks.base_google.GoogleBaseHook diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index a1070177bc11b..196cc84670d3e 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -242,7 +242,6 @@ transfers: python-module: airflow.providers.microsoft.azure.transfers.local_to_wasb - source-integration-name: Microsoft Azure Blob Storage target-integration-name: Google Cloud Storage (GCS) - how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs - source-integration-name: SSH File Transfer Protocol (SFTP) target-integration-name: Microsoft Azure Blob Storage diff --git a/airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py b/airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py index 8ba6f2d6eb079..20edde6e09381 100644 --- a/airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py +++ b/airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py @@ -17,102 +17,26 @@ # under the License. from __future__ import annotations -import tempfile -from typing import TYPE_CHECKING, Sequence +import warnings -from airflow.models import BaseOperator -from airflow.providers.google.cloud.hooks.gcs import GCSHook -from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import ( + AzureBlobStorageToGCSOperator as AzureBlobStorageToGCSOperatorFromGoogleProvider, +) -if TYPE_CHECKING: - from airflow.utils.context import Context - -class AzureBlobStorageToGCSOperator(BaseOperator): +class AzureBlobStorageToGCSOperator(AzureBlobStorageToGCSOperatorFromGoogleProvider): """ - Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:AzureBlobStorageToGCSOperator` - - :param wasb_conn_id: Reference to the wasb connection. - :param gcp_conn_id: The connection ID to use when fetching connection info. - :param blob_name: Name of the blob - :param container_name: Name of the container - :param bucket_name: The bucket to upload to - :param object_name: The object name to set when uploading the file - :param filename: The local file path to the file to be uploaded - :param gzip: Option to compress local file or file data for upload - :param impersonation_chain: Optional service account to impersonate using short-term - credentials, or chained list of accounts required to get the access_token - of the last account in the list, which will be impersonated in the request. - If set as a string, the account must grant the originating account - the Service Account Token Creator IAM role. - If set as a sequence, the identities from the list must grant - Service Account Token Creator IAM role to the directly preceding identity, with first - account from the list granting this role to the originating account. + This class is deprecated. + Please use `airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`. """ - def __init__( - self, - *, - wasb_conn_id="wasb_default", - gcp_conn_id: str = "google_cloud_default", - blob_name: str, - container_name: str, - bucket_name: str, - object_name: str, - filename: str, - gzip: bool, - impersonation_chain: str | Sequence[str] | None = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - self.wasb_conn_id = wasb_conn_id - self.gcp_conn_id = gcp_conn_id - self.blob_name = blob_name - self.container_name = container_name - self.bucket_name = bucket_name - self.object_name = object_name - self.filename = filename - self.gzip = gzip - self.impersonation_chain = impersonation_chain - - template_fields: Sequence[str] = ( - "blob_name", - "container_name", - "bucket_name", - "object_name", - "filename", - ) - - def execute(self, context: Context) -> str: - azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id) - gcs_hook = GCSHook( - gcp_conn_id=self.gcp_conn_id, - impersonation_chain=self.impersonation_chain, + def __init__(self, *args, **kwargs): + warnings.warn( + """This class is deprecated. + Please use + `airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`.""", + AirflowProviderDeprecationWarning, + stacklevel=2, ) - - with tempfile.NamedTemporaryFile() as temp_file: - self.log.info("Downloading data from blob: %s", self.blob_name) - azure_hook.get_file( - file_path=temp_file.name, - container_name=self.container_name, - blob_name=self.blob_name, - ) - self.log.info( - "Uploading data from blob's: %s into GCP bucket: %s", self.object_name, self.bucket_name - ) - gcs_hook.upload( - bucket_name=self.bucket_name, - object_name=self.object_name, - filename=temp_file.name, - gzip=self.gzip, - ) - self.log.info( - "Resources have been uploaded from blob: %s to GCS bucket:%s", - self.blob_name, - self.bucket_name, - ) - return f"gs://{self.bucket_name}/{self.object_name}" + super().__init__(*args, **kwargs) diff --git a/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst similarity index 50% rename from docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst rename to docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst index b4c4863ac5f85..c1017cfcb2ad3 100644 --- a/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst @@ -18,34 +18,19 @@ Azure Blob Storage to Google Cloud Storage (GCS) Transfer Operator ================================================================== -The Blob service stores text and binary data as objects in the cloud. -The Blob service offers the following three resources: the storage account, containers, and blobs. -Within your storage account, containers provide a way to organize sets of blobs. -For more information about the service visit `Azure Blob Storage API documentation `_. +The `Google Cloud Storage `__ (GCS) is used to store large data from various applications. +This is also the same with `Azure Blob Storage `__. +This page shows how to transfer data from Azure Blob Storage to GCS. -Before you begin -^^^^^^^^^^^^^^^^ -Before using Blob Storage within Airflow you need to authenticate your account with Token, Login and Password. -Please follow Azure -`instructions `_ -to do it. +Prerequisite Tasks +^^^^^^^^^^^^^^^^^^ -TOKEN should be added to the Connection in Airflow in JSON format, Login and Password as plain text. -You can check `how to do such connection `_. +.. include:: ../_partials/prerequisite_tasks.rst -See following example. -Set values for these fields: +Use the :class:`~airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator` +to transfer data from Azure Blob Storage to Google Cloud Storage. -.. code-block:: - - Connection Id: wasb_default - Login: Storage Account Name - Password: KEY1 - Extra: {"sas_token": "TOKEN"} - -.. _howto/operator:AzureBlobStorageToGCSOperator: - -Transfer Data from Blob Storage to Google Cloud Storage +Transfer Data from Azure Blob Storage to Google Cloud Storage ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage @@ -54,7 +39,16 @@ To get information about jobs within a Azure Blob Storage use: Example usage: -.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/example_azure_blob_to_gcs.py :language: python :start-after: [START how_to_azure_blob_to_gcs] :end-before: [END how_to_azure_blob_to_gcs] + +Reference +^^^^^^^^^ + +For further information, look at: + +* `GCS Client Library Documentation `__ +* `GCS Product Documentation `__ +* `Azure Blob Storage Client Library Documentation `__ diff --git a/tests/providers/google/cloud/transfers/test_azure_blob_to_gcs.py b/tests/providers/google/cloud/transfers/test_azure_blob_to_gcs.py new file mode 100644 index 0000000000000..a0b3eae99d683 --- /dev/null +++ b/tests/providers/google/cloud/transfers/test_azure_blob_to_gcs.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator + +WASB_CONN_ID = "wasb_default" +GCP_CONN_ID = "google_cloud_default" +BLOB_NAME = "azure_blob" +CONTAINER_NAME = "azure_container" +BUCKET_NAME = "airflow" +OBJECT_NAME = "file.txt" +FILENAME = "file.txt" +GZIP = False +IMPERSONATION_CHAIN = None +TASK_ID = "transfer_file" + + +class TestAzureBlobStorageToGCSTransferOperator: + def test_init(self): + operator = AzureBlobStorageToGCSOperator( + wasb_conn_id=WASB_CONN_ID, + gcp_conn_id=GCP_CONN_ID, + blob_name=BLOB_NAME, + container_name=CONTAINER_NAME, + bucket_name=BUCKET_NAME, + object_name=OBJECT_NAME, + filename=FILENAME, + gzip=GZIP, + impersonation_chain=IMPERSONATION_CHAIN, + task_id=TASK_ID, + ) + assert operator.wasb_conn_id == WASB_CONN_ID + assert operator.blob_name == BLOB_NAME + assert operator.container_name == CONTAINER_NAME + assert operator.gcp_conn_id == GCP_CONN_ID + assert operator.bucket_name == BUCKET_NAME + assert operator.object_name == OBJECT_NAME + assert operator.filename == FILENAME + assert operator.gzip == GZIP + assert operator.impersonation_chain == IMPERSONATION_CHAIN + assert operator.task_id == TASK_ID + + @mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.WasbHook") + @mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.GCSHook") + @mock.patch("airflow.providers.google.cloud.transfers.azure_blob_to_gcs.tempfile") + def test_execute(self, mock_temp, mock_hook_gcs, mock_hook_wasb): + op = AzureBlobStorageToGCSOperator( + wasb_conn_id=WASB_CONN_ID, + gcp_conn_id=GCP_CONN_ID, + blob_name=BLOB_NAME, + container_name=CONTAINER_NAME, + bucket_name=BUCKET_NAME, + object_name=OBJECT_NAME, + filename=FILENAME, + gzip=GZIP, + impersonation_chain=IMPERSONATION_CHAIN, + task_id=TASK_ID, + ) + + op.execute(context=None) + mock_hook_wasb.assert_called_once_with(wasb_conn_id=WASB_CONN_ID) + + mock_hook_wasb.return_value.get_file.assert_called_once_with( + file_path=mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name, + container_name=CONTAINER_NAME, + blob_name=BLOB_NAME, + ) + mock_hook_gcs.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN + ) + mock_hook_gcs.return_value.upload.assert_called_once_with( + bucket_name=BUCKET_NAME, + object_name=OBJECT_NAME, + gzip=GZIP, + filename=mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name, + ) diff --git a/tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py b/tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py similarity index 73% rename from tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py rename to tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py index 81fb9993b25dd..a2d3513b488cd 100644 --- a/tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py +++ b/tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py @@ -25,7 +25,7 @@ WasbBlobSensor, WasbPrefixSensor, ) -from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator +from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator # Ignore missing args provided by default_args # type: ignore[call-arg] @@ -36,35 +36,21 @@ GCP_BUCKET_FILE_PATH = os.environ.get("GCP_BUCKET_FILE_PATH", "file.txt") GCP_BUCKET_NAME = os.environ.get("GCP_BUCKET_NAME", "INVALID BUCKET NAME") GCP_OBJECT_NAME = os.environ.get("GCP_OBJECT_NAME", "file.txt") -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_azure_blob_to_gcs" -PREFIX_NAME = os.environ.get("AZURE_PREFIX_NAME", "20230421") -# [START how_to_azure_blob_to_gcs] with DAG( DAG_ID, schedule=None, start_date=datetime(2021, 1, 1), # Override to match your needs - default_args={ - # azure args - "container_name": AZURE_CONTAINER_NAME, - "blob_name": BLOB_NAME, - "prefix": PREFIX_NAME, - }, ) as dag: - wait_for_blob = WasbBlobSensor(task_id="wait_for_blob") - - wait_for_blob_async = WasbBlobSensor(task_id="wait_for_blob_async", deferrable=True) - - wait_for_blob_prefix = WasbPrefixSensor(task_id="wait_for_blob_prefix") - - wait_for_blob_prefix_async = WasbPrefixSensor( - task_id="wait_for_blob_prefix_async", - deferrable=True, - ) - + wait_for_blob = WasbBlobSensor(task_id="wait_for_blob", container_name=AZURE_CONTAINER_NAME, blob_name=BLOB_NAME) + + # [START how_to_azure_blob_to_gcs] transfer_files_to_gcs = AzureBlobStorageToGCSOperator( task_id="transfer_files_to_gcs", + # azure args + container_name=AZURE_CONTAINER_NAME, + blob_name=BLOB_NAME, # GCP args bucket_name=GCP_BUCKET_NAME, object_name=GCP_OBJECT_NAME, @@ -76,9 +62,6 @@ ( wait_for_blob - >> wait_for_blob_async - >> wait_for_blob_prefix - >> wait_for_blob_prefix_async >> transfer_files_to_gcs ) From 8481bcb9aae347f1caaaa32149b9571b5dc7cd54 Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Sun, 2 Jul 2023 03:10:24 +0530 Subject: [PATCH 2/7] doc build and static check fixes --- .../operators/transfer/azure_blob_to_gcs.rst | 4 ++-- .../cloud/azure/example_azure_blob_to_gcs.py | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst index c1017cfcb2ad3..54b878d6673f6 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst @@ -31,7 +31,7 @@ Use the :class:`~airflow.providers.google.cloud.transfers.azure_blob_to_gcs.Azur to transfer data from Azure Blob Storage to Google Cloud Storage. Transfer Data from Azure Blob Storage to Google Cloud Storage -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage To get information about jobs within a Azure Blob Storage use: @@ -39,7 +39,7 @@ To get information about jobs within a Azure Blob Storage use: Example usage: -.. exampleinclude:: /../../tests/system/providers/google/cloud/example_azure_blob_to_gcs.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py :language: python :start-after: [START how_to_azure_blob_to_gcs] :end-before: [END how_to_azure_blob_to_gcs] diff --git a/tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py b/tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py index a2d3513b488cd..5d7649479df60 100644 --- a/tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py +++ b/tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py @@ -21,11 +21,10 @@ from datetime import datetime from airflow import DAG +from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator from airflow.providers.microsoft.azure.sensors.wasb import ( WasbBlobSensor, - WasbPrefixSensor, ) -from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator # Ignore missing args provided by default_args # type: ignore[call-arg] @@ -43,8 +42,10 @@ schedule=None, start_date=datetime(2021, 1, 1), # Override to match your needs ) as dag: - wait_for_blob = WasbBlobSensor(task_id="wait_for_blob", container_name=AZURE_CONTAINER_NAME, blob_name=BLOB_NAME) - + wait_for_blob = WasbBlobSensor( + task_id="wait_for_blob", container_name=AZURE_CONTAINER_NAME, blob_name=BLOB_NAME + ) + # [START how_to_azure_blob_to_gcs] transfer_files_to_gcs = AzureBlobStorageToGCSOperator( task_id="transfer_files_to_gcs", @@ -60,10 +61,7 @@ ) # [END how_to_azure_blob_to_gcs] - ( - wait_for_blob - >> transfer_files_to_gcs - ) + (wait_for_blob >> transfer_files_to_gcs) from tests.system.utils.watcher import watcher From aba5df32992b57d8a7ded010cde3acdca9d087e7 Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Sun, 2 Jul 2023 04:27:20 +0530 Subject: [PATCH 3/7] docfix --- .../operators/transfer/azure_blob_to_gcs.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst index 54b878d6673f6..07ca2b0bbe6fe 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst @@ -27,6 +27,8 @@ Prerequisite Tasks .. include:: ../_partials/prerequisite_tasks.rst +.. _howto/operator:AzureBlobStorageToGCSOperator: + Use the :class:`~airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator` to transfer data from Azure Blob Storage to Google Cloud Storage. From dc3e97ab31a81c9104538f083d2cea3156371e06 Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Sun, 2 Jul 2023 15:07:34 +0530 Subject: [PATCH 4/7] doc build issue --- .../operators/transfer/azure_blob_to_gcs.rst | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst index 07ca2b0bbe6fe..1568283ce196a 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst @@ -29,15 +29,12 @@ Prerequisite Tasks .. _howto/operator:AzureBlobStorageToGCSOperator: -Use the :class:`~airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator` -to transfer data from Azure Blob Storage to Google Cloud Storage. - Transfer Data from Azure Blob Storage to Google Cloud Storage ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage -To get information about jobs within a Azure Blob Storage use: -:class:`~airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator` +Use the :class:`~airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator` +to transfer data from Azure Blob Storage to Google Cloud Storage. Example usage: From 22c6643d4474bf7dc32214548371fe67470007ff Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Sun, 2 Jul 2023 15:51:45 +0530 Subject: [PATCH 5/7] code clean up --- .../connections/gcp.rst | 2 +- .../azure/transfers/test_azure_blob_to_gcs.py | 93 ------------------- 2 files changed, 1 insertion(+), 94 deletions(-) delete mode 100644 tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py diff --git a/docs/apache-airflow-providers-google/connections/gcp.rst b/docs/apache-airflow-providers-google/connections/gcp.rst index c8374d455604f..91e54d1b1cd64 100644 --- a/docs/apache-airflow-providers-google/connections/gcp.rst +++ b/docs/apache-airflow-providers-google/connections/gcp.rst @@ -298,4 +298,4 @@ Note that as domain-wide delegation is currently supported by most of the Google * All of Google Cloud operators and hooks. * Firebase hooks. -* All transfer operators that involve Google cloud in different providers, for example: :class:`airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs`. +* All transfer operators that involve Google cloud in different providers, for example: :class:`airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator`. diff --git a/tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py b/tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py deleted file mode 100644 index 4bc51895f3e4c..0000000000000 --- a/tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py +++ /dev/null @@ -1,93 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from unittest import mock - -from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator - -WASB_CONN_ID = "wasb_default" -GCP_CONN_ID = "google_cloud_default" -BLOB_NAME = "azure_blob" -CONTAINER_NAME = "azure_container" -BUCKET_NAME = "airflow" -OBJECT_NAME = "file.txt" -FILENAME = "file.txt" -GZIP = False -IMPERSONATION_CHAIN = None -TASK_ID = "transfer_file" - - -class TestAzureBlobStorageToGCSTransferOperator: - def test_init(self): - operator = AzureBlobStorageToGCSOperator( - wasb_conn_id=WASB_CONN_ID, - gcp_conn_id=GCP_CONN_ID, - blob_name=BLOB_NAME, - container_name=CONTAINER_NAME, - bucket_name=BUCKET_NAME, - object_name=OBJECT_NAME, - filename=FILENAME, - gzip=GZIP, - impersonation_chain=IMPERSONATION_CHAIN, - task_id=TASK_ID, - ) - assert operator.wasb_conn_id == WASB_CONN_ID - assert operator.blob_name == BLOB_NAME - assert operator.container_name == CONTAINER_NAME - assert operator.gcp_conn_id == GCP_CONN_ID - assert operator.bucket_name == BUCKET_NAME - assert operator.object_name == OBJECT_NAME - assert operator.filename == FILENAME - assert operator.gzip == GZIP - assert operator.impersonation_chain == IMPERSONATION_CHAIN - assert operator.task_id == TASK_ID - - @mock.patch("airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.WasbHook") - @mock.patch("airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.GCSHook") - @mock.patch("airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.tempfile") - def test_execute(self, mock_temp, mock_hook_gcs, mock_hook_wasb): - op = AzureBlobStorageToGCSOperator( - wasb_conn_id=WASB_CONN_ID, - gcp_conn_id=GCP_CONN_ID, - blob_name=BLOB_NAME, - container_name=CONTAINER_NAME, - bucket_name=BUCKET_NAME, - object_name=OBJECT_NAME, - filename=FILENAME, - gzip=GZIP, - impersonation_chain=IMPERSONATION_CHAIN, - task_id=TASK_ID, - ) - - op.execute(context=None) - mock_hook_wasb.assert_called_once_with(wasb_conn_id=WASB_CONN_ID) - - mock_hook_wasb.return_value.get_file.assert_called_once_with( - file_path=mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name, - container_name=CONTAINER_NAME, - blob_name=BLOB_NAME, - ) - mock_hook_gcs.assert_called_once_with( - gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN - ) - mock_hook_gcs.return_value.upload.assert_called_once_with( - bucket_name=BUCKET_NAME, - object_name=OBJECT_NAME, - gzip=GZIP, - filename=mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name, - ) From 5612d3bfcf686f75f22369947723b1b335a80f90 Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Sun, 2 Jul 2023 17:29:34 +0530 Subject: [PATCH 6/7] redirect to Google provider --- .../redirects.txt | 1 + .../transfer/azure_blob_to_gcs.rst | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+) create mode 100644 docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst diff --git a/docs/apache-airflow-providers-microsoft-azure/redirects.txt b/docs/apache-airflow-providers-microsoft-azure/redirects.txt index 3ce11766b7704..998489b9fbd2e 100644 --- a/docs/apache-airflow-providers-microsoft-azure/redirects.txt +++ b/docs/apache-airflow-providers-microsoft-azure/redirects.txt @@ -1,3 +1,4 @@ connections/index.rst connections/azure.rst secrets-backends/index.rst secrets-backends/azure-key-vault-secrets-backend.rst logging.rst logging/index.rst +transfer/azure_blob_to_gcs.rst ../../apache-airflow-providers-google/latest/operators/transfer/azure_blob_to_gcs.rst diff --git a/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst b/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst new file mode 100644 index 0000000000000..d87e26031f4df --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst @@ -0,0 +1,20 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Upload data from Azure Blob Storage to Google Cloud Storage (Moved to Google Providers) +======================================================================================= \ No newline at end of file From 7a25ecaf71c30df199e9dce14051d877623ba761 Mon Sep 17 00:00:00 2001 From: Akash Sharma <2akash111998@gmail.com> Date: Sun, 2 Jul 2023 18:10:32 +0530 Subject: [PATCH 7/7] static checks --- airflow/providers/microsoft/azure/provider.yaml | 4 ++++ .../transfer/azure_blob_to_gcs.rst | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index 196cc84670d3e..441f51a5786b1 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -247,6 +247,10 @@ transfers: target-integration-name: Microsoft Azure Blob Storage how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/sftp_to_wasb.rst python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb + - source-integration-name: Microsoft Azure Blob Storage + target-integration-name: Google Cloud Storage (GCS) + how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst + python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs connection-types: diff --git a/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst b/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst index d87e26031f4df..df2cd0bcceea0 100644 --- a/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst +++ b/docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst @@ -17,4 +17,4 @@ Upload data from Azure Blob Storage to Google Cloud Storage (Moved to Google Providers) -======================================================================================= \ No newline at end of file +=======================================================================================