@@ -159,31 +159,6 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
159159 raise AirflowException (f"Could not find TaskInstance for { ti } " )
160160
161161
162- def _change_directory_permissions_up (directory : Path , folder_permissions : int ):
163- """
164- Change permissions of the given directory and its parents.
165-
166- Only attempt to change permissions for directories owned by the current user.
167-
168- :param directory: directory to change permissions of (including parents)
169- :param folder_permissions: permissions to set
170- """
171- if directory .stat ().st_uid == os .getuid ():
172- if directory .stat ().st_mode % 0o1000 != folder_permissions % 0o1000 :
173- print (f"Changing { directory } permission to { folder_permissions } " )
174- try :
175- directory .chmod (folder_permissions )
176- except PermissionError as e :
177- # In some circumstances (depends on user and filesystem) we might not be able to
178- # change the permission for the folder (when the folder was created by another user
179- # before or when the filesystem does not allow to change permission). We should not
180- # fail in this case but rather ignore it.
181- print (f"Failed to change { directory } permission to { folder_permissions } : { e } " )
182- return
183- if directory .parent != directory :
184- _change_directory_permissions_up (directory .parent , folder_permissions )
185-
186-
187162class FileTaskHandler (logging .Handler ):
188163 """
189164 FileTaskHandler is a python log handler that handles and reads task instance logs.
@@ -481,7 +456,8 @@ def read(self, task_instance, try_number=None, metadata=None):
481456
482457 return logs , metadata_array
483458
484- def _prepare_log_folder (self , directory : Path ):
459+ @staticmethod
460+ def _prepare_log_folder (directory : Path , new_folder_permissions : int ):
485461 """
486462 Prepare the log folder and ensure its mode is as configured.
487463
@@ -505,11 +481,9 @@ def _prepare_log_folder(self, directory: Path):
505481 sure that the same group is set as default group for both - impersonated user and main airflow
506482 user.
507483 """
508- new_folder_permissions = int (
509- conf .get ("logging" , "file_task_handler_new_folder_permissions" , fallback = "0o775" ), 8
510- )
511- directory .mkdir (mode = new_folder_permissions , parents = True , exist_ok = True )
512- _change_directory_permissions_up (directory , new_folder_permissions )
484+ for parent in reversed (directory .parents ):
485+ parent .mkdir (mode = new_folder_permissions , exist_ok = True )
486+ directory .mkdir (mode = new_folder_permissions , exist_ok = True )
513487
514488 def _init_file (self , ti , * , identifier : str | None = None ):
515489 """
@@ -531,7 +505,10 @@ def _init_file(self, ti, *, identifier: str | None = None):
531505 # if this is true, we're invoked via set_context in the context of
532506 # setting up individual trigger logging. return trigger log path.
533507 full_path = self .add_triggerer_suffix (full_path = full_path , job_id = ti .triggerer_job .id )
534- self ._prepare_log_folder (Path (full_path ).parent )
508+ new_folder_permissions = int (
509+ conf .get ("logging" , "file_task_handler_new_folder_permissions" , fallback = "0o775" ), 8
510+ )
511+ self ._prepare_log_folder (Path (full_path ).parent , new_folder_permissions )
535512
536513 if not os .path .exists (full_path ):
537514 open (full_path , "a" ).close ()
0 commit comments