@@ -50,6 +50,9 @@ Airflow supports multiple types of dag Bundles, each catering to specific use cases:
**airflow.providers.git.bundles.git.GitDagBundle**
These bundles integrate with Git repositories, allowing Airflow to fetch dags directly from a repository.

**airflow.providers.amazon.aws.bundles.s3.S3DagBundle**
These bundles reference an S3 bucket containing DAG files. They do not support versioning of the bundle, meaning tasks always run using the latest code.
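For illustration, an S3 bundle entry might be added to ``dag_bundle_config_list`` as in the sketch below; the ``kwargs`` names shown (``aws_conn_id``, ``bucket_name``, ``prefix``) are assumptions for the sake of the example, so check the provider documentation for the exact parameters.

.. code-block:: ini

    dag_bundle_config_list = [
        {
            "name": "my_s3_dags",
            "classpath": "airflow.providers.amazon.aws.bundles.s3.S3DagBundle",
            "kwargs": {"aws_conn_id": "aws_default", "bucket_name": "my-dag-bucket", "prefix": "dags/"}
        }
    ]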

Configuring dag bundles
-----------------------

@@ -65,7 +68,7 @@ For example, adding multiple dag bundles to your ``airflow.cfg`` file:
dag_bundle_config_list = [
{
"name": "my_git_repo",
"classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
"classpath": "airflow.providers.git.bundles.git.GitDagBundle",
"kwargs": {"tracking_ref": "main", "git_conn_id": "my_git_conn"}
},
{
@@ -34,7 +34,7 @@ Some configuration options require that the logging config class be overwritten.
configuration of Airflow and modifying it to suit your needs.

The default configuration can be seen in the
`airflow_local_settings.py template <https://github.com/apache/airflow/blob/|airflow-version|/airflow/config_templates/airflow_local_settings.py>`_
`airflow_local_settings.py template <https://github.com/apache/airflow/blob/|airflow-version|/airflow-core/src/airflow/config_templates/airflow_local_settings.py>`_
and you can see the loggers and handlers used there.
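For example, a minimal sketch of such an override in ``airflow_local_settings.py``, assuming you only want to redirect task logs (the ``task`` handler name and ``base_log_folder`` key are taken from the default template, so verify them against your Airflow version):

.. code-block:: python

    from copy import deepcopy

    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

    # Copy Airflow's defaults, then change only the pieces you need.
    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
    LOGGING_CONFIG["handlers"]["task"]["base_log_folder"] = "/custom/log/folder"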

See :ref:`Configuring local settings <set-config:configuring-local-settings>` for details on how to
18 changes: 17 additions & 1 deletion airflow-core/docs/installation/upgrading_to_airflow3.rst
@@ -108,9 +108,25 @@ Some changes can be automatically fixed. To do so, run the following command:
ruff check dags/ --select AIR301 --fix --preview


Some of the fixes are marked as unsafe. Unsafe fixes usually do not break dag code; they are marked as unsafe because they may change some runtime behavior. For more information, see `Fix Safety <https://docs.astral.sh/ruff/linter/#fix-safety>`_.
To apply these fixes as well, run the following command:

.. code-block:: bash

ruff check dags/ --select AIR301 --fix --unsafe-fixes --preview

.. note::
Ruff has a strict policy about when a rule becomes stable. Until a rule is stable, you must use the ``--preview`` flag.
The progress of the Airflow Ruff rules becoming stable can be tracked in https://github.com/astral-sh/ruff/issues/17749.
That said, from the Airflow side the rules are perfectly fine to use.

.. note::

In AIR rules, unsafe fixes involve changing import paths while keeping the name of the imported member the same. For instance, changing the import from ``from airflow.sensors.base_sensor_operator import BaseSensorOperator`` to ``from airflow.sdk.bases.sensor import BaseSensorOperator`` requires ruff to remove the original import before adding the new one. In contrast, safe fixes include changes to both the member name and the import path, such as changing ``from airflow.datasets import Dataset`` to ``from airflow.sdk import Asset``. These adjustments do not require ruff to remove the old import. To remove unused legacy imports, it is necessary to enable the `unused-import (F401) <https://docs.astral.sh/ruff/rules/unused-import/#unused-import-f401>`_ rule.
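A sketch of the resulting code after both kinds of fix (assuming Airflow 3 with the Task SDK installed):

.. code-block:: python

    # Unsafe fix: same member name, new import path, so Ruff must first delete
    #   from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.sdk.bases.sensor import BaseSensorOperator

    # Safe fix: member name and path both change; the old import
    #   from airflow.datasets import Dataset
    # can stay behind until F401 removes it.
    from airflow.sdk import Asset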

You can also configure these flags through configuration files. See `Configuring Ruff <https://docs.astral.sh/ruff/configuration/>`_ for details.
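For example, a minimal ``pyproject.toml`` sketch equivalent to the commands above (these are standard Ruff settings, not Airflow-specific ones):

.. code-block:: toml

    [tool.ruff]
    preview = true        # required while the AIR rules are still in preview
    unsafe-fixes = true   # opt in to the unsafe fixes described above

    [tool.ruff.lint]
    extend-select = ["AIR301"]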

Step 4: Install the Standard Providers
Step 4: Install the Standard Provider
--------------------------------------

- Some of the commonly used Operators which were bundled as part of the ``airflow-core`` package (for example ``BashOperator`` and ``PythonOperator``)
@@ -35,7 +35,12 @@
from airflow.api_fastapi.execution_api.datamodels.asset import AssetProfile
from airflow.api_fastapi.execution_api.datamodels.connection import ConnectionResponse
from airflow.api_fastapi.execution_api.datamodels.variable import VariableResponse
from airflow.utils.state import IntermediateTIState, TaskInstanceState as TIState, TerminalTIState
from airflow.utils.state import (
DagRunState,
IntermediateTIState,
TaskInstanceState as TIState,
TerminalTIState,
)
from airflow.utils.types import DagRunType

AwareDatetimeAdapter = TypeAdapter(AwareDatetime)
@@ -292,6 +297,7 @@ class DagRun(StrictBaseModel):
end_date: UtcDateTime | None
clear_number: int = 0
run_type: DagRunType
state: DagRunState
conf: Annotated[dict[str, Any], Field(default_factory=dict)]
consumed_asset_events: list[AssetEventDagRunReference]
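The new ``state`` field above surfaces the run's state to task code. A hedged sketch of reading it from the runtime context (assuming the Task SDK context exposes ``dag_run`` as below, and an execution API version of 2025-08-10 or newer):

.. code-block:: python

    from airflow.sdk import get_current_context

    def check_run_state() -> None:
        ctx = get_current_context()
        # The dag_run payload now carries its state.
        print(ctx["dag_run"].state)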

@@ -27,10 +27,12 @@
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
from airflow.exceptions import DagRunAlreadyExists
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.dagrun import DagRun as DagRunModel
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType

router = APIRouter()
@@ -140,7 +142,9 @@ def get_dagrun_state(
session: SessionDep,
) -> DagRunStateResponse:
"""Get a DAG Run State."""
dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id))
dag_run = session.scalar(
select(DagRunModel).where(DagRunModel.dag_id == dag_id, DagRunModel.run_id == run_id)
)
if dag_run is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
@@ -162,16 +166,45 @@ def get_dr_count(
states: Annotated[list[str] | None, Query()] = None,
) -> int:
"""Get the count of DAG runs matching the given criteria."""
query = select(func.count()).select_from(DagRun).where(DagRun.dag_id == dag_id)
query = select(func.count()).select_from(DagRunModel).where(DagRunModel.dag_id == dag_id)

if logical_dates:
query = query.where(DagRun.logical_date.in_(logical_dates))
query = query.where(DagRunModel.logical_date.in_(logical_dates))

if run_ids:
query = query.where(DagRun.run_id.in_(run_ids))
query = query.where(DagRunModel.run_id.in_(run_ids))

if states:
query = query.where(DagRun.state.in_(states))
query = query.where(DagRunModel.state.in_(states))

count = session.scalar(query)
return count or 0


@router.get("/{dag_id}/previous", status_code=status.HTTP_200_OK)
def get_previous_dagrun(
dag_id: str,
logical_date: UtcDateTime,
session: SessionDep,
state: Annotated[DagRunState | None, Query()] = None,
) -> DagRun | None:
"""Get the previous DAG run before the given logical date, optionally filtered by state."""
query = (
select(DagRunModel)
.where(
DagRunModel.dag_id == dag_id,
DagRunModel.logical_date < logical_date,
)
.order_by(DagRunModel.logical_date.desc())
.limit(1)
)

if state:
query = query.where(DagRunModel.state == state)

dag_run = session.scalar(query)

if not dag_run:
return None

return DagRun.model_validate(dag_run)
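As a usage illustration, a sketch of calling the new endpoint over HTTP (the base URL, path prefix, and absent authentication below are deployment-specific assumptions, not part of this change):

.. code-block:: python

    import httpx

    resp = httpx.get(
        "http://localhost:8080/execution/dag-runs/example_dag/previous",
        params={"logical_date": "2025-08-10T00:00:00+00:00", "state": "success"},
    )
    resp.raise_for_status()
    print(resp.json())  # serialized DagRun, or null when no earlier run matches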
@@ -230,6 +230,7 @@ class GetXComSliceFilterParams(BaseModel):
start: int | None = None
stop: int | None = None
step: int | None = None
include_prior_dates: bool = False


@router.get(
@@ -249,6 +250,7 @@ def get_mapped_xcom_by_slice(
key=key,
task_ids=task_id,
dag_ids=dag_id,
include_prior_dates=params.include_prior_dates,
session=session,
)
query = query.order_by(None)
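For context, ``include_prior_dates`` mirrors the parameter of the same name on ``xcom_pull``; a sketch of task code that would exercise this path (the ``producer`` task id and ``my_key`` key are made up for the example):

.. code-block:: python

    from airflow.sdk import task

    @task
    def consumer(ti=None):
        # With include_prior_dates=True the lookup also considers XComs
        # written by earlier dag runs, not just the current one.
        return ti.xcom_pull(task_ids="producer", key="my_key", include_prior_dates=True)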
@@ -21,9 +21,18 @@

from airflow.api_fastapi.execution_api.versions.v2025_04_28 import AddRenderedMapIndexField
from airflow.api_fastapi.execution_api.versions.v2025_05_20 import DowngradeUpstreamMapIndexes
from airflow.api_fastapi.execution_api.versions.v2025_08_10 import (
AddDagRunStateFieldAndPreviousEndpoint,
AddIncludePriorDatesToGetXComSlice,
)

bundle = VersionBundle(
HeadVersion(),
Version(
"2025-08-10",
AddDagRunStateFieldAndPreviousEndpoint,
AddIncludePriorDatesToGetXComSlice,
),
Version("2025-05-20", DowngradeUpstreamMapIndexes),
Version("2025-04-28", AddRenderedMapIndexField),
Version("2025-04-11"),
@@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, endpoint, schema

from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext
from airflow.api_fastapi.execution_api.routes.xcoms import GetXComSliceFilterParams


class AddDagRunStateFieldAndPreviousEndpoint(VersionChange):
[Inline review thread on this class]
Contributor (author): I didn't add changes for #50825; it is a dependent change, but for 3.1.
Member: So I'm clear, you didn't add the rest of this PR, but you have included the Cadwyn migration?
Contributor (author): Nope, that PR is entirely excluded. The Cadwyn migration is from #53655.

"""Add the `state` field to DagRun model and `/dag-runs/{dag_id}/previous` endpoint."""

description = __doc__

instructions_to_migrate_to_previous_version = (
schema(DagRun).field("state").didnt_exist,
endpoint("/dag-runs/{dag_id}/previous", ["GET"]).didnt_exist,
)

@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
def remove_state_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc]
"""Remove the `state` field from the dag_run object when converting to the previous version."""
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
response.body["dag_run"].pop("state", None)


class AddIncludePriorDatesToGetXComSlice(VersionChange):
"""Add the `include_prior_dates` field to GetXComSliceFilterParams."""

description = __doc__

instructions_to_migrate_to_previous_version = (
schema(GetXComSliceFilterParams).field("include_prior_dates").didnt_exist,
)
20 changes: 20 additions & 0 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -47,6 +47,8 @@
CommsDecoder,
ConnectionResult,
DagRunStateResult,
DeleteVariable,
DeleteXCom,
DRCount,
ErrorResponse,
GetConnection,
@@ -56,6 +58,9 @@
GetTICount,
GetVariable,
GetXCom,
OKResponse,
PutVariable,
SetXCom,
TaskStatesResult,
TICount,
VariableResult,
@@ -221,6 +226,7 @@ class TriggerStateSync(BaseModel):
TICount,
TaskStatesResult,
ErrorResponse,
OKResponse,
],
Field(discriminator="type"),
]
@@ -234,8 +240,12 @@ class TriggerStateSync(BaseModel):
Union[
messages.TriggerStateChanges,
GetConnection,
DeleteVariable,
GetVariable,
PutVariable,
DeleteXCom,
GetXCom,
SetXCom,
GetTICount,
GetTaskStates,
GetDagRunState,
@@ -400,6 +410,8 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r
dump_opts = {"exclude_unset": True, "by_alias": True}
else:
resp = conn
elif isinstance(msg, DeleteVariable):
resp = self.client.variables.delete(msg.key)
elif isinstance(msg, GetVariable):
var = self.client.variables.get(msg.key)
if isinstance(var, VariableResponse):
@@ -408,6 +420,10 @@
dump_opts = {"exclude_unset": True}
else:
resp = var
elif isinstance(msg, PutVariable):
self.client.variables.set(msg.key, msg.value, msg.description)
elif isinstance(msg, DeleteXCom):
self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index)
elif isinstance(msg, GetXCom):
xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index)
if isinstance(xcom, XComResponse):
@@ -416,6 +432,10 @@
dump_opts = {"exclude_unset": True}
else:
resp = xcom
elif isinstance(msg, SetXCom):
self.client.xcoms.set(
msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.value, msg.map_index, msg.mapped_length
)
elif isinstance(msg, GetDRCount):
dr_count = self.client.dag_runs.get_count(
dag_id=msg.dag_id,
21 changes: 21 additions & 0 deletions airflow-core/src/airflow/models/__init__.py
@@ -19,6 +19,8 @@

from __future__ import annotations

from airflow.utils.deprecation_tools import add_deprecated_classes

# Do not add new models to this -- this is for compat only
__all__ = [
"DAG",
@@ -141,3 +143,22 @@ def __getattr__(name):
from airflow.sdk.bases.xcom import BaseXCom
from airflow.sdk.definitions.param import Param
from airflow.sdk.execution_time.xcom import XCom

__deprecated_classes = {
"abstractoperator": {
"AbstractOperator": "airflow.sdk.definitions._internal.abstractoperator.AbstractOperator",
"NotMapped": "airflow.sdk.definitions._internal.abstractoperator.NotMapped",
"TaskStateChangeCallback": "airflow.sdk.definitions._internal.abstractoperator.TaskStateChangeCallback",
"DEFAULT_OWNER": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_OWNER",
"DEFAULT_QUEUE": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_QUEUE",
"DEFAULT_TASK_EXECUTION_TIMEOUT": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_TASK_EXECUTION_TIMEOUT",
},
"param": {
"Param": "airflow.sdk.definitions.param.Param",
"ParamsDict": "airflow.sdk.definitions.param.ParamsDict",
},
"baseoperatorlink": {
"BaseOperatorLink": "airflow.sdk.bases.operatorlink.BaseOperatorLink",
},
}
add_deprecated_classes(__deprecated_classes, __name__)
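A sketch of what this shim gives you at runtime (the exact warning category and message come from ``deprecation_tools``, so treat them as assumptions):

.. code-block:: python

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        from airflow.models.param import Param  # legacy path, resolved via the mapping above

    print(Param)  # <class 'airflow.sdk.definitions.param.Param'>
    if caught:
        print(caught[0].category)  # a DeprecationWarning subclass (assumed)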
34 changes: 0 additions & 34 deletions airflow-core/src/airflow/models/abstractoperator.py

This file was deleted.

4 changes: 2 additions & 2 deletions airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -867,13 +867,13 @@ def _read_from_local(

def _read_from_logs_server(
self,
ti: TaskInstance,
ti: TaskInstance | TaskInstanceHistory,
worker_log_rel_path: str,
) -> LogResponse:
sources: LogSourceInfo = []
log_streams: list[RawLogStream] = []
try:
log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
log_type = LogType.TRIGGER if getattr(ti, "triggerer_job", False) else LogType.WORKER
url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
response = _fetch_logs_from_service(url, rel_path)
if response.status_code == 403: