From 9335e919587bdaa045b64aa3d067213d37669dbf Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Thu, 13 Jul 2023 01:42:17 -0700 Subject: [PATCH 1/3] D205 Support - Utils --- airflow/utils/cli.py | 18 ++++---- airflow/utils/cli_action_loggers.py | 6 ++- airflow/utils/configuration.py | 3 +- airflow/utils/dag_cycle_tester.py | 5 ++- airflow/utils/db.py | 24 +++++------ airflow/utils/db_cleanup.py | 5 ++- airflow/utils/dot_renderer.py | 1 + airflow/utils/edgemodifier.py | 21 ++++------ airflow/utils/email.py | 7 ++-- airflow/utils/file.py | 22 +++++----- airflow/utils/hashlib_wrapper.py | 6 +-- airflow/utils/helpers.py | 16 ++----- airflow/utils/json.py | 6 +-- airflow/utils/log/colored_log.py | 7 ++-- airflow/utils/log/file_processor_handler.py | 12 +++--- airflow/utils/log/file_task_handler.py | 16 +++---- airflow/utils/log/logging_mixin.py | 25 +++++------ airflow/utils/log/non_caching_file_handler.py | 26 ++++++------ airflow/utils/log/trigger_handler.py | 3 +- airflow/utils/mixins.py | 5 ++- airflow/utils/module_loading.py | 5 ++- airflow/utils/net.py | 9 ++-- airflow/utils/operator_helpers.py | 10 +++-- airflow/utils/operator_resources.py | 10 ++--- airflow/utils/platform.py | 8 +--- airflow/utils/process_utils.py | 1 + airflow/utils/session.py | 1 + airflow/utils/sqlalchemy.py | 38 +++++++++-------- airflow/utils/state.py | 5 +-- airflow/utils/task_group.py | 42 ++++++++----------- airflow/utils/timezone.py | 1 + airflow/utils/types.py | 5 +-- 32 files changed, 176 insertions(+), 193 deletions(-) diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py index c800ed71fcd8f..354c2b236c07f 100644 --- a/airflow/utils/cli.py +++ b/airflow/utils/cli.py @@ -61,9 +61,10 @@ def _check_cli_args(args): def action_cli(func=None, check_db=True): def action_logging(f: T) -> T: """ - Decorates function to execute function at the same time submitting action_logging - but in CLI context. 
It will call action logger callbacks twice, - one for pre-execution and the other one for post-execution. + Decorates function to execute function at the same time submitting action_logging but in CLI context. + + It will call action logger callbacks twice, one for + pre-execution and the other one for post-execution. Action logger will be called with below keyword parameters: sub_command : name of sub-command @@ -84,8 +85,7 @@ def action_logging(f: T) -> T: @functools.wraps(f) def wrapper(*args, **kwargs): """ - An wrapper for cli functions. It assumes to have Namespace instance - at 1st positional argument. + A wrapper for cli functions; assumes Namespace instance as first positional argument. :param args: Positional argument. It assumes to have Namespace instance at 1st positional argument @@ -126,7 +126,8 @@ def wrapper(*args, **kwargs): def _build_metrics(func_name, namespace): """ - Builds metrics dict from function args + Builds metrics dict from function args. + It assumes that function arguments is from airflow.bin.cli module's function and has Namespace instance where it optionally contains "dag_id", "task_id", and "execution_date". @@ -359,10 +360,7 @@ def should_ignore_depends_on_past(args) -> bool: def suppress_logs_and_warning(f: T) -> T: - """ - Decorator to suppress logging and warning messages - in cli functions. - """ + """Decorator to suppress logging and warning messages in cli functions.""" @functools.wraps(f) def _wrapper(*args, **kwargs): diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py index 02c311d40ab37..575f4bfb08cad 100644 --- a/airflow/utils/cli_action_loggers.py +++ b/airflow/utils/cli_action_loggers.py @@ -16,8 +16,10 @@ # specific language governing permissions and limitations # under the License. """ -An Action Logger module. Singleton pattern has been applied into this module -so that registered callbacks can be used all through the same python process. +An Action Logger module. 
+ +Singleton pattern has been applied into this module so that registered +callbacks can be used all through the same python process. """ from __future__ import annotations diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py index 98fb6388e64b8..84c5e946ee1a2 100644 --- a/airflow/utils/configuration.py +++ b/airflow/utils/configuration.py @@ -27,8 +27,7 @@ def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True): """ - Returns a path for a temporary file including a full copy of the configuration - settings. + Returns a path for a temporary file including a full copy of the configuration settings. :param include_env: Should the value of configuration from ``AIRFLOW__`` environment variables be included or not diff --git a/airflow/utils/dag_cycle_tester.py b/airflow/utils/dag_cycle_tester.py index 8f150dc0a349c..4cea52200da1e 100644 --- a/airflow/utils/dag_cycle_tester.py +++ b/airflow/utils/dag_cycle_tester.py @@ -33,8 +33,9 @@ def test_cycle(dag: DAG) -> None: """ A wrapper function of `check_cycle` for backward compatibility purpose. - New code should use `check_cycle` instead since this function name `test_cycle` starts with 'test_' and - will be considered as a unit test by pytest, resulting in failure. + + New code should use `check_cycle` instead since this function name `test_cycle` starts + with 'test_' and will be considered as a unit test by pytest, resulting in failure. """ from warnings import warn diff --git a/airflow/utils/db.py b/airflow/utils/db.py index eb4c863dc3e3c..f4c2ae8b1bd00 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1006,10 +1006,10 @@ def check_username_duplicates(session: Session) -> Iterable[str]: def reflect_tables(tables: list[Base | str] | None, session): """ - When running checks prior to upgrades, we use reflection to determine current state of the - database. 
- This function gets the current state of each table in the set of models provided and returns - a SqlAlchemy metadata object containing them. + When running checks prior to upgrades, we use reflection to determine current state of the database. + + This function gets the current state of each table in the set of models + provided and returns a SqlAlchemy metadata object containing them. """ import sqlalchemy.schema @@ -1180,6 +1180,7 @@ def _create_table_as( ): """ Create a new table with rows from query. + We have to handle CTAS differently for different dialects. """ from sqlalchemy import column, select, table @@ -1256,10 +1257,7 @@ def _move_dangling_data_to_new_table( def _dangling_against_dag_run(session, source_table, dag_run): - """ - Given a source table, we generate a subquery that will return 1 for every row that - has a dagrun. - """ + """Given a source table, we generate a subquery that will return 1 for every row that has a dagrun.""" source_to_dag_run_join_cond = and_( source_table.c.dag_id == dag_run.c.dag_id, source_table.c.execution_date == dag_run.c.execution_date, @@ -1274,8 +1272,7 @@ def _dangling_against_dag_run(session, source_table, dag_run): def _dangling_against_task_instance(session, source_table, dag_run, task_instance): """ - Given a source table, we generate a subquery that will return 1 for every row that - has a valid task instance (and associated dagrun). + Given a source table, generate a subquery that will return 1 for every row that has a valid task instance. This is used to identify rows that need to be removed from tables prior to adding a TI fk. @@ -1367,8 +1364,8 @@ def _move_duplicate_data_to_new_table( def check_bad_references(session: Session) -> Iterable[str]: """ - Starting in Airflow 2.2, we began a process of replacing `execution_date` with `run_id` - in many tables. + Starting in Airflow 2.2, we began a process of replacing `execution_date` with `run_id` in many tables. 
+ Here we go through each table and look for records that can't be mapped to a dag run. When we find such "dangling" rows we back them up in a special table and delete them from the main table. @@ -1383,6 +1380,8 @@ def check_bad_references(session: Session) -> Iterable[str]: @dataclass class BadReferenceConfig: """ + Bad reference config class. + :param bad_rows_func: function that returns subquery which determines whether bad rows exist :param join_tables: table objects referenced in subquery :param ref_table: information-only identifier for categorizing the missing ref @@ -1552,6 +1551,7 @@ def upgradedb( session: Session = NEW_SESSION, ): """ + Upgrades the DB. :param to_revision: Optional Alembic revision ID to upgrade *to*. If omitted, upgrades to latest revision. diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index 22fb0020e72a7..36bced947348d 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -15,7 +15,9 @@ # specific language governing permissions and limitations # under the License. """ -This module took inspiration from the community maintenance dag +This module took inspiration from the community maintenance dag. + +See: (https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/4e5c7682a808082561d60cbc9cafaa477b0d8c65/db-cleanup/airflow-db-cleanup.py). """ from __future__ import annotations @@ -341,6 +343,7 @@ def _print_config(*, configs: dict[str, _TableConfig]): def _suppress_with_logging(table, session): """ Suppresses errors but logs them. + Also stores the exception instance so it can be referred to after exiting context. """ try: diff --git a/airflow/utils/dot_renderer.py b/airflow/utils/dot_renderer.py index d3b329cb5453f..f168566f7e33d 100644 --- a/airflow/utils/dot_renderer.py +++ b/airflow/utils/dot_renderer.py @@ -38,6 +38,7 @@ def _refine_color(color: str): """ Converts color in #RGB (12 bits) format to #RRGGBB (32 bits), if it possible. + Otherwise, it returns the original value. 
Graphviz does not support colors in #RGB format. :param color: Text representation of color diff --git a/airflow/utils/edgemodifier.py b/airflow/utils/edgemodifier.py index b693e1a1bee26..e760beb50a457 100644 --- a/airflow/utils/edgemodifier.py +++ b/airflow/utils/edgemodifier.py @@ -24,8 +24,9 @@ class EdgeModifier(DependencyMixin): """ - Class that represents edge information to be added between two - tasks/operators. Has shorthand factory functions, like Label("hooray"). + Class that represents edge information to be added between two tasks/operators. + + Has shorthand factory functions, like Label("hooray"). Current implementation supports t1 >> Label("Success route") >> t2 @@ -77,8 +78,9 @@ def _save_nodes( def _convert_streams_to_task_groups(self): """ - Both self._upstream and self._downstream are required to determine if - we should convert a node to a TaskGroup or leave it as a DAGNode. + Convert a node to a TaskGroup or leave it as a DAGNode. + + Requires both self._upstream and self._downstream. To do this, we keep a set of group_ids seen among the streams. If we find that the nodes are from the same TaskGroup, we will leave them as DAGNodes and not @@ -121,8 +123,7 @@ def set_upstream( edge_modifier: EdgeModifier | None = None, ): """ - Sets the given task/list onto the upstream attribute, and then checks if - we have both sides so we can resolve the relationship. + Set the given task/list onto the upstream attribute, then attempt to resolve the relationship. Providing this also provides << via DependencyMixin. """ @@ -139,8 +140,7 @@ def set_downstream( edge_modifier: EdgeModifier | None = None, ): """ - Sets the given task/list onto the downstream attribute, and then checks if - we have both sides so we can resolve the relationship. + Set the given task/list onto the downstream attribute, then attempt to resolve the relationship. Providing this also provides >> via DependencyMixin. 
""" @@ -154,10 +154,7 @@ def set_downstream( def update_relative( self, other: DependencyMixin, upstream: bool = True, edge_modifier: EdgeModifier | None = None ) -> None: - """ - Called if we're not the "main" side of a relationship; we still run the - same logic, though. - """ + """Called if we're not the "main" side of a relationship; we still run the same logic, though.""" if upstream: self.set_upstream(other) else: diff --git a/airflow/utils/email.py b/airflow/utils/email.py index e807b8f75d9f8..37e028fbd99e9 100644 --- a/airflow/utils/email.py +++ b/airflow/utils/email.py @@ -321,9 +321,10 @@ def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) -> def _get_email_list_from_str(addresses: str) -> list[str]: """ - Extract a list of email addresses from a string. The string - can contain multiple email addresses separated by - any of the following delimiters: ',' or ';'. + Extract a list of email addresses from a string. + + The string can contain multiple email addresses separated + by any of the following delimiters: ',' or ';'. :param addresses: A string containing one or more email addresses. :return: A list of email addresses. diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 08b4048ad58af..fb17ee6a8d573 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -41,8 +41,9 @@ class _IgnoreRule(Protocol): @staticmethod def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None: """ - Build an ignore rule from the supplied pattern where base_dir - and definition_file should be absolute paths. + Build an ignore rule from the supplied pattern. + + ``base_dir`` and ``definition_file`` should be absolute paths. """ @staticmethod @@ -134,8 +135,9 @@ def TemporaryDirectory(*args, **kwargs): def mkdirs(path, mode): """ - Creates the directory specified by path, creating intermediate directories - as necessary. If directory already exists, this is a no-op. 
+ Creates the directory specified by path, creating intermediate directories as necessary. + + If directory already exists, this is a no-op. :param path: The directory to create :param mode: The mode to give to the directory e.g. 0o755, ignores umask @@ -164,10 +166,7 @@ def correct_maybe_zipped(fileloc: str | Path) -> str | Path: def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path: - """ - If the path contains a folder with a .zip suffix, then - the folder is treated as a zip archive and path to zip is returned. - """ + """If the path contains a folder with a .zip suffix, treat it as a zip archive and return path.""" if not fileloc: return fileloc search_ = ZIP_REGEX.search(str(fileloc)) @@ -182,8 +181,10 @@ def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path: def open_maybe_zipped(fileloc, mode="r"): """ - Opens the given file. If the path contains a folder with a .zip suffix, then - the folder is treated as a zip archive, opening the file inside the archive. + Opens the given file. + + If the path contains a folder with a .zip suffix, then the folder + is treated as a zip archive, opening the file inside the archive. :return: a file object, as in `open`, or as in `ZipFile.open`. """ @@ -332,6 +333,7 @@ def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> l def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool: """ Check whether a Python file contains Airflow DAGs. + When safe_mode is off (with False value), this function always returns True. 
If might_contain_dag_callable isn't specified, it uses airflow default heuristic diff --git a/airflow/utils/hashlib_wrapper.py b/airflow/utils/hashlib_wrapper.py index 25f02e5c3ad7e..09850c565cb0e 100644 --- a/airflow/utils/hashlib_wrapper.py +++ b/airflow/utils/hashlib_wrapper.py @@ -28,11 +28,9 @@ def md5(__string: ReadableBuffer = b"") -> hashlib._Hash: """ - Safely allows calling the hashlib.md5 function with the "usedforsecurity" disabled - when specified in the configuration. + Safely allows calling the hashlib.md5 function when "usedforsecurity" is disabled in the configuration. - :param string: The data to hash. - Default to empty str byte. + :param string: The data to hash. Default to empty str byte. :return: The hashed value. """ if PY39: diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index a29cf07a59194..b900ca1033c70 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -120,10 +120,7 @@ def is_container(obj: Any) -> bool: def as_tuple(obj: Any) -> tuple: - """ - If obj is a container, returns obj as a tuple. - Otherwise, returns a tuple containing obj. - """ + """Return obj as a tuple if obj is a container, otherwise return a tuple containing obj.""" if is_container(obj): return tuple(obj) else: @@ -139,10 +136,7 @@ def chunks(items: list[T], chunk_size: int) -> Generator[list[T], None, None]: def reduce_in_chunks(fn: Callable[[S, list[T]], S], iterable: list[T], initializer: S, chunk_size: int = 0): - """ - Reduce the given list of items by splitting it into chunks - of the given size and passing each chunk through the reducer. 
- """ + """Split the list of items into chunks of a given size and pass each chunk through the reducer.""" if len(iterable) == 0: return initializer if chunk_size == 0: @@ -172,8 +166,7 @@ def parse_template_string(template_string: str) -> tuple[str | None, jinja2.Temp def render_log_filename(ti: TaskInstance, try_number, filename_template) -> str: """ - Given task instance, try_number, filename_template, return the rendered log - filename. + Given task instance, try_number, filename_template, return the rendered log filename. :param ti: task instance :param try_number: try_number of the task @@ -327,8 +320,7 @@ def is_set(val): def prune_dict(val: Any, mode="strict"): """ - Given dict ``val``, returns new dict based on ``val`` with all - empty elements removed. + Given dict ``val``, returns new dict based on ``val`` with all empty elements removed. What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict' then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x`` diff --git a/airflow/utils/json.py b/airflow/utils/json.py index 3a0a70fb7585c..7f05c8778d27b 100644 --- a/airflow/utils/json.py +++ b/airflow/utils/json.py @@ -105,11 +105,7 @@ def encode(self, o: Any) -> str: class XComDecoder(json.JSONDecoder): - """ - This decoder deserializes dicts to objects if they contain - the `__classname__` key otherwise it will return the dict - as is. 
- """ + """Deserialize dicts to objects if they contain the `__classname__` key, otherwise return the dict.""" def __init__(self, *args, **kwargs) -> None: if not kwargs.get("object_hook"): diff --git a/airflow/utils/log/colored_log.py b/airflow/utils/log/colored_log.py index aea7c255e618e..3afa39410d044 100644 --- a/airflow/utils/log/colored_log.py +++ b/airflow/utils/log/colored_log.py @@ -42,9 +42,10 @@ class CustomTTYColoredFormatter(TTYColoredFormatter, TimezoneAware): """ - Custom log formatter which extends `colored.TTYColoredFormatter` - by adding attributes to message arguments and coloring error - traceback. + Custom log formatter. + + Extends `colored.TTYColoredFormatter` by adding attributes + to message arguments and coloring error traceback. """ def __init__(self, *args, **kwargs): diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py index 79efb387a3041..cda24cdd06be8 100644 --- a/airflow/utils/log/file_processor_handler.py +++ b/airflow/utils/log/file_processor_handler.py @@ -30,9 +30,10 @@ class FileProcessorHandler(logging.Handler): """ - FileProcessorHandler is a python log handler that handles - dag processor logs. It creates and delegates log handling - to `logging.FileHandler` after receiving dag processor context. + FileProcessorHandler is a python log handler that handles dag processor logs. + + It creates and delegates log handling to `logging.FileHandler` + after receiving dag processor context. :param base_log_folder: Base log folder to place logs. :param filename_template: template filename string @@ -108,8 +109,9 @@ def _get_log_directory(self): def _symlink_latest_log_directory(self): """ - Create symbolic link to the current day's log directory to - allow easy access to the latest scheduler log files. + Create symbolic link to the current day's log directory. + + Allows easy access to the latest scheduler log files. 
:return: None """ diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 9184a2042091a..5d791aaa0ccaa 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -134,10 +134,10 @@ def _interleave_logs(*logs): class FileTaskHandler(logging.Handler): """ - FileTaskHandler is a python log handler that handles and reads - task instance logs. It creates and delegates log handling - to `logging.FileHandler` after receiving task instance context. - It reads logs from task instance's host machine. + FileTaskHandler is a python log handler that handles and reads task instance logs. + + It creates and delegates log handling to `logging.FileHandler` after receiving task + instance context. It reads logs from task instance's host machine. :param base_log_folder: Base log folder to place logs. :param filename_template: template filename string @@ -278,8 +278,7 @@ def _read( metadata: dict[str, Any] | None = None, ): """ - Template method that contains custom logic of reading - logs given the try_number. + Template method that contains custom logic of reading logs given the try_number. :param ti: task instance record :param try_number: current try_number to read log from @@ -462,8 +461,9 @@ def _prepare_log_folder(self, directory: Path): def _init_file(self, ti): """ - Create log directory and give it permissions that are configured. See above _prepare_log_folder - method for more detailed explanation. + Create log directory and give it permissions that are configured. + + See above _prepare_log_folder method for more detailed explanation. 
:param ti: task instance object :return: relative log path of the given task instance diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 646c73f1b4bc0..be10ff8ec1c9d 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -54,10 +54,7 @@ def __getattr__(name): def remove_escape_codes(text: str) -> str: - """ - Remove ANSI escapes codes from string. It's used to remove - "colors" from log messages. - """ + """Remove ANSI escapes codes from string; used to remove "colors" from log messages.""" return ANSI_ESCAPE.sub("", text) @@ -118,15 +115,15 @@ def supports_external_link(self) -> bool: # IO generics (and apparently it has not even been intended) # See more: https://giters.com/python/typeshed/issues/6077 class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc] - """Allows to redirect stdout and stderr to logger.""" + """ + Allows to redirect stdout and stderr to logger. + + :param log: The log level method to write to, ie. log.debug, log.warning + """ encoding: None = None def __init__(self, logger, level): - """ - :param log: The log level method to write to, ie. log.debug, log.warning - :return: - """ self.logger = logger self.level = level self._buffer = "" @@ -141,8 +138,9 @@ def close(self): @property def closed(self): """ - Returns False to indicate that the stream is not closed, as it will be - open for the duration of Airflow's lifecycle. + Return False to indicate that the stream is not closed. + + Streams will be open for the duration of Airflow's lifecycle. For compatibility with the io.IOBase interface. """ @@ -174,6 +172,7 @@ def flush(self): def isatty(self): """ Returns False to indicate the fd is not connected to a tty(-like) device. + For compatibility reasons. 
""" return False @@ -181,8 +180,10 @@ def isatty(self): class RedirectStdHandler(StreamHandler): """ + # TODO D205-ify this docstring + This class is like a StreamHandler using sys.stderr/stdout, but uses - whatever sys.stderr/stderr is currently set to rather than the value of + whatever sys.stderr/stdout is currently set to rather than the value of sys.stderr/stdout at handler construction time, except when running a task in a kubernetes executor pod. """ diff --git a/airflow/utils/log/non_caching_file_handler.py b/airflow/utils/log/non_caching_file_handler.py index 46fe69a99bc75..aa0ca9864e2ea 100644 --- a/airflow/utils/log/non_caching_file_handler.py +++ b/airflow/utils/log/non_caching_file_handler.py @@ -36,12 +36,13 @@ def make_file_io_non_caching(io: IO[str]) -> IO[str]: class NonCachingFileHandler(FileHandler): """ - This is an extension of the python FileHandler that advises the Kernel to not cache the file - in PageCache when it is written. While there is nothing wrong with such cache (it will be cleaned - when memory is needed), it causes ever-growing memory usage when scheduler is running as it keeps - on writing new log files and the files are not rotated later on. This might lead to confusion - for our users, who are monitoring memory usage of Scheduler - without realising that it is - harmless and expected in this case. + An extension of FileHandler, advises the Kernel to not cache the file in PageCache when it is written. + + While there is nothing wrong with such cache (it will be cleaned when memory is needed), it + causes ever-growing memory usage when scheduler is running as it keeps on writing new log + files and the files are not rotated later on. This might lead to confusion for our users, + who are monitoring memory usage of Scheduler - without realising that it is harmless and + expected in this case. 
See https://github.com/apache/airflow/issues/14924 @@ -54,12 +55,13 @@ def _open(self): class NonCachingRotatingFileHandler(RotatingFileHandler): """ - This is an extension of the python RotatingFileHandler that advises the Kernel to not cache the file - in PageCache when it is written. While there is nothing wrong with such cache (it will be cleaned - when memory is needed), it causes ever-growing memory usage when scheduler is running as it keeps - on writing new log files and the files are not rotated later on. This might lead to confusion - for our users, who are monitoring memory usage of Scheduler - without realising that it is - harmless and expected in this case. + An extension of RotatingFileHandler, advises the Kernel to not cache the file in PageCache when written. + + While there is nothing wrong with such cache (it will be cleaned when memory is needed), it + causes ever-growing memory usage when scheduler is running as it keeps on writing new log + files and the files are not rotated later on. This might lead to confusion for our users, + who are monitoring memory usage of Scheduler - without realising that it is harmless and + expected in this case. See https://github.com/apache/airflow/issues/27065 diff --git a/airflow/utils/log/trigger_handler.py b/airflow/utils/log/trigger_handler.py index 50756cb048375..54f8ace65e746 100644 --- a/airflow/utils/log/trigger_handler.py +++ b/airflow/utils/log/trigger_handler.py @@ -67,8 +67,7 @@ def filter(self, record): class TriggererHandlerWrapper(logging.Handler): """ - Wrap inheritors of FileTaskHandler and direct log messages - to them based on trigger_id. + Wrap inheritors of FileTaskHandler and direct log messages to them based on trigger_id. 
:meta private: """ diff --git a/airflow/utils/mixins.py b/airflow/utils/mixins.py index d157c7f078ff9..e3c6a8efec6ef 100644 --- a/airflow/utils/mixins.py +++ b/airflow/utils/mixins.py @@ -35,8 +35,9 @@ class MultiprocessingStartMethodMixin: def _get_multiprocessing_start_method(self) -> str: """ - Determine method of creating new processes by checking if the - mp_start_method is set in configs, else, it uses the OS default. + Determine method of creating new processes. + + Checks if the mp_start_method is set in configs, else, it uses the OS default. """ if conf.has_option("core", "mp_start_method"): return conf.get_mandatory_value("core", "mp_start_method") diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py index 8decbc6a61cb8..d81ab65e8589b 100644 --- a/airflow/utils/module_loading.py +++ b/airflow/utils/module_loading.py @@ -25,8 +25,9 @@ def import_string(dotted_path: str): """ - Import a dotted module path and return the attribute/class designated by the - last name in the path. Raise ImportError if the import failed. + Import a dotted module path and return the attribute/class designated by the last name in the path. + + Raise ImportError if the import failed. """ try: module_path, class_name = dotted_path.rsplit(".", 1) diff --git a/airflow/utils/net.py b/airflow/utils/net.py index 57bd9008b95d8..992aee67e8000 100644 --- a/airflow/utils/net.py +++ b/airflow/utils/net.py @@ -26,7 +26,9 @@ # patched version of socket.getfqdn() - see https://github.com/python/cpython/issues/49254 @lru_cache(maxsize=None) def getfqdn(name=""): - """Get fully qualified domain name from name. + """ + Get fully qualified domain name from name. + An empty argument is interpreted as meaning the local host. """ name = name.strip() @@ -50,8 +52,5 @@ def get_host_ip_address(): def get_hostname(): - """ - Fetch the hostname using the callable from the config or using - `airflow.utils.net.getfqdn` as a fallback. 
- """ + """Fetch the hostname using the callable from config or use `airflow.utils.net.getfqdn` as a fallback.""" return conf.getimport("core", "hostname_callable", fallback="airflow.utils.net.getfqdn")() diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py index 0390edc81cde4..8a77ca1a67498 100644 --- a/airflow/utils/operator_helpers.py +++ b/airflow/utils/operator_helpers.py @@ -62,6 +62,8 @@ def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool = False) -> dict[str, str]: """ + Return values used to externally reconstruct relations between dags, dag_runs, tasks and task_instances. + Given a context, this function provides a dictionary of values that can be used to externally reconstruct relations between dags, dag_runs, tasks and task_instances. Default to abc.def.ghi format and can be made to ABC_DEF_GHI format if @@ -185,12 +187,10 @@ def determine_kwargs( kwargs: Mapping[str, Any], ) -> Mapping[str, Any]: """ - Inspect the signature of a given callable to determine which arguments in kwargs need - to be passed to the callable. + Inspect the signature of a callable to determine which kwargs need to be passed to the callable. :param func: The callable that you want to invoke - :param args: The positional arguments that needs to be passed to the callable, so we - know how many to skip. + :param args: The positional arguments that need to be passed to the callable, so we know how many to skip. :param kwargs: The keyword arguments that need to be filtered before passing to the callable. :return: A dictionary which contains the keyword arguments that are compatible with the callable. """ @@ -199,6 +199,8 @@ def determine_kwargs( def make_kwargs_callable(func: Callable[..., R]) -> Callable[..., R]: """ + # TODO: D205-ify this one + Make a new callable that can accept any number of positional or keyword arguments but only forwards those required by the given callable func. 
""" diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py index 638034a81a920..cd6a225244129 100644 --- a/airflow/utils/operator_resources.py +++ b/airflow/utils/operator_resources.py @@ -70,10 +70,7 @@ def units_str(self): @property def qty(self): - """ - The number of units of the specified resource that are required for - execution of the operator. - """ + """The number of units of the specified resource that are required for execution of the operator.""" return self._qty def to_dict(self): @@ -114,8 +111,9 @@ def __init__(self, qty): class Resources: """ - The resources required by an operator. Resources that are not specified will use the - default values from the airflow config. + The resources required by an operator. + + Resources that are not specified will use the default values from the airflow config. :param cpus: The number of cpu cores that are required :param ram: The amount of RAM required diff --git a/airflow/utils/platform.py b/airflow/utils/platform.py index 691aac033b3c8..72906683f63ed 100644 --- a/airflow/utils/platform.py +++ b/airflow/utils/platform.py @@ -32,10 +32,7 @@ def is_tty(): - """ - Checks if the standard output is connected (is associated with a terminal device) to a tty(-like) - device. - """ + """Check if stdout is connected (is associated with a terminal device) to a tty(-like) device.""" if not hasattr(sys.stdout, "isatty"): return False return sys.stdout.isatty() @@ -69,8 +66,7 @@ def get_airflow_git_version(): @cache def getuser() -> str: """ - Gets the username associated with the current user, or error with a nice - error message if there's no current user. + Get the username of the current user, or error with a nice error message if there's no current user. We don't want to fall back to os.getuid() because not having a username probably means the rest of the user environment is wrong (e.g. no $HOME). 
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py index 663f0f496c7bb..74ad50bf7404e 100644 --- a/airflow/utils/process_utils.py +++ b/airflow/utils/process_utils.py @@ -300,6 +300,7 @@ def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, No def check_if_pidfile_process_is_running(pid_file: str, process_name: str): """ Checks if a pidfile already exists and process is still running. + If process is dead then pidfile is removed. :param pid_file: path to the pidfile diff --git a/airflow/utils/session.py b/airflow/utils/session.py index 0f8bdb7103670..5c7e9eef505c9 100644 --- a/airflow/utils/session.py +++ b/airflow/utils/session.py @@ -61,6 +61,7 @@ def find_session_idx(func: Callable[PS, RT]) -> int: def provide_session(func: Callable[PS, RT]) -> Callable[PS, RT]: """ Function decorator that provides a session if it isn't provided. + If you want to reuse a session or run the function as part of a database transaction, you pass it to the function, if not this wrapper will create one and close it for you. diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index 04af5bddc31c4..499aa31c34974 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -50,8 +50,8 @@ class UtcDateTime(TypeDecorator): """ - Almost equivalent to :class:`~sqlalchemy.types.TIMESTAMP` with - ``timezone=True`` option, but it differs from that by: + Similar to :class:`~sqlalchemy.types.TIMESTAMP` with ``timezone=True`` option, with some differences. + - Never silently take naive :class:`~datetime.datetime`, instead it always raise :exc:`ValueError` unless time zone aware value. - :class:`~datetime.datetime` value's :attr:`~datetime.datetime.tzinfo` @@ -86,11 +86,11 @@ def process_bind_param(self, value, dialect): def process_result_value(self, value, dialect): """ - Processes DateTimes from the DB making sure it is always - returning UTC. 
Not using timezone.convert_to_utc as that - converts to configured TIMEZONE while the DB might be - running with some other setting. We assume UTC datetimes - in the database. + Processes DateTimes from the DB making sure it is always returning UTC. + + Not using timezone.convert_to_utc as that converts to configured TIMEZONE + while the DB might be running with some other setting. We assume UTC + datetimes in the database. """ if value is not None: if value.tzinfo is None: @@ -110,8 +110,9 @@ def load_dialect_impl(self, dialect): class ExtendedJSON(TypeDecorator): """ - A version of the JSON column that uses the Airflow extended JSON - serialization provided by airflow.serialization. + A version of the JSON column that uses the Airflow extended JSON serialization. + + See airflow.serialization. """ impl = Text @@ -244,10 +245,11 @@ def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None: class ExecutorConfigType(PickleType): """ - Adds special handling for K8s executor config. If we unpickle a k8s object that was - pickled under an earlier k8s library version, then the unpickled object may throw an error - when to_dict is called. To be more tolerant of version changes we convert to JSON using - Airflow's serializer before pickling. + Adds special handling for K8s executor config. + + If we unpickle a k8s object that was pickled under an earlier k8s library version, then + the unpickled object may throw an error when to_dict is called. To be more tolerant of + version changes we convert to JSON using Airflow's serializer before pickling. """ cache_ok = True @@ -293,11 +295,11 @@ def process(value): def compare_values(self, x, y): """ - The TaskInstance.executor_config attribute is a pickled object that may contain - kubernetes objects. If the installed library version has changed since the - object was originally pickled, due to the underlying ``__eq__`` method on these - objects (which converts them to JSON), we may encounter attribute errors. 
In this - case we should replace the stored object. + The TaskInstance.executor_config attribute is a pickled object that may contain kubernetes objects. + + If the installed library version has changed since the object was originally pickled, + due to the underlying ``__eq__`` method on these objects (which converts them to JSON), + we may encounter attribute errors. In this case we should replace the stored object. From https://github.com/apache/airflow/pull/24356 we use our serializer to store k8s objects, but there could still be raw pickled k8s objects in the database, diff --git a/airflow/utils/state.py b/airflow/utils/state.py index f18fd48c82fb0..a0c828ee07477 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -82,10 +82,7 @@ def __str__(self) -> str: class State: - """ - Static class with task instance state constants and color methods to - avoid hardcoding. - """ + """Static class with task instance state constants and color methods to avoid hardcoding.""" # Backwards-compat constants for code that does not yet use the enum # These first three are shared by DagState and TaskState diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py index d59ecaf40b4d2..771499c8e9f85 100644 --- a/airflow/utils/task_group.py +++ b/airflow/utils/task_group.py @@ -15,10 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped -together when the DAG is displayed graphically. -""" +"""A collection of closely related tasks on the same DAG that should be grouped together visually.""" from __future__ import annotations import copy @@ -53,8 +50,10 @@ class TaskGroup(DAGNode): """ - A collection of tasks. When set_downstream() or set_upstream() are called on the - TaskGroup, it is applied across all tasks within the group if necessary. + A collection of tasks. 
+ + When set_downstream() or set_upstream() are called on the TaskGroup, it is applied across + all tasks within the group if necessary. :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id @@ -313,6 +312,7 @@ def _set_relatives( ) -> None: """ Call set_upstream/set_downstream for all root/leaf tasks within this TaskGroup. + Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids. """ if not isinstance(task_or_task_list, Sequence): @@ -353,19 +353,13 @@ def leaves(self) -> list[BaseOperator]: return list(self.get_leaves()) def get_roots(self) -> Generator[BaseOperator, None, None]: - """ - Returns a generator of tasks that are root tasks, i.e. those with no upstream - dependencies within the TaskGroup. - """ + """Return a generator of tasks with no upstream dependencies within the TaskGroup.""" for task in self: if not any(self.has_task(parent) for parent in task.get_direct_relatives(upstream=True)): yield task def get_leaves(self) -> Generator[BaseOperator, None, None]: - """ - Returns a generator of tasks that are leaf tasks, i.e. those with no downstream - dependencies within the TaskGroup. - """ + """Return a generator of tasks with no downstream dependencies within the TaskGroup.""" def recurse_for_first_non_setup_teardown(group, task): for upstream_task in task.upstream_list: @@ -384,10 +378,7 @@ def recurse_for_first_non_setup_teardown(group, task): yield from recurse_for_first_non_setup_teardown(self, task) def child_id(self, label): - """ - Prefix label with group_id if prefix_group_id is True. Otherwise return the label - as-is. - """ + """Prefix label with group_id if prefix_group_id is True. 
Otherwise return the label as-is."""
        if self.prefix_group_id:
            group_id = self.group_id
            if group_id:
@@ -398,6 +389,8 @@ def child_id(self, label):
     @property
     def upstream_join_id(self) -> str:
         """
+        # TODO: D205-ify this one
+
         If this TaskGroup has immediate upstream TaskGroups or tasks, a proxy node called
         upstream_join_id will be created in Graph view to join the outgoing edges from this
         TaskGroup to reduce the total number of edges needed to be displayed.
@@ -407,6 +400,8 @@ def downstream_join_id(self) -> str:
         """
+        # TODO: D205-ify this one
+
         If this TaskGroup has immediate downstream TaskGroups or tasks, a proxy node called
         downstream_join_id will be created in Graph view to join the outgoing edges from this
         TaskGroup to reduce the total number of edges needed to be displayed.
@@ -441,7 +436,8 @@ def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]:

     def hierarchical_alphabetical_sort(self):
         """
-        Sorts children in hierarchical alphabetical order:
+        Sorts children in hierarchical alphabetical order.
+
         - groups in alphabetical order first
         - tasks in alphabetical order after them.

@@ -453,8 +449,7 @@ def hierarchical_alphabetical_sort(self):

     def topological_sort(self, _include_subdag_tasks: bool = False):
         """
-        Sorts children in topographical order, such that a task comes after any of its
-        upstream dependencies.
+        Sorts children in topological order, such that a task comes after any of its upstream dependencies.

         :return: list of tasks in topological order
         """
@@ -648,10 +643,7 @@ def get_current_task_group(cls, dag: DAG | None) -> TaskGroup | None:


 def task_group_to_dict(task_item_or_group):
-    """
-    Create a nested dict representation of this TaskGroup and its children used to construct
-    the Graph. 
- """ + """Create a nested dict representation of this TaskGroup and its children used to construct the Graph.""" from airflow.models.abstractoperator import AbstractOperator if isinstance(task := task_item_or_group, AbstractOperator): diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 2b65055f38d5b..1db15ac612195 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -241,6 +241,7 @@ def coerce_datetime(v: dt.datetime | None, tz: dt.tzinfo | None = None) -> DateT def td_format(td_object: None | dt.timedelta | float | int) -> str | None: """ Format a timedelta object or float/int into a readable string for time duration. + For example timedelta(seconds=3752) would become `1h:2M:32s`. If the time is less than a second, the return will be `<1s`. """ diff --git a/airflow/utils/types.py b/airflow/utils/types.py index 788b072f39842..0eab9b3b8785b 100644 --- a/airflow/utils/types.py +++ b/airflow/utils/types.py @@ -68,9 +68,6 @@ def from_run_id(run_id: str) -> DagRunType: class EdgeInfoType(TypedDict): - """ - Represents extra metadata that the DAG can store about an edge, - usually generated from an EdgeModifier. 
- """ + """Extra metadata that the DAG can store about an edge, usually generated from an EdgeModifier.""" label: str | None From c26f6b8c77fb8aaf64c6083f52521b569fd03ae4 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Mon, 17 Jul 2023 09:54:57 -0700 Subject: [PATCH 2/3] shubham suggestions Co-author: shubham22 --- airflow/utils/log/logging_mixin.py | 2 +- airflow/utils/operator_helpers.py | 2 +- airflow/utils/task_group.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index be10ff8ec1c9d..97e1a29ff6e2c 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -180,7 +180,7 @@ def isatty(self): class RedirectStdHandler(StreamHandler): """ - # TODO D205-ify this docstring + Custom StreamHandler that uses current sys.stderr/stdout as the stream for logging. This class is like a StreamHandler using sys.stderr/stdout, but uses whatever sys.stderr/stdout is currently set to rather than the value of diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py index 8a77ca1a67498..20f272f4f36df 100644 --- a/airflow/utils/operator_helpers.py +++ b/airflow/utils/operator_helpers.py @@ -199,7 +199,7 @@ def determine_kwargs( def make_kwargs_callable(func: Callable[..., R]) -> Callable[..., R]: """ - # TODO: D205-ify this one + Creates a new callable that only forwards necessary arguments from any provided input. Make a new callable that can accept any number of positional or keyword arguments but only forwards those required by the given callable func. diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py index a9489b2957167..72a777889692a 100644 --- a/airflow/utils/task_group.py +++ b/airflow/utils/task_group.py @@ -398,7 +398,7 @@ def child_id(self, label): @property def upstream_join_id(self) -> str: """ - # TODO: D205-ify this one + Creates a unique ID for upstream dependencies of this TaskGroup. 
If this TaskGroup has immediate upstream TaskGroups or tasks, a proxy node called upstream_join_id will be created in Graph view to join the outgoing edges from this @@ -409,7 +409,7 @@ def upstream_join_id(self) -> str: @property def downstream_join_id(self) -> str: """ - # TODO: D205-ify this one + Creates a unique ID for downstream dependencies of this TaskGroup. If this TaskGroup has immediate downstream TaskGroups or tasks, a proxy node called downstream_join_id will be created in Graph view to join the outgoing edges from this From 147d248e6798c24da16178cf5244c65f75120246 Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Mon, 17 Jul 2023 16:01:39 -0700 Subject: [PATCH 3/3] Update airflow/utils/task_group.py Removed superfluous whitespace --- airflow/utils/task_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py index 72a777889692a..3c3a01bc7de9f 100644 --- a/airflow/utils/task_group.py +++ b/airflow/utils/task_group.py @@ -358,7 +358,7 @@ def leaves(self) -> list[BaseOperator]: return list(self.get_leaves()) def get_roots(self) -> Generator[BaseOperator, None, None]: - """Return a generator of tasks with no upstream dependencies within the TaskGroup.""" + """Return a generator of tasks with no upstream dependencies within the TaskGroup.""" tasks = list(self) ids = {x.task_id for x in tasks} for task in tasks: