From 02e0bbf2da290592481420fe919e5c2cdd2942f7 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sun, 2 Aug 2020 10:51:07 +0100 Subject: [PATCH] Replace os.mkdirs usage with pathlib.Path(path).mkdir makedirs is used in 'airlfow.utils.file.mkdirs' - it is replaced with pathlib now with python3.5+ --- .../config_templates/airflow_local_settings.py | 4 ++-- .../providers/google/cloud/hooks/cloud_sql.py | 10 +++------- airflow/providers/sftp/operators/sftp.py | 8 ++------ airflow/utils/file.py | 15 +++++++-------- airflow/utils/log/file_processor_handler.py | 18 +++--------------- airflow/utils/log/file_task_handler.py | 7 ++----- tests/test_utils/system_tests_class.py | 4 ++-- 7 files changed, 21 insertions(+), 45 deletions(-) diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 3d3a15d72e940..afe4aea1df6d3 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -18,12 +18,12 @@ """Airflow logging settings""" import os +from pathlib import Path from typing import Any, Dict, Union from urllib.parse import urlparse from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.utils.file import mkdirs # TODO: Logging format and level should be configured # in this file instead of from airflow.cfg. Currently @@ -151,7 +151,7 @@ processor_manager_handler_config: Dict[str, Any] = \ DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']['processor_manager'] directory: str = os.path.dirname(processor_manager_handler_config['filename']) - mkdirs(directory, 0o755) + Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755) ################## # Remote logging # diff --git a/airflow/providers/google/cloud/hooks/cloud_sql.py b/airflow/providers/google/cloud/hooks/cloud_sql.py index a8c0b2066d4d5..f89e213f311b1 100644 --- a/airflow/providers/google/cloud/hooks/cloud_sql.py +++ b/airflow/providers/google/cloud/hooks/cloud_sql.py @@ -33,6 +33,7 @@ import subprocess import time import uuid +from pathlib import Path from subprocess import PIPE, Popen from typing import Any, Dict, List, Optional, Sequence, Union from urllib.parse import quote_plus @@ -553,13 +554,8 @@ def start_proxy(self) -> None: else: command_to_run = [self.sql_proxy_path] command_to_run.extend(self.command_line_parameters) - try: - self.log.info("Creating directory %s", - self.cloud_sql_proxy_socket_directory) - os.makedirs(self.cloud_sql_proxy_socket_directory) - except OSError: - # Needed for python 2 compatibility (exists_ok missing) - pass + self.log.info("Creating directory %s", self.cloud_sql_proxy_socket_directory) + Path(self.cloud_sql_proxy_socket_directory).mkdir(parents=True, exist_ok=True) command_to_run.extend(self._get_credential_parameters()) # pylint: disable=no-value-for-parameter self.log.info("Running the command: `%s`", " ".join(command_to_run)) self.sql_proxy_process = Popen(command_to_run, diff --git a/airflow/providers/sftp/operators/sftp.py b/airflow/providers/sftp/operators/sftp.py index d01fa43d44849..0e268f59b993b 100644 --- a/airflow/providers/sftp/operators/sftp.py +++ b/airflow/providers/sftp/operators/sftp.py @@ -19,6 +19,7 @@ This module contains SFTP operator. """ import os +from pathlib import Path from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -130,12 +131,7 @@ def execute(self, context): if self.operation.lower() == SFTPOperation.GET: local_folder = os.path.dirname(self.local_filepath) if self.create_intermediate_dirs: - # Create Intermediate Directories if it doesn't exist - try: - os.makedirs(local_folder) - except OSError: - if not os.path.isdir(local_folder): - raise + Path(local_folder).mkdir(parents=True, exist_ok=True) file_msg = "from {0} to {1}".format(self.remote_filepath, self.local_filepath) self.log.info("Starting to transfer %s", file_msg) diff --git a/airflow/utils/file.py b/airflow/utils/file.py index a6282d21ca361..143fc30f9c7a7 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -20,6 +20,7 @@ import os import re import zipfile +from pathlib import Path from typing import Dict, Generator, List, Optional, Pattern from airflow.configuration import conf @@ -50,14 +51,12 @@ def mkdirs(path, mode): :param mode: The mode to give to the directory e.g. 0o755, ignores umask :type mode: int """ - try: - o_umask = os.umask(0) - os.makedirs(path, mode) - except OSError: - if not os.path.isdir(path): - raise - finally: - os.umask(o_umask) + import warnings + warnings.warn( + f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`", + DeprecationWarning, stacklevel=2 + ) + Path(path).mkdir(mode=mode, parents=True, exist_ok=True) ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep))) diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py index 9b1f60d247166..a552c115708f2 100644 --- a/airflow/utils/log/file_processor_handler.py +++ b/airflow/utils/log/file_processor_handler.py @@ -19,6 +19,7 @@ import logging import os from datetime import datetime +from pathlib import Path from airflow import settings from airflow.utils.helpers import parse_template_string @@ -43,15 +44,7 @@ def __init__(self, base_log_folder, filename_template): parse_template_string(filename_template) self._cur_date = datetime.today() - if not os.path.exists(self._get_log_directory()): - try: - os.makedirs(self._get_log_directory()) - except OSError: - # only ignore case where the directory already exist - if not os.path.isdir(self._get_log_directory()): - raise - - logging.warning("%s already exists", self._get_log_directory()) + Path(self._get_log_directory()).mkdir(parents=True, exist_ok=True) self._symlink_latest_log_directory() @@ -137,12 +130,7 @@ def _init_file(self, filename): log_file_path = os.path.abspath(relative_log_file_path) directory = os.path.dirname(log_file_path) - if not os.path.exists(directory): - try: - os.makedirs(directory) - except OSError: - if not os.path.isdir(directory): - raise + Path(directory).mkdir(parents=True, exist_ok=True) if not os.path.exists(log_file_path): open(log_file_path, "a").close() diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index e26cf90056cee..281a1a1611296 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -18,13 +18,13 @@ """File logging handler for tasks.""" import logging import os +from pathlib import Path from typing import Optional import requests from airflow.configuration import AirflowConfigException, conf from airflow.models import TaskInstance -from airflow.utils.file import mkdirs from airflow.utils.helpers import parse_template_string @@ -223,10 +223,7 @@ def _init_file(self, ti): # operator is not compatible with impersonation (e.g. if a Celery executor is used # for a SubDag operator and the SubDag operator has a different owner than the # parent DAG) - if not os.path.exists(directory): - # Create the directory as globally writable using custom mkdirs - # as os.makedirs doesn't set mode properly. - mkdirs(directory, 0o777) + Path(directory).mkdir(mode=0o777, parents=True, exist_ok=True) if not os.path.exists(full_path): open(full_path, "a").close() diff --git a/tests/test_utils/system_tests_class.py b/tests/test_utils/system_tests_class.py index 217b2ef2bfa64..63e7c07a3f043 100644 --- a/tests/test_utils/system_tests_class.py +++ b/tests/test_utils/system_tests_class.py @@ -19,12 +19,12 @@ import shutil import sys from datetime import datetime +from pathlib import Path from unittest import TestCase from airflow.configuration import AIRFLOW_HOME, AirflowConfigParser, get_airflow_config from airflow.exceptions import AirflowException from airflow.models.dagbag import DagBag -from airflow.utils.file import mkdirs from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State from tests.test_utils import AIRFLOW_MAIN_FOLDER @@ -94,7 +94,7 @@ def tearDown(self) -> None: print(f"Saving all log files to {logs_folder}/previous_runs/{date_str}") print() target_dir = os.path.join(logs_folder, "previous_runs", date_str) - mkdirs(target_dir, 0o755) + Path(target_dir).mkdir(parents=True, exist_ok=True, mode=0o755) files = os.listdir(logs_folder) for file in files: if file != "previous_runs":