Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ repos:
# Since ruff makes use of multiple cores we _purposefully_ don't run this in docker so it can use the
# host CPU to it's fullest
entry: ruff --fix --no-update-check --force-exclude
additional_dependencies: ['ruff==0.0.226']
additional_dependencies: ['ruff==0.0.262']
files: \.pyi?$
exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py
- repo: https://github.com/asottile/blacken-docs
Expand Down
2 changes: 1 addition & 1 deletion airflow/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources # noqa: autoflake
from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources

warnings.warn(
"This module is deprecated. Please use `kubernetes.client.models` for `V1ResourceRequirements` and `Port`.",
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
:param dynamodb_scan_kwargs: kwargs pass to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
:param s3_key_prefix: Prefix of s3 object key
:param process_func: How we transforms a dynamodb item to bytes. By default we dump the json
""" # noqa: E501
"""

template_fields: Sequence[str] = (
*AwsToAwsBaseOperator.template_fields,
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/triggers/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
hook = RedshiftAsyncHook(aws_conn_id=self.aws_conn_id)
while self.attempts >= 1:
self.attempts = self.attempts - 1
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/kafka/hooks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _get_client(self, config):

@cached_property
def get_conn(self) -> Any:
"""get the configuration object"""
"""Get the configuration object"""
config = self.get_connection(self.kafka_config_id).extra_dejson

if not (config.get("bootstrap.servers", None)):
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/kafka/hooks/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def create_topic(
self,
topics: Sequence[Sequence[Any]],
) -> None:
"""creates a topic
"""Creates a topic

:param topics: a list of topics to create including the number of partitions for the topic
and the replication factor. Format: [ ("topic_name", number of partitions, replication factor)]
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/livy/triggers/livy.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Checks if the _polling_interval > 0, in that case it pools Livy for
batch termination asynchronously.
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/triggers/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current pod status and yields a TriggerEvent"""
hook = self._get_async_hook()
self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/dbt/cloud/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def execute(self, context: Context):
)
return self.run_id

def execute_complete(self, context: "Context", event: dict[str, Any]) -> int:
def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/dbt/cloud/triggers/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Dbt, polls for the pipeline run status"""
hook = DbtCloudHook(self.conn_id)
try:
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None
"""
instance = self.get_instance(instance_id=instance_id, project_id=project_id)
if instance is None:
raise RuntimeError("Instance %s did not exist; unable to delete table %s" % instance_id, table_id)
raise RuntimeError(f"Instance {instance_id} did not exist; unable to delete table {table_id}")
table = instance.table(table_id=table_id)
table.delete()

Expand Down
14 changes: 7 additions & 7 deletions airflow/providers/google/cloud/triggers/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
Expand Down Expand Up @@ -122,7 +122,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
Expand Down Expand Up @@ -181,7 +181,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent with response data"""
hook = self._get_async_hook()
while True:
Expand Down Expand Up @@ -286,7 +286,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
Expand Down Expand Up @@ -414,7 +414,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
Expand Down Expand Up @@ -487,7 +487,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
def _get_async_hook(self) -> BigQueryTableAsyncHook:
return BigQueryTableAsyncHook(gcp_conn_id=self.gcp_conn_id)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Will run until the table exists in the Google Big Query."""
while True:
try:
Expand Down Expand Up @@ -562,7 +562,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Will run until the table exists in the Google Big Query."""
hook = BigQueryAsyncHook(gcp_conn_id=self.gcp_conn_id)
job_id = None
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/cloud_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current build execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current pipeline status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/triggers/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
while True:
cluster = await self.get_async_hook().get_cluster(
project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
Expand Down Expand Up @@ -261,7 +261,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Wait until cluster is deleted completely"""
while self.end_time > time.time():
try:
Expand Down Expand Up @@ -309,7 +309,7 @@ def serialize(self):
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
hook = self.get_async_hook()
while True:
try:
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/triggers/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
"""loop until the relevant file/folder is found."""
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Loop until the relevant file/folder is found."""
try:
hook = self._get_async_hook()
while True:
Expand Down Expand Up @@ -144,7 +144,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Loop until the object updated time is greater than target datetime"""
try:
hook = self._get_async_hook()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets operation status and yields corresponding event."""
hook = self._get_hook()
while True:
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/mlengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent"""
hook = self._get_async_hook()
while True:
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/http/hooks/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ async def run(
data: dict[str, Any] | str | None = None,
headers: dict[str, Any] | None = None,
extra_options: dict[str, Any] | None = None,
) -> "ClientResponse":
) -> ClientResponse:
r"""
Performs an asynchronous HTTP request call

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/triggers/data_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Azure Data Factory, polls for the pipeline run status"""
hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
try:
Expand Down Expand Up @@ -140,7 +140,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make async connection to Azure Data Factory, polls for the pipeline run status"""
hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id=self.azure_data_factory_conn_id)
try:
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/triggers/wasb.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Makes async connection to Azure WASB and polls for existence of the given blob name."""
blob_exists = False
hook = WasbAsyncHook(wasb_conn_id=self.wasb_conn_id, public_read=self.public_read)
Expand Down Expand Up @@ -138,7 +138,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Makes async connection to Azure WASB and polls for existence of a blob with given prefix."""
prefix_exists = False
hook = WasbAsyncHook(wasb_conn_id=self.wasb_conn_id, public_read=self.public_read)
Expand Down
2 changes: 1 addition & 1 deletion airflow/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def wrapper(
if stat is not None and tags is not None:
for k, v in tags.items():
if self.metric_tags_validator.test(k):
if all((c not in [",", "="] for c in v + k)):
if all(c not in [",", "="] for c in v + k):
stat += f",{k}={v}"
else:
log.error("Dropping invalid tag: %s=%s.", k, v)
Expand Down
2 changes: 1 addition & 1 deletion airflow/triggers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
raise NotImplementedError("Triggers must implement serialize()")

@abc.abstractmethod
async def run(self) -> AsyncIterator["TriggerEvent"]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Runs the trigger in an asynchronous context.

Expand Down
4 changes: 2 additions & 2 deletions airflow/triggers/external_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""
Checks periodically in the database to see if the task exists, and has
hit one of the states yet, or not.
Expand Down Expand Up @@ -136,7 +136,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""
Checks periodically in the database to see if the dag run exists, and has
hit one of the states yet, or not.
Expand Down
2 changes: 1 addition & 1 deletion airflow/triggers/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
},
)

async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""Loop until the relevant files are found."""
while True:
for path in glob(self.filepath, recursive=self.recursive):
Expand Down
2 changes: 1 addition & 1 deletion docs/exts/exampleinclude.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from sphinx.util.nodes import set_source_info

try:
import sphinx_airflow_theme # noqa: autoflake
import sphinx_airflow_theme

airflow_theme_is_available = True
except ImportError:
Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/pre_commit/pre_commit_insert_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
sys.path.insert(0, str(AIRFLOW_SOURCES_DIR)) # make sure setup is imported from Airflow
# flake8: noqa: F401

from common_precommit_utils import insert_documentation # isort: skip # noqa E402
from setup import EXTRAS_DEPENDENCIES # isort:skip # noqa
from common_precommit_utils import insert_documentation # isort: skip
from setup import EXTRAS_DEPENDENCIES # isort:skip

sys.path.append(str(AIRFLOW_SOURCES_DIR))

Expand Down
6 changes: 3 additions & 3 deletions scripts/ci/pre_commit/pre_commit_local_yml_mounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@

sys.path.insert(0, str(Path(__file__).parent.resolve())) # make sure common_precommit_utils is imported

from common_precommit_utils import AIRFLOW_SOURCES_ROOT_PATH # isort: skip # noqa E402
from common_precommit_utils import AIRFLOW_SOURCES_ROOT_PATH # isort: skip

sys.path.insert(0, str(AIRFLOW_SOURCES_ROOT_PATH)) # make sure setup is imported from Airflow
sys.path.insert(
0, str(AIRFLOW_SOURCES_ROOT_PATH / "dev" / "breeze" / "src")
) # make sure setup is imported from Airflow
# flake8: noqa: F401
from airflow_breeze.utils.docker_command_utils import VOLUMES_FOR_SELECTED_MOUNTS # isort: skip # noqa E402
from airflow_breeze.utils.docker_command_utils import VOLUMES_FOR_SELECTED_MOUNTS # isort: skip

from common_precommit_utils import insert_documentation # isort: skip # noqa E402
from common_precommit_utils import insert_documentation # isort: skip

sys.path.append(str(AIRFLOW_SOURCES_ROOT_PATH))

Expand Down
2 changes: 1 addition & 1 deletion tests/cli/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from tests.test_utils.config import conf_vars

# Create custom executors here because conftest is imported first
custom_executor_module = type(sys)("custom_executor") # noqa
custom_executor_module = type(sys)("custom_executor")
custom_executor_module.CustomCeleryExecutor = type( # type: ignore
"CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.cluster
* CBT_TABLE_ID - desired ID of the Table
* CBT_POKE_INTERVAL - number of seconds between every attempt of Sensor check
""" # noqa: E501
"""
from __future__ import annotations

import os
Expand Down