Skip to content

All Asset DAGs are failing #46641

@atul-astronomer

Description

@atul-astronomer

Apache Airflow version

3.0.0a2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

All asset DAGs are failing and not much information is available on the task logs or scheduler logs.

Task logs:

{"timestamp":"2025-02-11T10:34:33.430949","level":"info","event":"DAG bundles loaded: dags-folder","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager"}
{"timestamp":"2025-02-11T10:34:33.431462","level":"info","event":"Filling up the DagBag from /files/dags/asset_wei_example.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-11T10:34:33.431857","level":"debug","event":"Importing /files/dags/asset_wei_example.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-11T10:34:33.432816","level":"debug","event":"Initializing Providers Manager[asset_uris]","logger":"airflow.providers_manager"}
{"timestamp":"2025-02-11T10:34:33.930980","level":"debug","event":"Initialization of Providers Manager[asset_uris] took 0.50 seconds","logger":"airflow.providers_manager"}
{"timestamp":"2025-02-11T10:34:33.934953","level":"debug","event":"Loaded DAG <DAG: asset_produces_1>","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-11T10:34:33.935145","level":"debug","event":"DAG file parsed","file":"asset_wei_example.py","logger":"task"}
{"timestamp":"2025-02-11T10:34:33.935581","level":"debug","event":"Sending request","json":"{\"inlets\":[],\"outlets\":[{\"name\":\"s3://dag1/output_1.txt\",\"uri\":\"s3://dag1/output_1.txt\",\"asset_type\":\"Asset\"}],\"type\":\"RuntimeCheckOnTask\"}\n","logger":"task"}

Scheduler logs:

[2025-02-11T10:47:01.256+0000] {scheduler_job_runner.py:744} INFO - Received executor event with state failed�(B for task instance TaskInstanceKey(dag_id='asset_produces_1', task_id='producing_task_1', run_id='manual__2025-02-11T10:46:59.990062+00:00', try_number=1, map_index=-1,�_duration=None, state=running, executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=1, max_tries=0, pool=default_pool, queue=default�(B, priority_weight=1, operator=BashOperator, queued_dttm=2025-02-11 10:47:00.185050+00:00, scheduled_dttm=2025-02-11 10:47:00.169728+00:00(B,queued_by_job_id=1, pid=5716, taskInstance: asset_produces_1.producing_task_1 manual__2025-02-11T10:46:59.990062+00:00 [running]> finished with state failed, but the task instance's state attribute is running. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally�[2025-02-11T10:47:01.264+0000] {taskinstance.py:1079} INFO - Marking task as FAILED�

What you think should happen instead?

The DAG should pass and create asset events.

This DAG is passing in main branch

How to reproduce

Steps:

  1. Checkout to tag 3.0.0a2 and run the provided DAG.
  2. DAG is failing but should pass.

DAG code:

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk.definitions.asset import Asset

# [START asset_def]
dag1_asset = Asset("s3://dag1/output_1.txt", extra={"hi": "bye"})
# [END asset_def]
dag2_asset = Asset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_asset = Asset("s3://dag3/output_3.txt", extra={"hi": "bye"})

with DAG(
    dag_id="asset_produces_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    tags=["produces", "asset-scheduled"],
) as dag1:
    # [START task_outlet]
    BashOperator(
        outlets=[dag1_asset], task_id="producing_task_1", bash_command="sleep 5"
    )
    # [END task_outlet]

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions