-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Split Deadline Alerts into core and task-sdk components #61118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Split Deadline Alerts into core and task-sdk components #61118
Conversation
airflow-core/tests/unit/serialization/test_serialized_objects.py
Outdated
Show resolved
Hide resolved
|
@ferruzzi thanks for your review, I have handled your comments now, let me know how it looks. |
ferruzzi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing my questions. I think it looks right.
|
Thanks @ferruzzi! Just to gain some more confidence prior to merge, I tried it out functionally. Used this custom callback: async def custom_async_callback(**kwargs):
context = kwargs.get("context", {})
print(
f"Deadline exceeded for Dag {context.get('dag_run', {}).get('dag_id')}!"
)
print(f"Context: {context}")
print(f"Alert type: {kwargs.get('alert_type')}")This DAG: from datetime import timedelta
from deadline_callback import custom_async_callback
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
with DAG(
dag_id="custom_deadline_alert",
deadline=DeadlineAlert(
reference=DeadlineReference.DAGRUN_QUEUED_AT,
interval=timedelta(seconds=10),
callback=AsyncCallback(
custom_async_callback,
kwargs={"alert_type": "time_exceeded"},
),
),
):
BashOperator(task_id="example_task", bash_command="sleep 30")And it works as I expect it to:
|

Was generative AI tooling used to co-author this PR?
Used cursor IDE
Why This Change?
Trying to assist the client-server separation by moving worker side deadline functionality to
task-sdkwhile keeping the DB dependent evaluation logic inairflow-core. This follows the established pattern from the assets migration: #58993What stays where?
In simpler terms, this PR tries to address this.
SDK
DeadlineAlert: Its the user facing class for defining deadline alerts (no serialization methods from now)DeadlineReference: The same factory class for creating deadline referencesReferenceModels.*: Original reference implementations for backward compatibilityThe principle here it to keep the lightweight DAG authoring interface with no database dependencies in sdk
Core / Serialization module
SerializedDeadlineAlert: Internal representation for core usages used post deserialization of a DeadlineAlertSerializedReferenceModels.*: Reference implementations with database accessencode_deadline_alert()/decode_deadline_alert(): Centralized serialization functions used to ser/deser the deadline alertsThe principle here is to keep the serialization, deserialization, and deadline evaluation with database access in core.
Serialization Changes
Structure
Serialization format remains unchanged - no breaking changes to stored DAGs:
{ "__type": "deadline_alert", "__var": { "reference": {"reference_type": "DagRunLogicalDateDeadline"}, "interval": 3600.0, "callback": { "__classname__": "airflow.sdk.definitions.callback.AsyncCallback", "__version__": 0, "__data__": {"path": "...", "kwargs": {...}} } } }Same with the flow of control
DeadlineAlert→ dict viaencode_deadline_alert()usingairflow.sdk.serdeSerializedDeadlineAlertviadecode_deadline_alert()SerializedReferenceModelsuses database session to calculate deadlinesOne thing of note is the callback serialisation, I chose to continue using serde for this purpose because BaseSerialisation cannot handle callbacks. Using serde made sense since this part of serialisation runs in dag processor, which untilmately is not a core component and can use task sdk. So, flow:
airflow.sdk.serde.serialize()/deserialize()for proper callback handlingBackward Compatibility
DagRunLogicalDateDeadlinenotSerializedDagRunLogicalDateDeadline){pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.