Flaky test_xcom_map_error_fails_task test #33178

@potiuk

This flaky test has recently started appearing in our jobs, and it looks like a real problem in our code: after a few attempts at fixing it, it still shows up in our builds:

tests/models/test_xcom_arg_map.py:174: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:1840: in run
    self._run_raw_task(
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:1494: in _run_raw_task
    session.commit()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit
    self._transaction.commit(_to_root=self.future)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:832: in commit
    self._prepare_impl()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:811: in _prepare_impl
    self.session.flush()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3449: in flush
    self._flush(objects)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3589: in _flush
    transaction.rollback(_capture_exception=True)
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
    compat.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3549: in _flush
    flush_context.execute()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:456: in execute
    rec.execute(self)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:630: in execute
    util.preloaded.orm_persistence.save_obj(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:237: in save_obj
    _emit_update_statements(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1001: in _emit_update_statements
    c = connection._execute_20(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1953: in _execute_context
    self._handle_dbapi_exception(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in _execute_context
    self.dialect.do_execute(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: in do_execute
    cursor.execute(statement, parameters)
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:174: in execute
    self._discard()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <MySQLdb.cursors.Cursor object at 0x7f52bc978a60>

    def _discard(self):
        self.description = None
        self.description_flags = None
        # Django uses some member after __exit__.
        # So we keep rowcount and lastrowid here. They are cleared in Cursor._query().
        # self.rowcount = 0
        # self.lastrowid = None
        self._rows = None
        self.rownumber = None
    
        if self._result:
            self._result.discard()
            self._result = None
    
        con = self.connection
        if con is None:
            return
>       while con.next_result() == 0:  # -1 means no more data.
E       sqlalchemy.exc.ProgrammingError: (MySQLdb.ProgrammingError) (2014, "Commands out of sync; you can't run this command now")
E       [SQL: UPDATE task_instance SET pid=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
E       [parameters: (90, datetime.datetime(2023, 8, 7, 14, 44, 7, 580365), 'test_dag', 'pull', 'test', 0)]
E       (Background on this error at: https://sqlalche.me/e/14/f405)
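
For context on error 2014: the MySQL client reports "Commands out of sync" when calls are made in the wrong order on a single connection, for example when a new statement is sent before the previous result has been fully read. The toy model below only illustrates that ordering rule. `ToyConnection` and `CommandsOutOfSync` are hypothetical names invented for this sketch, not MySQLdb or SQLAlchemy APIs, and the sketch is not a diagnosis of why the test fails.

```python
class CommandsOutOfSync(Exception):
    """Stands in for MySQL client error 2014 in this toy model (hypothetical)."""


class ToyConnection:
    """Hypothetical connection that allows only one unread result at a time."""

    def __init__(self):
        # Unread result of the last command, or None when the connection is in sync.
        self._pending_rows = None

    def query(self, sql, rows=()):
        # A new command is rejected while a previous result is still unread.
        if self._pending_rows is not None:
            raise CommandsOutOfSync(
                2014, "Commands out of sync; you can't run this command now"
            )
        self._pending_rows = list(rows)

    def fetch_all(self):
        # Reading the result puts the connection back in sync.
        rows, self._pending_rows = self._pending_rows, None
        return rows or []


conn = ToyConnection()
conn.query("SELECT 1", rows=[(1,)])
try:
    # Second command issued before the first result was consumed.
    conn.query("UPDATE task_instance SET pid = 90")
except CommandsOutOfSync as exc:
    print("out of sync:", exc.args[0])

conn.fetch_all()  # drain the pending result
conn.query("UPDATE task_instance SET pid = 90")  # accepted now
```

In this model the error depends only on the order of calls on one connection, which is one reason such failures can be intermittent when something else touches the same connection between two statements.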

Example failures:

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.

Labels: kind:meta (High-level information important to the community)