Conversation

@plockaby (Contributor) commented Dec 8, 2021

I did not have a chance to fix the previously hidden error about cleaning up files before #20114 was merged. When the temporary file's ownership is changed for impersonation, the NamedTemporaryFile class can't remove it automatically, so we need to remove it manually with sudo.

Basically, this fixes the following error message when a task finishes:

[2021-12-08 06:31:05,703: ERROR/ForkPoolWorker-31] Failed to execute task [Errno 1] Operation not permitted: '/tmp/tmp9g_3zc5j'.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
    args.func(args)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 148, in _execute
    self.on_kill()
  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 174, in on_kill
    self.task_runner.on_finish()
  File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/base_task_runner.py", line 183, in on_finish
    self._error_file.close()
  File "/usr/local/lib/python3.9/tempfile.py", line 504, in close
    self._closer.close()
  File "/usr/local/lib/python3.9/tempfile.py", line 441, in close
    unlink(self.name)
PermissionError: [Errno 1] Operation not permitted: '/tmp/tmp9g_3zc5j'
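For illustration, here is a hedged sketch of the manual cleanup described above. The helper name remove_error_file and the sudo rm fallback are my illustration of the approach, not the PR's actual code:

```python
import getpass
import os
import subprocess


def remove_error_file(path, run_as_user):
    """Remove the temporary error file, falling back to sudo when
    impersonation has changed the file's ownership."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        # the task subprocess may have deleted it already
        pass
    except PermissionError:
        if run_as_user and run_as_user != getpass.getuser():
            # ownership was changed for impersonation, so the parent
            # process needs elevated privileges to remove the file
            subprocess.check_call(["sudo", "rm", path])
        else:
            raise
```

Without impersonation this is a plain os.unlink; the sudo branch only runs when the unlink fails for a file owned by the impersonated user.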

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Dec 8, 2021
self.run_as_user = None

- self._error_file = NamedTemporaryFile(delete=True)
+ self._error_file = NamedTemporaryFile(delete=False)
Member:

Let's do something like this:

Suggested change
- self._error_file = NamedTemporaryFile(delete=False)
+ impersonation = self.run_as_user and (self.run_as_user != getuser())
+ self._error_file = NamedTemporaryFile(delete=not impersonation)
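For reference, a runnable sketch of the semantics of this suggestion; run_as_user here is a local stand-in for the runner's attribute, not Airflow code:

```python
import os
from getpass import getuser
from tempfile import NamedTemporaryFile

run_as_user = None  # stand-in: no impersonation configured

# delete=True (the default) unlinks the file on close, which would fail
# once impersonation changes the file's owner; so automatic deletion is
# only enabled when we are not impersonating.
impersonation = bool(run_as_user and run_as_user != getuser())
error_file = NamedTemporaryFile(delete=not impersonation)
name = error_file.name

error_file.close()
print(os.path.exists(name))  # False: no impersonation, so deleted on close
```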

Contributor Author:
Not sure if this is what you meant, but here's my stab at it. Worth noting that if this PR isn't merged, nothing is actually broken. It just cleans up files and stops the log files from filling up with error messages about files that couldn't be cleaned up.

Member:
I think this is fine, but @ashb was right that we should keep deleting the file when there is no impersonation. After changing delete=True into delete=False, the current implementation keeps the temporary file even when there is no impersonation. Closing the file below does not help; the file is written to regardless and will not be deleted.

Contributor Author:

The file does get deleted, manually, in the new try block on line 209.

However, in all cases the process that creates _error_file is not the same as the process that uses it. Both with and without impersonation, the _error_file temporary file is created, its name is passed to another process, and that other process truncates and writes to the file. Since the process that creates the file never uses it, it should close it to release its file handle.

Member:

That assumes on_finish is called. There might be cases where it won't be, but when the interpreter stops, the temp file should still be deleted.

> However, in all cases the process that creates _error_file is not the same as the process that uses it. Both with and without impersonation the _error_file temporary file is being created and then the file name is being passed to another process and that other process will truncate and write to the file. Since the process that creates the file never uses the file, it should be closing it to remove its file handle.

Have you looked at the StandardTaskRunner and the fork behaviour (the default)? Are you absolutely sure this will work after forking? From what I know, a child inherits open files from its parent after fork, and I believe we rely on this.

Contributor Author:

I think I am missing something.

Well, there are cases where the NamedTemporaryFile doesn't get cleaned up automatically either, like a SIGKILL or a SIGTERM. I can't actually figure out how on_finish gets called, so I can't tell whether it would always be called.

So the StandardTaskRunner class uses the run_command function from this base class. I wrote a program to check file handles.

lsof.py

import os
import subprocess


def print_open_files(pid):
    proc = subprocess.Popen(
        ["lsof", "-p", str(pid)],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
    )
    print(proc.stdout.read())


def main():
    pid = os.getpid()
    print(f"open files for {pid}")
    print_open_files(pid)


if __name__ == "__main__":
    main()

call.py

import os
import subprocess
from tempfile import NamedTemporaryFile


def print_open_files(pid):
    proc = subprocess.Popen(
        ["lsof", "-p", str(pid)],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
    )
    print(proc.stdout.read())


def call_child():
    proc = subprocess.Popen(
        ["python3", "lsof.py"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
        close_fds=True,
        env=os.environ.copy(),
        preexec_fn=os.setsid,
    )
    print(proc.stdout.read())


def main():
    # create a temporary file
    pid = os.getpid()
    x = NamedTemporaryFile(delete=False)
    print(f"created temporary file {x.name} in {pid}")

    print(f"open files for {pid}")
    print_open_files(pid)

    call_child()
    x.close()


if __name__ == "__main__":
    main()

When I tested this code, the child process did not have the file handle, so I'm not sure Airflow can be depending on that behaviour. Maybe my test is incorrect in some way, which is entirely possible, because I am in no way an expert on Airflow or forking. In fact, when I modified both test programs to write to the temporary file, the changes made by lsof.py were completely overwritten by call.py, because the close happens after call.py is forked. If I close the file before forking, the forked program writes to the temporary file and everything works as expected.
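One possible explanation for the lsof result, as a sketch: with close_fds=True (and Python 3.4+ making descriptors non-inheritable by default per PEP 446), a freshly spawned interpreter never sees the parent's descriptor. That is quite different from a fork:

```python
import os
import subprocess
import sys
import tempfile

f = tempfile.NamedTemporaryFile(delete=False)
fd = f.fileno()
ino = os.fstat(fd).st_ino

# Ask a freshly spawned interpreter whether the parent's descriptor
# number is valid there and refers to the same file (same inode).
code = (
    "import os\n"
    f"fd, ino = {fd}, {ino}\n"
    "try:\n"
    "    same = os.fstat(fd).st_ino == ino\n"
    "except OSError:\n"
    "    same = False\n"
    "print('inherited' if same else 'not inherited')\n"
)
child = subprocess.run(
    [sys.executable, "-c", code],
    capture_output=True, text=True, close_fds=True,
)
result = child.stdout.strip()
print(result)

f.close()
os.unlink(f.name)
```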

So I'm not an expert on Airflow and I don't know all the intricacies of how it does its work. This is just what I think will correctly clean up temp files when impersonation is in use. I also no longer have an Airflow cluster, so I can't test any of this in a production setting. I'll attempt to write some tests for it, though.

Member:

Have you seen that in StandardTaskRunner we are not using Popen but fork?

@potiuk (Member) commented Mar 22, 2022

What I am saying is that your test is wrong for the default setting of Airflow. If you do not understand how fork() works, you need to find out in order to assess how closing the file handle will affect it.

This is the part that is relevant:

    def _start_by_fork(self):
        pid = os.fork()
        if pid:
            self.log.info("Started process %d to run task", pid)
            return psutil.Process(pid)
        else:
            # Start a new process group
            os.setpgid(0, 0)
            import signal

In Airflow, by default, we fork the process rather than starting a new one with Popen. This means the child process inherits open handles from the parent: the process "forks", the parent continues down one branch and the child starts down another, and both have the error_file open. There is no Popen execution at all.

This saves seconds when starting a new task: there is no need to initialize a new Python interpreter, and the task process is a clone of the parent with copy-on-write memory semantics. It is almost instantaneous and uses no extra resources.

I believe (and please double-check it) that we rely on the temporary error file remaining open when the task is forked so that we do not re-open it. If you close the error file before forking, it will be closed in the child too, and an attempt to write to it will fail (at least I believe it will).
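The inheritance claim is easy to demonstrate in isolation. A minimal POSIX-only sketch, not Airflow code:

```python
import os
import tempfile

f = tempfile.NamedTemporaryFile(delete=False)

pid = os.fork()
if pid == 0:
    # child: the descriptor opened by the parent is still usable here,
    # because fork() duplicates the parent's open file descriptors
    os.write(f.fileno(), b"written by child\n")
    os._exit(0)

os.waitpid(pid, 0)
f.seek(0)
data = f.read()
print(data)  # the parent sees what the child wrote through the shared fd

f.close()
os.unlink(f.name)
```

Had the parent closed f before the fork, the child's os.write on that descriptor would fail with a bad file descriptor error instead.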

Comment on lines -184 to -200:

    # The subprocess has deleted this file before we do
    # so we ignore
Member:

I think we still need this. In case of success, the child process should delete this file, I thought?

I can't find that code anymore though... Hmmmm.

Contributor Author:

Oh you're right. I glossed over this in my reading of the code.

@github-actions (bot):

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jan 24, 2022
@plockaby (Contributor Author):

Hmm, stale. I would like to see this get merged; it does clean up the logs.

@uranusjr uranusjr removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Jan 24, 2022
@potiuk (Member) commented Jan 24, 2022

Please rebase and make it green then :)

@github-actions (bot):

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Mar 11, 2022
@plockaby (Contributor Author):

I've updated my branch with the latest commits.

@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Mar 13, 2022
@potiuk (Member) commented Mar 14, 2022

Checks + docs are failing though

@github-actions (bot) commented May 8, 2022

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label May 8, 2022
@github-actions github-actions bot closed this May 13, 2022