add impersonation_chain support when calling Bigquery Operators in deferrable mode #36341
|
@Lee-W Thanks for taking a look at this but I don't think this will fully solve the problem. If you follow the trigger call through, it leads to |
|
I dug into it a bit more. It seems the library we're using does not support impersonated credentials: talkiq/gcloud-aio#421. We might need to dig a bit to see how we can add it. |
|
So we might first need to look into impersonation support in gcloud-aio? |
|
It's not the most active library to rely upon for a key part of GCP authentication and deferrable task capabilities. |
Thanks for reminding me! Yep, I did some testing afterward and found that it did not work.
Yep, I think we could take a look at how
I tried to investigate but did not find a more active one 😞 aiogoogle does not seem to fit our needs either. |
|
After digging a bit deeper, I doubt we can use a similar approach for this impersonation feature: https://github.com/googleapis/google-auth-library-python/blob/776d634ac6d989b224f8dbfb11d166cb3025a342/google/auth/_default_async.py#L29 |
Force-pushed from 1b64ce8 to 7d2dfc7
impersonation_chain can have multiple service accounts in the chain. You'll have to handle that by doing something similar to _get_target_principal_and_delegates:
airflow/airflow/providers/google/common/hooks/base_google.py
Lines 273 to 285 in e2393ee
    target_principal, delegates = _get_target_principal_and_delegates(self.impersonation_chain)
    credentials, project_id = get_credentials_and_project_id(
        key_path=key_path,
        keyfile_dict=keyfile_dict_json,
        credential_config_file=credential_config_file,
        key_secret_name=key_secret_name,
        key_secret_project_id=key_secret_project_id,
        scopes=self.scopes,
        delegate_to=self.delegate_to,
        target_principal=target_principal,
        delegates=delegates,
    )
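The chain handling mentioned in this review comment can be sketched roughly as below. This is only an illustration, not the Airflow helper itself: `split_impersonation_chain` is a hypothetical name, and the sketch assumes the convention that the last account in the chain is the one finally impersonated while the earlier accounts act as delegates.

```python
from __future__ import annotations

from collections.abc import Sequence


def split_impersonation_chain(
    impersonation_chain: str | Sequence[str] | None,
) -> tuple[str | None, list[str] | None]:
    """Split a chain into (target_principal, delegates).

    Hypothetical helper for illustration only.
    """
    if not impersonation_chain:
        # No impersonation requested.
        return None, None
    if isinstance(impersonation_chain, str):
        # A single account: impersonate it directly, no delegates.
        return impersonation_chain, None
    chain = list(impersonation_chain)
    # Assumed convention: last entry is the target, the rest are delegates.
    return chain[-1], chain[:-1]
```

With a chain such as `["a@p.iam", "b@p.iam", "c@p.iam"]`, the sketch treats `c@p.iam` as the target and the first two entries as delegates, which is the shape the `target_principal` and `delegates` arguments above expect.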
Thanks for reminding me! Just addressed.
Force-pushed from 9172959 to aeed629
Force-pushed from aeed629 to 74525d4
|
FYI We might get better impersonation support in gcloud-aio soon. :) |
This looks great! Should we use the current PR as a quick fix? Or is it ok to wait for the release of the next gcloud-aio? |
Force-pushed from 067bccf to 6f81639
|
Let's wait for a fix upstream first |
Sure 🙂 |
Force-pushed from 6f81639 to 796c59c
phanikumv left a comment
Blocking it till the upstream changes are ready
        return project

    async def get(self) -> str | None:
        creds, _ = google.auth.default()
Instead of getting default credentials, we should retrieve the credentials for a given connection ID here, no?
Furthermore, I was looking at other Google hooks that work fine asynchronously with an impersonation chain and noticed that, for example, the DataprocAsyncHook (a subclass of GoogleBaseHook) just calls self.get_credentials().
I suppose creating a credentials object is itself non-blocking (unless you consider file I/O blocking) until we actually want to generate a token for the given credentials, right? So I wonder whether we just have to implement the credentials -> token step asynchronously. That's probably what Google's async clients do under the hood, but unfortunately we're here precisely because there is no async client for BigQuery to begin with.
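One way to picture the "credentials -> token asynchronously" idea is a small wrapper that keeps the synchronous credentials object but moves its blocking `refresh()` call into a worker thread. The sketch below is an assumption-laden illustration, not what the PR or gcloud-aio does: `FakeCredentials` is a hypothetical stand-in that only mimics the `refresh(request)` / `token` shape of a google-auth credentials object, `AsyncTokenProvider` is an invented name, and token expiry is ignored.

```python
from __future__ import annotations

import asyncio
import time


class FakeCredentials:
    """Hypothetical stand-in for a credentials object with a blocking refresh()."""

    def __init__(self) -> None:
        self.token: str | None = None
        self.refresh_count = 0

    def refresh(self, request: object) -> None:
        # Blocking call: real credentials would perform an HTTP request here.
        time.sleep(0.05)
        self.refresh_count += 1
        self.token = f"token-{self.refresh_count}"


class AsyncTokenProvider:
    """Expose `await get()` on top of credentials whose refresh() blocks."""

    def __init__(self, credentials: FakeCredentials) -> None:
        self._credentials = credentials
        self._lock = asyncio.Lock()

    async def get(self) -> str | None:
        # The lock keeps concurrent callers from refreshing at the same time.
        async with self._lock:
            if self._credentials.token is None:
                # Run the blocking refresh in a worker thread so the
                # event loop stays free for other coroutines.
                await asyncio.to_thread(self._credentials.refresh, None)
            return self._credentials.token


async def main() -> list[str | None]:
    provider = AsyncTokenProvider(FakeCredentials())
    # Two concurrent callers share a single refresh.
    return list(await asyncio.gather(provider.get(), provider.get()))


tokens = asyncio.run(main())
```

A real implementation would also need to check expiry before reusing a cached token and pass a proper transport request object to `refresh()`.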
I would like to add that GoogleBaseHook.get_credentials() probably respects the impersonation chain set not only at the hook level but also at the connection level. If we can rely on that, then we would need to cover those cases individually.
            target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )

        impersonated_creds.refresh(google_auth_requests.Request())
This needs to be done asynchronously; otherwise it will block the entire triggerer process.
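The effect described in this comment can be reproduced with plain asyncio. In the sketch below, `blocking_refresh` is a hypothetical stand-in for a synchronous credentials refresh (it just sleeps), and a background coroutine counts how often it gets scheduled while the refresh runs, once called directly and once offloaded with `asyncio.to_thread`. It is meant only to illustrate the stall, not to model the triggerer itself.

```python
import asyncio
import time


def blocking_refresh() -> None:
    # Hypothetical stand-in for a synchronous refresh; real code does network I/O.
    time.sleep(0.3)


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how often this coroutine gets scheduled until `stop` is set."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def measure(offload: bool) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # give the ticker a chance to start
    if offload:
        # The refresh runs in a worker thread; the event loop keeps running.
        await asyncio.to_thread(blocking_refresh)
    else:
        # The refresh runs on the event loop thread; nothing else can run.
        blocking_refresh()
    stop.set()
    return await ticker


ticks_blocked = asyncio.run(measure(offload=False))
ticks_offloaded = asyncio.run(measure(offload=True))
```

When the refresh is called directly, the ticker barely advances, because every other coroutine on the loop has to wait; when it is offloaded, the ticker keeps running throughout.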
|
Based on my review comments, I've made another attempt at this in #36849. That one should make it easier to use credentials from hooks in gcloud-aio clients without much extra work. |
|
Hi @m1racoli, thanks for your feedback! I just took a look at your new PRs. Both of them look good. I think we might no longer need this PR, but I'll mark it as a draft for now until yours are merged. |
|
As #36849 has been merged, closing this one. |
#34727