Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ def _check_cli_args(args):
def action_cli(func=None, check_db=True):
def action_logging(f: T) -> T:
"""
Decorates function to execute function at the same time submitting action_logging
but in CLI context. It will call action logger callbacks twice,
one for pre-execution and the other one for post-execution.
Decorates a function so that it is executed while also submitting action_logging, but in CLI context.

It will call action logger callbacks twice, one for
pre-execution and the other one for post-execution.

Action logger will be called with below keyword parameters:
sub_command : name of sub-command
Expand All @@ -84,8 +85,7 @@ def action_logging(f: T) -> T:
@functools.wraps(f)
def wrapper(*args, **kwargs):
"""
An wrapper for cli functions. It assumes to have Namespace instance
at 1st positional argument.
A wrapper for cli functions; assumes Namespace instance as first positional argument.

:param args: Positional arguments. A Namespace instance is expected
as the first positional argument
Expand Down Expand Up @@ -126,7 +126,8 @@ def wrapper(*args, **kwargs):

def _build_metrics(func_name, namespace):
"""
Builds metrics dict from function args
Builds metrics dict from function args.

It assumes that the function arguments come from a function in the airflow.bin.cli module
and include a Namespace instance which optionally contains "dag_id", "task_id",
and "execution_date".
Expand Down Expand Up @@ -359,10 +360,7 @@ def should_ignore_depends_on_past(args) -> bool:


def suppress_logs_and_warning(f: T) -> T:
"""
Decorator to suppress logging and warning messages
in cli functions.
"""
"""Decorator to suppress logging and warning messages in cli functions."""

@functools.wraps(f)
def _wrapper(*args, **kwargs):
Expand Down
6 changes: 4 additions & 2 deletions airflow/utils/cli_action_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
# specific language governing permissions and limitations
# under the License.
"""
An Action Logger module. Singleton pattern has been applied into this module
so that registered callbacks can be used all through the same python process.
An Action Logger module.

The singleton pattern has been applied to this module so that registered
callbacks can be used throughout the same Python process.
"""
from __future__ import annotations

Expand Down
3 changes: 1 addition & 2 deletions airflow/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@

def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
"""
Returns a path for a temporary file including a full copy of the configuration
settings.
Returns a path for a temporary file including a full copy of the configuration settings.

:param include_env: Should the value of configuration from ``AIRFLOW__``
environment variables be included or not
Expand Down
5 changes: 3 additions & 2 deletions airflow/utils/dag_cycle_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
def test_cycle(dag: DAG) -> None:
"""
A wrapper function of `check_cycle` for backward compatibility purpose.
New code should use `check_cycle` instead since this function name `test_cycle` starts with 'test_' and
will be considered as a unit test by pytest, resulting in failure.

New code should use `check_cycle` instead since this function name `test_cycle` starts
with 'test_' and will be considered as a unit test by pytest, resulting in failure.
"""
from warnings import warn

Expand Down
24 changes: 12 additions & 12 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,10 +1006,10 @@ def check_username_duplicates(session: Session) -> Iterable[str]:

def reflect_tables(tables: list[Base | str] | None, session):
"""
When running checks prior to upgrades, we use reflection to determine current state of the
database.
This function gets the current state of each table in the set of models provided and returns
a SqlAlchemy metadata object containing them.
When running checks prior to upgrades, we use reflection to determine current state of the database.

This function gets the current state of each table in the set of models
provided and returns a SqlAlchemy metadata object containing them.
"""
import sqlalchemy.schema

Expand Down Expand Up @@ -1180,6 +1180,7 @@ def _create_table_as(
):
"""
Create a new table with rows from query.

We have to handle CTAS differently for different dialects.
"""
from sqlalchemy import column, select, table
Expand Down Expand Up @@ -1256,10 +1257,7 @@ def _move_dangling_data_to_new_table(


def _dangling_against_dag_run(session, source_table, dag_run):
"""
Given a source table, we generate a subquery that will return 1 for every row that
has a dagrun.
"""
"""Given a source table, we generate a subquery that will return 1 for every row that has a dagrun."""
source_to_dag_run_join_cond = and_(
source_table.c.dag_id == dag_run.c.dag_id,
source_table.c.execution_date == dag_run.c.execution_date,
Expand All @@ -1274,8 +1272,7 @@ def _dangling_against_dag_run(session, source_table, dag_run):

def _dangling_against_task_instance(session, source_table, dag_run, task_instance):
"""
Given a source table, we generate a subquery that will return 1 for every row that
has a valid task instance (and associated dagrun).
Given a source table, generate a subquery that will return 1 for every row that has a valid task instance.

This is used to identify rows that need to be removed from tables prior to adding a TI fk.

Expand Down Expand Up @@ -1367,8 +1364,8 @@ def _move_duplicate_data_to_new_table(

def check_bad_references(session: Session) -> Iterable[str]:
"""
Starting in Airflow 2.2, we began a process of replacing `execution_date` with `run_id`
in many tables.
Starting in Airflow 2.2, we began a process of replacing `execution_date` with `run_id` in many tables.

Here we go through each table and look for records that can't be mapped to a dag run.
When we find such "dangling" rows we back them up in a special table and delete them
from the main table.
Expand All @@ -1383,6 +1380,8 @@ def check_bad_references(session: Session) -> Iterable[str]:
@dataclass
class BadReferenceConfig:
"""
Bad reference config class.

:param bad_rows_func: function that returns subquery which determines whether bad rows exist
:param join_tables: table objects referenced in subquery
:param ref_table: information-only identifier for categorizing the missing ref
Expand Down Expand Up @@ -1552,6 +1551,7 @@ def upgradedb(
session: Session = NEW_SESSION,
):
"""
Upgrades the DB.

:param to_revision: Optional Alembic revision ID to upgrade *to*.
If omitted, upgrades to latest revision.
Expand Down
5 changes: 4 additions & 1 deletion airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.
"""
This module took inspiration from the community maintenance dag
This module took inspiration from the community maintenance dag.

See:
(https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/4e5c7682a808082561d60cbc9cafaa477b0d8c65/db-cleanup/airflow-db-cleanup.py).
"""
from __future__ import annotations
Expand Down Expand Up @@ -341,6 +343,7 @@ def _print_config(*, configs: dict[str, _TableConfig]):
def _suppress_with_logging(table, session):
"""
Suppresses errors but logs them.

Also stores the exception instance so it can be referred to after exiting context.
"""
try:
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/dot_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
def _refine_color(color: str):
"""
Converts color in #RGB (12 bits) format to #RRGGBB (24 bits), if it is possible.

Otherwise, it returns the original value. Graphviz does not support colors in #RGB format.

:param color: Text representation of color
Expand Down
21 changes: 9 additions & 12 deletions airflow/utils/edgemodifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

class EdgeModifier(DependencyMixin):
"""
Class that represents edge information to be added between two
tasks/operators. Has shorthand factory functions, like Label("hooray").
Class that represents edge information to be added between two tasks/operators.

Has shorthand factory functions, like Label("hooray").

Current implementation supports
t1 >> Label("Success route") >> t2
Expand Down Expand Up @@ -77,8 +78,9 @@ def _save_nodes(

def _convert_streams_to_task_groups(self):
"""
Both self._upstream and self._downstream are required to determine if
we should convert a node to a TaskGroup or leave it as a DAGNode.
Convert a node to a TaskGroup or leave it as a DAGNode.

Requires both self._upstream and self._downstream.

To do this, we keep a set of group_ids seen among the streams. If we find that
the nodes are from the same TaskGroup, we will leave them as DAGNodes and not
Expand Down Expand Up @@ -121,8 +123,7 @@ def set_upstream(
edge_modifier: EdgeModifier | None = None,
):
"""
Sets the given task/list onto the upstream attribute, and then checks if
we have both sides so we can resolve the relationship.
Set the given task/list onto the upstream attribute, then attempt to resolve the relationship.

Providing this also provides << via DependencyMixin.
"""
Expand All @@ -139,8 +140,7 @@ def set_downstream(
edge_modifier: EdgeModifier | None = None,
):
"""
Sets the given task/list onto the downstream attribute, and then checks if
we have both sides so we can resolve the relationship.
Set the given task/list onto the downstream attribute, then attempt to resolve the relationship.

Providing this also provides >> via DependencyMixin.
"""
Expand All @@ -154,10 +154,7 @@ def set_downstream(
def update_relative(
self, other: DependencyMixin, upstream: bool = True, edge_modifier: EdgeModifier | None = None
) -> None:
"""
Called if we're not the "main" side of a relationship; we still run the
same logic, though.
"""
"""Called if we're not the "main" side of a relationship; we still run the same logic, though."""
if upstream:
self.set_upstream(other)
else:
Expand Down
7 changes: 4 additions & 3 deletions airflow/utils/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,10 @@ def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) ->

def _get_email_list_from_str(addresses: str) -> list[str]:
"""
Extract a list of email addresses from a string. The string
can contain multiple email addresses separated by
any of the following delimiters: ',' or ';'.
Extract a list of email addresses from a string.

The string can contain multiple email addresses separated
by any of the following delimiters: ',' or ';'.

:param addresses: A string containing one or more email addresses.
:return: A list of email addresses.
Expand Down
22 changes: 12 additions & 10 deletions airflow/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ class _IgnoreRule(Protocol):
@staticmethod
def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
"""
Build an ignore rule from the supplied pattern where base_dir
and definition_file should be absolute paths.
Build an ignore rule from the supplied pattern.

``base_dir`` and ``definition_file`` should be absolute paths.
"""

@staticmethod
Expand Down Expand Up @@ -134,8 +135,9 @@ def TemporaryDirectory(*args, **kwargs):

def mkdirs(path, mode):
"""
Creates the directory specified by path, creating intermediate directories
as necessary. If directory already exists, this is a no-op.
Creates the directory specified by path, creating intermediate directories as necessary.

If directory already exists, this is a no-op.

:param path: The directory to create
:param mode: The mode to give to the directory e.g. 0o755, ignores umask
Expand Down Expand Up @@ -164,10 +166,7 @@ def correct_maybe_zipped(fileloc: str | Path) -> str | Path:


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
"""
If the path contains a folder with a .zip suffix, then
the folder is treated as a zip archive and path to zip is returned.
"""
"""If the path contains a folder with a .zip suffix, treat it as a zip archive and return path."""
if not fileloc:
return fileloc
search_ = ZIP_REGEX.search(str(fileloc))
Expand All @@ -182,8 +181,10 @@ def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:

def open_maybe_zipped(fileloc, mode="r"):
"""
Opens the given file. If the path contains a folder with a .zip suffix, then
the folder is treated as a zip archive, opening the file inside the archive.
Opens the given file.

If the path contains a folder with a .zip suffix, then the folder
is treated as a zip archive, opening the file inside the archive.

:return: a file object, as in `open`, or as in `ZipFile.open`.
"""
Expand Down Expand Up @@ -332,6 +333,7 @@ def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> l
def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
"""
Check whether a Python file contains Airflow DAGs.

When safe_mode is off (with False value), this function always returns True.

If might_contain_dag_callable isn't specified, it uses airflow default heuristic
Expand Down
6 changes: 2 additions & 4 deletions airflow/utils/hashlib_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@

def md5(__string: ReadableBuffer = b"") -> hashlib._Hash:
"""
Safely allows calling the hashlib.md5 function with the "usedforsecurity" disabled
when specified in the configuration.
Safely allows calling the hashlib.md5 function when "usedforsecurity" is disabled in the configuration.

:param string: The data to hash.
Default to empty str byte.
:param __string: The data to hash. Defaults to an empty bytes object.
:return: The hashed value.
"""
if PY39:
Expand Down
16 changes: 4 additions & 12 deletions airflow/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ def is_container(obj: Any) -> bool:


def as_tuple(obj: Any) -> tuple:
"""
If obj is a container, returns obj as a tuple.
Otherwise, returns a tuple containing obj.
"""
"""Return obj as a tuple if obj is a container, otherwise return a tuple containing obj."""
if is_container(obj):
return tuple(obj)
else:
Expand All @@ -139,10 +136,7 @@ def chunks(items: list[T], chunk_size: int) -> Generator[list[T], None, None]:


def reduce_in_chunks(fn: Callable[[S, list[T]], S], iterable: list[T], initializer: S, chunk_size: int = 0):
"""
Reduce the given list of items by splitting it into chunks
of the given size and passing each chunk through the reducer.
"""
"""Split the list of items into chunks of a given size and pass each chunk through the reducer."""
if len(iterable) == 0:
return initializer
if chunk_size == 0:
Expand Down Expand Up @@ -172,8 +166,7 @@ def parse_template_string(template_string: str) -> tuple[str | None, jinja2.Temp

def render_log_filename(ti: TaskInstance, try_number, filename_template) -> str:
"""
Given task instance, try_number, filename_template, return the rendered log
filename.
Given task instance, try_number, filename_template, return the rendered log filename.

:param ti: task instance
:param try_number: try_number of the task
Expand Down Expand Up @@ -327,8 +320,7 @@ def is_set(val):

def prune_dict(val: Any, mode="strict"):
"""
Given dict ``val``, returns new dict based on ``val`` with all
empty elements removed.
Given dict ``val``, returns new dict based on ``val`` with all empty elements removed.

What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
Expand Down
6 changes: 1 addition & 5 deletions airflow/utils/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,7 @@ def encode(self, o: Any) -> str:


class XComDecoder(json.JSONDecoder):
"""
This decoder deserializes dicts to objects if they contain
the `__classname__` key otherwise it will return the dict
as is.
"""
"""Deserialize dicts to objects if they contain the `__classname__` key, otherwise return the dict."""

def __init__(self, *args, **kwargs) -> None:
if not kwargs.get("object_hook"):
Expand Down
7 changes: 4 additions & 3 deletions airflow/utils/log/colored_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@

class CustomTTYColoredFormatter(TTYColoredFormatter, TimezoneAware):
"""
Custom log formatter which extends `colored.TTYColoredFormatter`
by adding attributes to message arguments and coloring error
traceback.
Custom log formatter.

Extends `colored.TTYColoredFormatter` by adding attributes
to message arguments and coloring error traceback.
"""

def __init__(self, *args, **kwargs):
Expand Down
Loading