Fix Databricks SQL operator serialization #31780
Conversation
The Databricks SQL operator returned Databricks Row objects, which were not serializable because they are a special extension of tuples that also act as dicts. In the case of SQLOperator, we return a different format of output: the descriptions of the rows separately from the rows of values, which are regular tuples. This PR converts the Databricks Rows into regular tuples on the fly while processing the output.
Fixes: apache#31753
Fixes: apache#31499
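For illustration, here is a minimal sketch of the conversion described above; the helper name `_make_serializable` and its exact placement are assumptions for this sketch, not necessarily the provider's verbatim code:

```python
from databricks.sql.types import Row


def _make_serializable(result):
    # Hypothetical helper: databricks.sql Row is a tuple subclass that also
    # supports dict-like field access, which default XCom serialization
    # cannot handle. Converting each Row to a plain tuple avoids that.
    if isinstance(result, list):
        return [tuple(row) if isinstance(row, Row) else row for row in result]
    if isinstance(result, Row):
        return tuple(result)
    return result
```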
hussein-awala
left a comment
Looks good!
phanikumv
left a comment
This solution works! I have tested it locally and here are the logs:
*** Found local files:
*** * /root/airflow/logs/dag_id=example_databricks_sql/run_id=manual__2023-06-08T06:24:41.576562+00:00/task_id=get_max_id/attempt=1.log
[2023-06-08, 06:24:44 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_databricks_sql.get_max_id manual__2023-06-08T06:24:41.576562+00:00 [queued]>
[2023-06-08, 06:24:44 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_databricks_sql.get_max_id manual__2023-06-08T06:24:41.576562+00:00 [queued]>
[2023-06-08, 06:24:44 UTC] {taskinstance.py:1338} INFO - Starting attempt 1 of 1
[2023-06-08, 06:24:45 UTC] {taskinstance.py:1359} INFO - Executing <Task(DatabricksSqlOperator): get_max_id> on 2023-06-08 06:24:41.576562+00:00
[2023-06-08, 06:24:45 UTC] {standard_task_runner.py:57} INFO - Started process 276 to run task
[2023-06-08, 06:24:45 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'example_databricks_sql', 'get_max_id', 'manual__2023-06-08T06:24:41.576562+00:00', '--job-id', '3', '--raw', '--subdir', 'DAGS_FOLDER/example_databricks_sql.py', '--cfg-path', '/tmp/tmpzojqhr93']
[2023-06-08, 06:24:45 UTC] {standard_task_runner.py:85} INFO - Job 3: Subtask get_max_id
[2023-06-08, 06:24:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: example_databricks_sql.get_max_id manual__2023-06-08T06:24:41.576562+00:00 [running]> on host 82860d2acc66
[2023-06-08, 06:24:45 UTC] {taskinstance.py:1637} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_databricks_sql' AIRFLOW_CTX_TASK_ID='get_max_id' AIRFLOW_CTX_EXECUTION_DATE='2023-06-08T06:24:41.576562+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-08T06:24:41.576562+00:00'
[2023-06-08, 06:24:45 UTC] {sql.py:265} INFO - Executing: SELECT cast(max(col3) as STRING) FROM test_table3
[2023-06-08, 06:24:45 UTC] {base.py:73} INFO - Using connection ID 'databricks_default' for task execution.
[2023-06-08, 06:24:45 UTC] {databricks_base.py:430} INFO - Using token auth. For security reasons, please set token in Password field instead of extra
[2023-06-08, 06:24:47 UTC] {databricks_base.py:430} INFO - Using token auth. For security reasons, please set token in Password field instead of extra
[2023-06-08, 06:24:48 UTC] {client.py:193} INFO - Successfully opened session b'\x01\xee\x05\xc5+\x06\x11I\x90$$u\nE\xa9\xfb'
[2023-06-08, 06:24:48 UTC] {sql.py:374} INFO - Running statement: SELECT cast(max(col3) as STRING) FROM test_table3, parameters: None
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ^^^^^^^^^^^^^^^^^
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - printing o.....
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ([('CAST(max(col3) AS STRING)', 'string', None, None, None, None, None)], [('102.23',)])
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ['__add__', '__class__', '__contains__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'count', 'index']
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ^^^^^^^^^^^^^^^^^
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - printing o.....
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ([('CAST(max(col3) AS STRING)', 'string', None, None, None, None, None)], [('102.23',)])
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ['__add__', '__class__', '__contains__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'count', 'index']
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ^^^^^^^^^^^^^^^^^
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - printing o.....
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ('CAST(max(col3) AS STRING)', 'string', None, None, None, None, None)
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ['__add__', '__class__', '__contains__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'count', 'index']
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ^^^^^^^^^^^^^^^^^
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - printing o.....
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ('CAST(max(col3) AS STRING)', 'string', None, None, None, None, None)
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ['__add__', '__class__', '__contains__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'count', 'index']
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ^^^^^^^^^^^^^^^^^
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - printing o.....
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ('102.23',)
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ['__add__', '__class__', '__contains__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'count', 'index']
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ^^^^^^^^^^^^^^^^^
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - printing o.....
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ('102.23',)
[2023-06-08, 06:30:39 UTC] {logging_mixin.py:152} INFO - ['__add__', '__class__', '__contains__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'count', 'index']
[2023-06-08, 06:30:39 UTC] {taskinstance.py:1377} INFO - Marking task as SUCCESS. dag_id=example_databricks_sql, task_id=get_max_id, execution_date=20230608T062441, start_date=20230608T062444, end_date=20230608T063039
[2023-06-08, 06:30:39 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-08, 06:30:39 UTC] {taskinstance.py:2752} INFO - 0 downstream tasks scheduled from follow-on schedule check
Version: [v2.7.0.dev0](https://pypi.python.org/pypi/apache-airflow/2.7.0.dev0)
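As a quick sanity check (not from the PR itself), the tuple-based output shown in the log above serializes cleanly with the standard json module:

```python
import json

# Values copied from the task log above.
descriptions = [("CAST(max(col3) AS STRING)", "string", None, None, None, None, None)]
rows = [("102.23",)]

# Plain tuples serialize as JSON arrays, so the operator's return value
# is now safe for default XCom serialization.
print(json.dumps([descriptions, rows]))
```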
from unittest.mock import patch

import pytest
from databricks.sql.types import Row
Why change this import?
Sorry, I had not seen it before merging. It's a left-over from some other trials - not a big harm; it's the same Row, just imported from databricks.sql, so it might actually make more sense to be imported from there.
Coool
This operator was already returning a serializable object since apache#31780. But PR apache#31780 was reverted in favor of serialization at the Hook level. To avoid a regression, the DatabricksSqlOperator has to return a serializable object.