Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions distributed/metrics.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import annotations

import collections
import sys
import time as timemod
from collections.abc import Callable
from functools import wraps

from distributed.compatibility import WINDOWS

_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())


Expand Down Expand Up @@ -36,15 +40,15 @@ def wrapper(): # pragma: no cover


class _WindowsTime:
"""
Combine time.time() and time.perf_counter() to get an absolute clock
with fine resolution.
"""Combine time.time() or time.monotonic() with time.perf_counter() to get an
absolute clock with fine resolution.
"""

# Resync every N seconds, to avoid drifting
RESYNC_EVERY = 600

def __init__(self):
def __init__(self, base: Callable[[], float]):
self.base_timer = base
self.delta = None
self.last_resync = float("-inf")

Expand All @@ -59,7 +63,7 @@ def time(self):
return delta + cur

def resync(self):
_time = timemod.time
_time = self.base_timer
_perf_counter = self.perf_counter
min_samples = 5
while True:
Expand All @@ -77,19 +81,20 @@ def resync(self):


# A high-resolution wall clock timer measuring the seconds since Unix epoch
if sys.platform.startswith("win"):
time = _WindowsTime().time
if WINDOWS:
time = _WindowsTime(timemod.time).time
monotonic = _WindowsTime(timemod.monotonic).time
else:
# Under modern Unices, time.time() should be good enough
# Under modern Unixes, time.time() and time.monotonic() should be good enough
time = timemod.time
monotonic = timemod.monotonic

process_time = timemod.process_time

# Get a per-thread CPU timer function if possible, otherwise
# use a per-process CPU timer function.
try:
# thread_time is supported on Python 3.7+ but not all platforms
# thread_time is not supported on all platforms
thread_time = timemod.thread_time
except (AttributeError, OSError): # pragma: no cover
# process_time is supported on Python 3.3+ everywhere
thread_time = process_time
17 changes: 10 additions & 7 deletions distributed/tests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import time

import pytest

from distributed import metrics


def test_wall_clock():
@pytest.mark.parametrize("name", ["time", "monotonic"])
def test_wall_clock(name):
for i in range(3):
time.sleep(0.01)
t = time.time()
samples = [metrics.time() for j in range(50)]
t = getattr(time, name)()
samples = [getattr(metrics, name)() for _ in range(100)]
# Resolution
deltas = [samples[j + 1] - samples[j] for j in range(len(samples) - 1)]
deltas = [sj - si for si, sj in zip(samples[:-1], samples[1:])]
assert min(deltas) >= 0.0, deltas
assert max(deltas) <= 1.0, deltas
assert any(lambda d: 0.0 < d < 0.0001 for d in deltas), deltas
# Close to time.time()
assert max(deltas) <= 0.001, deltas
assert any(0.0 < d < 0.0001 for d in deltas), deltas
# Close to time.time() / time.monotonic()
assert t - 0.5 < samples[0] < t + 0.5
4 changes: 2 additions & 2 deletions distributed/tests/test_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import logging
import random
from datetime import timedelta
from time import monotonic, sleep
from time import sleep

import pytest

from distributed import Client, Nanny, TimeoutError, Variable, wait, worker_client
from distributed.compatibility import WINDOWS
from distributed.metrics import time
from distributed.metrics import monotonic, time
from distributed.utils_test import captured_logger, div, gen_cluster, inc, popen


Expand Down