Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dimos/types/test_timestamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ def spy(image):

def process_video_frame(frame):
processed_frames.append(frame.ts)
print("PROCESSING", frame.ts)
time.sleep(0.5 / speed)
return frame

Expand Down
27 changes: 10 additions & 17 deletions dimos/types/timestamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import bisect
from datetime import datetime, timezone
from typing import Generic, Iterable, Optional, Tuple, TypedDict, TypeVar, Union

from reactivex.observable import Observable
from sortedcontainers import SortedList
from sortedcontainers import SortedKeyList

# any class that carries a timestamp should inherit from this
# this allows us to work with timeseries in consistent way, allign messages, replay etc
Expand Down Expand Up @@ -97,7 +96,7 @@ class TimestampedCollection(Generic[T]):
"""A collection of timestamped objects with efficient time-based operations."""

def __init__(self, items: Optional[Iterable[T]] = None):
self._items = SortedList(items or [], key=lambda x: x.ts)
self._items = SortedKeyList(items or [], key=lambda x: x.ts)

def add(self, item: T) -> None:
"""Add a timestamped item to the collection."""
Expand All @@ -109,8 +108,7 @@ def find_closest(self, timestamp: float, tolerance: Optional[float] = None) -> O
return None

# Use binary search to find insertion point
timestamps = [item.ts for item in self._items]
idx = bisect.bisect_left(timestamps, timestamp)
idx = self._items.bisect_key_left(timestamp)

# Check exact match
if idx < len(self._items) and self._items[idx].ts == timestamp:
Expand Down Expand Up @@ -142,20 +140,18 @@ def find_closest(self, timestamp: float, tolerance: Optional[float] = None) -> O

def find_before(self, timestamp: float) -> Optional[T]:
"""Find the last item before the given timestamp."""
timestamps = [item.ts for item in self._items]
idx = bisect.bisect_left(timestamps, timestamp)
idx = self._items.bisect_key_left(timestamp)
return self._items[idx - 1] if idx > 0 else None

def find_after(self, timestamp: float) -> Optional[T]:
"""Find the first item after the given timestamp."""
timestamps = [item.ts for item in self._items]
idx = bisect.bisect_right(timestamps, timestamp)
idx = self._items.bisect_key_right(timestamp)
return self._items[idx] if idx < len(self._items) else None

def merge(self, other: "TimestampedCollection[T]") -> "TimestampedCollection[T]":
"""Merge two timestamped collections into a new one."""
result = TimestampedCollection[T]()
result._items = SortedList(self._items + other._items, key=lambda x: x.ts)
result._items = SortedKeyList(self._items + other._items, key=lambda x: x.ts)
return result

def duration(self) -> float:
Expand All @@ -172,9 +168,8 @@ def time_range(self) -> Optional[Tuple[float, float]]:

def slice_by_time(self, start: float, end: float) -> "TimestampedCollection[T]":
"""Get a subset of items within the given time range."""
timestamps = [item.ts for item in self._items]
start_idx = bisect.bisect_left(timestamps, start)
end_idx = bisect.bisect_right(timestamps, end)
start_idx = self._items.bisect_key_left(start)
end_idx = self._items.bisect_key_right(end)
return TimestampedCollection(self._items[start_idx:end_idx])

@property
Expand Down Expand Up @@ -225,13 +220,11 @@ def _prune_old_messages(self, current_ts: float) -> None:
cutoff_ts = current_ts - self.window_duration

# Find the index of the first item that should be kept
timestamps = [item.ts for item in self._items]
keep_idx = bisect.bisect_left(timestamps, cutoff_ts)
keep_idx = self._items.bisect_key_left(cutoff_ts)

# Remove old items
if keep_idx > 0:
# Create new SortedList with items to keep
self._items = SortedList(self._items[keep_idx:], key=lambda x: x.ts)
del self._items[:keep_idx]


def align_timestamped(
Expand Down