Deferred AWS BatchOperator appears to re-trigger task and error out #33016

@rishi-kulkarni

Description

Apache Airflow version

2.6.3

What happened

I started a long-running job with the BatchOperator in deferrable mode. The full task log, including the traceback, is below.

The strange behavior here is that just before erroring out, the BatchOperator appears to resubmit the job. No second job is actually submitted (verified in the AWS console), but this seems to break the trigger, and the task errors out immediately afterward.

*** Found local files:
***   * /home/airflow/airflow/logs/dag_id=foxy_salesforce/run_id=scheduled__2023-08-01T19:30:00+00:00/task_id=foxy_salesforce_batch_job/attempt=2.log
***   * /home/airflow/airflow/logs/dag_id=foxy_salesforce/run_id=scheduled__2023-08-01T19:30:00+00:00/task_id=foxy_salesforce_batch_job/attempt=2.log.trigger.17484.log
[2023-08-01, 20:35:37 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: foxy_salesforce.foxy_salesforce_batch_job scheduled__2023-08-01T19:30:00+00:00 [queued]>
[2023-08-01, 20:35:37 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: foxy_salesforce.foxy_salesforce_batch_job scheduled__2023-08-01T19:30:00+00:00 [queued]>
[2023-08-01, 20:35:37 UTC] {taskinstance.py:1308} INFO - Starting attempt 2 of 2
[2023-08-01, 20:35:37 UTC] {taskinstance.py:1327} INFO - Executing <Task(BatchOperator): foxy_salesforce_batch_job> on 2023-08-01 19:30:00+00:00
[2023-08-01, 20:35:37 UTC] {standard_task_runner.py:57} INFO - Started process 8834 to run task
[2023-08-01, 20:35:37 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'foxy_salesforce', 'foxy_salesforce_batch_job', 'scheduled__2023-08-01T19:30:00+00:00', '--job-id', '17555', '--raw', '--subdir', 'DAGS_FOLDER/foxy_salesforce.py', '--cfg-path', '/tmp/tmpr0isg35r']
[2023-08-01, 20:35:37 UTC] {standard_task_runner.py:85} INFO - Job 17555: Subtask foxy_salesforce_batch_job
[2023-08-01, 20:35:37 UTC] {task_command.py:410} INFO - Running <TaskInstance: foxy_salesforce.foxy_salesforce_batch_job scheduled__2023-08-01T19:30:00+00:00 [running]> on host ip-172-20-18-105.ec2.internal
[2023-08-01, 20:35:37 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='data_engineering_alerts@intelycare.com' AIRFLOW_CTX_DAG_OWNER='nrobinson' AIRFLOW_CTX_DAG_ID='foxy_salesforce' AIRFLOW_CTX_TASK_ID='foxy_salesforce_batch_job' AIRFLOW_CTX_EXECUTION_DATE='2023-08-01T19:30:00+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-08-01T19:30:00+00:00'
[2023-08-01, 20:35:37 UTC] {batch.py:255} INFO - Running AWS Batch job - job definition: foxy_dev_batch_job:1 - on queue foxy-queue
[2023-08-01, 20:35:37 UTC] {batch.py:262} INFO - AWS Batch job - container overrides: {'command': ['-tap', 'salesforce', '-ds', 'salesforce', '-to', 'data_engineering_alerts@intelycare.com']}
[2023-08-01, 20:35:37 UTC] {base.py:73} INFO - Using connection ID 'aws_prod_batch' for task execution.
[2023-08-01, 20:35:37 UTC] {credentials.py:1051} INFO - Found credentials from IAM Role: DSAirflowIAMStack-DSDaggerServerRoleBC9B4D69-BVJN2Q2JAYNS
[2023-08-01, 20:35:37 UTC] {batch.py:292} INFO - AWS Batch job (0c52c6e5-6b77-4e48-88d6-e98478fdcae2) started: {'ResponseMetadata': {'RequestId': '4510c251-622d-4882-aec1-50eef30d2b7d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Tue, 01 Aug 2023 20:35:37 GMT', 'content-type': 'application/json', 'content-length': '165', 'connection': 'keep-alive', 'x-amzn-requestid': '4510c251-622d-4882-aec1-50eef30d2b7d', 'access-control-allow-origin': '*', 'x-amz-apigw-id': 'I_3oCGPhoAMEa6Q=', 'access-control-expose-headers': 'X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date', 'x-amzn-trace-id': 'Root=1-64c96c99-57cff0982ce9c97360d0fd02'}, 'RetryAttempts': 0}, 'jobArn': 'arn:aws:batch:us-east-1:806657589280:job/0c52c6e5-6b77-4e48-88d6-e98478fdcae2', 'jobName': 'foxy_salesforce', 'jobId': '0c52c6e5-6b77-4e48-88d6-e98478fdcae2'}
[2023-08-01, 20:35:37 UTC] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=foxy_salesforce, task_id=foxy_salesforce_batch_job, execution_date=20230801T193000, start_date=20230801T203537
[2023-08-01, 20:35:37 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-08-01, 20:35:38 UTC] {base.py:73} INFO - Using connection ID 'aws_prod_batch' for task execution.
[2023-08-01, 20:35:38 UTC] {credentials.py:1051} INFO - Found credentials from IAM Role: DSAirflowIAMStack-DSDaggerServerRoleBC9B4D69-BVJN2Q2JAYNS
[2023-08-01, 20:35:39 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['STARTING']
[2023-08-01, 20:36:09 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['STARTING']
[2023-08-01, 20:36:39 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['RUNNING']
[2023-08-01, 20:37:09 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['RUNNING']
[2023-08-01, 20:37:39 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['RUNNING']
[2023-08-01, 20:38:09 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['RUNNING']
[2023-08-01, 20:38:39 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['RUNNING']
[2023-08-01, 20:39:09 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['RUNNING']
[2023-08-01, 20:39:39 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['RUNNING']
[2023-08-01, 20:40:09 UTC] {waiter_with_logging.py:129} INFO - Batch job 0c52c6e5-6b77-4e48-88d6-e98478fdcae2 not ready yet: ['RUNNING']
[2023-08-01, 20:40:12 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: foxy_salesforce.foxy_salesforce_batch_job scheduled__2023-08-01T19:30:00+00:00 [queued]>
[2023-08-01, 20:40:12 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: foxy_salesforce.foxy_salesforce_batch_job scheduled__2023-08-01T19:30:00+00:00 [queued]>
[2023-08-01, 20:40:12 UTC] {taskinstance.py:1306} INFO - Resuming after deferral
[2023-08-01, 20:40:12 UTC] {taskinstance.py:1327} INFO - Executing <Task(BatchOperator): foxy_salesforce_batch_job> on 2023-08-01 19:30:00+00:00
[2023-08-01, 20:40:12 UTC] {standard_task_runner.py:57} INFO - Started process 21621 to run task
[2023-08-01, 20:40:12 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'foxy_salesforce', 'foxy_salesforce_batch_job', 'scheduled__2023-08-01T19:30:00+00:00', '--job-id', '17561', '--raw', '--subdir', 'DAGS_FOLDER/foxy_salesforce.py', '--cfg-path', '/tmp/tmpyl2o5l2k']
[2023-08-01, 20:40:12 UTC] {standard_task_runner.py:85} INFO - Job 17561: Subtask foxy_salesforce_batch_job
[2023-08-01, 20:40:12 UTC] {task_command.py:410} INFO - Running <TaskInstance: foxy_salesforce.foxy_salesforce_batch_job scheduled__2023-08-01T19:30:00+00:00 [running]> on host ip-172-20-18-105.ec2.internal
[2023-08-01, 20:40:12 UTC] {taskinstance.py:1598} ERROR - Trigger failed:
Traceback (most recent call last):
  File "/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/providers/amazon/aws/utils/waiter_with_logging.py", line 122, in async_wait
    await waiter.wait(**args, WaiterConfig={"MaxAttempts": 1})
  File "/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/waiter.py", line 49, in wait
    await AIOWaiter.wait(self, **kwargs)
  File "/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/waiter.py", line 139, in wait
    raise WaiterError(
botocore.exceptions.WaiterError: Waiter batch_job_complete failed: Max attempts exceeded
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 537, in cleanup_finished_triggers
    result = details["task"].result()
  File "/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 615, in run_trigger
    async for event in trigger.run():
  File "/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/providers/amazon/aws/triggers/base.py", line 121, in run
    await async_wait(
  File "/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/providers/amazon/aws/utils/waiter_with_logging.py", line 131, in async_wait
    raise AirflowException("Waiter error: max attempts reached")
airflow.exceptions.AirflowException: Waiter error: max attempts reached
[2023-08-01, 20:40:12 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
airflow.exceptions.TaskDeferralError: Trigger failure
[2023-08-01, 20:40:12 UTC] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=foxy_salesforce, task_id=foxy_salesforce_batch_job, execution_date=20230801T193000, start_date=20230801T203537, end_date=20230801T204012
[2023-08-01, 20:40:12 UTC] {logging_mixin.py:150} WARNING - /home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/utils/email.py:153 RemovedInAirflow3Warning: Fetching SMTP credentials from configuration variables will be deprecated in a future release. Please set credentials using a connection instead.
[2023-08-01, 20:40:12 UTC] {email.py:269} INFO - Email alerting: attempt 1
[2023-08-01, 20:40:12 UTC] {email.py:281} INFO - Sent an alert email to ['data_engineering_alerts@intelycare.com']
[2023-08-01, 20:40:12 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 17561 for task foxy_salesforce_batch_job (Trigger failure; 21621)
[2023-08-01, 20:40:12 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-08-01, 20:40:12 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check
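
For context, the traceback points at the provider's async_wait helper in waiter_with_logging.py, which runs the boto waiter one attempt at a time (WaiterConfig={"MaxAttempts": 1}) so it can log the job status between polls, and raises once its own attempt budget is spent. A minimal sketch of that pattern, reconstructed from the traceback rather than copied from the provider (the signature and the terminal-failure check are simplified):

```python
import asyncio

from airflow.exceptions import AirflowException
from botocore.exceptions import WaiterError


async def async_wait(waiter, waiter_delay: int, waiter_max_attempts: int, args: dict):
    """Poll a boto waiter one attempt at a time so status can be logged between polls."""
    for attempt in range(1, waiter_max_attempts + 1):
        try:
            # A single waiter attempt; raises WaiterError if the job is not done yet.
            await waiter.wait(**args, WaiterConfig={"MaxAttempts": 1})
            return  # an acceptor matched: the job reached a terminal state
        except WaiterError as error:
            if "terminal failure" in str(error):
                raise AirflowException(f"Waiter error: failed: {error}")
            # Otherwise the job is just not ready yet, e.g.
            # "Batch job <id> not ready yet: ['RUNNING']" in the log above.
            if attempt == waiter_max_attempts:
                raise AirflowException("Waiter error: max attempts reached")
            await asyncio.sleep(waiter_delay)
```

In the log above the poll interval is 30 seconds and the budget works out to 10 attempts (20:35:39 through 20:40:09), so the trigger gives up after roughly five minutes even though the job is still RUNNING.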

What you think should happen instead

No response

How to reproduce

Run a long-running AWS Batch job using the BatchOperator with deferrable=True; the job needs to run longer than the trigger's polling window (roughly five minutes in the log above), as sketched below.
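
A minimal reproduction sketch, assuming placeholder queue and job-definition names and a container command that simply outlives the polling budget (container_overrides may be spelled overrides on older provider versions):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.batch import BatchOperator

with DAG(
    dag_id="batch_deferrable_repro",  # hypothetical DAG, not from the report
    start_date=datetime(2023, 8, 1),
    schedule=None,
    catchup=False,
):
    BatchOperator(
        task_id="long_batch_job",
        job_name="batch-deferrable-repro",
        job_queue="my-queue",            # placeholder
        job_definition="my-job-def:1",   # placeholder
        container_overrides={"command": ["sleep", "600"]},  # outlive ~5 min of polling
        deferrable=True,
    )
```

If the trigger's polling budget is the culprit, raising the operator's polling settings (e.g. max_retries and poll_interval, assuming the operator forwards them to the trigger's waiter configuration) may serve as a workaround.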

Operating System

Amazon Linux

Versions of Apache Airflow Providers

apache-airflow-providers-amazon 8.4.0

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
