Description
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Occasionally, a BigQueryInsertJobOperator task running in deferred mode fails because impersonated credentials cannot be acquired while the trigger is checking the job status.
Here is an example of the task log.
[2024-03-26, 05:38:04 GMT] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-03-26, 05:38:04 GMT] {bigquery.py:62} INFO - Using the connection google_cloud_default .
[2024-03-26, 05:38:04 GMT] {taskinstance.py:2370} INFO - Pausing task as DEFERRED. dag_id=my-dag, task_id=my-task, execution_date=20240325T000000, start_date=20240326T053802
[2024-03-26, 05:38:04 GMT] {local_task_job_runner.py:231} INFO - Task exited with return code 100 (task deferral)
[2024-03-26, 05:38:07 GMT] {bigquery.py:110} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
[2024-03-26, 05:38:08 GMT] {gcs_task_handler.py:158} INFO - Error checking for previous log; if exists, may be overwritten: 'NoneType' object has no attribute 'decode'
[2024-03-26, 05:38:12 GMT] {bigquery.py:110} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
[2024-03-26, 05:38:16 GMT] {bigquery.py:110} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
[2024-03-26, 05:38:21 GMT] {bigquery.py:110} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
...
...
[2024-03-26, 06:05:44 GMT] {bigquery.py:110} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
[2024-03-26, 06:05:48 GMT] {bigquery.py:117} ERROR - Exception occurred while checking for query completion
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 94, in run
job_status = await hook.get_job_status(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 3292, in get_job_status
job = await self._get_job(job_id=job_id, project_id=project_id, location=location)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 3268, in _get_job
job = await loop.run_in_executor(None, self._get_job_sync, job_id, project_id, location)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 3287, in _get_job_sync
return hook.get_job(job_id=job_id, project_id=project_id, location=location)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 485, in inner_wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1546, in get_job
job = client.get_job(job_id=job_id, project=project_id, location=location)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/client.py", line 2107, in get_job
resource = self._call_api(
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/client.py", line 827, in _call_api
return call()
^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
return retry_target(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
_retry_error_helper(
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
raise final_exc from source_exc
File "/usr/local/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
result = target()
^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/cloud/_http/__init__.py", line 482, in api_request
response = self._make_request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/cloud/_http/__init__.py", line 341, in _make_request
return self._do_request(
^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/cloud/_http/__init__.py", line 379, in _do_request
return self.http.request(
^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/auth/transport/requests.py", line 537, in request
self.credentials.before_request(auth_request, method, url, request_headers)
File "/usr/local/lib/python3.11/site-packages/google/auth/credentials.py", line 230, in before_request
self._blocking_refresh(request)
File "/usr/local/lib/python3.11/site-packages/google/auth/credentials.py", line 193, in _blocking_refresh
self.refresh(request)
File "/usr/local/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 250, in refresh
self._update_token(request)
File "/usr/local/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 282, in _update_token
self.token, self.expiry = _make_iam_token_request(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 100, in _make_iam_token_request
raise exceptions.RefreshError(_REFRESH_ERROR, response_body)
The exception raised carries a message such as the following:
('Unable to acquire impersonated credentials', '<!DOCTYPE html>\n<html lang=en>\n <meta charset=utf-8>\n <meta name=viewport content="initial-scale=1, minimum-scale=1, width=device-width">\n <title>Error 502 (Server Error)!!1</title>\n <style>\n {margin:0;padding:0}html,code{font:15px/22px arial,sans-serif}html{background:#fff;color:#222;padding:15px}body{margin:7% auto 0;max-width:390px;min-height:180px;padding:30px 0 15px} > body{background:url(//www.google.com/images/errors/robot.png) 100% 5px no-repeat;padding-right:205px}p{margin:11px 0 22px;overflow:hidden}ins{color:#777;text-decoration:none}a img{border:0}@media screen and (max-width:772px){body{background:none;margin-top:0;max-width:none;padding-right:0}}#logo{background:url(//www.google.com/images/branding/googlelogo/1x/googlelogo_color_150x54dp.png) no-repeat;margin-left:-5px}@media only screen and (min-resolution:192dpi){#logo{background:url(//www.google.com/images/branding/googlelogo/2x/googlelogo_colo
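The response body in this error is an HTML "Error 502 (Server Error)" page, which suggests a transient server-side failure rather than a permissions problem. Below is a minimal sketch of how such an error could be told apart from a permanent one. It assumes the exception args have the two-element `(message, response_body)` shape shown above; `looks_transient` is a hypothetical helper and is not part of Airflow or google-auth.

```python
import re

# Matches the "Error 502 (Server Error)" title seen in the HTML body above,
# and any other 5xx code rendered the same way.
_SERVER_ERROR_TITLE = re.compile(r"Error\s+5\d\d\b")


def looks_transient(exc_args: tuple) -> bool:
    """Return True if the error args look like a 5xx page from the token endpoint.

    Hypothetical helper: assumes args are (message, response_body) as in the
    log above. Anything else is treated as non-transient.
    """
    if len(exc_args) < 2 or not isinstance(exc_args[1], str):
        return False
    return _SERVER_ERROR_TITLE.search(exc_args[1]) is not None
```

A check like this could be applied to `exc.args` of the caught exception before deciding whether to keep polling or fail the task.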
What you think should happen instead?
The problem is that, although the task can enter the retry state, the initial BQ job may still be running. This can have secondary effects, such as the retry failing because it attempts a concurrent update on the same table:
airflow.exceptions.AirflowException: Query error: Transaction is aborted due to concurrent update against table
Ideally, a failure to acquire the impersonated credentials while checking the job status would not immediately cause the task to fail.
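One way this could look is a polling loop that tolerates a bounded number of consecutive credential failures before giving up. The following is only a sketch of the idea, not the trigger's actual code: `TransientCredentialsError` is a stand-in for `google.auth.exceptions.RefreshError`, `poll_job_status` and `max_consecutive_errors` are hypothetical names, and the status check is assumed to return a plain string such as `"running"`.

```python
import asyncio
from typing import Awaitable, Callable


class TransientCredentialsError(Exception):
    """Stand-in for google.auth.exceptions.RefreshError in this sketch."""


async def poll_job_status(
    get_status: Callable[[], Awaitable[str]],
    poll_interval: float = 4.0,
    max_consecutive_errors: int = 3,
) -> str:
    """Poll until the job leaves the running state.

    Credential failures are tolerated as long as no more than
    `max_consecutive_errors` of them occur in a row.
    """
    consecutive_errors = 0
    while True:
        try:
            status = await get_status()
        except TransientCredentialsError:
            consecutive_errors += 1
            if consecutive_errors > max_consecutive_errors:
                raise  # persistent failure: give up and let the task fail
        else:
            consecutive_errors = 0  # a successful check resets the budget
            if status != "running":
                return status
        await asyncio.sleep(poll_interval)
```

With this shape, a single 502 from the token endpoint costs one extra poll interval, while a sustained outage still fails the task after a bounded number of attempts.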
How to reproduce
Unfortunately, this cannot be reproduced consistently, because the credential failure occurs unpredictably.
Operating System
PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
Versions of Apache Airflow Providers
apache-airflow-providers-google==10.16.0
Deployment
Astronomer
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct