Skip to content

Conversation

@potiuk
Copy link
Member

@potiuk potiuk commented Aug 6, 2023

Similarly to #33145 - this is an attempt to stabilise flaky tests for the test_xcom_arg_map.

Even if the mechanism is not entirely clear (provide_session should also close the connection) seems like using pytest-fixture provided session works better than relying on a new session created in run() methods.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Similarly to apache#33145 - this is an attempt to stabilise flaky tests
for the test_xcom_arg_map.

Even if the mechanism is not entirely clear (provide_session should
also close the connection) seems like using pytest-fixture provided
session works better than relying on a new session created in run()
methods.
@potiuk
Copy link
Member Author

potiuk commented Aug 6, 2023

Example failure: https://github.com/apache/airflow/actions/runs/5775967682/job/15654656922?pr=32991

________________________ test_xcom_map_error_fails_task ________________________

self = <sqlalchemy.future.engine.Connection object at 0x7f5a70b28c70>
dialect = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7f5ad1451db0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext'>>
statement = <sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 0x7f5ad06c7970>
parameters = []
execution_options = immutabledict({'_sa_orm_load_options': default_load_options(_legacy_uniquing=True), '_result_disable_adapt_to_context': True, 'future_result': True})
args = (<sqlalchemy.dialects.sqlite.base.SQLiteCompiler object at 0x7f5ad06c7970>, [], <sqlalchemy.sql.selectable.Select obje...0720 run_id)s', 'test', type_=String(length=250)), BindParameter('%(140026414543824 map_index)s', 0, type_=Integer())])
kw = {'cache_hit': symbol('CACHE_HIT')}
branched = <sqlalchemy.future.engine.Connection object at 0x7f5a70b28c70>
yp = None
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>

    def _execute_context(
        self,
        dialect,
        constructor,
        statement,
        parameters,
        execution_options,
        *args,
        **kw
    ):
        """Create an :class:`.ExecutionContext` and execute, returning
        a :class:`_engine.CursorResult`."""
    
        branched = self
        if self.__branch_from:
            # if this is a "branched" connection, do everything in terms
            # of the "root" connection, *except* for .close(), which is
            # the only feature that branching provides
            self = self.__branch_from
    
        if execution_options:
            yp = execution_options.get("yield_per", None)
            if yp:
                execution_options = execution_options.union(
                    {"stream_results": True, "max_row_buffer": yp}
                )
    
        try:
            conn = self._dbapi_connection
            if conn is None:
                conn = self._revalidate_connection()
    
>           context = constructor(
                dialect, self, conn, execution_options, *args, **kw
            )

/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1810: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1020: in _init_compiled
    self.cursor = self.create_cursor()
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1391: in create_cursor
    return self.create_default_cursor()
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1394: in create_default_cursor
    return self._dbapi_connection.cursor()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>
args = (), kwargs = {}

    def cursor(self, *args, **kwargs):
        """Return a new DBAPI cursor for the underlying connection.
    
        This method is a proxy for the ``connection.cursor()`` DBAPI
        method.
    
        """
>       return self.dbapi_connection.cursor(*args, **kwargs)
E       sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140026872592128 and this is thread id 140028343548800.

/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py:1133: ProgrammingError

The above exception was the direct cause of the following exception:

    @contextlib.contextmanager
    def create_session() -> Generator[settings.SASession, None, None]:
        """Contextmanager that will create and teardown a session."""
        Session = getattr(settings, "Session", None)
        if Session is None:
            raise RuntimeError("Session must be set before!")
        session = Session()
        try:
>           yield session

airflow/utils/session.py:36: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
airflow/utils/session.py:77: in wrapper
    return func(*args, session=session, **kwargs)
airflow/models/taskinstance.py:1840: in run
    self._run_raw_task(
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:1488: in _run_raw_task
    self.refresh_from_db(session=session)
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/models/taskinstance.py:866: in refresh_from_db
    ti = qry.one_or_none()
/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2850: in one_or_none
    return self._iter().one_or_none()
/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2916: in _iter
    result = self.session.execute(
/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py:1717: in execute
    result = conn._execute_20(statement, params or {}, execution_options)
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1816: in _execute_context
    self._handle_dbapi_exception(
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1810: in _execute_context
    context = constructor(
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1020: in _init_compiled
    self.cursor = self.create_cursor()
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1391: in create_cursor
    return self.create_default_cursor()
/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:1394: in create_default_cursor
    return self._dbapi_connection.cursor()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>
args = (), kwargs = {}

    def cursor(self, *args, **kwargs):
        """Return a new DBAPI cursor for the underlying connection.
    
        This method is a proxy for the ``connection.cursor()`` DBAPI
        method.
    
        """
>       return self.dbapi_connection.cursor(*args, **kwargs)
E       sqlalchemy.exc.ProgrammingError: (sqlite3.ProgrammingError) SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140026872592128 and this is thread id 140028343548800.
E       [SQL: SELECT task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.custom_operator_name AS task_instance_custom_operator_name, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.updated_at AS task_instance_updated_at, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs 
E       FROM task_instance 
E       WHERE task_instance.dag_id = ? AND task_instance.task_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
E       (Background on this error at: https://sqlalche.me/e/14/f405)

/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py:1133: ProgrammingError

During handling of the above exception, another exception occurred:

self = <sqlalchemy.future.engine.Connection object at 0x7f5a70b28c70>

    def _rollback_impl(self):
        assert not self.__branch_from
    
        if self._has_events or self.engine._has_events:
            self.dispatch.rollback(self)
    
        if self._still_open_and_dbapi_connection_is_valid:
            if self._echo:
                if self._is_autocommit_isolation():
                    self._log_info(
                        "ROLLBACK using DBAPI connection.rollback(), "
                        "DBAPI should ignore due to autocommit mode"
                    )
                else:
                    self._log_info("ROLLBACK")
            try:
>               self.engine.dialect.do_rollback(self.connection)

/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1062: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x7f5ad1451db0>
dbapi_connection = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f5a70b2add0>

    def do_rollback(self, dbapi_connection):
>       dbapi_connection.rollback()
E       sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140026872592128 and this is thread id 140028343548800.

/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:683: ProgrammingError

@potiuk
Copy link
Member Author

potiuk commented Aug 6, 2023

It's a bit strange what happens though.... The @provide_session in run_method does actually call close_session in context manager, so this is quite strange... But let's see after merge if we can get it more stable.

@potiuk potiuk added this to the Airflow 2.7.0 milestone Aug 6, 2023
@potiuk potiuk merged commit 3dd0c99 into apache:main Aug 6, 2023
@potiuk potiuk deleted the unflake-test-xcom-args-map branch August 6, 2023 11:24
potiuk added a commit to potiuk/airflow that referenced this pull request Aug 6, 2023
Using same session in different steps of the same test has the
potential of not flishing/committing the changes between.

Seems that the apache#33150 traded one flakiness with another. Attempting to
make sure that the flash/commit is executed before the second run.

Error:

The test RuntimeError: number of values in row (0) differ from number of
column processors (29)

The error is strange however and indicates a bug in sqlite library.
potiuk added a commit that referenced this pull request Aug 6, 2023
)

Using same session in different steps of the same test has the
potential of not flishing/committing the changes between.

Seems that the #33150 traded one flakiness with another. Attempting to
make sure that the flash/commit is executed before the second run.

Error:

The test RuntimeError: number of values in row (0) differ from number of
column processors (29)

The error is strange however and indicates a bug in sqlite library.
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Aug 8, 2023
ephraimbuddy pushed a commit that referenced this pull request Aug 8, 2023
Similarly to #33145 - this is an attempt to stabilise flaky tests
for the test_xcom_arg_map.

Even if the mechanism is not entirely clear (provide_session should
also close the connection) seems like using pytest-fixture provided
session works better than relying on a new session created in run()
methods.

(cherry picked from commit 3dd0c99)
ephraimbuddy pushed a commit that referenced this pull request Aug 8, 2023
)

Using same session in different steps of the same test has the
potential of not flishing/committing the changes between.

Seems that the #33150 traded one flakiness with another. Attempting to
make sure that the flash/commit is executed before the second run.

Error:

The test RuntimeError: number of values in row (0) differ from number of
column processors (29)

The error is strange however and indicates a bug in sqlite library.

(cherry picked from commit 6b21b79)
ephraimbuddy pushed a commit that referenced this pull request Aug 8, 2023
)

Using same session in different steps of the same test has the
potential of not flishing/committing the changes between.

Seems that the #33150 traded one flakiness with another. Attempting to
make sure that the flash/commit is executed before the second run.

Error:

The test RuntimeError: number of values in row (0) differ from number of
column processors (29)

The error is strange however and indicates a bug in sqlite library.

(cherry picked from commit 6b21b79)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants