
execution_timeout not enforced, task hangs up #35474

@xmariachi

Description


Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Version: 2.5.1
Run env: MWAA on AWS

Summary: Approximately once every ~500-1000 runs, a task hangs indefinitely until manually killed, preventing any other task from being scheduled for this dag; consequently its execution_timeout is not enforced.
In my experience, it only happens on tasks that consume from Kafka using the confluent_kafka library. The execution_timeout is enforced on other tasks.

Dag definition code:

# Dag definition (on_failure_callback, SERVICE_NAME and
# ingest_from_kafka_and_save are defined elsewhere in the project)
from datetime import timedelta

import pendulum
from airflow.decorators import dag

default_args = {
    "retries": 3,
    "on_failure_callback": on_failure_callback,
    "sla": timedelta(hours=2),
    "execution_timeout": timedelta(hours=4),
}


@dag(SERVICE_NAME,
     default_args=default_args,
     schedule_interval="*/5 * * * *",
     start_date=pendulum.datetime(2023, 7, 3, 9, tz="UTC"),
     catchup=True,
     tags=['critical', 'dumper', 'kafka'],
     max_active_runs=1)
def process_records():
    ingest_from_kafka_and_save()

The ingest_from_kafka_and_save() function contains code that consumes from Kafka, passing a callback function to the consumer (which I suspect may have something to do with the problem, since it runs asynchronously).
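Since the issue does not include the function body, the consumption pattern is roughly the following hypothetical sketch, with a stub class standing in for confluent_kafka.Consumer so it runs without a broker (the function shape, the save step and all names are illustrative assumptions, not code from the issue):

```python
import queue


class StubConsumer:
    """Illustrative stand-in for confluent_kafka.Consumer.

    The real Consumer.poll() returns None when the timeout expires with
    no message, and also invokes registered callbacks from inside poll().
    """

    def __init__(self, messages):
        self._q = queue.Queue()
        for m in messages:
            self._q.put(m)

    def poll(self, timeout=1.0):
        try:
            return self._q.get_nowait()
        except queue.Empty:
            return None  # no message within the poll timeout

    def close(self):
        pass


def ingest_from_kafka_and_save(consumer, max_messages=1000, save=lambda m: None):
    """Consume until the partitions are drained or max_messages is hit."""
    count = 0
    while count < max_messages:
        msg = consumer.poll(timeout=1.0)
        if msg is None:   # nothing left to read
            break
        save(msg)         # callback-style processing step
        count += 1
    consumer.close()
    return count


print(ingest_from_kafka_and_save(StubConsumer(["a", "b", "c"])))  # → 3
```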

It is hard to reproduce since it is intermittent and only happens once in a while.
The Audit Log does not show anything unusual - the task just seems to hang indefinitely.
The consumption code itself works fine otherwise and has been running for months in this and other dags that use it - but those dags show the same behaviour.

What you think should happen instead

The execution_timeout should be enforced and the task should be killed so a new one could be placed.

How to reproduce

It is hard to reproduce, since it happens very infrequently.

  • Create a dag with the definition from the "What happened" section
  • Add a function with basic Kafka consumption that reads from a topic until the end of its partitions (or until a maximum number of messages)
  • Leave it running and wait for the problem to occur

Operating System

MWAA on AWS

Versions of Apache Airflow Providers

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"
apache-airflow-providers-amazon
apache-airflow-providers-snowflake==4.0.2
apache-airflow-providers-mysql==4.0.0
apache-airflow-providers-slack
confluent-kafka==2.1.0

Deployment

Amazon (AWS) MWAA

Deployment details

Medium-sized cluster
Version 2.5.1, latest update applied 2 weeks ago.

Anything else

It is unclear what triggers the error - but whatever it is, the task should be killed so that the execution_timeout is enforced.
It looks like an internal thread-management issue.
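One plausible mechanism, offered as an assumption: Airflow enforces execution_timeout with a SIGALRM-based timeout inside the task process, and in CPython signal handlers only run on the main thread, between bytecode instructions. Code blocked inside a C extension (e.g. librdkafka under confluent_kafka) that never returns control to the interpreter therefore cannot be preempted. The minimal demo below shows the signal-based mechanism working against pure-Python code; all names (run_with_timeout, TaskTimeout) are illustrative, not Airflow's actual API:

```python
import signal


class TaskTimeout(Exception):
    """Illustrative stand-in for an Airflow task-timeout exception."""


def _on_alarm(signum, frame):
    raise TaskTimeout("execution timed out")


def run_with_timeout(fn, seconds):
    # SIGALRM-based timeout (Unix only): it only works on the main
    # thread, and only preempts code that returns control to the
    # Python interpreter. A C extension blocked in native code never
    # gives the handler a chance to run - consistent with the hang
    # described in this issue.
    old = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        return fn()
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old)


def busy_loop():
    while True:
        pass  # pure Python: the interpreter checks for pending signals here


try:
    run_with_timeout(busy_loop, 1)
except TaskTimeout:
    print("timed out")  # → timed out
```

The same wrapper applied to a call that blocks in native code without releasing control would simply never raise, which would match the observed behaviour.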

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
