diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index dc703446af240..c77f9476b0d24 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1064,17 +1064,24 @@ metrics: example: "\"scheduler,executor,dagrun,pool,triggerer,celery\" or \"^scheduler,^executor,heartbeat|timeout\"" default: "" - metrics_consistency_on: - description: | - Enables metrics consistency across all metrics loggers (ex: timer and timing metrics). + # TODO: Remove 'timer_unit_consistency' in Airflow 3.0 + timer_unit_consistency: + description: | + Controls the consistency of timer units across all metrics loggers + (e.g., Statsd, Datadog, OpenTelemetry) + for timing and duration-based metrics. When enabled, all timers will publish + metrics in milliseconds for consistency and alignment with Airflow's default + metrics behavior in version 3.0+. .. warning:: - It is enabled by default from Airflow 3. - version_added: 2.10.0 + It will be the default behavior from Airflow 3.0. If disabled, timers may publish + in seconds for backwards compatibility, though it is recommended to enable this + setting to ensure metric uniformity and forward-compat with Airflow 3. + version_added: 2.11.0 type: string example: ~ - default: "True" + default: "False" statsd_on: description: | Enables sending metrics to StatsD. diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index 60b5424a6f5c8..81926716eb25b 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -23,7 +23,7 @@ from typing import TYPE_CHECKING from airflow.configuration import conf -from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.exceptions import RemovedInAirflow3Warning from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( PatternAllowListValidator, @@ -42,11 +42,11 @@ log = logging.getLogger(__name__) -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) -if not metrics_consistency_on: +timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency") +if not timer_unit_consistency: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.", + RemovedInAirflow3Warning, stacklevel=2, ) @@ -144,7 +144,7 @@ def timing( tags_list = [] if self.metrics_validator.test(stat): if isinstance(dt, datetime.timedelta): - if metrics_consistency_on: + if timer_unit_consistency: dt = dt.total_seconds() * 1000.0 else: dt = dt.total_seconds() diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 6d7d6e8fffa1c..ed123608626ff 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -31,7 +31,7 @@ from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource from airflow.configuration import conf -from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.exceptions import RemovedInAirflow3Warning from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( OTEL_NAME_MAX_LENGTH, @@ -73,11 +73,11 @@ # Delimiter is placed between the universal metric prefix and the unique metric name. DEFAULT_METRIC_NAME_DELIMITER = "." -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) -if not metrics_consistency_on: +timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency") +if not timer_unit_consistency: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.", + RemovedInAirflow3Warning, stacklevel=2, ) @@ -284,7 +284,7 @@ def timing( """OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed.""" if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): if isinstance(dt, datetime.timedelta): - if metrics_consistency_on: + if timer_unit_consistency: dt = dt.total_seconds() * 1000.0 else: dt = dt.total_seconds() diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py index 7eef7929e02db..0d12704e87a3b 100644 --- a/airflow/metrics/protocols.py +++ b/airflow/metrics/protocols.py @@ -23,16 +23,16 @@ from typing import Union from airflow.configuration import conf -from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.exceptions import RemovedInAirflow3Warning from airflow.typing_compat import Protocol DeltaType = Union[int, float, datetime.timedelta] -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) -if not metrics_consistency_on: +timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency") +if not timer_unit_consistency: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.", + RemovedInAirflow3Warning, stacklevel=2, ) @@ -127,7 +127,7 @@ def start(self) -> Timer: def stop(self, send: bool = True) -> None: """Stop the timer, and optionally send it to stats backend.""" if self._start_time is not None: - if metrics_consistency_on: + if timer_unit_consistency: self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds. else: self.duration = time.perf_counter() - self._start_time diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index c05b1dd62ecaa..0b2a71e92317f 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -80,12 +80,12 @@ from airflow.exceptions import ( AirflowException, AirflowFailException, - AirflowProviderDeprecationWarning, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, AirflowTaskTerminated, AirflowTaskTimeout, + RemovedInAirflow3Warning, TaskDeferralError, TaskDeferred, UnmappableXComLengthPushed, @@ -176,11 +176,11 @@ PAST_DEPENDS_MET = "past_depends_met" -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) -if not metrics_consistency_on: +timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency") +if not timer_unit_consistency: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.", + RemovedInAirflow3Warning, stacklevel=2, ) @@ -2827,7 +2827,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: self.task_id, ) return - if metrics_consistency_on: + if timer_unit_consistency: timing = timezone.utcnow() - self.queued_dttm else: timing = (timezone.utcnow() - self.queued_dttm).total_seconds() @@ -2843,7 +2843,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: self.task_id, ) return - if metrics_consistency_on: + if timer_unit_consistency: timing = timezone.utcnow() - self.start_date else: timing = (timezone.utcnow() - self.start_date).total_seconds() diff --git a/newsfragments/39908.significant.rst b/newsfragments/39908.significant.rst index bd4a2967ba4fb..d5ba99fa9fa53 100644 --- a/newsfragments/39908.significant.rst +++ b/newsfragments/39908.significant.rst @@ -1 +1,11 @@ -Publishing timer and timing metrics in seconds has been deprecated. In Airflow 3, ``metrics_consistency_on`` from ``metrics`` is enabled by default. You can disable this for backward compatibility. To publish all timer and timing metrics in milliseconds, ensure metrics consistency is enabled +Publishing timer and timing metrics in seconds is now deprecated. + +In Airflow 3.0, the ``timer_unit_consistency`` setting in the ``metrics`` section will be +enabled by default and setting itself will be removed. This will standardize all timer and timing metrics to +milliseconds across all metric loggers. + +**Users Integrating with Datadog, OpenTelemetry, or other metric backends** should enable this setting. For users, using +``statsd``, this change will not affect you. + +If you need backward compatibility, you can leave this setting disabled temporarily, but enabling +``timer_unit_consistency`` is encouraged to future-proof your metrics setup. diff --git a/tests/core/test_otel_logger.py b/tests/core/test_otel_logger.py index 1b0f01c89b902..a4bf7c4c41567 100644 --- a/tests/core/test_otel_logger.py +++ b/tests/core/test_otel_logger.py @@ -236,20 +236,20 @@ def test_gauge_value_is_correct(self, name): assert self.map[full_name(name)].value == 1 @pytest.mark.parametrize( - "metrics_consistency_on", + "timer_unit_consistency", [True, False], ) - def test_timing_new_metric(self, metrics_consistency_on, name): + def test_timing_new_metric(self, timer_unit_consistency, name): import datetime - otel_logger.metrics_consistency_on = metrics_consistency_on + otel_logger.timer_unit_consistency = timer_unit_consistency self.stats.timing(name, dt=datetime.timedelta(seconds=123)) self.meter.get_meter().create_observable_gauge.assert_called_once_with( name=full_name(name), callbacks=ANY ) - expected_value = 123000.0 if metrics_consistency_on else 123 + expected_value = 123000.0 if timer_unit_consistency else 123 assert self.map[full_name(name)].value == expected_value def test_timing_new_metric_with_tags(self, name): @@ -277,17 +277,17 @@ def test_timing_existing_metric(self, name): # to get the end timestamp. timer() should return the difference as a float. @pytest.mark.parametrize( - "metrics_consistency_on", + "timer_unit_consistency", [True, False], ) @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) - def test_timer_with_name_returns_float_and_stores_value(self, mock_time, metrics_consistency_on, name): - protocols.metrics_consistency_on = metrics_consistency_on + def test_timer_with_name_returns_float_and_stores_value(self, mock_time, timer_unit_consistency, name): + protocols.timer_unit_consistency = timer_unit_consistency with self.stats.timer(name) as timer: pass assert isinstance(timer.duration, float) - expected_duration = 3140.0 if metrics_consistency_on else 3.14 + expected_duration = 3140.0 if timer_unit_consistency else 3.14 assert timer.duration == expected_duration assert mock_time.call_count == 2 self.meter.get_meter().create_observable_gauge.assert_called_once_with( @@ -295,33 +295,33 @@ def test_timer_with_name_returns_float_and_stores_value(self, mock_time, metrics ) @pytest.mark.parametrize( - "metrics_consistency_on", + "timer_unit_consistency", [True, False], ) @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) def test_timer_no_name_returns_float_but_does_not_store_value( - self, mock_time, metrics_consistency_on, name + self, mock_time, timer_unit_consistency, name ): - protocols.metrics_consistency_on = metrics_consistency_on + protocols.timer_unit_consistency = timer_unit_consistency with self.stats.timer() as timer: pass assert isinstance(timer.duration, float) - expected_duration = 3140.0 if metrics_consistency_on else 3.14 + expected_duration = 3140.0 if timer_unit_consistency else 3.14 assert timer.duration == expected_duration assert mock_time.call_count == 2 self.meter.get_meter().create_observable_gauge.assert_not_called() @pytest.mark.parametrize( - "metrics_consistency_on", + "timer_unit_consistency", [ True, False, ], ) @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) - def test_timer_start_and_stop_manually_send_false(self, mock_time, metrics_consistency_on, name): - protocols.metrics_consistency_on = metrics_consistency_on + def test_timer_start_and_stop_manually_send_false(self, mock_time, timer_unit_consistency, name): + protocols.timer_unit_consistency = timer_unit_consistency timer = self.stats.timer(name) timer.start() @@ -329,28 +329,28 @@ def test_timer_start_and_stop_manually_send_false(self, mock_time, metrics_consi timer.stop(send=False) assert isinstance(timer.duration, float) - expected_value = 3140.0 if metrics_consistency_on else 3.14 + expected_value = 3140.0 if timer_unit_consistency else 3.14 assert timer.duration == expected_value assert mock_time.call_count == 2 self.meter.get_meter().create_observable_gauge.assert_not_called() @pytest.mark.parametrize( - "metrics_consistency_on", + "timer_unit_consistency", [ True, False, ], ) @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) - def test_timer_start_and_stop_manually_send_true(self, mock_time, metrics_consistency_on, name): - protocols.metrics_consistency_on = metrics_consistency_on + def test_timer_start_and_stop_manually_send_true(self, mock_time, timer_unit_consistency, name): + protocols.timer_unit_consistency = timer_unit_consistency timer = self.stats.timer(name) timer.start() # Perform some task timer.stop(send=True) assert isinstance(timer.duration, float) - expected_value = 3140.0 if metrics_consistency_on else 3.14 + expected_value = 3140.0 if timer_unit_consistency else 3.14 assert timer.duration == expected_value assert mock_time.call_count == 2 self.meter.get_meter().create_observable_gauge.assert_called_once_with( diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py index dc6a6737f13e5..1b30bc9990af3 100644 --- a/tests/core/test_stats.py +++ b/tests/core/test_stats.py @@ -222,18 +222,18 @@ def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self) ) @pytest.mark.parametrize( - "metrics_consistency_on", + "timer_unit_consistency", [True, False], ) @mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0]) - def test_timer(self, time_mock, metrics_consistency_on): - protocols.metrics_consistency_on = metrics_consistency_on + def test_timer(self, time_mock, timer_unit_consistency): + protocols.timer_unit_consistency = timer_unit_consistency with self.dogstatsd.timer("empty_timer") as timer: pass self.dogstatsd_client.timed.assert_called_once_with("empty_timer", tags=[]) expected_duration = 100.0 - if metrics_consistency_on: + if timer_unit_consistency: expected_duration = 1000.0 * 100.0 assert expected_duration == timer.duration assert time_mock.call_count == 2 @@ -244,20 +244,20 @@ def test_empty_timer(self): self.dogstatsd_client.timed.assert_not_called() @pytest.mark.parametrize( - "metrics_consistency_on", + "timer_unit_consistency", [True, False], ) - def test_timing(self, metrics_consistency_on): + def test_timing(self, timer_unit_consistency): import datetime - datadog_logger.metrics_consistency_on = metrics_consistency_on + datadog_logger.timer_unit_consistency = timer_unit_consistency self.dogstatsd.timing("empty_timer", 123) self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", value=123, tags=[]) self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123)) self.dogstatsd_client.timing.assert_called_with( - metric="empty_timer", value=123000.0 if metrics_consistency_on else 123.0, tags=[] + metric="empty_timer", value=123000.0 if timer_unit_consistency else 123.0, tags=[] ) def test_gauge(self): diff --git a/tests_common/_internals/forbidden_warnings.py b/tests_common/_internals/forbidden_warnings.py index 0b0a11262b3b6..856960935bd41 100644 --- a/tests_common/_internals/forbidden_warnings.py +++ b/tests_common/_internals/forbidden_warnings.py @@ -75,7 +75,7 @@ def pytest_itemcollected(self, item: pytest.Item): item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), append=False) item.add_marker( pytest.mark.filterwarnings( - "ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning" + "ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning" ) )