8 changes: 4 additions & 4 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -338,7 +338,7 @@ In this example, the DAG ``waiting_for_dataset_1_and_2`` will be triggered when
...
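
For context, a minimal sketch of what such a DAG could look like (the dataset URIs, ``start_date``, and other arguments are illustrative assumptions, not taken from this diff; a list of datasets in ``schedule`` means the run fires only once all of them have received events):

import datetime

from airflow.datasets import Dataset
from airflow.models.dag import DAG

# Illustrative URIs -- not spelled out in the diff.
dataset_1 = Dataset("s3://bucket/dataset-1")
dataset_2 = Dataset("s3://bucket/dataset-2")

with DAG(
    dag_id="waiting_for_dataset_1_and_2",
    # A list means AND semantics: run only after events for *both* datasets.
    schedule=[dataset_1, dataset_2],
    start_date=datetime.datetime(2024, 1, 1),
    catchup=False,
):
    ...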


-``quededEvent`` API endpoints are introduced to manipulate such records.
+``queuedEvent`` API endpoints are introduced to manipulate such records.

* Get a queued Dataset event for a DAG: ``/dags/{dag_id}/datasets/queuedEvent/{uri}``
* Get queued Dataset events for a DAG: ``/dags/{dag_id}/datasets/queuedEvent``
@@ -347,7 +347,7 @@ In this example, the DAG ``waiting_for_dataset_1_and_2`` will be triggered when
* Get queued Dataset events for a Dataset: ``/datasets/queuedEvent/{uri}``
* Delete queued Dataset events for a Dataset: ``DELETE /datasets/queuedEvent/{uri}``

-For how to use REST API and the parameters needed for these endpoints, please refer to :doc:`Airflow API </stable-rest-api-ref>`
+For how to use REST API and the parameters needed for these endpoints, please refer to :doc:`Airflow API </stable-rest-api-ref>`.
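
As an illustrative sketch only, the per-DAG listing endpoint above could be called like this (the host, credentials, ``dag_id``, and the ``queued_events`` response field are assumptions about a typical deployment, not spelled out in this diff):

import requests

# Hypothetical host, credentials, and dag_id -- adjust for your deployment.
resp = requests.get(
    "http://localhost:8080/api/v1/dags/waiting_for_dataset_1_and_2/datasets/queuedEvent",
    auth=("admin", "admin"),  # assumes basic-auth is enabled on the webserver
)
resp.raise_for_status()
# Assumed response shape: a JSON object with a "queued_events" list.
for event in resp.json().get("queued_events", []):
    print(event["uri"], event["created_at"])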

Advanced dataset scheduling with conditional expressions
--------------------------------------------------------
@@ -444,7 +444,7 @@ The following example creates a dataset event against the S3 URI ``f"s3://bucket

@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_metadata():
-    s3_dataset = Dataset("s3://bucket/my-task}")
+    s3_dataset = Dataset("s3://bucket/my-task")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")

Only one dataset event is emitted for an added dataset, even if it is added to the alias multiple times, or added to multiple aliases. However, if different ``extra`` values are passed, it can emit multiple dataset events. In the following example, two dataset events will be emitted.
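
A sketch consistent with that description, with illustrative alias names and ``extra`` values, could look like this (the same dataset is yielded twice with different ``extra`` values, so two events are emitted):

@task(outlets=[DatasetAlias("my-task-outputs-1"), DatasetAlias("my-task-outputs-2")])
def my_task_with_metadata():
    s3_dataset = Dataset("s3://bucket/my-task")
    # Different extras -> two distinct dataset events, one per yield.
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs-1")
    yield Metadata(s3_dataset, extra={"k2": "v2"}, alias="my-task-outputs-2")
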
@@ -470,7 +470,7 @@ Only one dataset event is emitted for an added dataset, even if it is added to t

Scheduling based on dataset aliases
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Since dataset events added to an alias are just simple dataset events, a downstream depending on the actual dataset can read dataset events of it normally, without considering the associated aliases. A downstream can also depend on a dataset alias. The authoring syntax is referencing the ``DatasetAlias`` by name, and the associated dataset events are picked up for scheduling. Note that a DAG can be triggered by a task with ``outlets=DatasetAlias("xxx")`` if and only if the alias is resolved into ``Dataset("s3://bucket/my-task")``. The DAG runs whenever a task with outlet ``DatasetAlias("out")`` gets associated with at least one dataset at runtime, regardless of the dataset's identity. The downstream DAG is not triggered if no datasets are associated to the alias for a particular given task run. This also means we can do conditional dataset-triggering.
+Since dataset events added to an alias are just simple dataset events, a downstream DAG depending on the actual dataset can read dataset events of it normally, without considering the associated aliases. A downstream DAG can also depend on a dataset alias. The authoring syntax is referencing the ``DatasetAlias`` by name, and the associated dataset events are picked up for scheduling. Note that a DAG can be triggered by a task with ``outlets=DatasetAlias("xxx")`` if and only if the alias is resolved into ``Dataset("s3://bucket/my-task")``. The DAG runs whenever a task with outlet ``DatasetAlias("out")`` gets associated with at least one dataset at runtime, regardless of the dataset's identity. The downstream DAG is not triggered if no datasets are associated to the alias for a particular given task run. This also means we can do conditional dataset-triggering.
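
A minimal sketch of that authoring syntax, assuming an upstream task with outlet ``DatasetAlias("out")`` (the DAG id and dates below are illustrative assumptions):

import datetime

from airflow.datasets import DatasetAlias
from airflow.models.dag import DAG

with DAG(
    dag_id="dataset_alias_consumer",
    # Runs whenever a dataset event is attached to the "out" alias upstream,
    # regardless of which dataset the alias resolves to at runtime.
    schedule=DatasetAlias("out"),
    start_date=datetime.datetime(2024, 1, 1),
    catchup=False,
):
    ...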

The dataset alias is resolved to the datasets during DAG parsing. Thus, if the "min_file_process_interval" configuration is set to a high value, there is a possibility that the dataset alias may not be resolved. To resolve this issue, you can trigger DAG parsing.
