From 76a7c3655016d15f89185006de50468bb5906342 Mon Sep 17 00:00:00 2001 From: Ambika Garg Date: Wed, 24 Apr 2024 15:15:25 -0400 Subject: [PATCH] Add Power BI Provider * PowerBIDatasetRefreshOperator: Refreshes the Dataset * PowerBI Hook: A class to interact with Power BI * Unit tests --- INSTALL | 10 +- .../providers/microsoft/powerbi/CHANGELOG.rst | 16 + .../providers/microsoft/powerbi/__init__.py | 16 + .../microsoft/powerbi/hooks/__init__.py | 17 + .../microsoft/powerbi/hooks/powerbi.py | 322 ++++++++++++++++++ .../microsoft/powerbi/operators/__init__.py | 17 + .../microsoft/powerbi/operators/powerbi.py | 194 +++++++++++ .../providers/microsoft/powerbi/provider.yaml | 52 +++ .../12_airflow_dependencies_and_extras.rst | 10 +- dev/breeze/doc/images/output_build-docs.svg | 8 +- dev/breeze/doc/images/output_build-docs.txt | 2 +- ...release-management_add-back-references.svg | 8 +- ...release-management_add-back-references.txt | 2 +- ...output_release-management_publish-docs.svg | 8 +- ...output_release-management_publish-docs.txt | 2 +- ...t_sbom_generate-providers-requirements.svg | 60 ++-- ...t_sbom_generate-providers-requirements.txt | 2 +- .../changelog.rst | 18 + .../commits.rst | 18 + .../index.rst | 63 ++++ .../installing-providers-from-sources.rst | 16 + .../security.rst | 18 + generated/provider_dependencies.json | 10 + pyproject.toml | 10 +- tests/providers/microsoft/powerbi/__init__.py | 16 + .../microsoft/powerbi/hooks/__init__.py | 16 + .../microsoft/powerbi/hooks/test_powerbi.py | 277 +++++++++++++++ .../microsoft/powerbi/operators/__init__.py | 16 + .../powerbi/operators/test_powerbi.py | 261 ++++++++++++++ .../providers/microsoft/powerbi/__init__.py | 16 + .../powerbi/example_dataset_refresh.py | 87 +++++ 31 files changed, 1529 insertions(+), 59 deletions(-) create mode 100644 airflow/providers/microsoft/powerbi/CHANGELOG.rst create mode 100644 airflow/providers/microsoft/powerbi/__init__.py create mode 100644 
airflow/providers/microsoft/powerbi/hooks/__init__.py create mode 100644 airflow/providers/microsoft/powerbi/hooks/powerbi.py create mode 100644 airflow/providers/microsoft/powerbi/operators/__init__.py create mode 100644 airflow/providers/microsoft/powerbi/operators/powerbi.py create mode 100644 airflow/providers/microsoft/powerbi/provider.yaml create mode 100644 docs/apache-airflow-providers-microsoft-powerbi/changelog.rst create mode 100644 docs/apache-airflow-providers-microsoft-powerbi/commits.rst create mode 100644 docs/apache-airflow-providers-microsoft-powerbi/index.rst create mode 100644 docs/apache-airflow-providers-microsoft-powerbi/installing-providers-from-sources.rst create mode 100644 docs/apache-airflow-providers-microsoft-powerbi/security.rst create mode 100644 tests/providers/microsoft/powerbi/__init__.py create mode 100644 tests/providers/microsoft/powerbi/hooks/__init__.py create mode 100644 tests/providers/microsoft/powerbi/hooks/test_powerbi.py create mode 100644 tests/providers/microsoft/powerbi/operators/__init__.py create mode 100644 tests/providers/microsoft/powerbi/operators/test_powerbi.py create mode 100644 tests/system/providers/microsoft/powerbi/__init__.py create mode 100644 tests/system/providers/microsoft/powerbi/example_dataset_refresh.py diff --git a/INSTALL b/INSTALL index 38434d9192d40..cf102321962cd 100644 --- a/INSTALL +++ b/INSTALL @@ -281,11 +281,11 @@ apache.hdfs, apache.hive, apache.impala, apache.kafka, apache.kylin, apache.livy apache.pinot, apache.spark, apprise, arangodb, asana, atlassian.jira, celery, cloudant, cncf.kubernetes, cohere, common.io, common.sql, databricks, datadog, dbt.cloud, dingding, discord, docker, elasticsearch, exasol, fab, facebook, ftp, github, google, grpc, hashicorp, http, imap, -influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, -mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, pagerduty, -papermill, pgvector, 
pinecone, postgres, presto, qdrant, redis, salesforce, samba, segment, -sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, telegram, -teradata, trino, vertica, weaviate, yandex, zendesk +influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.powerbi, microsoft.psrp, +microsoft.winrm, mongo, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, +oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, +samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, +telegram, teradata, trino, vertica, weaviate, yandex, zendesk # END PROVIDER EXTRAS HERE diff --git a/airflow/providers/microsoft/powerbi/CHANGELOG.rst b/airflow/providers/microsoft/powerbi/CHANGELOG.rst new file mode 100644 index 0000000000000..106592bd11775 --- /dev/null +++ b/airflow/providers/microsoft/powerbi/CHANGELOG.rst @@ -0,0 +1,16 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
diff --git a/airflow/providers/microsoft/powerbi/__init__.py b/airflow/providers/microsoft/powerbi/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/microsoft/powerbi/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/microsoft/powerbi/hooks/__init__.py b/airflow/providers/microsoft/powerbi/hooks/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/airflow/providers/microsoft/powerbi/hooks/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/microsoft/powerbi/hooks/powerbi.py b/airflow/providers/microsoft/powerbi/hooks/powerbi.py new file mode 100644 index 0000000000000..f2b12845f4810 --- /dev/null +++ b/airflow/providers/microsoft/powerbi/hooks/powerbi.py @@ -0,0 +1,322 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import time +from enum import Enum +from typing import Any, Callable + +import requests +from azure.identity import ClientSecretCredential + +from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook + + +class PowerBIDatasetRefreshFields(Enum): + """Power BI refresh dataset details.""" + + REQUEST_ID = "request_id" + STATUS = "status" + END_TIME = "end_time" + ERROR = "error" + + +class PowerBIDatasetRefreshStatus: + """Power BI refresh dataset statuses.""" + + # If the completion state is unknown or a refresh is in progress. + IN_PROGRESS = "In Progress" + FAILED = "Failed" + COMPLETED = "Completed" + DISABLED = "Disabled" + + TERMINAL_STATUSES = {FAILED, COMPLETED} + + +class PowerBIDatasetRefreshException(AirflowException): + """An exception that indicates a dataset refresh failed to complete.""" + + +class PowerBIHook(BaseHook): + """ + A hook to interact with Power BI. + + :param powerbi_conn_id: Airflow Connection ID that contains the connection + information for the Power BI account used for authentication. 
+ """ + + conn_type: str = "powerbi" + conn_name_attr: str = "powerbi_conn_id" + default_conn_name: str = "powerbi_default" + hook_name: str = "Power BI" + + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Return connection widgets to add to connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "tenantId": StringField(lazy_gettext("Tenant ID"), widget=BS3TextFieldWidget()), + } + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom field behaviour.""" + return { + "hidden_fields": ["schema", "port", "host", "extra"], + "relabeling": { + "login": "Client ID", + "password": "Secret", + }, + } + + def __init__( + self, + *, + powerbi_conn_id: str = default_conn_name, + ): + self.conn_id = powerbi_conn_id + self._api_version = "v1.0" + self._base_url = "https://api.powerbi.com" + super().__init__() + + def refresh_dataset(self, dataset_id: str, group_id: str) -> str: + """ + Triggers a refresh for the specified dataset from the given group id. + + :param dataset_id: The dataset id. + :param group_id: The workspace id. + + :return: Request id of the dataset refresh request. + """ + url = f"{self._base_url}/{self._api_version}/myorg" + + # add the group id if it is specified + url += f"/groups/{group_id}" + + # add the dataset key + url += f"/datasets/{dataset_id}/refreshes" + + response = self._send_request("POST", url=url) + + if response.ok: + request_id = response.headers["RequestId"] + return request_id + + raise PowerBIDatasetRefreshException( + "Failed to trigger dataset refresh. 
Status code: %s", str(response.status_code) + ) + + def _get_token(self) -> str: + """Retrieve the access token used to authenticate against the API.""" + conn = self.get_connection(self.conn_id) + extras = conn.extra_dejson + tenant = extras.get("tenantId", None) + + if not conn.login or not conn.password: + raise ValueError("A Client ID and Secret is required to authenticate with Power BI.") + + if not tenant: + raise ValueError("A Tenant ID is required when authenticating with Client ID and Secret.") + + credential = ClientSecretCredential( + client_id=conn.login, client_secret=conn.password, tenant_id=tenant + ) + + resource = "https://analysis.windows.net/powerbi/api" + + access_token = credential.get_token(f"{resource}/.default") + + return access_token.token + + def get_refresh_history( + self, + dataset_id: str, + group_id: str, + ) -> list[dict[str, str]]: + """ + Retrieve the refresh history of the specified dataset from the given group ID. + + :param dataset_id: The dataset ID. + :param group_id: The workspace ID. + + :return: Dictionary containing all the refresh histories of the dataset. + """ + url = f"{self._base_url}/{self._api_version}/myorg" + + # add the group id + url += f"/groups/{group_id}" + + # add the dataset id + url += f"/datasets/{dataset_id}/refreshes" + + raw_response = self._send_request("GET", url=url) + + if raw_response.ok: + response = raw_response.json() + refresh_histories = response.get("value") + return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories] + + raise PowerBIDatasetRefreshException( + "Failed to retrieve refresh history. Status code: %s", str(raw_response.status_code) + ) + + def raw_to_refresh_details(self, refresh_details: dict) -> dict[str, str]: + """ + Convert raw refresh details into a dictionary containing required fields. + + :param refresh_details: Raw object of refresh details.
+ """ + return { + PowerBIDatasetRefreshFields.REQUEST_ID.value: str(refresh_details.get("requestId")), + PowerBIDatasetRefreshFields.STATUS.value: ( + "In Progress" + if str(refresh_details.get("status")) == "Unknown" + else str(refresh_details.get("status")) + ), + PowerBIDatasetRefreshFields.END_TIME.value: str(refresh_details.get("endTime")), + PowerBIDatasetRefreshFields.ERROR.value: str(refresh_details.get("serviceExceptionJson")), + } + + def get_latest_refresh_details(self, dataset_id: str, group_id: str) -> dict[str, str] | None: + """ + Get the refresh details of the most recent dataset refresh in the refresh history of the data source. + + :return: Dictionary containing refresh status and end time if refresh history exists, otherwise None. + """ + history = self.get_refresh_history(dataset_id=dataset_id, group_id=group_id) + + if len(history) == 0: + return None + + refresh_details = history[0] + return refresh_details + + def get_refresh_details_by_request_id(self, dataset_id: str, group_id: str, request_id) -> dict[str, str]: + """ + Get the refresh details of the given request Id. + + :param request_id: Request Id of the Dataset refresh. 
+ """ + refresh_histories = self.get_refresh_history(dataset_id=dataset_id, group_id=group_id) + + if len(refresh_histories) == 0: + raise PowerBIDatasetRefreshException( + f"Unable to fetch the details of dataset refresh with Request Id: {request_id}" + ) + + request_ids = [ + refresh_history.get(PowerBIDatasetRefreshFields.REQUEST_ID.value) + for refresh_history in refresh_histories + ] + + if request_id not in request_ids: + raise PowerBIDatasetRefreshException( + f"Unable to fetch the details of dataset refresh with Request Id: {request_id}" + ) + + request_id_index = request_ids.index(request_id) + refresh_details = refresh_histories[request_id_index] + + return refresh_details + + def wait_for_dataset_refresh_status( + self, + *, + expected_status: str, + request_id: str, + dataset_id: str, + group_id: str, + check_interval: int = 60, + timeout: int = 60 * 60 * 24 * 7, + ) -> bool: + """ + Wait until the dataset refresh of given request ID has reached the expected status. + + :param expected_status: The desired status to check against a dataset refresh's current status. + :param request_id: Request id for the dataset refresh request. + :param check_interval: Time in seconds to check on a dataset refresh's status. + :param timeout: Time in seconds to wait for a dataset to reach a terminal status or the expected status. + :return: Boolean indicating if the dataset refresh has reached the ``expected_status`` before the timeout. + """ + dataset_refresh_details = self.get_refresh_details_by_request_id( + dataset_id=dataset_id, group_id=group_id, request_id=request_id + ) + dataset_refresh_status = dataset_refresh_details.get(PowerBIDatasetRefreshFields.STATUS.value) + + start_time = time.monotonic() + + while ( + dataset_refresh_status not in PowerBIDatasetRefreshStatus.TERMINAL_STATUSES + and dataset_refresh_status != expected_status + ): + # Check if the dataset-refresh duration has exceeded the ``timeout`` configured. 
+ if start_time + timeout < time.monotonic(): + raise PowerBIDatasetRefreshException( + f"Dataset refresh has not reached a terminal status after {timeout} seconds" + ) + + time.sleep(check_interval) + + dataset_refresh_details = self.get_refresh_details_by_request_id( + dataset_id=dataset_id, group_id=group_id, request_id=request_id + ) + dataset_refresh_status = dataset_refresh_details.get(PowerBIDatasetRefreshFields.STATUS.value) + + return dataset_refresh_status == expected_status + + def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> str: + """ + Triggers the Power BI dataset refresh. + + :param dataset_id: The dataset ID. + :param group_id: The workspace ID. + + :return: Request ID of the dataset refresh request. + """ + # Start dataset refresh + self.log.info("Starting dataset refresh.") + request_id = self.refresh_dataset(dataset_id=dataset_id, group_id=group_id) + + return request_id + + def _send_request(self, request_type: str, url: str, **kwargs) -> requests.Response: + """ + Send a request to the Power BI REST API. + + :param request_type: The type of the request (GET, POST, PUT, etc.). + :param url: The URL against which the request needs to be made. + :param kwargs: Additional keyword arguments to be passed to the request function. + :return: The response object returned by the request. + :raises requests.HTTPError: If the request fails (e.g., non-2xx status code). 
+ """ + self.header: dict[str, str] = {} + + request_funcs: dict[str, Callable[..., requests.Response]] = { + "GET": requests.get, + "POST": requests.post, + } + + func: Callable[..., requests.Response] = request_funcs[request_type.upper()] + + response = func(url=url, headers={"Authorization": f"Bearer {self._get_token()}"}, **kwargs) + + return response diff --git a/airflow/providers/microsoft/powerbi/operators/__init__.py b/airflow/providers/microsoft/powerbi/operators/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/airflow/providers/microsoft/powerbi/operators/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/microsoft/powerbi/operators/powerbi.py b/airflow/providers/microsoft/powerbi/operators/powerbi.py new file mode 100644 index 0000000000000..18574c9185724 --- /dev/null +++ b/airflow/providers/microsoft/powerbi/operators/powerbi.py @@ -0,0 +1,194 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Sequence + +from airflow.models import BaseOperator, BaseOperatorLink +from airflow.providers.microsoft.powerbi.hooks.powerbi import ( + PowerBIDatasetRefreshException, + PowerBIDatasetRefreshFields, + PowerBIDatasetRefreshStatus, + PowerBIHook, +) + +if TYPE_CHECKING: + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.utils.context import Context + + +class PowerBILink(BaseOperatorLink): + """Construct a link to monitor a dataset in Power BI.""" + + name = "Monitor PowerBI Dataset" + + def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey): + url = ( + f"https://app.powerbi.com" # type: ignore[attr-defined] + f"/groups/{operator.group_id}/datasets/{operator.dataset_id}" # type: ignore[attr-defined] + f"/details?experience=power-bi" + ) + + return url + + +class PowerBIDatasetRefreshOperator(BaseOperator): + """ + Refreshes a Power BI dataset. + + By default the operator will wait until the refresh has completed before + exiting. The refresh status is checked every 60 seconds as a default. This + can be changed by specifying a new value for `check_interval`. + + :param dataset_id: The dataset id. + :param group_id: The workspace id. + :param wait_for_termination: Wait until the pre-existing or current triggered refresh completes before exiting. 
+ :param force_refresh: Force refresh if pre-existing refresh found. + :param powerbi_conn_id: Airflow Connection ID that contains the connection + information for the Power BI account used for authentication. + :param timeout: Time in seconds to wait for a dataset to reach a terminal status for non-asynchronous waits. Used only if ``wait_for_termination`` is True. + :param check_interval: Number of seconds to wait before rechecking the + refresh status. + """ + + template_fields: Sequence[str] = ( + "dataset_id", + "group_id", + ) + template_fields_renderers = {"parameters": "json"} + + operator_extra_links = (PowerBILink(),) + + def __init__( + self, + *, # Indicates all the following parameters must be specified using keyword arguments. + dataset_id: str, + group_id: str, + wait_for_termination: bool = True, + force_refresh: bool = False, + powerbi_conn_id: str = PowerBIHook.default_conn_name, + timeout: int = 60 * 60 * 24 * 7, + check_interval: int = 60, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.dataset_id = dataset_id + self.group_id = group_id + self.wait_for_termination = wait_for_termination + self.force_refresh = force_refresh + self.powerbi_conn_id = powerbi_conn_id + self.timeout = timeout + self.check_interval = check_interval + + @cached_property + def hook(self) -> PowerBIHook: + """Create and return an PowerBIHook (cached).""" + return PowerBIHook(powerbi_conn_id=self.powerbi_conn_id) + + def execute(self, context: Context): + """Refresh the Power BI Dataset.""" + self.log.info("Check if a refresh is already in progress.") + refresh_details = self.hook.get_latest_refresh_details( + dataset_id=self.dataset_id, group_id=self.group_id + ) + + if ( + refresh_details is None + or refresh_details.get(PowerBIDatasetRefreshFields.STATUS.value) + in PowerBIDatasetRefreshStatus.TERMINAL_STATUSES + ): + self.log.info("No pre-existing refresh found.") + request_id = self.hook.trigger_dataset_refresh( + dataset_id=self.dataset_id, + 
group_id=self.group_id, + ) + + if self.wait_for_termination: + self.log.info("Waiting for dataset refresh to terminate.") + if self.hook.wait_for_dataset_refresh_status( + request_id=request_id, + dataset_id=self.dataset_id, + group_id=self.group_id, + expected_status=PowerBIDatasetRefreshStatus.COMPLETED, + ): + self.log.info("Dataset refresh %s has completed successfully.", request_id) + else: + raise PowerBIDatasetRefreshException( + f"Dataset refresh {request_id} has failed or has been cancelled." + ) + else: + if ( + refresh_details.get(PowerBIDatasetRefreshFields.STATUS.value) + == PowerBIDatasetRefreshStatus.IN_PROGRESS + ): + request_id = str(refresh_details.get(PowerBIDatasetRefreshFields.REQUEST_ID.value)) + self.log.info("Found pre-existing dataset refresh request: %s.", request_id) + + if self.force_refresh or self.wait_for_termination: + self.log.info("Waiting for dataset refresh %s to terminate.", request_id) + if self.hook.wait_for_dataset_refresh_status( + request_id=request_id, + dataset_id=self.dataset_id, + group_id=self.group_id, + expected_status=PowerBIDatasetRefreshStatus.COMPLETED, + ): + self.log.info( + "Pre-existing dataset refresh %s has completed successfully.", request_id + ) + else: + raise PowerBIDatasetRefreshException( + f"Pre-existing dataset refresh {request_id} has failed or has been cancelled."
+ ) + + if self.force_refresh: + self.log.info("Starting forced refresh.") + request_id = self.hook.trigger_dataset_refresh( + dataset_id=self.dataset_id, + group_id=self.group_id, + ) + + if self.wait_for_termination: + self.log.info("Waiting for dataset refresh to terminate.") + if self.hook.wait_for_dataset_refresh_status( + request_id=request_id, + dataset_id=self.dataset_id, + group_id=self.group_id, + expected_status=PowerBIDatasetRefreshStatus.COMPLETED, + ): + self.log.info("Dataset refresh %s has completed successfully.", request_id) + else: + raise PowerBIDatasetRefreshException( + f"Dataset refresh {request_id} has failed or has been cancelled." + ) + + # Retrieve refresh details after triggering refresh + refresh_details = self.hook.get_refresh_details_by_request_id( + dataset_id=self.dataset_id, group_id=self.group_id, request_id=request_id + ) + + request_id = str(refresh_details.get(PowerBIDatasetRefreshFields.REQUEST_ID.value)) + status = str(refresh_details.get(PowerBIDatasetRefreshFields.STATUS.value)) + end_time = str(refresh_details.get(PowerBIDatasetRefreshFields.END_TIME.value)) + error = str(refresh_details.get(PowerBIDatasetRefreshFields.ERROR.value)) + + # Xcom Integration + context["ti"].xcom_push(key="powerbi_dataset_refresh_id", value=request_id) + context["ti"].xcom_push(key="powerbi_dataset_refresh_status", value=status) + context["ti"].xcom_push(key="powerbi_dataset_refresh_end_time", value=end_time) + context["ti"].xcom_push(key="powerbi_dataset_refresh_error", value=error) diff --git a/airflow/providers/microsoft/powerbi/provider.yaml b/airflow/providers/microsoft/powerbi/provider.yaml new file mode 100644 index 0000000000000..3b2dd2563cd56 --- /dev/null +++ b/airflow/providers/microsoft/powerbi/provider.yaml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-microsoft-powerbi +name: Microsoft Power BI +description: | + `Microsoft Power BI `__ + +state: not-ready +source-date-epoch: 1707636422 +# note that those versions are maintained by release manager - do not update them manually +versions: + - 1.0.0 + +dependencies: + - apache-airflow>=2.7.0 + - azure-identity>=1.3.1 + +# integrations: +# - integration-name: Microsoft Power BI + +operators: + - integration-name: Microsoft Power BI + python-modules: + - airflow.providers.microsoft.powerbi.operators.powerbi + +hooks: + - integration-name: Microsoft Power BI + python-modules: + - airflow.providers.microsoft.powerbi.hooks.powerbi + +connection-types: + - hook-class-name: airflow.providers.microsoft.powerbi.hooks.powerbi.PowerBIHook + connection-type: powerbi + +extra-links: + - airflow.providers.microsoft.powerbi.operators.powerbi.PowerBILink diff --git a/contributing-docs/12_airflow_dependencies_and_extras.rst b/contributing-docs/12_airflow_dependencies_and_extras.rst index 91328a24abb39..8e8a014b35b4b 100644 --- a/contributing-docs/12_airflow_dependencies_and_extras.rst +++ b/contributing-docs/12_airflow_dependencies_and_extras.rst @@ -183,11 +183,11 @@ apache.hdfs, apache.hive, apache.impala, apache.kafka, apache.kylin, apache.livy 
apache.pinot, apache.spark, apprise, arangodb, asana, atlassian.jira, celery, cloudant, cncf.kubernetes, cohere, common.io, common.sql, databricks, datadog, dbt.cloud, dingding, discord, docker, elasticsearch, exasol, fab, facebook, ftp, github, google, grpc, hashicorp, http, imap, -influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, -mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, pagerduty, -papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, samba, segment, -sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, telegram, -teradata, trino, vertica, weaviate, yandex, zendesk +influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.powerbi, microsoft.psrp, +microsoft.winrm, mongo, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, +oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, +samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, +telegram, teradata, trino, vertica, weaviate, yandex, zendesk .. 
END PROVIDER EXTRAS HERE diff --git a/dev/breeze/doc/images/output_build-docs.svg b/dev/breeze/doc/images/output_build-docs.svg index 1769101730720..7efcc7a646dee 100644 --- a/dev/breeze/doc/images/output_build-docs.svg +++ b/dev/breeze/doc/images/output_build-docs.svg @@ -195,10 +195,10 @@ atlassian.jira | celery | cloudant | cncf.kubernetes | cohere | common.io | common.sql | databricks | datadog |        dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | -microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    -samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        -telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                                                +microsoft.powerbi | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage +opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis |  +salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau |     +tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                                      Build documents. 
diff --git a/dev/breeze/doc/images/output_build-docs.txt b/dev/breeze/doc/images/output_build-docs.txt index a9ecb4c194bf5..7f7b3633e8876 100644 --- a/dev/breeze/doc/images/output_build-docs.txt +++ b/dev/breeze/doc/images/output_build-docs.txt @@ -1 +1 @@ -7391d7b5a523f63bb02bea9ca23216dd +86440122387e7067d8f581221704bb88 diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.svg b/dev/breeze/doc/images/output_release-management_add-back-references.svg index 48a3be2224e81..e1b632369bdf6 100644 --- a/dev/breeze/doc/images/output_release-management_add-back-references.svg +++ b/dev/breeze/doc/images/output_release-management_add-back-references.svg @@ -143,10 +143,10 @@ atlassian.jira | celery | cloudant | cncf.kubernetes | cohere | common.io | common.sql | databricks | datadog |        dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | -microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    -samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        -telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                                                +microsoft.powerbi | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage +opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis |  +salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau |     +tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                    
                  Command to add back references for documentation to make it backward compatible. diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.txt b/dev/breeze/doc/images/output_release-management_add-back-references.txt index ae51a4106f10a..241527f1b9951 100644 --- a/dev/breeze/doc/images/output_release-management_add-back-references.txt +++ b/dev/breeze/doc/images/output_release-management_add-back-references.txt @@ -1 +1 @@ -6cccd29cb919026e925f9c54882c4900 +53a7e2f9f3f1607e94bda5524c8a4db4 diff --git a/dev/breeze/doc/images/output_release-management_publish-docs.svg b/dev/breeze/doc/images/output_release-management_publish-docs.svg index 55cca08f674cf..3b6844305f348 100644 --- a/dev/breeze/doc/images/output_release-management_publish-docs.svg +++ b/dev/breeze/doc/images/output_release-management_publish-docs.svg @@ -200,10 +200,10 @@ atlassian.jira | celery | cloudant | cncf.kubernetes | cohere | common.io | common.sql | databricks | datadog |        dbt.cloud | dingding | discord | docker | docker-stack | elasticsearch | exasol | fab | facebook | ftp | github |      google | grpc | hashicorp | helm-chart | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | -microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch |       -opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce |    -samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular |        -telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                                                
+microsoft.powerbi | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage +opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis |  +salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau |     +tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                                      Command to publish generated documentation to airflow-site diff --git a/dev/breeze/doc/images/output_release-management_publish-docs.txt b/dev/breeze/doc/images/output_release-management_publish-docs.txt index 9529819686ee9..a10735b8e08f8 100644 --- a/dev/breeze/doc/images/output_release-management_publish-docs.txt +++ b/dev/breeze/doc/images/output_release-management_publish-docs.txt @@ -1 +1 @@ -a6be6aad28ce6b74e0b3d075b03aabc4 +94ef151eeca2c452dea89eebf311b3b1 diff --git a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg index edf41b8a49b14..ce61c9a747fe1 100644 --- a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg +++ b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg @@ -1,4 +1,4 @@ - + atlassian.jira | celery | cloudant | cncf.kubernetes | cohere | common.io | common.sql |       databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | fab  | facebook | ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc |        -jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql -| neo4j | odbc | openai | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty  -| papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce | samba |  -segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau |  -tabular | 
telegram | teradata | trino | vertica | weaviate | yandex | zendesk)                 ---provider-versionProvider version to generate the requirements for i.e `2.1.0`. `latest` is also a supported    -value to account for the most recent version of the provider                                   -(TEXT)                                                                                         ---forceForce update providers requirements even if they already exist. -╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ -╭─ Parallel running ───────────────────────────────────────────────────────────────────────────────────────────────────╮ ---run-in-parallelRun the operation in parallel on all or selected subset of parameters. ---parallelismMaximum number of processes to use while running the operation in parallel. -(INTEGER RANGE)                                                             -[default: 4; 1<=x<=8]                                                       ---skip-cleanupSkip cleanup of temporary files created during parallel run. ---debug-resourcesWhether to show resource information while running in parallel. ---include-success-outputsWhether to include outputs of successful parallel runs (skipped by default). -╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ -╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮ ---verbose-vPrint verbose information about performed steps. ---dry-run-DIf dry-run is set, commands are only printed, not executed. ---answer-aForce answer to questions.(y | n | q | yes | no | quit) ---help-hShow this message and exit. 
-╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ +jenkins | microsoft.azure | microsoft.mssql | microsoft.powerbi | microsoft.psrp |             +microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage | opensearch  +| opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant +| redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp |        +snowflake | sqlite | ssh | tableau | tabular | telegram | teradata | trino | vertica |         +weaviate | yandex | zendesk)                                                                   +--provider-versionProvider version to generate the requirements for i.e `2.1.0`. `latest` is also a supported    +value to account for the most recent version of the provider                                   +(TEXT)                                                                                         +--forceForce update providers requirements even if they already exist. +╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ +╭─ Parallel running ───────────────────────────────────────────────────────────────────────────────────────────────────╮ +--run-in-parallelRun the operation in parallel on all or selected subset of parameters. +--parallelismMaximum number of processes to use while running the operation in parallel. +(INTEGER RANGE)                                                             +[default: 4; 1<=x<=8]                                                       +--skip-cleanupSkip cleanup of temporary files created during parallel run. +--debug-resourcesWhether to show resource information while running in parallel. +--include-success-outputsWhether to include outputs of successful parallel runs (skipped by default). 
+╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ +╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮ +--verbose-vPrint verbose information about performed steps. +--dry-run-DIf dry-run is set, commands are only printed, not executed. +--answer-aForce answer to questions.(y | n | q | yes | no | quit) +--help-hShow this message and exit. +╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt index c536c3906cb6a..583169ef12bc1 100644 --- a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt +++ b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt @@ -1 +1 @@ -2d4b3d380270c5e6db73a2b51fd7d6b8 +a0d924e8c1ca5446dbd3454f2ac3dd32 diff --git a/docs/apache-airflow-providers-microsoft-powerbi/changelog.rst b/docs/apache-airflow-providers-microsoft-powerbi/changelog.rst new file mode 100644 index 0000000000000..f221c5178a2f5 --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-powerbi/changelog.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../../airflow/providers/microsoft/powerbi/CHANGELOG.rst diff --git a/docs/apache-airflow-providers-microsoft-powerbi/commits.rst b/docs/apache-airflow-providers-microsoft-powerbi/commits.rst new file mode 100644 index 0000000000000..1e9802470054a --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-powerbi/commits.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + .. THIS FILE IS UPDATED AUTOMATICALLY_AT_RELEASE_TIME diff --git a/docs/apache-airflow-providers-microsoft-powerbi/index.rst b/docs/apache-airflow-providers-microsoft-powerbi/index.rst new file mode 100644 index 0000000000000..108c9b2126c1f --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-powerbi/index.rst @@ -0,0 +1,63 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-microsoft-powerbi`` +============================================ + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Basics + + Home + Changelog + Security + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Guides + + Connection types + Operators + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: System tests + + System Tests <_api/tests/system/providers/microsoft/powerbi/index> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Commits + + Detailed list of commits + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Resources + + Example DAGs + PyPI Repository + Installing from sources + +.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! diff --git a/docs/apache-airflow-providers-microsoft-powerbi/installing-providers-from-sources.rst b/docs/apache-airflow-providers-microsoft-powerbi/installing-providers-from-sources.rst new file mode 100644 index 0000000000000..106592bd11775 --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-powerbi/installing-providers-from-sources.rst @@ -0,0 +1,16 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. diff --git a/docs/apache-airflow-providers-microsoft-powerbi/security.rst b/docs/apache-airflow-providers-microsoft-powerbi/security.rst new file mode 100644 index 0000000000000..afa13dac6fc9b --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-powerbi/security.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. 
include:: ../exts/includes/security.rst diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 841f3764674e3..8d4e247b65065 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -726,6 +726,16 @@ "excluded-python-versions": [], "state": "ready" }, + "microsoft.powerbi": { + "deps": [ + "apache-airflow>=2.7.0", + "azure-identity>=1.3.1" + ], + "devel-deps": [], + "cross-providers-deps": [], + "excluded-python-versions": [], + "state": "not-ready" + }, "microsoft.psrp": { "deps": [ "apache-airflow>=2.6.0", diff --git a/pyproject.toml b/pyproject.toml index 9feea850a7560..7c24715138ff0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -136,11 +136,11 @@ dynamic = ["version", "optional-dependencies", "dependencies"] # apache.pinot, apache.spark, apprise, arangodb, asana, atlassian.jira, celery, cloudant, # cncf.kubernetes, cohere, common.io, common.sql, databricks, datadog, dbt.cloud, dingding, discord, # docker, elasticsearch, exasol, fab, facebook, ftp, github, google, grpc, hashicorp, http, imap, -# influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, -# mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, pagerduty, -# papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, samba, segment, -# sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, telegram, -# teradata, trino, vertica, weaviate, yandex, zendesk +# influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.powerbi, microsoft.psrp, +# microsoft.winrm, mongo, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, +# oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, +# samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, +# telegram, teradata, trino, vertica, weaviate, yandex, 
zendesk # # END PROVIDER EXTRAS HERE diff --git a/tests/providers/microsoft/powerbi/__init__.py b/tests/providers/microsoft/powerbi/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/microsoft/powerbi/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/microsoft/powerbi/hooks/__init__.py b/tests/providers/microsoft/powerbi/hooks/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/microsoft/powerbi/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/microsoft/powerbi/hooks/test_powerbi.py b/tests/providers/microsoft/powerbi/hooks/test_powerbi.py new file mode 100644 index 0000000000000..4a9b65a53031f --- /dev/null +++ b/tests/providers/microsoft/powerbi/hooks/test_powerbi.py @@ -0,0 +1,277 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest import mock +from unittest.mock import MagicMock + +import pytest + +from airflow.models.connection import Connection +from airflow.providers.microsoft.powerbi.hooks.powerbi import ( + PowerBIDatasetRefreshException, + PowerBIDatasetRefreshFields, + PowerBIDatasetRefreshStatus, + PowerBIHook, +) + +DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id" +MODULE = "airflow.providers.microsoft.powerbi.hooks.powerbi" +CLIENT_ID = "client_id" +CLIENT_SECRET = "client_secret" +TENANT_ID = "tenant_id" +BASE_URL = "https://api.powerbi.com" +API_VERSION = "v1.0" +GROUP_ID = "group_id" +DATASET_ID = "dataset_id" + +API_RAW_RESPONSE = { + "value": [ + # Completed refresh + { + "requestId": "5e2d9921-e91b-491f-b7e1-e7d8db49194c", + "status": "Completed", + "endTime": "2024-04-15T20:14:08.1458221Z", + # serviceExceptionJson is not present when status is not "Failed" + }, + # In-progress refresh + { + "requestId": "6b6536c1-cfcb-4148-9c21-402c3f5241e4", + "status": "Unknown", # endtime is not available. 
+ }, + # Failed refresh + { + "requestId": "11bf290a-346b-48b7-8973-c5df149337ff", + "status": "Failed", + "endTime": "2024-04-15T20:14:08.1458221Z", + "serviceExceptionJson": '{"errorCode":"ModelRefreshFailed_CredentialsNotSpecified"}', + }, + ] +} + +FORMATTED_RESPONSE = [ + # Completed refresh + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "5e2d9921-e91b-491f-b7e1-e7d8db49194c", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED, + PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z", + PowerBIDatasetRefreshFields.ERROR.value: "None", + }, + # In-progress refresh + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "6b6536c1-cfcb-4148-9c21-402c3f5241e4", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.IN_PROGRESS, + PowerBIDatasetRefreshFields.END_TIME.value: "None", + PowerBIDatasetRefreshFields.ERROR.value: "None", + }, + # Failed refresh + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "11bf290a-346b-48b7-8973-c5df149337ff", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.FAILED, + PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z", + PowerBIDatasetRefreshFields.ERROR.value: '{"errorCode":"ModelRefreshFailed_CredentialsNotSpecified"}', + }, +] + + +@pytest.fixture +def powerbi_hook(): + client = PowerBIHook(powerbi_conn_id=DEFAULT_CONNECTION_CLIENT_SECRET) + return client + + +@pytest.fixture +def get_token(powerbi_hook): + powerbi_hook._get_token = MagicMock(return_value="access_token") + return powerbi_hook._get_token() + + +def test_get_token_with_missing_credentials(powerbi_hook): + # Mock the get_connection method to return a connection with missing credentials + powerbi_hook.get_connection = MagicMock( + return_value=Connection( + conn_id=DEFAULT_CONNECTION_CLIENT_SECRET, + conn_type="powerbi", + login=None, + password=None, + extra={ + "tenantId": TENANT_ID, + }, + ) + ) + + with pytest.raises(ValueError): + 
powerbi_hook._get_token() + + +def test_get_token_with_missing_tenant_id(powerbi_hook): + # Mock the get_connection method to return a connection with missing tenant ID + powerbi_hook.get_connection = MagicMock( + return_value=Connection( + conn_id=DEFAULT_CONNECTION_CLIENT_SECRET, + conn_type="powerbi", + login=CLIENT_ID, + password=CLIENT_SECRET, + extra={}, + ) + ) + + with pytest.raises(ValueError): + powerbi_hook._get_token() + + +@mock.patch(f"{MODULE}.ClientSecretCredential") +def test_get_token_with_valid_credentials(mock_credential, powerbi_hook): + # Mock the get_connection method to return a connection with valid credentials + powerbi_hook.get_connection = MagicMock( + return_value=Connection( + conn_id=DEFAULT_CONNECTION_CLIENT_SECRET, + conn_type="powerbi", + login=CLIENT_ID, + password=CLIENT_SECRET, + extra={ + "tenantId": TENANT_ID, + }, + ) + ) + + token = powerbi_hook._get_token() + mock_credential.assert_called() + + assert token is not None + + +def test_refresh_dataset(powerbi_hook, requests_mock, get_token): + request_id = "request_id" + + # Mock the request in _send_request method to return a successful response + requests_mock.post( + f"{BASE_URL}/{API_VERSION}/myorg/groups/{GROUP_ID}/datasets/{DATASET_ID}/refreshes", + status_code=202, + headers={"Authorization": f"Bearer {get_token}", "RequestId": request_id}, + ) + + result = powerbi_hook.refresh_dataset(dataset_id=DATASET_ID, group_id=GROUP_ID) + + assert result == request_id + + +def test_get_refresh_history_success(powerbi_hook, requests_mock, get_token): + url = f"{BASE_URL}/{API_VERSION}/myorg/groups/{GROUP_ID}/datasets/{DATASET_ID}/refreshes" + + requests_mock.get( + url, json=API_RAW_RESPONSE, headers={"Authorization": f"Bearer {get_token}"}, status_code=200 + ) + + result = powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID) + + assert len(result) == 3 + assert result == FORMATTED_RESPONSE + + +def test_get_latest_refresh_details_with_no_history(powerbi_hook): + # Mock the 
get_refresh_history method to return an empty list + powerbi_hook.get_refresh_history = MagicMock(return_value=[]) + + result = powerbi_hook.get_latest_refresh_details(dataset_id=DATASET_ID, group_id=GROUP_ID) + + assert result is None + + +def test_get_latest_refresh_details_with_history(powerbi_hook): + # Mock the get_refresh_history method to return a list with refresh details + refresh_history = FORMATTED_RESPONSE + powerbi_hook.get_refresh_history = MagicMock(return_value=refresh_history) + + result = powerbi_hook.get_latest_refresh_details(dataset_id=DATASET_ID, group_id=GROUP_ID) + + assert result == FORMATTED_RESPONSE[0] + + +def test_get_refresh_details_by_request_id(powerbi_hook): + # Mock the get_refresh_history method to return a list of refresh histories + refresh_histories = FORMATTED_RESPONSE + powerbi_hook.get_refresh_history = MagicMock(return_value=refresh_histories) + + # Call the function with a valid request ID + request_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" + result = powerbi_hook.get_refresh_details_by_request_id( + dataset_id=DATASET_ID, group_id=GROUP_ID, request_id=request_id + ) + + # Assert that the correct refresh details are returned + assert result == { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "5e2d9921-e91b-491f-b7e1-e7d8db49194c", + PowerBIDatasetRefreshFields.STATUS.value: "Completed", + PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z", + PowerBIDatasetRefreshFields.ERROR.value: "None", + } + + # Call the function with an invalid request ID + invalid_request_id = "invalid_request_id" + with pytest.raises(PowerBIDatasetRefreshException): + powerbi_hook.get_refresh_details_by_request_id( + dataset_id=DATASET_ID, group_id=GROUP_ID, request_id=invalid_request_id + ) + + +_wait_for_dataset_refresh_status_test_args = [ + (PowerBIDatasetRefreshStatus.COMPLETED, PowerBIDatasetRefreshStatus.COMPLETED, True), + (PowerBIDatasetRefreshStatus.FAILED, PowerBIDatasetRefreshStatus.COMPLETED, False), + 
(PowerBIDatasetRefreshStatus.IN_PROGRESS, PowerBIDatasetRefreshStatus.COMPLETED, "timeout"), +] + + +@pytest.mark.parametrize( + argnames=("dataset_refresh_status", "expected_status", "expected_result"), + argvalues=_wait_for_dataset_refresh_status_test_args, + ids=[ + f"refresh_status_{argval[0]}_expected_{argval[1]}" + for argval in _wait_for_dataset_refresh_status_test_args + ], +) +def test_wait_for_dataset_refresh_status( + powerbi_hook, dataset_refresh_status, expected_status, expected_result +): + config = { + "dataset_id": DATASET_ID, + "group_id": GROUP_ID, + "request_id": "5e2d9921-e91b-491f-b7e1-e7d8db49194c", + "timeout": 3, + "check_interval": 1, + "expected_status": expected_status, + } + + # Mock the get_refresh_details_by_request_id method to return dataset refresh details + dataset_refresh_details = {PowerBIDatasetRefreshFields.STATUS.value: dataset_refresh_status} + powerbi_hook.get_refresh_details_by_request_id = MagicMock(return_value=dataset_refresh_details) + + if expected_result != "timeout": + assert powerbi_hook.wait_for_dataset_refresh_status(**config) == expected_result + else: + with pytest.raises(PowerBIDatasetRefreshException): + powerbi_hook.wait_for_dataset_refresh_status(**config) + + +def test_trigger_dataset_refresh(powerbi_hook): + # Mock the refresh_dataset method to return a request ID + powerbi_hook.refresh_dataset = MagicMock(return_value="request_id") + + # Assert trigger_dataset_refresh returns the request ID of the triggered refresh. + response = powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID) + + assert response == "request_id" diff --git a/tests/providers/microsoft/powerbi/operators/__init__.py b/tests/providers/microsoft/powerbi/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/microsoft/powerbi/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/microsoft/powerbi/operators/test_powerbi.py b/tests/providers/microsoft/powerbi/operators/test_powerbi.py new file mode 100644 index 0000000000000..83870a29d69fb --- /dev/null +++ b/tests/providers/microsoft/powerbi/operators/test_powerbi.py @@ -0,0 +1,261 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from unittest.mock import MagicMock, call + +import pytest + +from airflow.providers.microsoft.powerbi.hooks.powerbi import ( + PowerBIDatasetRefreshException, + PowerBIDatasetRefreshFields, + PowerBIDatasetRefreshStatus, + PowerBIHook, +) +from airflow.providers.microsoft.powerbi.operators.powerbi import PowerBIDatasetRefreshOperator + +DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id" +TASK_ID = "run_powerbi_operator" +GROUP_ID = "group_id" +DATASET_ID = "dataset_id" +CONFIG = { + "task_id": TASK_ID, + "powerbi_conn_id": DEFAULT_CONNECTION_CLIENT_SECRET, + "group_id": GROUP_ID, + "dataset_id": DATASET_ID, + "check_interval": 1, + "timeout": 3, +} + +# Sample responses from PowerBI API +COMPLETED_REFRESH_DETAILS = { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "5e2d9921-e91b-491f-b7e1-e7d8db49194c", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED, + PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z", + # serviceExceptionJson is not present when status is not "Failed" +} + +FAILED_REFRESH_DETAILS = { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "11bf290a-346b-48b7-8973-c5df149337ff", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.FAILED, + PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z", + PowerBIDatasetRefreshFields.ERROR.value: '{"errorCode":"ModelRefreshFailed_CredentialsNotSpecified"}', +} + +IN_PROGRESS_REFRESH_DETAILS = { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "6b6536c1-cfcb-4148-9c21-402c3f5241e4", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.IN_PROGRESS, # endtime is not available. 
+} + + +@pytest.fixture +def mock_powerbi_hook(): + hook = PowerBIHook() + return hook + + +# Test cases: refresh_details returns None, Terminal Status, In-progress Status +_get_latest_refresh_details_args = [ + (None), + COMPLETED_REFRESH_DETAILS, + FAILED_REFRESH_DETAILS, + IN_PROGRESS_REFRESH_DETAILS, +] + + +@pytest.mark.parametrize( + argnames=("latest_refresh_details"), + argvalues=_get_latest_refresh_details_args, + ids=[ + ( + f"latest_refresh_status_{argval[PowerBIDatasetRefreshFields.STATUS.value]}_no_wait_for_termination" + if argval is not None + else "latest_refresh_status_None_no_wait_for_termination" + ) + for argval in _get_latest_refresh_details_args + ], +) +def test_execute_no_wait_for_termination(mock_powerbi_hook, latest_refresh_details): + operator = PowerBIDatasetRefreshOperator( + wait_for_termination=False, + force_refresh=False, + **CONFIG, + ) + operator.hook = mock_powerbi_hook + context = {"ti": MagicMock()} + new_refresh_request_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" + mock_powerbi_hook.get_latest_refresh_details = MagicMock(return_value=latest_refresh_details) + mock_powerbi_hook.trigger_dataset_refresh = MagicMock(return_value=new_refresh_request_id) + mock_powerbi_hook.get_refresh_details_by_request_id = MagicMock( + return_value={ + PowerBIDatasetRefreshFields.REQUEST_ID.value: new_refresh_request_id, + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED, + PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z", + # serviceExceptionJson is not present when status is not "Failed" + } + ) + mock_powerbi_hook.wait_for_dataset_refresh_status = MagicMock(return_value=True) + operator.execute(context) + + if ( + latest_refresh_details is None + or latest_refresh_details[PowerBIDatasetRefreshFields.STATUS.value] + in PowerBIDatasetRefreshStatus.TERMINAL_STATUSES + ): + assert mock_powerbi_hook.get_latest_refresh_details.called + assert mock_powerbi_hook.trigger_dataset_refresh.called + 
else: + assert not mock_powerbi_hook.trigger_dataset_refresh.called + + assert not mock_powerbi_hook.wait_for_dataset_refresh_status.called + assert mock_powerbi_hook.get_refresh_details_by_request_id.called + assert context["ti"].xcom_push.call_count == 4 + assert context["ti"].xcom_push.call_args_list == [ + call(key="powerbi_dataset_refresh_id", value=new_refresh_request_id), + call(key="powerbi_dataset_refresh_status", value=PowerBIDatasetRefreshStatus.COMPLETED), + call(key="powerbi_dataset_refresh_end_time", value="2024-04-15T20:14:08.1458221Z"), + call(key="powerbi_dataset_refresh_error", value="None"), + ] + + +_get_wait_for_status_args = [(True), (False)] + + +@pytest.mark.parametrize( + argnames=("wait_for_status_return_value"), + argvalues=_get_wait_for_status_args, + ids=[f"wait_for_status_return_value_{argval}" for argval in _get_wait_for_status_args], +) +def test_execute_wait_for_termination_preexisting_refresh_going_on( + mock_powerbi_hook, wait_for_status_return_value +): + operator = PowerBIDatasetRefreshOperator( + wait_for_termination=True, + force_refresh=True, + **CONFIG, + ) + preexisting_refresh_request_id = "6b6536c1-cfcb-4148-9c21-402c3f5241e4" + new_refresh_request_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" + operator.hook = mock_powerbi_hook + context = {"ti": MagicMock()} + mock_powerbi_hook.get_latest_refresh_details = MagicMock( + return_value={ + PowerBIDatasetRefreshFields.REQUEST_ID.value: preexisting_refresh_request_id, + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.IN_PROGRESS, # endtime is not available. 
+ } + ) + mock_powerbi_hook.trigger_dataset_refresh = MagicMock(return_value=new_refresh_request_id) + mock_powerbi_hook.get_refresh_details_by_request_id = MagicMock( + return_value={ + PowerBIDatasetRefreshFields.REQUEST_ID.value: new_refresh_request_id, + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED, + PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z", + # serviceExceptionJson is not present when status is not "Failed" + } + ) + mock_powerbi_hook.wait_for_dataset_refresh_status = MagicMock(return_value=wait_for_status_return_value) + + if wait_for_status_return_value is False: + with pytest.raises(PowerBIDatasetRefreshException): + operator.execute(context) + assert not mock_powerbi_hook.trigger_dataset_refresh.called + else: + operator.execute(context) + assert mock_powerbi_hook.trigger_dataset_refresh.called + assert mock_powerbi_hook.get_refresh_details_by_request_id.called + assert mock_powerbi_hook.wait_for_dataset_refresh_status.call_count == 2 + assert context["ti"].xcom_push.call_count == 4 + assert context["ti"].xcom_push.call_args_list == [ + call(key="powerbi_dataset_refresh_id", value=new_refresh_request_id), + call(key="powerbi_dataset_refresh_status", value=PowerBIDatasetRefreshStatus.COMPLETED), + call(key="powerbi_dataset_refresh_end_time", value="2024-04-15T20:14:08.1458221Z"), + call(key="powerbi_dataset_refresh_error", value="None"), + ] + + +_get_wait_for_status_and_latest_refresh_details_args = [ + (True, None), + (False, None), + (True, COMPLETED_REFRESH_DETAILS), + (False, COMPLETED_REFRESH_DETAILS), + (True, FAILED_REFRESH_DETAILS), + (False, FAILED_REFRESH_DETAILS), +] + + +@pytest.mark.parametrize( + argnames=("wait_for_status_return_value", "latest_refresh_details"), + argvalues=_get_wait_for_status_and_latest_refresh_details_args, + ids=[ + ( + f"wait_for_status_detail_{argval[1][PowerBIDatasetRefreshFields.STATUS.value]}_return_value_{argval[0]}" + if argval[1] is not None + 
else f"wait_for_status_detail_None_return_value_{argval[0]}" + ) + for argval in _get_wait_for_status_and_latest_refresh_details_args + ], +) +def test_execute_wait_for_termination_no_preexisting_refresh( + mock_powerbi_hook, wait_for_status_return_value, latest_refresh_details +): + operator = PowerBIDatasetRefreshOperator( + wait_for_termination=True, + force_refresh=True, + **CONFIG, + ) + operator.hook = mock_powerbi_hook + context = {"ti": MagicMock()} + new_refresh_request_id = "11bf290a-346b-48b7-8973-c5df149337ff" + + # Magic mock the hook methods + mock_powerbi_hook.get_latest_refresh_details = MagicMock(return_value=latest_refresh_details) + mock_powerbi_hook.trigger_dataset_refresh = MagicMock(return_value=new_refresh_request_id) + mock_powerbi_hook.get_refresh_details_by_request_id = MagicMock( + return_value={ + PowerBIDatasetRefreshFields.REQUEST_ID.value: new_refresh_request_id, + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED, + PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z", + # serviceExceptionJson is not present when status is not "Failed" + } + ) + mock_powerbi_hook.wait_for_dataset_refresh_status = MagicMock(return_value=wait_for_status_return_value) + + # Act and assert + if wait_for_status_return_value is False: + with pytest.raises(PowerBIDatasetRefreshException): + operator.execute(context) + else: + operator.execute(context) + assert mock_powerbi_hook.trigger_dataset_refresh.called + assert mock_powerbi_hook.get_refresh_details_by_request_id.called + mock_powerbi_hook.wait_for_dataset_refresh_status.assert_called_once_with( + request_id=new_refresh_request_id, + dataset_id=DATASET_ID, + group_id=GROUP_ID, + expected_status=PowerBIDatasetRefreshStatus.COMPLETED, + ) + assert context["ti"].xcom_push.call_count == 4 + assert context["ti"].xcom_push.call_args_list == [ + call( + key="powerbi_dataset_refresh_id", + value=new_refresh_request_id, + ), + 
call(key="powerbi_dataset_refresh_status", value=PowerBIDatasetRefreshStatus.COMPLETED), + call(key="powerbi_dataset_refresh_end_time", value="2024-04-15T20:14:08.1458221Z"), + call(key="powerbi_dataset_refresh_error", value="None"), + ] diff --git a/tests/system/providers/microsoft/powerbi/__init__.py b/tests/system/providers/microsoft/powerbi/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/microsoft/powerbi/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/microsoft/powerbi/example_dataset_refresh.py b/tests/system/providers/microsoft/powerbi/example_dataset_refresh.py new file mode 100644 index 0000000000000..d610f67ef11cf --- /dev/null +++ b/tests/system/providers/microsoft/powerbi/example_dataset_refresh.py @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime, timedelta + +from airflow.models import DAG + +# Ignore missing args provided by default_args +# mypy: disable-error-code="call-arg" +from airflow.operators.empty import EmptyOperator +from airflow.providers.microsoft.powerbi.operators.powerbi import PowerBIDatasetRefreshOperator +from airflow.utils.edgemodifier import Label + +DAG_ID = "example_powerbi_dataset_refresh" + +with DAG( + dag_id=DAG_ID, + start_date=datetime(2021, 8, 13), + schedule="@daily", + catchup=False, + default_args={ + "retries": 1, + "retry_delay": timedelta(minutes=3), + }, + default_view="graph", +) as dag: + begin = EmptyOperator(task_id="begin") + end = EmptyOperator(task_id="end") + + # [START howto_operator_powerbi_refresh_dataset] + dataset_refresh = PowerBIDatasetRefreshOperator( + powerbi_conn_id="powerbi_default", + task_id="dataset_refresh", + dataset_id="dataset-id", + group_id="group-id", + ) + # [END howto_operator_powerbi_refresh_dataset] + + # [START howto_operator_powerbi_refresh_dataset_async] + dataset_refresh2 = PowerBIDatasetRefreshOperator( + powerbi_conn_id="powerbi_default", + task_id="dataset_refresh_async", + dataset_id="dataset-id", + group_id="group-id", + wait_for_termination=False, + ) + # [END howto_operator_powerbi_refresh_dataset_async] + + # [START howto_operator_powerbi_refresh_dataset_force_refresh] + dataset_refresh3 = PowerBIDatasetRefreshOperator( + powerbi_conn_id="powerbi_default", + task_id="dataset_refresh_force_refresh", + dataset_id="dataset-id", + group_id="group-id", + 
force_refresh=True, + ) + # [END howto_operator_powerbi_refresh_dataset_force_refresh] + + begin >> Label("No async wait") >> dataset_refresh + begin >> Label("Do async wait with force refresh") >> dataset_refresh2 + begin >> Label("Do async wait") >> dataset_refresh3 >> end + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)