Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
34120fe
Refactor bundle view_url to not instaniate bundle on server components
ephraimbuddy Jul 7, 2025
c622adb
fixup! Refactor bundle view_url to not instaniate bundle on server co…
ephraimbuddy Jul 4, 2025
c927764
Rename bundle url to url_template and fix backcompat
ephraimbuddy Jul 6, 2025
bfd723a
Refactor render_url to not depend on dagbundlemodel version
ephraimbuddy Jul 7, 2025
c22d3d1
Render view_url_template in view_url
ephraimbuddy Jul 7, 2025
ad13791
fixup! Render view_url_template in view_url
ephraimbuddy Jul 7, 2025
b07a898
Add deprecation warning and update s3 bundle with view_url_template
ephraimbuddy Jul 7, 2025
21f5b77
fixup! Add deprecation warning and update s3 bundle with view_url_tem…
ephraimbuddy Jul 7, 2025
d037114
Remove deprecation warning in provider's view_url
ephraimbuddy Jul 7, 2025
d1495ac
fixup! Remove deprecation warning in provider's view_url
ephraimbuddy Jul 7, 2025
1395fe3
fixup! fixup! Remove deprecation warning in provider's view_url
ephraimbuddy Jul 7, 2025
5922d8f
Add backcompat for bundles vs airflow releases
ephraimbuddy Jul 7, 2025
83a481c
fixup! Add backcompat for bundles vs airflow releases
ephraimbuddy Jul 7, 2025
0cbddc7
rename url_template to signed_url_template. Also return None when we …
ephraimbuddy Jul 8, 2025
07bf1ab
refactor template signing
ephraimbuddy Jul 8, 2025
8158632
fixup! refactor template signing
ephraimbuddy Jul 8, 2025
cfc3c27
Fix templating and conflict
ephraimbuddy Jul 9, 2025
dec98c4
Fix backcompat & refactor template signing
ephraimbuddy Jul 9, 2025
7d48388
fixup! Fix backcompat & refactor template signing
ephraimbuddy Jul 9, 2025
8c63671
fixup! fixup! Fix backcompat & refactor template signing
ephraimbuddy Jul 9, 2025
dcbffbb
skip some test if not airflow 3.1+
ephraimbuddy Jul 9, 2025
abff6cc
fixup! skip some test if not airflow 3.1+
ephraimbuddy Jul 10, 2025
9ee12d1
fixup! fixup! skip some test if not airflow 3.1+
ephraimbuddy Jul 11, 2025
af03c99
Resolve conflict
ephraimbuddy Jul 11, 2025
f982f7f
Add version to be removed for deprecated view_url
ephraimbuddy Jul 11, 2025
258ecad
Remove template_fields and use regex to extract placeholders
ephraimbuddy Jul 25, 2025
23cdf57
Fix conflict
ephraimbuddy Jul 25, 2025
46f730e
fixup! Remove template_fields and use regex to extract placeholders
ephraimbuddy Jul 25, 2025
002dd5f
Remove added deadline in dag details
ephraimbuddy Jul 28, 2025
8f460fb
Update docs
ephraimbuddy Jul 29, 2025
8fb7b0c
Remove ; from url safety check
ephraimbuddy Jul 29, 2025
b0e8a1c
Log url sanity check errors
ephraimbuddy Jul 29, 2025
5bf2505
compile re
ephraimbuddy Jul 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions airflow-core/docs/administration-and-deployment/dag-bundles.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ For example, adding multiple dag bundles to your ``airflow.cfg`` file:
The whitespace, particularly on the last line, is important so a multi-line value works properly. More details can be found in the
the `configparser docs <https://docs.python.org/3/library/configparser.html#supported-ini-file-structure>`_.

If you want a view url different from the default provided by the dag bundle, you can change the url in the kwargs of the dag bundle configuration.
For example, if you want to use a custom URL for the git dag bundle:

.. code-block:: ini

[dag_processor]
dag_bundle_config_list = [
{
"name": "my_git_repo",
"classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
"kwargs": {
"tracking_ref": "main",
"git_conn_id": "my_git_conn",
"view_url_template": "https://my.custom.git.repo/view/{subdir}",
}
}
]

Above, the ``view_url_template`` is set to a custom URL that will be used to view the Dags in the ``my_git_repo`` bundle. The ``{subdir}`` placeholder will be replaced
with the ``subdir`` attribute of the bundle. The placeholders are attributes of the bundle. You cannot use any placeholder outside of the bundle's attributes.
When you specify a custom URL, it overrides the default URL provided by the dag bundle.

The url is verified for safety, and if it is not safe, the view url for the bundle will be set to ``None``. This is to prevent any potential security issues with unsafe URLs.

You can also override the :ref:`config:dag_processor__refresh_interval` per dag bundle by passing it in kwargs.
This controls how often the dag processor refreshes, or looks for new files, in the dag bundles.

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
d2e81695973bf8b6b30e1f4543627547330ef531e50be633cf589fbdf639b0e8
efbae2f1de68413e5a6f671a306e748581fe454b81e25eeb2927567f11ebd59c
2,766 changes: 1,387 additions & 1,379 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``f56f68b9e02f`` (head) | ``09fa89ba1710`` | ``3.1.0`` | Add callback_state to deadline. |
| ``3bda03debd04`` (head) | ``f56f68b9e02f`` | ``3.1.0`` | Add url template and template params to DagBundleModel. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``f56f68b9e02f`` | ``09fa89ba1710`` | ``3.1.0`` | Add callback_state to deadline. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``09fa89ba1710`` | ``40f7c30a228b`` | ``3.1.0`` | Add trigger_id to deadline. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from uuid import UUID

from pydantic import AliasPath, Field, computed_field
from sqlalchemy import select

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.dag_processing.bundles.manager import DagBundlesManager
Expand All @@ -41,10 +42,23 @@ class DagVersionResponse(BaseModel):
@property
def bundle_url(self) -> str | None:
if self.bundle_name:
try:
return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
except ValueError:
return None
# Get the bundle model from the database and render the URL
from airflow.models.dagbundle import DagBundleModel
from airflow.utils.session import create_session

with create_session() as session:
bundle_model = session.scalar(
select(DagBundleModel).where(DagBundleModel.name == self.bundle_name)
)

if bundle_model and hasattr(bundle_model, "signed_url_template"):
return bundle_model.render_url(self.bundle_version)
# fallback to the deprecated option if the bundle model does not have a signed_url_template
# attribute
try:
return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
except ValueError:
return None
return None


Expand Down
34 changes: 32 additions & 2 deletions airflow-core/src/airflow/dag_processing/bundles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import os
import shutil
import tempfile
import warnings
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass, field
Expand All @@ -35,7 +36,6 @@
from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum

from airflow.configuration import conf
from airflow.dag_processing.bundles.manager import DagBundlesManager

if TYPE_CHECKING:
from pendulum import DateTime
Expand Down Expand Up @@ -217,7 +217,10 @@ def remove_stale_bundle_versions(self):
This isn't really necessary on worker types that don't share storage
with other processes.
"""
from airflow.dag_processing.bundles.manager import DagBundlesManager

log.info("checking for stale bundle versions locally")

bundles = list(DagBundlesManager().get_all_dag_bundles())
for bundle in bundles:
if not bundle.supports_versioning:
Expand Down Expand Up @@ -256,6 +259,7 @@ def __init__(
name: str,
refresh_interval: int = conf.getint("dag_processor", "refresh_interval"),
version: str | None = None,
view_url_template: str | None = None,
) -> None:
self.name = name
self.version = version
Expand All @@ -268,6 +272,8 @@ def __init__(
self.versions_dir = get_bundle_versions_base_folder(bundle_name=self.name)
"""Where bundle versions are stored locally for this bundle."""

self._view_url_template = view_url_template

def initialize(self) -> None:
"""
Initialize the bundle.
Expand Down Expand Up @@ -316,10 +322,34 @@ def view_url(self, version: str | None = None) -> str | None:
URL to view the bundle on an external website. This is shown to users in the Airflow UI, allowing them to navigate to this url for more details about that version of the bundle.

This needs to function without `initialize` being called.

:param version: Version to view
:return: URL to view the bundle
"""
warnings.warn(
"The 'view_url' method is deprecated and will be removed in a future version. "
"Use 'view_url_template' instead.",
DeprecationWarning,
stacklevel=2,
)
return None

def view_url_template(self) -> str | None:
"""
URL template to view the bundle on an external website.

This is shown to users in the Airflow UI, allowing them to navigate to
this url for more details about that version of the bundle.

The template should use format string placeholders like {version}, {subdir}, etc.
Common placeholders:
- {version}: The version identifier
- {subdir}: The subdirectory within the bundle (if applicable)

This needs to function without `initialize` being called.

:return: URL template string or None if not applicable
"""
return self._view_url_template

@contextmanager
def lock(self):
Expand Down
127 changes: 126 additions & 1 deletion airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
# under the License.
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

from itsdangerous import URLSafeSerializer
from sqlalchemy import delete

from airflow.configuration import conf
Expand Down Expand Up @@ -81,6 +83,61 @@ def _add_example_dag_bundle(config_list):
)


def _is_safe_bundle_url(url: str) -> bool:
"""
Check if a bundle URL is safe to use.

This function validates that the URL:
- Uses HTTP or HTTPS schemes (no JavaScript, data, or other schemes)
- Is properly formatted
- Doesn't contain malicious content
"""
import logging
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

if not url:
return False

try:
parsed = urlparse(url)
if parsed.scheme not in {"http", "https"}:
logger.error(
"Bundle URL uses unsafe scheme '%s'. Only 'http' and 'https' are allowed", parsed.scheme
)
return False

if not parsed.netloc:
logger.error("Bundle URL '%s' has no network location", url)
return False

if any(ord(c) < 32 for c in url):
logger.error("Bundle URL '%s' contains control characters (ASCII < 32)", url)
return False

return True
except Exception as e:
logger.error("Failed to parse bundle URL '%s': %s", url, str(e))
return False


def _sign_bundle_url(url: str, bundle_name: str) -> str:
"""
Sign a bundle URL for integrity verification.

:param url: The URL to sign
:param bundle_name: The name of the bundle (used in the payload)
:return: The signed URL token
"""
serializer = URLSafeSerializer(conf.get_mandatory_value("core", "fernet_key"))
payload = {
"url": url,
"bundle_name": bundle_name,
}
return serializer.dumps(payload)


class DagBundlesManager(LoggingMixin):
"""Manager for DAG bundles."""

Expand Down Expand Up @@ -124,12 +181,44 @@ def parse_config(self) -> None:
@provide_session
def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
self.log.debug("Syncing DAG bundles to the database")

def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
bundle_instance = self.get_bundle(name)
new_template_ = bundle_instance.view_url_template()
new_params_ = self._extract_template_params(bundle_instance)
if new_template_:
if not _is_safe_bundle_url(new_template_):
self.log.warning(
"Bundle %s has unsafe URL template '%s', skipping URL update",
bundle_name,
new_template_,
)
new_template_ = None
else:
# Sign the URL for integrity verification
new_template_ = _sign_bundle_url(new_template_, bundle_name)
self.log.debug("Signed URL template for bundle %s", bundle_name)
return new_template_, new_params_

stored = {b.name: b for b in session.query(DagBundleModel).all()}

for name in self._bundle_config.keys():
if bundle := stored.pop(name, None):
bundle.active = True
new_template, new_params = _extract_and_sign_template(name)
if new_template != bundle.signed_url_template:
bundle.signed_url_template = new_template
self.log.debug("Updated URL template for bundle %s", name)
if new_params != bundle.template_params:
bundle.template_params = new_params
self.log.debug("Updated template parameters for bundle %s", name)
else:
session.add(DagBundleModel(name=name))
new_template, new_params = _extract_and_sign_template(name)
new_bundle = DagBundleModel(name=name)
new_bundle.signed_url_template = new_template
new_bundle.template_params = new_params

session.add(new_bundle)
self.log.info("Added new DAG bundle %s to the database", name)

for name, bundle in stored.items():
Expand All @@ -140,6 +229,35 @@ def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
self.log.info("Deleted import errors for bundle %s which is no longer configured", name)

@staticmethod
def _extract_template_params(bundle_instance: BaseDagBundle) -> dict:
"""
Extract template parameters from a bundle instance's view_url_template method.

:param bundle_instance: The bundle instance to extract parameters from
:return: Dictionary of template parameters
"""
import re

params: dict[str, str] = {}
template = bundle_instance.view_url_template()

if not template:
return params

# Extract template placeholders using regex
# This matches {placeholder} patterns in the template
PLACEHOLDER_PATTERN = re.compile(r"\{([^}]+)\}")
placeholders = PLACEHOLDER_PATTERN.findall(template)

# Extract values for each placeholder found in the template
for placeholder in placeholders:
field_value = getattr(bundle_instance, placeholder, None)
if field_value:
params[placeholder] = field_value

return params

def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
"""
Get a DAG bundle by name.
Expand All @@ -165,5 +283,12 @@ def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]:
yield class_(name=name, version=None, **kwargs)

def view_url(self, name: str, version: str | None = None) -> str | None:
warnings.warn(
"The 'view_url' method is deprecated and will be removed when providers "
"have Airflow 3.1 as the minimum supported version. "
"Use DagBundleModel.render_url() instead.",
DeprecationWarning,
stacklevel=2,
)
bundle = self.get_bundle(name, version)
return bundle.view_url(version=version)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add url template and template params to DagBundleModel.

Revision ID: 3bda03debd04
Revises: f56f68b9e02f
Create Date: 2025-07-04 10:12:12.711292

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy_utils import JSONType

# revision identifiers, used by Alembic.
revision = "3bda03debd04"
down_revision = "f56f68b9e02f"
branch_labels = None
depends_on = None
airflow_version = "3.1.0"


def upgrade():
"""Apply Add url and template params to DagBundleModel."""
with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
batch_op.add_column(sa.Column("signed_url_template", sa.String(length=200), nullable=True))
batch_op.add_column(sa.Column("template_params", JSONType(), nullable=True))


def downgrade():
"""Unapply Add url and template params to DagBundleModel."""
with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
batch_op.drop_column("template_params")
batch_op.drop_column("signed_url_template")
Loading