Description
To increase the extensibility and integration capabilities of Datasets, I think we should:
1. Add a `register_external_dataset_change(self, dataset: Dataset, extra=None, session: Session)` method in `DatasetManager`. This would allow updating a dataset without a `task_instance`, which is necessary for updating datasets across Airflow instances:
```python
def register_external_dataset_change(
    self, *, dataset: Dataset, extra=None, session: Session, **kwargs
) -> None:
    """
    For local datasets, look them up, record the dataset event, queue dagruns,
    and broadcast the dataset event.
    """
    dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
    if not dataset_model:
        self.log.info("DatasetModel %s not found", dataset.uri)
        return
    # No task_instance here, so the source_* columns on DatasetEvent stay NULL
    session.add(
        DatasetEvent(
            dataset_id=dataset_model.id,
            extra=extra,
        )
    )
    if dataset_model.consuming_dags:
        self._queue_dagruns(dataset_model, session)
    session.flush()
```
Alternatively, `task_instance` could be an optional parameter in the default `register_dataset_change` method.
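A rough sketch of that alternative, assuming the existing nullable `source_*` columns on `DatasetEvent`; the optional-`task_instance` signature is the proposal here, not the current API:

```python
from typing import Optional

def register_dataset_change(
    self, *, task_instance: Optional[TaskInstance] = None, dataset: Dataset,
    extra=None, session: Session, **kwargs
) -> None:
    dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
    if not dataset_model:
        self.log.info("DatasetModel %s not found", dataset.uri)
        return
    session.add(
        DatasetEvent(
            dataset_id=dataset_model.id,
            extra=extra,
            # The source_* columns are nullable, so an external change can omit them
            source_task_id=task_instance.task_id if task_instance else None,
            source_dag_id=task_instance.dag_id if task_instance else None,
            source_run_id=task_instance.run_id if task_instance else None,
            source_map_index=task_instance.map_index if task_instance else None,
        )
    )
    if dataset_model.consuming_dags:
        self._queue_dagruns(dataset_model, session)
    session.flush()
```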
2. Add an `update_dataset` endpoint. This endpoint would call the `register_external_dataset_change` method and register a dataset change without a `task_instance` or `dag_id` (a sketch follows this list).
3. Formalize the concept of an "external" dataset update and possibly even add a parameter to the Dataset definition to indicate whether or not it should accept external dataset updates (see the second sketch below). This would allow the externally triggered nature of a particular Dataset to be displayed in the Datasets graph in the UI.
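For item 2, a minimal sketch of what the endpoint could look like, exposed as a plugin Blueprint; the route, payload shape, and the way the configured manager is resolved are illustrative assumptions, not a finalized API design:

```python
from flask import Blueprint, request

from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
from airflow.utils.session import create_session

external_dataset_bp = Blueprint("external_dataset_events", __name__, url_prefix="/api/v1")


@external_dataset_bp.route("/datasets/external-events", methods=["POST"])
def update_dataset():
    payload = request.get_json()
    # In practice this would resolve the DatasetManager configured in airflow.cfg
    manager = DatasetManager()
    with create_session() as session:
        manager.register_external_dataset_change(
            dataset=Dataset(uri=payload["dataset_uri"]),
            extra=payload.get("extra"),
            session=session,
        )
    return {"status": "registered"}
```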
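And for item 3, the opt-in could look like a flag on the Dataset itself; `external` is a hypothetical parameter name, purely illustrative:

```python
# Hypothetical: mark a Dataset as accepting external updates so the UI can
# surface that in the Datasets graph
reports = Dataset("s3://my-bucket/reports/latest.parquet", external=True)
```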
Use case/motivation
This year we moved to a multi-instance Airflow architecture, where we deploy multiple instances of Airflow in production. With Datasets, each instance of Airflow has its own set of datasets, but we need to manage dataset dependencies across instances.
We've taken advantage of the great extensibility of the configurable `DatasetManager` (kudos to whoever designed that) by overriding the `register_dataset_change` method to broadcast the `DatasetEvent` to each instance:
```python
import requests

class CustomDatasetManager(DatasetManager):
    def register_dataset_change(
        self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
    ) -> None:
        # Record the dataset event and trigger DAGs in the local instance
        super().register_dataset_change(
            task_instance=task_instance, dataset=dataset, extra=extra, session=session, **kwargs
        )
        # Send a request to the other instances to trigger DAGs that depend on
        # the dataset; instance_urls holds the peer instances' base URLs
        for instance_url in instance_urls:
            url = f"{instance_url}/api/v1/shopify/admin/custom_endpoint"
            # execute request, e.g.:
            requests.post(url, json={"dataset_uri": dataset.uri, "extra": extra}, timeout=10)
```
To make this work, we add a custom endpoint for registering a dataset change. Since a separate Airflow instance doesn't know about the DAG or `task_instance` outlet that triggered the dataset update, our endpoint calls the custom `register_external_dataset_change` method that we added to our custom `DatasetManager`.
This works because the DAG- and task-instance-related columns on `DatasetEvent` are nullable.
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct