diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
index 04edc10795f02..f77c648f20e18 100644
--- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
@@ -686,39 +686,44 @@ def execute(self, context: Context):
             impersonation_chain=self.impersonation_chain,
         )
 
-        self.log.info("Creating Dataplex Data Quality scan %s", self.data_scan_id)
-        try:
-            operation = hook.create_data_scan(
-                project_id=self.project_id,
-                region=self.region,
-                data_scan_id=self.data_scan_id,
-                body=self.body,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            hook.wait_for_operation(timeout=self.timeout, operation=operation)
-            self.log.info("Dataplex Data Quality scan %s created successfully!", self.data_scan_id)
-        except AlreadyExists:
-            self.log.info("Dataplex Data Quality scan already exists: %s", {self.data_scan_id})
-
-            operation = hook.update_data_scan(
-                project_id=self.project_id,
-                region=self.region,
-                data_scan_id=self.data_scan_id,
-                body=self.body,
-                update_mask=self.update_mask,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            hook.wait_for_operation(timeout=self.timeout, operation=operation)
-            self.log.info("Dataplex Data Quality scan %s updated successfully!", self.data_scan_id)
-        except GoogleAPICallError as e:
-            raise AirflowException(f"Error creating Data Quality scan {self.data_scan_id}", e)
+        if self.update_mask is not None:
+            self._update_data_scan(hook)
+        else:
+            self.log.info("Creating Dataplex Data Quality scan %s", self.data_scan_id)
+            try:
+                operation = hook.create_data_scan(
+                    project_id=self.project_id,
+                    region=self.region,
+                    data_scan_id=self.data_scan_id,
+                    body=self.body,
+                    retry=self.retry,
+                    timeout=self.timeout,
+                    metadata=self.metadata,
+                )
+                hook.wait_for_operation(timeout=self.timeout, operation=operation)
+                self.log.info("Dataplex Data Quality scan %s created successfully!", self.data_scan_id)
+            except AlreadyExists:
+                self._update_data_scan(hook)
+            except GoogleAPICallError as e:
+                raise AirflowException(f"Error creating Data Quality scan {self.data_scan_id}", e)
 
         return self.data_scan_id
 
+    def _update_data_scan(self, hook: DataplexHook):
+        self.log.info("Dataplex Data Quality scan already exists: %s", {self.data_scan_id})
+        operation = hook.update_data_scan(
+            project_id=self.project_id,
+            region=self.region,
+            data_scan_id=self.data_scan_id,
+            body=self.body,
+            update_mask=self.update_mask,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        hook.wait_for_operation(timeout=self.timeout, operation=operation)
+        self.log.info("Dataplex Data Quality scan %s updated successfully!", self.data_scan_id)
+
 
 class DataplexGetDataQualityScanOperator(GoogleCloudBaseOperator):
     """
diff --git a/providers/tests/google/cloud/operators/test_dataplex.py b/providers/tests/google/cloud/operators/test_dataplex.py
index 67c9b8ca10f9f..1eec9008e2c10 100644
--- a/providers/tests/google/cloud/operators/test_dataplex.py
+++ b/providers/tests/google/cloud/operators/test_dataplex.py
@@ -672,6 +672,18 @@ def test_execute(self, hook_mock):
             api_version=API_VERSION,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
+        update_operator = DataplexCreateOrUpdateDataQualityScanOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            body={},
+            update_mask={},
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        update_operator.execute(context=mock.MagicMock())
         hook_mock.return_value.create_data_scan.assert_called_once_with(
             project_id=PROJECT_ID,
             region=REGION,
@@ -681,6 +693,16 @@ def test_execute(self, hook_mock):
             timeout=None,
             metadata=(),
         )
+        hook_mock.return_value.update_data_scan.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            body={},
+            update_mask={},
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
 
 
 class TestDataplexCreateDataProfileScanOperator: