From b4a3a9a6f3b57b890ea2322ea331cbadb5a67293 Mon Sep 17 00:00:00 2001 From: dirrao Date: Fri, 24 May 2024 23:15:59 +0530 Subject: [PATCH 1/8] align timers and timing metrics (ms) across all metrics loggers --- airflow/metrics/datadog_logger.py | 2 +- airflow/metrics/otel_logger.py | 2 +- airflow/metrics/protocols.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index 156407977305e..38c63478272f5 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -134,7 +134,7 @@ def timing( tags_list = [] if self.metrics_validator.test(stat): if isinstance(dt, datetime.timedelta): - dt = dt.total_seconds() + dt = dt.total_seconds() * 1000 return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list) return None diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 5dac960c169a0..8ccaefb6cf20b 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -274,7 +274,7 @@ def timing( """OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed.""" if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): if isinstance(dt, datetime.timedelta): - dt = dt.total_seconds() + dt = dt.total_seconds() * 1000 self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags) def timer( diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py index c46942ce95f70..8cfe4d8e7ea34 100644 --- a/airflow/metrics/protocols.py +++ b/airflow/metrics/protocols.py @@ -116,6 +116,6 @@ def start(self) -> Timer: def stop(self, send: bool = True) -> None: """Stop the timer, and optionally send it to stats backend.""" if self._start_time is not None: - self.duration = time.perf_counter() - self._start_time + self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds. if send and self.real_timer: self.real_timer.stop() From 2e9c0d2bb9590c11759884ce3adb0dc5c5f58a69 Mon Sep 17 00:00:00 2001 From: dirrao Date: Thu, 6 Jun 2024 22:40:10 +0530 Subject: [PATCH 2/8] align timers and timing metrics (ms) across all metrics loggers --- airflow/config_templates/config.yml | 7 +++++++ airflow/metrics/datadog_logger.py | 14 +++++++++++++- airflow/metrics/otel_logger.py | 13 ++++++++++++- airflow/metrics/protocols.py | 15 ++++++++++++++- airflow/models/taskinstance.py | 23 +++++++++++++++++++++-- 5 files changed, 67 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 69f7f07139f24..df9e4bbd899ff 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1074,6 +1074,13 @@ metrics: example: "\"scheduler,executor,dagrun,pool,triggerer,celery\" or \"^scheduler,^executor,heartbeat|timeout\"" default: "" + metrics_consistency_on: + description: | + Enables metrics onsistency across all metrics loggers (ex: timer and timing metrics). + version_added: 2.9.2 + type: string + example: ~ + default: "False" statsd_on: description: | Enables sending metrics to StatsD. diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index 38c63478272f5..5e462ef2dbb22 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -19,9 +19,11 @@ import datetime import logging +import warnings from typing import TYPE_CHECKING from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( AllowListValidator, @@ -40,6 +42,8 @@ log = logging.getLogger(__name__) +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) + class SafeDogStatsdLogger: """DogStatsd Logger.""" @@ -134,7 +138,15 @@ def timing( tags_list = [] if self.metrics_validator.test(stat): if isinstance(dt, datetime.timedelta): - dt = dt.total_seconds() * 1000 + if metrics_consistency_on: + dt = dt.total_seconds() * 1000 + else: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + dt = dt.total_seconds() return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list) return None diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 8ccaefb6cf20b..c7f4f02b38c92 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -31,6 +31,7 @@ from opentelemetry.sdk.resources import SERVICE_NAME, Resource from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( OTEL_NAME_MAX_LENGTH, @@ -71,6 +72,8 @@ # Delimiter is placed between the universal metric prefix and the unique metric name. DEFAULT_METRIC_NAME_DELIMITER = "." +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) + def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str: """Assembles the prefix, delimiter, and name and returns it as a string.""" @@ -274,7 +277,15 @@ def timing( """OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed.""" if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): if isinstance(dt, datetime.timedelta): - dt = dt.total_seconds() * 1000 + if metrics_consistency_on: + dt = dt.total_seconds() * 1000 + else: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + dt = dt.total_seconds() self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags) def timer( diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py index 8cfe4d8e7ea34..a4dd819df956f 100644 --- a/airflow/metrics/protocols.py +++ b/airflow/metrics/protocols.py @@ -19,12 +19,17 @@ import datetime import time +import warnings from typing import Union +from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.typing_compat import Protocol DeltaType = Union[int, float, datetime.timedelta] +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) + class TimerProtocol(Protocol): """Type protocol for StatsLogger.timer.""" @@ -116,6 +121,14 @@ def start(self) -> Timer: def stop(self, send: bool = True) -> None: """Stop the timer, and optionally send it to stats backend.""" if self._start_time is not None: - self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds. + if metrics_consistency_on: + self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds. + else: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + self.duration = time.perf_counter() - self._start_time if send and self.real_timer: self.real_timer.stop() diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 6b9afc01dd09e..bd65239885e30 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -76,6 +76,7 @@ from airflow.exceptions import ( AirflowException, AirflowFailException, + AirflowProviderDeprecationWarning, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, @@ -169,6 +170,8 @@ PAST_DEPENDS_MET = "past_depends_met" +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) + class TaskReturnCode(Enum): """ @@ -2860,7 +2863,15 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: self.task_id, ) return - timing = timezone.utcnow() - self.queued_dttm + if metrics_consistency_on: + timing = timezone.utcnow() - self.queued_dttm + else: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + timing = (timezone.utcnow() - self.queued_dttm).total_seconds() elif new_state == TaskInstanceState.QUEUED: metric_name = "scheduled_duration" if self.start_date is None: @@ -2873,7 +2884,15 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: self.task_id, ) return - timing = timezone.utcnow() - self.start_date + if metrics_consistency_on: + timing = timezone.utcnow() - self.start_date + else: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + timing = (timezone.utcnow() - self.start_date).total_seconds() else: raise NotImplementedError("no metric emission setup for state %s", new_state) From 62580e838d2239596427cd7c0b03df027ac52cfd Mon Sep 17 00:00:00 2001 From: dirrao Date: Thu, 13 Jun 2024 18:25:20 +0530 Subject: [PATCH 3/8] align timers and timing metrics (ms) across all metrics loggers --- airflow/config_templates/config.yml | 6 +++++- airflow/metrics/datadog_logger.py | 2 +- airflow/metrics/otel_logger.py | 2 +- airflow/metrics/protocols.py | 2 +- airflow/models/taskinstance.py | 4 ++-- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index df9e4bbd899ff..6124eb3e6f3fb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1077,7 +1077,11 @@ metrics: metrics_consistency_on: description: | Enables metrics onsistency across all metrics loggers (ex: timer and timing metrics). - version_added: 2.9.2 + + .. warning:: + + It is enabled by default from Airflow 3 onwards. + version_added: 2.10.0 type: string example: ~ default: "False" diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index 5e462ef2dbb22..fbeb476bd453d 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -142,7 +142,7 @@ def timing( dt = dt.total_seconds() * 1000 else: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", AirflowProviderDeprecationWarning, stacklevel=2, ) diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index c7f4f02b38c92..a47528a9eb7d8 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -281,7 +281,7 @@ def timing( dt = dt.total_seconds() * 1000 else: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", AirflowProviderDeprecationWarning, stacklevel=2, ) diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py index a4dd819df956f..3010e1284a074 100644 --- a/airflow/metrics/protocols.py +++ b/airflow/metrics/protocols.py @@ -125,7 +125,7 @@ def stop(self, send: bool = True) -> None: self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds. else: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", AirflowProviderDeprecationWarning, stacklevel=2, ) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index bd65239885e30..fdf3c539e3973 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2867,7 +2867,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: timing = timezone.utcnow() - self.queued_dttm else: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", AirflowProviderDeprecationWarning, stacklevel=2, ) @@ -2888,7 +2888,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: timing = timezone.utcnow() - self.start_date else: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", AirflowProviderDeprecationWarning, stacklevel=2, ) From d2d190be1531399b210effca88914df3d9d9edd9 Mon Sep 17 00:00:00 2001 From: dirrao Date: Fri, 5 Jul 2024 15:25:26 +0530 Subject: [PATCH 4/8] unit tests updated --- airflow/metrics/datadog_logger.py | 2 +- airflow/metrics/otel_logger.py | 2 +- tests/core/test_otel_logger.py | 25 +++++++++++++++++++++---- tests/core/test_stats.py | 30 ++++++++++++++++++++++++++---- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index fbeb476bd453d..391fdca33e163 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -139,7 +139,7 @@ def timing( if self.metrics_validator.test(stat): if isinstance(dt, datetime.timedelta): if metrics_consistency_on: - dt = dt.total_seconds() * 1000 + dt = dt.total_seconds() * 1000.0 else: warnings.warn( "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index a47528a9eb7d8..7808a40f75995 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -278,7 +278,7 @@ def timing( if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): if isinstance(dt, datetime.timedelta): if metrics_consistency_on: - dt = dt.total_seconds() * 1000 + dt = dt.total_seconds() * 1000.0 else: warnings.warn( "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", diff --git a/tests/core/test_otel_logger.py b/tests/core/test_otel_logger.py index 6cba116f652b9..d5697e585b45c 100644 --- a/tests/core/test_otel_logger.py +++ b/tests/core/test_otel_logger.py @@ -25,6 +25,7 @@ from opentelemetry.metrics import MeterProvider from airflow.exceptions import InvalidStatsNameException +from airflow.metrics import otel_logger, protocols from airflow.metrics.otel_logger import ( OTEL_NAME_MAX_LENGTH, UP_DOWN_COUNTERS, @@ -234,12 +235,22 @@ def test_gauge_value_is_correct(self, name): assert self.map[full_name(name)].value == 1 - def test_timing_new_metric(self, name): - self.stats.timing(name, dt=123) + @pytest.mark.parametrize( + "metrics_consistency_on", + [True, False], + ) + def test_timing_new_metric(self, metrics_consistency_on, name): + import datetime + + otel_logger.metrics_consistency_on = metrics_consistency_on + + self.stats.timing(name, dt=datetime.timedelta(seconds=123)) self.meter.get_meter().create_observable_gauge.assert_called_once_with( name=full_name(name), callbacks=ANY ) + expected_value = 123000.0 if metrics_consistency_on else 123 + assert self.map[full_name(name)].value == expected_value def test_timing_new_metric_with_tags(self, name): tags = {"hello": "world"} @@ -265,13 +276,19 @@ def test_timing_existing_metric(self, name): # time.perf_count() is called once to get the starting timestamp and again # to get the end timestamp. timer() should return the difference as a float. + @pytest.mark.parametrize( + "metrics_consistency_on", + [True, False], + ) @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) - def test_timer_with_name_returns_float_and_stores_value(self, mock_time, name): + def test_timer_with_name_returns_float_and_stores_value(self, mock_time, metrics_consistency_on, name): + protocols.metrics_consistency_on = metrics_consistency_on with self.stats.timer(name) as timer: pass assert isinstance(timer.duration, float) - assert timer.duration == 3.14 + expected_duration = 3140.0 if metrics_consistency_on else 3.14 + assert timer.duration == expected_duration assert mock_time.call_count == 2 self.meter.get_meter().create_observable_gauge.assert_called_once_with( name=full_name(name), callbacks=ANY diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py index 80570b9ddd9a8..41eb35525f4c2 100644 --- a/tests/core/test_stats.py +++ b/tests/core/test_stats.py @@ -20,6 +20,7 @@ import importlib import logging import re +import time from unittest import mock from unittest.mock import Mock @@ -28,6 +29,7 @@ import airflow from airflow.exceptions import AirflowConfigException, InvalidStatsNameException, RemovedInAirflow3Warning +from airflow.metrics import datadog_logger, protocols from airflow.metrics.datadog_logger import SafeDogStatsdLogger from airflow.metrics.statsd_logger import SafeStatsdLogger from airflow.metrics.validators import ( @@ -224,24 +226,44 @@ def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self) metric="empty_key", sample_rate=1, tags=[], value=1 ) - def test_timer(self): - with self.dogstatsd.timer("empty_timer"): + @pytest.mark.parametrize( + "metrics_consistency_on", + [True, False], + ) + @mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0]) + def test_timer(self, time_mock, metrics_consistency_on): + protocols.metrics_consistency_on = metrics_consistency_on + + with self.dogstatsd.timer("empty_timer") as timer: pass self.dogstatsd_client.timed.assert_called_once_with("empty_timer", tags=[]) + expected_duration = 100.0 + if metrics_consistency_on: + expected_duration = 1000.0 * 100.0 + assert expected_duration == timer.duration + assert time_mock.call_count == 2 def test_empty_timer(self): with self.dogstatsd.timer(): pass self.dogstatsd_client.timed.assert_not_called() - def test_timing(self): + @pytest.mark.parametrize( + "metrics_consistency_on", + [True, False], + ) + def test_timing(self, metrics_consistency_on): import datetime + datadog_logger.metrics_consistency_on = metrics_consistency_on + self.dogstatsd.timing("empty_timer", 123) self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", value=123, tags=[]) self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123)) - self.dogstatsd_client.timing.assert_called_with(metric="empty_timer", value=123.0, tags=[]) + self.dogstatsd_client.timing.assert_called_with( + metric="empty_timer", value=123000.0 if metrics_consistency_on else 123.0, tags=[] + ) def test_gauge(self): self.dogstatsd.gauge("empty", 123) From 0466bf1c2cfd2ad55d6442249074f211eefd4f8b Mon Sep 17 00:00:00 2001 From: dirrao Date: Fri, 5 Jul 2024 16:58:01 +0530 Subject: [PATCH 5/8] Exclude a specific warning for all test cases --- airflow/config_templates/config.yml | 2 +- tests/_internals/forbidden_warnings.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 6124eb3e6f3fb..434236387f9e3 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1076,7 +1076,7 @@ metrics: default: "" metrics_consistency_on: description: | - Enables metrics onsistency across all metrics loggers (ex: timer and timing metrics). + Enables metrics consistency across all metrics loggers (ex: timer and timing metrics). .. warning:: diff --git a/tests/_internals/forbidden_warnings.py b/tests/_internals/forbidden_warnings.py index c78e4b0333f74..324d2ff6f9824 100644 --- a/tests/_internals/forbidden_warnings.py +++ b/tests/_internals/forbidden_warnings.py @@ -62,6 +62,11 @@ def pytest_itemcollected(self, item: pytest.Item): # Add marker at the beginning of the markers list. In this case, it does not conflict with # filterwarnings markers, which are set explicitly in the test suite. item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), append=False) + item.add_marker( + pytest.mark.filterwarnings( + "ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning" + ) + ) @pytest.hookimpl(hookwrapper=True, trylast=True) def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int): From c18d25e7bb6348ed377ecd63f82b37ae1a4306d0 Mon Sep 17 00:00:00 2001 From: dirrao Date: Fri, 5 Jul 2024 17:58:41 +0530 Subject: [PATCH 6/8] Avoiding the metrics warning on each metric publish --- airflow/config_templates/config.yml | 2 +- airflow/metrics/datadog_logger.py | 11 ++++++----- airflow/metrics/otel_logger.py | 11 ++++++----- airflow/metrics/protocols.py | 11 ++++++----- airflow/models/taskinstance.py | 16 ++++++---------- 5 files changed, 25 insertions(+), 26 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 434236387f9e3..b477988626612 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1080,7 +1080,7 @@ metrics: .. warning:: - It is enabled by default from Airflow 3 onwards. + It is enabled by default from Airflow 3. version_added: 2.10.0 type: string example: ~ diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index 391fdca33e163..77ea942c7c8a7 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -43,6 +43,12 @@ log = logging.getLogger(__name__) metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) +if not metrics_consistency_on: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) class SafeDogStatsdLogger: @@ -141,11 +147,6 @@ def timing( if metrics_consistency_on: dt = dt.total_seconds() * 1000.0 else: - warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) dt = dt.total_seconds() return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list) return None diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 7808a40f75995..6909b66c2d89e 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -73,6 +73,12 @@ DEFAULT_METRIC_NAME_DELIMITER = "." metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) +if not metrics_consistency_on: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str: @@ -280,11 +286,6 @@ def timing( if metrics_consistency_on: dt = dt.total_seconds() * 1000.0 else: - warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) dt = dt.total_seconds() self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags) diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py index 3010e1284a074..02d35d609f525 100644 --- a/airflow/metrics/protocols.py +++ b/airflow/metrics/protocols.py @@ -29,6 +29,12 @@ DeltaType = Union[int, float, datetime.timedelta] metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) +if not metrics_consistency_on: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) class TimerProtocol(Protocol): @@ -124,11 +130,6 @@ def stop(self, send: bool = True) -> None: if metrics_consistency_on: self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds. else: - warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) self.duration = time.perf_counter() - self._start_time if send and self.real_timer: self.real_timer.stop() diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index fdf3c539e3973..e0eaf89b751f3 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -171,6 +171,12 @@ PAST_DEPENDS_MET = "past_depends_met" metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) +if not metrics_consistency_on: + warnings.warn( + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) class TaskReturnCode(Enum): @@ -2866,11 +2872,6 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: if metrics_consistency_on: timing = timezone.utcnow() - self.queued_dttm else: - warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) timing = (timezone.utcnow() - self.queued_dttm).total_seconds() elif new_state == TaskInstanceState.QUEUED: metric_name = "scheduled_duration" @@ -2887,11 +2888,6 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: if metrics_consistency_on: timing = timezone.utcnow() - self.start_date else: - warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) timing = (timezone.utcnow() - self.start_date).total_seconds() else: raise NotImplementedError("no metric emission setup for state %s", new_state) From 89263d0ca74957b0e053ed7055b575cdb0703717 Mon Sep 17 00:00:00 2001 From: dirrao Date: Sat, 7 Sep 2024 13:28:09 +0530 Subject: [PATCH 7/8] metrics_consistency_on enabled by default --- airflow/config_templates/config.yml | 2 +- airflow/metrics/datadog_logger.py | 2 +- airflow/metrics/otel_logger.py | 2 +- airflow/metrics/protocols.py | 2 +- airflow/models/taskinstance.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index b477988626612..1d267d1c08f69 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1084,7 +1084,7 @@ metrics: version_added: 2.10.0 type: string example: ~ - default: "False" + default: "True" statsd_on: description: | Enables sending metrics to StatsD. diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index 77ea942c7c8a7..c7bcf1986d853 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -42,7 +42,7 @@ log = logging.getLogger(__name__) -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) if not metrics_consistency_on: warnings.warn( "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 6909b66c2d89e..e8d0f54d73288 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -72,7 +72,7 @@ # Delimiter is placed between the universal metric prefix and the unique metric name. DEFAULT_METRIC_NAME_DELIMITER = "." -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) if not metrics_consistency_on: warnings.warn( "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py index 02d35d609f525..7eef7929e02db 100644 --- a/airflow/metrics/protocols.py +++ b/airflow/metrics/protocols.py @@ -28,7 +28,7 @@ DeltaType = Union[int, float, datetime.timedelta] -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) if not metrics_consistency_on: warnings.warn( "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e0eaf89b751f3..1dbd08992ba84 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -170,7 +170,7 @@ PAST_DEPENDS_MET = "past_depends_met" -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=False) +metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) if not metrics_consistency_on: warnings.warn( "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", From b3424bd7164ed16c98161147eeebaa3484da7fc5 Mon Sep 17 00:00:00 2001 From: dirrao Date: Tue, 10 Sep 2024 10:52:27 +0530 Subject: [PATCH 8/8] news fragment added --- newsfragments/39908.significant.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/39908.significant.rst diff --git a/newsfragments/39908.significant.rst b/newsfragments/39908.significant.rst new file mode 100644 index 0000000000000..bd4a2967ba4fb --- /dev/null +++ b/newsfragments/39908.significant.rst @@ -0,0 +1 @@ +Publishing timer and timing metrics in seconds has been deprecated. In Airflow 3, ``metrics_consistency_on`` from ``metrics`` is enabled by default. You can disable this for backward compatibility. To publish all timer and timing metrics in milliseconds, ensure metrics consistency is enabled