Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,41 @@ def send_request(self, log: Logger, msg: SendMsgType):
# 3. Shutdown and report status


def impersonate_user(username: str, log: Logger):
    """
    Switch the current process to run as ``username``.

    The user is looked up in the password database; the process then resets its
    supplementary group list to that user's groups, sets its GID to the user's
    primary group and finally sets its UID to the user's UID.

    This is best-effort: every failure is logged and swallowed, and the process then
    keeps whatever identity it had before the failing call.

    NOTE: as raised during review, this only works when the calling process is
    privileged enough to change its own identity (in practice, running as root).
    A worker started by a non-root user, even one with sudo rights, will hit the
    ``PermissionError`` branch below.

    :param username: Name of the OS user to run as.
    :param log: Logger used to report the outcome of the attempt.

    Example:
        impersonate_user("airflowuser")
    """
    import pwd

    try:
        pw_record = pwd.getpwnam(username)
        uid, gid = pw_record.pw_uid, pw_record.pw_gid

        # Always drop group privileges before dropping user privileges;
        # otherwise, group privileges may not be able to be fully dropped.
        #
        # The supplementary group list has to be reset as well: setgid() only changes
        # the primary group, so without initgroups() the process would keep the
        # supplementary groups of the user it started as.
        os.initgroups(username, gid)
        os.setgid(gid)
        os.setuid(uid)

        log.info("Running task as impersonated user", impersonated_user=username)
    except KeyError:
        # pwd.getpwnam raises KeyError when the user does not exist on this host.
        log.warning("User not found on the worker; skipping impersonation", username=username)
    except PermissionError as e:
        # Raised by initgroups/setgid/setuid when the process is not privileged.
        log.error("Permission denied while trying impersonation", username=username, error=str(e))
    except Exception as e:
        log.error("Unexpected error during impersonation", error=str(e))


def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException

msg = SUPERVISOR_COMMS.get_message()

log = structlog.get_logger(logger_name="task")
Expand All @@ -727,6 +761,15 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
ti = parse(msg, log)
ti.log_url = get_log_url_from_ti(ti)
log.debug("DAG file parsed", file=msg.dag_rel_path)

try:
run_as_user = getattr(ti.task, "run_as_user", None) or conf.get("core", "default_impersonation")
except AirflowConfigException:
run_as_user = None

if run_as_user:
impersonate_user(run_as_user, log)

else:
raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}")

Expand Down
Loading