From 7122b9d934e7af244372046c0636a3d769951f64 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 28 May 2025 17:08:51 +0530 Subject: [PATCH 1/3] Add user impersonation (run_as_user) support for task execution --- .../airflow/sdk/execution_time/task_runner.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index cb8040278c474..10d272f222369 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -703,6 +703,34 @@ def send_request(self, log: Logger, msg: SendMsgType): # 3. Shutdown and report status +def impersonate_user(username: str, log: Logger): + """ + Impersonate as the specified user by changing the process's UID and GID. + + Helper to attempt to set privileges from the current user (root) to the provided username in the task + by setting the effective UID and GID. + + Example: + impersonate_user("airflowuser") + """ + import pwd + + try: + pw_record = pwd.getpwnam(username) + uid, gid = pw_record.pw_uid, pw_record.pw_gid + + os.setgid(uid) + os.setuid(gid) + + log.info("Running task as impersonated user", impersonated_user=username) + except KeyError: + log.warning("User not found on the worker; skipping impersonation", username=username) + except PermissionError as e: + log.error("Permission denied while trying impersonation", username=username, error=str(e)) + except Exception as e: + log.error("Unexpected error during impersonation", error=str(e)) + + def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: msg = SUPERVISOR_COMMS.get_message() @@ -727,6 +755,11 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: ti = parse(msg, log) ti.log_url = get_log_url_from_ti(ti) log.debug("DAG file parsed", file=msg.dag_rel_path) + + run_as_user = getattr(ti.task, "run_as_user", None) + if run_as_user: + impersonate_user(run_as_user, log) + else: raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}") From 2d27fb2e236ad5525a7c7815162a94130981072b Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 28 May 2025 19:45:07 +0530 Subject: [PATCH 2/3] set gid for gid and uid for uid --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 10d272f222369..0cf9afdedc15d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -719,8 +719,8 @@ def impersonate_user(username: str, log: Logger): pw_record = pwd.getpwnam(username) uid, gid = pw_record.pw_uid, pw_record.pw_gid - os.setgid(uid) - os.setuid(gid) + os.setuid(uid) + os.setgid(gid) log.info("Running task as impersonated user", impersonated_user=username) except KeyError: From ac3c55b41b76dcb053ddadcde37b3871979e395f Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 29 May 2025 12:50:47 +0530 Subject: [PATCH 3/3] drop group perms before user perms --- .../src/airflow/sdk/execution_time/task_runner.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 0cf9afdedc15d..b3be7726ff71c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -719,8 +719,11 @@ def impersonate_user(username: str, log: Logger): pw_record = pwd.getpwnam(username) uid, gid = pw_record.pw_uid, pw_record.pw_gid - os.setuid(uid) + # always drop group privileges before dropping user privileges; + # otherwise, group privileges may not be able to be fully dropped. + os.setgid(gid) + os.setuid(uid) log.info("Running task as impersonated user", impersonated_user=username) except KeyError: @@ -732,6 +735,9 @@ def impersonate_user(username: str, log: Logger): def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: + from airflow.configuration import conf + from airflow.exceptions import AirflowConfigException + msg = SUPERVISOR_COMMS.get_message() log = structlog.get_logger(logger_name="task") @@ -756,7 +762,11 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: ti.log_url = get_log_url_from_ti(ti) log.debug("DAG file parsed", file=msg.dag_rel_path) - run_as_user = getattr(ti.task, "run_as_user", None) + try: + run_as_user = getattr(ti.task, "run_as_user", None) or conf.get("core", "default_impersonation") + except AirflowConfigException: + run_as_user = None + if run_as_user: impersonate_user(run_as_user, log)