XCom - AttributeError when serializing output of MERGE INTO Databricks SQL command #31499

@aru-trackunit

Description


Apache Airflow version

2.6.1

What happened

After upgrading from Airflow 2.5.3 to 2.6.1, the DAG started to fail; the failure is related to XCom serialization.

I noticed that something has changed with regard to XCom serialization:

| key | Value in 2.5.3 | Value / result in 2.6.1 |
| --- | --- | --- |
| return_value | `[[['Result', 'string', None, None, None, None, None]], []]` | `(['(num_affected_rows,bigint,None,None,None,None,None)', '(num_inserted_rows,bigint,None,None,None,None,None)'],[])` |
| return_value | `[[['Result', 'string', None, None, None, None, None]], []]` | `(['(Result,string,None,None,None,None,None)'],[])` |
| return_value | `[[['num_affected_rows', 'bigint', None, None, None, None, None], ['num_updated_rows', 'bigint', None, None, None, None, None], ['num_deleted_rows', 'bigint', None, None, None, None, None], ['num_inserted_rows', 'bigint', None, None, None, None, None]], [[1442, 605, 0, 837]]]` | `AttributeError: __name__. Did you mean: '__ne__'?` |

Query syntax that produced the error: [MERGE INTO](https://docs.databricks.com/sql/language-manual/delta-merge-into.html)

Stacktrace included below:

```
[2023-05-24, 01:12:43 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2354, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 237, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 632, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 144, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 132, in serialize
    qn = qualname(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 47, in qualname
    return f"{o.__module__}.{o.__name__}"
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/types.py", line 161, in __getattr__
    raise AttributeError(item)
AttributeError: __name__. Did you mean: '__ne__'?
```
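Reading the trace, the failure appears to come from the interaction between Airflow's `qualname` helper and the custom `__getattr__` on `databricks.sql.types.Row`: the helper evaluates `o.__name__` on a Row *instance*, normal attribute lookup fails, and Row's `__getattr__` turns that into the raised `AttributeError`. A minimal self-contained sketch of that mechanism (the `Row` class below is only a stand-in mimicking the connector's `__getattr__` behavior):

```python
class Row(tuple):
    """Stand-in for databricks.sql.types.Row: a tuple subclass whose
    __getattr__ raises AttributeError for anything that is not a column."""

    def __getattr__(self, item):
        # The real connector looks `item` up among the row's field names
        # and raises AttributeError(item) when it is not one of them.
        raise AttributeError(item)


def qualname(o) -> str:
    # Mirrors airflow.utils.module_loading.qualname.
    return f"{o.__module__}.{o.__name__}"


row = Row(("num_affected_rows", "bigint", None, None, None, None, None))
try:
    # __module__ resolves via the class, but __name__ is not found on the
    # instance, so lookup falls through to Row.__getattr__ and raises.
    qualname(row)
except AttributeError as exc:
    print(f"AttributeError: {exc}")  # AttributeError: __name__
```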

What you think should happen instead

Serialization should finish without raising an exception.
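Until this is fixed, a possible workaround (an untested sketch; `do_xcom_push` is the standard `BaseOperator` flag) is to disable the XCom push when the MERGE result is not needed downstream:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

task = DatabricksSqlOperator(
    task_id="task",
    databricks_conn_id="databricks_conn_id",
    sql_endpoint_name="name",
    sql="file.sql",
    do_xcom_push=False,  # avoid pushing (and serializing) the Row results
)
```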

How to reproduce

  1. DAG file with the operator declared:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

task = DatabricksSqlOperator(
    task_id="task",
    databricks_conn_id="databricks_conn_id",
    sql_endpoint_name="name",
    sql="file.sql",
)
```

  2. `file.sql` with a schematic MERGE statement (see https://docs.databricks.com/sql/language-manual/delta-merge-into.html):

```sql
MERGE INTO table_name AS t
USING source_table AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```
Query output is a table:

| num_affected_rows | num_updated_rows | num_deleted_rows | num_inserted_rows |
| --- | --- | --- | --- |
| 0 | 0 | 0 | 0 |

EDIT: It also happens for a SELECT command.
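For completeness, a simple way to observe the value the operator pushes is to pull it back in a downstream task (a sketch; the `@task` decorator usage and task id are illustrative):

```python
from airflow.decorators import task

@task
def inspect_xcom(ti=None):
    # Pull the return_value pushed by the DatabricksSqlOperator task above.
    value = ti.xcom_pull(task_ids="task")
    print(type(value), value)
```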

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-cncf-kubernetes==6.1.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-databricks==4.1.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-hashicorp==3.3.1
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-mysql==5.0.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sftp==4.2.4
apache-airflow-providers-sqlite==3.3.2
apache-airflow-providers-ssh==3.6.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
