
Conversation

@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Feb 18, 2025

closes: #45427

Why?

  • As part of porting all executors to the Task SDK and cutting tasks off from direct DB access, KubernetesExecutor is the last one remaining.
  • We want the K8s workers (pods) to use the new supervisor task runner machinery instead of running "airflow tasks run...".
  • The workers will now communicate with the API server, which can be deployed anywhere, instead of talking to the DB directly.

Approach

Changes to base executor

There is no interface change for "CommandType" (an earlier revision widened it to accept either a sequence of strings or workloads.ExecuteTask, but we want a totally different interface instead). What was done for CeleryExecutor was this:

    # TODO: Task-SDK: This check is transitionary. Remove once all executors are ported over.
    # Has a real queue_workload implemented
    if executor.queue_workload.__func__ is not BaseExecutor.queue_workload:  # type: ignore[attr-defined]
        workload = workloads.ExecuteTask.make(ti)
        executor.queue_workload(workload, session=session)
        continue
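The `__func__` comparison in that check is plain override detection: compare the bound method's underlying function with the base class attribute. A standalone toy illustration of the same trick (these classes are illustrative stand-ins, not the real executors):

```python
class BaseExec:
    def queue_workload(self, workload, session=None):
        raise RuntimeError("This executor does not support workloads")


class LegacyExec(BaseExec):
    # Still command-based: no queue_workload override.
    pass


class PortedExec(BaseExec):
    def queue_workload(self, workload, session=None):
        self.last_workload = workload


def has_real_queue_workload(executor) -> bool:
    # A bound method's __func__ is the plain function it wraps; if the
    # subclass overrode queue_workload, it is not the base class function.
    return executor.queue_workload.__func__ is not BaseExec.queue_workload
```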

So we instead add a new implementation like this:

    def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
        from airflow.executors import workloads

        if not isinstance(workload, workloads.ExecuteTask):
            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
        ti = workload.ti
        self.queued_tasks[ti.key] = workload

This way we can have different code paths for running the KubernetesExecutor on Airflow v2 vs v3.

Changes to K8sExecutor

  • queue_workload is simple: it just enqueues the workload.
  • _process_workloads calls execute_async for every workload present; it also pops each workload from queued_tasks and adds it to running.
  • execute_async does the same work it did earlier.
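A rough, self-contained sketch of that queue/process flow (MiniExecutor and its dict "workloads" are illustrative stand-ins for the real KubernetesExecutor and ExecuteTask objects):

```python
class MiniExecutor:
    def __init__(self):
        self.queued_tasks = {}  # TI key -> workload
        self.running = set()
        self.executed = []

    def queue_workload(self, workload):
        # queue_workload just enqueues the workload, keyed by its TI key.
        self.queued_tasks[workload["key"]] = workload

    def _process_workloads(self, workloads):
        # For each workload: pop it from queued_tasks, mark it running,
        # then hand it to execute_async.
        for w in workloads:
            del self.queued_tasks[w["key"]]
            self.running.add(w["key"])
            self.execute_async(w)

    def execute_async(self, workload):
        # In the real executor this pushes a pod-creation task; here we
        # just record that the workload was dispatched.
        self.executed.append(workload["key"])
```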

Changes to K8sExecutor utils

  • We have a utility, run_next, which is supposed to run the "next" task in the queue in a pod.
  • We continue to do that, but now we have a workload that needs to run in a pod, for example:
token='dummy-token' ti=TaskInstance(id=UUID('0195178d-1863-7be6-b504-6563a746c9e5'), task_id='print_date', dag_id='tutorial', run_id='manual__2025-02-18T05:34:37.633253+00:00_Uvod17gj', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=3) dag_rel_path=PurePosixPath('tutorial.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=tutorial/run_id=manual__2025-02-18T05:34:37.633253+00:00_Uvod17gj/task_id=print_date/attempt=1.log' kind='ExecuteTask'
  • We need to send this down to a pod, which then runs it using the new Task SDK supervisor machinery.
  • Writing this to the CLI is not a great option, as the workload contains a token that should not appear on the command line.
  • Instead, we serialise the input, mount it into the filesystem of the created pod, and run something like:
            command = [
                "python",
                "-m",
                "airflow.sdk.execution_time.execute_workload",
                "/tmp/execute/input.json",
            ]

where execute_workload is a new module (discussed below).

Changes to the pod generator

  • We now accept an input: content_json_for_volume
  • If populated, we create:
    an EmptyDir volume,
    a volume mount at /tmp/execute/input.json,
    and dump the content received from the K8sExecutor utils into this file.
  • We also modify the pod's entrypoint so that it no longer runs the default /entrypoint.
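For intuition, the shape of the pod-spec change can be sketched with plain dicts standing in for the k8s client models (field names follow the Kubernetes pod spec; `add_workload_input` is a hypothetical helper, and the real pod generator works with `kubernetes.client` objects and also writes the JSON content into the mounted file):

```python
def add_workload_input(pod: dict) -> dict:
    """Sketch: attach an EmptyDir volume and point the container at the mounted workload."""
    # EmptyDir volume that will hold the serialised workload JSON.
    pod.setdefault("volumes", []).append({"name": "workload-input", "emptyDir": {}})
    container = pod["containers"][0]
    container.setdefault("volumeMounts", []).append(
        {"name": "workload-input", "mountPath": "/tmp/execute"}
    )
    # Replace the default /entrypoint invocation with the new module,
    # pointed at the mounted input file.
    container["command"] = [
        "python",
        "-m",
        "airflow.sdk.execution_time.execute_workload",
        "/tmp/execute/input.json",
    ]
    return pod
```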

New module: airflow.sdk.execution_time.execute_workload

  • A simple new module that executes an Airflow task using the workload JSON provided via an input file.
  • It reads the mounted JSON file and deserialises it into a workload object.
  • It passes the workload down to the supervise function:
    supervise(
        # This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this.
        ti=workload.ti,  # type: ignore[arg-type]
        dag_rel_path=workload.dag_rel_path,
        bundle_info=workload.bundle_info,
        token=workload.token,
        # fallback to internal cluster service for api server
        server=conf.get(
            "workers",
            "execution_api_server_url",
            fallback="http://airflow-api-server.airflow.svc.cluster.local:9091/execution/",
        ),
        log_path=workload.log_path,
    )
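The module's read-and-validate step can be sketched with stdlib json (a hedged sketch; the real module deserialises into the SDK's ExecuteTask model rather than a plain dict, and `load_workload` is a hypothetical name):

```python
import json


def load_workload(path: str) -> dict:
    # Read the JSON that the executor serialised and mounted into the
    # pod's filesystem (e.g. /tmp/execute/input.json).
    with open(path) as f:
        workload = json.load(f)
    # Only ExecuteTask workloads can be run by this entrypoint.
    if workload.get("kind") != "ExecuteTask":
        raise ValueError(f"Cannot execute workload of kind {workload.get('kind')!r}")
    return workload
```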

Testing Setup (useful setup for anyone wanting to run a hybrid K8s Exec development)

  • Run Airflow services locally, but also install Airflow on K8s, e.g. using breeze.
  • I have a kind cluster set up. I installed Airflow on K8s using this:
(airflow) ➜  airflow git:(AIP72-port-kubeexecutor-task-sdk) ✗ breeze k8s build-k8s-image
(airflow) ➜  airflow git:(AIP72-port-kubeexecutor-task-sdk) ✗ breeze k8s upload-k8s-image
(airflow) ➜  airflow git:(AIP72-port-kubeexecutor-task-sdk) ✗ breeze k8s deploy-airflow --multi-namespace-mode --executor KubernetesExecutor
  • Install Airflow on K8s, but do not use that deployment; use the local Airflow.

  • We connect to the database running on K8s, so that the local services and the workers on K8s interact with a single DB.

  • To do this, port-forward the Postgres service on K8s to the local machine, on 5432.

  • Export this set of env variables:

(airflow) ➜  airflow git:(AIP72-port-kubeexecutor-task-sdk) ✗ export AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
export AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE=~/Documents/OSS/airflow/pod_template_file.yaml
export AIRFLOW__KUBERNETES_EXECUTOR__IN_CLUSTER=False
export AIRFLOW__KUBERNETES_EXECUTOR__CONFIG_FILE=/Users/amoghdesai/Documents/OSS/airflow/.build/.k8s-clusters/airflow-python-3.9-v1.29.12/.kubeconfig
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:password@localhost:5432/postgres

export AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE=airflow
  • Run three services in three terminals: airflow scheduler, airflow dag-processor, and airflow fastapi-api --apps all

  • Check that you can access the web UI on localhost:9091 or localhost:29091

Note:

  1. ~/Documents/OSS/airflow/pod_template_file.yaml is my local pod template file that I got from K8s and stripped of a few things like DB access (removed these entries):
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-metadata
              key: connection
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: airflow-metadata
              key: connection
        - name: AIRFLOW__WEBSERVER__SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: airflow-webserver-secret-key
              key: webserver-secret-key
  2. /Users/amoghdesai/Documents/OSS/airflow/.build/.k8s-clusters/airflow-python-3.9-v1.29.12/.kubeconfig is the kubeconfig for my cluster.
  3. The in-cluster setting has to be false.

Testing results

DAG being used:

from airflow.decorators import dag
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime


@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
def sleep_dag():
    t1 = BashOperator(
        task_id="sleep_10_minutes",
        bash_command="sleep 600",
    )


sleep_dag()
  • With the above setup, we can go to the new web UI and trigger task runs
  • Scheduler logs will look like this:
[2025-02-19T17:16:11.899+0530] {dag.py:2432} INFO - Setting next_dagrun for sleep_dag to None, run_after=None
Dag run  in running state
Dag information Queued at: 2025-02-19 11:46:11.881249+00:00 version: sleep_dag-1
[2025-02-19T17:16:11.958+0530] {scheduler_job_runner.py:385} INFO - 1 tasks up for execution:
	<TaskInstance: sleep_dag.sleep_10_minutes scheduled__2021-01-01T00:00:00+00:00 [scheduled]>
[2025-02-19T17:16:11.959+0530] {scheduler_job_runner.py:457} INFO - DAG sleep_dag has 0/16 running and queued tasks
[2025-02-19T17:16:11.959+0530] {scheduler_job_runner.py:595} INFO - Setting the following tasks to queued state:
	<TaskInstance: sleep_dag.sleep_10_minutes scheduled__2021-01-01T00:00:00+00:00 [scheduled]>
[2025-02-19T17:16:11.961+0530] {scheduler_job_runner.py:704} INFO - Trying to enqueue tasks: [<TaskInstance: sleep_dag.sleep_10_minutes scheduled__2021-01-01T00:00:00+00:00 [scheduled]>] for executor: KubernetesExecutor(parallelism=32)
[2025-02-19T17:16:11.969+0530] {kubernetes_executor.py:259} INFO - Add task TaskInstanceKey(dag_id='sleep_dag', task_id='sleep_10_minutes', run_id='scheduled__2021-01-01T00:00:00+00:00', try_number=1, map_index=-1) with command [ExecuteTask(token='placeholder-token', ti=TaskInstance(id=UUID('01951e07-a338-70e5-8956-a483e6656387'), task_id='sleep_10_minutes', dag_id='sleep_dag', run_id='scheduled__2021-01-01T00:00:00+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1), dag_rel_path=PosixPath('sleep_dag.py'), bundle_info=BundleInfo(name='dags-folder', version=None), log_path='dag_id=sleep_dag/run_id=scheduled__2021-01-01T00:00:00+00:00/task_id=sleep_10_minutes/attempt=.log', kind='ExecuteTask')]
[2025-02-19T17:16:12.004+0530] {kubernetes_executor_utils.py:433} INFO - Creating kubernetes pod for job is TaskInstanceKey(dag_id='sleep_dag', task_id='sleep_10_minutes', run_id='scheduled__2021-01-01T00:00:00+00:00', try_number=1, map_index=-1), with pod name sleep-dag-sleep-10-minutes-komv025p, annotations: <omitted>
[2025-02-19T17:16:12.021+0530] {scheduler_job_runner.py:744} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='sleep_dag', task_id='sleep_10_minutes', run_id='scheduled__2021-01-01T00:00:00+00:00', try_number=1, map_index=-1)
[2025-02-19T17:16:12.022+0530] {kubernetes_executor_utils.py:269} INFO - Event: sleep-dag-sleep-10-minutes-komv025p Pending, annotations: <omitted>
[2025-02-19T17:16:12.026+0530] {kubernetes_executor_utils.py:269} INFO - Event: sleep-dag-sleep-10-minutes-komv025p Pending, annotations: <omitted>
[2025-02-19T17:16:12.033+0530] {scheduler_job_runner.py:776} INFO - Setting external_id for <TaskInstance: sleep_dag.sleep_10_minutes scheduled__2021-01-01T00:00:00+00:00 [queued]> to 4
[2025-02-19T17:16:12.034+0530] {kubernetes_executor_utils.py:269} INFO - Event: sleep-dag-sleep-10-minutes-komv025p Pending, annotations: <omitted>
[2025-02-19T17:16:15.360+0530] {kubernetes_executor_utils.py:269} INFO - Event: sleep-dag-sleep-10-minutes-komv025p Pending, annotations: <omitted>
[2025-02-19T17:16:16.363+0530] {kubernetes_executor_utils.py:293} INFO - Event: sleep-dag-sleep-10-minutes-komv025p is Running, annotations: <omitted>
[2025-02-19T17:16:20.369+0530] {kubernetes_executor_utils.py:293} INFO - Event: sleep-dag-sleep-10-minutes-komv025p is Running, annotations: <omitted>
[2025-02-19T17:16:21.419+0530] {kubernetes_executor_utils.py:278} INFO - Event: sleep-dag-sleep-10-minutes-komv025p Succeeded, annotations: <omitted>
[2025-02-19T17:16:22.009+0530] {kubernetes_executor.py:321} INFO - Changing state of (TaskInstanceKey(dag_id='sleep_dag', task_id='sleep_10_minutes', run_id='scheduled__2021-01-01T00:00:00+00:00', try_number=1, map_index=-1), None, 'sleep-dag-sleep-10-minutes-komv025p', 'airflow', '161679') to None
[2025-02-19T17:16:22.010+0530] {kubernetes_executor_utils.py:449} INFO - Deleting pod sleep-dag-sleep-10-minutes-komv025p in namespace airflow
[2025-02-19T17:16:22.030+0530] {kubernetes_executor_utils.py:278} INFO - Event: sleep-dag-sleep-10-minutes-komv025p Succeeded, annotations: <omitted>
[2025-02-19T17:16:22.031+0530] {kubernetes_executor.py:422} INFO - Deleted pod associated with the TI TaskInstanceKey(dag_id='sleep_dag', task_id='sleep_10_minutes', run_id='scheduled__2021-01-01T00:00:00+00:00', try_number=1, map_index=-1). Pod name: sleep-dag-sleep-10-minutes-komv025p. Namespace: airflow
  • DAG Run

  • The supervisor and client logs are present on the stdout of the pod, but the task runner logs are written to a file, and we don't surface them for now by design of the KubeExecutor: a) currently, while a pod is running, the executor streams its stdout out to the logs; b) it also logs to a file inside the filesystem at /opt/airflow/logs, which can later be shipped to a PVC or S3 etc. once the pod dies.

Logs example:

Message: '[%s] Executing workload in Kubernetes: %s'
Arguments: (ExecuteTask(token='placeholder-token', ti=TaskInstance(id=UUID('01951e1b-a7d3-729c-8bec-41e430704330'), task_id='extract', dag_id='tutorial_dag', run_id='manual__2025-02-19T12:08:03.770791+00:00_MHdToaCL', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=3), dag_rel_path=PurePosixPath('tutorial_dag.py'), bundle_info=BundleInfo(name='example_dags', version=None), log_path='dag_id=tutorial_dag/run_id=manual__2025-02-19T12:08:03.770791+00:00_MHdToaCL/task_id=extract/attempt=.log', kind='ExecuteTask'),)
[2025-02-19T12:08:11.276+0000] {_config.py:80} DEBUG - load_ssl_context verify=True cert=None trust_env=True http2=False
[2025-02-19T12:08:11.277+0000] {_config.py:146} DEBUG - load_verify_locations cafile='/home/airflow/.local/lib/python3.9/site-packages/certifi/cacert.pem'
[2025-02-19T12:08:11.297+0000] {_trace.py:47} DEBUG - connect_tcp.started host='airflow-api-server' port=9091 local_address=None timeout=5.0 socket_options=None
[2025-02-19T12:08:11.298+0000] {_trace.py:47} DEBUG - connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0xffff87450eb0>
[2025-02-19T12:08:11.299+0000] {_trace.py:47} DEBUG - send_request_headers.started request=<Request [b'PATCH']>
[2025-02-19T12:08:11.299+0000] {_trace.py:47} DEBUG - send_request_headers.complete
[2025-02-19T12:08:11.299+0000] {_trace.py:47} DEBUG - send_request_body.started request=<Request [b'PATCH']>
[2025-02-19T12:08:11.299+0000] {_trace.py:47} DEBUG - send_request_body.complete
[2025-02-19T12:08:11.299+0000] {_trace.py:47} DEBUG - receive_response_headers.started request=<Request [b'PATCH']>
[2025-02-19T12:08:11.308+0000] {_trace.py:47} DEBUG - receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'date', b'Wed, 19 Feb 2025 12:08:10 GMT'), (b'server', b'uvicorn'), (b'content-type', b'application/json'), (b'content-encoding', b'gzip'), (b'vary', b'Accept-Encoding'), (b'transfer-encoding', b'chunked')])
[2025-02-19T12:08:11.309+0000] {_client.py:1026} INFO - HTTP Request: PATCH http://airflow-api-server:9091/execution/task-instances/01951e1b-a7d3-729c-8bec-41e430704330/run "HTTP/1.1 200 OK"
[2025-02-19T12:08:11.309+0000] {_trace.py:47} DEBUG - receive_response_body.started request=<Request [b'PATCH']>
[2025-02-19T12:08:11.309+0000] {_trace.py:47} DEBUG - receive_response_body.complete
[2025-02-19T12:08:11.309+0000] {_trace.py:47} DEBUG - response_closed.started
[2025-02-19T12:08:11.309+0000] {_trace.py:47} DEBUG - response_closed.complete
2025-02-19 12:08:11 [debug    ] Sending                        [supervisor] msg=StartupDetails(ti=TaskInstance(id=UUID('01951e1b-a7d3-729c-8bec-41e430704330'), task_id='extract', dag_id='tutorial_dag', run_id='manual__2025-02-19T12:08:03.770791+00:00_MHdToaCL', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=3), dag_rel_path='tutorial_dag.py', bundle_info=BundleInfo(name='example_dags', version=None), requests_fd=12, start_date=datetime.datetime(2025, 2, 19, 12, 8, 11, 295427, tzinfo=datetime.timezone.utc), ti_context=TIRunContext(dag_run=DagRun(dag_id='tutorial_dag', run_id='manual__2025-02-19T12:08:03.770791+00:00_MHdToaCL', logical_date=None, data_interval_start=None, data_interval_end=None, run_after=datetime.datetime(2025, 2, 19, 12, 8, 3, 770791, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2025, 2, 19, 12, 8, 3, 902978, tzinfo=TzInfo(UTC)), end_date=None, clear_number=0, run_type=<DagRunType.MANUAL: 'manual'>, conf={}, external_trigger=True), task_reschedule_count=0, max_tries=2, variables=[], connections=[], upstream_map_indexes=None), type='StartupDetails')
[2025-02-19T12:08:11.332+0000] {_trace.py:47} DEBUG - send_request_headers.started request=<Request [b'PUT']>
[2025-02-19T12:08:11.333+0000] {_trace.py:47} DEBUG - send_request_headers.complete
[2025-02-19T12:08:11.333+0000] {_trace.py:47} DEBUG - send_request_body.started request=<Request [b'PUT']>
[2025-02-19T12:08:11.333+0000] {_trace.py:47} DEBUG - send_request_body.complete
[2025-02-19T12:08:11.333+0000] {_trace.py:47} DEBUG - receive_response_headers.started request=<Request [b'PUT']>
[2025-02-19T12:08:11.337+0000] {_trace.py:47} DEBUG - receive_response_headers.complete return_value=(b'HTTP/1.1', 204, b'No Content', [(b'date', b'Wed, 19 Feb 2025 12:08:10 GMT'), (b'server', b'uvicorn'), (b'content-type', b'application/json')])
[2025-02-19T12:08:11.337+0000] {_client.py:1026} INFO - HTTP Request: PUT http://airflow-api-server:9091/execution/task-instances/01951e1b-a7d3-729c-8bec-41e430704330/heartbeat "HTTP/1.1 204 No Content"
[2025-02-19T12:08:11.337+0000] {_trace.py:47} DEBUG - receive_response_body.started request=<Request [b'PUT']>
[2025-02-19T12:08:11.337+0000] {_trace.py:47} DEBUG - receive_response_body.complete
[2025-02-19T12:08:11.337+0000] {_trace.py:47} DEBUG - response_closed.started
[2025-02-19T12:08:11.337+0000] {_trace.py:47} DEBUG - response_closed.complete
  • To get the task runner logs, shell into the pod and try:
airflow@sleep-dag-sleep-10-minutes-iw4xwbnq:/opt/airflow/logs/dag_id=sleep_dag/run_id=manual__2025-02-19T12:00:17.341745+00:00_yPCDmEG1/task_id=sleep_10_minutes$ cat attempt\=1.log 
{"timestamp":"2025-02-19T12:00:24.898884","level":"debug","event":"Loading plugins","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:24.899020","level":"debug","event":"Loading plugins from directory: /opt/airflow/plugins","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:24.899108","level":"debug","event":"Loading plugins from entrypoints","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:24.907369","level":"debug","event":"Importing entry_point plugin hive","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:24.915277","level":"debug","event":"Importing entry_point plugin openlineage","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:25.231096","level":"debug","event":"Importing entry_point plugin hive","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:25.235119","level":"debug","event":"Importing entry_point plugin databricks_workflow","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:25.397873","level":"debug","event":"Adding <function default_action_log at 0xffffa72a7e50> to pre execution callback","logger":"airflow.utils.cli_action_loggers"}
{"timestamp":"2025-02-19T12:00:25.534654","level":"debug","event":"Importing entry_point plugin edge_executor","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:25.578153","level":"debug","event":"Importing entry_point plugin openlineage","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:25.581951","level":"debug","event":"Importing entry_point plugin databricks_workflow","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:25.582307","level":"debug","event":"Importing entry_point plugin edge_executor","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:25.582569","level":"debug","event":"Loading 4 plugin(s) took 683.56 seconds","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-19T12:00:25.582625","level":"debug","event":"Calling 'on_starting' with {'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at 0xffffb5adc340>}","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-19T12:00:25.582765","level":"debug","event":"Hook impls: []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-19T12:00:25.582814","level":"debug","event":"Result from 'on_starting': []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-19T12:00:25.583702","level":"info","event":"DAG bundles loaded: dags-folder","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager"}
{"timestamp":"2025-02-19T12:00:25.583818","level":"info","event":"Filling up the DagBag from /opt/airflow/dags/sleep_dag.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-19T12:00:25.584009","level":"debug","event":"Importing /opt/airflow/dags/sleep_dag.py","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-19T12:00:25.611879","level":"debug","event":"Loaded DAG <DAG: sleep_dag>","logger":"airflow.models.dagbag.DagBag"}
{"timestamp":"2025-02-19T12:00:25.612021","level":"debug","event":"DAG file parsed","file":"sleep_dag.py","logger":"task"}
{"timestamp":"2025-02-19T12:00:25.616929","level":"debug","event":"Sending request","json":"{\"rendered_fields\":{\"bash_command\":\"sleep 600\",\"env\":null,\"cwd\":null},\"type\":\"SetRenderedFields\"}\n","logger":"task"}
{"timestamp":"2025-02-19T12:00:25.617083","level":"debug","event":"Calling 'on_task_instance_running' with {'previous_state': <TaskInstanceState.QUEUED: 'queued'>, 'task_instance': RuntimeTaskInstance(id=UUID('01951e14-89d1-715c-9544-50f7d67f3b04'), task_id='sleep_10_minutes', dag_id='sleep_dag', run_id='manual__2025-02-19T12:00:17.341745+00:00_yPCDmEG1', try_number=1, map_index=-1, hostname='sleep-dag-sleep-10-minutes-iw4xwbnq', task=<Task(BashOperator): sleep_10_minutes>, max_tries=0, start_date=datetime.datetime(2025, 2, 19, 12, 0, 24, 855227, tzinfo=TzInfo(UTC)))}","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-19T12:00:25.617121","level":"debug","event":"Hook impls: []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-19T12:00:25.617149","level":"debug","event":"Result from 'on_task_instance_running': []","logger":"airflow.listeners.listener"}
{"timestamp":"2025-02-19T12:00:25.617220","level":"warning","event":"BashOperator.execute cannot be called outside TaskInstance!","logger":"airflow.task.operators.airflow.providers.standard.operators.bash.BashOperator"}
{"timestamp":"2025-02-19T12:00:25.617442","level":"debug","event":"Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='sleep_dag' AIRFLOW_CTX_TASK_ID='sleep_10_minutes' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-02-19T12:00:17.341745+00:00_yPCDmEG1'","logger":"airflow.task.operators.airflow.providers.standard.operators.bash.BashOperator"}
{"timestamp":"2025-02-19T12:00:25.617646","level":"info","event":"Tmp dir root location: /tmp","logger":"airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"}
{"timestamp":"2025-02-19T12:00:25.617845","level":"info","event":"Running command: ['/usr/bin/bash', '-c', 'sleep 600']","logger":"airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"}
{"timestamp":"2025-02-19T12:00:25.626277","level":"info","event":"Output:","logger":"airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"}

Once a job is complete, you will not see any logs.

  • Logs inside task execution API server:
[2025-02-19T12:00:24.869+0000] {task_instances.py:136} INFO - Task with queued state started on sleep-dag-sleep-10-minutes-iw4xwbnq
[2025-02-19T12:00:24.870+0000] {task_instances.py:148} INFO - TI 01951e14-89d1-715c-9544-50f7d67f3b04 state updated: 1 row(s) affected
[2025-02-19T12:00:24.872+0000] {task_instances.py:175} INFO - Clearing xcom data for task id: 01951e14-89d1-715c-9544-50f7d67f3b04
INFO:     10.244.1.236:58260 - "PATCH /execution/task-instances/01951e14-89d1-715c-9544-50f7d67f3b04/run HTTP/1.1" 200 OK
[2025-02-19T12:00:24.901+0000] {task_instances.py:406} DEBUG - Task with running state heartbeated
INFO:     10.244.1.236:58260 - "PUT /execution/task-instances/01951e14-89d1-715c-9544-50f7d67f3b04/heartbeat HTTP/1.1" 204 No Content
[2025-02-19T12:00:25.635+0000] {retries.py:95} DEBUG - Running RenderedTaskInstanceFields.write with retries. Try 1 of 3
[2025-02-19T12:00:25.640+0000] {retries.py:95} DEBUG - Running RenderedTaskInstanceFields._do_delete_old_records with retries. Try 1 of 3
INFO:     10.244.1.236:58260 - "PUT /execution/task-instances/01951e14-89d1-715c-9544-50f7d67f3b04/rtif HTTP/1.1" 201 Created
[2025-02-19T12:00:30.650+0000] {task_instances.py:406} DEBUG - Task with running state heartbeated
INFO:     10.244.1.236:40882 - "PUT /execution/task-instances/01951e14-89d1-715c-9544-50f7d67f3b04/heartbeat HTTP/1.1" 204 No Content
[2025-02-19T12:00:34.054+0000] {platform.py:61} DEBUG - [Errno 2] No such file or directory: '/opt/airflow/airflow/git_version'
[2025-02-19T12:10:25.637+0000] {xcoms.py:283} DEBUG - Checking write XCom access for xcom from TaskInstance with key 'test_key' to XCom 'return_value'
INFO:     10.244.1.236:42626 - "POST /execution/xcoms/sleep_dag/manual__2025-02-19T12%3A00%3A17.341745%2B00%3A00_yPCDmEG1/sleep_10_minutes/return_value HTTP/1.1" 201 Created
[2025-02-19T12:10:25.653+0000] {task_instances.py:339} INFO - TI 01951e14-89d1-715c-9544-50f7d67f3b04 state updated to success: 1 row(s) affected

What's next?



@boring-cyborg boring-cyborg bot added area:Executors-core LocalExecutor & SequentialExecutor area:task-sdk provider:cncf-kubernetes Kubernetes (k8s) provider related issues labels Feb 18, 2025
@ashb
Member

ashb commented Feb 18, 2025

The interface for "CommandType" has been changed to accept either a sequence string or workloads.ExecuteTask which will be useful for KubernetesExecutor.

We shouldn't really be changing it to be a union, we want to make it a totally different interface -- what I did in CeleryExecutor was this:

    # TODO: Task-SDK: This check is transitionary. Remove once all executors are ported over.
    # Has a real queue_workload implemented
    if executor.queue_workload.__func__ is not BaseExecutor.queue_workload:  # type: ignore[attr-defined]
        workload = workloads.ExecuteTask.make(ti)
        executor.queue_workload(workload, session=session)
        continue

So we should instead add a new impl like this

    def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
        from airflow.executors import workloads

        if not isinstance(workload, workloads.ExecuteTask):
            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
        ti = workload.ti
        self.queued_tasks[ti.key] = workload

And we can have different paths for using the KE on v2 vs for v3

@amoghrajesh
Contributor Author

Quoting the comment above:

"We shouldn't really be changing it to be a union, we want to make it a totally different interface -- what I did in CeleryExecutor was this: [...] And we can have different paths for using the KE on v2 vs for v3"

Yeah you are right, I had already implemented the change you suggested from the very beginning, but it looks like for some reason I got driven to make this change; probably it was mypy that made me do it! Fixing it.

@amoghrajesh amoghrajesh self-assigned this Feb 19, 2025
@amoghrajesh amoghrajesh force-pushed the AIP72-port-kubeexecutor-task-sdk branch from dea6605 to 34968ec Compare February 19, 2025 11:33
@amoghrajesh amoghrajesh requested a review from ashb February 19, 2025 14:43
@amoghrajesh
Contributor Author

Waiting for a green CI now!

@amoghrajesh
Contributor Author

I think I fixed the static check now, let's see how it goes!

@amoghrajesh amoghrajesh requested a review from ashb February 19, 2025 19:41
@amoghrajesh
Contributor Author

Thanks for the approvals, @ashb and @o-nikolas! Merging this one

@amoghrajesh amoghrajesh merged commit 56e083f into apache:main Feb 20, 2025
72 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-port-kubeexecutor-task-sdk branch February 20, 2025 06:59

Development

Successfully merging this pull request may close these issues.

Convert the KubernetesExecutor to run tasks using new Task SDK supervisor code