From c29ad3c995a19760defec3295603cc4439f2e108 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 21 Aug 2025 16:52:49 +0300 Subject: [PATCH 01/10] timestamped find_closest takes tolerance --- dimos/types/test_timestamped.py | 16 +++++++++----- dimos/types/timestamped.py | 39 +++++++++++++++++++++++---------- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index 7f043750ea..5482ede549 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -145,19 +145,24 @@ def test_find_closest(collection): assert collection.find_closest(3.0).data == "third" # Between items (closer to left) - assert collection.find_closest(1.5).data == "first" + assert collection.find_closest(1.5, tolerance=1.0).data == "first" # Between items (closer to right) - assert collection.find_closest(3.5).data == "third" + assert collection.find_closest(3.5, tolerance=1.0).data == "third" # Exactly in the middle (should pick the later one due to >= comparison) - assert collection.find_closest(4.0).data == "fifth" # 4.0 is equidistant from 3.0 and 5.0 + assert ( + collection.find_closest(4.0, tolerance=1.0).data == "fifth" + ) # 4.0 is equidistant from 3.0 and 5.0 # Before all items - assert collection.find_closest(0.0).data == "first" + assert collection.find_closest(0.0, tolerance=1.0).data == "first" # After all items - assert collection.find_closest(10.0).data == "seventh" + assert collection.find_closest(10.0, tolerance=4.0).data == "seventh" + + # low tolerance, should return None + assert collection.find_closest(10.0, tolerance=2.0) is None def test_find_before_after(collection): @@ -223,4 +228,3 @@ def test_single_item_collection(): single = TimestampedCollection([SimpleTimestamped(5.0, "only")]) assert single.duration() == 0.0 assert single.time_range() == (5.0, 5.0) - assert single.find_closest(100.0).data == "only" diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index 858a2bdaad..c8c496aeb3 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -16,6 +16,7 @@ from datetime import datetime, timezone from typing import Generic, Iterable, Optional, Tuple, TypedDict, TypeVar, Union +from reactivex.observable import Observable from sortedcontainers import SortedList # any class that carries a timestamp should inherit from this @@ -102,26 +103,42 @@ def add(self, item: T) -> None: """Add a timestamped item to the collection.""" self._items.add(item) - def find_closest(self, timestamp: float) -> Optional[T]: + def find_closest(self, timestamp: float, tolerance: float = 0.05) -> Optional[T]: """Find the timestamped object closest to the given timestamp.""" if not self._items: return None - # Find insertion point using binary search on timestamps + # Use binary search to find insertion point timestamps = [item.ts for item in self._items] idx = bisect.bisect_left(timestamps, timestamp) - # Check boundaries - if idx == 0: - return self._items[0] - if idx == len(self._items): - return self._items[-1] + # Check exact match + if idx < len(self._items) and self._items[idx].ts == timestamp: + return self._items[idx] - # Compare distances to neighbors - left_diff = abs(timestamp - self._items[idx - 1].ts) - right_diff = abs(self._items[idx].ts - timestamp) + # Find candidates: item before and after + candidates = [] - return self._items[idx - 1] if left_diff < right_diff else self._items[idx] + # Item before + if idx > 0: + candidates.append((idx - 1, abs(self._items[idx - 1].ts - timestamp))) + + # Item after + if idx < len(self._items): + candidates.append((idx, abs(self._items[idx].ts - timestamp))) + + if not candidates: + return None + + # Find closest + # When distances are equal, prefer the later item (higher index) + closest_idx, closest_distance = min(candidates, key=lambda x: (x[1], -x[0])) + + # Check tolerance + if closest_distance > tolerance: + return None + + return self._items[closest_idx] def find_before(self, timestamp: float) -> Optional[T]: """Find the last item before the given timestamp.""" From 11d08fa6208e6b2581e33bffbfbb191a04aa2191 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 21 Aug 2025 17:30:24 +0300 Subject: [PATCH 02/10] initial implementation of frame alignment mechanism --- dimos/types/test_timestamped.py | 95 ++++++++++++++++++++++++++++++++- dimos/types/timestamped.py | 74 +++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 2 deletions(-) diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index 5482ede549..ff3b441e04 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -12,11 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from datetime import datetime, timezone import pytest - -from dimos.types.timestamped import Timestamped, TimestampedCollection, to_datetime, to_ros_stamp +from reactivex import operators as ops + +from dimos.msgs.sensor_msgs import Image +from dimos.types.timestamped import ( + Timestamped, + TimestampedBufferCollection, + TimestampedCollection, + align_timestamped, + to_datetime, + to_ros_stamp, +) +from dimos.utils import testing +from dimos.utils.data import get_data +from dimos.utils.reactive import backpressure def test_timestamped_dt_method(): @@ -228,3 +241,81 @@ def test_single_item_collection(): single = TimestampedCollection([SimpleTimestamped(5.0, "only")]) assert single.duration() == 0.0 assert single.time_range() == (5.0, 5.0) + + +def test_time_window_collection(): + # Create a collection with a 2-second window + window = TimestampedBufferCollection[SimpleTimestamped](window_duration=2.0) + + # Add messages at different timestamps + window.add(SimpleTimestamped(1.0, "msg1")) + window.add(SimpleTimestamped(2.0, "msg2")) + window.add(SimpleTimestamped(3.0, "msg3")) + + # At this point, all messages should be present (within 2s window) + assert len(window) == 3 + + # Add a message at t=4.0, should keep messages from t=2.0 onwards + window.add(SimpleTimestamped(4.0, "msg4")) + assert len(window) == 3 # msg1 should be dropped + assert window[0].data == "msg2" # oldest is now msg2 + assert window[-1].data == "msg4" # newest is msg4 + + # Add a message at t=5.5, should drop msg2 and msg3 + window.add(SimpleTimestamped(5.5, "msg5")) + assert len(window) == 2 # only msg4 and msg5 remain + assert window[0].data == "msg4" + assert window[1].data == "msg5" + + # Verify time range + assert window.start_ts == 4.0 + assert window.end_ts == 5.5 + + +def test_timestamp_alignment(): + speed = 5.0 + + # ensure that lfs package is downloaded + get_data("unitree_office_walk") + + raw_frames = [] + + def spy(image): + raw_frames.append(image.ts) + print(image.ts) + return image + + # sensor reply of raw video frames + video_raw = ( + testing.TimedSensorReplay( + "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() + ) + .stream(speed) + .pipe(ops.map(spy), ops.take(30)) + ) + + processed_frames = [] + + def process_video_frame(frame): + processed_frames.append(frame.ts) + print("PROCESSING", frame.ts) + time.sleep(0.5 / speed) + return frame + + # fake reply of some 0.5s processor of video frames that drops messages + fake_video_processor = backpressure(video_raw).pipe(ops.map(process_video_frame)) + + aligned_frames = align_timestamped(fake_video_processor, video_raw).pipe(ops.to_list()).run() + + assert len(raw_frames) == 30 + assert len(processed_frames) > 2 + assert len(aligned_frames) == len(processed_frames) + + for value in aligned_frames: + [primary, secondary] = value + diff = abs(primary.ts - secondary.ts) + print( + f"Aligned pair: primary={primary.ts:.6f}, secondary={secondary.ts:.6f}, diff={diff:.6f}s" + ) + # Verify alignment is within tolerance + assert diff <= 0.05 # Default match_tolerance diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index c8c496aeb3..d88c0c3c22 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -195,3 +195,77 @@ def __iter__(self): def __getitem__(self, idx: int) -> T: return self._items[idx] + + +PRIMARY = TypeVar("PRIMARY", bound=Timestamped) +SECONDARY = TypeVar("SECONDARY", bound=Timestamped) + + +class TimestampedBufferCollection(TimestampedCollection[T]): + """A timestamped collection that maintains a sliding time window, dropping old messages.""" + + def __init__(self, window_duration: float, items: Optional[Iterable[T]] = None): + """ + Initialize with a time window duration in seconds. + + Args: + window_duration: Maximum age of messages to keep in seconds + items: Optional initial items + """ + super().__init__(items) + self.window_duration = window_duration + + def add(self, item: T) -> None: + """Add a timestamped item and remove any items outside the time window.""" + super().add(item) + self._prune_old_messages(item.ts) + + def _prune_old_messages(self, current_ts: float) -> None: + """Remove messages older than window_duration from the given timestamp.""" + cutoff_ts = current_ts - self.window_duration + + # Find the index of the first item that should be kept + timestamps = [item.ts for item in self._items] + keep_idx = bisect.bisect_left(timestamps, cutoff_ts) + + # Remove old items + if keep_idx > 0: + # Create new SortedList with items to keep + self._items = SortedList(self._items[keep_idx:], key=lambda x: x.ts) + + +def align_timestamped( + primary_observable: Observable[PRIMARY], + secondary_observable: Observable[SECONDARY], + buffer_size: float = 1.0, # seconds + match_tolerance: float = 0.05, # seconds +) -> Observable[Tuple[PRIMARY, SECONDARY]]: + from reactivex import create + + def subscribe(observer, scheduler=None): + secondary_collection: TimestampedBufferCollection[SECONDARY] = TimestampedBufferCollection( + buffer_size + ) + # Subscribe to secondary to populate the buffer + secondary_sub = secondary_observable.subscribe(secondary_collection.add) + + def on_primary(primary_item: PRIMARY): + secondary_item = secondary_collection.find_closest( + primary_item.ts, tolerance=match_tolerance + ) + if secondary_item is not None: + observer.on_next((primary_item, secondary_item)) + + # Subscribe to primary and emit aligned pairs + primary_sub = primary_observable.subscribe( + on_next=on_primary, on_error=observer.on_error, on_completed=observer.on_completed + ) + + # Return cleanup function + def dispose(): + secondary_sub.dispose() + primary_sub.dispose() + + return dispose + + return create(subscribe) From ad7680186dd166e1146a52453939d43cadc31ce2 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 21 Aug 2025 17:40:48 +0300 Subject: [PATCH 03/10] timestamped alignment finished --- dimos/types/test_timestamped.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index ff3b441e04..d723421c6a 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -291,7 +291,7 @@ def spy(image): "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() ) .stream(speed) - .pipe(ops.map(spy), ops.take(30)) + .pipe(ops.take(30)) ) processed_frames = [] @@ -303,13 +303,18 @@ def process_video_frame(frame): return frame # fake reply of some 0.5s processor of video frames that drops messages - fake_video_processor = backpressure(video_raw).pipe(ops.map(process_video_frame)) + fake_video_processor = backpressure(video_raw.pipe(ops.map(spy))).pipe( + ops.map(process_video_frame) + ) aligned_frames = align_timestamped(fake_video_processor, video_raw).pipe(ops.to_list()).run() assert len(raw_frames) == 30 assert len(processed_frames) > 2 - assert len(aligned_frames) == len(processed_frames) + assert len(aligned_frames) > 2 + + # Due to async processing, the last frame might not be aligned before completion + assert len(aligned_frames) >= len(processed_frames) - 1 for value in aligned_frames: [primary, secondary] = value @@ -317,5 +322,4 @@ def process_video_frame(frame): print( f"Aligned pair: primary={primary.ts:.6f}, secondary={secondary.ts:.6f}, diff={diff:.6f}s" ) - # Verify alignment is within tolerance - assert diff <= 0.05 # Default match_tolerance + assert diff <= 0.05 From 865a47f5fc7602909063cc2c905c01fd50ed5ee6 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 21 Aug 2025 17:49:36 +0300 Subject: [PATCH 04/10] tests passing --- dimos/protocol/tf/tf.py | 13 ++----------- dimos/types/timestamped.py | 6 +++--- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/dimos/protocol/tf/tf.py b/dimos/protocol/tf/tf.py index c06998bed0..1c7bb32101 100644 --- a/dimos/protocol/tf/tf.py +++ b/dimos/protocol/tf/tf.py @@ -93,23 +93,14 @@ def _prune_old_transforms(self) -> None: self._items.pop(0) def get( - self, time_point: Optional[float] = None, time_tolerance: Optional[float] = None + self, time_point: Optional[float] = None, time_tolerance: float = 1.0 ) -> Optional[Transform]: """Get transform at specified time or latest if no time given.""" if time_point is None: # Return the latest transform return self[-1] if len(self) > 0 else None - # Find closest transform within tolerance - closest = self.find_closest(time_point) - if closest is None: - return None - - if time_tolerance is not None: - if abs(closest.ts - time_point) > time_tolerance: - return None - - return closest + return self.find_closest(time_point, time_tolerance) def __str__(self) -> str: if not self._items: diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index d88c0c3c22..6446c5167b 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -103,7 +103,7 @@ def add(self, item: T) -> None: """Add a timestamped item to the collection.""" self._items.add(item) - def find_closest(self, timestamp: float, tolerance: float = 0.05) -> Optional[T]: + def find_closest(self, timestamp: float, tolerance: Optional[float] = None) -> Optional[T]: """Find the timestamped object closest to the given timestamp.""" if not self._items: return None @@ -134,8 +134,8 @@ def find_closest(self, timestamp: float, tolerance: float = 0.05) -> Optional[T] # When distances are equal, prefer the later item (higher index) closest_idx, closest_distance = min(candidates, key=lambda x: (x[1], -x[0])) - # Check tolerance - if closest_distance > tolerance: + # Check tolerance if provided + if tolerance is not None and closest_distance > tolerance: return None return self._items[closest_idx] From d1e890191fdd293fcd5df4abf0c413e814d4b4d5 Mon Sep 17 00:00:00 2001 From: Matthew Daiter Date: Fri, 22 Aug 2025 10:20:20 +0000 Subject: [PATCH 05/10] Sharpness for Images --- dimos/msgs/sensor_msgs/Image.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 90bd851222..e25c6c837f 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -276,6 +276,24 @@ def crop(self, x: int, y: int, width: int, height: int) -> "Image": ts=self.ts, ) + def sharpness(self) -> float: + """ + Compute the Tenengrad focus measure for an image. + """ + grayscale = self.to_grayscale() + # Sobel gradient computation in x and y directions + sx = cv2.Sobel(grayscale, cv2.CV_32F, 1, 0, ksize=5) + sy = cv2.Sobel(grayscale, cv2.CV_32F, 0, 1, ksize=5) + + # Compute gradient magnitude + magnitude = cv2.magnitude(sx, sy) + + # Normalize the gradient magnitude for visualization + magnitude_normalized = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX) + + # Return the mean of the gradient magnitudes and the normalized image + return magnitude.mean() + def save(self, filepath: str) -> bool: """Save image to file.""" # Convert to OpenCV format for saving From bdcb24bcc1e228d383217f6058b4863a95356afd Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 22 Aug 2025 13:56:53 +0300 Subject: [PATCH 06/10] sharpness issue --- dimos/msgs/sensor_msgs/Image.py | 17 +++++++++++++++++ dimos/msgs/sensor_msgs/test_image.py | 23 ++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index e25c6c837f..4260a0c80a 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -14,15 +14,20 @@ import time from dataclasses import dataclass, field +from datetime import timedelta from enum import Enum from typing import Optional, Tuple import cv2 import numpy as np +import reactivex as rx # Import LCM types from dimos_lcm.sensor_msgs.Image import Image as LCMImage from dimos_lcm.std_msgs.Header import Header +from reactivex import operators as ops +from reactivex.observable import Observable +from reactivex.scheduler import ThreadPoolScheduler from dimos.types.timestamped import Timestamped @@ -463,3 +468,15 @@ def agent_encode(self) -> str: base64_str = base64.b64encode(jpeg_bytes).decode("utf-8") return base64_str + + +def sharpness_window(target_frequency: float, source: Observable[Image]) -> Observable[Image]: + pool = ThreadPoolScheduler() + window = timedelta(seconds=1 / target_frequency) + + return source.pipe( + ops.flat_map(lambda img: rx.start(lambda: (img.sharpness(), img), scheduler=pool)), + ops.buffer_with_time(window, window), + ops.filter(bool), + ops.map(lambda buf: max(buf, key=lambda t: t[0])[1]), + ) diff --git a/dimos/msgs/sensor_msgs/test_image.py b/dimos/msgs/sensor_msgs/test_image.py index 8e4e0a413f..cd12540b12 100644 --- a/dimos/msgs/sensor_msgs/test_image.py +++ b/dimos/msgs/sensor_msgs/test_image.py @@ -15,8 +15,9 @@ import numpy as np import pytest -from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat, sharpness_window from dimos.utils.data import get_data +from dimos.utils.testing import TimedSensorReplay @pytest.fixture @@ -61,3 +62,23 @@ def test_opencv_conversion(img: Image): # artificially patch timestamp decoded_img.ts = img.ts assert decoded_img == img + + +def test_sharpness_detector(): + get_data("unitree_office_walk") # Preload data for testing + video_store = TimedSensorReplay( + "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() + ) + + for image in video_store.iterate(): + print(image.sharpness()) + + +def test_sharpness_sliding_window(): + get_data("unitree_office_walk") # Preload data for testing + sharpness_window( + 0.5, + video_store=TimedSensorReplay( + "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() + ).stream(), + ).subscribe(print) From 066412fca64437b2cb5fdb29b26499a91516725e Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 22 Aug 2025 13:58:18 +0300 Subject: [PATCH 07/10] sharpness sliding window added to unitree msg replay --- dimos/robot/unitree_webrtc/unitree_go2.py | 45 +++++++++++------------ 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index 13f59ec20b..2f768c78c7 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -22,41 +22,40 @@ import warnings from typing import List, Optional +from dimos_lcm.sensor_msgs import CameraInfo +from dimos_lcm.std_msgs import Bool, String +from dimos_lcm.vision_msgs import Detection2DArray, Detection3DArray + from dimos import core from dimos.core import In, Module, Out, rpc -from dimos.msgs.geometry_msgs import PoseStamped, Transform, Vector3, Quaternion +from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, Transform, Vector3 from dimos.msgs.nav_msgs import OccupancyGrid, Path -from dimos.msgs.sensor_msgs import Image -from dimos_lcm.std_msgs import String -from dimos_lcm.sensor_msgs import CameraInfo -from dimos_lcm.vision_msgs import Detection2DArray, Detection3DArray +from dimos.msgs.sensor_msgs.Image import Image, sharpness_window +from dimos.navigation.bt_navigator.navigator import BehaviorTreeNavigator, NavigatorState +from dimos.navigation.frontier_exploration import WavefrontFrontierExplorer +from dimos.navigation.global_planner import AstarPlanner +from dimos.navigation.local_planner.holonomic_local_planner import HolonomicLocalPlanner +from dimos.perception.common.utils import extract_pose_from_detection3d +from dimos.perception.object_tracker import ObjectTracking from dimos.perception.spatial_perception import SpatialMemory from dimos.protocol import pubsub from dimos.protocol.pubsub.lcmpubsub import LCM, Topic from dimos.protocol.tf import TF from dimos.robot.foxglove_bridge import FoxgloveBridge -from dimos.web.websocket_vis.websocket_vis_module import WebsocketVisModule -from dimos.navigation.global_planner import AstarPlanner -from dimos.navigation.local_planner.holonomic_local_planner import HolonomicLocalPlanner -from dimos.navigation.bt_navigator.navigator import BehaviorTreeNavigator, NavigatorState -from dimos.navigation.frontier_exploration import WavefrontFrontierExplorer +from dimos.robot.robot import Robot +from dimos.robot.unitree_webrtc.camera_module import UnitreeCameraModule from dimos.robot.unitree_webrtc.connection import UnitreeWebRTCConnection from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.map import Map from dimos.robot.unitree_webrtc.type.odometry import Odometry from dimos.robot.unitree_webrtc.unitree_skills import MyUnitreeSkills -from dimos.robot.unitree_webrtc.camera_module import UnitreeCameraModule from dimos.skills.skills import AbstractRobotSkill, SkillLibrary +from dimos.types.robot_capabilities import RobotCapability from dimos.utils.data import get_data from dimos.utils.logging_config import setup_logger from dimos.utils.testing import TimedSensorReplay from dimos.utils.transform_utils import offset_distance -from dimos.perception.common.utils import extract_pose_from_detection3d -from dimos.perception.object_tracker import ObjectTracking -from dimos_lcm.std_msgs import Bool -from dimos.robot.robot import Robot -from dimos.types.robot_capabilities import RobotCapability - +from dimos.web.websocket_vis.websocket_vis_module import WebsocketVisModule logger = setup_logger("dimos.robot.unitree_webrtc.unitree_go2", level=logging.INFO) @@ -106,7 +105,7 @@ def video_stream(self): video_store = TimedSensorReplay( "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() ) - return video_store.stream() + return sharpness_window(0.5, video_store.stream()) def move(self, vector: Vector3, duration: float = 0.0): pass @@ -289,8 +288,8 @@ def start(self): self._deploy_mapping() self._deploy_navigation() self._deploy_visualization() - self._deploy_perception() - self._deploy_camera() + # self._deploy_perception() + # self._deploy_camera() self._start_modules() @@ -459,9 +458,9 @@ def _start_modules(self): self.frontier_explorer.start() self.websocket_vis.start() self.foxglove_bridge.start() - self.spatial_memory_module.start() - self.camera_module.start() - self.object_tracker.start() + # self.spatial_memory_module.start() + # self.camera_module.start() + # self.object_tracker.start() # Initialize skills after connection is established if self.skill_library is not None: From dbe48f598fae8ff6d312ca171816d92c99d9fce0 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 22 Aug 2025 14:22:32 +0300 Subject: [PATCH 08/10] foxglove reply for image sharpness filter --- dimos/msgs/sensor_msgs/Image.py | 14 +++++----- dimos/msgs/sensor_msgs/test_image.py | 39 +++++++++++++++++++++++----- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 4260a0c80a..55e9e7920c 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -284,20 +284,20 @@ def crop(self, x: int, y: int, width: int, height: int) -> "Image": def sharpness(self) -> float: """ Compute the Tenengrad focus measure for an image. + Returns a normalized value between 0 and 1, where 1 is sharpest. """ grayscale = self.to_grayscale() # Sobel gradient computation in x and y directions - sx = cv2.Sobel(grayscale, cv2.CV_32F, 1, 0, ksize=5) - sy = cv2.Sobel(grayscale, cv2.CV_32F, 0, 1, ksize=5) + sx = cv2.Sobel(grayscale.data, cv2.CV_32F, 1, 0, ksize=5) + sy = cv2.Sobel(grayscale.data, cv2.CV_32F, 0, 1, ksize=5) # Compute gradient magnitude magnitude = cv2.magnitude(sx, sy) - # Normalize the gradient magnitude for visualization - magnitude_normalized = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX) - - # Return the mean of the gradient magnitudes and the normalized image - return magnitude.mean() + # Return normalized mean (0-1 range) + # Max theoretical magnitude ~1442, but typically much lower + # Using 255 as practical upper bound for 8-bit images + return min(magnitude.mean() / 255.0, 1.0) def save(self, filepath: str) -> bool: """Save image to file.""" diff --git a/dimos/msgs/sensor_msgs/test_image.py b/dimos/msgs/sensor_msgs/test_image.py index cd12540b12..9622ea35a0 100644 --- a/dimos/msgs/sensor_msgs/test_image.py +++ b/dimos/msgs/sensor_msgs/test_image.py @@ -64,21 +64,48 @@ def test_opencv_conversion(img: Image): assert decoded_img == img +@pytest.mark.tool def test_sharpness_detector(): get_data("unitree_office_walk") # Preload data for testing video_store = TimedSensorReplay( "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() ) + cnt = 0 for image in video_store.iterate(): + cnt = cnt + 1 print(image.sharpness()) + if cnt > 30: + return -def test_sharpness_sliding_window(): +@pytest.mark.tool +def test_sharpness_sliding_window_foxglove(): + import time + + from dimos.msgs.geometry_msgs import Vector3 + from dimos.protocol.pubsub.lcmpubsub import LCM, Topic + + lcm = LCM() + lcm.start() + + sharp_topic = Topic("/sharp", Image) + all_topic = Topic("/all", Image) + sharpness_topic = Topic("/sharpness", Vector3) + get_data("unitree_office_walk") # Preload data for testing + video_stream = TimedSensorReplay( + "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() + ).stream() + + # Publish all images to all_topic + video_stream.subscribe(lambda x: lcm.publish(all_topic, x)) + video_stream.subscribe(lambda x: lcm.publish(sharpness_topic, Vector3([x.sharpness(), 0, 0]))) + + # Publish sharp images to sharp_topic sharpness_window( - 0.5, - video_store=TimedSensorReplay( - "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() - ).stream(), - ).subscribe(print) + 1, + source=video_stream, + ).subscribe(lambda x: lcm.publish(sharp_topic, x)) + + time.sleep(120) From 39b7fd64388479bf8f12a17c1cd6ad96d42aa3a5 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 22 Aug 2025 14:53:04 +0300 Subject: [PATCH 09/10] log range for sharpness --- assets/foxglove_image_sharpness_test.json | 140 ++++++++++++++++++++++ dimos/msgs/sensor_msgs/Image.py | 24 +++- 2 files changed, 160 insertions(+), 4 deletions(-) create mode 100644 assets/foxglove_image_sharpness_test.json diff --git a/assets/foxglove_image_sharpness_test.json b/assets/foxglove_image_sharpness_test.json new file mode 100644 index 0000000000..e68b79a7e4 --- /dev/null +++ b/assets/foxglove_image_sharpness_test.json @@ -0,0 +1,140 @@ +{ + "configById": { + "Image!1dpphsz": { + "cameraState": { + "distance": 20, + "perspective": true, + "phi": 60, + "target": [ + 0, + 0, + 0 + ], + "targetOffset": [ + 0, + 0, + 0 + ], + "targetOrientation": [ + 0, + 0, + 0, + 1 + ], + "thetaOffset": 45, + "fovy": 45, + "near": 0.5, + "far": 5000 + }, + "followMode": "follow-pose", + "scene": {}, + "transforms": {}, + "topics": {}, + "layers": {}, + "publish": { + "type": "point", + "poseTopic": "/move_base_simple/goal", + "pointTopic": "/clicked_point", + "poseEstimateTopic": "/initialpose", + "poseEstimateXDeviation": 0.5, + "poseEstimateYDeviation": 0.5, + "poseEstimateThetaDeviation": 0.26179939 + }, + "imageMode": { + "imageTopic": "/all" + } + }, + "Image!2xvd0hl": { + "cameraState": { + "distance": 20, + "perspective": true, + "phi": 60, + "target": [ + 0, + 0, + 0 + ], + "targetOffset": [ + 0, + 0, + 0 + ], + "targetOrientation": [ + 0, + 0, + 0, + 1 + ], + "thetaOffset": 45, + "fovy": 45, + "near": 0.5, + "far": 5000 + }, + "followMode": "follow-pose", + "scene": {}, + "transforms": {}, + "topics": {}, + "layers": {}, + "publish": { + "type": "point", + "poseTopic": "/move_base_simple/goal", + "pointTopic": "/clicked_point", + "poseEstimateTopic": "/initialpose", + "poseEstimateXDeviation": 0.5, + "poseEstimateYDeviation": 0.5, + "poseEstimateThetaDeviation": 0.26179939 + }, + "imageMode": { + "imageTopic": "/sharp" + } + }, + "Gauge!1iofczz": { + "path": "/sharpness.x", + "minValue": 0, + "maxValue": 1, + "colorMap": "red-yellow-green", + "colorMode": "colormap", + "gradient": [ + "#0000ff", + "#ff00ff" + ], + "reverse": false + }, + "Plot!1gy7vh9": { + "paths": [ + { + "timestampMethod": "receiveTime", + "value": "/sharpness.x", + "enabled": true, + "color": "#4e98e2" + } + ], + "showXAxisLabels": true, + "showYAxisLabels": true, + "showLegend": true, + "legendDisplay": "floating", + "showPlotValuesInLegend": false, + "isSynced": true, + "xAxisVal": "timestamp", + "sidebarDimension": 240 + } + }, + "globalVariables": {}, + "userNodes": {}, + "playbackConfig": { + "speed": 1 + }, + "layout": { + "first": { + "first": "Image!1dpphsz", + "second": "Image!2xvd0hl", + "direction": "row" + }, + "second": { + "first": "Gauge!1iofczz", + "second": "Plot!1gy7vh9", + "direction": "row" + }, + "direction": "column" + } +} diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 55e9e7920c..390c21ee3a 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -285,6 +285,9 @@ def sharpness(self) -> float: """ Compute the Tenengrad focus measure for an image. Returns a normalized value between 0 and 1, where 1 is sharpest. + + Uses adaptive normalization based on image statistics for better + discrimination across different image types. """ grayscale = self.to_grayscale() # Sobel gradient computation in x and y directions @@ -294,10 +297,23 @@ def sharpness(self) -> float: # Compute gradient magnitude magnitude = cv2.magnitude(sx, sy) - # Return normalized mean (0-1 range) - # Max theoretical magnitude ~1442, but typically much lower - # Using 255 as practical upper bound for 8-bit images - return min(magnitude.mean() / 255.0, 1.0) + mean_mag = magnitude.mean() + + # Use log-scale normalization for better discrimination + # This maps typical values more evenly across the 0-1 range: + # - Blurry images (mean ~50-150): 0.15-0.35 + # - Medium sharp (mean ~150-500): 0.35-0.65 + # - Sharp images (mean ~500-2000): 0.65-0.85 + # - Very sharp (mean >2000): 0.85-1.0 + + if mean_mag <= 0: + return 0.0 + + # Log scale with offset to handle the full range + # log10(50) ≈ 1.7, log10(5000) ≈ 3.7 + normalized = (np.log10(mean_mag + 1) - 1.7) / 2.0 + + return np.clip(normalized, 0.0, 1.0) def save(self, filepath: str) -> bool: """Save image to file.""" From f15150c15944070806e6998220f2b9ac6adc443c Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 22 Aug 2025 15:46:13 +0300 Subject: [PATCH 10/10] correct best frame selection --- dimos/msgs/sensor_msgs/Image.py | 22 +++++++++++++--------- dimos/msgs/sensor_msgs/test_image.py | 23 ++++++++++++++++++++--- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 390c21ee3a..46bf76c9f7 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -29,7 +29,7 @@ from reactivex.observable import Observable from reactivex.scheduler import ThreadPoolScheduler -from dimos.types.timestamped import Timestamped +from dimos.types.timestamped import Timestamped, TimestampedBufferCollection class ImageFormat(Enum): @@ -487,12 +487,16 @@ def agent_encode(self) -> str: def sharpness_window(target_frequency: float, source: Observable[Image]) -> Observable[Image]: - pool = ThreadPoolScheduler() - window = timedelta(seconds=1 / target_frequency) - - return source.pipe( - ops.flat_map(lambda img: rx.start(lambda: (img.sharpness(), img), scheduler=pool)), - ops.buffer_with_time(window, window), - ops.filter(bool), - ops.map(lambda buf: max(buf, key=lambda t: t[0])[1]), + window = TimestampedBufferCollection(1.0 / target_frequency) + source.subscribe(window.add) + + thread_scheduler = ThreadPoolScheduler(max_workers=1) + + def find_best(*argv): + if not window._items: + return None + return max(window._items, key=lambda x: x.sharpness()) + + return rx.interval(1.0 / target_frequency).pipe( + ops.observe_on(thread_scheduler), ops.map(find_best) ) diff --git a/dimos/msgs/sensor_msgs/test_image.py b/dimos/msgs/sensor_msgs/test_image.py index 9622ea35a0..7a3c84a8b6 100644 --- a/dimos/msgs/sensor_msgs/test_image.py +++ b/dimos/msgs/sensor_msgs/test_image.py @@ -89,6 +89,7 @@ def test_sharpness_sliding_window_foxglove(): lcm = LCM() lcm.start() + ping = 0 sharp_topic = Topic("/sharp", Image) all_topic = Topic("/all", Image) sharpness_topic = Topic("/sharpness", Vector3) @@ -100,12 +101,28 @@ def test_sharpness_sliding_window_foxglove(): # Publish all images to all_topic video_stream.subscribe(lambda x: lcm.publish(all_topic, x)) - video_stream.subscribe(lambda x: lcm.publish(sharpness_topic, Vector3([x.sharpness(), 0, 0]))) - # Publish sharp images to sharp_topic + def sharpness_vector(x: Image): + nonlocal ping + sharpness = x.sharpness() + if ping: + y = 1 + ping = ping - 1 + else: + y = 0 + + return Vector3([sharpness, y, 0]) + + video_stream.subscribe(lambda x: lcm.publish(sharpness_topic, sharpness_vector(x))) + + def pub_sharp(x: Image): + nonlocal ping + ping = 3 + lcm.publish(sharp_topic, x) + sharpness_window( 1, source=video_stream, - ).subscribe(lambda x: lcm.publish(sharp_topic, x)) + ).subscribe(pub_sharp) time.sleep(120)