Skip to content

Conversation

@majorosdonat
Copy link
Contributor

@majorosdonat majorosdonat commented Jan 23, 2025

Maintenance mode is enabled for the edge worker. In maintenance mode, the worker is alive, but cannot consume any jobs.
The maintenance mode can be triggered by a button from the edge worker status page. It writes the state "maintenance request" directly to the database as worker state. Then the worker will go to maintenance pending if there are running jobs, and maintenance mode if all jobs have finished.

When exiting maintenance mode, maintenance exit is written to the database. Then the worker will switch to running state if it was in state maintenance pending, and to idle if it was in maintenance mode.

image

Why do we need the state maintenance exit?
If the user requested maintenance, so the maintenance request is in the database, and the user wants to exit maintenance immidiately e.g. for misclick, then we will not know if we should write running or idle to the database.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@majorosdonat majorosdonat requested a review from jscheffl February 3, 2025 07:43
Copy link
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hope CI is turning green now, then LGTM!

@jscheffl jscheffl merged commit f013527 into apache:main Feb 3, 2025
61 checks passed
dabla pushed a commit to dabla/airflow that referenced this pull request Feb 3, 2025
* Initial implementation

* fix import

* fix brackets

* try harder

* fix name

* change to get request

* debug print

* try-except

* new return type

* fix expose

* use str in db

* link fix

* fix variable length

* MAINTENANCE_REQUEST

* add valid return

* add valid return statement

* try different naming

* more debug print

* extend enum

* new button to enter maintenance mode

* Implement maintenance off

* try new redirect

* fix redirect url

* commit sesion

* try another endpoint

* remove debug print

* exit print

* static checks

* Initial implementation

* fix import

* fix brackets

* try harder

* fix name

* change to get request

* debug print

* try-except

* new return type

* fix expose

* use str in db

* link fix

* fix variable length

* MAINTENANCE_REQUEST

* add valid return

* add valid return statement

* try different naming

* more debug print

* extend enum

* new button to enter maintenance mode

* Implement maintenance off

* try new redirect

* fix redirect url

* commit sesion

* try another endpoint

* remove debug print

* exit print

* static checks

* add new colors for maintenance modes

* modfiy host html

* fix quote

* remove more debug prints

* fix mypy

* another mypy fix

* final mypy fix

* add pytests

* Update edge worker versions

* fix spelling

* update versions

* create pydantic class

* fix pytest

* update docs

* apply review findings

* moved logic from plugins

* immidiate hertbeat if state cahgnes

* openapi fix

* fix condition

* exclude csfr checks

* return WorkerSetStateReturn by worker_set_state

* add debug print

* try new return

* fix heartbeat state

* fix expresiion

* fix variable isue

* fix logic

* fix pytest

* fix pytest

* minor fix

* fix airflow 3 compatibility

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <donat.majoros2@hu.bosch.com>
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 4, 2025
* Initial implementation

* fix import

* fix brackets

* try harder

* fix name

* change to get request

* debug print

* try-except

* new return type

* fix expose

* use str in db

* link fix

* fix variable length

* MAINTENANCE_REQUEST

* add valid return

* add valid return statement

* try different naming

* more debug print

* extend enum

* new button to enter maintenance mode

* Implement maintenance off

* try new redirect

* fix redirect url

* commit sesion

* try another endpoint

* remove debug print

* exit print

* static checks

* Initial implementation

* fix import

* fix brackets

* try harder

* fix name

* change to get request

* debug print

* try-except

* new return type

* fix expose

* use str in db

* link fix

* fix variable length

* MAINTENANCE_REQUEST

* add valid return

* add valid return statement

* try different naming

* more debug print

* extend enum

* new button to enter maintenance mode

* Implement maintenance off

* try new redirect

* fix redirect url

* commit sesion

* try another endpoint

* remove debug print

* exit print

* static checks

* add new colors for maintenance modes

* modfiy host html

* fix quote

* remove more debug prints

* fix mypy

* another mypy fix

* final mypy fix

* add pytests

* Update edge worker versions

* fix spelling

* update versions

* create pydantic class

* fix pytest

* update docs

* apply review findings

* moved logic from plugins

* immidiate hertbeat if state cahgnes

* openapi fix

* fix condition

* exclude csfr checks

* return WorkerSetStateReturn by worker_set_state

* add debug print

* try new return

* fix heartbeat state

* fix expresiion

* fix variable isue

* fix logic

* fix pytest

* fix pytest

* minor fix

* fix airflow 3 compatibility

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <donat.majoros2@hu.bosch.com>
@eladkal
Copy link
Contributor

eladkal commented Feb 5, 2025

Then the worker will go to maintenance pending if there are running jobs, and maintenance mode if all jobs have finished.

The diagram suggest changing mode depends only on the count of jobs. Is there a way for cluster admin to force entering to mantanince mode? (Force kill all existed jobs)?

@majorosdonat
Copy link
Contributor Author

Then the worker will go to maintenance pending if there are running jobs, and maintenance mode if all jobs have finished.

The diagram suggest changing mode depends only on the count of jobs. Is there a way for cluster admin to force entering to mantanince mode? (Force kill all existed jobs)?

This is no option directly, although it is visible that which jobs are executed by the worker, so they can be killed individually.
Killing the jobs would contradict the reason why maintenance mode was created. In maintenance mode and maintenance pending the worker cannot consume jobs, so maintenance can be done on-site without the worker picking up jobs. If we didn't care about the running jobs, then it would be easier to just delete the worker, but it would cause lot of headaches for the customers.

@jscheffl
Copy link
Contributor

jscheffl commented Feb 5, 2025

Then the worker will go to maintenance pending if there are running jobs, and maintenance mode if all jobs have finished.

The diagram suggest changing mode depends only on the count of jobs. Is there a way for cluster admin to force entering to mantanince mode? (Force kill all existed jobs)?

I think a force maintenance still could be implemented. If somebody wants/needs this. The intend of the current implementation is a graceful drain of running jobs. The "pending" stzate is the transition, same like if you send a SIGINT to a Celery worker, then also the worker does not pull new jobs (stps consuming from queue) but will attempt to complete all jobs and then terminate.

Yeah, if there is a urgent demand (that is how I do during testing to be faster) I check the jobs list page and then find the task in execution and mark as failed/success. Not a single click solution but basically a manual workaround. Not often used by me, just in testing :-D

niklasr22 pushed a commit to niklasr22/airflow that referenced this pull request Feb 8, 2025
* Initial implementation

* fix import

* fix brackets

* try harder

* fix name

* change to get request

* debug print

* try-except

* new return type

* fix expose

* use str in db

* link fix

* fix variable length

* MAINTENANCE_REQUEST

* add valid return

* add valid return statement

* try different naming

* more debug print

* extend enum

* new button to enter maintenance mode

* Implement maintenance off

* try new redirect

* fix redirect url

* commit sesion

* try another endpoint

* remove debug print

* exit print

* static checks

* Initial implementation

* fix import

* fix brackets

* try harder

* fix name

* change to get request

* debug print

* try-except

* new return type

* fix expose

* use str in db

* link fix

* fix variable length

* MAINTENANCE_REQUEST

* add valid return

* add valid return statement

* try different naming

* more debug print

* extend enum

* new button to enter maintenance mode

* Implement maintenance off

* try new redirect

* fix redirect url

* commit sesion

* try another endpoint

* remove debug print

* exit print

* static checks

* add new colors for maintenance modes

* modfiy host html

* fix quote

* remove more debug prints

* fix mypy

* another mypy fix

* final mypy fix

* add pytests

* Update edge worker versions

* fix spelling

* update versions

* create pydantic class

* fix pytest

* update docs

* apply review findings

* moved logic from plugins

* immidiate hertbeat if state cahgnes

* openapi fix

* fix condition

* exclude csfr checks

* return WorkerSetStateReturn by worker_set_state

* add debug print

* try new return

* fix heartbeat state

* fix expresiion

* fix variable isue

* fix logic

* fix pytest

* fix pytest

* minor fix

* fix airflow 3 compatibility

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <donat.majoros2@hu.bosch.com>
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 17, 2025
* Initial implementation

* fix import

* fix brackets

* try harder

* fix name

* change to get request

* debug print

* try-except

* new return type

* fix expose

* use str in db

* link fix

* fix variable length

* MAINTENANCE_REQUEST

* add valid return

* add valid return statement

* try different naming

* more debug print

* extend enum

* new button to enter maintenance mode

* Implement maintenance off

* try new redirect

* fix redirect url

* commit sesion

* try another endpoint

* remove debug print

* exit print

* static checks

* Initial implementation

* fix import

* fix brackets

* try harder

* fix name

* change to get request

* debug print

* try-except

* new return type

* fix expose

* use str in db

* link fix

* fix variable length

* MAINTENANCE_REQUEST

* add valid return

* add valid return statement

* try different naming

* more debug print

* extend enum

* new button to enter maintenance mode

* Implement maintenance off

* try new redirect

* fix redirect url

* commit sesion

* try another endpoint

* remove debug print

* exit print

* static checks

* add new colors for maintenance modes

* modfiy host html

* fix quote

* remove more debug prints

* fix mypy

* another mypy fix

* final mypy fix

* add pytests

* Update edge worker versions

* fix spelling

* update versions

* create pydantic class

* fix pytest

* update docs

* apply review findings

* moved logic from plugins

* immidiate hertbeat if state cahgnes

* openapi fix

* fix condition

* exclude csfr checks

* return WorkerSetStateReturn by worker_set_state

* add debug print

* try new return

* fix heartbeat state

* fix expresiion

* fix variable isue

* fix logic

* fix pytest

* fix pytest

* minor fix

* fix airflow 3 compatibility

---------

Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <donat.majoros2@hu.bosch.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AIP-69 Edge Executor area:providers provider:edge Edge Executor / Worker (AIP-69) / edge3 type:new-feature Changelog: New Features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants