-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Start building the replacement task runner for Task Execution SDK #43893
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
jscheffl
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow. This is a large one and probably I need a second pass to understand more. Defintely this is not an easy read.
Is there a way to have a example task execution being "working" with the classes here? (Assume a Celery or similar integration would be a follow-up PR?)
|
do you plan to make cause I have multiple def clear_dag_runs(dag_id, status_to_clear):
context = get_current_context()
session = settings.Session()
query = session.query(DagRun).filter(
DagRun.state == status_to_clear, DagRun.dag_id == dag_id)
rst = query.all()
dag_bag = DagBag(dag_folder=path.join(SRC_FOLDER, 'dags'), include_examples=False)
dag: DAG = dag_bag.get_dag(dag_id, session)
for dag_run in rst:
dag.clear(
start_date=dag_run.logical_date,
end_date=dag_run.logical_date,
task_ids=None,
include_subdags=True,
include_parentdag=True,
only_failed=False,
)
session.close() |
@raphaelauv No plans today. This is not something in scope for AIP-72 (nor is it possible today) |
c4ffd32 to
96ce156
Compare
kaxil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm. Worth splitting certain methods/func into more granular funcs and adding more docstrings wherever you can.
For sure, I will do that before merging and ping you tomrorow for another look |
kaxil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nits, more tests and we are good to go
e1cbfd4 to
f48edfd
Compare
d2af0a5 to
50a0d12
Compare
The eventual goal if this "airflow.sdk.exeuction_time" package is to replace LocalTaskJob and StandardTaskRunner, but at this stage it co-exists with it's replacement. As this PR is not a complete re-implementation of all the features that exist currently (no handling of task level callbacks yet, no AirflowSkipException etc.) the current tests are skeleton at best. Once we get closer to feature parity (in future PRs) the tests will grow to match. This supervisor and task runner operates slightly differently to the current classes in these ways **Logs from the subprocess are send over a different channel to stdout/stderr** This makes the task supervisor a little bit more complex as it now has to read stdout, stderr and a logs channel. The advantage of this approach is that it makes the logs setup in the task process itself markedly simpler -- all it has to do is write logs output to the custom file handle as JSON and it will show up "natively" as logs. structlog has been chosen as the logging engine over stdlib's own logging as the ability to have structured fields in the logs is nice, and stdlib is configured to send logs to a stuctlog processor. **Direct database access is replaced with an HTTP API client** This is the crux of this feature and of AIP-72 in general -- tasks run via this runner can no longer access DB models or DB session directly. This PR doesn't yet implement the code/shims to make `Connection.get_connection_from_secrets` use this client yet - that will be future work. The reason tasks don't speak directly to the API server is primarily for two reasons: 1. The supervisor process already needs to maintain an http session in order to report the task as started, to heart beat it, and to mark it as finished; and so because of that 2. Reduce the number of active HTTP connections for tasks to 1 per task (instead of 2 per task). THe other reason we have this interface is that DAG parsing code will very soon need to be updated to not have direct DB access either, and having this "in process" interface ability already means that we can support commands like `airflow dags reserialize` without having a running API server. The API client itself is not auto-generated: I tried a number of different client generates based on the OpenAPI spec and found them all lacking or buggy in different ways, and the http client side itself is very simple, the only interesting/difficult bit is the generation of the datamodels from the OpenAPI spec which I found one that msgspec was chosen over Pydantic as it is much lighter weight (and thus quicker), especially on a client side when we have next to no validation requirements of response data. I admit that I have not benchmarked it specifically though.
6118a7f to
952d078
Compare
The eventual goal of this "airflow.sdk.exeuction_time" package is to replace
LocalTaskJob and StandardTaskRunner, but at this stage it co-exists with it's
replacement.
As this PR is not a complete re-implementation of all the features that exist
currently (no handling of task level callbacks yet, no AirflowSkipException
etc.) the current tests are skeleton at best. Once we get closer to feature
parity (in future PRs) the tests will grow to match.
This supervisor and task runner operates slightly differently to the current
classes in these ways
Logs from the subprocess are send over a different channel to stdout/stderr
This makes the task supervisor a little bit more complex as it now has to
read stdout, stderr and a logs channel. The advantage of this approach is
that it makes the logs setup in the task process itself markedly simpler --
all it has to do is write logs output to the custom file handle as JSON and
it will show up "natively" as logs.
structlog has been chosen as the logging engine over stdlib's own logging as
the ability to have structured fields in the logs is nice, and stdlib is
configured to send logs to a stuctlog processor.
Direct database access is replaced with an HTTP API client
This is the crux of this feature and of AIP-72 in general -- tasks run via
this runner can no longer access DB models or DB session directly. This PR
doesn't yet implement the code/shims to make
Connection.get_connection_from_secretsuse this client yet - that will be future work.
The reason tasks don't speak directly to the API server is primarily for two
reasons:
to report the task as started, to heart beat it, and to mark it as
finished; and so because of that
(instead of 2 per task).
THe other reason we have this interface is that DAG parsing code will very
soon need to be updated to not have direct DB access either, and having this
"in process" interface ability already means that we can support commands like
airflow dags reserializewithout having a running API server.The API client itself is not auto-generated: I tried a number of different
client generates based on the OpenAPI spec and found them all lacking or buggy
in different ways, and the http client side itself is very simple, the only
interesting/difficult bit is the generation of the datamodels from the OpenAPI
spec which I found one that
msgspec was chosen over Pydantic as it is much lighter weight (and thus
quicker), especially on a client side when we have next to no validation
requirements of response data. I admit that I have not benchmarked it
specifically though.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.