From b6e607d18efd8bd1c7a2f303008e3d4fc664eb44 Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Fri, 5 Jul 2024 13:03:17 -0700 Subject: [PATCH] Do not dynamically determine op links for emr serverless The dynamic determination of which extra link to include for the EmrServerlessStartJobOperator does not work with templated fields, since that evaluation was happening at DAG parsing time. This dynamic determination has been completely removed because: 1) If templated fields were used for any of the inputs it needed, DAG parsing would fail. 2) Changing the DAG definition relating to those links would often remove links for all previous DAG runs. 3) It overly complicates the code. The new behaviour is to add all links to the TI and only those that are enabled will be "persisted" (i.e. actually have a link) and those that are not will be present but greyed out and will link back to the TI (the Airflow default). --- airflow/providers/amazon/aws/operators/emr.py | 92 ++----------------- .../operators/emr/emr_serverless.rst | 3 +- .../aws/operators/test_emr_serverless.py | 54 ----------- 3 files changed, 8 insertions(+), 141 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index f75a0224fbe1c..c13c622937ffc 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -27,7 +27,6 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import BaseOperator -from airflow.models.mappedoperator import MappedOperator from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook from airflow.providers.amazon.aws.links.emr import ( EmrClusterLink, @@ -1259,91 +1258,12 @@ class EmrServerlessStartJobOperator(BaseOperator): "configuration_overrides": "json", } - @property - def operator_extra_links(self): - """ - Dynamically add extra links depending 
on the job type and if they're enabled. - - If S3 or CloudWatch monitoring configurations exist, add links directly to the relevant consoles. - Only add dashboard links if they're explicitly enabled. These are one-time links that any user - can access, but expire on first click or one hour, whichever comes first. - """ - op_extra_links = [] - - if isinstance(self, MappedOperator): - operator_class = self.operator_class - enable_application_ui_links = self.partial_kwargs.get( - "enable_application_ui_links" - ) or self.expand_input.value.get("enable_application_ui_links") - job_driver = self.partial_kwargs.get("job_driver", {}) or self.expand_input.value.get( - "job_driver", {} - ) - configuration_overrides = self.partial_kwargs.get( - "configuration_overrides" - ) or self.expand_input.value.get("configuration_overrides") - - # Configuration overrides can either be a list or a dictionary, depending on whether it's passed in as partial or expand. - if isinstance(configuration_overrides, list): - if any( - [ - operator_class.is_monitoring_in_job_override( - self=operator_class, - config_key="s3MonitoringConfiguration", - job_override=job_override, - ) - for job_override in configuration_overrides - ] - ): - op_extra_links.extend([EmrServerlessS3LogsLink()]) - if any( - [ - operator_class.is_monitoring_in_job_override( - self=operator_class, - config_key="cloudWatchLoggingConfiguration", - job_override=job_override, - ) - for job_override in configuration_overrides - ] - ): - op_extra_links.extend([EmrServerlessCloudWatchLogsLink()]) - else: - if operator_class.is_monitoring_in_job_override( - self=operator_class, - config_key="s3MonitoringConfiguration", - job_override=configuration_overrides, - ): - op_extra_links.extend([EmrServerlessS3LogsLink()]) - if operator_class.is_monitoring_in_job_override( - self=operator_class, - config_key="cloudWatchLoggingConfiguration", - job_override=configuration_overrides, - ): - 
op_extra_links.extend([EmrServerlessCloudWatchLogsLink()]) - - else: - operator_class = self - enable_application_ui_links = self.enable_application_ui_links - configuration_overrides = self.configuration_overrides - job_driver = self.job_driver - - if operator_class.is_monitoring_in_job_override( - "s3MonitoringConfiguration", configuration_overrides - ): - op_extra_links.extend([EmrServerlessS3LogsLink()]) - if operator_class.is_monitoring_in_job_override( - "cloudWatchLoggingConfiguration", configuration_overrides - ): - op_extra_links.extend([EmrServerlessCloudWatchLogsLink()]) - - if enable_application_ui_links: - op_extra_links.extend([EmrServerlessDashboardLink()]) - if isinstance(job_driver, list): - if any("sparkSubmit" in ind_job_driver for ind_job_driver in job_driver): - op_extra_links.extend([EmrServerlessLogsLink()]) - elif "sparkSubmit" in job_driver: - op_extra_links.extend([EmrServerlessLogsLink()]) - - return tuple(op_extra_links) + operator_extra_links = ( + EmrServerlessS3LogsLink(), + EmrServerlessCloudWatchLogsLink(), + EmrServerlessDashboardLink(), + EmrServerlessLogsLink(), + ) def __init__( self, diff --git a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst index 65a0fc8bfebe6..9915763e19e09 100644 --- a/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst +++ b/docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst @@ -72,7 +72,8 @@ Open Application UIs The operator can also be configured to generate one-time links to the application UIs and Spark stdout logs by passing the ``enable_application_ui_links=True`` as a parameter. Once the job starts running, these links -are available in the Details section of the relevant Task. +are available in the Details section of the relevant Task. If ``enable_application_ui_links=False`` then the +links will be present but grayed out. 
You need to ensure you have the following IAM permissions to generate the dashboard link. diff --git a/tests/providers/amazon/aws/operators/test_emr_serverless.py b/tests/providers/amazon/aws/operators/test_emr_serverless.py index a93366d296703..4804f2286993c 100644 --- a/tests/providers/amazon/aws/operators/test_emr_serverless.py +++ b/tests/providers/amazon/aws/operators/test_emr_serverless.py @@ -25,23 +25,13 @@ from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, TaskDeferred from airflow.providers.amazon.aws.hooks.emr import EmrServerlessHook -from airflow.providers.amazon.aws.links.emr import ( - EmrServerlessCloudWatchLogsLink, - EmrServerlessDashboardLink, - EmrServerlessLogsLink, - EmrServerlessS3LogsLink, -) from airflow.providers.amazon.aws.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessDeleteApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessStopApplicationOperator, ) -from airflow.serialization.serialized_objects import ( - BaseSerialization, -) from airflow.utils.types import NOTSET -from tests.test_utils.compat import deserialize_operator if TYPE_CHECKING: from unittest.mock import MagicMock @@ -1152,50 +1142,6 @@ def test_links_spark_without_applicationui_enabled( job_run_id=job_run_id, ) - def test_operator_extra_links_mapped_without_applicationui_enabled( - self, - ): - operator = EmrServerlessStartJobOperator.partial( - task_id=task_id, - application_id=application_id, - execution_role_arn=execution_role_arn, - job_driver=spark_job_driver, - enable_application_ui_links=False, - ).expand( - configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides], - ) - - ser_operator = BaseSerialization.serialize(operator) - deser_operator = deserialize_operator(ser_operator) - - assert deser_operator.operator_extra_links == [ - EmrServerlessS3LogsLink(), - EmrServerlessCloudWatchLogsLink(), - ] - - def 
test_operator_extra_links_mapped_with_applicationui_enabled_at_partial( - self, - ): - operator = EmrServerlessStartJobOperator.partial( - task_id=task_id, - application_id=application_id, - execution_role_arn=execution_role_arn, - job_driver=spark_job_driver, - enable_application_ui_links=True, - ).expand( - configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides], - ) - - ser_operator = BaseSerialization.serialize(operator) - deser_operator = deserialize_operator(ser_operator) - - assert deser_operator.operator_extra_links == [ - EmrServerlessS3LogsLink(), - EmrServerlessCloudWatchLogsLink(), - EmrServerlessDashboardLink(), - EmrServerlessLogsLink(), - ] - class TestEmrServerlessDeleteOperator: @mock.patch.object(EmrServerlessHook, "get_waiter")