Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion airflow-core/src/airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,19 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses
from airflow.utils.state import TaskInstanceState

# Get the next kwargs of the task instance, or an empty dictionary if it doesn't exist
next_kwargs = task_instance.next_kwargs or {}
raw_next_kwargs = task_instance.next_kwargs

if raw_next_kwargs is None:
next_kwargs: dict[str, Any] = {}
elif isinstance(raw_next_kwargs, str):
from airflow.serialization.serialized_objects import BaseSerialization

# BaseSerialization.deserialize will normalize it back to a dict.
next_kwargs = BaseSerialization.deserialize(raw_next_kwargs)
else:
# mypy now knows this is a dict[Any, Any]
next_kwargs = raw_next_kwargs

# Add the event's payload into the kwargs for the task
next_kwargs["event"] = event.payload

Expand Down
Loading