From c29ad3c995a19760defec3295603cc4439f2e108 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 21 Aug 2025 16:52:49 +0300 Subject: [PATCH 1/5] timestamped find_closest takes tolerance --- dimos/types/test_timestamped.py | 16 +++++++++----- dimos/types/timestamped.py | 39 +++++++++++++++++++++++---------- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index 7f043750ea..5482ede549 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -145,19 +145,24 @@ def test_find_closest(collection): assert collection.find_closest(3.0).data == "third" # Between items (closer to left) - assert collection.find_closest(1.5).data == "first" + assert collection.find_closest(1.5, tolerance=1.0).data == "first" # Between items (closer to right) - assert collection.find_closest(3.5).data == "third" + assert collection.find_closest(3.5, tolerance=1.0).data == "third" # Exactly in the middle (should pick the later one due to >= comparison) - assert collection.find_closest(4.0).data == "fifth" # 4.0 is equidistant from 3.0 and 5.0 + assert ( + collection.find_closest(4.0, tolerance=1.0).data == "fifth" + ) # 4.0 is equidistant from 3.0 and 5.0 # Before all items - assert collection.find_closest(0.0).data == "first" + assert collection.find_closest(0.0, tolerance=1.0).data == "first" # After all items - assert collection.find_closest(10.0).data == "seventh" + assert collection.find_closest(10.0, tolerance=4.0).data == "seventh" + + # low tolerance, should return None + assert collection.find_closest(10.0, tolerance=2.0) is None def test_find_before_after(collection): @@ -223,4 +228,3 @@ def test_single_item_collection(): single = TimestampedCollection([SimpleTimestamped(5.0, "only")]) assert single.duration() == 0.0 assert single.time_range() == (5.0, 5.0) - assert single.find_closest(100.0).data == "only" diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index 858a2bdaad..c8c496aeb3 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -16,6 +16,7 @@ from datetime import datetime, timezone from typing import Generic, Iterable, Optional, Tuple, TypedDict, TypeVar, Union +from reactivex.observable import Observable from sortedcontainers import SortedList # any class that carries a timestamp should inherit from this @@ -102,26 +103,42 @@ def add(self, item: T) -> None: """Add a timestamped item to the collection.""" self._items.add(item) - def find_closest(self, timestamp: float) -> Optional[T]: + def find_closest(self, timestamp: float, tolerance: float = 0.05) -> Optional[T]: """Find the timestamped object closest to the given timestamp.""" if not self._items: return None - # Find insertion point using binary search on timestamps + # Use binary search to find insertion point timestamps = [item.ts for item in self._items] idx = bisect.bisect_left(timestamps, timestamp) - # Check boundaries - if idx == 0: - return self._items[0] - if idx == len(self._items): - return self._items[-1] + # Check exact match + if idx < len(self._items) and self._items[idx].ts == timestamp: + return self._items[idx] - # Compare distances to neighbors - left_diff = abs(timestamp - self._items[idx - 1].ts) - right_diff = abs(self._items[idx].ts - timestamp) + # Find candidates: item before and after + candidates = [] - return self._items[idx - 1] if left_diff < right_diff else self._items[idx] + # Item before + if idx > 0: + candidates.append((idx - 1, abs(self._items[idx - 1].ts - timestamp))) + + # Item after + if idx < len(self._items): + candidates.append((idx, abs(self._items[idx].ts - timestamp))) + + if not candidates: + return None + + # Find closest + # When distances are equal, prefer the later item (higher index) + closest_idx, closest_distance = min(candidates, key=lambda x: (x[1], -x[0])) + + # Check tolerance + if closest_distance > tolerance: + return None + + return self._items[closest_idx] def find_before(self, timestamp: float) -> Optional[T]: """Find the last item before the given timestamp.""" From 11d08fa6208e6b2581e33bffbfbb191a04aa2191 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 21 Aug 2025 17:30:24 +0300 Subject: [PATCH 2/5] initial implementation of frame alignment mechanism --- dimos/types/test_timestamped.py | 95 ++++++++++++++++++++++++++++++++- dimos/types/timestamped.py | 74 +++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 2 deletions(-) diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index 5482ede549..ff3b441e04 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -12,11 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from datetime import datetime, timezone import pytest - -from dimos.types.timestamped import Timestamped, TimestampedCollection, to_datetime, to_ros_stamp +from reactivex import operators as ops + +from dimos.msgs.sensor_msgs import Image +from dimos.types.timestamped import ( + Timestamped, + TimestampedBufferCollection, + TimestampedCollection, + align_timestamped, + to_datetime, + to_ros_stamp, +) +from dimos.utils import testing +from dimos.utils.data import get_data +from dimos.utils.reactive import backpressure def test_timestamped_dt_method(): @@ -228,3 +241,81 @@ def test_single_item_collection(): single = TimestampedCollection([SimpleTimestamped(5.0, "only")]) assert single.duration() == 0.0 assert single.time_range() == (5.0, 5.0) + + +def test_time_window_collection(): + # Create a collection with a 2-second window + window = TimestampedBufferCollection[SimpleTimestamped](window_duration=2.0) + + # Add messages at different timestamps + window.add(SimpleTimestamped(1.0, "msg1")) + window.add(SimpleTimestamped(2.0, "msg2")) + window.add(SimpleTimestamped(3.0, "msg3")) + + # At this point, all messages should be present (within 2s window) + assert len(window) == 3 + + # Add a message at t=4.0, should keep messages from t=2.0 onwards + window.add(SimpleTimestamped(4.0, "msg4")) + assert len(window) == 3 # msg1 should be dropped + assert window[0].data == "msg2" # oldest is now msg2 + assert window[-1].data == "msg4" # newest is msg4 + + # Add a message at t=5.5, should drop msg2 and msg3 + window.add(SimpleTimestamped(5.5, "msg5")) + assert len(window) == 2 # only msg4 and msg5 remain + assert window[0].data == "msg4" + assert window[1].data == "msg5" + + # Verify time range + assert window.start_ts == 4.0 + assert window.end_ts == 5.5 + + +def test_timestamp_alignment(): + speed = 5.0 + + # ensure that lfs package is downloaded + get_data("unitree_office_walk") + + raw_frames = [] + + def spy(image): + raw_frames.append(image.ts) + print(image.ts) + return image + + # sensor reply of raw video frames + video_raw = ( + testing.TimedSensorReplay( + "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() + ) + .stream(speed) + .pipe(ops.map(spy), ops.take(30)) + ) + + processed_frames = [] + + def process_video_frame(frame): + processed_frames.append(frame.ts) + print("PROCESSING", frame.ts) + time.sleep(0.5 / speed) + return frame + + # fake reply of some 0.5s processor of video frames that drops messages + fake_video_processor = backpressure(video_raw).pipe(ops.map(process_video_frame)) + + aligned_frames = align_timestamped(fake_video_processor, video_raw).pipe(ops.to_list()).run() + + assert len(raw_frames) == 30 + assert len(processed_frames) > 2 + assert len(aligned_frames) == len(processed_frames) + + for value in aligned_frames: + [primary, secondary] = value + diff = abs(primary.ts - secondary.ts) + print( + f"Aligned pair: primary={primary.ts:.6f}, secondary={secondary.ts:.6f}, diff={diff:.6f}s" + ) + # Verify alignment is within tolerance + assert diff <= 0.05 # Default match_tolerance diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index c8c496aeb3..d88c0c3c22 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -195,3 +195,77 @@ def __iter__(self): def __getitem__(self, idx: int) -> T: return self._items[idx] + + +PRIMARY = TypeVar("PRIMARY", bound=Timestamped) +SECONDARY = TypeVar("SECONDARY", bound=Timestamped) + + +class TimestampedBufferCollection(TimestampedCollection[T]): + """A timestamped collection that maintains a sliding time window, dropping old messages.""" + + def __init__(self, window_duration: float, items: Optional[Iterable[T]] = None): + """ + Initialize with a time window duration in seconds. + + Args: + window_duration: Maximum age of messages to keep in seconds + items: Optional initial items + """ + super().__init__(items) + self.window_duration = window_duration + + def add(self, item: T) -> None: + """Add a timestamped item and remove any items outside the time window.""" + super().add(item) + self._prune_old_messages(item.ts) + + def _prune_old_messages(self, current_ts: float) -> None: + """Remove messages older than window_duration from the given timestamp.""" + cutoff_ts = current_ts - self.window_duration + + # Find the index of the first item that should be kept + timestamps = [item.ts for item in self._items] + keep_idx = bisect.bisect_left(timestamps, cutoff_ts) + + # Remove old items + if keep_idx > 0: + # Create new SortedList with items to keep + self._items = SortedList(self._items[keep_idx:], key=lambda x: x.ts) + + +def align_timestamped( + primary_observable: Observable[PRIMARY], + secondary_observable: Observable[SECONDARY], + buffer_size: float = 1.0, # seconds + match_tolerance: float = 0.05, # seconds +) -> Observable[Tuple[PRIMARY, SECONDARY]]: + from reactivex import create + + def subscribe(observer, scheduler=None): + secondary_collection: TimestampedBufferCollection[SECONDARY] = TimestampedBufferCollection( + buffer_size + ) + # Subscribe to secondary to populate the buffer + secondary_sub = secondary_observable.subscribe(secondary_collection.add) + + def on_primary(primary_item: PRIMARY): + secondary_item = secondary_collection.find_closest( + primary_item.ts, tolerance=match_tolerance + ) + if secondary_item is not None: + observer.on_next((primary_item, secondary_item)) + + # Subscribe to primary and emit aligned pairs + primary_sub = primary_observable.subscribe( + on_next=on_primary, on_error=observer.on_error, on_completed=observer.on_completed + ) + + # Return cleanup function + def dispose(): + secondary_sub.dispose() + primary_sub.dispose() + + return dispose + + return create(subscribe) From ad7680186dd166e1146a52453939d43cadc31ce2 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 21 Aug 2025 17:40:48 +0300 Subject: [PATCH 3/5] timestamped alignment finished --- dimos/types/test_timestamped.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index ff3b441e04..d723421c6a 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -291,7 +291,7 @@ def spy(image): "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() ) .stream(speed) - .pipe(ops.map(spy), ops.take(30)) + .pipe(ops.take(30)) ) processed_frames = [] @@ -303,13 +303,18 @@ def process_video_frame(frame): return frame # fake reply of some 0.5s processor of video frames that drops messages - fake_video_processor = backpressure(video_raw).pipe(ops.map(process_video_frame)) + fake_video_processor = backpressure(video_raw.pipe(ops.map(spy))).pipe( + ops.map(process_video_frame) + ) aligned_frames = align_timestamped(fake_video_processor, video_raw).pipe(ops.to_list()).run() assert len(raw_frames) == 30 assert len(processed_frames) > 2 - assert len(aligned_frames) == len(processed_frames) + assert len(aligned_frames) > 2 + + # Due to async processing, the last frame might not be aligned before completion + assert len(aligned_frames) >= len(processed_frames) - 1 for value in aligned_frames: [primary, secondary] = value @@ -317,5 +322,4 @@ def process_video_frame(frame): print( f"Aligned pair: primary={primary.ts:.6f}, secondary={secondary.ts:.6f}, diff={diff:.6f}s" ) - # Verify alignment is within tolerance - assert diff <= 0.05 # Default match_tolerance + assert diff <= 0.05 From 865a47f5fc7602909063cc2c905c01fd50ed5ee6 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 21 Aug 2025 17:49:36 +0300 Subject: [PATCH 4/5] tests passing --- dimos/protocol/tf/tf.py | 13 ++----------- dimos/types/timestamped.py | 6 +++--- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/dimos/protocol/tf/tf.py b/dimos/protocol/tf/tf.py index c06998bed0..1c7bb32101 100644 --- a/dimos/protocol/tf/tf.py +++ b/dimos/protocol/tf/tf.py @@ -93,23 +93,14 @@ def _prune_old_transforms(self) -> None: self._items.pop(0) def get( - self, time_point: Optional[float] = None, time_tolerance: Optional[float] = None + self, time_point: Optional[float] = None, time_tolerance: float = 1.0 ) -> Optional[Transform]: """Get transform at specified time or latest if no time given.""" if time_point is None: # Return the latest transform return self[-1] if len(self) > 0 else None - # Find closest transform within tolerance - closest = self.find_closest(time_point) - if closest is None: - return None - - if time_tolerance is not None: - if abs(closest.ts - time_point) > time_tolerance: - return None - - return closest + return self.find_closest(time_point, time_tolerance) def __str__(self) -> str: if not self._items: diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index d88c0c3c22..6446c5167b 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -103,7 +103,7 @@ def add(self, item: T) -> None: """Add a timestamped item to the collection.""" self._items.add(item) - def find_closest(self, timestamp: float, tolerance: float = 0.05) -> Optional[T]: + def find_closest(self, timestamp: float, tolerance: Optional[float] = None) -> Optional[T]: """Find the timestamped object closest to the given timestamp.""" if not self._items: return None @@ -134,8 +134,8 @@ def find_closest(self, timestamp: float, tolerance: float = 0.05) -> Optional[T] # When distances are equal, prefer the later item (higher index) closest_idx, closest_distance = min(candidates, key=lambda x: (x[1], -x[0])) - # Check tolerance - if closest_distance > tolerance: + # Check tolerance if provided + if tolerance is not None and closest_distance > tolerance: return None return self._items[closest_idx] From 63e2a5ad9cff1e8660e5a098beb612a9afac7aeb Mon Sep 17 00:00:00 2001 From: lesh Date: Mon, 25 Aug 2025 19:07:24 +0300 Subject: [PATCH 5/5] pr review cleanup --- dimos/types/test_timestamped.py | 1 - dimos/types/timestamped.py | 27 ++++++++++----------------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index d723421c6a..e81a7c6cfa 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -298,7 +298,6 @@ def spy(image): def process_video_frame(frame): processed_frames.append(frame.ts) - print("PROCESSING", frame.ts) time.sleep(0.5 / speed) return frame diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index 6446c5167b..b4dbdfe036 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -12,12 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import bisect from datetime import datetime, timezone from typing import Generic, Iterable, Optional, Tuple, TypedDict, TypeVar, Union from reactivex.observable import Observable -from sortedcontainers import SortedList +from sortedcontainers import SortedKeyList # any class that carries a timestamp should inherit from this # this allows us to work with timeseries in consistent way, allign messages, replay etc @@ -97,7 +96,7 @@ class TimestampedCollection(Generic[T]): """A collection of timestamped objects with efficient time-based operations.""" def __init__(self, items: Optional[Iterable[T]] = None): - self._items = SortedList(items or [], key=lambda x: x.ts) + self._items = SortedKeyList(items or [], key=lambda x: x.ts) def add(self, item: T) -> None: """Add a timestamped item to the collection.""" @@ -109,8 +108,7 @@ def find_closest(self, timestamp: float, tolerance: Optional[float] = None) -> O return None # Use binary search to find insertion point - timestamps = [item.ts for item in self._items] - idx = bisect.bisect_left(timestamps, timestamp) + idx = self._items.bisect_key_left(timestamp) # Check exact match if idx < len(self._items) and self._items[idx].ts == timestamp: @@ -142,20 +140,18 @@ def find_closest(self, timestamp: float, tolerance: Optional[float] = None) -> O def find_before(self, timestamp: float) -> Optional[T]: """Find the last item before the given timestamp.""" - timestamps = [item.ts for item in self._items] - idx = bisect.bisect_left(timestamps, timestamp) + idx = self._items.bisect_key_left(timestamp) return self._items[idx - 1] if idx > 0 else None def find_after(self, timestamp: float) -> Optional[T]: """Find the first item after the given timestamp.""" - timestamps = [item.ts for item in self._items] - idx = bisect.bisect_right(timestamps, timestamp) + idx = self._items.bisect_key_right(timestamp) return self._items[idx] if idx < len(self._items) else None def merge(self, other: "TimestampedCollection[T]") -> "TimestampedCollection[T]": """Merge two timestamped collections into a new one.""" result = TimestampedCollection[T]() - result._items = SortedList(self._items + other._items, key=lambda x: x.ts) + result._items = SortedKeyList(self._items + other._items, key=lambda x: x.ts) return result def duration(self) -> float: @@ -172,9 +168,8 @@ def time_range(self) -> Optional[Tuple[float, float]]: def slice_by_time(self, start: float, end: float) -> "TimestampedCollection[T]": """Get a subset of items within the given time range.""" - timestamps = [item.ts for item in self._items] - start_idx = bisect.bisect_left(timestamps, start) - end_idx = bisect.bisect_right(timestamps, end) + start_idx = self._items.bisect_key_left(start) + end_idx = self._items.bisect_key_right(end) return TimestampedCollection(self._items[start_idx:end_idx]) @property @@ -225,13 +220,11 @@ def _prune_old_messages(self, current_ts: float) -> None: cutoff_ts = current_ts - self.window_duration # Find the index of the first item that should be kept - timestamps = [item.ts for item in self._items] - keep_idx = bisect.bisect_left(timestamps, cutoff_ts) + keep_idx = self._items.bisect_key_left(cutoff_ts) # Remove old items if keep_idx > 0: - # Create new SortedList with items to keep - self._items = SortedList(self._items[keep_idx:], key=lambda x: x.ts) + del self._items[:keep_idx] def align_timestamped(