Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
bbf5b75
Add Power BI operator that refreshes the powerbi dataset
ambika-garg Jun 18, 2024
b0251ca
Extend Power BI Operator to support async mode
ambika-garg Jul 8, 2024
32b2bde
refactor: Refactored PowerBIHook based on the KiotaRequestAdapterHook…
davidblain-infrabel Jul 11, 2024
7f3d3f0
Refactor: To support operator's async behavior
ambika-garg Jul 16, 2024
eea4470
Add unit tests for the power bi trigger and refactor the code
ambika-garg Jul 16, 2024
483bbf6
unit tests for powerbi operator
ambika-garg Jul 16, 2024
2b94371
refactor: Did some small changes to PowerBIOperator, removed unnecess…
davidblain-infrabel Jul 18, 2024
bf2a9da
Merge branch 'main' into powerbi_operator
dabla Jul 18, 2024
71abb54
Merge branch 'main' into powerbi_operator
dabla Jul 18, 2024
6378622
Merge branch 'main' into powerbi_operator
dabla Jul 19, 2024
77f6142
Merge branch 'main' into powerbi_operator
dabla Jul 19, 2024
389712d
Merge branch 'main' into powerbi_operator
ambika-garg Jul 26, 2024
2a3db17
Fixed the unit test
ambika-garg Jul 29, 2024
efbb02a
Added more tests for full code coverage
ambika-garg Jul 29, 2024
2f8f46f
Added system test for operator
ambika-garg Jul 29, 2024
c71b2f0
Merge branch 'main' into powerbi_operator
dabla Jul 30, 2024
16b9a2d
Merge branch 'main' into powerbi_operator
ambika-garg Jul 31, 2024
d1996d5
Fix system test
ambika-garg Jul 31, 2024
0e7092c
Merge branch 'main' into powerbi_operator
ambika-garg Aug 7, 2024
6f6e94e
Merge branch 'main' into powerbi_operator
dabla Aug 7, 2024
78c78c4
Merge branch 'main' into powerbi_operator
dabla Aug 7, 2024
931073f
Merge branch 'main' into powerbi_operator
dabla Aug 7, 2024
f7939d3
Refactor: To use more of deferrable mechanism, shifted all the async …
ambika-garg Aug 7, 2024
977b102
Fix unit tests and remove unnecessary parameters
ambika-garg Aug 8, 2024
1966a7d
refactor: Initialize hosts within constructor to make sure it's initi…
dabla Aug 8, 2024
7621a09
Merge branch 'main' into powerbi_operator
dabla Aug 8, 2024
2997741
Merge branch 'main' into powerbi_operator
dabla Aug 8, 2024
576b4fc
Merge branch 'main' into powerbi_operator
dabla Aug 8, 2024
8b29b99
Merge branch 'main' into powerbi_operator
ambika-garg Aug 8, 2024
dee4080
Merge branch 'main' into powerbi_operator
dabla Aug 8, 2024
3f1a22a
Merge branch 'main' into powerbi_operator
ambika-garg Aug 9, 2024
fb2d832
fix: Changed the 'powerbi_conn_id' parameter to 'conn_id' for the dat…
davidblain-infrabel Aug 9, 2024
aede60b
Merge branch 'main' into powerbi_operator
ambika-garg Aug 9, 2024
5a30fe3
Merge branch 'main' into powerbi_operator
ambika-garg Aug 9, 2024
07e03c7
Merge branch 'main' into powerbi_operator
ambika-garg Aug 9, 2024
4032275
Remove redundant system test for powerbi dataset refresh operator and…
ambika-garg Aug 9, 2024
69e447d
Merge branch 'main' into powerbi_operator
ambika-garg Aug 12, 2024
09268ea
remove extra comments
ambika-garg Aug 12, 2024
0652b11
Merge branch 'main' into powerbi_operator
ambika-garg Aug 12, 2024
226b5b8
Fix msgraph hook tests
ambika-garg Aug 12, 2024
0ea609a
Fix powerbi trigger tests
ambika-garg Aug 12, 2024
e623f18
Merge branch 'main' into powerbi_operator
ambika-garg Aug 12, 2024
3e688ff
Refactor to pass the provider[microsoft.azure] tests
ambika-garg Aug 12, 2024
4a16dae
Merge branch 'main' into powerbi_operator
ambika-garg Aug 12, 2024
b241d32
Merge branch 'main' into powerbi_operator
ambika-garg Aug 13, 2024
b8d1b80
Merge branch 'main' into powerbi_operator
ambika-garg Aug 13, 2024
d1931d6
Merge branch 'main' into powerbi_operator
dabla Aug 13, 2024
a018bde
refactor: Removed commented out (dead) code
dabla Aug 13, 2024
2b5f2c6
Merge branch 'main' into powerbi_operator
ambika-garg Aug 14, 2024
45e4ae7
Refactor: Remove unused parameters and dead code
ambika-garg Aug 14, 2024
8559667
Merge branch 'main' into powerbi_operator
dabla Aug 14, 2024
d75cabb
Merge branch 'main' into powerbi_operator
dabla Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions airflow/providers/microsoft/azure/hooks/msgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,16 @@ def __init__(
conn_id: str = default_conn_name,
timeout: float | None = None,
proxies: dict | None = None,
host: str = NationalClouds.Global.value,
scopes: list[str] | None = None,
api_version: APIVersion | str | None = None,
):
super().__init__()
self.conn_id = conn_id
self.timeout = timeout
self.proxies = proxies
self.host = host
self.scopes = scopes or ["https://graph.microsoft.com/.default"]
self._api_version = self.resolve_api_version_from_value(api_version)

@property
Expand All @@ -141,11 +145,10 @@ def get_api_version(self, config: dict) -> APIVersion:
)
return self._api_version

@staticmethod
def get_host(connection: Connection) -> str:
def get_host(self, connection: Connection) -> str:
if connection.schema and connection.host:
return f"{connection.schema}://{connection.host}"
return NationalClouds.Global.value
return self.host

@staticmethod
def format_no_proxy_url(url: str) -> str:
Expand Down Expand Up @@ -198,7 +201,7 @@ def get_conn(self) -> RequestAdapter:
proxies = self.proxies or config.get("proxies", {})
msal_proxies = self.to_msal_proxies(authority=authority, proxies=proxies)
httpx_proxies = self.to_httpx_proxies(proxies=proxies)
scopes = config.get("scopes", ["https://graph.microsoft.com/.default"])
scopes = config.get("scopes", self.scopes)
verify = config.get("verify", True)
trust_env = config.get("trust_env", False)
disable_instance_discovery = config.get("disable_instance_discovery", False)
Expand Down
218 changes: 218 additions & 0 deletions airflow/providers/microsoft/azure/hooks/powerbi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING, Any

from airflow.exceptions import AirflowException
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook

if TYPE_CHECKING:
from msgraph_core import APIVersion


class PowerBIDatasetRefreshFields(Enum):
    """Keys of the normalized refresh-details dict produced by ``PowerBIHook.raw_to_refresh_details``."""

    REQUEST_ID = "request_id"  # normalized from the API's "requestId"
    STATUS = "status"  # normalized from the API's "status"
    ERROR = "error"  # normalized from the API's "serviceExceptionJson"


class PowerBIDatasetRefreshStatus:
    """
    Statuses a Power BI dataset refresh can report.

    ``TERMINAL_STATUSES`` groups the states after which the refresh outcome
    will no longer change, i.e. polling can stop.
    """

    IN_PROGRESS = "In Progress"
    FAILED = "Failed"
    COMPLETED = "Completed"
    DISABLED = "Disabled"

    # Final states: the refresh either finished or failed.
    TERMINAL_STATUSES = {COMPLETED, FAILED}


class PowerBIDatasetRefreshException(AirflowException):
    """An exception that indicates a dataset refresh failed to complete.

    Raised by ``PowerBIHook`` when triggering a refresh or fetching its
    history fails, or when a given request id cannot be found.
    """


class PowerBIHook(KiotaRequestAdapterHook):
    """
    An async hook to interact with Power BI.

    :param conn_id: The Power BI connection id.
    :param proxies: A dict defining the HTTP proxies to be used (default is None).
    :param timeout: The HTTP timeout for each request in seconds (default is one week).
    :param api_version: The API version to be used (default is None, resolved by the parent hook).
    """

    conn_type: str = "powerbi"
    conn_name_attr: str = "conn_id"
    default_conn_name: str = "powerbi_default"
    hook_name: str = "Power BI"

    def __init__(
        self,
        conn_id: str = default_conn_name,
        proxies: dict | None = None,
        timeout: float = 60 * 60 * 24 * 7,
        api_version: APIVersion | str | None = None,
    ):
        super().__init__(
            conn_id=conn_id,
            proxies=proxies,
            timeout=timeout,
            # Power BI has its own REST endpoint and OAuth scope, distinct from
            # the Microsoft Graph defaults of KiotaRequestAdapterHook.
            host="https://api.powerbi.com",
            scopes=["https://analysis.windows.net/powerbi/api/.default"],
            api_version=api_version,
        )

    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Return connection widgets to add to connection form."""
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import StringField

        return {
            "tenant_id": StringField(lazy_gettext("Tenant ID"), widget=BS3TextFieldWidget()),
        }

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour."""
        return {
            "hidden_fields": ["schema", "port", "host", "extra"],
            "relabeling": {
                "login": "Client ID",
                "password": "Client Secret",
            },
        }

    async def get_refresh_history(
        self,
        dataset_id: str,
        group_id: str,
    ) -> list[dict[str, str]]:
        """
        Retrieve the refresh history of the specified dataset from the given group ID.

        :param dataset_id: The dataset ID.
        :param group_id: The workspace ID.

        :return: List of normalized refresh-details dictionaries, one per refresh.
        :raises PowerBIDatasetRefreshException: If the API request fails.
        """
        try:
            response = await self.run(
                url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
                path_parameters={
                    "group_id": group_id,
                    "dataset_id": dataset_id,
                },
            )
        except AirflowException as err:
            # Chain the original error so the root cause stays visible in logs.
            raise PowerBIDatasetRefreshException("Failed to retrieve refresh history") from err

        # "value" may be missing/None when the dataset has never been refreshed.
        refresh_histories = response.get("value") or []
        return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories]

    @classmethod
    def raw_to_refresh_details(cls, refresh_details: dict) -> dict[str, str]:
        """
        Convert raw refresh details into a dictionary containing required fields.

        The API reports an ongoing refresh with status "Unknown"; that is
        normalized to ``PowerBIDatasetRefreshStatus.IN_PROGRESS``.

        :param refresh_details: Raw object of refresh details.
        """
        status = str(refresh_details.get("status"))
        if status == "Unknown":
            status = PowerBIDatasetRefreshStatus.IN_PROGRESS
        return {
            PowerBIDatasetRefreshFields.REQUEST_ID.value: str(refresh_details.get("requestId")),
            PowerBIDatasetRefreshFields.STATUS.value: status,
            PowerBIDatasetRefreshFields.ERROR.value: str(refresh_details.get("serviceExceptionJson")),
        }

    async def get_refresh_details_by_refresh_id(
        self, dataset_id: str, group_id: str, refresh_id: str
    ) -> dict[str, str]:
        """
        Get the refresh details of the given request Id.

        :param dataset_id: The dataset ID.
        :param group_id: The workspace ID.
        :param refresh_id: Request Id of the Dataset refresh.

        :return: Normalized refresh-details dictionary for the matching refresh.
        :raises PowerBIDatasetRefreshException: If no refresh with the given
            request Id exists (including an empty refresh history).
        """
        refresh_histories = await self.get_refresh_history(dataset_id=dataset_id, group_id=group_id)

        # A single pass covers both an empty history and a missing request id.
        refresh_details = next(
            (
                refresh_history
                for refresh_history in refresh_histories
                if refresh_history.get(PowerBIDatasetRefreshFields.REQUEST_ID.value) == refresh_id
            ),
            None,
        )

        if refresh_details is None:
            raise PowerBIDatasetRefreshException(
                f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}"
            )

        return refresh_details

    async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> str:
        """
        Triggers a refresh for the specified dataset from the given group id.

        :param dataset_id: The dataset id.
        :param group_id: The workspace id.

        :return: Request id of the dataset refresh request.
        :raises PowerBIDatasetRefreshException: If the refresh request fails.
        """
        try:
            response = await self.run(
                url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
                method="POST",
                path_parameters={
                    "group_id": group_id,
                    "dataset_id": dataset_id,
                },
            )
        except AirflowException as err:
            raise PowerBIDatasetRefreshException("Failed to trigger dataset refresh.") from err

        # NOTE(review): the lowercase "requestid" key differs from the refresh
        # history's "requestId" — confirm against the actual response payload.
        return response.get("requestid")

    async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> None:
        """
        Cancel the dataset refresh.

        :param dataset_id: The dataset Id.
        :param group_id: The workspace Id.
        :param dataset_refresh_id: The dataset refresh Id.
        """
        await self.run(
            url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}",
            response_type=None,
            path_parameters={
                "group_id": group_id,
                "dataset_id": dataset_id,
                "dataset_refresh_id": dataset_refresh_id,
            },
            method="DELETE",
        )
120 changes: 120 additions & 0 deletions airflow/providers/microsoft/azure/operators/powerbi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Sequence

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink
from airflow.providers.microsoft.azure.hooks.powerbi import (
PowerBIHook,
)
from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger

if TYPE_CHECKING:
from msgraph_core import APIVersion

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context


class PowerBILink(BaseOperatorLink):
    """Operator extra link pointing at the dataset's monitoring page in Power BI."""

    name = "Monitor PowerBI Dataset"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # group_id/dataset_id live on PowerBIDatasetRefreshOperator, not on
        # BaseOperator — hence the attr-defined ignores.
        return (
            f"https://app.powerbi.com/groups/{operator.group_id}"  # type: ignore[attr-defined]
            f"/datasets/{operator.dataset_id}"  # type: ignore[attr-defined]
            "/details?experience=power-bi"
        )


class PowerBIDatasetRefreshOperator(BaseOperator):
    """
    Refreshes a Power BI dataset.

    :param dataset_id: The dataset id.
    :param group_id: The workspace id.
    :param conn_id: Airflow Connection ID that contains the connection information for the
        Power BI account used for authentication.
    :param timeout: Time in seconds to wait for a dataset to reach a terminal status for
        asynchronous waits. Used only if ``wait_for_termination`` is True.
    :param proxies: A dict defining the HTTP proxies to be used, if any.
    :param api_version: The API version to be used, if any.
    :param check_interval: Number of seconds to wait before rechecking the refresh status.
    """

    template_fields: Sequence[str] = (
        "dataset_id",
        "group_id",
    )
    # NOTE(review): "parameters" is not in template_fields — looks like a
    # leftover mapping; confirm before relying on it.
    template_fields_renderers = {"parameters": "json"}

    operator_extra_links = (PowerBILink(),)

    def __init__(
        self,
        *,
        dataset_id: str,
        group_id: str,
        conn_id: str = PowerBIHook.default_conn_name,
        timeout: float = 60 * 60 * 24 * 7,
        proxies: dict | None = None,
        api_version: APIVersion | None = None,
        check_interval: int = 60,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.dataset_id = dataset_id
        self.group_id = group_id
        self.conn_id = conn_id
        self.timeout = timeout
        self.check_interval = check_interval
        # Always True today; kept as an attribute so execute() reads a single flag.
        self.wait_for_termination = True
        self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)

    def execute(self, context: Context):
        """Refresh the Power BI Dataset by deferring to the PowerBITrigger."""
        if not self.wait_for_termination:
            return
        trigger = PowerBITrigger(
            conn_id=self.conn_id,
            group_id=self.group_id,
            dataset_id=self.dataset_id,
            timeout=self.timeout,
            check_interval=self.check_interval,
            wait_for_termination=self.wait_for_termination,
        )
        self.defer(trigger=trigger, method_name="execute_complete")

    def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
        """
        Return immediately - callback for when the trigger fires.

        Relies on trigger to throw an exception, otherwise it assumes execution was successful.
        """
        if not event:
            return
        if event["status"] == "error":
            raise AirflowException(event["message"])
        # NOTE(review): the "_Id" capitalization is inconsistent with the status
        # key below, but is kept byte-for-byte for backward compatibility.
        self.xcom_push(context=context, key="powerbi_dataset_refresh_Id", value=event["dataset_refresh_id"])
        self.xcom_push(context=context, key="powerbi_dataset_refresh_status", value=event["status"])
Loading