diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index d723421c6a..e81a7c6cfa 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -298,7 +298,6 @@ def spy(image): def process_video_frame(frame): processed_frames.append(frame.ts) - print("PROCESSING", frame.ts) time.sleep(0.5 / speed) return frame diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index 6446c5167b..b4dbdfe036 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -12,12 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import bisect from datetime import datetime, timezone from typing import Generic, Iterable, Optional, Tuple, TypedDict, TypeVar, Union from reactivex.observable import Observable -from sortedcontainers import SortedList +from sortedcontainers import SortedKeyList # any class that carries a timestamp should inherit from this # this allows us to work with timeseries in consistent way, allign messages, replay etc @@ -97,7 +96,7 @@ class TimestampedCollection(Generic[T]): """A collection of timestamped objects with efficient time-based operations.""" def __init__(self, items: Optional[Iterable[T]] = None): - self._items = SortedList(items or [], key=lambda x: x.ts) + self._items = SortedKeyList(items or [], key=lambda x: x.ts) def add(self, item: T) -> None: """Add a timestamped item to the collection.""" @@ -109,8 +108,7 @@ def find_closest(self, timestamp: float, tolerance: Optional[float] = None) -> O return None # Use binary search to find insertion point - timestamps = [item.ts for item in self._items] - idx = bisect.bisect_left(timestamps, timestamp) + idx = self._items.bisect_key_left(timestamp) # Check exact match if idx < len(self._items) and self._items[idx].ts == timestamp: @@ -142,20 +140,18 @@ def find_closest(self, timestamp: float, tolerance: Optional[float] = None) -> O def find_before(self, timestamp: float) -> Optional[T]: """Find the last item before the given timestamp.""" - timestamps = [item.ts for item in self._items] - idx = bisect.bisect_left(timestamps, timestamp) + idx = self._items.bisect_key_left(timestamp) return self._items[idx - 1] if idx > 0 else None def find_after(self, timestamp: float) -> Optional[T]: """Find the first item after the given timestamp.""" - timestamps = [item.ts for item in self._items] - idx = bisect.bisect_right(timestamps, timestamp) + idx = self._items.bisect_key_right(timestamp) return self._items[idx] if idx < len(self._items) else None def merge(self, other: "TimestampedCollection[T]") -> "TimestampedCollection[T]": """Merge two timestamped collections into a new one.""" result = TimestampedCollection[T]() - result._items = SortedList(self._items + other._items, key=lambda x: x.ts) + result._items = SortedKeyList(self._items + other._items, key=lambda x: x.ts) return result def duration(self) -> float: @@ -172,9 +168,8 @@ def time_range(self) -> Optional[Tuple[float, float]]: def slice_by_time(self, start: float, end: float) -> "TimestampedCollection[T]": """Get a subset of items within the given time range.""" - timestamps = [item.ts for item in self._items] - start_idx = bisect.bisect_left(timestamps, start) - end_idx = bisect.bisect_right(timestamps, end) + start_idx = self._items.bisect_key_left(start) + end_idx = self._items.bisect_key_right(end) return TimestampedCollection(self._items[start_idx:end_idx]) @property @@ -225,13 +220,11 @@ def _prune_old_messages(self, current_ts: float) -> None: cutoff_ts = current_ts - self.window_duration # Find the index of the first item that should be kept - timestamps = [item.ts for item in self._items] - keep_idx = bisect.bisect_left(timestamps, cutoff_ts) + keep_idx = self._items.bisect_key_left(cutoff_ts) # Remove old items if keep_idx > 0: - # Create new SortedList with items to keep - self._items = SortedList(self._items[keep_idx:], key=lambda x: x.ts) + del self._items[:keep_idx] def align_timestamped(