Conversation

@snjypl
Contributor

@snjypl snjypl commented Jan 26, 2023

Fixes: #28615

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Jan 26, 2023
@snjypl
Contributor Author

snjypl commented Jan 26, 2023

Hi Team @potiuk @mhenc @vincbeck

This task appears to be more complex than I expected. I went through AIP-44 and the PRs for the other tasks to get a better understanding.

In AIP-44, I found a reference to the save_dag_to_db method:

This is a new, “heaviest” method that should be exposed by the internal API to the DAG processor. We should be able to serialize all the information stored currently in the DagBag and send it to the Internal API server. The method should be roughly equivalent to: sync_to_db and pickling dags if set. This should be done in a single transaction as a single API call.
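
The "single transaction" requirement in the quoted AIP text can be illustrated with a small sketch. This is not Airflow code: it uses the standard-library sqlite3 module, and the table name toy_serialized_dag and both function names are hypothetical, chosen only to show several DAG payloads being written in one all-or-nothing step.

```python
import json
import sqlite3


def make_toy_db():
    """Create an in-memory database with a hypothetical table for serialized DAGs."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE toy_serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)"
    )
    return conn


def save_dags_in_one_transaction(conn, dags):
    """Write every DAG payload, or none of them if any payload fails."""
    # Used as a context manager, the connection commits when the block
    # succeeds and rolls back when the block raises.
    with conn:
        for dag_id, payload in dags.items():
            conn.execute(
                "INSERT OR REPLACE INTO toy_serialized_dag (dag_id, data) "
                "VALUES (?, ?)",
                (dag_id, json.dumps(payload)),
            )


def count_saved(conn):
    """Return the number of rows currently stored."""
    return conn.execute("SELECT COUNT(*) FROM toy_serialized_dag").fetchone()[0]
```

If one payload cannot be serialized, for example because it contains a set, json.dumps raises and the rows written earlier in the same call are rolled back.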

Based on my understanding so far, I have done a rough refactoring to see whether I am heading in the right direction. It would be really helpful if someone could take a look at the changes.

From the AIP, I got the impression that we would be sending the serialized DAG to the internal API. Will BaseSerialization.serialize in the internal_api_call decorator take care of the serialization?

@snjypl
Contributor Author

snjypl commented Jan 26, 2023

I tried the code below to check serialization and deserialization:

from airflow.models.dagbag import DagBag
from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG
dagbag = DagBag('/opt/airflow/dags')
BaseSerialization.deserialize(BaseSerialization.serialize(dagbag.dags))

But I got this error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 475, in deserialize
    return {k: cls.deserialize(v) for k, v in var.items()}
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 475, in <dictcomp>
    return {k: cls.deserialize(v) for k, v in var.items()}
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 471, in deserialize
    var = encoded_var[Encoding.VAR]
KeyError: <Encoding.VAR: '__var'>

@snjypl snjypl force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch from 8ec3f0d to 94a1999 Compare January 27, 2023 11:49
@mhenc
Contributor

mhenc commented Feb 1, 2023

From the AIP, I got the impression that we would be sending the serialized DAG to the internal API. Will BaseSerialization.serialize in the internal_api_call decorator take care of the serialization?

Yes, the InternalApi decorator/server serializes/deserializes the objects using BaseSerialization.

https://github.com/apache/airflow/blob/main/airflow/api_internal/internal_api_call.py#L107
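
The serialize/deserialize round trip described above can be sketched with a toy decorator. This is only an illustration under stated assumptions: it uses json in place of BaseSerialization, an in-process function in place of the Internal API server, and every name here (toy_internal_api_call, _toy_server_handle, _TOY_METHODS, count_dags) is hypothetical rather than part of Airflow.

```python
import functools
import json

# Hypothetical registry standing in for the server's table of callable methods.
_TOY_METHODS = {}


def _toy_server_handle(request_body):
    """Pretend server: decode the request, run the method, encode the result."""
    request = json.loads(request_body)
    method = _TOY_METHODS[request["method"]]
    result = method(**request["params"])
    return json.dumps(result)


def toy_internal_api_call(func):
    """Send calls to func through a serialize -> handle -> deserialize round trip."""
    # Register the undecorated function so the "server" can run it directly.
    _TOY_METHODS[func.__name__] = func

    @functools.wraps(func)
    def wrapper(**kwargs):
        # Client side: every argument must survive serialization.
        request_body = json.dumps({"method": func.__name__, "params": kwargs})
        response_body = _toy_server_handle(request_body)
        # Client side: the result comes back as serialized data, too.
        return json.loads(response_body)

    return wrapper


@toy_internal_api_call
def count_dags(dag_ids):
    return {"synced": len(dag_ids)}


result = count_dags(dag_ids=["example_a", "example_b"])
```

The sketch shows why argument types matter: anything passed to a decorated function has to round-trip through the serializer, which is where a dict of DAG objects becomes a problem if the serializer cannot restore it.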

I see that you have problems serializing dags, which is a dict[str, DAG]. We may need to extend BaseSerialization to support it. Let me take a look.

@mhenc
Contributor

mhenc commented Feb 1, 2023

Regarding serialization/deserialization of dags (dict[str, DAG]):
I found that there is already code for it:
https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L415
and
https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L476

But for some reason the serialized object doesn't contain "__type": "dag", so deserialization doesn't work as expected.
This can be easily fixed by changing the line

return SerializedDAG.serialize_dag(var)

https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L415
to

return cls._encode(SerializedDAG.serialize_dag(var), type_=DAT.DAG)

At least, it worked for me.

But I am not sure whether it will break something.

@potiuk @ashb do you have any idea why it is like that? Is this code used anywhere?
Note that there are similar cases (such as return SerializedBaseOperator.serialize_mapped_operator(var) a few lines below).
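
The missing type envelope described in this comment can be reproduced with a toy model. This is a simplified sketch, not the code in serialized_objects.py: ToyDag, encode, serialize, deserialize and the wrap_dag flag are all hypothetical, and only the "__var"/"__type" key names are taken from the traceback and discussion above.

```python
VAR = "__var"
TYPE = "__type"


class ToyDag:
    """Hypothetical stand-in for a DAG object."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def encode(value, type_):
    """Wrap a serialized value in an envelope that records its type."""
    return {VAR: value, TYPE: type_}


def serialize(var, wrap_dag=True):
    """Serialize dicts and ToyDag objects; wrap_dag=False mimics the reported bug."""
    if isinstance(var, dict):
        inner = {key: serialize(value, wrap_dag) for key, value in var.items()}
        return encode(inner, "dict")
    if isinstance(var, ToyDag):
        payload = {"dag_id": var.dag_id}
        # Without the envelope, the deserializer cannot tell what this dict is.
        return encode(payload, "dag") if wrap_dag else payload
    return var  # primitives pass through unchanged


def deserialize(encoded):
    """Rebuild objects from envelopes; raises KeyError if an envelope is missing."""
    if not isinstance(encoded, dict):
        return encoded
    value = encoded[VAR]
    type_ = encoded[TYPE]
    if type_ == "dict":
        return {key: deserialize(item) for key, item in value.items()}
    if type_ == "dag":
        return ToyDag(value["dag_id"])
    raise TypeError(f"unknown type tag: {type_!r}")


dags = {"example": ToyDag("example")}
restored = deserialize(serialize(dags))
```

Calling deserialize(serialize(dags, wrap_dag=False)) in this toy model raises a KeyError on the "__var" key, the same kind of failure as in the traceback above, because the inner DAG payload has no envelope.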

@snjypl snjypl force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch 2 times, most recently from 8ef6e37 to 2804486 Compare February 2, 2023 13:41
@snjypl snjypl force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch 2 times, most recently from 8d64734 to 126eac3 Compare February 2, 2023 20:16
@snjypl snjypl force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch from 126eac3 to cb467e6 Compare February 14, 2023 18:08
@snjypl snjypl marked this pull request as ready for review February 14, 2023 20:43
@snjypl snjypl force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch from cb467e6 to 66fa891 Compare February 14, 2023 20:43
@potiuk
Member

potiuk commented Feb 15, 2023

Sorry, I have not looked at this before. I want to take a closer look after we discuss the approach I proposed in #29513 (comment). I have a feeling that once we agree it is a good idea and apply it here, we might be able to implement a simple refactor that lets us avoid any complexity (but it's too late for me to think clearly right now).

@snjypl snjypl force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch from 66fa891 to 1ae8161 Compare February 27, 2023 07:03
@potiuk potiuk force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch from 1ae8161 to 4871031 Compare February 27, 2023 20:39
@snjypl snjypl force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch 4 times, most recently from 700c17f to ef555ca Compare March 2, 2023 18:51
@snjypl snjypl force-pushed the 28615-aip-44-mirgate-dagbag.sync_to_db branch from ef555ca to f48fdf4 Compare March 9, 2023 15:13
Member

@potiuk potiuk left a comment

LGTM

@potiuk potiuk merged commit 5c15b23 into apache:main Mar 12, 2023
@snjypl snjypl deleted the 28615-aip-44-mirgate-dagbag.sync_to_db branch March 15, 2023 14:19
@pierrejeambrun pierrejeambrun added the AIP-44 Airflow Internal API label Mar 22, 2023
@pierrejeambrun pierrejeambrun added this to the Airflow 2.6.0 milestone Mar 22, 2023
@pierrejeambrun pierrejeambrun added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Mar 22, 2023

Development

Successfully merging this pull request may close these issues.

AIP-44 Migrate Dagbag.sync_to_db to internal API.

5 participants