diff --git a/airflow-core/docs/administration-and-deployment/dag-bundles.rst b/airflow-core/docs/administration-and-deployment/dag-bundles.rst index 48401b0c1439c..53bd5e16afbdf 100644 --- a/airflow-core/docs/administration-and-deployment/dag-bundles.rst +++ b/airflow-core/docs/administration-and-deployment/dag-bundles.rst @@ -83,6 +83,30 @@ For example, adding multiple dag bundles to your ``airflow.cfg`` file: The whitespace, particularly on the last line, is important so a multi-line value works properly. More details can be found in the the `configparser docs `_. +If you want a view url different from the default provided by the dag bundle, you can change the url in the kwargs of the dag bundle configuration. +For example, if you want to use a custom URL for the git dag bundle: + +.. code-block:: ini + + [dag_processor] + dag_bundle_config_list = [ + { + "name": "my_git_repo", + "classpath": "airflow.dag_processing.bundles.git.GitDagBundle", + "kwargs": { + "tracking_ref": "main", + "git_conn_id": "my_git_conn", + "view_url_template": "https://my.custom.git.repo/view/{subdir}" + } + } + ] + +Above, the ``view_url_template`` is set to a custom URL that will be used to view the Dags in the ``my_git_repo`` bundle. The ``{subdir}`` placeholder will be replaced +with the ``subdir`` attribute of the bundle. The placeholders are attributes of the bundle. You cannot use any placeholder outside of the bundle's attributes. +When you specify a custom URL, it overrides the default URL provided by the dag bundle. + +The url is verified for safety, and if it is not safe, the view url for the bundle will be set to ``None``. This is to prevent any potential security issues with unsafe URLs. + You can also override the :ref:`config:dag_processor__refresh_interval` per dag bundle by passing it in kwargs. This controls how often the dag processor refreshes, or looks for new files, in the dag bundles. 
diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index bb0a75b55cd66..07e93bb4ceafa 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -d2e81695973bf8b6b30e1f4543627547330ef531e50be633cf589fbdf639b0e8 \ No newline at end of file +efbae2f1de68413e5a6f671a306e748581fe454b81e25eeb2927567f11ebd59c \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index d2ce6af2ab97e..537b1e5f6e3fd 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + dag_priority_parsing_request @@ -305,2046 +305,2054 @@ asset_alias - -asset_alias + +asset_alias + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +group + + [VARCHAR(1500)] + NOT NULL -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL +asset_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL +event_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset_event - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL -alias_id - - [INTEGER] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL asset_alias--dag_schedule_asset_alias_reference 
- -0..N -1 + +0..N +1 asset - -asset + +asset + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +extra + + [JSON] + NOT NULL -extra - - [JSON] - NOT NULL +group + + [VARCHAR(1500)] + NOT NULL -group - - [VARCHAR(1500)] - NOT NULL +name + + [VARCHAR(1500)] + NOT NULL -name - - [VARCHAR(1500)] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_alias_asset - -0..N -1 + +0..N +1 asset_trigger - -asset_trigger + +asset_trigger + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL - -trigger_id - - [INTEGER] - NOT NULL +trigger_id + + [INTEGER] + NOT NULL asset--asset_trigger - -0..N -1 + +0..N +1 asset_active - -asset_active + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_active - -1 -1 + +1 +1 asset--asset_active - -1 -1 + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL asset--dag_schedule_asset_reference - -0..N -1 + +0..N +1 task_outlet_asset_reference - -task_outlet_asset_reference + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL 
+updated_at + + [TIMESTAMP] + NOT NULL asset--task_outlet_asset_reference - -0..N -1 + +0..N +1 task_inlet_asset_reference - -task_inlet_asset_reference + +task_inlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL asset--task_inlet_asset_reference - -0..N -1 + +0..N +1 asset_dag_run_queue - -asset_dag_run_queue + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +target_dag_id + + [VARCHAR(250)] + NOT NULL -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL asset--asset_dag_run_queue - -0..N -1 + +0..N +1 asset_event - -asset_event + +asset_event + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +asset_id + + [INTEGER] + NOT NULL -asset_id - - [INTEGER] - NOT NULL +extra + + [JSON] + NOT NULL -extra - - [JSON] - NOT NULL +source_dag_id + + [VARCHAR(250)] -source_dag_id - - [VARCHAR(250)] +source_map_index + + [INTEGER] -source_map_index - - [INTEGER] +source_run_id + + [VARCHAR(250)] -source_run_id - - [VARCHAR(250)] +source_task_id + + [VARCHAR(250)] -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL +timestamp + + [TIMESTAMP] + NOT NULL asset_event--asset_alias_asset_event - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N -1 + +0..N +1 trigger - -trigger + +trigger + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +classpath + + 
[VARCHAR(1000)] + NOT NULL -classpath - - [VARCHAR(1000)] - NOT NULL +created_date + + [TIMESTAMP] + NOT NULL -created_date - - [TIMESTAMP] - NOT NULL +kwargs + + [TEXT] + NOT NULL -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] +triggerer_id + + [INTEGER] trigger--asset_trigger - -0..N -1 + +0..N +1 task_instance - -task_instance + +task_instance + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +context_carrier + + [JSONB] -context_carrier - - [JSONB] +custom_operator_name + + [VARCHAR(1000)] -custom_operator_name - - [VARCHAR(1000)] +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] + NOT NULL -dag_version_id - - [UUID] - NOT NULL +duration + + [DOUBLE_PRECISION] -duration - - [DOUBLE_PRECISION] +end_date + + [TIMESTAMP] -end_date - - [TIMESTAMP] +executor + + [VARCHAR(1000)] -executor - - [VARCHAR(1000)] +executor_config + + [BYTEA] -executor_config - - [BYTEA] +external_executor_id + + [VARCHAR(250)] -external_executor_id - - [VARCHAR(250)] +hostname + + [VARCHAR(1000)] -hostname - - [VARCHAR(1000)] +last_heartbeat_at + + [TIMESTAMP] -last_heartbeat_at - - [TIMESTAMP] +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +max_tries + + [INTEGER] -max_tries - - [INTEGER] +next_kwargs + + [JSONB] -next_kwargs - - [JSONB] +next_method + + [VARCHAR(1000)] -next_method - - [VARCHAR(1000)] +operator + + [VARCHAR(1000)] -operator - - [VARCHAR(1000)] +pid + + [INTEGER] -pid - - [INTEGER] +pool + + [VARCHAR(256)] + NOT NULL -pool - - [VARCHAR(256)] - NOT NULL +pool_slots + + [INTEGER] + NOT NULL -pool_slots - - [INTEGER] - NOT NULL +priority_weight + + [INTEGER] -priority_weight - - [INTEGER] +queue + + [VARCHAR(256)] -queue - - [VARCHAR(256)] +queued_by_job_id + + [INTEGER] -queued_by_job_id - - [INTEGER] +queued_dttm + + [TIMESTAMP] -queued_dttm - - [TIMESTAMP] +rendered_map_index + + [VARCHAR(250)] -rendered_map_index - - [VARCHAR(250)] +run_id + + [VARCHAR(250)] + NOT NULL -run_id 
- - [VARCHAR(250)] - NOT NULL +scheduled_dttm + + [TIMESTAMP] -scheduled_dttm - - [TIMESTAMP] +span_status + + [VARCHAR(250)] + NOT NULL -span_status - - [VARCHAR(250)] - NOT NULL +start_date + + [TIMESTAMP] -start_date - - [TIMESTAMP] +state + + [VARCHAR(20)] -state - - [VARCHAR(20)] +task_display_name + + [VARCHAR(2000)] -task_display_name - - [VARCHAR(2000)] +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +trigger_id + + [INTEGER] -trigger_id - - [INTEGER] +trigger_timeout + + [TIMESTAMP] -trigger_timeout - - [TIMESTAMP] +try_number + + [INTEGER] -try_number - - [INTEGER] +unixname + + [VARCHAR(1000)] -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] +updated_at + + [TIMESTAMP] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} deadline - -deadline + +deadline + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +callback + + [VARCHAR(500)] + NOT NULL -callback - - [VARCHAR(500)] - NOT NULL +callback_kwargs + + [JSON] -callback_kwargs - - [JSON] +callback_state + + [VARCHAR(20)] -callback_state - - [VARCHAR(20)] +dag_id + + [VARCHAR(250)] -dag_id - - [VARCHAR(250)] +dagrun_id + + [INTEGER] -dagrun_id - - [INTEGER] +deadline_time + + [TIMESTAMP] + NOT NULL -deadline_time - - [TIMESTAMP] - NOT NULL - -trigger_id - - [INTEGER] +trigger_id + + [INTEGER] trigger--deadline - -0..N -{0,1} + +0..N +{0,1} hitl_detail - -hitl_detail + +hitl_detail + +ti_id + + [UUID] + NOT NULL -ti_id - - [UUID] - NOT NULL +body + + [TEXT] -body - - [TEXT] +chosen_options + + [JSON] -chosen_options - - [JSON] +defaults + + [JSON] -defaults +multiple - [JSON] + [BOOLEAN] -multiple - - [BOOLEAN] +options + + [JSON] + NOT NULL -options - - [JSON] - NOT NULL +params + + [JSON] + NOT NULL -params - - [JSON] - NOT NULL +params_input + + [JSON] + NOT NULL -params_input - - [JSON] - NOT NULL +response_at + + [TIMESTAMP] -response_at - - [TIMESTAMP] +subject + + [TEXT] + NOT NULL -subject +user_id - [TEXT] - NOT NULL - -user_id - - [VARCHAR(128)] + 
[VARCHAR(128)] task_instance--hitl_detail - -1 -1 + +1 +1 task_map - -task_map + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +keys + + [JSONB] -keys - - [JSONB] - -length - - [INTEGER] - NOT NULL +length + + [INTEGER] + NOT NULL task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_reschedule - -task_reschedule + +task_reschedule + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +duration + + [INTEGER] + NOT NULL -duration - - [INTEGER] - NOT NULL +end_date + + [TIMESTAMP] + NOT NULL -end_date - - [TIMESTAMP] - NOT NULL +reschedule_date + + [TIMESTAMP] + NOT NULL -reschedule_date - - [TIMESTAMP] - NOT NULL +start_date + + [TIMESTAMP] + NOT NULL -start_date - - [TIMESTAMP] - NOT NULL - -ti_id - - [UUID] - NOT NULL +ti_id + + [UUID] + NOT NULL task_instance--task_reschedule - -0..N -1 + +0..N +1 xcom - -xcom + +xcom + +dag_run_id + + [INTEGER] + NOT NULL -dag_run_id - - [INTEGER] - NOT NULL +key + + [VARCHAR(512)] + NOT NULL -key - - [VARCHAR(512)] - NOT NULL +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +timestamp + + [TIMESTAMP] + NOT NULL -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [JSONB] +value + + [JSONB] task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance_note - 
-task_instance_note + +task_instance_note + +ti_id + + [UUID] + NOT NULL -ti_id - - [UUID] - NOT NULL +content + + [VARCHAR(1000)] -content - - [VARCHAR(1000)] +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] +user_id + + [VARCHAR(128)] task_instance--task_instance_note - -1 -1 + +1 +1 task_instance_history - -task_instance_history + +task_instance_history + +task_instance_id + + [UUID] + NOT NULL -task_instance_id - - [UUID] - NOT NULL +context_carrier + + [JSONB] -context_carrier - - [JSONB] +custom_operator_name + + [VARCHAR(1000)] -custom_operator_name - - [VARCHAR(1000)] +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] -dag_version_id - - [UUID] +duration + + [DOUBLE_PRECISION] -duration - - [DOUBLE_PRECISION] +end_date + + [TIMESTAMP] -end_date - - [TIMESTAMP] +executor + + [VARCHAR(1000)] -executor - - [VARCHAR(1000)] +executor_config + + [BYTEA] -executor_config - - [BYTEA] +external_executor_id + + [VARCHAR(250)] -external_executor_id - - [VARCHAR(250)] +hostname + + [VARCHAR(1000)] -hostname - - [VARCHAR(1000)] +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +max_tries + + [INTEGER] -max_tries - - [INTEGER] +next_kwargs + + [JSONB] -next_kwargs - - [JSONB] +next_method + + [VARCHAR(1000)] -next_method - - [VARCHAR(1000)] +operator + + [VARCHAR(1000)] -operator - - [VARCHAR(1000)] +pid + + [INTEGER] -pid - - [INTEGER] +pool + + [VARCHAR(256)] + NOT NULL -pool - - [VARCHAR(256)] - NOT NULL +pool_slots + + [INTEGER] + NOT NULL -pool_slots - - [INTEGER] - NOT NULL +priority_weight + + [INTEGER] -priority_weight - - [INTEGER] +queue + + [VARCHAR(256)] -queue - - [VARCHAR(256)] +queued_by_job_id + + [INTEGER] -queued_by_job_id - - [INTEGER] +queued_dttm + + [TIMESTAMP] -queued_dttm - - [TIMESTAMP] +rendered_map_index + + [VARCHAR(250)] 
-rendered_map_index - - [VARCHAR(250)] +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +scheduled_dttm + + [TIMESTAMP] -scheduled_dttm - - [TIMESTAMP] +span_status + + [VARCHAR(250)] + NOT NULL -span_status - - [VARCHAR(250)] - NOT NULL +start_date + + [TIMESTAMP] -start_date - - [TIMESTAMP] +state + + [VARCHAR(20)] -state - - [VARCHAR(20)] +task_display_name + + [VARCHAR(2000)] -task_display_name - - [VARCHAR(2000)] +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +trigger_id + + [INTEGER] -trigger_id - - [INTEGER] +trigger_timeout + + [TIMESTAMP] -trigger_timeout - - [TIMESTAMP] +try_number + + [INTEGER] + NOT NULL -try_number - - [INTEGER] - NOT NULL +unixname + + [VARCHAR(1000)] -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] +updated_at + + [TIMESTAMP] task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 rendered_task_instance_fields - -rendered_task_instance_fields + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +map_index + + [INTEGER] + NOT NULL -map_index - - [INTEGER] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +task_id + + [VARCHAR(250)] + NOT NULL -task_id - - [VARCHAR(250)] - NOT NULL +k8s_pod_yaml + + [JSON] -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL +rendered_fields + + [JSON] + NOT NULL task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 dag_bundle - -dag_bundle - -name - - [VARCHAR(250)] - NOT NULL - -active - - [BOOLEAN] - -last_refreshed - - [TIMESTAMP] - 
-version - - [VARCHAR(200)] + +dag_bundle + +name + + [VARCHAR(250)] + NOT NULL + +active + + [BOOLEAN] + +last_refreshed + + [TIMESTAMP] + +signed_url_template + + [VARCHAR(200)] + +template_params + + [JSON] + +version + + [VARCHAR(200)] dag - -dag + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +asset_expression + + [JSON] -asset_expression - - [JSON] +bundle_name + + [VARCHAR(250)] -bundle_name - - [VARCHAR(250)] +bundle_version + + [VARCHAR(200)] -bundle_version - - [VARCHAR(200)] +dag_display_name + + [VARCHAR(2000)] -dag_display_name - - [VARCHAR(2000)] +deadline + + [JSON] -deadline - - [JSON] +description + + [TEXT] -description - - [TEXT] +fileloc + + [VARCHAR(2000)] -fileloc - - [VARCHAR(2000)] +has_import_errors + + [BOOLEAN] -has_import_errors - - [BOOLEAN] +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL +is_paused + + [BOOLEAN] -is_paused - - [BOOLEAN] +is_stale + + [BOOLEAN] -is_stale - - [BOOLEAN] +last_expired + + [TIMESTAMP] -last_expired - - [TIMESTAMP] +last_parsed_time + + [TIMESTAMP] -last_parsed_time - - [TIMESTAMP] +max_active_runs + + [INTEGER] -max_active_runs - - [INTEGER] +max_active_tasks + + [INTEGER] + NOT NULL -max_active_tasks - - [INTEGER] - NOT NULL +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL +next_dagrun + + [TIMESTAMP] -next_dagrun - - [TIMESTAMP] +next_dagrun_create_after + + [TIMESTAMP] -next_dagrun_create_after - - [TIMESTAMP] +next_dagrun_data_interval_end + + [TIMESTAMP] -next_dagrun_data_interval_end - - [TIMESTAMP] +next_dagrun_data_interval_start + + [TIMESTAMP] -next_dagrun_data_interval_start - - [TIMESTAMP] +owners + + [VARCHAR(2000)] -owners - - [VARCHAR(2000)] +relative_fileloc + + [VARCHAR(2000)] -relative_fileloc - - [VARCHAR(2000)] +timetable_description + + [VARCHAR(1000)] -timetable_description - - [VARCHAR(1000)] - -timetable_summary - - [TEXT] 
+timetable_summary + + [TEXT] dag_bundle--dag - -0..N -{0,1} + +0..N +{0,1} dag--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 dag--dag_schedule_asset_reference - -0..N -1 + +0..N +1 dag--task_outlet_asset_reference - -0..N -1 + +0..N +1 dag--task_inlet_asset_reference - -0..N -1 + +0..N +1 dag--asset_dag_run_queue - -0..N -1 + +0..N +1 dag--deadline - -0..N -{0,1} + +0..N +{0,1} dag_schedule_asset_name_reference - -dag_schedule_asset_name_reference + +dag_schedule_asset_name_reference + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +name + + [VARCHAR(1500)] + NOT NULL -name - - [VARCHAR(1500)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL dag--dag_schedule_asset_name_reference - -0..N -1 + +0..N +1 dag_schedule_asset_uri_reference - -dag_schedule_asset_uri_reference + +dag_schedule_asset_uri_reference + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +uri + + [VARCHAR(1500)] + NOT NULL -uri - - [VARCHAR(1500)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL dag--dag_schedule_asset_uri_reference - -0..N -1 + +0..N +1 dag_version - -dag_version + +dag_version + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +bundle_name + + [VARCHAR(250)] -bundle_name - - [VARCHAR(250)] +bundle_version + + [VARCHAR(250)] -bundle_version - - [VARCHAR(250)] +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +last_updated + + [TIMESTAMP] + NOT NULL -last_updated - - [TIMESTAMP] - NOT NULL - -version_number - - [INTEGER] - NOT NULL +version_number + + [INTEGER] + NOT NULL dag--dag_version - -0..N -1 + +0..N +1 dag_tag - -dag_tag + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL +name + + [VARCHAR(100)] + NOT NULL dag--dag_tag - -0..N -1 + +0..N +1 
dag_owner_attributes - -dag_owner_attributes + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +owner + + [VARCHAR(500)] + NOT NULL -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL +link + + [VARCHAR(500)] + NOT NULL dag--dag_owner_attributes - -0..N -1 + +0..N +1 dag_warning - -dag_warning + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +warning_type + + [VARCHAR(50)] + NOT NULL -warning_type - - [VARCHAR(50)] - NOT NULL +message + + [TEXT] + NOT NULL -message - - [TEXT] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL +timestamp + + [TIMESTAMP] + NOT NULL dag--dag_warning - -0..N -1 + +0..N +1 dag_favorite - -dag_favorite + +dag_favorite + +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL - -user_id - - [VARCHAR(250)] - NOT NULL +user_id + + [VARCHAR(250)] + NOT NULL dag--dag_favorite - -0..N -1 + +0..N +1 dag_version--task_instance - -0..N -1 + +0..N +1 dag_run - -dag_run + +dag_run + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +backfill_id + + [INTEGER] -backfill_id - - [INTEGER] +bundle_version + + [VARCHAR(250)] -bundle_version - - [VARCHAR(250)] +clear_number + + [INTEGER] + NOT NULL -clear_number - - [INTEGER] - NOT NULL +conf + + [JSONB] -conf - - [JSONB] +context_carrier + + [JSONB] -context_carrier - - [JSONB] +created_dag_version_id + + [UUID] -created_dag_version_id - - [UUID] +creating_job_id + + [INTEGER] -creating_job_id - - [INTEGER] +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +data_interval_end + + [TIMESTAMP] -data_interval_end - - [TIMESTAMP] +data_interval_start + + [TIMESTAMP] -data_interval_start - - [TIMESTAMP] +end_date + + [TIMESTAMP] -end_date - - [TIMESTAMP] +last_scheduling_decision + + [TIMESTAMP] -last_scheduling_decision - - [TIMESTAMP] +log_template_id + + [INTEGER] -log_template_id - - [INTEGER] +logical_date + + [TIMESTAMP] -logical_date 
- - [TIMESTAMP] +queued_at + + [TIMESTAMP] -queued_at - - [TIMESTAMP] +run_after + + [TIMESTAMP] + NOT NULL -run_after - - [TIMESTAMP] - NOT NULL +run_id + + [VARCHAR(250)] + NOT NULL -run_id - - [VARCHAR(250)] - NOT NULL +run_type + + [VARCHAR(50)] + NOT NULL -run_type - - [VARCHAR(50)] - NOT NULL +scheduled_by_job_id + + [INTEGER] -scheduled_by_job_id - - [INTEGER] +span_status + + [VARCHAR(250)] + NOT NULL -span_status - - [VARCHAR(250)] - NOT NULL +start_date + + [TIMESTAMP] -start_date - - [TIMESTAMP] +state + + [VARCHAR(50)] -state - - [VARCHAR(50)] +triggered_by + + [VARCHAR(50)] -triggered_by - - [VARCHAR(50)] +triggering_user_name + + [VARCHAR(512)] -triggering_user_name - - [VARCHAR(512)] - -updated_at - - [TIMESTAMP] +updated_at + + [TIMESTAMP] dag_version--dag_run - -0..N -{0,1} + +0..N +{0,1} dag_code - -dag_code + +dag_code + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] + NOT NULL -dag_version_id - - [UUID] - NOT NULL +fileloc + + [VARCHAR(2000)] + NOT NULL -fileloc - - [VARCHAR(2000)] - NOT NULL +last_updated + + [TIMESTAMP] + NOT NULL -last_updated - - [TIMESTAMP] - NOT NULL +source_code + + [TEXT] + NOT NULL -source_code - - [TEXT] - NOT NULL - -source_code_hash - - [VARCHAR(32)] - NOT NULL +source_code_hash + + [VARCHAR(32)] + NOT NULL dag_version--dag_code - -0..N -1 + +0..N +1 serialized_dag - -serialized_dag + +serialized_dag + +id + + [UUID] + NOT NULL -id - - [UUID] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +dag_hash + + [VARCHAR(32)] + NOT NULL -dag_hash - - [VARCHAR(32)] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_version_id + + [UUID] + NOT NULL -dag_version_id - - [UUID] - NOT NULL +data + + [JSON] -data - - [JSON] +data_compressed + + [BYTEA] -data_compressed 
- - [BYTEA] - -last_updated - - [TIMESTAMP] - NOT NULL +last_updated + + [TIMESTAMP] + NOT NULL dag_version--serialized_dag - -0..N -1 + +0..N +1 dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--deadline - -0..N -{0,1} + +0..N +{0,1} backfill_dag_run - -backfill_dag_run + +backfill_dag_run + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +backfill_id + + [INTEGER] + NOT NULL -backfill_id - - [INTEGER] - NOT NULL +dag_run_id + + [INTEGER] -dag_run_id - - [INTEGER] +exception_reason + + [VARCHAR(250)] -exception_reason - - [VARCHAR(250)] +logical_date + + [TIMESTAMP] + NOT NULL -logical_date +sort_ordinal - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL -dag_run_id - - [INTEGER] - NOT NULL +content + + [VARCHAR(1000)] -content - - [VARCHAR(1000)] +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 log_template - -log_template + +log_template + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +elasticsearch_id + + [TEXT] + NOT NULL -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL +filename + + [TEXT] + NOT NULL log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill - -backfill + +backfill + +id + + [INTEGER] + NOT NULL -id - - [INTEGER] - NOT NULL +completed_at + + [TIMESTAMP] -completed_at - - [TIMESTAMP] +created_at + + [TIMESTAMP] + NOT NULL -created_at - - [TIMESTAMP] - NOT NULL +dag_id + + [VARCHAR(250)] + NOT NULL -dag_id - - [VARCHAR(250)] - NOT NULL +dag_run_conf 
+ + [JSON] + NOT NULL -dag_run_conf - - [JSON] - NOT NULL +from_date + + [TIMESTAMP] + NOT NULL -from_date +is_paused - [TIMESTAMP] - NOT NULL + [BOOLEAN] -is_paused - - [BOOLEAN] +max_active_runs + + [INTEGER] + NOT NULL -max_active_runs - - [INTEGER] - NOT NULL +reprocess_behavior + + [VARCHAR(250)] + NOT NULL -reprocess_behavior - - [VARCHAR(250)] - NOT NULL +to_date + + [TIMESTAMP] + NOT NULL -to_date - - [TIMESTAMP] - NOT NULL +triggering_user_name + + [VARCHAR(512)] -triggering_user_name - - [VARCHAR(512)] - -updated_at - - [TIMESTAMP] - NOT NULL +updated_at + + [TIMESTAMP] + NOT NULL backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index dce45f2cb0aef..cf9dc37553978 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``f56f68b9e02f`` (head) | ``09fa89ba1710`` | ``3.1.0`` | Add callback_state to deadline. | +| ``3bda03debd04`` (head) | ``f56f68b9e02f`` | ``3.1.0`` | Add url template and template params to DagBundleModel. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``f56f68b9e02f`` | ``09fa89ba1710`` | ``3.1.0`` | Add callback_state to deadline. 
| +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``09fa89ba1710`` | ``40f7c30a228b`` | ``3.1.0`` | Add trigger_id to deadline. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py index a32e315bbee29..2475d49031fa1 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py @@ -20,6 +20,7 @@ from uuid import UUID from pydantic import AliasPath, Field, computed_field +from sqlalchemy import select from airflow.api_fastapi.core_api.base import BaseModel from airflow.dag_processing.bundles.manager import DagBundlesManager @@ -41,10 +42,23 @@ class DagVersionResponse(BaseModel): @property def bundle_url(self) -> str | None: if self.bundle_name: - try: - return DagBundlesManager().view_url(self.bundle_name, self.bundle_version) - except ValueError: - return None + # Get the bundle model from the database and render the URL + from airflow.models.dagbundle import DagBundleModel + from airflow.utils.session import create_session + + with create_session() as session: + bundle_model = session.scalar( + select(DagBundleModel).where(DagBundleModel.name == self.bundle_name) + ) + + if bundle_model and hasattr(bundle_model, "signed_url_template"): + return bundle_model.render_url(self.bundle_version) + # fallback to the deprecated option if the bundle model does not have a signed_url_template + # attribute + try: + return DagBundlesManager().view_url(self.bundle_name, self.bundle_version) + except ValueError: + return None return None diff --git a/airflow-core/src/airflow/dag_processing/bundles/base.py 
b/airflow-core/src/airflow/dag_processing/bundles/base.py index 5d49bf43fd4a4..f85e9389cb5fa 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/base.py +++ b/airflow-core/src/airflow/dag_processing/bundles/base.py @@ -22,6 +22,7 @@ import os import shutil import tempfile +import warnings from abc import ABC, abstractmethod from contextlib import contextmanager from dataclasses import dataclass, field @@ -35,7 +36,6 @@ from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum from airflow.configuration import conf -from airflow.dag_processing.bundles.manager import DagBundlesManager if TYPE_CHECKING: from pendulum import DateTime @@ -217,7 +217,10 @@ def remove_stale_bundle_versions(self): This isn't really necessary on worker types that don't share storage with other processes. """ + from airflow.dag_processing.bundles.manager import DagBundlesManager + log.info("checking for stale bundle versions locally") + bundles = list(DagBundlesManager().get_all_dag_bundles()) for bundle in bundles: if not bundle.supports_versioning: @@ -256,6 +259,7 @@ def __init__( name: str, refresh_interval: int = conf.getint("dag_processor", "refresh_interval"), version: str | None = None, + view_url_template: str | None = None, ) -> None: self.name = name self.version = version @@ -268,6 +272,8 @@ def __init__( self.versions_dir = get_bundle_versions_base_folder(bundle_name=self.name) """Where bundle versions are stored locally for this bundle.""" + self._view_url_template = view_url_template + def initialize(self) -> None: """ Initialize the bundle. @@ -316,10 +322,34 @@ def view_url(self, version: str | None = None) -> str | None: URL to view the bundle on an external website. This is shown to users in the Airflow UI, allowing them to navigate to this url for more details about that version of the bundle. This needs to function without `initialize` being called. 
- :param version: Version to view :return: URL to view the bundle """ + warnings.warn( + "The 'view_url' method is deprecated and will be removed in a future version. " + "Use 'view_url_template' instead.", + DeprecationWarning, + stacklevel=2, + ) + return None + + def view_url_template(self) -> str | None: + """ + URL template to view the bundle on an external website. + + This is shown to users in the Airflow UI, allowing them to navigate to + this url for more details about that version of the bundle. + + The template should use format string placeholders like {version}, {subdir}, etc. + Common placeholders: + - {version}: The version identifier + - {subdir}: The subdirectory within the bundle (if applicable) + + This needs to function without `initialize` being called. + + :return: URL template string or None if not applicable + """ + return self._view_url_template @contextmanager def lock(self): diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index a3538f1e29191..cf3b7c5104824 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -16,8 +16,10 @@ # under the License. from __future__ import annotations +import warnings from typing import TYPE_CHECKING +from itsdangerous import URLSafeSerializer from sqlalchemy import delete from airflow.configuration import conf @@ -81,6 +83,61 @@ def _add_example_dag_bundle(config_list): ) +def _is_safe_bundle_url(url: str) -> bool: + """ + Check if a bundle URL is safe to use. 
+ + This function validates that the URL: + - Uses HTTP or HTTPS schemes (no JavaScript, data, or other schemes) + - Is properly formatted + - Doesn't contain malicious content + """ + import logging + from urllib.parse import urlparse + + logger = logging.getLogger(__name__) + + if not url: + return False + + try: + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"}: + logger.error( + "Bundle URL uses unsafe scheme '%s'. Only 'http' and 'https' are allowed", parsed.scheme + ) + return False + + if not parsed.netloc: + logger.error("Bundle URL '%s' has no network location", url) + return False + + if any(ord(c) < 32 for c in url): + logger.error("Bundle URL '%s' contains control characters (ASCII < 32)", url) + return False + + return True + except Exception as e: + logger.error("Failed to parse bundle URL '%s': %s", url, str(e)) + return False + + +def _sign_bundle_url(url: str, bundle_name: str) -> str: + """ + Sign a bundle URL for integrity verification. + + :param url: The URL to sign + :param bundle_name: The name of the bundle (used in the payload) + :return: The signed URL token + """ + serializer = URLSafeSerializer(conf.get_mandatory_value("core", "fernet_key")) + payload = { + "url": url, + "bundle_name": bundle_name, + } + return serializer.dumps(payload) + + class DagBundlesManager(LoggingMixin): """Manager for DAG bundles.""" @@ -124,12 +181,44 @@ def parse_config(self) -> None: @provide_session def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: self.log.debug("Syncing DAG bundles to the database") + + def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]: + bundle_instance = self.get_bundle(name) + new_template_ = bundle_instance.view_url_template() + new_params_ = self._extract_template_params(bundle_instance) + if new_template_: + if not _is_safe_bundle_url(new_template_): + self.log.warning( + "Bundle %s has unsafe URL template '%s', skipping URL update", + bundle_name, + new_template_, + 
) + new_template_ = None + else: + # Sign the URL for integrity verification + new_template_ = _sign_bundle_url(new_template_, bundle_name) + self.log.debug("Signed URL template for bundle %s", bundle_name) + return new_template_, new_params_ + stored = {b.name: b for b in session.query(DagBundleModel).all()} + for name in self._bundle_config.keys(): if bundle := stored.pop(name, None): bundle.active = True + new_template, new_params = _extract_and_sign_template(name) + if new_template != bundle.signed_url_template: + bundle.signed_url_template = new_template + self.log.debug("Updated URL template for bundle %s", name) + if new_params != bundle.template_params: + bundle.template_params = new_params + self.log.debug("Updated template parameters for bundle %s", name) else: - session.add(DagBundleModel(name=name)) + new_template, new_params = _extract_and_sign_template(name) + new_bundle = DagBundleModel(name=name) + new_bundle.signed_url_template = new_template + new_bundle.template_params = new_params + + session.add(new_bundle) self.log.info("Added new DAG bundle %s to the database", name) for name, bundle in stored.items(): @@ -140,6 +229,35 @@ def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name)) self.log.info("Deleted import errors for bundle %s which is no longer configured", name) + @staticmethod + def _extract_template_params(bundle_instance: BaseDagBundle) -> dict: + """ + Extract template parameters from a bundle instance's view_url_template method. 
+ + :param bundle_instance: The bundle instance to extract parameters from + :return: Dictionary of template parameters + """ + import re + + params: dict[str, str] = {} + template = bundle_instance.view_url_template() + + if not template: + return params + + # Extract template placeholders using regex + # This matches {placeholder} patterns in the template + PLACEHOLDER_PATTERN = re.compile(r"\{([^}]+)\}") + placeholders = PLACEHOLDER_PATTERN.findall(template) + + # Extract values for each placeholder found in the template + for placeholder in placeholders: + field_value = getattr(bundle_instance, placeholder, None) + if field_value: + params[placeholder] = field_value + + return params + def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: """ Get a DAG bundle by name. @@ -165,5 +283,12 @@ def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]: yield class_(name=name, version=None, **kwargs) def view_url(self, name: str, version: str | None = None) -> str | None: + warnings.warn( + "The 'view_url' method is deprecated and will be removed when providers " + "have Airflow 3.1 as the minimum supported version. " + "Use DagBundleModel.render_url() instead.", + DeprecationWarning, + stacklevel=2, + ) bundle = self.get_bundle(name, version) return bundle.view_url(version=version) diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_add_url_and_template_params_to_dagbundle_model.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_add_url_and_template_params_to_dagbundle_model.py new file mode 100644 index 0000000000000..a5b3787c8758a --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_add_url_and_template_params_to_dagbundle_model.py @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add url template and template params to DagBundleModel. + +Revision ID: 3bda03debd04 +Revises: f56f68b9e02f +Create Date: 2025-07-04 10:12:12.711292 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op +from sqlalchemy_utils import JSONType + +# revision identifiers, used by Alembic. +revision = "3bda03debd04" +down_revision = "f56f68b9e02f" +branch_labels = None +depends_on = None +airflow_version = "3.1.0" + + +def upgrade(): + """Apply Add url and template params to DagBundleModel.""" + with op.batch_alter_table("dag_bundle", schema=None) as batch_op: + batch_op.add_column(sa.Column("signed_url_template", sa.String(length=200), nullable=True)) + batch_op.add_column(sa.Column("template_params", JSONType(), nullable=True)) + + +def downgrade(): + """Unapply Add url and template params to DagBundleModel.""" + with op.batch_alter_table("dag_bundle", schema=None) as batch_op: + batch_op.drop_column("template_params") + batch_op.drop_column("signed_url_template") diff --git a/airflow-core/src/airflow/models/dagbundle.py b/airflow-core/src/airflow/models/dagbundle.py index e1f99d5effcc9..f0343d9de7cd7 100644 --- a/airflow-core/src/airflow/models/dagbundle.py +++ b/airflow-core/src/airflow/models/dagbundle.py @@ -17,12 +17,14 @@ from __future__ import annotations from sqlalchemy import Boolean, Column, String +from sqlalchemy_utils import JSONType from 
airflow.models.base import Base, StringID +from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.sqlalchemy import UtcDateTime -class DagBundleModel(Base): +class DagBundleModel(Base, LoggingMixin): """ A table for storing DAG bundle metadata. @@ -32,6 +34,8 @@ class DagBundleModel(Base): - active: Is the bundle currently found in configuration? - version: The latest version Airflow has seen for the bundle. - last_refreshed: When the bundle was last refreshed. + - signed_url_template: Signed URL template for viewing the bundle + - template_params: JSON object containing template parameters for constructing view url (e.g., {"subdir": "dags"}) """ @@ -40,6 +44,59 @@ class DagBundleModel(Base): active = Column(Boolean, default=True) version = Column(String(200), nullable=True) last_refreshed = Column(UtcDateTime, nullable=True) + signed_url_template = Column(String(200), nullable=True) + template_params = Column(JSONType, nullable=True) - def __init__(self, *, name: str): + def __init__(self, *, name: str, version: str | None = None): + super().__init__() self.name = name + self.version = version + + def _unsign_url(self) -> str | None: + """ + Unsign a URL token to get the original URL template. + + :param signed_url: The signed URL token + :return: The original URL template or None if unsigning fails + """ + try: + from itsdangerous import BadSignature, URLSafeSerializer + + from airflow.configuration import conf + + serializer = URLSafeSerializer(conf.get_mandatory_value("core", "fernet_key")) + payload = serializer.loads(self.signed_url_template) + if isinstance(payload, dict) and "url" in payload and "bundle_name" in payload: + if payload["bundle_name"] == self.name: + return payload["url"] + + return None + except (BadSignature, Exception): + return None + + def render_url(self, version: str | None = None) -> str | None: + """ + Render the URL template with the given version and stored template parameters. 
+ + First unsigns the URL to get the original template, then formats it with + the provided version and any additional parameters. + + :param version: The version to substitute in the template + :return: The rendered URL or None if no template is available + """ + if not self.signed_url_template: + return None + + url_template = self._unsign_url() + + if url_template is None: + return None + + params = dict(self.template_params or {}) + params["version"] = version + + try: + return url_template.format(**params) + except (KeyError, ValueError) as e: + self.log.warning("Failed to render URL template for bundle %s: %s", self.name, e) + return None diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index d6e157574600a..4cf633360dab6 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -93,7 +93,7 @@ class MappedClassProtocol(Protocol): "2.10.3": "5f2621c13b39", "3.0.0": "29ce7909c52b", "3.0.3": "fe199e1abd77", - "3.1.0": "f56f68b9e02f", + "3.1.0": "3bda03debd04", } diff --git a/airflow-core/tests/unit/api_fastapi/conftest.py b/airflow-core/tests/unit/api_fastapi/conftest.py index b2497c194a5b5..65340e87efc72 100644 --- a/airflow-core/tests/unit/api_fastapi/conftest.py +++ b/airflow-core/tests/unit/api_fastapi/conftest.py @@ -19,6 +19,7 @@ import datetime import os from typing import TYPE_CHECKING +from unittest import mock import pytest import time_machine @@ -26,7 +27,9 @@ from airflow.api_fastapi.app import create_app from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser +from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.models import Connection +from airflow.providers.git.bundles.git import GitDagBundle from airflow.providers.standard.operators.empty import EmptyOperator from tests_common.test_utils.config import conf_vars @@ -121,19 +124,26 @@ def configure_git_connection_for_dag_bundle(session): conn_id="git_default", 
conn_type="git", description="default git connection", - host="fakeprotocol://test_host.github.com", + host="http://test_host.github.com", port=8081, login="", ) session.add(connection) - with conf_vars( - { - ( - "dag_processor", - "dag_bundle_config_list", - ): '[{ "name": "dag_maker", "classpath": "airflow.providers.git.bundles.git.GitDagBundle", "kwargs": {"subdir": "dags", "tracking_ref": "main", "refresh_interval": 0}}, { "name": "another_bundle_name", "classpath": "airflow.providers.git.bundles.git.GitDagBundle", "kwargs": {"subdir": "dags", "tracking_ref": "main", "refresh_interval": 0}}]' - } + with ( + conf_vars( + { + ( + "dag_processor", + "dag_bundle_config_list", + ): '[{ "name": "dag_maker", "classpath": "airflow.providers.git.bundles.git.GitDagBundle", "kwargs": {"subdir": "dags", "tracking_ref": "main", "refresh_interval": 0}}, { "name": "another_bundle_name", "classpath": "airflow.providers.git.bundles.git.GitDagBundle", "kwargs": {"subdir": "dags", "tracking_ref": "main", "refresh_interval": 0}}]' + } + ), + mock.patch("airflow.providers.git.bundles.git.GitHook") as mock_git_hook, + mock.patch.object(GitDagBundle, "get_current_version") as mock_get_current_version, ): + mock_get_current_version.return_value = "some_commit_hash" + mock_git_hook.return_value.repo_url = connection.host + DagBundlesManager().sync_bundles_to_db() yield # in case no flush or commit was executed after the "session.add" above, we need to flush the session # manually here to make sure that the added connection will be deleted by query(Connection).delete() @@ -153,11 +163,7 @@ def make_dag_with_multiple_versions(dag_maker, configure_git_connection_for_dag_ """ dag_id = "dag_with_multiple_versions" for version_number in range(1, 4): - with dag_maker( - dag_id, - session=session, - bundle_version=f"some_commit_hash{version_number}", - ): + with dag_maker(dag_id, session=session, bundle_version=f"some_commit_hash{version_number}"): for task_number in range(version_number): 
EmptyOperator(task_id=f"task{task_number + 1}") dag_maker.create_dagrun( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py index bb363475b9442..1756209564a35 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py @@ -22,7 +22,7 @@ from airflow.providers.standard.operators.empty import EmptyOperator -from tests_common.test_utils.db import clear_db_dags, clear_db_serialized_dags +from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, clear_db_serialized_dags pytestmark = pytest.mark.db_test @@ -32,6 +32,7 @@ class TestDagVersionEndpoint: def setup(request, dag_maker, session): clear_db_dags() clear_db_serialized_dags() + clear_db_dag_bundles() with dag_maker( dag_id="ANOTHER_DAG_ID", bundle_version="some_commit_hash", bundle_name="another_bundle_name" @@ -50,7 +51,7 @@ class TestGetDagVersion(TestDagVersionEndpoint): { "bundle_name": "another_bundle_name", "bundle_version": "some_commit_hash", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash/dags", "created_at": mock.ANY, "dag_id": "ANOTHER_DAG_ID", "id": mock.ANY, @@ -64,7 +65,7 @@ class TestGetDagVersion(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash1", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash1/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash1/dags", "created_at": mock.ANY, "dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -78,7 +79,7 @@ class TestGetDagVersion(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash2", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash2/dags", + 
"bundle_url": "http://test_host.github.com/tree/some_commit_hash2/dags", "created_at": mock.ANY, "dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -92,7 +93,7 @@ class TestGetDagVersion(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash3", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash3/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash3/dags", "created_at": mock.ANY, "dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -103,15 +104,88 @@ class TestGetDagVersion(TestDagVersionEndpoint): ], ) @pytest.mark.usefixtures("make_dag_with_multiple_versions") - def test_get_dag_version(self, test_client, dag_id, dag_version, expected_response): + @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") + def test_get_dag_version(self, mock_hasattr, test_client, dag_id, dag_version, expected_response): + mock_hasattr.return_value = False + response = test_client.get(f"/dags/{dag_id}/dagVersions/{dag_version}") + assert response.status_code == 200 + assert response.json() == expected_response + + @pytest.mark.parametrize( + "dag_id, dag_version, expected_response", + [ + [ + "ANOTHER_DAG_ID", + 1, + { + "bundle_name": "another_bundle_name", + "bundle_version": "some_commit_hash", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash/dags", + "created_at": mock.ANY, + "dag_id": "ANOTHER_DAG_ID", + "id": mock.ANY, + "version_number": 1, + "dag_display_name": "ANOTHER_DAG_ID", + }, + ], + [ + "dag_with_multiple_versions", + 1, + { + "bundle_name": "dag_maker", + "bundle_version": "some_commit_hash1", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash1/dags", + "created_at": mock.ANY, + "dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 1, + "dag_display_name": "dag_with_multiple_versions", + }, + ], + [ + "dag_with_multiple_versions", + 2, + { + "bundle_name": "dag_maker", + "bundle_version": 
"some_commit_hash2", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash2/dags", + "created_at": mock.ANY, + "dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 2, + "dag_display_name": "dag_with_multiple_versions", + }, + ], + [ + "dag_with_multiple_versions", + 3, + { + "bundle_name": "dag_maker", + "bundle_version": "some_commit_hash3", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash3/dags", + "created_at": mock.ANY, + "dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 3, + "dag_display_name": "dag_with_multiple_versions", + }, + ], + ], + ) + @pytest.mark.usefixtures("make_dag_with_multiple_versions") + def test_get_dag_version_with_url_template(self, test_client, dag_id, dag_version, expected_response): response = test_client.get(f"/dags/{dag_id}/dagVersions/{dag_version}") assert response.status_code == 200 assert response.json() == expected_response @pytest.mark.usefixtures("make_dag_with_multiple_versions") @mock.patch("airflow.dag_processing.bundles.manager.DagBundlesManager.view_url") - def test_get_dag_version_with_unconfigured_bundle(self, mock_view_url, test_client, dag_maker, session): + @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") + def test_get_dag_version_with_unconfigured_bundle( + self, mock_hasattr, mock_view_url, test_client, dag_maker, session + ): """Test that when a bundle is no longer configured, the bundle_url returns an error message.""" + mock_hasattr.return_value = False mock_view_url.side_effect = ValueError("Bundle not configured") response = test_client.get("/dags/dag_with_multiple_versions/dagVersions/1") @@ -149,7 +223,106 @@ class TestGetDagVersions(TestDagVersionEndpoint): { "bundle_name": "another_bundle_name", "bundle_version": "some_commit_hash", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash/dags", + 
"created_at": mock.ANY, + "dag_id": "ANOTHER_DAG_ID", + "id": mock.ANY, + "version_number": 1, + "dag_display_name": "ANOTHER_DAG_ID", + }, + { + "bundle_name": "dag_maker", + "bundle_version": "some_commit_hash1", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash1/dags", + "created_at": mock.ANY, + "dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 1, + "dag_display_name": "dag_with_multiple_versions", + }, + { + "bundle_name": "dag_maker", + "bundle_version": "some_commit_hash2", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash2/dags", + "created_at": mock.ANY, + "dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 2, + "dag_display_name": "dag_with_multiple_versions", + }, + { + "bundle_name": "dag_maker", + "bundle_version": "some_commit_hash3", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash3/dags", + "created_at": mock.ANY, + "dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 3, + "dag_display_name": "dag_with_multiple_versions", + }, + ], + "total_entries": 4, + }, + ], + [ + "dag_with_multiple_versions", + { + "dag_versions": [ + { + "bundle_name": "dag_maker", + "bundle_version": "some_commit_hash1", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash1/dags", + "created_at": mock.ANY, + "dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 1, + "dag_display_name": "dag_with_multiple_versions", + }, + { + "bundle_name": "dag_maker", + "bundle_version": "some_commit_hash2", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash2/dags", + "created_at": mock.ANY, + "dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 2, + "dag_display_name": "dag_with_multiple_versions", + }, + { + "bundle_name": "dag_maker", + "bundle_version": "some_commit_hash3", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash3/dags", + "created_at": mock.ANY, + 
"dag_id": "dag_with_multiple_versions", + "id": mock.ANY, + "version_number": 3, + "dag_display_name": "dag_with_multiple_versions", + }, + ], + "total_entries": 3, + }, + ], + ], + ) + @pytest.mark.usefixtures("make_dag_with_multiple_versions") + @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") + def test_get_dag_versions(self, mock_hasattr, test_client, dag_id, expected_response): + mock_hasattr.return_value = False + response = test_client.get(f"/dags/{dag_id}/dagVersions") + assert response.status_code == 200 + assert response.json() == expected_response + + @pytest.mark.parametrize( + "dag_id, expected_response", + [ + [ + "~", + { + "dag_versions": [ + { + "bundle_name": "another_bundle_name", + "bundle_version": "some_commit_hash", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash/dags", "created_at": mock.ANY, "dag_id": "ANOTHER_DAG_ID", "id": mock.ANY, @@ -159,7 +332,7 @@ class TestGetDagVersions(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash1", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash1/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash1/dags", "created_at": mock.ANY, "dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -169,7 +342,7 @@ class TestGetDagVersions(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash2", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash2/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash2/dags", "created_at": mock.ANY, "dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -179,7 +352,7 @@ class TestGetDagVersions(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash3", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash3/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash3/dags", "created_at": mock.ANY, 
"dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -197,7 +370,7 @@ class TestGetDagVersions(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash1", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash1/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash1/dags", "created_at": mock.ANY, "dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -207,7 +380,7 @@ class TestGetDagVersions(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash2", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash2/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash2/dags", "created_at": mock.ANY, "dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -217,7 +390,7 @@ class TestGetDagVersions(TestDagVersionEndpoint): { "bundle_name": "dag_maker", "bundle_version": "some_commit_hash3", - "bundle_url": "fakeprotocol://test_host.github.com/tree/some_commit_hash3/dags", + "bundle_url": "http://test_host.github.com/tree/some_commit_hash3/dags", "created_at": mock.ANY, "dag_id": "dag_with_multiple_versions", "id": mock.ANY, @@ -231,7 +404,7 @@ class TestGetDagVersions(TestDagVersionEndpoint): ], ) @pytest.mark.usefixtures("make_dag_with_multiple_versions") - def test_get_dag_versions(self, test_client, dag_id, expected_response): + def test_get_dag_versions_with_url_template(self, test_client, dag_id, expected_response): response = test_client.get(f"/dags/{dag_id}/dagVersions") assert response.status_code == 200 assert response.json() == expected_response diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py index 60302878dba90..e745d2a0b4fe2 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py @@ -518,8 
+518,10 @@ class TestDagDetails(TestDagEndpoint): ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") def test_dag_details( self, + mock_hasattr, test_client, query_params, dag_id, @@ -528,6 +530,7 @@ def test_dag_details( start_date, owner_links, ): + mock_hasattr.return_value = False response = test_client.get(f"/dags/{dag_id}/details", params=query_params) assert response.status_code == expected_status_code if expected_status_code != 200: @@ -603,6 +606,99 @@ def test_dag_details( } assert res_json == expected + @pytest.mark.parametrize( + "query_params, dag_id, expected_status_code, dag_display_name, start_date, owner_links", + [ + ({}, "fake_dag_id", 404, "fake_dag", "2023-12-31T00:00:00Z", {}), + ({}, DAG2_ID, 200, DAG2_ID, "2021-06-15T00:00:00Z", {}), + ], + ) + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_dag_details_with_view_url_template( + self, + test_client, + query_params, + dag_id, + expected_status_code, + dag_display_name, + start_date, + owner_links, + ): + response = test_client.get(f"/dags/{dag_id}/details", params=query_params) + assert response.status_code == expected_status_code + if expected_status_code != 200: + return + + # Match expected and actual responses below. 
+ res_json = response.json() + last_parsed = res_json["last_parsed"] + last_parsed_time = res_json["last_parsed_time"] + file_token = res_json["file_token"] + expected = { + "bundle_name": "dag_maker", + "bundle_version": None, + "asset_expression": None, + "catchup": False, + "concurrency": 16, + "dag_id": dag_id, + "dag_display_name": dag_display_name, + "dag_run_timeout": None, + "default_args": { + "depends_on_past": False, + "retries": 1, + "retry_delay": "PT5M", + }, + "description": None, + "doc_md": "details", + "end_date": None, + "fileloc": __file__, + "file_token": file_token, + "has_import_errors": False, + "has_task_concurrency_limits": True, + "is_stale": False, + "is_paused": False, + "is_paused_upon_creation": None, + "latest_dag_version": { + "bundle_name": "dag_maker", + "bundle_url": "http://test_host.github.com/tree/None/dags", + "bundle_version": None, + "created_at": mock.ANY, + "dag_id": "test_dag2", + "id": mock.ANY, + "version_number": 1, + "dag_display_name": dag_display_name, + }, + "last_expired": None, + "last_parsed": last_parsed, + "last_parsed_time": last_parsed_time, + "max_active_runs": 16, + "max_active_tasks": 16, + "max_consecutive_failed_dag_runs": 0, + "next_dagrun_data_interval_end": None, + "next_dagrun_data_interval_start": None, + "next_dagrun_logical_date": None, + "next_dagrun_run_after": None, + "owners": ["airflow"], + "owner_links": {}, + "params": { + "foo": { + "__class": "airflow.sdk.definitions.param.Param", + "description": None, + "schema": {}, + "value": 1, + } + }, + "relative_fileloc": "test_dags.py", + "render_template_as_native_obj": False, + "timetable_summary": None, + "start_date": start_date, + "tags": [], + "template_search_path": None, + "timetable_description": "Never, external triggers only", + "timezone": UTC_JSON_REPR, + } + assert res_json == expected + def test_dag_details_should_response_401(self, unauthenticated_test_client): response = 
unauthenticated_test_client.get(f"/dags/{DAG1_ID}/details") assert response.status_code == 401 diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 3cb85f80a70e0..8cd42df4299ee 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -237,7 +237,11 @@ def test_should_respond_403(self, unauthorized_test_client): ], ) @pytest.mark.usefixtures("make_dag_with_multiple_versions") - def test_should_respond_200_with_versions(self, test_client, run_id, expected_version_number): + @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") + def test_should_respond_200_with_versions( + self, mock_hasattr, test_client, run_id, expected_version_number + ): + mock_hasattr.return_value = False response = test_client.get(f"/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1") assert response.status_code == 200 @@ -281,7 +285,7 @@ def test_should_respond_200_with_versions(self, test_client, run_id, expected_ve "dag_display_name": "dag_with_multiple_versions", "bundle_name": "dag_maker", "bundle_version": f"some_commit_hash{expected_version_number}", - "bundle_url": f"fakeprotocol://test_host.github.com/tree/some_commit_hash{expected_version_number}/dags", + "bundle_url": f"http://test_host.github.com/tree/some_commit_hash{expected_version_number}/dags", "created_at": mock.ANY, }, } @@ -1983,7 +1987,64 @@ def test_raises_404_for_nonexistent_task_instance(self, test_client, session): ], ) @pytest.mark.usefixtures("make_dag_with_multiple_versions") - def test_should_respond_200_with_versions(self, test_client, run_id, expected_version_number): + @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") + def test_should_respond_200_with_versions( + self, mock_hasattr, 
test_client, run_id, expected_version_number, session + ): + mock_hasattr.return_value = False + response = test_client.get( + f"/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1/tries/0" + ) + assert response.status_code == 200 + assert response.json() == { + "task_id": "task1", + "dag_id": "dag_with_multiple_versions", + "dag_display_name": "dag_with_multiple_versions", + "dag_run_id": run_id, + "map_index": -1, + "start_date": None, + "end_date": mock.ANY, + "duration": None, + "state": None, + "try_number": 0, + "max_tries": 0, + "task_display_name": "task1", + "hostname": "", + "unixname": getuser(), + "pool": "default_pool", + "pool_slots": 1, + "queue": "default", + "priority_weight": 1, + "operator": "EmptyOperator", + "queued_when": None, + "scheduled_when": None, + "pid": None, + "executor": None, + "executor_config": "{}", + "dag_version": { + "id": mock.ANY, + "version_number": expected_version_number, + "dag_id": "dag_with_multiple_versions", + "bundle_name": "dag_maker", + "bundle_version": f"some_commit_hash{expected_version_number}", + "bundle_url": f"http://test_host.github.com/tree/some_commit_hash{expected_version_number}/dags", + "created_at": mock.ANY, + "dag_display_name": "dag_with_multiple_versions", + }, + } + + @pytest.mark.parametrize( + "run_id, expected_version_number", + [ + ("run1", 1), + ("run2", 2), + ("run3", 3), + ], + ) + @pytest.mark.usefixtures("make_dag_with_multiple_versions") + def test_should_respond_200_with_versions_using_url_template( + self, test_client, run_id, expected_version_number, session + ): response = test_client.get( f"/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1/tries/0" ) @@ -2019,7 +2080,7 @@ def test_should_respond_200_with_versions(self, test_client, run_id, expected_ve "dag_id": "dag_with_multiple_versions", "bundle_name": "dag_maker", "bundle_version": f"some_commit_hash{expected_version_number}", - "bundle_url": 
f"fakeprotocol://test_host.github.com/tree/some_commit_hash{expected_version_number}/dags", + "bundle_url": f"http://test_host.github.com/tree/some_commit_hash{expected_version_number}/dags", "created_at": mock.ANY, "dag_display_name": "dag_with_multiple_versions", }, @@ -3065,7 +3126,65 @@ def test_raises_404_for_nonexistent_task_instance(self, test_client, session): ], ) @pytest.mark.usefixtures("make_dag_with_multiple_versions") - def test_should_respond_200_with_versions(self, test_client, run_id, expected_version_number): + @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") + def test_should_respond_200_with_versions( + self, mock_hasattr, test_client, run_id, expected_version_number + ): + mock_hasattr.return_value = False + response = test_client.get( + f"/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1/tries" + ) + assert response.status_code == 200 + + assert response.json()["task_instances"][0] == { + "task_id": "task1", + "dag_id": "dag_with_multiple_versions", + "dag_display_name": "dag_with_multiple_versions", + "dag_run_id": run_id, + "map_index": -1, + "start_date": None, + "end_date": mock.ANY, + "duration": None, + "state": mock.ANY, + "try_number": 0, + "max_tries": 0, + "task_display_name": "task1", + "hostname": "", + "unixname": getuser(), + "pool": "default_pool", + "pool_slots": 1, + "queue": "default", + "priority_weight": 1, + "operator": "EmptyOperator", + "queued_when": None, + "scheduled_when": None, + "pid": None, + "executor": None, + "executor_config": "{}", + "dag_version": { + "id": mock.ANY, + "version_number": expected_version_number, + "dag_id": "dag_with_multiple_versions", + "bundle_name": "dag_maker", + "bundle_version": f"some_commit_hash{expected_version_number}", + "bundle_url": f"http://test_host.github.com/tree/some_commit_hash{expected_version_number}/dags", + "created_at": mock.ANY, + "dag_display_name": "dag_with_multiple_versions", + }, + } + + @pytest.mark.parametrize( + 
"run_id, expected_version_number", + [ + ("run1", 1), + ("run2", 2), + ("run3", 3), + ], + ) + @pytest.mark.usefixtures("make_dag_with_multiple_versions") + def test_should_respond_200_with_versions_using_url_template( + self, test_client, run_id, expected_version_number + ): response = test_client.get( f"/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1/tries" ) @@ -3102,7 +3221,7 @@ def test_should_respond_200_with_versions(self, test_client, run_id, expected_ve "dag_id": "dag_with_multiple_versions", "bundle_name": "dag_maker", "bundle_version": f"some_commit_hash{expected_version_number}", - "bundle_url": f"fakeprotocol://test_host.github.com/tree/some_commit_hash{expected_version_number}/dags", + "bundle_url": f"http://test_host.github.com/tree/some_commit_hash{expected_version_number}/dags", "created_at": mock.ANY, "dag_display_name": "dag_with_multiple_versions", }, diff --git a/airflow-core/tests/unit/cli/commands/test_asset_command.py b/airflow-core/tests/unit/cli/commands/test_asset_command.py index 47de83d6c3e5a..88f2131575115 100644 --- a/airflow-core/tests/unit/cli/commands/test_asset_command.py +++ b/airflow-core/tests/unit/cli/commands/test_asset_command.py @@ -23,6 +23,7 @@ import json import os import typing +from unittest import mock import pytest @@ -119,7 +120,9 @@ def test_cli_assets_alias_details(parser: ArgumentParser) -> None: assert alias_detail_list[0] | undeterministic == undeterministic | {"name": "example-alias", "group": ""} -def test_cli_assets_materialize(parser: ArgumentParser) -> None: +@mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") +def test_cli_assets_materialize(mock_hasattr, parser: ArgumentParser) -> None: + mock_hasattr.return_value = False args = parser.parse_args(["assets", "materialize", "--name=asset1_producer", "--output=json"]) with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: asset_command.asset_materialize(args) @@ -157,3 +160,41 @@ def 
test_cli_assets_materialize(parser: ArgumentParser) -> None: "triggering_user_name": "root", "run_after": "2025-02-12T19:27:59.066046Z", } + + +def test_cli_assets_materialize_with_view_url_template(parser: ArgumentParser) -> None: + args = parser.parse_args(["assets", "materialize", "--name=asset1_producer", "--output=json"]) + with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: + asset_command.asset_materialize(args) + + output = temp_stdout.getvalue() + run_list = json.loads(output) + assert len(run_list) == 1 + + # No good way to statically compare these. + undeterministic: dict = { + "dag_run_id": None, + "dag_versions": [], + "data_interval_end": None, + "data_interval_start": None, + "logical_date": None, + "queued_at": None, + "run_after": "2025-02-12T19:27:59.066046Z", + } + + assert run_list[0] | undeterministic == undeterministic | { + "conf": {}, + "bundle_version": None, + "dag_display_name": "asset1_producer", + "dag_id": "asset1_producer", + "end_date": None, + "duration": None, + "last_scheduling_decision": None, + "note": None, + "run_type": "manual", + "start_date": None, + "state": "queued", + "triggered_by": "cli", + "triggering_user_name": "root", + "run_after": "2025-02-12T19:27:59.066046Z", + } diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py index 05c5baf1dd830..b1e8b4f8b651a 100644 --- a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py +++ b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py @@ -188,10 +188,181 @@ def test_view_url(version): """Test that view_url calls the bundle's view_url method.""" bundle_manager = DagBundlesManager() with patch.object(BaseDagBundle, "view_url") as view_url_mock: - bundle_manager.view_url("my-test-bundle", version=version) + # Test that deprecation warning is raised + with pytest.warns(DeprecationWarning, match="'view_url' method is 
deprecated"): + bundle_manager.view_url("my-test-bundle", version=version) view_url_mock.assert_called_once_with(version=version) +class BundleWithTemplate(BaseDagBundle): + """Test bundle that provides a URL template.""" + + def __init__(self, *, subdir: str | None = None, **kwargs): + super().__init__(**kwargs) + self.subdir = subdir + + def refresh(self): + pass + + def get_current_version(self): + return "v1.0" + + @property + def path(self): + return "/tmp/test" + + +TEMPLATE_BUNDLE_CONFIG = [ + { + "name": "template-bundle", + "classpath": "unit.dag_processing.bundles.test_dag_bundle_manager.BundleWithTemplate", + "kwargs": { + "view_url_template": "https://github.com/example/repo/tree/{version}/{subdir}", + "subdir": "dags", + "refresh_interval": 1, + }, + } +] + + +@pytest.mark.db_test +@conf_vars({("core", "LOAD_EXAMPLES"): "False"}) +def test_sync_bundles_to_db_with_template(clear_db, session): + """Test that URL templates and parameters are stored in the database during sync.""" + with patch.dict( + os.environ, {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(TEMPLATE_BUNDLE_CONFIG)} + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + + # Check that the template and parameters were stored + bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first() + + session.merge(bundle_model) + + assert bundle_model is not None + assert bundle_model.render_url(version="v1.0") == "https://github.com/example/repo/tree/v1.0/dags" + assert bundle_model.template_params == {"subdir": "dags"} + assert bundle_model.active is True + + +@pytest.mark.db_test +@conf_vars({("core", "LOAD_EXAMPLES"): "False"}) +def test_bundle_model_render_url(clear_db, session): + """Test the DagBundleModel render_url method.""" + with patch.dict( + os.environ, {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(TEMPLATE_BUNDLE_CONFIG)} + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + bundle_model = 
session.query(DagBundleModel).filter_by(name="template-bundle").first() + + session.merge(bundle_model) + assert bundle_model is not None + + url = bundle_model.render_url(version="main") + assert url == "https://github.com/example/repo/tree/main/dags" + url = bundle_model.render_url() + assert url == "https://github.com/example/repo/tree/None/dags" + + +@pytest.mark.db_test +@conf_vars({("core", "LOAD_EXAMPLES"): "False"}) +def test_template_params_update_on_sync(clear_db, session): + """Test that template parameters are updated when bundle configuration changes.""" + with patch.dict( + os.environ, {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(TEMPLATE_BUNDLE_CONFIG)} + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + + # Verify initial template and parameters + bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first() + url = bundle_model._unsign_url() + assert url == "https://github.com/example/repo/tree/{version}/{subdir}" + assert bundle_model.template_params == {"subdir": "dags"} + + # Update the bundle config with different parameters + updated_config = [ + { + "name": "template-bundle", + "classpath": "unit.dag_processing.bundles.test_dag_bundle_manager.BundleWithTemplate", + "kwargs": { + "view_url_template": "https://gitlab.com/example/repo/-/tree/{version}/{subdir}", + "subdir": "workflows", + "refresh_interval": 1, + }, + } + ] + + with patch.dict( + os.environ, {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(updated_config)} + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + + # Verify the template and parameters were updated + bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first() + url = bundle_model._unsign_url() + assert url == "https://gitlab.com/example/repo/-/tree/{version}/{subdir}" + assert bundle_model.template_params == {"subdir": "workflows"} + assert bundle_model.render_url(version="v1") == 
"https://gitlab.com/example/repo/-/tree/v1/workflows" + + +@pytest.mark.db_test +@conf_vars({("core", "LOAD_EXAMPLES"): "False"}) +def test_template_update_on_sync(clear_db, session): + """Test that templates are updated when bundle configuration changes.""" + # First, sync with initial template + with patch.dict( + os.environ, {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(TEMPLATE_BUNDLE_CONFIG)} + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + + # Verify initial template + bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first() + url = bundle_model._unsign_url() + assert url == "https://github.com/example/repo/tree/{version}/{subdir}" + assert bundle_model.render_url(version="v1") == "https://github.com/example/repo/tree/v1/dags" + + # Update the bundle config with a different template + updated_config = [ + { + "name": "template-bundle", + "classpath": "unit.dag_processing.bundles.test_dag_bundle_manager.BundleWithTemplate", + "kwargs": { + "view_url_template": "https://gitlab.com/example/repo/-/tree/{version}/{subdir}", + "subdir": "dags", + "refresh_interval": 1, + }, + } + ] + + with patch.dict( + os.environ, {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(updated_config)} + ): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + + # Verify the template was updated + bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first() + url = bundle_model._unsign_url() + assert url == "https://gitlab.com/example/repo/-/tree/{version}/{subdir}" + assert bundle_model.render_url("v1") == "https://gitlab.com/example/repo/-/tree/v1/dags" + + +def test_dag_bundle_model_render_url_with_invalid_template(): + """Test that DagBundleModel.render_url handles invalid templates gracefully.""" + bundle_model = DagBundleModel(name="test-bundle") + bundle_model.signed_url_template = "https://github.com/example/repo/tree/{invalid_placeholder}" + 
bundle_model.template_params = {"subdir": "dags"} + + # Should return None if rendering fails + url = bundle_model.render_url("v1") + assert url is None + + + def test_example_dags_bundle_added(): manager = DagBundlesManager() manager.parse_config() diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py index 3752705c7aac2..16786929e66bb 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py @@ -137,10 +137,23 @@ def refresh(self) -> None: ) def view_url(self, version: str | None = None) -> str | None: + """ + Return a URL for viewing the DAGs in S3. Currently, versioning is not supported. + + This method is deprecated and will be removed when the minimum supported Airflow version is 3.1. + Use `view_url_template` instead. + """ + return self.view_url_template() + + def view_url_template(self) -> str | None: """Return a URL for viewing the DAGs in S3. Currently, versioning is not supported.""" if self.version: raise AirflowException("S3 url with version is not supported") - + if hasattr(self, "_view_url_template") and self._view_url_template: + # Because we use this method in the view_url method, we need to handle + # backward compatibility for Airflow versions that don't have the + # _view_url_template attribute.
Should be removed when we drop support for Airflow 3.0 + return self._view_url_template # https://<bucket-name>.s3.<region>.amazonaws.com/<prefix> url = f"https://{self.bucket_name}.s3" if self.s3_hook.region_name: diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py index c22c585b24380..e07755eed783a 100644 --- a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -109,9 +109,18 @@ def test_view_url_generates_presigned_url(self): bundle = S3DagBundle( name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1/dags", bucket_name=S3_BUCKET_NAME ) + url: str = bundle.view_url("test_version") assert url.startswith("https://my-airflow-dags-bucket.s3.amazonaws.com/project1/dags") + @pytest.mark.db_test + def test_view_url_template_generates_presigned_url(self): + bundle = S3DagBundle( + name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1/dags", bucket_name=S3_BUCKET_NAME + ) + url: str = bundle.view_url_template() + assert url.startswith("https://my-airflow-dags-bucket.s3.amazonaws.com/project1/dags") + @pytest.mark.db_test def test_supports_versioning(self): bundle = S3DagBundle( diff --git a/providers/git/src/airflow/providers/git/bundles/git.py b/providers/git/src/airflow/providers/git/bundles/git.py index 18a393749163c..28241ab045dc4 100644 --- a/providers/git/src/airflow/providers/git/bundles/git.py +++ b/providers/git/src/airflow/providers/git/bundles/git.py @@ -25,9 +25,7 @@ from git import Repo from git.exc import BadName, GitCommandError, NoSuchPathError -from airflow.dag_processing.bundles.base import ( - BaseDagBundle, -) +from airflow.dag_processing.bundles.base import BaseDagBundle from airflow.exceptions import AirflowException from airflow.providers.git.hooks.git import GitHook @@ -215,32 +213,58 @@ def _convert_git_ssh_url_to_https(url: str) -> str: return f"{domain}/{repo_path}" def view_url(self, version: str | None =
None) -> str | None: + """ + Return a URL for viewing the DAGs in the repository. + + This method is deprecated and will be removed when the minimum supported Airflow version is 3.1. + Use `view_url_template` instead. + """ if not version: return None - url = self.repo_url - if not url: + template = self.view_url_template() + if not template: return None + if not self.subdir: + # remove {subdir} from the template if subdir is not set + template = template.replace("/{subdir}", "") + return template.format(version=version, subdir=self.subdir) + + def view_url_template(self) -> str | None: + if hasattr(self, "_view_url_template") and self._view_url_template: + # Because we use this method in the view_url method, we need to handle + # backward compatibility for Airflow versions that don't have the + # _view_url_template attribute. Should be removed when we drop support for Airflow 3.0 + return self._view_url_template + + if not self.repo_url: + return None + + url = self.repo_url if url.startswith("git@"): url = self._convert_git_ssh_url_to_https(url) if url.endswith(".git"): url = url[:-4] + parsed_url = urlparse(url) host = parsed_url.hostname if not host: return None + if parsed_url.username or parsed_url.password: new_netloc = host if parsed_url.port: new_netloc += f":{parsed_url.port}" url = parsed_url._replace(netloc=new_netloc).geturl() + host_patterns = { - "github.com": f"{url}/tree/{version}", - "gitlab.com": f"{url}/-/tree/{version}", - "bitbucket.org": f"{url}/src/{version}", + "github.com": f"{url}/tree/{{version}}", + "gitlab.com": f"{url}/-/tree/{{version}}", + "bitbucket.org": f"{url}/src/{{version}}", } - if self.subdir: - host_patterns = {k: f"{v}/{self.subdir}" for k, v in host_patterns.items()} + for allowed_host, template in host_patterns.items(): if host == allowed_host or host.endswith(f".{allowed_host}"): + if self.subdir: + return f"{template}/{self.subdir}" return template return None diff --git
a/providers/git/tests/unit/git/bundles/test_git.py b/providers/git/tests/unit/git/bundles/test_git.py index 1a97c726c8773..6fc213bf19205 100644 --- a/providers/git/tests/unit/git/bundles/test_git.py +++ b/providers/git/tests/unit/git/bundles/test_git.py @@ -35,6 +35,7 @@ from airflow.providers.git.hooks.git import GitHook from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS @pytest.fixture(autouse=True) @@ -522,6 +523,100 @@ def test_view_url_subdir( assert view_url == expected_url bundle.initialize.assert_not_called() + @pytest.mark.skipif(not AIRFLOW_V_3_1_PLUS, reason="Airflow 3.0 does not support view_url_template") + @pytest.mark.parametrize( + "repo_url, extra_conn_kwargs, expected_url", + [ + ( + "git@github.com:apache/airflow.git", + None, + "https://github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "git@github.com:apache/airflow", + None, + "https://github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "https://github.com/apache/airflow", + None, + "https://github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "https://github.com/apache/airflow.git", + None, + "https://github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "git@gitlab.com:apache/airflow.git", + None, + "https://gitlab.com/apache/airflow/-/tree/{version}/subdir", + ), + ( + "git@bitbucket.org:apache/airflow.git", + None, + "https://bitbucket.org/apache/airflow/src/{version}/subdir", + ), + ( + "git@myorg.github.com:apache/airflow.git", + None, + "https://myorg.github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "https://myorg.github.com/apache/airflow.git", + None, + "https://myorg.github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "https://github.com/apache/airflow", + {"password": "abc123"}, + "https://github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "https://github.com/apache/airflow", + {"login": "abc123"}, + 
"https://github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "https://github.com/apache/airflow", + {"login": "abc123", "password": "def456"}, + "https://github.com/apache/airflow/tree/{version}/subdir", + ), + ( + "https://github.com:443/apache/airflow", + None, + "https://github.com:443/apache/airflow/tree/{version}/subdir", + ), + ( + "https://github.com:443/apache/airflow", + {"password": "abc123"}, + "https://github.com:443/apache/airflow/tree/{version}/subdir", + ), + ], + ) + @mock.patch("airflow.providers.git.bundles.git.Repo") + def test_view_url_template_subdir( + self, mock_gitrepo, repo_url, extra_conn_kwargs, expected_url, create_connection_without_db + ): + create_connection_without_db( + Connection( + conn_id="git_default", + host=repo_url, + conn_type="git", + **(extra_conn_kwargs or {}), + ) + ) + bundle = GitDagBundle( + name="test", + tracking_ref="main", + subdir="subdir", + git_conn_id="git_default", + ) + bundle.initialize = mock.MagicMock() + view_url_template = bundle.view_url_template() + assert view_url_template == expected_url + bundle.initialize.assert_not_called() + @mock.patch("airflow.providers.git.bundles.git.Repo") def test_view_url_returns_none_when_no_version_in_view_url(self, mock_gitrepo): bundle = GitDagBundle(