Skip to content

Conversation

@jedcunningham
Copy link
Member

This consolidates the cli sync-perm command and the syncing that happens by default during webserver startup. Prior to this change, access_control wasn't supported in the default sync and the cli sync-perm command was slow when you have many DAGs.

With ~5k simple DAGs, this makes sync-perms faster (~24s -> ~10s). It does make the webserver startup slower (due to loading DagBag from the db vs just querying DagModel, but it also syncs access_control where it wasn't before.

This consolidates the cli sync-perm command and the syncing that
happens by default during webserver startup. Prior to this change,
`access_control` wasn't supported in the default sync and the cli
sync-perm command was slow when you have many DAGs.
@boring-cyborg boring-cyborg bot added area:CLI area:webserver Webserver related Issues labels Apr 8, 2021
@kaxil kaxil self-requested a review April 8, 2021 23:54
.all()
)
dagbag = DagBag(read_dags_from_db=True)
dagbag.collect_dags_from_db()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooff -- This will load all the Serialized DAGs though here and start again with an incremental DagBag in

def init_dagbag(app):
"""
Create global DagBag for webserver and API. To access it use
``flask.current_app.dag_bag``.
"""
if os.environ.get('SKIP_DAGS_PARSING') == 'True':
app.dag_bag = DagBag(os.devnull, include_examples=False)
else:
app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)

The Serialized DAG will be loaded when required by:

# If DAG is in the DagBag, check the following
# 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
# 2. check the last_updated column in SerializedDag table to see if Serialized DAG is updated
# 3. if (2) is yes, fetch the Serialized DAG.
min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
if (
dag_id in self.dags_last_fetched
and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
):
sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
dag_id=dag_id,
session=session,
)
if sd_last_updated_datetime and sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
self._add_dag_from_db(dag_id=dag_id, session=session)
return self.dags.get(dag_id)

Let's say if someone changes the DAG File with a change in access_control and the Parsing process writes the serialized_dag, it will hit the above code-block and if 10 seconds have passed (AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL -https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#min-serialized-dag-fetch-interval), it will re-fetch and update the DAG. I think we should refresh the permission for that DAG in that nested if statement.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add if dag.access_control: sync_dag_perm.... in:

def _add_dag_from_db(self, dag_id: str, session: Session):
"""Add DAG to DagBag from DB"""
from airflow.models.serialized_dag import SerializedDagModel
row = SerializedDagModel.get(dag_id, session)
if not row:
raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
row.load_op_links = self.load_op_links
dag = row.dag
for subdag in dag.subdags:
self.dags[subdag.dag_id] = subdag
self.dags[dag.dag_id] = dag
self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
self.dags_hash[dag.dag_id] = row.dag_hash

Copy link
Member

@kaxil kaxil Apr 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do that, we could just remove syncing DAG level permissions from sync-perm command

@jedcunningham
Copy link
Member Author

I'm going to merge this with pr #15311.

@jedcunningham jedcunningham deleted the cli_sync_perms branch July 29, 2021 22:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:CLI area:webserver Webserver related Issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants