
Improve log messages of heartbeat connection errors and recovery #31810

@hterik

Description

Occasionally, when running a task (with CeleryExecutor), the log shows the following error, after which the task continues executing to completion without any problem:

2023-05-31 18:52:16 E/LocalTaskJob                 LocalTaskJob heartbeat got an exception exc_info=Traceback (most recent call last):
  File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "sqlalchemy/pool/base.py", line 325, in connect
    return _ConnectionFairy._checkout(self)
  File "sqlalchemy/pool/base.py", line 888, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "sqlalchemy/pool/base.py", line 491, in checkout
    rec = pool._do_get()
  File "sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "sqlalchemy/pool/base.py", line 271, in _create_connection
    return _ConnectionRecord(self)
  File "sqlalchemy/pool/base.py", line 386, in __init__
    self.__connect()
  File "sqlalchemy/pool/base.py", line 684, in __connect
    with util.safe_reraise():
  File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/pool/base.py", line 680, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "xxxxxx" (xxxxxxxx), port 5432 failed: SSL SYSCALL error: EOF detected


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "airflow/jobs/base_job.py", line 204, in heartbeat
    session.merge(self)
  File "sqlalchemy/orm/session.py", line 3051, in merge
    return self._merge(
  File "sqlalchemy/orm/session.py", line 3131, in _merge
    merged = self.get(
  File "sqlalchemy/orm/session.py", line 2848, in get
    return self._get_impl(
  File "sqlalchemy/orm/session.py", line 2970, in _get_impl
    return db_load_fn(
  File "sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "sqlalchemy/orm/session.py", line 1713, in execute
    conn = self._connection_for_bind(bind)
  File "sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "sqlalchemy/orm/session.py", line 747, in _connection_for_bind
    conn = bind.connect()
  File "sqlalchemy/engine/base.py", line 3315, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
  File "sqlalchemy/engine/base.py", line 3394, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "sqlalchemy/engine/base.py", line 3364, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "sqlalchemy/engine/base.py", line 2198, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "sqlalchemy/pool/base.py", line 325, in connect
    return _ConnectionFairy._checkout(self)
  File "sqlalchemy/pool/base.py", line 888, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "sqlalchemy/pool/base.py", line 491, in checkout
    rec = pool._do_get()
  File "sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "sqlalchemy/pool/base.py", line 271, in _create_connection
    return _ConnectionRecord(self)
  File "sqlalchemy/pool/base.py", line 386, in __init__
    self.__connect()
  File "sqlalchemy/pool/base.py", line 684, in __connect
    with util.safe_reraise():
  File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/pool/base.py", line 680, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "xxxxxxx" (xxxxxxx), port 5432 failed: SSL SYSCALL error: EOF detected

(Background on this error at: https://sqlalche.me/e/14/e3q8) 

(The above is from Airflow 2.5.0.)

Use case/motivation

Users are "trained" to scan log files for stack traces and assume that one may be the cause of the DAG failing, when in fact it is just a transient error that was recovered on the next heartbeat. This leads to a lot of unnecessary support activity and repeated explanations that this error can often be ignored.

A few suggestions that would help:

A: Don't log the whole stack trace; instead, log this as a single error message.
B: Include a message in the log that describes the consequences of this error.
C: When the heartbeat recovers, log that it has recovered.
D: If the heartbeat recovered only after scheduler_health_check_threshold had elapsed, log an additional, more severe error message that describes the consequences.
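Suggestions A to D could be combined in one small wrapper around the heartbeat call. The sketch below is hypothetical: `HeartbeatErrorTracker`, `beat`, and the message wording are illustrative and not part of Airflow's API. `health_check_threshold` stands in for the `scheduler_health_check_threshold` setting (in seconds), and the injectable `clock` exists only to make the behavior easy to test.

```python
import logging
import time
from typing import Callable, Optional

log = logging.getLogger("LocalTaskJob")


class HeartbeatErrorTracker:
    """Hypothetical helper, not Airflow code: logs heartbeat failures concisely
    and reports when (and how late) the heartbeat recovers."""

    def __init__(self, health_check_threshold: float,
                 clock: Callable[[], float] = time.monotonic):
        # Assumed stand-in for scheduler_health_check_threshold, in seconds.
        self.health_check_threshold = health_check_threshold
        self.clock = clock
        self.first_failure_at: Optional[float] = None
        self.failure_count = 0

    def beat(self, heartbeat_fn: Callable[[], None]) -> bool:
        """Run one heartbeat; return True on success, False on failure."""
        try:
            heartbeat_fn()
        except Exception as exc:  # a real job would likely catch narrower DB errors
            self.on_failure(exc)
            return False
        self.on_success()
        return True

    def on_failure(self, exc: BaseException) -> None:
        if self.first_failure_at is None:
            self.first_failure_at = self.clock()
        self.failure_count += 1
        # A + B: a single line without a traceback, stating what it means.
        log.error(
            "Heartbeat failed (%s: %s). The task keeps running and the heartbeat "
            "will be retried; if it does not recover within %ss the job may be "
            "considered unhealthy.",
            type(exc).__name__, exc, self.health_check_threshold,
        )

    def on_success(self) -> None:
        if self.first_failure_at is None:
            return  # no outstanding failure, nothing to report
        outage = self.clock() - self.first_failure_at
        if outage > self.health_check_threshold:
            # D: recovered, but later than the threshold, so log more severely.
            log.critical(
                "Heartbeat recovered after %.1fs (%d failed attempts), exceeding "
                "the threshold of %ss; the job may have been considered unhealthy "
                "in the meantime.",
                outage, self.failure_count, self.health_check_threshold,
            )
        else:
            # C: normal recovery.
            log.info("Heartbeat recovered after %.1fs (%d failed attempts).",
                     outage, self.failure_count)
        self.first_failure_at = None
        self.failure_count = 0


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    fake_now = [0.0]
    tracker = HeartbeatErrorTracker(30, clock=lambda: fake_now[0])

    def flaky() -> None:
        raise ConnectionError("SSL SYSCALL error: EOF detected")

    tracker.beat(flaky)          # one ERROR line, no stack trace
    fake_now[0] = 5.0
    tracker.beat(lambda: None)   # INFO: recovered after 5.0s
```

Keeping the failure timestamp until the next successful heartbeat is what lets a single class decide between the ordinary recovery message (C) and the more severe one (D).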

Related issues

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct
