Skip to content

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Feb 10, 2025

closes: #46609

Why?

If you want to add further links to operators you can define them via a plugin or provider package. Extra links will be displayed in task details page in Grid view.

Currently, users define operator links in one of two ways:

  1. Using plugins and defining operators in those plugins for defining extra links
  2. Directly using custom airflow operators to define operator extra links.

This feature doesn't work with Airflow 3, so while porting this to task sdk so it can be used by users, we decided to simplify the extra links code-flow in a way to also reduce / decouple the user code that runs using get_link of a operator extra link in the airflow webserver to outside the context of the webserver.

What?

To get this working, a simple interface change is made and the following things get easier:

  1. While defining an extra link, add a "xcom_key" required field to it.

For example:

class FooBarLink(BaseOperatorLink):
    name = "foo-bar"
    xcom_key = "link-key"

    def get_link(self, operator, *, ti_key):
        return f"http://www.example.com"
  1. At runtime, from the task runner, after the task runs (when its finalizeing), it requests to push an XCOM which contains the "entire" link for the extra link with the xcom key and task id.

Ex:

def finalize(ti: RuntimeTaskInstance, log: Logger):
    for oe in ti.task.operator_extra_link_dict.values():
        link, xcom_key = oe.get_link(operator=ti.task, ti_key=ti.id), oe.xcom_key  # type: ignore[arg-type]
        log.debug("Setting xcom for operator extra link", link=link, xcom_key=xcom_key)
        _xcom_push(ti, key=xcom_key, value=link)
  1. After the task has completed running, when the user goes to grid view, he can see the extra button.
    The link in this button comes by reading from xcom instead of running user code

How?

Changes of note:

BaseOperatorLink Model

Introducing a new xcom_key datafield that carries the xcom_key with which the entire link will be pushed.

Introduced a new slim interface that we will use during deserialisation which can return the link from xcoms instead of user defined "get_link".

@attrs.define()
class GenericOperatorLink(LoggingMixin):
    """A generic operator link class that can retrieve link only using XCOMs. Used while deserializing operators."""

    name: str
    xcom_key: str

    def get_link(self, ti: TaskInstance) -> str:
        """
        Retrieve the link from the XComs.
        :param ti: Task instance from which to retrieve the link
        :return: link to external system, but by pulling it from XComs
        """
        self.log.info("Retrieving link from XComs with key: %s for task id: %s", self.xcom_key, ti.task_id)
        return ti.xcom_pull(key=self.xcom_key, task_ids=ti.task_id)  # type: ignore

DAG serialisation/ deserialisation

We need something in the serdag to say what links a task has, which we already have but the format is overly complex.

Difference in formats (then vs now):


  Older Newer
Encoded Operator '_operator_extra_links': [{'tests.www.views.test_views_extra_links.RaiseErrorLink': {}},  {'tests.www.views.test_views_extra_links.NoResponseLink': {}},  {'tests.www.views.test_views_extra_links.FooBarLink': {}},  {'tests_common.test_utils.mock_operators.AirflowLink': {}}]} '_operator_extra_links': {'raise_error': 'key',  'no_response': 'key',  'foo-bar': 'link-key',  'airflow': 'airflow_link_key'}}
Decoded Operator _operator_links_source ={   'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink': {       'index': 0   }},list(_operator_links_source.items()) =[   (       'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',       {'index': 0}   )]list(_operator_links_source.items())[0] =(   'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',   {       'index': 0   }) deserialised_operator = SerializedBaseOperator.deserialize_operator(enc_operator)deserialised_operator.operator_extra_linksOut[8]: [GenericOperatorLink(name='raise_error', xcom_key='key'), GenericOperatorLink(name='no_response', xcom_key='key'), GenericOperatorLink(name='foo-bar', xcom_key='link-key'), GenericOperatorLink(name='airflow', xcom_key='airflow_link_key')]

Example of how its stored in the serialised dag now:

{"__version": 1, "dag": {"tags": ["extra_links"], "edge_info": {}, "task_group": {"_group_id": null, "group_display_name": "", "prefix_group_id": true, "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"task": ["operator", "task"]}, "upstream_group_ids": [], "downstream_group_ids": [], "upstream_task_ids": [], "downstream_task_ids": []}, "timetable": {"__type": "airflow.timetables.simple.NullTimetable", "__var": {}}, "fileloc": "/files/dags/dags/extra_links_custom_operator.py", "dag_id": "extra_links_custom_operator", "timezone": "UTC", "catchup": false, "relative_fileloc": "dags/extra_links_custom_operator.py", "_processor_dags_folder": "/files/dags", "tasks": [{"__var": {"pool": "default_pool", "task_type": "DummyTestOperator", "downstream_task_ids": [], "template_fields": [], "on_failure_fail_dagrun": false, "template_fields_renderers": {}, "task_id": "task", "is_teardown": false, "ui_color": "#fff", "weight_rule": "downstream", "ui_fgcolor": "#000", "start_from_trigger": false, "_needs_expansion": false, "template_ext": [], "is_setup": false, "_task_module": "unusual_prefix_86147154bd7e391eb5735fc97db99fe835bcf5b0_extra_links_custom_operator", "_is_empty": false, "start_trigger_args": null, "_operator_extra_links": {"no_response": "key", "foo-bar": "link-key"}, "label": "task", "_operator_name": "DummyTestOperator"}, "__type": "operator"}], "dag_dependencies": [], "params": []}}
  • During deserialisation, we create a new GenericOperatorLink type link for each link defined per operator in the serdag

Abstract Operator

  • The definitions for utils for operator extra links have been moved to task sdk.
  • The models/abstractoperator inherits from the task sdk abstarctoperator.
  • The get_extra_links in task sdk now follows the new function signature

Testing

  1. Operator Links Defined at Operator Level

DAG:

from airflow.models import BaseOperatorLink, BaseOperator
from airflow.sdk import DAG

class NoResponseLink(BaseOperatorLink):
    name = "no_response"
    xcom_key = "key"

    def get_link(self, operator, *, ti_key):
        return None


class FooBarLink(BaseOperatorLink):
    name = "foo-bar"
    xcom_key = "link-key"

    def get_link(self, operator, *, ti_key):
        return f"http://www.example.com"


class DummyTestOperator(BaseOperator):
    operator_extra_links = (
        NoResponseLink(),
        FooBarLink(),
    )

    def execute(self, context):
        print("Hello from custom operator", self.operator_extra_links)

with DAG(
    dag_id="extra_links_custom_operator",
    schedule=None,
    catchup=False,
    tags=["extra_links"],
):

    t = DummyTestOperator(
        task_id="task"
    )

image

The link elements:

<a target="_blank" class="chakra-link chakra-button c-h1717v" href="http://www.example.com">foo-bar</a>
<a class="chakra-link chakra-button c-h1717v" href="null">no_response</a>

XCOMS pushed:
image

Logs:
image

Last few lines:

{"timestamp":"2025-02-10T09:15:06.800071","level":"debug","event":"Sending request","json":"{\"state\":\"success\",\"end_date\":\"2025-02-10T09:15:06.800015Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","logger":"task"}
{"timestamp":"2025-02-10T09:15:06.800145","level":"debug","event":"Plugins are already loaded. Skipping.","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-10T09:15:06.800175","level":"debug","event":"Initialize extra operators links plugins","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-10T09:15:06.800263","level":"debug","event":"Setting xcom for operator extra link","link":null,"xcom_key":"key","logger":"task"}
{"timestamp":"2025-02-10T09:15:06.800411","level":"debug","event":"Sending request","json":"{\"key\":\"key\",\"value\":\"null\",\"dag_id\":\"extra_links_custom_operator\",\"run_id\":\"manual__2025-02-10T09:15:05.441050+00:00\",\"task_id\":\"task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
{"timestamp":"2025-02-10T09:15:06.800430","level":"debug","event":"Setting xcom for operator extra link","link":"http://www.example.com/","xcom_key":"link-key","logger":"task"}
{"timestamp":"2025-02-10T09:15:06.800457","level":"debug","event":"Sending request","json":"{\"key\":\"link-key\",\"value\":\"\\\"http://www.example.com/\\\"\",\"dag_id\":\"extra_links_custom_operator\",\"run_id\":\"manual__2025-02-10T09:15:05.441050+00:00\",\"task_id\":\"task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
  1. Operator Links Defined as plugins
    Plugin:
from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.providers.standard.operators.python import PythonOperator


# define the extra link
class HTTPDocsLink(BaseOperatorLink):
    # name the link button in the UI
    name = "HTTP docs"
    xcom_key = "key"

    # add the button to one or more operators
    operators = [PythonOperator]

    # provide the link
    def get_link(self, operator, *, ti_key=None):
        return "https://developer.mozilla.org/en-US/docs/Web/HTTP"

# define the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        HTTPDocsLink(),
    ]

DAG:

from airflow.models.dag import DAG
from pendulum import datetime

from airflow.providers.standard.operators.python import PythonOperator


def handler():
    print("Hello from the python operator!!")

with DAG(
    dag_id="extra_links_plugin",
    start_date=datetime(2022, 11, 1),
    schedule=None,
    catchup=False,
    tags=["extra_links"],
):

    call_api_simple = PythonOperator(
        task_id="call_api_simple",
        python_callable=handler,
    )

image

The link elements:

<a target="_blank" class="chakra-link chakra-button c-h1717v" href="https://developer.mozilla.org/en-US/docs/Web/HTTP">HTTP docs</a>

XComs pushed:
image

Logs:
image

Last few logs lines:

{"timestamp":"2025-02-10T09:19:37.131169","level":"debug","event":"Sending request","json":"{\"state\":\"success\",\"end_date\":\"2025-02-10T09:19:37.131068Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","logger":"task"}
{"timestamp":"2025-02-10T09:19:37.131332","level":"debug","event":"Plugins are already loaded. Skipping.","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-10T09:19:37.131368","level":"debug","event":"Initialize extra operators links plugins","logger":"airflow.plugins_manager"}
{"timestamp":"2025-02-10T09:19:37.131524","level":"debug","event":"Setting xcom for operator extra link","link":"[https://developer.mozilla.org/en-US/docs/Web/HTTP","xcom_key":"key","logger":"task"](https://developer.mozilla.org/en-US/docs/Web/HTTP%22,%22xcom_key%22:%22key%22,%22logger%22:%22task%22)}
{"timestamp":"2025-02-10T09:19:37.131681","level":"debug","event":"Sending request","json":"{\"key\":\"key\",\"value\":\"\\\"https://developer.mozilla.org/en-US/docs/Web/HTTP\\\"\",\"dag_id\":\"extra_links_plugin\",\"run_id\":\"manual__2025-02-10T09:19:36.380827+00:00\",\"task_id\":\"call_api_simple\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n","logger":"task"}
{"timestamp":"2025-02-10T09:19:37.877771Z","level":"info","event":"Hello from the python operator!!","chan":"stdout","logger":"task"}

Whats pending?

  • Adding newsfragments (significant / behaviour change?)
  • Testing with SDK baseoperator
  • Moving baseoperatorlink model to sdk definitions

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@amoghrajesh amoghrajesh changed the title AIP-72: Improving Operator Links to Prevent User Code Execution in Webserver AIP-72: Improving Operator Links Interface to Prevent User Code Execution in Webserver Feb 10, 2025
@amoghrajesh
Copy link
Contributor Author

Should something like this pass:

    @pytest.mark.usefixtures("clear_all_logger_handlers")
    def test_extra_operator_links_logs_error_for_non_registered_extra_links(self):
        """
        Assert OperatorLinks not registered via Plugins and if it is not an inbuilt Operator Link,
        it can still deserialize the DAG (does not error) but just logs an error.

        We test NOT using caplog as this is flaky, we check that the task after deserialize
        is missing the extra links.
        """

        class TaskStateLink(BaseOperatorLink):
            """OperatorLink not registered via Plugins nor a built-in OperatorLink"""

            name = "My Link"

            def get_link(self, operator, *, ti_key):
                return "https://www.google.com"

        class MyOperator(BaseOperator):
            """Just a EmptyOperator using above defined Extra Operator Link"""

            operator_extra_links = [TaskStateLink()]

            def execute(self, context: Context):
                pass

        with DAG(dag_id="simple_dag", schedule=None, start_date=datetime(2019, 8, 1)) as dag:
            MyOperator(task_id="blah")

        serialized_dag = SerializedDAG.to_dict(dag)

        sdag = SerializedDAG.from_dict(serialized_dag)
        assert sdag.task_dict["blah"].operator_extra_links == []

I do not really understand what:

        Assert OperatorLinks not registered via Plugins and if it is not an inbuilt Operator Link,
        it can still deserialize the DAG (does not error) but just logs an error.

        We test NOT using caplog as this is flaky, we check that the task after deserialize
        is missing the extra links.

means.

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM.

1 question about how we should handle global links (I really would like to avoid plugin manager in the SDK if we can, it will make dependencies a nightmare in the medium term if we start using it there)

@amoghrajesh
Copy link
Contributor Author

Update:

  • Migrated over all the functions using plugins manager from sdk/abstractoperator over to the serialisation module.\
  • Handled all nits
  • Fixed all tests (hopefully)

Whats left:

  • Adding examples to the newsfragment (for which I need to extend it to support over 1 line)
  • Test an e2e scenario again

@amoghrajesh
Copy link
Contributor Author

A lot of core tests are failing, working on fixing them

@amoghrajesh
Copy link
Contributor Author

Last few tests to go, almost there!

@amoghrajesh
Copy link
Contributor Author

Yeah finally!! 🥇

@ashb could you take a look again at this one?

@amoghrajesh
Copy link
Contributor Author

Added some additional information about the design et al in the last self review.

@amoghrajesh amoghrajesh merged commit fe5a2ea into apache:main Feb 13, 2025
91 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-extra-links branch February 13, 2025 16:55
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 17, 2025
…tion in Webserver (apache#46613)

Operator Links interface changed to not run user code in Airflow Webserver The Operator Extra links, which can be defined either via plugins or custom operators now do not execute any user code in the Airflow Webserver, but instead push the "full" links to XCom backend and the value is again fetched from the XCom backend when viewing task details in grid view.


Example:
```
@attr.s(auto_attribs=True)
class CustomBaseIndexOpLink(BaseOperatorLink):
    """Custom Operator Link for Google BigQuery Console."""
    index: int = attr.ib()
    @Property
    def name(self) -> str:
        return f"BigQuery Console #{self.index + 1}"

    @Property
    def xcom_key(self) -> str:
        return f"bigquery_{self.index + 1}"

    def get_link(self, operator, *, ti_key):
        search_queries = XCom.get_one(
            task_id=ti_key.task_id, dag_id=ti_key.dag_id, run_id=ti_key.run_id, key="search_query"
        )
        if not search_queries:
            return None
        if len(search_queries) < self.index:
            return None
        search_query = search_queries[self.index]
        return f"https://console.cloud.google.com/bigquery?j={search_query}"
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Get Operator Extra Links working again

3 participants