From e8f5a3e928e597e80f0e4dcb292f2870a02cfa73 Mon Sep 17 00:00:00 2001 From: shubhamraj-git Date: Fri, 31 Jan 2025 22:54:30 +0530 Subject: [PATCH 1/2] Fix the extra links for mapped tasks --- .../endpoints/extra_link_endpoint.py | 2 ++ airflow/api_connexion/openapi/v1.yaml | 1 + airflow/www/static/js/types/api-generated.ts | 11 +++++- .../endpoints/test_extra_link_endpoint.py | 35 ++++++++++++++++++- tests/test_utils/mock_operators.py | 19 +++++++++- 5 files changed, 65 insertions(+), 3 deletions(-) diff --git a/airflow/api_connexion/endpoints/extra_link_endpoint.py b/airflow/api_connexion/endpoints/extra_link_endpoint.py index ddf4b670285c8..87f83fb77c93f 100644 --- a/airflow/api_connexion/endpoints/extra_link_endpoint.py +++ b/airflow/api_connexion/endpoints/extra_link_endpoint.py @@ -42,6 +42,7 @@ def get_extra_links( dag_id: str, dag_run_id: str, task_id: str, + map_index: int = -1, session: Session = NEW_SESSION, ) -> APIResponse: """Get extra links for task instance.""" @@ -62,6 +63,7 @@ def get_extra_links( TaskInstance.dag_id == dag_id, TaskInstance.run_id == dag_run_id, TaskInstance.task_id == task_id, + TaskInstance.map_index == map_index, ) ) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 180f854c02f3f..fcece7b8b5a84 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2062,6 +2062,7 @@ paths: - $ref: "#/components/parameters/DAGID" - $ref: "#/components/parameters/DAGRunID" - $ref: "#/components/parameters/TaskID" + - $ref: "#/components/parameters/FilterMapIndex" get: summary: List extra links diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 2da17d2981d03..8cc92e140038f 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -638,6 +638,10 @@ export interface paths { /** The task ID. */ task_id: components["parameters"]["TaskID"]; }; + query: { + /** Filter on map index for mapped task. */ + map_index?: components["parameters"]["FilterMapIndex"]; + }; }; }; "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}": { @@ -4741,6 +4745,10 @@ export interface operations { /** The task ID. */ task_id: components["parameters"]["TaskID"]; }; + query: { + /** Filter on map index for mapped task. */ + map_index?: components["parameters"]["FilterMapIndex"]; + }; }; responses: { /** Success. */ @@ -5990,7 +5998,8 @@ export type GetXcomEntryVariables = CamelCasedPropertiesDeep< operations["get_xcom_entry"]["parameters"]["query"] >; export type GetExtraLinksVariables = CamelCasedPropertiesDeep< - operations["get_extra_links"]["parameters"]["path"] + operations["get_extra_links"]["parameters"]["path"] & + operations["get_extra_links"]["parameters"]["query"] >; export type GetLogVariables = CamelCasedPropertiesDeep< operations["get_log"]["parameters"]["path"] & diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py index 76b2a09603609..daca963aeba7f 100644 --- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py +++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py @@ -34,7 +34,7 @@ from tests.test_utils.api_connexion_utils import create_user, delete_user from tests.test_utils.compat import BaseOperatorLink from tests.test_utils.db import clear_db_runs, clear_db_xcom -from tests.test_utils.mock_operators import CustomOperator +from tests.test_utils.mock_operators import CustomOperator, MappedCustomOperator from tests.test_utils.mock_plugins import mock_plugin_manager pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -100,6 +100,10 @@ def _create_dag(self): CustomOperator( task_id="TEST_MULTIPLE_LINK", bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"] ) + # Mapped task expanded over a list of bash_commands + MappedCustomOperator.partial(task_id="TEST_MAPPED_TASK").expand( + bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"] + ) return dag @pytest.mark.parametrize( @@ -241,3 +245,32 @@ class AirflowTestPlugin(AirflowPlugin): "TEST_DAG_ID/TEST_SINGLE_LINK/2020-01-01T00%3A00%3A00%2B00%3A00" ), } == response.json + + @mock_plugin_manager(plugins=[]) + def test_should_respond_200_mapped_task_instance(self): + map_index = 0 + XCom.set( + key="search_query", + value="TEST_LINK_VALUE_1", + task_id="TEST_MAPPED_TASK", + dag_id="TEST_DAG_ID", + run_id="TEST_DAG_RUN_ID", + map_index=map_index, + ) + response = self.client.get( + "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_MAPPED_TASK/links?map_index=0", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "Google Custom": "http://google.com/custom_base_link?search=TEST_LINK_VALUE_1" + } + + @mock_plugin_manager(plugins=[]) + def test_should_respond_404_invalid_map_index(self): + response = self.client.get( + "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_MAPPED_TASK/links?map_index=6", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 + assert response.json["detail"] == 'DAG Run with ID = "TEST_DAG_RUN_ID" not found' diff --git a/tests/test_utils/mock_operators.py b/tests/test_utils/mock_operators.py index cd816707a59f5..3b74a9e2bf116 100644 --- a/tests/test_utils/mock_operators.py +++ b/tests/test_utils/mock_operators.py @@ -137,7 +137,11 @@ class CustomOpLink(BaseOperatorLink): def get_link(self, operator, *, ti_key): search_query = XCom.get_one( - task_id=ti_key.task_id, dag_id=ti_key.dag_id, run_id=ti_key.run_id, key="search_query" + task_id=ti_key.task_id, + dag_id=ti_key.dag_id, + run_id=ti_key.run_id, + map_index=ti_key.map_index, + key="search_query", ) if not search_query: return None @@ -166,6 +170,19 @@ def execute(self, context: Context): context["task_instance"].xcom_push(key="search_query", value="dummy_value") +class MappedCustomOperator(BaseOperator): + operator_extra_links = (CustomOpLink(),) + + def __init__(self, bash_command: Any, *args, **kwargs): + super().__init__(*args, **kwargs) + self.bash_command = bash_command + + def execute(self, context: Context): + pass + # self.log.info("Processing value: %s", self.bash_command) + # return f"Processed {self.bash_command}" + + class GoogleLink(BaseOperatorLink): """ Operator Link for Apache Airflow Website for Google From 545c3e4e6ba101dd5f6cfed691f01bcb04e8d47c Mon Sep 17 00:00:00 2001 From: shubhamraj-git Date: Sun, 2 Feb 2025 18:07:23 +0530 Subject: [PATCH 2/2] Fix tests for extra links for mapped tasks --- .../endpoints/test_extra_link_endpoint.py | 87 ++++++++++++------- tests/test_utils/mock_operators.py | 20 ++--- 2 files changed, 63 insertions(+), 44 deletions(-) diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py index daca963aeba7f..cafdc8979ecd2 100644 --- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py +++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py @@ -27,6 +27,7 @@ from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin from airflow.security import permissions +from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.timetables.base import DataInterval from airflow.utils import timezone from airflow.utils.state import DagRunState @@ -34,7 +35,7 @@ from tests.test_utils.api_connexion_utils import create_user, delete_user from tests.test_utils.compat import BaseOperatorLink from tests.test_utils.db import clear_db_runs, clear_db_xcom -from tests.test_utils.mock_operators import CustomOperator, MappedCustomOperator +from tests.test_utils.mock_operators import CustomOperator from tests.test_utils.mock_plugins import mock_plugin_manager pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -62,7 +63,7 @@ def configured_app(minimal_app_for_api): delete_user(app, username="test_no_permissions") # type: ignore -class TestGetExtraLinks: +class BaseGetExtraLinks: @pytest.fixture(autouse=True) def setup_attrs(self, configured_app, session) -> None: self.default_time = timezone.datetime(2020, 1, 1) @@ -72,7 +73,7 @@ def setup_attrs(self, configured_app, session) -> None: self.app = configured_app - self.dag = self._create_dag() + self.dag = self._create_dag() # type: ignore self.app.dag_bag = DagBag(os.devnull, include_examples=False) self.app.dag_bag.dags = {self.dag.dag_id: self.dag} # type: ignore @@ -94,16 +95,14 @@ def teardown_method(self) -> None: clear_db_runs() clear_db_xcom() + +class TestGetExtraLinks(BaseGetExtraLinks): def _create_dag(self): with DAG(dag_id="TEST_DAG_ID", schedule=None, default_args={"start_date": self.default_time}) as dag: CustomOperator(task_id="TEST_SINGLE_LINK", bash_command="TEST_LINK_VALUE") CustomOperator( task_id="TEST_MULTIPLE_LINK", bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"] ) - # Mapped task expanded over a list of bash_commands - MappedCustomOperator.partial(task_id="TEST_MAPPED_TASK").expand( - bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"] - ) return dag @pytest.mark.parametrize( @@ -246,31 +245,59 @@ class AirflowTestPlugin(AirflowPlugin): ), } == response.json - @mock_plugin_manager(plugins=[]) - def test_should_respond_200_mapped_task_instance(self): - map_index = 0 - XCom.set( - key="search_query", - value="TEST_LINK_VALUE_1", - task_id="TEST_MAPPED_TASK", - dag_id="TEST_DAG_ID", - run_id="TEST_DAG_RUN_ID", - map_index=map_index, - ) - response = self.client.get( - "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_MAPPED_TASK/links?map_index=0", - environ_overrides={"REMOTE_USER": "test"}, - ) - assert response.status_code == 200 - assert response.json == { - "Google Custom": "http://google.com/custom_base_link?search=TEST_LINK_VALUE_1" - } +class TestMappedTaskExtraLinks(BaseGetExtraLinks): + def _create_dag(self): + with DAG(dag_id="TEST_DAG_ID", schedule=None, default_args={"start_date": self.default_time}) as dag: + # Mapped task expanded over a list of bash_commands + CustomOperator.partial(task_id="TEST_MAPPED_TASK").expand( + bash_command=["TEST_LINK_VALUE_3", "TEST_LINK_VALUE_4"] + ) + return SerializedBaseOperator.deserialize(SerializedBaseOperator.serialize(dag)) + + @pytest.mark.parametrize( + "map_index, expected_status, expected_json", + [ + ( + 0, + 200, + { + "Google Custom": "http://google.com/custom_base_link?search=TEST_LINK_VALUE_3", + "google": "https://www.google.com", + }, + ), + ( + 1, + 200, + { + "Google Custom": "http://google.com/custom_base_link?search=TEST_LINK_VALUE_4", + "google": "https://www.google.com", + }, + ), + (6, 404, {"detail": 'DAG Run with ID = "TEST_DAG_RUN_ID" not found'}), + ], + ) @mock_plugin_manager(plugins=[]) - def test_should_respond_404_invalid_map_index(self): + def test_mapped_task_links(self, map_index, expected_status, expected_json): + """Parameterized test for mapped task extra links.""" + # Set XCom data for different map indices + if map_index < 2: + XCom.set( + key="search_query", + value=f"TEST_LINK_VALUE_{map_index + 3}", + task_id="TEST_MAPPED_TASK", + dag_id="TEST_DAG_ID", + run_id="TEST_DAG_RUN_ID", + map_index=map_index, + ) + response = self.client.get( - "/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_MAPPED_TASK/links?map_index=6", + f"/api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_MAPPED_TASK/links?map_index={map_index}", environ_overrides={"REMOTE_USER": "test"}, ) - assert response.status_code == 404 - assert response.json["detail"] == 'DAG Run with ID = "TEST_DAG_RUN_ID" not found' + + assert response.status_code == expected_status + if map_index < 2: + assert response.json == expected_json + else: + assert response.json["detail"] == expected_json["detail"] diff --git a/tests/test_utils/mock_operators.py b/tests/test_utils/mock_operators.py index 3b74a9e2bf116..f254d22484c0f 100644 --- a/tests/test_utils/mock_operators.py +++ b/tests/test_utils/mock_operators.py @@ -22,6 +22,7 @@ import attr from airflow.models.baseoperator import BaseOperator +from airflow.models.mappedoperator import MappedOperator from airflow.models.xcom import XCom from tests.test_utils.compat import BaseOperatorLink @@ -157,7 +158,11 @@ def operator_extra_links(self): """ Return operator extra links """ - if isinstance(self.bash_command, str) or self.bash_command is None: + if ( + isinstance(self, MappedOperator) + or isinstance(self.bash_command, str) + or self.bash_command is None + ): return (CustomOpLink(),) return (CustomBaseIndexOpLink(i) for i, _ in enumerate(self.bash_command)) @@ -170,19 +175,6 @@ def execute(self, context: Context): context["task_instance"].xcom_push(key="search_query", value="dummy_value") -class MappedCustomOperator(BaseOperator): - operator_extra_links = (CustomOpLink(),) - - def __init__(self, bash_command: Any, *args, **kwargs): - super().__init__(*args, **kwargs) - self.bash_command = bash_command - - def execute(self, context: Context): - pass - # self.log.info("Processing value: %s", self.bash_command) - # return f"Processed {self.bash_command}" - - class GoogleLink(BaseOperatorLink): """ Operator Link for Apache Airflow Website for Google