@amoghrajesh amoghrajesh commented Feb 26, 2025

closes: #46645

While serialising and sorting DAGs like the one mentioned in the issue, we run into a TypeError because the sorting function does not handle non-iterable values such as floats.

Example:

The dag data is {'__version': 1, 'dag': {'edge_info': {}, 'dag_id': 'datetime_mapped', 'task_group': {'_group_id': None, 'group_display_name': '', 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'get_delays': ['operator', 'get_delays'], 'get_wakes': ['operator', 'get_wakes'], 'expanded_datetime': ['operator', 'expanded_datetime'], 'expanded_timedelta': ['operator', 'expanded_timedelta'], 'expanded_datetime_async': ['operator', 'expanded_datetime_async'], 'expanded_timedelta_async': ['operator', 'expanded_timedelta_async'], 'static_timedelta': ['operator', 'static_timedelta'], 'static_datetime': ['operator', 'static_datetime'], 'op_sleep_90': ['operator', 'op_sleep_90']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'tags': ['taskmap'], 'timetable': {'__type': 'airflow.timetables.simple.NullTimetable', '__var': {}}, 'timezone': 'UTC', 'start_date': 0.0, 'relative_fileloc': 'dags/mapped_dag.py', 'fileloc': '/files/dags/dags/mapped_dag.py', '_processor_dags_folder': '/files/dags', 'tasks': [{'__var': {'is_setup': False, 'template_fields_renderers': {'templates_dict': 'json', 'op_args': 'py', 'op_kwargs': 'py'}, 'ui_fgcolor': '#000', 'start_from_trigger': False, 'ui_color': '#ffefeb', 'downstream_task_ids': ['get_wakes'], 'is_teardown': False, 'weight_rule': 'downstream', '_needs_expansion': False, 'pool': 'default_pool', 'template_ext': [], 'on_failure_fail_dagrun': False, 'template_fields': ['templates_dict', 'op_args', 'op_kwargs'], 'task_type': '_PythonDecoratedOperator', 'task_id': 'get_delays', 'python_callable_name': 'unusual_prefix_150f7a7e99daaccc924d7defc46da76a8bc8c011_mapped_dag.get_delays', '_task_module': 'airflow.decorators.python', '_operator_name': '@task', '_is_empty': False, 'start_trigger_args': None, 'op_args': '()', 'op_kwargs': {}, 'label': 'get_delays'}, '__type': 'operator'}, {'__var': {'_operator_name': '@task', 
'template_fields_renderers': {'templates_dict': 'json', 'op_args': 'py', 'op_kwargs': 'py'}, 'ui_fgcolor': '#000', '_expand_input_attr': 'op_kwargs_expand_input', 'start_from_trigger': False, 'operator_extra_links': [], 'ui_color': '#ffefeb', 'downstream_task_ids': ['expanded_datetime', 'expanded_datetime_async'], 'partial_kwargs': {'is_setup': False, 'is_teardown': False, 'on_failure_fail_dagrun': False, 'op_args': [], 'op_kwargs': {'__var': {}, '__type': 'dict'}}, '_disallow_kwargs_override': False, 'task_type': '_PythonDecoratedOperator', 'template_ext': [], '_task_module': 'airflow.decorators.python', 'template_fields': ['templates_dict', 'op_args', 'op_kwargs'], '_is_empty': False, 'task_id': 'get_wakes', 'python_callable_name': 'unusual_prefix_150f7a7e99daaccc924d7defc46da76a8bc8c011_mapped_dag.get_wakes', 'start_trigger_args': None, 'op_kwargs_expand_input': {'type': 'dict-of-lists', 'value': {'__var': {'delay': {'__var': {'task_id': 'get_delays', 'key': 'return_value'}, '__type': 'xcomref'}}, '__type': 'dict'}}, '_is_mapped': True, 'label': 'get_wakes'}, '__type': 'operator'}, {'__var': {'template_fields_renderers': {}, 'ui_fgcolor': '#000', '_expand_input_attr': 'expand_input', 'start_from_trigger': False, 'operator_extra_links': [], 'ui_color': '#e6f1f2', 'downstream_task_ids': [], 'partial_kwargs': {}, '_disallow_kwargs_override': False, 'task_type': 'DateTimeSensor', '_is_sensor': True, 'template_ext': [], '_task_module': 'airflow.providers.standard.sensors.date_time', 'template_fields': ['target_time'], '_is_empty': False, 'task_id': 'expanded_datetime', 'start_trigger_args': None, 'expand_input': {'type': 'dict-of-lists', 'value': {'__var': {'target_time': {'__var': {'task_id': 'get_wakes', 'key': 'return_value'}, '__type': 'xcomref'}}, '__type': 'dict'}}, '_is_mapped': True, 'label': 'expanded_datetime', '_operator_name': 'DateTimeSensor'}, '__type': 'operator'}, {'__var': {'template_fields_renderers': {}, 'ui_fgcolor': '#000', '_expand_input_attr': 
'expand_input', 'start_from_trigger': False, 'operator_extra_links': [], 'ui_color': '#e6f1f2', 'downstream_task_ids': [], 'partial_kwargs': {}, '_disallow_kwargs_override': False, 'task_type': 'TimeDeltaSensor', '_is_sensor': True, 'template_ext': [], '_task_module': 'airflow.providers.standard.sensors.time_delta', 'template_fields': [], '_is_empty': False, 'task_id': 'expanded_timedelta', 'start_trigger_args': None, 'expand_input': {'type': 'dict-of-lists', 'value': {'__var': {'delta': [{'__var': 30.0, '__type': 'timedelta'}, {'__var': 60.0, '__type': 'timedelta'}, {'__var': 90.0, '__type': 'timedelta'}]}, '__type': 'dict'}}, '_is_mapped': True, 'label': 'expanded_timedelta', '_operator_name': 'TimeDeltaSensor'}, '__type': 'operator'}, {'__var': {'template_fields_renderers': {}, 'ui_fgcolor': '#000', '_expand_input_attr': 'expand_input', 'start_from_trigger': False, 'operator_extra_links': [], 'ui_color': '#e6f1f2', 'downstream_task_ids': [], 'partial_kwargs': {}, '_disallow_kwargs_override': False, 'task_type': 'DateTimeSensorAsync', '_is_sensor': True, 'template_ext': [], '_task_module': 'airflow.providers.standard.sensors.date_time', 'template_fields': ['target_time'], 'start_trigger_args': {'__type': 'START_TRIGGER_ARGS', 'trigger_cls': 'airflow.providers.standard.triggers.temporal.DateTimeTrigger', 'trigger_kwargs': {'__var': {'moment': '', 'end_from_trigger': False}, '__type': 'dict'}, 'next_method': 'execute_complete', 'next_kwargs': None, 'timeout': None}, '_is_empty': False, 'task_id': 'expanded_datetime_async', 'expand_input': {'type': 'dict-of-lists', 'value': {'__var': {'target_time': {'__var': {'task_id': 'get_wakes', 'key': 'return_value'}, '__type': 'xcomref'}}, '__type': 'dict'}}, '_is_mapped': True, 'label': 'expanded_datetime_async', '_operator_name': 'DateTimeSensorAsync'}, '__type': 'operator'}, {'__var': {'template_fields_renderers': {}, 'ui_fgcolor': '#000', '_expand_input_attr': 'expand_input', 'start_from_trigger': False, 
'operator_extra_links': [], 'ui_color': '#e6f1f2', 'downstream_task_ids': [], 'partial_kwargs': {}, '_disallow_kwargs_override': False, 'task_type': 'TimeDeltaSensorAsync', '_is_sensor': True, 'template_ext': [], '_task_module': 'airflow.providers.standard.sensors.time_delta', 'template_fields': [], '_is_empty': False, 'task_id': 'expanded_timedelta_async', 'start_trigger_args': None, 'expand_input': {'type': 'dict-of-lists', 'value': {'__var': {'delta': [{'__var': 30.0, '__type': 'timedelta'}, {'__var': 60.0, '__type': 'timedelta'}, {'__var': 90.0, '__type': 'timedelta'}]}, '__type': 'dict'}}, '_is_mapped': True, 'label': 'expanded_timedelta_async', '_operator_name': 'TimeDeltaSensorAsync'}, '__type': 'operator'}, {'__var': {'is_setup': False, 'template_fields_renderers': {}, 'ui_fgcolor': '#000', 'start_from_trigger': False, 'ui_color': '#e6f1f2', 'downstream_task_ids': [], 'reschedule': False, 'is_teardown': False, 'weight_rule': 'downstream', '_needs_expansion': False, 'pool': 'default_pool', 'template_ext': [], '_is_sensor': True, 'on_failure_fail_dagrun': False, 'template_fields': [], 'task_type': 'TimeDeltaSensor', 'task_id': 'static_timedelta', '_task_module': 'airflow.providers.standard.sensors.time_delta', '_is_empty': False, 'start_trigger_args': None, 'label': 'static_timedelta', '_operator_name': 'TimeDeltaSensor'}, '__type': 'operator'}, {'__var': {'is_setup': False, 'template_fields_renderers': {}, 'ui_fgcolor': '#000', 'start_from_trigger': False, 'ui_color': '#e6f1f2', 'downstream_task_ids': [], 'reschedule': False, 'is_teardown': False, 'weight_rule': 'downstream', '_needs_expansion': False, 'pool': 'default_pool', 'template_ext': [], '_is_sensor': True, 'on_failure_fail_dagrun': False, 'template_fields': ['target_time'], 'task_type': 'DateTimeSensor', 'task_id': 'static_datetime', '_task_module': 'airflow.providers.standard.sensors.date_time', '_is_empty': False, 'start_trigger_args': None, 'target_time': '{{macros.datetime.now() + 
macros.timedelta(seconds=90)}}', 'label': 'static_datetime', '_operator_name': 'DateTimeSensor'}, '__type': 'operator'}, {'__var': {'is_setup': False, 'template_fields_renderers': {'templates_dict': 'json', 'op_args': 'py', 'op_kwargs': 'py'}, 'ui_fgcolor': '#000', 'start_from_trigger': False, 'ui_color': '#ffefeb', 'downstream_task_ids': [], 'is_teardown': False, 'weight_rule': 'downstream', '_needs_expansion': False, 'pool': 'default_pool', 'template_ext': [], 'on_failure_fail_dagrun': False, 'template_fields': ['templates_dict', 'op_args', 'op_kwargs'], 'task_type': 'PythonOperator', 'task_id': 'op_sleep_90', 'python_callable_name': 'unusual_prefix_150f7a7e99daaccc924d7defc46da76a8bc8c011_mapped_dag.<lambda>', '_task_module': 'airflow.providers.standard.operators.python', '_is_empty': False, 'start_trigger_args': None, 'op_args': '()', 'op_kwargs': {}, 'label': 'op_sleep_90', '_operator_name': 'PythonOperator'}, '__type': 'operator'}], 'dag_dependencies': [], 'params': []}}

In this serialized DAG, observe: {'__var': {'delta': [{'__var': 30.0, '__type': 'timedelta'}, {'__var': 60.0, '__type': 'timedelta'}, {'__var': 90.0, '__type': 'timedelta'}]}, '__type': 'dict'}

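When this reaches the sorting function, it arrives as the list [{'__var': 30.0, '__type': 'timedelta'}, {'__var': 60.0, '__type': 'timedelta'}, {'__var': 90.0, '__type': 'timedelta'}]. Each item's '__var' is a float rather than a dict, so the check: if all("task_id" in i.get("__var", {}) for i in serialized_dag): raises a TypeError, because i.get("__var", {}) returns 30.0 and "task_id" in 30.0 is not a valid membership test.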
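
The failing membership test can be reproduced in isolation. This is a minimal sketch that assumes only the list shape quoted above; the helper name all_have_task_id is hypothetical and is not part of Airflow.

```python
# Items shaped like the serialized timedelta values from the DAG above.
deltas = [
    {"__var": 30.0, "__type": "timedelta"},
    {"__var": 60.0, "__type": "timedelta"},
    {"__var": 90.0, "__type": "timedelta"},
]


def all_have_task_id(items):
    # Same expression as the check in the sorting function;
    # the wrapper name is hypothetical.
    return all("task_id" in i.get("__var", {}) for i in items)


try:
    all_have_task_id(deltas)
    raised = False
except TypeError:
    # i.get("__var", {}) returned the float 30.0, and a float does not
    # support the `in` operator.
    raised = True

print(raised)
```

The default {} passed to .get() does not help here: the key "__var" exists, so the float is returned and the default is never used.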

Error stack:

[2025-02-26T09:45:50.367+0000] {collection.py:210} ERROR - Failed to write serialized DAG dag_id=datetime_mapped fileloc=/files/dags/dags/mapped_dag.py
Traceback (most recent call last):
  File "/opt/airflow/airflow/dag_processing/collection.py", line 193, in _serialize_dag_capturing_errors
    dag_was_updated = SerializedDagModel.write_dag(
  File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/serialized_dag.py", line 206, in write_dag
    new_serialized_dag = cls(dag)
  File "<string>", line 4, in __init__
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/state.py", line 482, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/state.py", line 479, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/opt/airflow/airflow/models/serialized_dag.py", line 121, in __init__
    self.dag_hash = SerializedDagModel.hash(dag_data)
  File "/opt/airflow/airflow/models/serialized_dag.py", line 145, in hash
    dag_data = cls._sort_serialized_dag_dict(dag_data)
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in _sort_serialized_dag_dict
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in <dictcomp>
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in _sort_serialized_dag_dict
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in <dictcomp>
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 161, in _sort_serialized_dag_dict
    [cls._sort_serialized_dag_dict(i) for i in serialized_dag],
  File "/opt/airflow/airflow/models/serialized_dag.py", line 161, in <listcomp>
    [cls._sort_serialized_dag_dict(i) for i in serialized_dag],
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in _sort_serialized_dag_dict
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in <dictcomp>
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in _sort_serialized_dag_dict
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in <dictcomp>
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in _sort_serialized_dag_dict
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in <dictcomp>
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in _sort_serialized_dag_dict
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in <dictcomp>
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in _sort_serialized_dag_dict
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 153, in <dictcomp>
    return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())}
  File "/opt/airflow/airflow/models/serialized_dag.py", line 159, in _sort_serialized_dag_dict
    if all("task_id" in i.get("__var", {}) for i in serialized_dag):
  File "/opt/airflow/airflow/models/serialized_dag.py", line 159, in <genexpr>
    if all("task_id" in i.get("__var", {}) for i in serialized_dag):
TypeError: argument of type 'float' is not iterable

This PR intends to fix that. It has been tested with the same DAG, and parsing now succeeds.
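
One way to guard the check is to confirm that each item's "__var" is a dict before testing it for "task_id". The following is a minimal standalone sketch and not necessarily the change merged in this PR; the function name sort_serialized and its overall structure are assumptions modelled on the lines visible in the traceback.

```python
from typing import Any


def sort_serialized(obj: Any) -> Any:
    """Recursively sort a serialized structure (hypothetical stand-in
    for _sort_serialized_dag_dict, written as a plain function)."""
    if isinstance(obj, dict):
        # Sort dict entries by key and recurse into the values.
        return {k: sort_serialized(v) for k, v in sorted(obj.items())}
    if isinstance(obj, list):
        # Only treat the list as a task list when every item is a dict
        # whose "__var" is itself a dict containing "task_id".
        if obj and all(
            isinstance(i, dict)
            and isinstance(i.get("__var"), dict)
            and "task_id" in i["__var"]
            for i in obj
        ):
            return sorted(
                (sort_serialized(i) for i in obj),
                key=lambda i: i["__var"]["task_id"],
            )
        # Any other list keeps its order; only its items are processed.
        return [sort_serialized(i) for i in obj]
    # Scalars such as floats, strings, and None pass through unchanged.
    return obj


deltas = [
    {"__var": 30.0, "__type": "timedelta"},
    {"__var": 60.0, "__type": "timedelta"},
]
tasks = [
    {"__var": {"task_id": "get_wakes"}, "__type": "operator"},
    {"__var": {"task_id": "get_delays"}, "__type": "operator"},
]

print(sort_serialized(deltas))
print([t["__var"]["task_id"] for t in sort_serialized(tasks)])
```

With this guard, the timedelta list is returned in its original order without raising, while a list of operators is still sorted by task_id.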

This was never hit earlier in AF2 because #42517 wasn't released in AF2.



@ashb ashb left a comment

Can you add a unit test, please?

@amoghrajesh amoghrajesh merged commit 6faa429 into apache:main Feb 26, 2025
45 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-fix-ser branch February 26, 2025 14:47
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 28, 2025
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025

Development

Successfully merging this pull request may close these issues.

Dynamic task map DAG failing to serialized || getting import error

2 participants