diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index cb8040278c474..b3be7726ff71c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -703,7 +703,41 @@ def send_request(self, log: Logger, msg: SendMsgType): # 3. Shutdown and report status +def impersonate_user(username: str, log: Logger): + """ + Impersonate as the specified user by changing the process's UID and GID. + + Helper to attempt to set privileges from the current user (root) to the provided username in the task + by setting the effective UID and GID. + + Example: + impersonate_user("airflowuser") + """ + import pwd + + try: + pw_record = pwd.getpwnam(username) + uid, gid = pw_record.pw_uid, pw_record.pw_gid + + # always drop group privileges before dropping user privileges; + # otherwise, group privileges may not be able to be fully dropped. + + os.setgid(gid) + os.setuid(uid) + + log.info("Running task as impersonated user", impersonated_user=username) + except KeyError: + log.warning("User not found on the worker; skipping impersonation", username=username) + except PermissionError as e: + log.error("Permission denied while trying impersonation", username=username, error=str(e)) + except Exception as e: + log.error("Unexpected error during impersonation", error=str(e)) + + def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: + from airflow.configuration import conf + from airflow.exceptions import AirflowConfigException + msg = SUPERVISOR_COMMS.get_message() log = structlog.get_logger(logger_name="task") @@ -727,6 +761,15 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: ti = parse(msg, log) ti.log_url = get_log_url_from_ti(ti) log.debug("DAG file parsed", file=msg.dag_rel_path) + + try: + run_as_user = getattr(ti.task, "run_as_user", None) or conf.get("core", "default_impersonation") + except AirflowConfigException: + run_as_user = None + + if run_as_user: + impersonate_user(run_as_user, log) + else: raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}")