From 4a9f97a999fec6d6f5dee4dd9d530e739132acdc Mon Sep 17 00:00:00 2001 From: Amir Mor <49829354+amirmor1@users.noreply.github.com> Date: Thu, 14 Nov 2024 16:18:50 +0200 Subject: [PATCH 1/4] 44012 - Update index.rst --- docs/apache-airflow-providers-docker/index.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-docker/index.rst b/docs/apache-airflow-providers-docker/index.rst index 4f5f263f263d1..0c7637199df86 100644 --- a/docs/apache-airflow-providers-docker/index.rst +++ b/docs/apache-airflow-providers-docker/index.rst @@ -49,7 +49,7 @@ :maxdepth: 1 :caption: Resources - Example DAGs + Example DAGs PyPI Repository Installing from sources From 46d67e3abaa2e1c085bf53d319738f1863aca9b0 Mon Sep 17 00:00:00 2001 From: Amir Mor Date: Thu, 21 Nov 2024 19:06:16 +0200 Subject: [PATCH 2/4] Fix Dataplex Data Quality Task partial update When we try to update dataplex data quality task using the DataplexCreateOrUpdateDataQualityScanOperator, it will first try to create the task, and only if it fails with AlreadyExists exception, it will try to update the task, but if you want to provide a partial parameters to the update (and not to replace the entire data scan properties), it will fail with AirflowException `Error creating Data Quality scan` because its missing mandatory parameters in the DataScan, and will never update the task. I've added a check to see if update_mask is not None, first try to do this update, and only if not -> try to create the task. 
Also moved the update section into a private function to reuse it in this check, and later if we are trying to do a full update of the task --- .../google/cloud/operators/dataplex.py | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py b/providers/src/airflow/providers/google/cloud/operators/dataplex.py index 04edc10795f02..f6a6b5b5e8e63 100644 --- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py +++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py @@ -686,39 +686,43 @@ def execute(self, context: Context): impersonation_chain=self.impersonation_chain, ) - self.log.info("Creating Dataplex Data Quality scan %s", self.data_scan_id) - try: - operation = hook.create_data_scan( - project_id=self.project_id, - region=self.region, - data_scan_id=self.data_scan_id, - body=self.body, - retry=self.retry, - timeout=self.timeout, - metadata=self.metadata, - ) - hook.wait_for_operation(timeout=self.timeout, operation=operation) - self.log.info("Dataplex Data Quality scan %s created successfully!", self.data_scan_id) - except AlreadyExists: - self.log.info("Dataplex Data Quality scan already exists: %s", {self.data_scan_id}) - - operation = hook.update_data_scan( - project_id=self.project_id, - region=self.region, - data_scan_id=self.data_scan_id, - body=self.body, - update_mask=self.update_mask, - retry=self.retry, - timeout=self.timeout, - metadata=self.metadata, - ) - hook.wait_for_operation(timeout=self.timeout, operation=operation) - self.log.info("Dataplex Data Quality scan %s updated successfully!", self.data_scan_id) - except GoogleAPICallError as e: - raise AirflowException(f"Error creating Data Quality scan {self.data_scan_id}", e) + if self.update_mask is not None: + self._update_data_scan(hook) + else: + self.log.info("Creating Dataplex Data Quality scan %s", self.data_scan_id) + try: + operation = hook.create_data_scan( + 
project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + body=self.body, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + hook.wait_for_operation(timeout=self.timeout, operation=operation) + self.log.info("Dataplex Data Quality scan %s created successfully!", self.data_scan_id) + except AlreadyExists: + self._update_data_scan(hook) + except GoogleAPICallError as e: + raise AirflowException(f"Error creating Data Quality scan {self.data_scan_id}", e) return self.data_scan_id + def _update_data_scan(self, hook: DataplexHook): + self.log.info("Dataplex Data Quality scan already exists: %s", {self.data_scan_id}) + operation = hook.update_data_scan( + project_id=self.project_id, + region=self.region, + data_scan_id=self.data_scan_id, + body=self.body, + update_mask=self.update_mask, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + hook.wait_for_operation(timeout=self.timeout, operation=operation) + self.log.info("Dataplex Data Quality scan %s updated successfully!", self.data_scan_id) class DataplexGetDataQualityScanOperator(GoogleCloudBaseOperator): """ From 3dbeb4cbf6bba759a5070a1ee87b6f08dd0311b6 Mon Sep 17 00:00:00 2001 From: Amir Mor Date: Fri, 22 Nov 2024 18:15:42 +0200 Subject: [PATCH 3/4] add empty line for lint --- .../src/airflow/providers/google/cloud/operators/dataplex.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py b/providers/src/airflow/providers/google/cloud/operators/dataplex.py index f6a6b5b5e8e63..f77c648f20e18 100644 --- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py +++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py @@ -724,6 +724,7 @@ def _update_data_scan(self, hook: DataplexHook): hook.wait_for_operation(timeout=self.timeout, operation=operation) self.log.info("Dataplex Data Quality scan %s updated successfully!", self.data_scan_id) + class 
DataplexGetDataQualityScanOperator(GoogleCloudBaseOperator): """ Gets a DataScan resource. From 8327fb27900e9a61add59a46b54abcd390d8cd4b Mon Sep 17 00:00:00 2001 From: Amir Mor Date: Fri, 22 Nov 2024 18:17:37 +0200 Subject: [PATCH 4/4] add test to verify update when update_mask is not none --- .../google/cloud/operators/test_dataplex.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/providers/tests/google/cloud/operators/test_dataplex.py b/providers/tests/google/cloud/operators/test_dataplex.py index 67c9b8ca10f9f..1eec9008e2c10 100644 --- a/providers/tests/google/cloud/operators/test_dataplex.py +++ b/providers/tests/google/cloud/operators/test_dataplex.py @@ -672,6 +672,18 @@ def test_execute(self, hook_mock): api_version=API_VERSION, impersonation_chain=IMPERSONATION_CHAIN, ) + update_operator = DataplexCreateOrUpdateDataQualityScanOperator( + task_id=TASK_ID, + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + body={}, + update_mask={}, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + update_operator.execute(context=mock.MagicMock()) hook_mock.return_value.create_data_scan.assert_called_once_with( project_id=PROJECT_ID, region=REGION, @@ -681,6 +693,16 @@ def test_execute(self, hook_mock): timeout=None, metadata=(), ) + hook_mock.return_value.update_data_scan.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + body={}, + update_mask={}, + retry=DEFAULT, + timeout=None, + metadata=(), + ) class TestDataplexCreateDataProfileScanOperator: