From 11f92e5168b6d91e7550af72b777a9518da928ca Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Fri, 6 Dec 2024 12:04:08 +0100 Subject: [PATCH 01/12] Add: api_timeout field and retry decorator --- .../microsoft/azure/triggers/powerbi.py | 9 +++- .../microsoft/azure/triggers/test_powerbi.py | 41 ++++++++++++++++++- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py index a2132f6c39384..43dbb4108ed69 100644 --- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -22,7 +22,11 @@ from collections.abc import AsyncIterator from typing import TYPE_CHECKING -from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIDatasetRefreshStatus, PowerBIHook +from airflow.providers.microsoft.azure.hooks.powerbi import ( + PowerBIDatasetRefreshException, + PowerBIDatasetRefreshStatus, + PowerBIHook, +) from airflow.triggers.base import BaseTrigger, TriggerEvent if TYPE_CHECKING: @@ -58,6 +62,7 @@ def __init__( proxies: dict | None = None, api_version: APIVersion | str | None = None, check_interval: int = 60, + api_timeout: float = 30, wait_for_termination: bool = True, ): super().__init__() @@ -66,6 +71,7 @@ def __init__( self.timeout = timeout self.group_id = group_id self.check_interval = check_interval + self.api_timeout = api_timeout self.wait_for_termination = wait_for_termination def serialize(self): @@ -80,6 +86,7 @@ def serialize(self): "group_id": self.group_id, "timeout": self.timeout, "check_interval": self.check_interval, + "api_timeout": self.api_timeout, "wait_for_termination": self.wait_for_termination, }, ) diff --git a/providers/tests/microsoft/azure/triggers/test_powerbi.py b/providers/tests/microsoft/azure/triggers/test_powerbi.py index 303b7d06c8047..50c597715f5bf 100644 --- a/providers/tests/microsoft/azure/triggers/test_powerbi.py +++ b/providers/tests/microsoft/azure/triggers/test_powerbi.py @@ -18,11 +18,15 @@ from __future__ import annotations import asyncio +import logging from unittest import mock import pytest -from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIDatasetRefreshStatus +from airflow.providers.microsoft.azure.hooks.powerbi import ( + PowerBIDatasetRefreshException, + PowerBIDatasetRefreshStatus, +) from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger from airflow.triggers.base import TriggerEvent @@ -35,6 +39,7 @@ TIMEOUT = 5 MODULE = "airflow.providers.microsoft.azure" CHECK_INTERVAL = 1 +API_TIMEOUT = 30 API_VERSION = "v1.0" @@ -48,6 +53,7 @@ def powerbi_trigger(timeout=TIMEOUT, check_interval=CHECK_INTERVAL) -> PowerBITr dataset_id=DATASET_ID, group_id=GROUP_ID, check_interval=check_interval, + api_timeout=API_TIMEOUT, wait_for_termination=True, timeout=timeout, ) @@ -64,6 +70,7 @@ def test_powerbi_trigger_serialization(self, connection): dataset_id=DATASET_ID, group_id=GROUP_ID, check_interval=CHECK_INTERVAL, + api_timeout=API_TIMEOUT, wait_for_termination=True, timeout=TIMEOUT, ) @@ -78,6 +85,7 @@ def test_powerbi_trigger_serialization(self, connection): "proxies": None, "api_version": API_VERSION, "check_interval": CHECK_INTERVAL, + "api_timeout": API_TIMEOUT, "wait_for_termination": True, } @@ -180,6 +188,37 @@ async def test_powerbi_trigger_run_exception_during_refresh_check_loop( assert response in task mock_cancel_dataset_refresh.assert_called_once() + @pytest.mark.asyncio + @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh") + @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id") + @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh") + async def test_powerbi_trigger_run_PowerBIDatasetRefreshExceptionn_during_refresh_check_loop( + self, + mock_trigger_dataset_refresh, + mock_get_refresh_details_by_refresh_id, + mock_cancel_dataset_refresh, + caplog, + powerbi_trigger, + ): + """Assert that run catch PowerBIDatasetRefreshException and triggers retry mechanism""" + mock_get_refresh_details_by_refresh_id.side_effect = PowerBIDatasetRefreshException("Test exception") + mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID + + with caplog.at_level(logging.INFO): + task = [i async for i in powerbi_trigger.run()] + response = TriggerEvent( + { + "status": "error", + "message": "An error occurred: Test exception", + "dataset_refresh_id": DATASET_REFRESH_ID, + } + ) + assert len(task) == 1 + assert response in task + mock_cancel_dataset_refresh.assert_called_once() + + assert "Retrying in 5 seconds" in caplog.text # Test the retry mechanism + @pytest.mark.asyncio @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh") @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id") From cda8a4d2ad4e129ff5764ffc9cb1d4762bdb209a Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Fri, 6 Dec 2024 13:41:14 +0100 Subject: [PATCH 02/12] Add: api_timeout field to powerbi operator --- .../src/airflow/providers/microsoft/azure/operators/powerbi.py | 3 +++ providers/tests/microsoft/azure/operators/test_powerbi.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py index b334a1708a6f7..3fc9632b5c39d 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -80,6 +80,7 @@ def __init__( timeout: float = 60 * 60 * 24 * 7, proxies: dict | None = None, api_version: APIVersion | str | None = None, + api_timeout: float = 30, check_interval: int = 60, **kwargs, ) -> None: @@ -90,6 +91,7 @@ def __init__( self.wait_for_termination = True self.conn_id = conn_id self.timeout = timeout + self.api_timeout = api_timeout self.check_interval = check_interval @property @@ -112,6 +114,7 @@ def execute(self, context: Context): proxies=self.proxies, api_version=self.api_version, check_interval=self.check_interval, + api_timeout=self.api_timeout, wait_for_termination=self.wait_for_termination, ), method_name=self.execute_complete.__name__, diff --git a/providers/tests/microsoft/azure/operators/test_powerbi.py b/providers/tests/microsoft/azure/operators/test_powerbi.py index a115b4c52dc50..06f766fa8bcec 100644 --- a/providers/tests/microsoft/azure/operators/test_powerbi.py +++ b/providers/tests/microsoft/azure/operators/test_powerbi.py @@ -44,6 +44,7 @@ "group_id": GROUP_ID, "dataset_id": DATASET_ID, "check_interval": 1, + "api_timeout": 40, "timeout": 3, } NEW_REFRESH_REQUEST_ID = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" @@ -162,6 +163,7 @@ def test_powerbi_link(self, create_task_instance_of_operator): group_id=GROUP_ID, dataset_id=DATASET_ID, check_interval=1, + api_timeout=40, timeout=3, ) From f82e12f6fe623fdd96e7e527b61baa2e6e75ab91 Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Fri, 6 Dec 2024 13:52:54 +0100 Subject: [PATCH 03/12] doc: add api_timeout to doc --- .../src/airflow/providers/microsoft/azure/operators/powerbi.py | 1 + .../system/microsoft/azure/example_powerbi_dataset_refresh.py | 1 + 2 files changed, 2 insertions(+) diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py index 3fc9632b5c39d..d3f6b0933d8f2 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -59,6 +59,7 @@ class PowerBIDatasetRefreshOperator(BaseOperator): :param group_id: The workspace id. :param conn_id: Airflow Connection ID that contains the connection information for the Power BI account used for authentication. :param timeout: Time in seconds to wait for a dataset to reach a terminal status for asynchronous waits. Used only if ``wait_for_termination`` is True. + :param api_timeout: Time in seconds to wait for the API request to get the refresh status. :param check_interval: Number of seconds to wait before rechecking the refresh status. """ diff --git a/providers/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py b/providers/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py index 5453caff6463b..9b942e5ff0a44 100644 --- a/providers/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py +++ b/providers/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py @@ -65,6 +65,7 @@ def create_connection(conn_id_name: str): dataset_id=DATASET_ID, group_id=GROUP_ID, check_interval=30, + api_timeout=30, timeout=120, ) # [END howto_operator_powerbi_refresh_async] From 918d023d2da6cf00d4c7d44b906240c47ec49d3f Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Mon, 9 Dec 2024 16:00:46 +0100 Subject: [PATCH 04/12] Change: Switch retry from timeout duration to number of try --- .../microsoft/azure/operators/powerbi.py | 4 ---- .../providers/microsoft/azure/triggers/powerbi.py | 15 ++++++++++----- .../microsoft/azure/operators/test_powerbi.py | 2 -- .../microsoft/azure/triggers/test_powerbi.py | 15 ++++----------- .../azure/example_powerbi_dataset_refresh.py | 1 - 5 files changed, 14 insertions(+), 23 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py index d3f6b0933d8f2..b334a1708a6f7 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -59,7 +59,6 @@ class PowerBIDatasetRefreshOperator(BaseOperator): :param group_id: The workspace id. :param conn_id: Airflow Connection ID that contains the connection information for the Power BI account used for authentication. :param timeout: Time in seconds to wait for a dataset to reach a terminal status for asynchronous waits. Used only if ``wait_for_termination`` is True. - :param api_timeout: Time in seconds to wait for the API request to get the refresh status. :param check_interval: Number of seconds to wait before rechecking the refresh status. """ @@ -81,7 +80,6 @@ def __init__( timeout: float = 60 * 60 * 24 * 7, proxies: dict | None = None, api_version: APIVersion | str | None = None, - api_timeout: float = 30, check_interval: int = 60, **kwargs, ) -> None: @@ -92,7 +90,6 @@ def __init__( self.wait_for_termination = True self.conn_id = conn_id self.timeout = timeout - self.api_timeout = api_timeout self.check_interval = check_interval @property @@ -115,7 +112,6 @@ def execute(self, context: Context): proxies=self.proxies, api_version=self.api_version, check_interval=self.check_interval, - api_timeout=self.api_timeout, wait_for_termination=self.wait_for_termination, ), method_name=self.execute_complete.__name__, diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py index 43dbb4108ed69..8ec8dfbc24e23 100644 --- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -22,6 +22,8 @@ from collections.abc import AsyncIterator from typing import TYPE_CHECKING +import tenacity + from airflow.providers.microsoft.azure.hooks.powerbi import ( PowerBIDatasetRefreshException, PowerBIDatasetRefreshStatus, @@ -62,7 +64,6 @@ def __init__( proxies: dict | None = None, api_version: APIVersion | str | None = None, check_interval: int = 60, - api_timeout: float = 30, wait_for_termination: bool = True, ): super().__init__() @@ -71,7 +72,6 @@ def __init__( self.timeout = timeout self.group_id = group_id self.check_interval = check_interval - self.api_timeout = api_timeout self.wait_for_termination = wait_for_termination def serialize(self): @@ -86,7 +86,6 @@ def serialize(self): "group_id": self.group_id, "timeout": self.timeout, "check_interval": self.check_interval, - "api_timeout": self.api_timeout, "wait_for_termination": self.wait_for_termination, }, ) @@ -110,8 +109,14 @@ async def run(self) -> AsyncIterator[TriggerEvent]: group_id=self.group_id, ) - async def fetch_refresh_status_and_error() -> tuple[str, str]: - """Fetch the current status and error of the dataset refresh.""" + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(multiplier=1, min=3, max=60), + reraise=True, + retry=tenacity.retry_if_exception_type(PowerBIDatasetRefreshException), + ) + async def fetch_refresh_status() -> str: + """Fetch the current status of the dataset refresh.""" refresh_details = await self.hook.get_refresh_details_by_refresh_id( dataset_id=self.dataset_id, group_id=self.group_id, diff --git a/providers/tests/microsoft/azure/operators/test_powerbi.py b/providers/tests/microsoft/azure/operators/test_powerbi.py index 06f766fa8bcec..a115b4c52dc50 100644 --- a/providers/tests/microsoft/azure/operators/test_powerbi.py +++ b/providers/tests/microsoft/azure/operators/test_powerbi.py @@ -44,7 +44,6 @@ "group_id": GROUP_ID, "dataset_id": DATASET_ID, "check_interval": 1, - "api_timeout": 40, "timeout": 3, } NEW_REFRESH_REQUEST_ID = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" @@ -163,7 +162,6 @@ def test_powerbi_link(self, create_task_instance_of_operator): group_id=GROUP_ID, dataset_id=DATASET_ID, check_interval=1, - api_timeout=40, timeout=3, ) diff --git a/providers/tests/microsoft/azure/triggers/test_powerbi.py b/providers/tests/microsoft/azure/triggers/test_powerbi.py index 50c597715f5bf..099aa18da3a62 100644 --- a/providers/tests/microsoft/azure/triggers/test_powerbi.py +++ b/providers/tests/microsoft/azure/triggers/test_powerbi.py @@ -18,7 +18,6 @@ from __future__ import annotations import asyncio -import logging from unittest import mock import pytest @@ -39,7 +38,6 @@ TIMEOUT = 5 MODULE = "airflow.providers.microsoft.azure" CHECK_INTERVAL = 1 -API_TIMEOUT = 30 API_VERSION = "v1.0" @@ -53,7 +51,6 @@ def powerbi_trigger(timeout=TIMEOUT, check_interval=CHECK_INTERVAL) -> PowerBITr dataset_id=DATASET_ID, group_id=GROUP_ID, check_interval=check_interval, - api_timeout=API_TIMEOUT, wait_for_termination=True, timeout=timeout, ) @@ -70,7 +67,6 @@ def test_powerbi_trigger_serialization(self, connection): dataset_id=DATASET_ID, group_id=GROUP_ID, check_interval=CHECK_INTERVAL, - api_timeout=API_TIMEOUT, wait_for_termination=True, timeout=TIMEOUT, ) @@ -85,7 +81,6 @@ def test_powerbi_trigger_serialization(self, connection): "proxies": None, "api_version": API_VERSION, "check_interval": CHECK_INTERVAL, - "api_timeout": API_TIMEOUT, "wait_for_termination": True, } @@ -192,19 +187,18 @@ async def test_powerbi_trigger_run_exception_during_refresh_check_loop( @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh") @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id") @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh") - async def test_powerbi_trigger_run_PowerBIDatasetRefreshExceptionn_during_refresh_check_loop( + async def test_powerbi_trigger_run_PowerBIDatasetRefreshException_during_refresh_check_loop( self, mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id, mock_cancel_dataset_refresh, - caplog, powerbi_trigger, ): """Assert that run catch PowerBIDatasetRefreshException and triggers retry mechanism""" mock_get_refresh_details_by_refresh_id.side_effect = PowerBIDatasetRefreshException("Test exception") mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID - with caplog.at_level(logging.INFO): + with mock.patch("tenacity.wait.wait_exponential.__call__") as mock_retry: task = [i async for i in powerbi_trigger.run()] response = TriggerEvent( { @@ -215,9 +209,8 @@ async def test_powerbi_trigger_run_PowerBIDatasetRefreshExceptionn_during_refres ) assert len(task) == 1 assert response in task - mock_cancel_dataset_refresh.assert_called_once() - - assert "Retrying in 5 seconds" in caplog.text # Test the retry mechanism + assert mock_cancel_dataset_refresh.call_count == 1 + assert mock_retry.call_count == 3 @pytest.mark.asyncio @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh") diff --git a/providers/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py b/providers/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py index 9b942e5ff0a44..5453caff6463b 100644 --- a/providers/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py +++ b/providers/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py @@ -65,7 +65,6 @@ def create_connection(conn_id_name: str): dataset_id=DATASET_ID, group_id=GROUP_ID, check_interval=30, - api_timeout=30, timeout=120, ) # [END howto_operator_powerbi_refresh_async] From 1cc357d70b04e239597f15fbf90ec3b360b9baf6 Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Wed, 8 Jan 2025 15:31:52 +0100 Subject: [PATCH 05/12] fix: fetch_refresh_status func name --- .../airflow/providers/microsoft/azure/triggers/powerbi.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py index 8ec8dfbc24e23..5fa64bc852135 100644 --- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -23,12 +23,12 @@ from typing import TYPE_CHECKING import tenacity - from airflow.providers.microsoft.azure.hooks.powerbi import ( PowerBIDatasetRefreshException, PowerBIDatasetRefreshStatus, PowerBIHook, ) + from airflow.triggers.base import BaseTrigger, TriggerEvent if TYPE_CHECKING: @@ -115,8 +115,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: reraise=True, retry=tenacity.retry_if_exception_type(PowerBIDatasetRefreshException), ) - async def fetch_refresh_status() -> str: - """Fetch the current status of the dataset refresh.""" + async def fetch_refresh_status_and_error() -> tuple[str, str]: + """Fetch the current status and error of the dataset refresh.""" refresh_details = await self.hook.get_refresh_details_by_refresh_id( dataset_id=self.dataset_id, group_id=self.group_id, From 4278288f8f11040c5edfbffbcad6a68ea0279b43 Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Thu, 9 Jan 2025 10:18:51 +0100 Subject: [PATCH 06/12] Change: separate workflows to trigger refresh and get its status --- .../microsoft/azure/operators/powerbi.py | 36 +++++++++++++++- .../microsoft/azure/triggers/powerbi.py | 42 +++++++++++++------ 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py index b334a1708a6f7..3d4114c919d49 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -114,6 +114,37 @@ def execute(self, context: Context): check_interval=self.check_interval, wait_for_termination=self.wait_for_termination, ), + method_name=self.get_refresh_status.__name__, + ) + + def get_refresh_status(self, context: Context, event: dict[str, str] | None = None): + """Push the refresh Id to XCom then runs the Triggers to wait for refresh completion.""" + if event: + if event["status"] == "error": + raise AirflowException(event["message"]) + + self.xcom_push( + context=context, + key=f"{context['ti'].task_id}.powerbi_dataset_refresh_Id", + value=event["dataset_refresh_id"], + ) + + dataset_refresh_id = self.xcom_pull( + context=context, key=f"{context['ti'].task_id}.powerbi_dataset_refresh_Id" + ) + if dataset_refresh_id: + self.defer( + trigger=PowerBITrigger( + conn_id=self.conn_id, + group_id=self.group_id, + dataset_id=self.dataset_id, + dataset_refresh_id=dataset_refresh_id, + timeout=self.timeout, + proxies=self.proxies, + api_version=self.api_version, + check_interval=self.check_interval, + wait_for_termination=self.wait_for_termination, + ), method_name=self.execute_complete.__name__, ) @@ -128,6 +159,7 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any: raise AirflowException(event["message"]) self.xcom_push( - context=context, key="powerbi_dataset_refresh_Id", value=event["dataset_refresh_id"] + context=context, + key=f"{context['ti'].task_id}.powerbi_dataset_refresh_status", + value=event["dataset_refresh_status"], ) - self.xcom_push(context=context, key="powerbi_dataset_refresh_status", value=event["status"]) diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py index 5fa64bc852135..657d83c1e219c 100644 --- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -23,12 +23,12 @@ from typing import TYPE_CHECKING import tenacity + from airflow.providers.microsoft.azure.hooks.powerbi import ( PowerBIDatasetRefreshException, PowerBIDatasetRefreshStatus, PowerBIHook, ) - from airflow.triggers.base import BaseTrigger, TriggerEvent if TYPE_CHECKING: @@ -49,6 +49,7 @@ class PowerBITrigger(BaseTrigger): You can pass an enum named APIVersion which has 2 possible members v1 and beta, or you can pass a string as `v1.0` or `beta`. :param dataset_id: The dataset Id to refresh. + :param dataset_refresh_id: The dataset refresh Id :param group_id: The workspace Id where dataset is located. :param end_time: Time in seconds when trigger should stop polling. :param check_interval: Time in seconds to wait between each poll. @@ -61,6 +62,7 @@ def __init__( dataset_id: str, group_id: str, timeout: float = 60 * 60 * 24 * 7, + dataset_refresh_id: str | None = None, proxies: dict | None = None, api_version: APIVersion | str | None = None, check_interval: int = 60, @@ -69,6 +71,7 @@ def __init__( super().__init__() self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout) self.dataset_id = dataset_id + self.dataset_refresh_id = dataset_refresh_id self.timeout = timeout self.group_id = group_id self.check_interval = check_interval @@ -83,6 +86,7 @@ def serialize(self): "proxies": self.proxies, "api_version": self.api_version, "dataset_id": self.dataset_id, + "dataset_refresh_id": self.dataset_refresh_id, "group_id": self.group_id, "timeout": self.timeout, "check_interval": self.check_interval, @@ -104,25 +108,39 @@ def api_version(self) -> APIVersion | str: async def run(self) -> AsyncIterator[TriggerEvent]: """Make async connection to the PowerBI and polls for the dataset refresh status.""" - self.dataset_refresh_id = await self.hook.trigger_dataset_refresh( - dataset_id=self.dataset_id, - group_id=self.group_id, - ) + if not self.dataset_refresh_id: + dataset_refresh_id = await self.hook.trigger_dataset_refresh( + dataset_id=self.dataset_id, + group_id=self.group_id, + ) + self.log.info("Triggered dataset refresh %s", dataset_refresh_id) + yield TriggerEvent( + { + "status": "success", + "dataset_refresh_status": None, + "message": f"The dataset refresh {self.dataset_refresh_id} has been triggered.", + "dataset_refresh_id": dataset_refresh_id, + } + ) + return @tenacity.retry( stop=tenacity.stop_after_attempt(3), - wait=tenacity.wait_exponential(multiplier=1, min=3, max=60), + wait=tenacity.wait_exponential(min=5, multiplier=2), reraise=True, retry=tenacity.retry_if_exception_type(PowerBIDatasetRefreshException), ) async def fetch_refresh_status_and_error() -> tuple[str, str]: """Fetch the current status and error of the dataset refresh.""" - refresh_details = await self.hook.get_refresh_details_by_refresh_id( - dataset_id=self.dataset_id, - group_id=self.group_id, - refresh_id=self.dataset_refresh_id, - ) - return refresh_details["status"], refresh_details["error"] + if self.dataset_refresh_id: + refresh_details = await self.hook.get_refresh_details_by_refresh_id( + dataset_id=self.dataset_id, + group_id=self.group_id, + refresh_id=self.dataset_refresh_id, + ) + return refresh_details["status"], refresh_details["error"] + else: + raise Exception("Dataset refresh Id is missing.") try: dataset_refresh_status, dataset_refresh_error = await fetch_refresh_status_and_error() From 8812a56528dd7bf269881ac25862c0d70a1fa99e Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Thu, 9 Jan 2025 15:26:44 +0100 Subject: [PATCH 07/12] Change: Update unit tests --- .../microsoft/azure/triggers/powerbi.py | 31 ++++++++++++------ .../microsoft/azure/operators/test_powerbi.py | 32 +++++++++++++++++-- .../microsoft/azure/triggers/test_powerbi.py | 27 ++++++++++++++-- 3 files changed, 76 insertions(+), 14 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py index 657d83c1e219c..c7bb179a1c2b5 100644 --- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -114,15 +114,26 @@ async def run(self) -> AsyncIterator[TriggerEvent]: group_id=self.group_id, ) self.log.info("Triggered dataset refresh %s", dataset_refresh_id) - yield TriggerEvent( - { - "status": "success", - "dataset_refresh_status": None, - "message": f"The dataset refresh {self.dataset_refresh_id} has been triggered.", - "dataset_refresh_id": dataset_refresh_id, - } - ) - return + if dataset_refresh_id: + yield TriggerEvent( + { + "status": "success", + "dataset_refresh_status": None, + "message": f"The dataset refresh {dataset_refresh_id} has been triggered.", + "dataset_refresh_id": dataset_refresh_id, + } + ) + return + else: + yield TriggerEvent( + { + "status": "error", + "dataset_refresh_status": None, + "message": "Failed to trigger the dataset refresh.", + "dataset_refresh_id": None, + } + ) + return @tenacity.retry( stop=tenacity.stop_after_attempt(3), @@ -140,7 +151,7 @@ async def fetch_refresh_status_and_error() -> tuple[str, str]: ) return refresh_details["status"], refresh_details["error"] else: - raise Exception("Dataset refresh Id is missing.") + raise PowerBIDatasetRefreshException("Dataset refresh Id is missing.") try: dataset_refresh_status, dataset_refresh_error = await fetch_refresh_status_and_error() diff --git a/providers/tests/microsoft/azure/operators/test_powerbi.py b/providers/tests/microsoft/azure/operators/test_powerbi.py index a115b4c52dc50..97d15f6d27cbb 100644 --- a/providers/tests/microsoft/azure/operators/test_powerbi.py +++ b/providers/tests/microsoft/azure/operators/test_powerbi.py @@ -49,6 +49,13 @@ NEW_REFRESH_REQUEST_ID = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" SUCCESS_TRIGGER_EVENT = { + "status": "success", + "dataset_refresh_status": None, + "message": "success", + "dataset_refresh_id": NEW_REFRESH_REQUEST_ID, +} + +SUCCESS_REFRESH_EVENT = { "status": "success", "dataset_refresh_status": PowerBIDatasetRefreshStatus.COMPLETED, "message": "success", @@ -89,6 +96,27 @@ def test_execute_wait_for_termination_with_deferrable(self, connection): assert isinstance(exc.value.trigger, PowerBITrigger) + @mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection) + def test_powerbi_operator_async_get_refresh_status_success(self, connection): + """Assert that get_refresh_status log success message""" + operator = PowerBIDatasetRefreshOperator( + **CONFIG, + ) + context = {"ti": MagicMock()} + context["ti"].task_id = TASK_ID + context["ti"].xcom_pull = MagicMock(return_value=NEW_REFRESH_REQUEST_ID) + + with pytest.raises(TaskDeferred) as exc: + operator.get_refresh_status( + context=context, + event=SUCCESS_TRIGGER_EVENT, + ) + + assert isinstance(exc.value.trigger, PowerBITrigger) + + assert context["ti"].xcom_push.call_count == 1 + assert context["ti"].xcom_pull.call_count == 1 + def test_powerbi_operator_async_execute_complete_success(self): """Assert that execute_complete log success message""" operator = PowerBIDatasetRefreshOperator( @@ -97,9 +125,9 @@ def test_powerbi_operator_async_execute_complete_success(self): context = {"ti": MagicMock()} operator.execute_complete( context=context, - event=SUCCESS_TRIGGER_EVENT, + event=SUCCESS_REFRESH_EVENT, ) - assert context["ti"].xcom_push.call_count == 2 + assert context["ti"].xcom_push.call_count == 1 def test_powerbi_operator_async_execute_complete_fail(self): """Assert that execute_complete raise exception on error""" diff --git a/providers/tests/microsoft/azure/triggers/test_powerbi.py b/providers/tests/microsoft/azure/triggers/test_powerbi.py index 099aa18da3a62..0b1fbdcdae743 100644 --- a/providers/tests/microsoft/azure/triggers/test_powerbi.py +++ b/providers/tests/microsoft/azure/triggers/test_powerbi.py @@ -49,6 +49,7 @@ def powerbi_trigger(timeout=TIMEOUT, check_interval=CHECK_INTERVAL) -> PowerBITr proxies=None, api_version=API_VERSION, dataset_id=DATASET_ID, + dataset_refresh_id=DATASET_REFRESH_ID, group_id=GROUP_ID, check_interval=check_interval, wait_for_termination=True, @@ -65,6 +66,7 @@ def test_powerbi_trigger_serialization(self, connection): proxies=None, api_version=API_VERSION, dataset_id=DATASET_ID, + dataset_refresh_id=DATASET_REFRESH_ID, group_id=GROUP_ID, check_interval=CHECK_INTERVAL, wait_for_termination=True, @@ -76,6 +78,7 @@ def test_powerbi_trigger_serialization(self, connection): assert kwargs == { "conn_id": POWERBI_CONN_ID, "dataset_id": DATASET_ID, + "dataset_refresh_id": DATASET_REFRESH_ID, "timeout": TIMEOUT, "group_id": GROUP_ID, "proxies": None, @@ -129,13 +132,32 @@ async def test_powerbi_trigger_run_failed( ) assert expected == actual + @pytest.mark.asyncio + @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh") + async def test_powerbi_trigger_run_trigger_refresh(self, mock_trigger_dataset_refresh, powerbi_trigger): + """Assert event is triggered upon successful new refresh trigger.""" + powerbi_trigger.dataset_refresh_id = None + mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID + + task = [i async for i in powerbi_trigger.run()] + response = TriggerEvent( + { + "status": "success", + "dataset_refresh_status": None, + "message": f"The dataset refresh {DATASET_REFRESH_ID} has been triggered.", + "dataset_refresh_id": DATASET_REFRESH_ID, + } + ) + assert len(task) == 1 + assert response in task + @pytest.mark.asyncio @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id") @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh") async def test_powerbi_trigger_run_completed( self, mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id, powerbi_trigger ): - """Assert event is triggered upon successful dataset refresh.""" + """Assert event is triggered upon successful dataset refresh completion.""" mock_get_refresh_details_by_refresh_id.return_value = { "status": PowerBIDatasetRefreshStatus.COMPLETED, "error": None, @@ -203,6 +225,7 @@ async def test_powerbi_trigger_run_PowerBIDatasetRefreshException_during_refresh response = TriggerEvent( { "status": "error", + "dataset_refresh_status": None, "message": "An error occurred: Test exception", "dataset_refresh_id": DATASET_REFRESH_ID, } @@ -260,7 +283,7 @@ async def test_powerbi_trigger_run_exception_without_refresh_id( { "status": "error", "dataset_refresh_status": None, - "message": "An error occurred: Test exception for no dataset_refresh_id", + "message": "Failed to trigger the dataset refresh.", "dataset_refresh_id": None, } ) From 1d54d54bb579e720d3de26919b05e26355481194 Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Thu, 9 Jan 2025 18:08:16 +0100 Subject: [PATCH 08/12] Fix: code cleanup --- .../providers/microsoft/azure/operators/powerbi.py | 12 +++++------- .../providers/microsoft/azure/triggers/powerbi.py | 7 +++++-- .../tests/microsoft/azure/operators/test_powerbi.py | 5 ++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py index 3d4114c919d49..f50eb1f0bddd7 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -118,21 +118,19 @@ def execute(self, context: Context): ) def get_refresh_status(self, context: Context, event: dict[str, str] | None = None): - """Push the refresh Id to XCom then runs the Triggers to wait for refresh completion.""" + """Push the refresh Id to XCom then runs the Trigger to wait for refresh completion.""" if event: if event["status"] == "error": raise AirflowException(event["message"]) + dataset_refresh_id = event["dataset_refresh_id"] + + if dataset_refresh_id: self.xcom_push( context=context, key=f"{context['ti'].task_id}.powerbi_dataset_refresh_Id", - value=event["dataset_refresh_id"], + value=dataset_refresh_id, ) - - dataset_refresh_id = self.xcom_pull( - context=context, key=f"{context['ti'].task_id}.powerbi_dataset_refresh_Id" - ) - if dataset_refresh_id: self.defer( trigger=PowerBITrigger( conn_id=self.conn_id, diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py index c7bb179a1c2b5..c1073a67c31b8 100644 --- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -49,7 +49,7 @@ class PowerBITrigger(BaseTrigger): You can pass an enum named APIVersion which has 2 possible members v1 and beta, or you can pass a string as `v1.0` or `beta`. :param dataset_id: The dataset Id to refresh. - :param dataset_refresh_id: The dataset refresh Id + :param dataset_refresh_id: The dataset refresh Id to poll for the status, if not provided a new refresh will be triggered. :param group_id: The workspace Id where dataset is located. :param end_time: Time in seconds when trigger should stop polling. :param check_interval: Time in seconds to wait between each poll. @@ -109,12 +109,14 @@ def api_version(self) -> APIVersion | str: async def run(self) -> AsyncIterator[TriggerEvent]: """Make async connection to the PowerBI and polls for the dataset refresh status.""" if not self.dataset_refresh_id: + # Trigger the dataset refresh dataset_refresh_id = await self.hook.trigger_dataset_refresh( dataset_id=self.dataset_id, group_id=self.group_id, ) - self.log.info("Triggered dataset refresh %s", dataset_refresh_id) + if dataset_refresh_id: + self.log.info("Triggered dataset refresh %s", dataset_refresh_id) yield TriggerEvent( { "status": "success", @@ -135,6 +137,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: ) return + # The dataset refresh is already triggered. Poll for the dataset refresh status. @tenacity.retry( stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(min=5, multiplier=2), diff --git a/providers/tests/microsoft/azure/operators/test_powerbi.py b/providers/tests/microsoft/azure/operators/test_powerbi.py index 97d15f6d27cbb..9e920d1b01f38 100644 --- a/providers/tests/microsoft/azure/operators/test_powerbi.py +++ b/providers/tests/microsoft/azure/operators/test_powerbi.py @@ -95,6 +95,7 @@ def test_execute_wait_for_termination_with_deferrable(self, connection): operator.execute(context) assert isinstance(exc.value.trigger, PowerBITrigger) + assert exc.value.trigger.dataset_refresh_id is None @mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection) def test_powerbi_operator_async_get_refresh_status_success(self, connection): @@ -104,7 +105,6 @@ def test_powerbi_operator_async_get_refresh_status_success(self, connection): ) context = {"ti": MagicMock()} context["ti"].task_id = TASK_ID - context["ti"].xcom_pull = MagicMock(return_value=NEW_REFRESH_REQUEST_ID) with pytest.raises(TaskDeferred) as exc: operator.get_refresh_status( @@ -113,9 +113,8 @@ def test_powerbi_operator_async_get_refresh_status_success(self, connection): ) assert isinstance(exc.value.trigger, PowerBITrigger) - + assert exc.value.trigger.dataset_refresh_id is NEW_REFRESH_REQUEST_ID assert context["ti"].xcom_push.call_count == 1 - assert context["ti"].xcom_pull.call_count == 1 def test_powerbi_operator_async_execute_complete_success(self): """Assert that execute_complete log success message""" From a69567ab9d62d00aa855ed640ee184d4080dc74e Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Fri, 10 Jan 2025 16:14:54 +0100 Subject: [PATCH 09/12] fix: CI tests --- .../microsoft/azure/triggers/powerbi.py | 4 +-- .../microsoft/azure/triggers/test_powerbi.py | 27 +++++++++---------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py index c1073a67c31b8..dddaefe089d36 100644 --- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -153,8 +153,8 @@ async def fetch_refresh_status_and_error() -> tuple[str, str]: refresh_id=self.dataset_refresh_id, ) return refresh_details["status"], refresh_details["error"] - else: - raise PowerBIDatasetRefreshException("Dataset refresh Id is missing.") + + raise PowerBIDatasetRefreshException("Dataset refresh Id is missing.") try: dataset_refresh_status, dataset_refresh_error = await fetch_refresh_status_and_error() diff --git a/providers/tests/microsoft/azure/triggers/test_powerbi.py b/providers/tests/microsoft/azure/triggers/test_powerbi.py index 0b1fbdcdae743..58bb3489fd56e 100644 --- a/providers/tests/microsoft/azure/triggers/test_powerbi.py +++ b/providers/tests/microsoft/azure/triggers/test_powerbi.py @@ -220,20 +220,19 @@ async def test_powerbi_trigger_run_PowerBIDatasetRefreshException_during_refresh mock_get_refresh_details_by_refresh_id.side_effect = PowerBIDatasetRefreshException("Test exception") mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID - with mock.patch("tenacity.wait.wait_exponential.__call__") as mock_retry: - task = [i async for i in powerbi_trigger.run()] - response = TriggerEvent( - { - "status": "error", - "dataset_refresh_status": None, - "message": "An error occurred: Test exception", - "dataset_refresh_id": DATASET_REFRESH_ID, - } - ) - assert len(task) == 1 - assert response in task - assert mock_cancel_dataset_refresh.call_count == 1 - assert mock_retry.call_count == 3 + task = [i async for i in powerbi_trigger.run()] + response = TriggerEvent( + { + "status": "error", + "dataset_refresh_status": None, + "message": "An error occurred: Test exception", + "dataset_refresh_id": DATASET_REFRESH_ID, + } + ) + assert mock_get_refresh_details_by_refresh_id.call_count == 3 + assert len(task) == 1 + assert response in task + assert mock_cancel_dataset_refresh.call_count == 1 @pytest.mark.asyncio @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh") From 01b3ebef74e2075a83d1705c8aa159b152b67fba Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Tue, 14 Jan 2025 11:13:29 +0100 Subject: [PATCH 10/12] fix: fixes following dabla feedbacks --- .../microsoft/azure/operators/powerbi.py | 2 +- .../microsoft/azure/triggers/powerbi.py | 20 +++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py index f50eb1f0bddd7..ab6e0f98414f1 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -128,7 +128,7 @@ def get_refresh_status(self, context: Context, event: dict[str, str] | None = No if dataset_refresh_id: self.xcom_push( context=context, - key=f"{context['ti'].task_id}.powerbi_dataset_refresh_Id", + key=f"{self.task_id}.powerbi_dataset_refresh_Id", value=dataset_refresh_id, ) self.defer( diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py index dddaefe089d36..8f749311806ec 100644 --- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -126,16 +126,16 @@ async def run(self) -> AsyncIterator[TriggerEvent]: } ) return - else: - yield TriggerEvent( - { - "status": "error", - "dataset_refresh_status": None, - "message": "Failed to trigger the dataset refresh.", - "dataset_refresh_id": None, - } - ) - return + + yield TriggerEvent( + { + "status": "error", + "dataset_refresh_status": None, + "message": "Failed to trigger the dataset refresh.", + "dataset_refresh_id": None, + } + ) + return # The dataset refresh is already triggered. Poll for the dataset refresh status. @tenacity.retry( From 18648a1762af449feaa3b5550d2c9a52cc0e1842 Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Tue, 14 Jan 2025 11:16:25 +0100 Subject: [PATCH 11/12] Fix: forgotten self.task_id --- .../src/airflow/providers/microsoft/azure/operators/powerbi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py index ab6e0f98414f1..35d5027978bcd 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -158,6 +158,6 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any: self.xcom_push( context=context, - key=f"{context['ti'].task_id}.powerbi_dataset_refresh_status", + key=f"{self.task_id}.powerbi_dataset_refresh_status", value=event["dataset_refresh_status"], ) From bfe7fa28e9f2fc07835363f72d63c4606c2e9a23 Mon Sep 17 00:00:00 2001 From: "emma.galliere" Date: Tue, 14 Jan 2025 11:40:04 +0100 Subject: [PATCH 12/12] change: push powerbi_dataset_refresh_status before raising exception --- .../airflow/providers/microsoft/azure/operators/powerbi.py | 5 ++--- providers/tests/microsoft/azure/operators/test_powerbi.py | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py index 35d5027978bcd..1c6878c27af68 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -153,11 +153,10 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any: Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ if event: - if event["status"] == "error": - raise AirflowException(event["message"]) - self.xcom_push( context=context, key=f"{self.task_id}.powerbi_dataset_refresh_status", value=event["dataset_refresh_status"], ) + if event["status"] == "error": + raise AirflowException(event["message"]) diff --git a/providers/tests/microsoft/azure/operators/test_powerbi.py b/providers/tests/microsoft/azure/operators/test_powerbi.py index 9e920d1b01f38..7975411d50cc8 100644 --- a/providers/tests/microsoft/azure/operators/test_powerbi.py +++ b/providers/tests/microsoft/azure/operators/test_powerbi.py @@ -144,7 +144,7 @@ def test_powerbi_operator_async_execute_complete_fail(self): "dataset_refresh_id": "1234", }, ) - assert context["ti"].xcom_push.call_count == 0 + assert context["ti"].xcom_push.call_count == 1 assert str(exc.value) == "error" def test_powerbi_operator_refresh_fail(self): @@ -163,7 +163,7 @@ def test_powerbi_operator_refresh_fail(self): "dataset_refresh_id": "1234", }, ) - assert context["ti"].xcom_push.call_count == 0 + assert context["ti"].xcom_push.call_count == 1 assert str(exc.value) == "error message" def test_execute_complete_no_event(self):