Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ function check_boto_upgrade() {
set -x
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3 botocore \
"oss2>=2.14.0" "gcloud-aio-auth>=4.0.0,<5.0.0" "requests!=2.32.*,<3.0.0,>=2.24.0"
"oss2>=2.14.0" "cryptography<43.0.0" "requests!=2.32.*,<3.0.0,>=2.24.0"
set +x
pip check
}
Expand Down
37 changes: 31 additions & 6 deletions airflow/providers/google/common/hooks/base_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from __future__ import annotations

import asyncio
import datetime
import functools
import json
Expand All @@ -36,7 +37,7 @@
import tenacity
from asgiref.sync import sync_to_async
from deprecated import deprecated
from gcloud.aio.auth.token import Token
from gcloud.aio.auth.token import Token, TokenResponse
from google.api_core.exceptions import Forbidden, ResourceExhausted, TooManyRequests
from google.auth import _cloud_sdk, compute_engine # type: ignore[attr-defined]
from google.auth.environment_vars import CLOUD_SDK_CONFIG_DIR, CREDENTIALS
Expand Down Expand Up @@ -745,17 +746,41 @@ async def from_hook(
async def get_project(self) -> str | None:
return self.project

async def acquire_access_token(self, timeout: int = 10) -> None:
async def refresh(self, *, timeout: int) -> TokenResponse:
await sync_to_async(self.credentials.refresh)(google.auth.transport.requests.Request())

self.access_token = cast(str, self.credentials.token)
self.access_token_duration = 3600
# access_token_acquired_at is specific to gcloud-aio's Token. On subsequent calls of `get` it will be used
# with `datetime.datetime.utcnow()`. Therefore we have to use an offset-naive datetime.
# https://github.com/talkiq/gcloud-aio/blob/f1132b005ba35d8059229a9ca88b90f31f77456d/auth/gcloud/aio/auth/token.py#L204
self.access_token_acquired_at = datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None)
self.access_token_acquired_at = self._now()
return TokenResponse(value=self.access_token, expires_in=self.access_token_duration)

async def acquire_access_token(self, timeout: int = 10) -> None:
await self.refresh(timeout=timeout)
self.acquiring = None

async def ensure_token(self) -> None:
if self.acquiring and not self.acquiring.done():
await self.acquiring
return

if self.access_token:
delta = (self._now() - self.access_token_acquired_at).total_seconds()
if delta <= self.access_token_duration / 2:
return

self.acquiring = asyncio.ensure_future( # pylint: disable=used-before-assignment
self.acquire_access_token()
)
await self.acquiring

@staticmethod
def _now():
# access_token_acquired_at is specific to gcloud-aio's Token.
# On subsequent calls of `get` it will be used with `datetime.datetime.utcnow()`.
# Therefore we have to use an offset-naive datetime.
# https://github.com/talkiq/gcloud-aio/blob/f1132b005ba35d8059229a9ca88b90f31f77456d/auth/gcloud/aio/auth/token.py#L204
return datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None)


class GoogleBaseAsyncHook(BaseHook):
"""GoogleBaseAsyncHook inherits from BaseHook class, run on the trigger worker."""
Expand Down
8 changes: 1 addition & 7 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,7 @@ dependencies:
- apache-airflow-providers-common-sql>=1.7.2
- asgiref>=3.5.2
- dill>=0.2.3
# When upgrading the major version of gcloud-aio-auth we want to make sure to
# 1. use at least version 5.2, which uses offset-aware datetime internally
# 2. override Token's new `refresh` method instead of `acquire_access_token`, which allows us to avoid
# dealing with internals like `access_token_acquired_at`
# 3. continue to `subclass gcloud.aio.auth.token.Token` instead of `BaseToken`, since instances of
# `_CredentialsToken` are instances of `Token` and used as such
- gcloud-aio-auth>=4.0.0,<5.0.0
- gcloud-aio-auth>=5.2.0
- gcloud-aio-bigquery>=6.1.2
- gcloud-aio-storage>=9.0.0
- gcsfs>=2023.10.0
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-google/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ PIP package Version required
``apache-airflow-providers-common-sql`` ``>=1.7.2``
``asgiref`` ``>=3.5.2``
``dill`` ``>=0.2.3``
``gcloud-aio-auth`` ``>=4.0.0,<5.0.0``
``gcloud-aio-auth`` ``>=5.2.0``
``gcloud-aio-bigquery`` ``>=6.1.2``
``gcloud-aio-storage`` ``>=9.0.0``
``gcsfs`` ``>=2023.10.0``
Expand Down
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@
"apache-airflow>=2.7.0",
"asgiref>=3.5.2",
"dill>=0.2.3",
"gcloud-aio-auth>=4.0.0,<5.0.0",
"gcloud-aio-auth>=5.2.0",
"gcloud-aio-bigquery>=6.1.2",
"gcloud-aio-storage>=9.0.0",
"gcsfs>=2023.10.0",
Expand Down
2 changes: 1 addition & 1 deletion scripts/docker/entrypoint_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ function check_boto_upgrade() {
set -x
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3 botocore \
"oss2>=2.14.0" "gcloud-aio-auth>=4.0.0,<5.0.0" "requests!=2.32.*,<3.0.0,>=2.24.0"
"oss2>=2.14.0" "cryptography<43.0.0" "requests!=2.32.*,<3.0.0,>=2.24.0"
set +x
pip check
}
Expand Down