diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index c8e9fa76424fd..0500e869de6df 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -189,12 +189,15 @@ async def get_refresh_details_by_refresh_id( return refresh_details - async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> str: + async def trigger_dataset_refresh( + self, *, dataset_id: str, group_id: str, request_body: dict[str, Any] | None = None + ) -> str: """ Triggers a refresh for the specified dataset from the given group id. :param dataset_id: The dataset id. :param group_id: The workspace id. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body. :return: Request id of the dataset refresh request. """ @@ -207,6 +210,7 @@ async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> st "group_id": group_id, "dataset_id": dataset_id, }, + data=request_body, ) request_id = response.get("requestid") diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 9abab3da977a3..444100dc667f7 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -72,6 +72,7 @@ class PowerBIDatasetRefreshOperator(BaseOperator): :param timeout: Time in seconds to wait for a dataset to reach a terminal status for asynchronous waits. Used only if ``wait_for_termination`` is True. :param check_interval: Number of seconds to wait before rechecking the refresh status. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body. """ template_fields: Sequence[str] = ( @@ -92,6 +93,7 @@ def __init__( proxies: dict | None = None, api_version: APIVersion | str | None = None, check_interval: int = 60, + request_body: dict[str, Any] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -102,6 +104,7 @@ def __init__( self.conn_id = conn_id self.timeout = timeout self.check_interval = check_interval + self.request_body = request_body @property def proxies(self) -> dict | None: @@ -124,6 +127,7 @@ def execute(self, context: Context): api_version=self.api_version, check_interval=self.check_interval, wait_for_termination=self.wait_for_termination, + request_body=self.request_body, ), method_name=self.get_refresh_status.__name__, ) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py index bcba15a2af1ff..042916f796637 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -20,7 +20,7 @@ import asyncio import time from collections.abc import AsyncIterator -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import tenacity @@ -51,9 +51,9 @@ class PowerBITrigger(BaseTrigger): :param dataset_id: The dataset Id to refresh. :param dataset_refresh_id: The dataset refresh Id to poll for the status, if not provided a new refresh will be triggered. :param group_id: The workspace Id where dataset is located. - :param end_time: Time in seconds when trigger should stop polling. :param check_interval: Time in seconds to wait between each poll. :param wait_for_termination: Wait for the dataset refresh to complete or fail. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body. """ def __init__( @@ -67,6 +67,7 @@ def __init__( api_version: APIVersion | str | None = None, check_interval: int = 60, wait_for_termination: bool = True, + request_body: dict[str, Any] | None = None, ): super().__init__() self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout) @@ -76,6 +77,7 @@ def __init__( self.group_id = group_id self.check_interval = check_interval self.wait_for_termination = wait_for_termination + self.request_body = request_body def serialize(self): """Serialize the trigger instance.""" @@ -91,6 +93,7 @@ def serialize(self): "timeout": self.timeout, "check_interval": self.check_interval, "wait_for_termination": self.wait_for_termination, + "request_body": self.request_body, }, ) @@ -113,6 +116,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: dataset_refresh_id = await self.hook.trigger_dataset_refresh( dataset_id=self.dataset_id, group_id=self.group_id, + request_body=self.request_body, ) if dataset_refresh_id: diff --git a/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py b/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py index 5453caff6463b..b772405da8601 100644 --- a/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py +++ b/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py @@ -66,6 +66,12 @@ def create_connection(conn_id_name: str): group_id=GROUP_ID, check_interval=30, timeout=120, + request_body={ + "type": "full", + "retryCount": 3, + "commitMode": "transactional", + "notifyOption": "MailOnFailure", + }, ) # [END howto_operator_powerbi_refresh_async] diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py index 43794c43880f1..6d1bdddeef90c 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py @@ -39,6 +39,13 @@ TASK_ID = "run_powerbi_operator" GROUP_ID = "group_id" DATASET_ID = "dataset_id" +REQUEST_BODY = { + "type": "full", + "commitMode": "transactional", + "objects": [{"table": "Customer", "partition": "Robert"}], + "applyRefreshPolicy": "false", + "timeout": "05:00:00", +} CONFIG = { "task_id": TASK_ID, "conn_id": DEFAULT_CONNECTION_CLIENT_SECRET, @@ -46,6 +53,7 @@ "dataset_id": DATASET_ID, "check_interval": 1, "timeout": 3, + "request_body": REQUEST_BODY, } NEW_REFRESH_REQUEST_ID = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py b/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py index 950d707498e61..658b6e3046f3b 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py @@ -44,6 +44,13 @@ TIMEOUT = 5 MODULE = "airflow.providers.microsoft.azure" CHECK_INTERVAL = 1 +REQUEST_BODY = { + "type": "full", + "commitMode": "transactional", + "objects": [{"table": "Customer", "partition": "Robert"}], + "applyRefreshPolicy": "false", + "timeout": "05:00:00", +} API_VERSION = "v1.0" @@ -102,6 +109,7 @@ def test_powerbi_trigger_serialization(self, connection): check_interval=CHECK_INTERVAL, wait_for_termination=True, timeout=TIMEOUT, + request_body=REQUEST_BODY, ) classpath, kwargs = powerbi_trigger.serialize() @@ -116,6 +124,7 @@ def test_powerbi_trigger_serialization(self, connection): "api_version": API_VERSION, "check_interval": CHECK_INTERVAL, "wait_for_termination": True, + "request_body": REQUEST_BODY, } @pytest.mark.asyncio