AppflowHook with wait_for_completion = True does not finish executing the task although the appflow flow does. #33461

Description

@dahero95

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

I'm using Airflow 2.6.2 with apache-airflow-providers-amazon 8.5.1.

When I use AppflowHook with the wait_for_completion parameter set to True, the task execution never finishes.
I have checked in Appflow and the flow runs correctly, completing in a couple of seconds, yet AppflowHook never returns.
If I change wait_for_completion to False, everything works correctly.

While the task is stuck running, the log view shows a "403 FORBIDDEN" error; marking the task as success or failed makes the logs readable again.
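
To confirm that the flow itself completes, the execution record can be checked directly with boto3, independently of the hook. A minimal sketch, assuming default AWS credentials and region and the hypothetical flow name 'flow_name':

import boto3

# Hypothetical flow name; assumes default AWS credentials and region.
client = boto3.client('appflow')
records = client.describe_flow_execution_records(flowName='flow_name')
for record in records['flowExecutions']:
    # executionStatus is e.g. 'InProgress', 'Successful' or 'Error'.
    print(record['executionId'], record['executionStatus'])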

Logs during task execution:

470b2412b735
*** Found local files:
***   * /opt/airflow/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log
*** !!!! Please make sure that all your Airflow components (e.g. schedulers, webservers, workers and triggerer) have the same 'secret_key' configured in 'webserver' section and time is synchronized on all your machines (for example with ntpd) See more at https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#secret-key
*** Could not read served logs: Client error '403 FORBIDDEN' for url 'http://470b2412b735:8793/log/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log' For more information check: https://httpstatuses.com/403
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
[2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
[2023-08-16, 19:04:44 CST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-16, 19:04:44 CST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): extract_from_stripe> on 2023-08-17 01:04:41.723261+00:00
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:57} INFO - Started process 796 to run task
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'stripe_ingest_flow', 'extract_from_stripe', 'manual__2023-08-17T01:04:41.723261+00:00', '--job-id', '903', '--raw', '--subdir', 'DAGS_FOLDER/stripe_ingest_flow_to_lakehouse/dag.py', '--cfg-path', '/tmp/tmpqz8uvben']
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:85} INFO - Job 903: Subtask extract_from_stripe
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {task_command.py:410} INFO - Running <TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [running]> on host 470b2412b735
[2023-08-16, 19:04:44 CST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='dhernandez' AIRFLOW_CTX_DAG_ID='stripe_ingest_flow' AIRFLOW_CTX_TASK_ID='extract_from_stripe' AIRFLOW_CTX_EXECUTION_DATE='2023-08-17T01:04:41.723261+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-17T01:04:41.723261+00:00'
[2023-08-16, 19:04:44 CST] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2023-08-16, 19:04:44 CST] {base.py:73} INFO - Using connection ID 'siclo_***_lakehouse_conn' for task execution.
[2023-08-16, 19:04:44 CST] {connection_wrapper.py:340} INFO - AWS Connection (conn_id='siclo_***_lakehouse_conn', conn_type='aws') credentials retrieved from login and password.
[2023-08-16, 19:04:45 CST] {appflow.py:63} INFO - executionId: 58ad6275-0a70-48d9-8414-f0215924c876

Logs after marking the task as success or failed:

470b2412b735
*** Found local files:
***   * /opt/airflow/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
[2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
[2023-08-16, 19:04:44 CST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-16, 19:04:44 CST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): extract_from_stripe> on 2023-08-17 01:04:41.723261+00:00
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:57} INFO - Started process 796 to run task
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'stripe_ingest_flow', 'extract_from_stripe', 'manual__2023-08-17T01:04:41.723261+00:00', '--job-id', '903', '--raw', '--subdir', 'DAGS_FOLDER/stripe_ingest_flow_to_lakehouse/dag.py', '--cfg-path', '/tmp/tmpqz8uvben']
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:85} INFO - Job 903: Subtask extract_from_stripe
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {task_command.py:410} INFO - Running <TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [running]> on host 470b2412b735
[2023-08-16, 19:04:44 CST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='dhernandez' AIRFLOW_CTX_DAG_ID='stripe_ingest_flow' AIRFLOW_CTX_TASK_ID='extract_from_stripe' AIRFLOW_CTX_EXECUTION_DATE='2023-08-17T01:04:41.723261+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-17T01:04:41.723261+00:00'
[2023-08-16, 19:04:44 CST] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2023-08-16, 19:04:44 CST] {base.py:73} INFO - Using connection ID 'siclo_***_lakehouse_conn' for task execution.
[2023-08-16, 19:04:44 CST] {connection_wrapper.py:340} INFO - AWS Connection (conn_id='siclo_***_lakehouse_conn', conn_type='aws') credentials retrieved from login and password.
[2023-08-16, 19:04:45 CST] {appflow.py:63} INFO - executionId: 58ad6275-0a70-48d9-8414-f0215924c876
[2023-08-16, 19:05:24 CST] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2023-08-16, 19:05:24 CST] {process_utils.py:131} INFO - Sending 15 to group 796. PIDs of all processes in the group: [796]
[2023-08-16, 19:05:24 CST] {process_utils.py:86} INFO - Sending the signal 15 to group 796
[2023-08-16, 19:05:24 CST] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.

What you think should happen instead

With wait_for_completion set to True, the task should finish successfully and return the execution ID from Appflow.

How to reproduce

Create a DAG with the following task:

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.appflow import AppflowHook

@task
def extract():
    appflow = AppflowHook(
        aws_conn_id='conn_id'
    )
    execution_id = appflow.run_flow(
        flow_name='flow_name',
        wait_for_completion=True,
        # with wait_for_completion=False it works
    )
    return execution_id

The AWS connection has the following permissions:

  • "appflow:DescribeFlow",
  • "appflow:StartFlow",
  • "appflow:RunFlow",
  • "appflow:ListFlows",
  • "appflow:DescribeFlowExecutionRecords"

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow==2.6.2
apache-airflow-providers-amazon==8.5.1
apache-airflow-providers-common-sql==1.5.1
apache-airflow-providers-http==4.4.1
boto3==1.26.76
asgiref==3.7.2
watchtower==2.0.1
jsonpath-ng==1.5.3
redshift-connector==2.0.911
sqlalchemy-redshift==0.8.14
mypy-boto3-appflow==1.28.16
mypy-boto3-rds==1.26.144
mypy-boto3-redshift-data==1.26.109
mypy-boto3-s3==1.26.153
celery==5.3.0

Deployment

Docker-Compose

Deployment details

Docker 4.10.1 (82475)
Airflow image apache/airflow:2.6.2-python3.11

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
