Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,17 @@ metrics:
example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
or \"^scheduler,^executor,heartbeat|timeout\""
default: ""
metrics_consistency_on:
description: |
Enables metrics consistency across all metrics loggers (e.g. timer and timing metrics are all published in milliseconds).

.. warning::

It is enabled by default from Airflow 3.
version_added: 2.10.0
type: string
example: ~
default: "True"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!!!! @potiuk I guess no one noticed, cos this made it out with a default of on.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, except this hasn't made it to a release yet, and it wasn't marked for backporting. (Lucky miss there)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't slip through, it was discussed above and intentional.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied on #43966 as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agreed to enable it directly in Airflow 3 without backporting. However, updating this was overlooked.

statsd_on:
description: |
Enables sending metrics to StatsD.
Expand Down
15 changes: 14 additions & 1 deletion airflow/metrics/datadog_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import datetime
import logging
import warnings
from typing import TYPE_CHECKING

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
AllowListValidator,
Expand All @@ -40,6 +42,14 @@

log = logging.getLogger(__name__)

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)


class SafeDogStatsdLogger:
"""DogStatsd Logger."""
Expand Down Expand Up @@ -134,7 +144,10 @@ def timing(
tags_list = []
if self.metrics_validator.test(stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds()
if metrics_consistency_on:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
return None

Expand Down
14 changes: 13 additions & 1 deletion airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
OTEL_NAME_MAX_LENGTH,
Expand Down Expand Up @@ -71,6 +72,14 @@
# Delimiter is placed between the universal metric prefix and the unique metric name.
DEFAULT_METRIC_NAME_DELIMITER = "."

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)


def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
"""Assembles the prefix, delimiter, and name and returns it as a string."""
Expand Down Expand Up @@ -274,7 +283,10 @@ def timing(
"""OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed."""
if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds()
if metrics_consistency_on:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags)

def timer(
Expand Down
16 changes: 15 additions & 1 deletion airflow/metrics/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,23 @@

import datetime
import time
import warnings
from typing import Union

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.typing_compat import Protocol

DeltaType = Union[int, float, datetime.timedelta]

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)


class TimerProtocol(Protocol):
"""Type protocol for StatsLogger.timer."""
Expand Down Expand Up @@ -116,6 +127,9 @@ def start(self) -> Timer:
def stop(self, send: bool = True) -> None:
"""Stop the timer, and optionally send it to stats backend."""
if self._start_time is not None:
self.duration = time.perf_counter() - self._start_time
if metrics_consistency_on:
self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds.
else:
self.duration = time.perf_counter() - self._start_time
if send and self.real_timer:
self.real_timer.stop()
19 changes: 17 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from airflow.exceptions import (
AirflowException,
AirflowFailException,
AirflowProviderDeprecationWarning,
AirflowRescheduleException,
AirflowSensorTimeout,
AirflowSkipException,
Expand Down Expand Up @@ -168,6 +169,14 @@

PAST_DEPENDS_MET = "past_depends_met"

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)


class TaskReturnCode(Enum):
"""
Expand Down Expand Up @@ -2809,7 +2818,10 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
timing = timezone.utcnow() - self.queued_dttm
if metrics_consistency_on:
timing = timezone.utcnow() - self.queued_dttm
else:
timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
elif new_state == TaskInstanceState.QUEUED:
metric_name = "scheduled_duration"
if self.start_date is None:
Expand All @@ -2822,7 +2834,10 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
timing = timezone.utcnow() - self.start_date
if metrics_consistency_on:
timing = timezone.utcnow() - self.start_date
else:
timing = (timezone.utcnow() - self.start_date).total_seconds()
else:
raise NotImplementedError("no metric emission setup for state %s", new_state)

Expand Down
1 change: 1 addition & 0 deletions newsfragments/39908.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Publishing timer and timing metrics in seconds has been deprecated. In Airflow 3, the ``metrics_consistency_on`` option in the ``metrics`` section is enabled by default. You can disable this for backward compatibility. To publish all timer and timing metrics in milliseconds, ensure metrics consistency is enabled.
5 changes: 5 additions & 0 deletions tests/_internals/forbidden_warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ def pytest_itemcollected(self, item: pytest.Item):
# Add marker at the beginning of the markers list. In this case, it does not conflict with
# filterwarnings markers, which are set explicitly in the test suite.
item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), append=False)
item.add_marker(
pytest.mark.filterwarnings(
"ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning"
)
)

@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int):
Expand Down
25 changes: 21 additions & 4 deletions tests/core/test_otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from opentelemetry.metrics import MeterProvider

from airflow.exceptions import InvalidStatsNameException
from airflow.metrics import otel_logger, protocols
from airflow.metrics.otel_logger import (
OTEL_NAME_MAX_LENGTH,
UP_DOWN_COUNTERS,
Expand Down Expand Up @@ -234,12 +235,22 @@ def test_gauge_value_is_correct(self, name):

assert self.map[full_name(name)].value == 1

def test_timing_new_metric(self, name):
self.stats.timing(name, dt=123)
@pytest.mark.parametrize(
"metrics_consistency_on",
[True, False],
)
def test_timing_new_metric(self, metrics_consistency_on, name):
import datetime

otel_logger.metrics_consistency_on = metrics_consistency_on

self.stats.timing(name, dt=datetime.timedelta(seconds=123))

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
expected_value = 123000.0 if metrics_consistency_on else 123
assert self.map[full_name(name)].value == expected_value

def test_timing_new_metric_with_tags(self, name):
tags = {"hello": "world"}
Expand All @@ -265,13 +276,19 @@ def test_timing_existing_metric(self, name):
# time.perf_counter() is called once to get the starting timestamp and again
# to get the end timestamp. timer() should return the difference as a float.

@pytest.mark.parametrize(
"metrics_consistency_on",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_with_name_returns_float_and_stores_value(self, mock_time, name):
def test_timer_with_name_returns_float_and_stores_value(self, mock_time, metrics_consistency_on, name):
protocols.metrics_consistency_on = metrics_consistency_on
with self.stats.timer(name) as timer:
pass

assert isinstance(timer.duration, float)
assert timer.duration == 3.14
expected_duration = 3140.0 if metrics_consistency_on else 3.14
assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
Expand Down
30 changes: 26 additions & 4 deletions tests/core/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import importlib
import logging
import re
import time
from unittest import mock
from unittest.mock import Mock

Expand All @@ -28,6 +29,7 @@

import airflow
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException, RemovedInAirflow3Warning
from airflow.metrics import datadog_logger, protocols
from airflow.metrics.datadog_logger import SafeDogStatsdLogger
from airflow.metrics.statsd_logger import SafeStatsdLogger
from airflow.metrics.validators import (
Expand Down Expand Up @@ -224,24 +226,44 @@ def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self)
metric="empty_key", sample_rate=1, tags=[], value=1
)

def test_timer(self):
with self.dogstatsd.timer("empty_timer"):
@pytest.mark.parametrize(
"metrics_consistency_on",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0])
def test_timer(self, time_mock, metrics_consistency_on):
protocols.metrics_consistency_on = metrics_consistency_on

with self.dogstatsd.timer("empty_timer") as timer:
pass
self.dogstatsd_client.timed.assert_called_once_with("empty_timer", tags=[])
expected_duration = 100.0
if metrics_consistency_on:
expected_duration = 1000.0 * 100.0
assert expected_duration == timer.duration
assert time_mock.call_count == 2

def test_empty_timer(self):
with self.dogstatsd.timer():
pass
self.dogstatsd_client.timed.assert_not_called()

def test_timing(self):
@pytest.mark.parametrize(
"metrics_consistency_on",
[True, False],
)
def test_timing(self, metrics_consistency_on):
import datetime

datadog_logger.metrics_consistency_on = metrics_consistency_on

self.dogstatsd.timing("empty_timer", 123)
self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", value=123, tags=[])

self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123))
self.dogstatsd_client.timing.assert_called_with(metric="empty_timer", value=123.0, tags=[])
self.dogstatsd_client.timing.assert_called_with(
metric="empty_timer", value=123000.0 if metrics_consistency_on else 123.0, tags=[]
)

def test_gauge(self):
self.dogstatsd.gauge("empty", 123)
Expand Down