From 2308ef2836a6f06f31201dab311e779d293fc5b7 Mon Sep 17 00:00:00 2001 From: Maksim Moiseenkov Date: Tue, 19 Dec 2023 14:45:00 +0000 Subject: [PATCH] Implement Google Analytics Admin (GA4) operators --- .github/workflows/ci.yml | 2 +- .../marketing_platform/hooks/analytics.py | 9 + .../hooks/analytics_admin.py | 234 +++++++ .../marketing_platform/links/__init__.py | 16 + .../links/analytics_admin.py | 65 ++ .../marketing_platform/operators/analytics.py | 53 ++ .../operators/analytics_admin.py | 579 ++++++++++++++++++ airflow/providers/google/provider.yaml | 14 + .../marketing_platform/analytics.rst | 5 + .../marketing_platform/analytics_admin.rst | 151 +++++ generated/provider_dependencies.json | 1 + .../hooks/test_analytics_admin.py | 192 ++++++ .../marketing_platform/links/__init__.py | 17 + .../links/test_analytics_admin.py | 71 +++ .../operators/test_analytics_admin.py | 310 ++++++++++ .../example_analytics_admin.py | 203 ++++++ 16 files changed, 1921 insertions(+), 1 deletion(-) create mode 100644 airflow/providers/google/marketing_platform/hooks/analytics_admin.py create mode 100644 airflow/providers/google/marketing_platform/links/__init__.py create mode 100644 airflow/providers/google/marketing_platform/links/analytics_admin.py create mode 100644 airflow/providers/google/marketing_platform/operators/analytics_admin.py create mode 100644 docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst create mode 100644 tests/providers/google/marketing_platform/hooks/test_analytics_admin.py create mode 100644 tests/providers/google/marketing_platform/links/__init__.py create mode 100644 tests/providers/google/marketing_platform/links/test_analytics_admin.py create mode 100644 tests/providers/google/marketing_platform/operators/test_analytics_admin.py create mode 100644 tests/system/providers/google/marketing_platform/example_analytics_admin.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7e52be67b21a7..a622e7411152b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -668,7 +668,7 @@ jobs: run: aws s3 sync --delete ./files/documentation s3://apache-airflow-docs spellcheck-docs: - timeout-minutes: 60 + timeout-minutes: 120 name: "Spellcheck docs" runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}} needs: [build-info, wait-for-ci-images] diff --git a/airflow/providers/google/marketing_platform/hooks/analytics.py b/airflow/providers/google/marketing_platform/hooks/analytics.py index 05df51c2c1f57..ec98ec2829397 100644 --- a/airflow/providers/google/marketing_platform/hooks/analytics.py +++ b/airflow/providers/google/marketing_platform/hooks/analytics.py @@ -17,11 +17,13 @@ # under the License. from __future__ import annotations +import warnings from typing import Any from googleapiclient.discovery import Resource, build from googleapiclient.http import MediaFileUpload +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.google.common.hooks.base_google import GoogleBaseHook @@ -30,6 +32,13 @@ class GoogleAnalyticsHook(GoogleBaseHook): def __init__(self, api_version: str = "v3", *args, **kwargs): super().__init__(*args, **kwargs) + warnings.warn( + f"The `{type(self).__name__}` class is deprecated, please use " + f"`GoogleAnalyticsAdminHook` instead.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) + self.api_version = api_version self._conn = None diff --git a/airflow/providers/google/marketing_platform/hooks/analytics_admin.py b/airflow/providers/google/marketing_platform/hooks/analytics_admin.py new file mode 100644 index 0000000000000..cff9e0e409681 --- /dev/null +++ b/airflow/providers/google/marketing_platform/hooks/analytics_admin.py @@ -0,0 +1,234 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Hooks for Google Analytics (GA4) Admin service. + +.. spelling:word-list:: + + DataStream + ListAccountsPager + ListGoogleAdsLinksPager +""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Sequence + +from google.analytics.admin_v1beta import ( + AnalyticsAdminServiceClient, + DataStream, + Property, +) +from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault + +from airflow.providers.google.common.consts import CLIENT_INFO +from airflow.providers.google.common.hooks.base_google import GoogleBaseHook + +if TYPE_CHECKING: + from google.analytics.admin_v1beta.services.analytics_admin_service.pagers import ( + ListAccountsPager, + ListGoogleAdsLinksPager, + ) + from google.api_core.retry import Retry + + +class GoogleAnalyticsAdminHook(GoogleBaseHook): + """Hook for Google Analytics 4 (GA4) Admin API.""" + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._conn: AnalyticsAdminServiceClient | None = None + + def get_conn(self) -> AnalyticsAdminServiceClient: + if not self._conn: + self._conn = AnalyticsAdminServiceClient( + credentials=self.get_credentials(), client_info=CLIENT_INFO + ) + return self._conn + + def list_accounts( + self, + page_size: int | None = None, + page_token: str | None = None, + show_deleted: bool | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> ListAccountsPager: + """Get list of accounts in Google Analytics. + + .. seealso:: + For more details please check the client library documentation: + https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/accounts/list + + :param page_size: Optional, number of results to return in the list. + :param page_token: Optional. The next_page_token value returned from a previous List request, if any. + :param show_deleted: Optional. Whether to include soft-deleted (ie: "trashed") Accounts in the results. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + + :returns: List of Google Analytics accounts. + """ + request = {"page_size": page_size, "page_token": page_token, "show_deleted": show_deleted} + client = self.get_conn() + return client.list_accounts(request=request, retry=retry, timeout=timeout, metadata=metadata) + + def create_property( + self, + analytics_property: Property | dict, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Property: + """Create Google Analytics property. + + .. seealso:: + For more details please check the client library documentation: + https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties/create + + :param analytics_property: The property to create. Note: the supplied property must specify its + parent. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + + :returns: Created Google Analytics property. + """ + client = self.get_conn() + return client.create_property( + request={"property": analytics_property}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + def delete_property( + self, + property_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Property: + """Soft delete Google Analytics property. + + .. seealso:: + For more details please check the client library documentation: + https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties/delete + + :param property_id: ID of the Property to soft-delete. Format: properties/{property_id}. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + + :returns: Resource message representing Google Analytics property. + """ + client = self.get_conn() + request = {"name": f"properties/{property_id}"} + return client.delete_property(request=request, retry=retry, timeout=timeout, metadata=metadata) + + def create_data_stream( + self, + property_id: str, + data_stream: DataStream | dict, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> DataStream: + """Create Google Analytics data stream. + + .. seealso:: + For more details please check the client library documentation: + https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams/create + + :param property_id: ID of the parent property for the data stream. + :param data_stream: The data stream to create. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + + :returns: Created Google Analytics data stream. + """ + client = self.get_conn() + return client.create_data_stream( + request={"parent": f"properties/{property_id}", "data_stream": data_stream}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + def delete_data_stream( + self, + property_id: str, + data_stream_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> None: + """Delete Google Analytics data stream. + + .. seealso:: + For more details please check the client library documentation: + https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams/delete + + :param property_id: ID of the parent property for the data stream. + :param data_stream_id: The data stream id to delete. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + """ + client = self.get_conn() + return client.delete_data_stream( + request={"name": f"properties/{property_id}/dataStreams/{data_stream_id}"}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + def list_google_ads_links( + self, + property_id: str, + page_size: int | None = None, + page_token: str | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> ListGoogleAdsLinksPager: + """Get list of Google Ads links. + + .. seealso:: + For more details please check the client library documentation: + https://googleapis.dev/python/analyticsadmin/latest/admin_v1beta/analytics_admin_service.html#google.analytics.admin_v1beta.services.analytics_admin_service.AnalyticsAdminServiceAsyncClient.list_google_ads_links + + :param property_id: ID of the parent property. + :param page_size: Optional, number of results to return in the list. + :param page_token: Optional. The next_page_token value returned from a previous List request, if any. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + + :returns: List of Google Analytics accounts. + """ + client = self.get_conn() + request = {"parent": f"properties/{property_id}", "page_size": page_size, "page_token": page_token} + return client.list_google_ads_links(request=request, retry=retry, timeout=timeout, metadata=metadata) diff --git a/airflow/providers/google/marketing_platform/links/__init__.py b/airflow/providers/google/marketing_platform/links/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/google/marketing_platform/links/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/google/marketing_platform/links/analytics_admin.py b/airflow/providers/google/marketing_platform/links/analytics_admin.py new file mode 100644 index 0000000000000..3ab79c8804715 --- /dev/null +++ b/airflow/providers/google/marketing_platform/links/analytics_admin.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, ClassVar + +from airflow.models import BaseOperator, BaseOperatorLink, XCom + +if TYPE_CHECKING: + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.utils.context import Context + + +BASE_LINK = "https://analytics.google.com/analytics/web/" + + +class GoogleAnalyticsBaseLink(BaseOperatorLink): + """Base class for Google Analytics links. + + :meta private: + """ + + name: ClassVar[str] + key: ClassVar[str] + format_str: ClassVar[str] + + def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str: + if conf := XCom.get_value(key=self.key, ti_key=ti_key): + res = BASE_LINK + "#/" + self.format_str.format(**conf) + return res + return "" + + +class GoogleAnalyticsPropertyLink(GoogleAnalyticsBaseLink): + """Helper class for constructing Google Analytics Property Link.""" + + name = "Data Analytics Property" + key = "data_analytics_property" + format_str = "p{property_id}/" + + @staticmethod + def persist( + context: Context, + task_instance: BaseOperator, + property_id: str, + ): + task_instance.xcom_push( + context, + key=GoogleAnalyticsPropertyLink.key, + value={"property_id": property_id}, + ) diff --git a/airflow/providers/google/marketing_platform/operators/analytics.py b/airflow/providers/google/marketing_platform/operators/analytics.py index 0098980e9bee2..d987b62591ed1 100644 --- a/airflow/providers/google/marketing_platform/operators/analytics.py +++ b/airflow/providers/google/marketing_platform/operators/analytics.py @@ -19,9 +19,11 @@ from __future__ import annotations import csv +import warnings from tempfile import NamedTemporaryFile from typing import TYPE_CHECKING, Any, Sequence +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import BaseOperator from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.marketing_platform.hooks.analytics import GoogleAnalyticsHook @@ -34,6 +36,10 @@ class GoogleAnalyticsListAccountsOperator(BaseOperator): """ Lists all accounts to which the user has access. + .. seealso:: + This operator is deprecated, please use + :class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator`: + .. seealso:: Check official API docs: https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/accounts/list @@ -70,6 +76,13 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: + warnings.warn( + f"The `{type(self).__name__}` operator is deprecated, please use " + f"`GoogleAnalyticsAdminListAccountsOperator` instead.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) + super().__init__(**kwargs) self.api_version = api_version @@ -90,6 +103,10 @@ class GoogleAnalyticsGetAdsLinkOperator(BaseOperator): """ Returns a web property-Google Ads link to which the user has access. + .. seealso:: + This operator is deprecated, please use + :class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator`: + .. seealso:: Check official API docs: https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/webPropertyAdWordsLinks/get @@ -132,6 +149,12 @@ def __init__( **kwargs, ): super().__init__(**kwargs) + warnings.warn( + f"The `{type(self).__name__}` operator is deprecated, please use " + f"`GoogleAnalyticsAdminGetGoogleAdsLinkOperator` instead.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) self.account_id = account_id self.web_property_ad_words_link_id = web_property_ad_words_link_id @@ -158,6 +181,10 @@ class GoogleAnalyticsRetrieveAdsLinksListOperator(BaseOperator): """ Lists webProperty-Google Ads links for a given web property. + .. seealso:: + This operator is deprecated, please use + :class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator`: + .. seealso:: Check official API docs: https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/webPropertyAdWordsLinks/list#http-request @@ -197,6 +224,12 @@ def __init__( **kwargs, ) -> None: super().__init__(**kwargs) + warnings.warn( + f"The `{type(self).__name__}` operator is deprecated, please use " + f"`GoogleAnalyticsAdminListGoogleAdsLinksOperator` instead.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) self.account_id = account_id self.web_property_id = web_property_id @@ -221,6 +254,10 @@ class GoogleAnalyticsDataImportUploadOperator(BaseOperator): """ Take a file from Cloud Storage and uploads it to GA via data import API. + .. seealso:: + This operator is deprecated, please use + :class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator`: + :param storage_bucket: The Google cloud storage bucket where the file is stored. :param storage_name_object: The name of the object in the desired Google cloud storage bucket. (templated) If the destination points to an existing @@ -266,6 +303,12 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: + warnings.warn( + f"The `{type(self).__name__}` operator is deprecated, please use " + f"`GoogleAnalyticsAdminCreateDataStreamOperator` instead.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) super().__init__(**kwargs) self.storage_bucket = storage_bucket self.storage_name_object = storage_name_object @@ -317,6 +360,10 @@ class GoogleAnalyticsDeletePreviousDataUploadsOperator(BaseOperator): """ Deletes previous GA uploads to leave the latest file to control the size of the Data Set Quota. + .. seealso:: + This operator is deprecated, please use + :class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator`: + :param account_id: The GA account Id (long) to which the data upload belongs. :param web_property_id: The web property UA-string associated with the upload. :param custom_data_source_id: The id to which the data import belongs. @@ -348,6 +395,12 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: + warnings.warn( + f"The `{type(self).__name__}` operator is deprecated, please use " + f"`GoogleAnalyticsAdminDeleteDataStreamOperator` instead.", + AirflowProviderDeprecationWarning, + stacklevel=1, + ) super().__init__(**kwargs) self.account_id = account_id diff --git a/airflow/providers/google/marketing_platform/operators/analytics_admin.py b/airflow/providers/google/marketing_platform/operators/analytics_admin.py new file mode 100644 index 0000000000000..d961630f866c2 --- /dev/null +++ b/airflow/providers/google/marketing_platform/operators/analytics_admin.py @@ -0,0 +1,579 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""This module contains Google Analytics 4 (GA4) operators.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Sequence + +from google.analytics.admin_v1beta import ( + Account, + DataStream, + GoogleAdsLink, + Property, +) +from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault + +from airflow.exceptions import AirflowNotFoundException +from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.marketing_platform.hooks.analytics_admin import GoogleAnalyticsAdminHook +from airflow.providers.google.marketing_platform.links.analytics_admin import GoogleAnalyticsPropertyLink + +if TYPE_CHECKING: + from google.api_core.retry import Retry + from google.protobuf.message import Message + + from airflow.utils.context import Context + + +class GoogleAnalyticsAdminListAccountsOperator(GoogleCloudBaseOperator): + """ + Lists all accounts to which the user has access. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleAnalyticsAdminListAccountsOperator` + + :param page_size: Optional, number of results to return in the list. + :param page_token: Optional. The next_page_token value returned from a previous List request, if any. + :param show_deleted: Optional. Whether to include soft-deleted (ie: "trashed") Accounts in the results. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "gcp_conn_id", + "impersonation_chain", + "page_size", + "page_token", + ) + + def __init__( + self, + *, + page_size: int | None = None, + page_token: str | None = None, + show_deleted: bool | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.page_size = page_size + self.page_token = page_token + self.show_deleted = show_deleted + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute( + self, + context: Context, + ) -> Sequence[Message]: + hook = GoogleAnalyticsAdminHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info( + "Requesting list of Google Analytics accounts. " + f"Page size: {self.page_size}, page token: {self.page_token}" + ) + accounts = hook.list_accounts( + page_size=self.page_size, + page_token=self.page_token, + show_deleted=self.show_deleted, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + accounts_list: Sequence[Message] = [Account.to_dict(item) for item in accounts] + n = len(accounts_list) + self.log.info("Successful request. Retrieved %s item%s.", n, "s" if n > 1 else "") + return accounts_list + + +class GoogleAnalyticsAdminCreatePropertyOperator(GoogleCloudBaseOperator): + """ + Creates property. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleAnalyticsAdminCreatePropertyOperator` + + :param analytics_property: The property to create. Note: the supplied property must specify its parent. + For more details see: https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties#Property + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "gcp_conn_id", + "impersonation_chain", + "analytics_property", + ) + operator_extra_links = (GoogleAnalyticsPropertyLink(),) + + def __init__( + self, + *, + analytics_property: Property | dict[str, Any], + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.analytics_property = analytics_property + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute( + self, + context: Context, + ) -> Message: + hook = GoogleAnalyticsAdminHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Creating a Google Analytics property.") + prop = hook.create_property( + analytics_property=self.analytics_property, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + self.log.info("The Google Analytics property %s was created successfully.", prop.name) + GoogleAnalyticsPropertyLink.persist( + context=context, + task_instance=self, + property_id=prop.name.lstrip("properties/"), + ) + + return Property.to_dict(prop) + + +class GoogleAnalyticsAdminDeletePropertyOperator(GoogleCloudBaseOperator): + """ + Soft-delete property. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleAnalyticsAdminDeletePropertyOperator` + + :param property_id: The id of the Property to soft-delete. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "gcp_conn_id", + "impersonation_chain", + "property_id", + ) + + def __init__( + self, + *, + property_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.property_id = property_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute( + self, + context: Context, + ) -> Message: + hook = GoogleAnalyticsAdminHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Deleting a Google Analytics property.") + prop = hook.delete_property( + property_id=self.property_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + self.log.info("The Google Analytics property %s was soft-deleted successfully.", prop.name) + return Property.to_dict(prop) + + +class GoogleAnalyticsAdminCreateDataStreamOperator(GoogleCloudBaseOperator): + """ + Creates Data stream. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleAnalyticsAdminCreateDataStreamOperator` + + :param property_id: ID of the parent property for the data stream. + :param data_stream: The data stream to create. + For more details see: https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams#DataStream + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "gcp_conn_id", + "impersonation_chain", + "property_id", + "data_stream", + ) + + def __init__( + self, + *, + property_id: str, + data_stream: DataStream | dict, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.property_id = property_id + self.data_stream = data_stream + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute( + self, + context: Context, + ) -> Message: + hook = GoogleAnalyticsAdminHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Creating a Google Analytics data stream.") + data_stream = hook.create_data_stream( + property_id=self.property_id, + data_stream=self.data_stream, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + self.log.info("The Google Analytics data stream %s was created successfully.", data_stream.name) + return DataStream.to_dict(data_stream) + + +class GoogleAnalyticsAdminDeleteDataStreamOperator(GoogleCloudBaseOperator): + """ + Deletes Data stream. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleAnalyticsAdminDeleteDataStreamOperator` + + :param property_id: ID of the property which is parent for the data stream. + :param data_stream_id: ID of the data stream to delete. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "gcp_conn_id", + "impersonation_chain", + "property_id", + "data_stream_id", + ) + + def __init__( + self, + *, + property_id: str, + data_stream_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.property_id = property_id + self.data_stream_id = data_stream_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute( + self, + context: Context, + ) -> None: + hook = GoogleAnalyticsAdminHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Deleting a Google Analytics data stream (id %s).", self.data_stream_id) + hook.delete_data_stream( + property_id=self.property_id, + data_stream_id=self.data_stream_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + self.log.info("The Google Analytics data stream was deleted successfully.") + return None + + +class GoogleAnalyticsAdminListGoogleAdsLinksOperator(GoogleCloudBaseOperator): + """ + Lists all Google Ads links associated with a given property. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleAnalyticsAdminListGoogleAdsLinksOperator` + + :param property_id: ID of the parent property. + :param page_size: Optional, number of results to return in the list. + :param page_token: Optional. The next_page_token value returned from a previous List request, if any. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "gcp_conn_id", + "impersonation_chain", + "property_id", + "page_size", + "page_token", + ) + + def __init__( + self, + *, + property_id: str, + page_size: int | None = None, + page_token: str | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.property_id = property_id + self.page_size = page_size + self.page_token = page_token + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute( + self, + context: Context, + ) -> Sequence[Message]: + hook = GoogleAnalyticsAdminHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info( + "Requesting list of Google Ads links accounts for the property_id %s, " + "page size %s, page token %s", + self.property_id, + self.page_size, + self.page_token, + ) + google_ads_links = hook.list_google_ads_links( + property_id=self.property_id, + page_size=self.page_size, + page_token=self.page_token, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + ads_links_list: Sequence[Message] = [GoogleAdsLink.to_dict(item) for item in google_ads_links] + n = len(ads_links_list) + self.log.info("Successful request. Retrieved %s item%s.", n, "s" if n > 1 else "") + return ads_links_list + + +class GoogleAnalyticsAdminGetGoogleAdsLinkOperator(GoogleCloudBaseOperator): + """ + Gets a Google Ads link associated with a given property. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleAnalyticsAdminGetGoogleAdsLinkOperator` + + :param property_id: Parent property id. + :param google_ads_link_id: Google Ads link id. + :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "gcp_conn_id", + "impersonation_chain", + "google_ads_link_id", + ) + + def __init__( + self, + *, + property_id: str, + google_ads_link_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.property_id = property_id + self.google_ads_link_id = google_ads_link_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute( + self, + context: Context, + ) -> Message: + hook = GoogleAnalyticsAdminHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info( + "Requesting the Google Ads link with id %s for the property_id %s", + self.google_ads_link_id, + self.property_id, + ) + ads_links = hook.list_google_ads_links( + property_id=self.property_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + find_link = (item for item in ads_links if item.name.split("/")[-1] == self.google_ads_link_id) + if ads_link := next(find_link, None): + self.log.info("Successful request.") + return GoogleAdsLink.to_dict(ads_link) + raise AirflowNotFoundException( + f"Google Ads Link with id {self.google_ads_link_id} and property id {self.property_id} not found" + ) diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index d7aca84d8178a..ed9c35637f406 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -91,6 +91,7 @@ dependencies: - gcloud-aio-storage>=9.0.0 - gcsfs>=2023.10.0 - google-ads>=22.1.0 + - google-analytics-admin - google-api-core>=2.11.0 - google-api-python-client>=1.6.0 - google-auth>=1.0.0 @@ -145,6 +146,12 @@ dependencies: - sqlalchemy-spanner>=1.6.2 integrations: + - integration-name: Google Analytics (GA4) + external-doc-url: https://analytics.google.com/ + logo: /integration-logos/gcp/Google-Analytics.png + how-to-guide: + - /docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst + tags: [gmp] - integration-name: Google Analytics360 external-doc-url: https://analytics.google.com/ logo: /integration-logos/gcp/Google-Analytics.png @@ -605,6 +612,9 @@ operators: - integration-name: Google Cloud Firestore python-modules: - airflow.providers.google.firebase.operators.firestore + - integration-name: Google Analytics (GA4) + python-modules: + - airflow.providers.google.marketing_platform.operators.analytics_admin - integration-name: Google Analytics360 python-modules: - airflow.providers.google.marketing_platform.operators.analytics @@ -849,6 +859,9 @@ hooks: - integration-name: Google Cloud Firestore python-modules: - airflow.providers.google.firebase.hooks.firestore + - integration-name: Google Analytics (GA4) + python-modules: + - airflow.providers.google.marketing_platform.hooks.analytics_admin - integration-name: Google Analytics360 python-modules: - airflow.providers.google.marketing_platform.hooks.analytics @@ -1203,6 +1216,7 @@ extra-links: - airflow.providers.google.cloud.links.mlengine.MLEngineModelVersionDetailsLink - airflow.providers.google.common.links.storage.StorageLink - airflow.providers.google.common.links.storage.FileDetailsLink + - airflow.providers.google.marketing_platform.links.analytics_admin.GoogleAnalyticsPropertyLink additional-extras: - name: apache.beam diff --git a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst index 9b99c182956b0..765b6a6063763 100644 --- a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst +++ b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst @@ -22,6 +22,11 @@ Google Analytics 360 operators allow you to lists all accounts to which the user For more information about the Google Analytics 360 API check `official documentation `__. +Please note that the Google Analytics 360 API is replaced by +`Google Analytics 4 `__ and +`will be turned down on July 1, 2024 `__. +Thus consider using new :doc:`Google Analytics (GA4) Admin Operators `. + Prerequisite Tasks ^^^^^^^^^^^^^^^^^^ diff --git a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst new file mode 100644 index 0000000000000..b4aab19897659 --- /dev/null +++ b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst @@ -0,0 +1,151 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Google Analytics (GA4) Admin Operators +====================================== + +Google Analytics (GA4) Admin operators allow you to lists all accounts to which the user has access. +For more information about the Google Analytics 360 API check +`official documentation `__. + +Prerequisite Tasks +^^^^^^^^^^^^^^^^^^ + +.. include:: /operators/_partials/prerequisite_tasks.rst + +.. _howto/operator:GoogleAnalyticsAdminListAccountsOperator: + +List the Accounts +^^^^^^^^^^^^^^^^^ + +To list accounts from Analytics you can use the +:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator`. + +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics_admin.py + :language: python + :dedent: 4 + :start-after: [START howto_marketing_platform_list_accounts_operator] + :end-before: [END howto_marketing_platform_list_accounts_operator] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator` + +.. _howto/operator:GoogleAnalyticsAdminCreatePropertyOperator: + +Create Property +^^^^^^^^^^^^^^^ + +Creates a property. +To create a property you can use the +:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreatePropertyOperator`. + +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics_admin.py + :language: python + :dedent: 4 + :start-after: [START howto_marketing_platform_create_property_operator] + :end-before: [END howto_marketing_platform_create_property_operator] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreatePropertyOperator` + +.. _howto/operator:GoogleAnalyticsAdminDeletePropertyOperator: + +Delete Property +^^^^^^^^^^^^^^^ + +Deletes a property. +To delete a property you can use the +:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeletePropertyOperator`. + +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics_admin.py + :language: python + :dedent: 4 + :start-after: [START howto_marketing_platform_delete_property_operator] + :end-before: [END howto_marketing_platform_delete_property_operator] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeletePropertyOperator` + +.. _howto/operator:GoogleAnalyticsAdminCreateDataStreamOperator: + +Create Data stream +^^^^^^^^^^^^^^^^^^ + +Creates a data stream. +To create a data stream you can use the +:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator`. + +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics_admin.py + :language: python + :dedent: 4 + :start-after: [START howto_marketing_platform_create_data_stream_operator] + :end-before: [END howto_marketing_platform_create_data_stream_operator] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator` + +.. _howto/operator:GoogleAnalyticsAdminDeleteDataStreamOperator: + +Delete Data stream +^^^^^^^^^^^^^^^^^^ + +Deletes a data stream. +To delete a data stream you can use the +:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator`. + +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics_admin.py + :language: python + :dedent: 4 + :start-after: [START howto_marketing_platform_delete_data_stream_operator] + :end-before: [END howto_marketing_platform_delete_data_stream_operator] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator` + +.. _howto/operator:GoogleAnalyticsAdminListGoogleAdsLinksOperator: + +List Google Ads Links +^^^^^^^^^^^^^^^^^^^^^ + +To list Google Ads links you can use the +:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator`. + +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics_admin.py + :language: python + :dedent: 4 + :start-after: [START howto_marketing_platform_list_google_ads_links] + :end-before: [END howto_marketing_platform_list_google_ads_links] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator` + +.. _howto/operator:GoogleAnalyticsAdminGetGoogleAdsLinkOperator: + +Get the Google Ads link +^^^^^^^^^^^^^^^^^^^^^^^ + +To list Google Ads links you can use the +:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator`. + +.. exampleinclude:: /../../tests/system/providers/google/marketing_platform/example_analytics_admin.py + :language: python + :dedent: 4 + :start-after: [START howto_marketing_platform_get_google_ad_link] + :end-before: [END howto_marketing_platform_get_google_ad_link] + +You can use :ref:`Jinja templating ` with +:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator` diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index c0131f8ca8d7c..fe07b77f44c9f 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -460,6 +460,7 @@ "gcloud-aio-storage>=9.0.0", "gcsfs>=2023.10.0", "google-ads>=22.1.0", + "google-analytics-admin", "google-api-core>=2.11.0", "google-api-python-client>=1.6.0", "google-auth-httplib2>=0.0.1", diff --git a/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py b/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py new file mode 100644 index 0000000000000..81a5210d499ab --- /dev/null +++ b/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py @@ -0,0 +1,192 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +from airflow.providers.google.marketing_platform.hooks.analytics_admin import GoogleAnalyticsAdminHook +from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id + +GCP_CONN_ID = "test_gcp_conn_id" +IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"] +TEST_PROPERTY_ID = "123456789" +TEST_PROPERTY_NAME = f"properties/{TEST_PROPERTY_ID}" +TEST_DATASTREAM_ID = "987654321" +TEST_DATASTREAM_NAME = f"properties/{TEST_PROPERTY_ID}/dataStreams/{TEST_DATASTREAM_ID}" +ANALYTICS_HOOK_PATH = "airflow.providers.google.marketing_platform.hooks.analytics_admin" + + +class TestGoogleAnalyticsAdminHook: + def setup_method(self): + with mock.patch( + "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__", + new=mock_base_gcp_hook_default_project_id, + ): + self.hook = GoogleAnalyticsAdminHook(GCP_CONN_ID) + + @mock.patch("airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__") + def test_init(self, mock_base_init): + GoogleAnalyticsAdminHook( + GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + mock_base_init.assert_called_once_with( + GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + @mock.patch(f"{ANALYTICS_HOOK_PATH}.CLIENT_INFO") + @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_credentials") + @mock.patch(f"{ANALYTICS_HOOK_PATH}.AnalyticsAdminServiceClient") + def test_get_conn(self, mock_client, get_credentials, mock_client_info): + mock_credentials = mock.MagicMock() + get_credentials.return_value = mock_credentials + + result = self.hook.get_conn() + + mock_client.assert_called_once_with(credentials=mock_credentials, client_info=mock_client_info) + assert self.hook._conn == result + + @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn") + def test_list_accounts(self, mock_get_conn): + list_accounts_expected = mock.MagicMock() + mock_list_accounts = mock_get_conn.return_value.list_accounts + mock_list_accounts.return_value = list_accounts_expected + mock_page_size, mock_page_token, mock_show_deleted, mock_retry, mock_timeout, mock_metadata = ( + mock.MagicMock() for _ in range(6) + ) + + request = { + "page_size": mock_page_size, + "page_token": mock_page_token, + "show_deleted": mock_show_deleted, + } + + list_accounts_received = self.hook.list_accounts( + page_size=mock_page_size, + page_token=mock_page_token, + show_deleted=mock_show_deleted, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + mock_list_accounts.assert_called_once_with( + request=request, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata + ) + assert list_accounts_received == list_accounts_expected + + @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn") + def test_create_property(self, mock_get_conn): + property_expected = mock.MagicMock() + + mock_create_property = mock_get_conn.return_value.create_property + mock_create_property.return_value = property_expected + mock_property, mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(4)) + + property_created = self.hook.create_property( + analytics_property=mock_property, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata + ) + + request = {"property": mock_property} + mock_create_property.assert_called_once_with( + request=request, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata + ) + assert property_created == property_expected + + @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn") + def test_delete_property(self, mock_get_conn): + property_expected = mock.MagicMock() + mock_delete_property = mock_get_conn.return_value.delete_property + mock_delete_property.return_value = property_expected + mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3)) + + property_deleted = self.hook.delete_property( + property_id=TEST_PROPERTY_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + request = {"name": TEST_PROPERTY_NAME} + mock_delete_property.assert_called_once_with( + request=request, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + assert property_deleted == property_expected + + @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn") + def test_create_data_stream(self, mock_get_conn): + data_stream_expected = mock.MagicMock() + mock_create_data_stream = mock_get_conn.return_value.create_data_stream + mock_create_data_stream.return_value = data_stream_expected + mock_data_stream, mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(4)) + + data_stream_created = self.hook.create_data_stream( + property_id=TEST_PROPERTY_ID, + data_stream=mock_data_stream, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + + request = {"parent": TEST_PROPERTY_NAME, "data_stream": mock_data_stream} + mock_create_data_stream.assert_called_once_with( + request=request, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata + ) + assert data_stream_created == data_stream_expected + + @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn") + def test_delete_data_stream(self, mock_get_conn): + mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3)) + + self.hook.delete_data_stream( + property_id=TEST_PROPERTY_ID, + data_stream_id=TEST_DATASTREAM_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + + request = {"name": TEST_DATASTREAM_NAME} + mock_get_conn.return_value.delete_data_stream.assert_called_once_with( + request=request, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + + @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn") + def test_list_ads_links(self, mock_get_conn): + mock_page_size, mock_page_token, mock_retry, mock_timeout, mock_metadata = ( + mock.MagicMock() for _ in range(5) + ) + + self.hook.list_google_ads_links( + property_id=TEST_PROPERTY_ID, + page_size=mock_page_size, + page_token=mock_page_token, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + + request = {"parent": TEST_PROPERTY_NAME, "page_size": mock_page_size, "page_token": mock_page_token} + mock_get_conn.return_value.list_google_ads_links.assert_called_once_with( + request=request, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata + ) diff --git a/tests/providers/google/marketing_platform/links/__init__.py b/tests/providers/google/marketing_platform/links/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/tests/providers/google/marketing_platform/links/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/google/marketing_platform/links/test_analytics_admin.py b/tests/providers/google/marketing_platform/links/test_analytics_admin.py new file mode 100644 index 0000000000000..bb015c9be2485 --- /dev/null +++ b/tests/providers/google/marketing_platform/links/test_analytics_admin.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +from airflow.providers.google.marketing_platform.links.analytics_admin import ( + BASE_LINK, + GoogleAnalyticsPropertyLink, +) + +TEST_PROPERTY_ID = "123456789" +TEST_PROJECT_ID = "test_project" +TEST_CONF_GOOGLE_ADS_LINK = {"property_id": TEST_PROJECT_ID} +ANALYTICS_LINKS_PATH = "airflow.providers.google.marketing_platform.links.analytics_admin" + + +class TestGoogleAnalyticsPropertyLink: + @mock.patch(f"{ANALYTICS_LINKS_PATH}.XCom") + def test_get_link(self, mock_xcom): + mock_ti_key = mock.MagicMock() + mock_xcom.get_value.return_value = TEST_CONF_GOOGLE_ADS_LINK + url_expected = f"{BASE_LINK}#/p{TEST_PROJECT_ID}/" + + link = GoogleAnalyticsPropertyLink() + url = link.get_link(operator=mock.MagicMock(), ti_key=mock_ti_key) + + mock_xcom.get_value.assert_called_once_with(key=link.key, ti_key=mock_ti_key) + assert url == url_expected + + @mock.patch(f"{ANALYTICS_LINKS_PATH}.XCom") + def test_get_link_not_found(self, mock_xcom): + mock_ti_key = mock.MagicMock() + mock_xcom.get_value.return_value = None + + link = GoogleAnalyticsPropertyLink() + url = link.get_link(operator=mock.MagicMock(), ti_key=mock_ti_key) + + mock_xcom.get_value.assert_called_once_with(key=link.key, ti_key=mock_ti_key) + assert url == "" + + def test_persist(self): + mock_context = mock.MagicMock() + mock_task_instance = mock.MagicMock() + + GoogleAnalyticsPropertyLink.persist( + context=mock_context, + task_instance=mock_task_instance, + property_id=TEST_PROPERTY_ID, + ) + + mock_task_instance.xcom_push.assert_called_once_with( + mock_context, + key=GoogleAnalyticsPropertyLink.key, + value={"property_id": TEST_PROPERTY_ID}, + ) diff --git a/tests/providers/google/marketing_platform/operators/test_analytics_admin.py b/tests/providers/google/marketing_platform/operators/test_analytics_admin.py new file mode 100644 index 0000000000000..f292125140224 --- /dev/null +++ b/tests/providers/google/marketing_platform/operators/test_analytics_admin.py @@ -0,0 +1,310 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.exceptions import AirflowNotFoundException +from airflow.providers.google.marketing_platform.operators.analytics_admin import ( + GoogleAnalyticsAdminCreateDataStreamOperator, + GoogleAnalyticsAdminCreatePropertyOperator, + GoogleAnalyticsAdminDeleteDataStreamOperator, + GoogleAnalyticsAdminDeletePropertyOperator, + GoogleAnalyticsAdminGetGoogleAdsLinkOperator, + GoogleAnalyticsAdminListAccountsOperator, + GoogleAnalyticsAdminListGoogleAdsLinksOperator, +) + +GCP_CONN_ID = "google_cloud_default" +IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"] +TEST_GA_GOOGLE_ADS_PROPERTY_ID = "123456789" +TEST_GA_GOOGLE_ADS_LINK_ID = "987654321" +TEST_GA_GOOGLE_ADS_LINK_NAME = ( + f"properties/{TEST_GA_GOOGLE_ADS_PROPERTY_ID}/googleAdsLinks/{TEST_GA_GOOGLE_ADS_LINK_ID}" +) +TEST_PROPERTY_ID = "123456789" +TEST_PROPERTY_NAME = f"properties/{TEST_PROPERTY_ID}" +TEST_DATASTREAM_ID = "987654321" +TEST_DATASTREAM_NAME = f"properties/{TEST_PROPERTY_ID}/dataStreams/{TEST_DATASTREAM_ID}" +ANALYTICS_PATH = "airflow.providers.google.marketing_platform.operators.analytics_admin" + + +class TestGoogleAnalyticsAdminListAccountsOperator: + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook") + @mock.patch(f"{ANALYTICS_PATH}.Account.to_dict") + def test_execute(self, account_to_dict_mock, hook_mock): + list_accounts_returned = (mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) + hook_mock.return_value.list_accounts.return_value = list_accounts_returned + + list_accounts_serialized = [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()] + account_to_dict_mock.side_effect = list_accounts_serialized + + mock_page_size, mock_page_token, mock_show_deleted, mock_retry, mock_timeout, mock_metadata = ( + mock.MagicMock() for _ in range(6) + ) + + retrieved_accounts_list = GoogleAnalyticsAdminListAccountsOperator( + task_id="test_task", + page_size=mock_page_size, + page_token=mock_page_token, + show_deleted=mock_show_deleted, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ).execute(context=None) + + hook_mock.assert_called_once() + hook_mock.return_value.list_accounts.assert_called_once_with( + page_size=mock_page_size, + page_token=mock_page_token, + show_deleted=mock_show_deleted, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + account_to_dict_mock.assert_has_calls([mock.call(item) for item in list_accounts_returned]) + assert retrieved_accounts_list == list_accounts_serialized + + +class TestGoogleAnalyticsAdminCreatePropertyOperator: + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsPropertyLink") + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook") + @mock.patch(f"{ANALYTICS_PATH}.Property.to_dict") + def test_execute(self, property_to_dict_mock, hook_mock, _): + property_returned = mock.MagicMock() + hook_mock.return_value.create_property.return_value = property_returned + + property_serialized = mock.MagicMock() + property_to_dict_mock.return_value = property_serialized + + mock_property, mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(4)) + + property_created = GoogleAnalyticsAdminCreatePropertyOperator( + task_id="test_task", + analytics_property=mock_property, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ).execute(context=None) + + hook_mock.assert_called_once() + hook_mock.return_value.create_property.assert_called_once_with( + analytics_property=mock_property, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + property_to_dict_mock.assert_called_once_with(property_returned) + assert property_created == property_serialized + + +class TestGoogleAnalyticsAdminDeletePropertyOperator: + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook") + @mock.patch(f"{ANALYTICS_PATH}.Property.to_dict") + def test_execute(self, property_to_dict_mock, hook_mock): + property_returned = mock.MagicMock() + hook_mock.return_value.delete_property.return_value = property_returned + + property_serialized = mock.MagicMock() + property_to_dict_mock.return_value = property_serialized + + mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3)) + + property_deleted = GoogleAnalyticsAdminDeletePropertyOperator( + task_id="test_task", + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + property_id=TEST_PROPERTY_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ).execute(context=None) + + hook_mock.assert_called_once() + hook_mock.return_value.delete_property.assert_called_once_with( + property_id=TEST_PROPERTY_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + property_to_dict_mock.assert_called_once_with(property_returned) + assert property_deleted == property_serialized + + +class TestGoogleAnalyticsAdminCreateDataStreamOperator: + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook") + @mock.patch(f"{ANALYTICS_PATH}.DataStream.to_dict") + def test_execute(self, data_stream_to_dict_mock, hook_mock): + data_stream_returned = mock.MagicMock() + hook_mock.return_value.create_data_stream.return_value = data_stream_returned + + data_stream_serialized = mock.MagicMock() + data_stream_to_dict_mock.return_value = data_stream_serialized + + mock_parent, mock_data_stream, mock_retry, mock_timeout, mock_metadata = ( + mock.MagicMock() for _ in range(5) + ) + + data_stream_created = GoogleAnalyticsAdminCreateDataStreamOperator( + task_id="test_task", + property_id=TEST_PROPERTY_ID, + data_stream=mock_data_stream, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ).execute(context=None) + + hook_mock.assert_called_once() + hook_mock.return_value.create_data_stream.assert_called_once_with( + property_id=TEST_PROPERTY_ID, + data_stream=mock_data_stream, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + data_stream_to_dict_mock.assert_called_once_with(data_stream_returned) + assert data_stream_created == data_stream_serialized + + +class TestGoogleAnalyticsAdminDeleteDataStreamOperator: + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook") + def test_execute(self, hook_mock): + mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3)) + + GoogleAnalyticsAdminDeleteDataStreamOperator( + task_id="test_task", + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + property_id=TEST_PROPERTY_ID, + data_stream_id=TEST_DATASTREAM_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ).execute(context=None) + + hook_mock.assert_called_once() + hook_mock.return_value.delete_data_stream.assert_called_once_with( + property_id=TEST_PROPERTY_ID, + data_stream_id=TEST_DATASTREAM_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + + +class TestGoogleAnalyticsAdminListGoogleAdsLinksOperator: + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook") + @mock.patch(f"{ANALYTICS_PATH}.GoogleAdsLink.to_dict") + def test_execute(self, ads_link_to_dict_mock, hook_mock): + list_ads_links_returned = (mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) + hook_mock.return_value.list_google_ads_links.return_value = list_ads_links_returned + + list_ads_links_serialized = [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()] + ads_link_to_dict_mock.side_effect = list_ads_links_serialized + + mock_page_size, mock_page_token, mock_show_deleted, mock_retry, mock_timeout, mock_metadata = ( + mock.MagicMock() for _ in range(6) + ) + + retrieved_ads_links = GoogleAnalyticsAdminListGoogleAdsLinksOperator( + task_id="test_task", + property_id=TEST_PROPERTY_ID, + page_size=mock_page_size, + page_token=mock_page_token, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ).execute(context=None) + + hook_mock.assert_called_once() + hook_mock.return_value.list_google_ads_links.assert_called_once_with( + property_id=TEST_PROPERTY_ID, + page_size=mock_page_size, + page_token=mock_page_token, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + ads_link_to_dict_mock.assert_has_calls([mock.call(item) for item in list_ads_links_returned]) + assert retrieved_ads_links == list_ads_links_serialized + + +class TestGoogleAnalyticsAdminGetGoogleAdsLinkOperator: + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook") + @mock.patch(f"{ANALYTICS_PATH}.GoogleAdsLink") + def test_execute(self, mock_google_ads_link, hook_mock): + mock_ad_link = mock.MagicMock() + mock_ad_link.name = TEST_GA_GOOGLE_ADS_LINK_NAME + list_ads_links = hook_mock.return_value.list_google_ads_links + list_ads_links.return_value = [mock_ad_link] + mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3)) + + GoogleAnalyticsAdminGetGoogleAdsLinkOperator( + task_id="test_task", + property_id=TEST_GA_GOOGLE_ADS_PROPERTY_ID, + google_ads_link_id=TEST_GA_GOOGLE_ADS_LINK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ).execute(context=None) + + hook_mock.assert_called_once() + hook_mock.return_value.list_google_ads_links.assert_called_once_with( + property_id=TEST_PROPERTY_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) + mock_google_ads_link.to_dict.assert_called_once_with(mock_ad_link) + + @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook") + def test_execute_not_found(self, hook_mock): + list_ads_links = hook_mock.return_value.list_google_ads_links + list_ads_links.return_value = [] + mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3)) + + with pytest.raises(AirflowNotFoundException): + GoogleAnalyticsAdminGetGoogleAdsLinkOperator( + task_id="test_task", + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + property_id=TEST_GA_GOOGLE_ADS_PROPERTY_ID, + google_ads_link_id=TEST_GA_GOOGLE_ADS_LINK_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ).execute(context=None) + + hook_mock.assert_called_once() + hook_mock.return_value.list_google_ads_links.assert_called_once_with( + property_id=TEST_PROPERTY_ID, + retry=mock_retry, + timeout=mock_timeout, + metadata=mock_metadata, + ) diff --git a/tests/system/providers/google/marketing_platform/example_analytics_admin.py b/tests/system/providers/google/marketing_platform/example_analytics_admin.py new file mode 100644 index 0000000000000..58c5b65795397 --- /dev/null +++ b/tests/system/providers/google/marketing_platform/example_analytics_admin.py @@ -0,0 +1,203 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG that shows how to use Google Analytics (GA4) Admin Operators. + +This DAG relies on the following OS environment variables + +* GA_ACCOUNT_ID - Google Analytics account id. +* GA_GOOGLE_ADS_PROPERTY_ID - Google Analytics property's id associated with Google Ads Link. + +In order to run this test, make sure you followed steps: +1. Login to https://analytics.google.com +2. In the settings section create an account and save its ID in the variable GA_ACCOUNT_ID. +3. In the settings section go to the Property access management page and add your service account email with +Editor permissions. This service account should be created on behalf of the account from the step 1. +4. Make sure Google Analytics Admin API is enabled in your GCP project. +5. Create Google Ads account and link it to your Google Analytics account in the GA admin panel. +6. Associate the Google Ads account with a property, and save this property's id in the variable +GA_GOOGLE_ADS_PROPERTY_ID. +""" +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime + +from google.analytics import admin_v1beta as google_analytics + +from airflow.decorators import task +from airflow.models import Connection +from airflow.models.dag import DAG +from airflow.operators.bash import BashOperator +from airflow.providers.google.marketing_platform.operators.analytics_admin import ( + GoogleAnalyticsAdminCreateDataStreamOperator, + GoogleAnalyticsAdminCreatePropertyOperator, + GoogleAnalyticsAdminDeleteDataStreamOperator, + GoogleAnalyticsAdminDeletePropertyOperator, + GoogleAnalyticsAdminGetGoogleAdsLinkOperator, + GoogleAnalyticsAdminListAccountsOperator, + GoogleAnalyticsAdminListGoogleAdsLinksOperator, +) +from airflow.settings import Session +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_google_analytics_admin" + +CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}" +ACCOUNT_ID = os.environ.get("GA_ACCOUNT_ID", "123456789") +PROPERTY_ID = "{{ task_instance.xcom_pull('create_property')['name'].split('/')[-1] }}" +DATA_STREAM_ID = "{{ task_instance.xcom_pull('create_data_stream')['name'].split('/')[-1] }}" +GA_GOOGLE_ADS_PROPERTY_ID = os.environ.get("GA_GOOGLE_ADS_PROPERTY_ID", "123456789") +GA_ADS_LINK_ID = "{{ task_instance.xcom_pull('list_google_ads_links')[0]['name'].split('/')[-1] }}" + +log = logging.getLogger(__name__) + +with DAG( + DAG_ID, + schedule="@once", # Override to match your needs, + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "analytics"], +) as dag: + + @task + def setup_connection(**kwargs) -> None: + connection = Connection( + conn_id=CONNECTION_ID, + conn_type="google_cloud_platform", + ) + conn_extra_json = json.dumps( + { + "scope": "https://www.googleapis.com/auth/analytics.edit," + "https://www.googleapis.com/auth/analytics.readonly", + } + ) + connection.set_extra(conn_extra_json) + + session = Session() + if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first(): + log.warning("Connection %s already exists", CONNECTION_ID) + return None + + session.add(connection) + session.commit() + + setup_connection_task = setup_connection() + + # [START howto_marketing_platform_list_accounts_operator] + list_accounts = GoogleAnalyticsAdminListAccountsOperator( + task_id="list_account", + gcp_conn_id=CONNECTION_ID, + show_deleted=True, + ) + # [END howto_marketing_platform_list_accounts_operator] + + # [START howto_marketing_platform_create_property_operator] + create_property = GoogleAnalyticsAdminCreatePropertyOperator( + task_id="create_property", + analytics_property={ + "parent": f"accounts/{ACCOUNT_ID}", + "display_name": "Test display name", + "time_zone": "America/Los_Angeles", + }, + gcp_conn_id=CONNECTION_ID, + ) + # [END howto_marketing_platform_create_property_operator] + + # [START howto_marketing_platform_create_data_stream_operator] + create_data_stream = GoogleAnalyticsAdminCreateDataStreamOperator( + task_id="create_data_stream", + property_id=PROPERTY_ID, + data_stream={ + "display_name": "Test data stream", + "web_stream_data": { + "default_uri": "www.example.com", + }, + "type_": google_analytics.DataStream.DataStreamType.WEB_DATA_STREAM, + }, + gcp_conn_id=CONNECTION_ID, + ) + # [END howto_marketing_platform_create_data_stream_operator] + + # [START howto_marketing_platform_delete_data_stream_operator] + delete_data_stream = GoogleAnalyticsAdminDeleteDataStreamOperator( + task_id="delete_datastream", + property_id=PROPERTY_ID, + data_stream_id=DATA_STREAM_ID, + gcp_conn_id=CONNECTION_ID, + ) + # [END howto_marketing_platform_delete_data_stream_operator] + + # [START howto_marketing_platform_delete_property_operator] + delete_property = GoogleAnalyticsAdminDeletePropertyOperator( + task_id="delete_property", + property_id=PROPERTY_ID, + gcp_conn_id=CONNECTION_ID, + ) + # [END howto_marketing_platform_delete_property_operator] + delete_property.trigger_rule = TriggerRule.ALL_DONE + + # [START howto_marketing_platform_list_google_ads_links] + list_google_ads_links = GoogleAnalyticsAdminListGoogleAdsLinksOperator( + task_id="list_google_ads_links", + property_id=GA_GOOGLE_ADS_PROPERTY_ID, + gcp_conn_id=CONNECTION_ID, + ) + # [END howto_marketing_platform_list_google_ads_links] + + # [START howto_marketing_platform_get_google_ad_link] + get_ad_link = GoogleAnalyticsAdminGetGoogleAdsLinkOperator( + task_id="get_ad_link", + property_id=GA_GOOGLE_ADS_PROPERTY_ID, + google_ads_link_id=GA_ADS_LINK_ID, + gcp_conn_id=CONNECTION_ID, + ) + # [END howto_marketing_platform_get_google_ad_link] + + delete_connection = BashOperator( + task_id="delete_connection", + bash_command=f"airflow connections delete {CONNECTION_ID}", + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + # TEST SETUP + setup_connection_task + # TEST BODY + >> list_accounts + >> create_property + >> create_data_stream + >> delete_data_stream + >> delete_property + >> list_google_ads_links + >> get_ad_link + # TEST TEARDOWN + >> delete_connection + ) + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)