Skip to content

Conversation

@seanghaeli
Copy link
Contributor

@seanghaeli seanghaeli commented Jul 8, 2025

PR for #52999:

Move UPSTREAM_FAILED task instance state from intermediate to terminal state class and add tests to validate that state class relationships remain consistent (failed + success = terminal, terminal + intermediate = all task states).

Thoughts on this change? Motivated from #51719, @eladkal's comment:

there is no protection here against possible future addition of new state to task instance. For example we are discussing #12199

I suggest to add defensive test around adding more states so we'll know to modify code here or maybe we can consider adding more classes to categorized states similar to

class TerminalTIState(str, Enum):
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
REMOVED = "removed"

Wanted to make this a separate PR for it since its scope it quite different.

Most likely this code will break some integration tests because there's various places that depend on the task instance states in interesting ways, but wanted to field feedback first before fixing them.

@seanghaeli
Copy link
Contributor Author

@o-nikolas if you don't mind, let me know what you think

@o-nikolas
Copy link
Contributor

@o-nikolas if you don't mind, let me know what you think

I like this change. All the sets that we have for specific terminal states (I think it's just success and failure states at the moment) should definitely sum up to the full set of terminal states. This makes the set math you're trying to do in your original PR much cleaner, future proof and not hardcoded to specific sets which is all great stuff.

Only concern I'd have is what you mention, the changes propagating in ways we don't expect, but I don't think this is insurmountable and I'd be interested to see hear how many tests failed because of this change.

@seanghaeli
Copy link
Contributor Author

@o-nikolas Tests passing, ready for review.

Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels strange that none of our current tests needed modification due to this PR.

Like Niko I do have concerns that this change might cause unexpected behaviour. I see that the TerminalStateNonSuccess class is referenced in:

def finish(self, id: uuid.UUID, state: TerminalStateNonSuccess, when: datetime, rendered_map_index):
"""Tell the API server that this TI has reached a terminal state."""
if state == TaskInstanceState.SUCCESS:
raise ValueError("Logic error. SUCCESS state should call the `succeed` function instead")
# TODO: handle the naming better. finish sounds wrong as "even" deferred is essentially finishing.
body = TITerminalStatePayload(
end_date=when, state=TerminalStateNonSuccess(state), rendered_map_index=rendered_map_index
)
self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json())

This means that after this change UPSTREAM_FAILED will be processed differently. Do we know the implication of this?
If I read this right it may implicate on the end date of the task which can affect metrics collection. This may be a welcomed change but we need to be very sure that the all behavioral changes are expected.

This is one of the PRs that we should get several approval just to make sure we are making the right change. I would also ask to include newsfragment for this change.

@vincbeck vincbeck requested a review from eladkal July 14, 2025 17:14
Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting temporary block to avoid accidental merge
Any maintainer may lift this block if

This is one of the PRs that we should get several approvals just to make sure we are making the right change.

If satisfied or no longer relevant.

@seanghaeli
Copy link
Contributor Author

@kaxil Since you created the IntermediateTIState and TerminalTIState classes, what are your thoughts about moving the Task Instance State UPSTREAM_FAILED from IntermediateTIState to TerminalTIState?

Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UPSTREAM_FAILED change looks good but adding of removed to success_states does not

Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good but need to address 2 comments

@kaxil kaxil merged commit 016b7cd into apache:main Jul 22, 2025
102 checks passed
seanghaeli added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jul 23, 2025
@seanghaeli
Copy link
Contributor Author

UPSTREAM_FAILED change looks good but adding of removed to success_states does not

In my opinion, REMOVED semantically makes sense as a failure state. It represents work that we expect to be completed at the time the DAG is initially called, but then is no longer available when it comes time to execute the task. Would you agree @kaxil ?

seanghaeli added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jul 23, 2025
seanghaeli added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jul 23, 2025
vincbeck pushed a commit that referenced this pull request Aug 5, 2025
* Add MwaaTaskSensor to Amazon Provider Package

* include pre-commit hooks

* - Comply with PR #51196: explicitly pass  to its superclass, and adjust the default value of  in base class to .
- Add defensive test around adding more task instance states to keep  of the MwaaTaskCompletedTrigger up to date.
- Fix issue where  of the MwaaTaskSensor derives to  instead of  type.
- Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
- Make  an optional parameter, where it defaults to the latest dag run.
- Externally fetch the task ID variable.
- Test the sensor while a DAG Run is still in progress.

* documentation update

* Response to PR #51719 comments

- Brought UPSTREAM_FAILED to a Terminal Task Instance State instead of an Intermediate State.
- Added REMOVED to the list of successful terminal task instance states.
- Iterate programmatically through successful, failure, and in progress states instead of hard-coding.

* update tests

* Remove duplicate pointer to mwaatasksensor docs

* merge #53000

* Fix integration tests

* removed unnecessary defensive tests for trigger acceptor states after #53000 merge

* removed unnecessary defensive tests for trigger acceptor states after #53000 merge

* remove State file from PR

* Remove hard coding deferrable property

* Remove unnecessary execute_complete function in sensor and instead use end_from_trigger

* Correctly use end_from_trigger attribute

* Correctly use end_from_trigger attibute for both dag run sensor and task sensor

* Remove unnecessary import
HsiuChuanHsu pushed a commit to HsiuChuanHsu/airflow that referenced this pull request Aug 5, 2025
* Add MwaaTaskSensor to Amazon Provider Package

* include pre-commit hooks

* - Comply with PR apache#51196: explicitly pass  to its superclass, and adjust the default value of  in base class to .
- Add defensive test around adding more task instance states to keep  of the MwaaTaskCompletedTrigger up to date.
- Fix issue where  of the MwaaTaskSensor derives to  instead of  type.
- Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
- Make  an optional parameter, where it defaults to the latest dag run.
- Externally fetch the task ID variable.
- Test the sensor while a DAG Run is still in progress.

* documentation update

* Response to PR apache#51719 comments

- Brought UPSTREAM_FAILED to a Terminal Task Instance State instead of an Intermediate State.
- Added REMOVED to the list of successful terminal task instance states.
- Iterate programmatically through successful, failure, and in progress states instead of hard-coding.

* update tests

* Remove duplicate pointer to mwaatasksensor docs

* merge apache#53000

* Fix integration tests

* removed unnecessary defensive tests for trigger acceptor states after apache#53000 merge

* removed unnecessary defensive tests for trigger acceptor states after apache#53000 merge

* remove State file from PR

* Remove hard coding deferrable property

* Remove unnecessary execute_complete function in sensor and instead use end_from_trigger

* Correctly use end_from_trigger attribute

* Correctly use end_from_trigger attibute for both dag run sensor and task sensor

* Remove unnecessary import
ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 7, 2025
* Add MwaaTaskSensor to Amazon Provider Package

* include pre-commit hooks

* - Comply with PR apache#51196: explicitly pass  to its superclass, and adjust the default value of  in base class to .
- Add defensive test around adding more task instance states to keep  of the MwaaTaskCompletedTrigger up to date.
- Fix issue where  of the MwaaTaskSensor derives to  instead of  type.
- Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
- Make  an optional parameter, where it defaults to the latest dag run.
- Externally fetch the task ID variable.
- Test the sensor while a DAG Run is still in progress.

* documentation update

* Response to PR apache#51719 comments

- Brought UPSTREAM_FAILED to a Terminal Task Instance State instead of an Intermediate State.
- Added REMOVED to the list of successful terminal task instance states.
- Iterate programmatically through successful, failure, and in progress states instead of hard-coding.

* update tests

* Remove duplicate pointer to mwaatasksensor docs

* merge apache#53000

* Fix integration tests

* removed unnecessary defensive tests for trigger acceptor states after apache#53000 merge

* removed unnecessary defensive tests for trigger acceptor states after apache#53000 merge

* remove State file from PR

* Remove hard coding deferrable property

* Remove unnecessary execute_complete function in sensor and instead use end_from_trigger

* Correctly use end_from_trigger attribute

* Correctly use end_from_trigger attibute for both dag run sensor and task sensor

* Remove unnecessary import
fweilun pushed a commit to fweilun/airflow that referenced this pull request Aug 11, 2025
* Add MwaaTaskSensor to Amazon Provider Package

* include pre-commit hooks

* - Comply with PR apache#51196: explicitly pass  to its superclass, and adjust the default value of  in base class to .
- Add defensive test around adding more task instance states to keep  of the MwaaTaskCompletedTrigger up to date.
- Fix issue where  of the MwaaTaskSensor derives to  instead of  type.
- Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
- Make  an optional parameter, where it defaults to the latest dag run.
- Externally fetch the task ID variable.
- Test the sensor while a DAG Run is still in progress.

* documentation update

* Response to PR apache#51719 comments

- Brought UPSTREAM_FAILED to a Terminal Task Instance State instead of an Intermediate State.
- Added REMOVED to the list of successful terminal task instance states.
- Iterate programmatically through successful, failure, and in progress states instead of hard-coding.

* update tests

* Remove duplicate pointer to mwaatasksensor docs

* merge apache#53000

* Fix integration tests

* removed unnecessary defensive tests for trigger acceptor states after apache#53000 merge

* removed unnecessary defensive tests for trigger acceptor states after apache#53000 merge

* remove State file from PR

* Remove hard coding deferrable property

* Remove unnecessary execute_complete function in sensor and instead use end_from_trigger

* Correctly use end_from_trigger attribute

* Correctly use end_from_trigger attibute for both dag run sensor and task sensor

* Remove unnecessary import
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants