Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
13b8789
refactor: Allow custom api versions to be passed
davidblain-infrabel Jul 30, 2024
e3becfb
refactored: Reformatted TestKiotaRequestAdapterHook
davidblain-infrabel Aug 8, 2024
8f46b85
refactored: Changed type for api_version in get_airflow_connection
davidblain-infrabel Aug 8, 2024
925dff7
refactored: Reformatted TestKiotaRequestAdapterHook
davidblain-infrabel Aug 8, 2024
28847f5
refactored: No need to call value on api_version
davidblain-infrabel Aug 8, 2024
62d0581
refactored: Removed duplicate DefaultResponseHandler
davidblain-infrabel Aug 8, 2024
1c18572
refactored: Changed return type of resolve_api_version_from_value
davidblain-infrabel Aug 8, 2024
fa6c5af
refactored: Reformatted resolve_api_version_from_value method
davidblain-infrabel Aug 8, 2024
d16a100
refactored: Try ignore type error in get_api_version method
davidblain-infrabel Aug 8, 2024
eaeab3a
docs: Added example on how to use the MSGraphAsyncOperator to trigger…
davidblain-infrabel Aug 8, 2024
3353467
refactor: Removed unused imports
davidblain-infrabel Aug 8, 2024
b3f1f36
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 8, 2024
38e841f
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 9, 2024
75863b9
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 9, 2024
4e0962a
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 9, 2024
e6d1828
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 13, 2024
28b6b0c
fix: Fixed serialization of api_version in MSGraphTrigger
davidblain-infrabel Aug 13, 2024
eeb9ce8
refactor: Refactored get_airflow_connection in conftest
davidblain-infrabel Aug 13, 2024
78e7881
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 13, 2024
077c859
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 14, 2024
aca9be3
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 14, 2024
4938592
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 14, 2024
15fa95e
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 14, 2024
5095443
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 14, 2024
45a5b73
refactor: Updated api version types as well in PowerBI
davidblain-infrabel Aug 14, 2024
467f5e8
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 14, 2024
a562908
refactor: Api version property could return None
davidblain-infrabel Aug 14, 2024
41c26d6
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 14, 2024
28dadc2
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 20, 2024
8e5e320
Merge branch 'main' into feature/msgraph-change-api-version
dabla Aug 22, 2024
3e97974
Merge branch 'main' into feature/msgraph-change-api-version
dabla Sep 3, 2024
6577f1a
refactor: Refactored and fixed PowerBI related tests
davidblain-infrabel Sep 4, 2024
2255ad5
refactor: Added support for tenantId beside tenant_id as extra config…
davidblain-infrabel Sep 4, 2024
876da95
refactor: Reformatted test related files to conform static checks
davidblain-infrabel Sep 4, 2024
6f1856d
refactor: Reformatted TestPowerBITrigger
davidblain-infrabel Sep 4, 2024
7f345ae
refactor: Moved import of RequestAdapter into type checking block
davidblain-infrabel Sep 4, 2024
e77ad05
Merge branch 'main' into feature/msgraph-change-api-version
dabla Sep 4, 2024
975723f
Merge branch 'main' into feature/msgraph-change-api-version
dabla Sep 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 22 additions & 25 deletions airflow/providers/microsoft/azure/hooks/msgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class KiotaRequestAdapterHook(BaseHook):
:param timeout: The HTTP timeout being used by the KiotaRequestAdapter (default is None).
When no timeout is specified or set to None then no HTTP timeout is applied on each request.
:param proxies: A Dict defining the HTTP proxies to be used (default is None).
:param host: The host to be used (default is "https://graph.microsoft.com").
:param scopes: The scopes to be used (default is ["https://graph.microsoft.com/.default"]).
:param api_version: The API version of the Microsoft Graph API to be used (default is v1).
You can pass an enum named APIVersion which has 2 possible members v1 and beta,
or you can pass a string as "v1.0" or "beta".
Expand Down Expand Up @@ -123,27 +125,22 @@ def __init__(
self._api_version = self.resolve_api_version_from_value(api_version)

@property
def api_version(self) -> APIVersion:
def api_version(self) -> str | None:
self.get_conn() # Make sure config has been loaded through get_conn to have correct api version!
return self._api_version

@staticmethod
def resolve_api_version_from_value(
api_version: APIVersion | str, default: APIVersion | None = None
) -> APIVersion:
api_version: APIVersion | str, default: str | None = None
) -> str | None:
if isinstance(api_version, APIVersion):
return api_version
return next(
filter(lambda version: version.value == api_version, APIVersion),
default,
)
return api_version.value
return api_version or default

def get_api_version(self, config: dict) -> APIVersion:
if self._api_version is None:
return self.resolve_api_version_from_value(
api_version=config.get("api_version"), default=APIVersion.v1
)
return self._api_version
def get_api_version(self, config: dict) -> str:
return self._api_version or self.resolve_api_version_from_value(
config.get("api_version"), APIVersion.v1.value
) # type: ignore

def get_host(self, connection: Connection) -> str:
if connection.schema and connection.host:
Expand All @@ -169,15 +166,15 @@ def to_httpx_proxies(cls, proxies: dict) -> dict:
return proxies

def to_msal_proxies(self, authority: str | None, proxies: dict):
self.log.info("authority: %s", authority)
self.log.debug("authority: %s", authority)
if authority:
no_proxies = proxies.get("no")
self.log.info("no_proxies: %s", no_proxies)
self.log.debug("no_proxies: %s", no_proxies)
if no_proxies:
for url in no_proxies.split(","):
self.log.info("url: %s", url)
domain_name = urlparse(url).path.replace("*", "")
self.log.info("domain_name: %s", domain_name)
self.log.debug("domain_name: %s", domain_name)
if authority.endswith(domain_name):
return None
return proxies
Expand All @@ -193,10 +190,10 @@ def get_conn(self) -> RequestAdapter:
client_id = connection.login
client_secret = connection.password
config = connection.extra_dejson if connection.extra else {}
tenant_id = config.get("tenant_id")
tenant_id = config.get("tenant_id") or config.get("tenantId")
api_version = self.get_api_version(config)
host = self.get_host(connection)
base_url = config.get("base_url", urljoin(host, api_version.value))
base_url = config.get("base_url", urljoin(host, api_version))
authority = config.get("authority")
proxies = self.proxies or config.get("proxies", {})
msal_proxies = self.to_msal_proxies(authority=authority, proxies=proxies)
Expand All @@ -209,15 +206,15 @@ def get_conn(self) -> RequestAdapter:

self.log.info(
"Creating Microsoft Graph SDK client %s for conn_id: %s",
api_version.value,
api_version,
self.conn_id,
)
self.log.info("Host: %s", host)
self.log.info("Base URL: %s", base_url)
self.log.info("Tenant id: %s", tenant_id)
self.log.info("Client id: %s", client_id)
self.log.info("Client secret: %s", client_secret)
self.log.info("API version: %s", api_version.value)
self.log.info("API version: %s", api_version)
self.log.info("Scope: %s", scopes)
self.log.info("Verify: %s", verify)
self.log.info("Timeout: %s", self.timeout)
Expand All @@ -238,17 +235,17 @@ def get_conn(self) -> RequestAdapter:
connection_verify=verify,
)
http_client = GraphClientFactory.create_with_default_middleware(
api_version=api_version,
api_version=api_version, # type: ignore
client=httpx.AsyncClient(
proxies=httpx_proxies,
timeout=Timeout(timeout=self.timeout),
verify=verify,
trust_env=trust_env,
),
host=host,
host=host, # type: ignore
)
auth_provider = AzureIdentityAuthenticationProvider(
credentials=credentials,
credentials=credentials, # type: ignore
scopes=scopes,
allowed_hosts=allowed_hosts,
)
Expand Down Expand Up @@ -295,7 +292,7 @@ async def run(
error_map=self.error_mapping(),
)

self.log.info("response: %s", response)
self.log.debug("response: %s", response)

return response

Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/microsoft/azure/operators/msgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(
key: str = XCOM_RETURN_KEY,
timeout: float | None = None,
proxies: dict | None = None,
api_version: APIVersion | None = None,
api_version: APIVersion | str | None = None,
pagination_function: Callable[[MSGraphAsyncOperator, dict], tuple[str, dict]] | None = None,
result_processor: Callable[[Context, Any], Any] = lambda context, result: result,
serializer: type[ResponseSerializer] = ResponseSerializer,
Expand Down
12 changes: 11 additions & 1 deletion airflow/providers/microsoft/azure/operators/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(
conn_id: str = PowerBIHook.default_conn_name,
timeout: float = 60 * 60 * 24 * 7,
proxies: dict | None = None,
api_version: APIVersion | None = None,
api_version: APIVersion | str | None = None,
check_interval: int = 60,
**kwargs,
) -> None:
Expand All @@ -89,6 +89,14 @@ def __init__(
self.timeout = timeout
self.check_interval = check_interval

@property
def proxies(self) -> dict | None:
return self.hook.proxies

@property
def api_version(self) -> str | None:
return self.hook.api_version

def execute(self, context: Context):
"""Refresh the Power BI Dataset."""
if self.wait_for_termination:
Expand All @@ -98,6 +106,8 @@ def execute(self, context: Context):
group_id=self.group_id,
dataset_id=self.dataset_id,
timeout=self.timeout,
proxies=self.proxies,
api_version=self.api_version,
check_interval=self.check_interval,
wait_for_termination=self.wait_for_termination,
),
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/microsoft/azure/sensors/msgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(
data: dict[str, Any] | str | BytesIO | None = None,
conn_id: str = KiotaRequestAdapterHook.default_conn_name,
proxies: dict | None = None,
api_version: APIVersion | None = None,
api_version: APIVersion | str | None = None,
event_processor: Callable[[Context, Any], bool] = lambda context, e: e.get("status") == "Succeeded",
result_processor: Callable[[Context, Any], Any] = lambda context, result: result,
serializer: type[ResponseSerializer] = ResponseSerializer,
Expand Down
7 changes: 3 additions & 4 deletions airflow/providers/microsoft/azure/triggers/msgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def __init__(
conn_id: str = KiotaRequestAdapterHook.default_conn_name,
timeout: float | None = None,
proxies: dict | None = None,
api_version: APIVersion | None = None,
api_version: APIVersion | str | None = None,
serializer: type[ResponseSerializer] = ResponseSerializer,
):
super().__init__()
Expand Down Expand Up @@ -152,14 +152,13 @@ def resolve_type(cls, value: str | type, default) -> type:

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize the HttpTrigger arguments and classpath."""
api_version = self.api_version.value if self.api_version else None
return (
f"{self.__class__.__module__}.{self.__class__.__name__}",
{
"conn_id": self.conn_id,
"timeout": self.timeout,
"proxies": self.proxies,
"api_version": api_version,
"api_version": self.api_version,
"serializer": f"{self.serializer.__class__.__module__}.{self.serializer.__class__.__name__}",
"url": self.url,
"path_parameters": self.path_parameters,
Expand Down Expand Up @@ -188,7 +187,7 @@ def proxies(self) -> dict | None:
return self.hook.proxies

@property
def api_version(self) -> APIVersion:
def api_version(self) -> APIVersion | str:
return self.hook.api_version

async def run(self) -> AsyncIterator[TriggerEvent]:
Expand Down
7 changes: 3 additions & 4 deletions airflow/providers/microsoft/azure/triggers/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
group_id: str,
timeout: float = 60 * 60 * 24 * 7,
proxies: dict | None = None,
api_version: APIVersion | None = None,
api_version: APIVersion | str | None = None,
check_interval: int = 60,
wait_for_termination: bool = True,
):
Expand All @@ -72,13 +72,12 @@ def __init__(

def serialize(self):
"""Serialize the trigger instance."""
api_version = self.api_version.value if self.api_version else None
return (
"airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger",
{
"conn_id": self.conn_id,
"proxies": self.proxies,
"api_version": api_version,
"api_version": self.api_version,
"dataset_id": self.dataset_id,
"group_id": self.group_id,
"timeout": self.timeout,
Expand All @@ -96,7 +95,7 @@ def proxies(self) -> dict | None:
return self.hook.proxies

@property
def api_version(self) -> APIVersion:
def api_version(self) -> APIVersion | str:
return self.hook.api_version

async def run(self) -> AsyncIterator[TriggerEvent]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ Below is an example of using this operator to refresh PowerBI dataset.
:start-after: [START howto_operator_powerbi_refresh_dataset]
:end-before: [END howto_operator_powerbi_refresh_dataset]

Below is an example of using this operator to create an item schedule in Fabric.

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_msfabric.py
:language: python
:dedent: 0
:start-after: [START howto_operator_ms_fabric_create_item_schedule]
:end-before: [END howto_operator_ms_fabric_create_item_schedule]


Reference
---------
Expand All @@ -80,3 +88,4 @@ For further information, look at:

* `Use the Microsoft Graph API <https://learn.microsoft.com/en-us/graph/use-the-api/>`__
* `Using the Power BI REST APIs <https://learn.microsoft.com/en-us/rest/api/power-bi/>`__
* `Using the Fabric REST APIs <https://learn.microsoft.com/en-us/rest/api/fabric/articles/using-fabric-apis/>`__
Loading