From f7a01a6bee0550a77728115e870623b32d61d157 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 20 Jul 2024 14:52:25 +0200 Subject: [PATCH 1/6] Have integration points for AIP-69 in Internal API --- .../endpoints/rpc_api_endpoint.py | 4 ++-- airflow/api_internal/internal_api_call.py | 24 +++++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index d2b952c5b716d..d8e69cef68ea5 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -39,7 +39,7 @@ @functools.lru_cache -def _initialize_map() -> dict[str, Callable]: +def initialize_method_map() -> dict[str, Callable]: from airflow.cli.commands.task_command import _get_ti_db_access from airflow.dag_processing.manager import DagFileProcessorManager from airflow.dag_processing.processor import DagFileProcessor @@ -148,7 +148,7 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse: if json_rpc != "2.0": return log_and_build_error_response(message="Expected jsonrpc 2.0 request.", status=400) - methods_map = _initialize_map() + methods_map = initialize_method_map() method_name = body.get("method") if method_name not in methods_map: return log_and_build_error_response(message=f"Unrecognized method: {method_name}.", status=400) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index c3a67d03ee18c..c8567a66425a9 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -22,6 +22,7 @@ import logging from functools import wraps from typing import Callable, TypeVar +from urllib.parse import urlparse import requests import tenacity @@ -56,6 +57,18 @@ def force_database_direct_access(): InternalApiConfig._initialized = True InternalApiConfig._use_internal_api = False + @staticmethod + def force_api_access(api_endpoint: str): + """ + Force using Internal API with provided endpoint. + + All methods decorated with internal_api_call will always be executed remote/via API. + This mode is needed for remote setups/remote executor. + """ + InternalApiConfig._initialized = True + InternalApiConfig._use_internal_api = True + InternalApiConfig._internal_api_endpoint = api_endpoint + @staticmethod def get_use_internal_api(): if not InternalApiConfig._initialized: @@ -75,10 +88,13 @@ def _init_values(): raise RuntimeError("The AIP_44 is not enabled so you cannot use it.") internal_api_endpoint = "" if use_internal_api: - internal_api_url = conf.get("core", "internal_api_url") - internal_api_endpoint = internal_api_url + "/internal_api/v1/rpcapi" - if not internal_api_endpoint.startswith("http://"): - raise AirflowConfigException("[core]internal_api_url must start with http://") + url_conf = urlparse(conf.get("core", "internal_api_url")) + api_path = url_conf.path + if len(api_path) < 2: + api_path = "/internal_api/v1/rpcapi" + if url_conf.scheme in ["http", "https"]: + raise AirflowConfigException("[core]internal_api_url must start with http:// or https://") + InternalApiConfig._internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}" InternalApiConfig._initialized = True InternalApiConfig._use_internal_api = use_internal_api From a7527e596468f480c04a09bb17dcbf2362e8f9dc Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 20 Jul 2024 15:00:29 +0200 Subject: [PATCH 2/6] Allow adding custom classes into serialization mapping --- airflow/serialization/serialized_objects.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 42027f981c804..426ba061a2d8e 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -422,7 +422,7 @@ def deref(self, dag: DAG) -> ExpandInput: LogTemplate: LogTemplatePydantic, Dataset: DatasetPydantic, } -_type_to_class: dict[DAT, list] = { +_type_to_class: dict[DAT | str, list] = { DAT.BASE_JOB: [JobPydantic, Job], DAT.TASK_INSTANCE: [TaskInstancePydantic, TaskInstance], DAT.DAG_RUN: [DagRunPydantic, DagRun], @@ -433,6 +433,13 @@ def deref(self, dag: DAG) -> ExpandInput: _class_to_type = {cls_: type_ for type_, classes in _type_to_class.items() for cls_ in classes} +def add_pydantic_class_type_mapping(attribute_type: str, orm_class, pydantic_class): + _orm_to_model[orm_class] = pydantic_class + _type_to_class[attribute_type] = [pydantic_class, orm_class] + _class_to_type[pydantic_class] = attribute_type + _class_to_type[orm_class] = attribute_type + + class BaseSerialization: """BaseSerialization provides utils for serialization.""" From e8291d3470e4f0f684a4e84ea0ebad49ab8a0651 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 20 Jul 2024 15:04:13 +0200 Subject: [PATCH 3/6] Have integration points for AIP-69 in Internal API --- tests/api_internal/endpoints/test_rpc_api_endpoint.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/api_internal/endpoints/test_rpc_api_endpoint.py b/tests/api_internal/endpoints/test_rpc_api_endpoint.py index 4c312da3a708d..64ea733d39c58 100644 --- a/tests/api_internal/endpoints/test_rpc_api_endpoint.py +++ b/tests/api_internal/endpoints/test_rpc_api_endpoint.py @@ -75,12 +75,12 @@ def setup_attrs(self, minimal_app_for_internal_api: Flask) -> Generator: mock_test_method.reset_mock() mock_test_method.side_effect = None with mock.patch( - "airflow.api_internal.endpoints.rpc_api_endpoint._initialize_map" - ) as mock_initialize_map: - mock_initialize_map.return_value = { + "airflow.api_internal.endpoints.rpc_api_endpoint.initialize_method_map" + ) as mock_initialize_method_map: + mock_initialize_method_map.return_value = { TEST_METHOD_NAME: mock_test_method, } - yield mock_initialize_map + yield mock_initialize_method_map @pytest.mark.parametrize( "input_params, method_result, result_cmp_func, method_params", From 1516f68efd6ee56df555487c01899fe4f533214f Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 20 Jul 2024 18:05:55 +0200 Subject: [PATCH 4/6] Fix pytest, reverse validation check for scheme --- airflow/api_internal/internal_api_call.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index c8567a66425a9..4df4aee30baca 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -91,8 +91,9 @@ def _init_values(): url_conf = urlparse(conf.get("core", "internal_api_url")) api_path = url_conf.path if len(api_path) < 2: + # Add the default path if not given in the configuration api_path = "/internal_api/v1/rpcapi" - if url_conf.scheme in ["http", "https"]: + if url_conf.scheme not in ["http", "https"]: raise AirflowConfigException("[core]internal_api_url must start with http:// or https://") InternalApiConfig._internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}" From 419d18a27c3747cbaebf453d6babd3cf9d669838 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 20 Jul 2024 18:15:29 +0200 Subject: [PATCH 5/6] Have integration points for AIP-69 in Internal API --- airflow/api_internal/internal_api_call.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 4df4aee30baca..9661b493a7e82 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -95,7 +95,7 @@ def _init_values(): api_path = "/internal_api/v1/rpcapi" if url_conf.scheme not in ["http", "https"]: raise AirflowConfigException("[core]internal_api_url must start with http:// or https://") - InternalApiConfig._internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}" + internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}" InternalApiConfig._initialized = True InternalApiConfig._use_internal_api = use_internal_api From 47f8caf0dcb3254ea79cdec221cc2b477619266d Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 21 Jul 2024 07:26:19 +0200 Subject: [PATCH 6/6] Review feedback --- airflow/api_internal/internal_api_call.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 9661b493a7e82..9450a2d45c135 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -90,7 +90,7 @@ def _init_values(): if use_internal_api: url_conf = urlparse(conf.get("core", "internal_api_url")) api_path = url_conf.path - if len(api_path) < 2: + if api_path in ["", "/"]: # Add the default path if not given in the configuration api_path = "/internal_api/v1/rpcapi" if url_conf.scheme not in ["http", "https"]: