Skip to content
140 changes: 140 additions & 0 deletions assets/foxglove_image_sharpness_test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
{
"configById": {
"Image!1dpphsz": {
"cameraState": {
"distance": 20,
"perspective": true,
"phi": 60,
"target": [
0,
0,
0
],
"targetOffset": [
0,
0,
0
],
"targetOrientation": [
0,
0,
0,
1
],
"thetaOffset": 45,
"fovy": 45,
"near": 0.5,
"far": 5000
},
"followMode": "follow-pose",
"scene": {},
"transforms": {},
"topics": {},
"layers": {},
"publish": {
"type": "point",
"poseTopic": "/move_base_simple/goal",
"pointTopic": "/clicked_point",
"poseEstimateTopic": "/initialpose",
"poseEstimateXDeviation": 0.5,
"poseEstimateYDeviation": 0.5,
"poseEstimateThetaDeviation": 0.26179939
},
"imageMode": {
"imageTopic": "/all"
}
},
"Image!2xvd0hl": {
"cameraState": {
"distance": 20,
"perspective": true,
"phi": 60,
"target": [
0,
0,
0
],
"targetOffset": [
0,
0,
0
],
"targetOrientation": [
0,
0,
0,
1
],
"thetaOffset": 45,
"fovy": 45,
"near": 0.5,
"far": 5000
},
"followMode": "follow-pose",
"scene": {},
"transforms": {},
"topics": {},
"layers": {},
"publish": {
"type": "point",
"poseTopic": "/move_base_simple/goal",
"pointTopic": "/clicked_point",
"poseEstimateTopic": "/initialpose",
"poseEstimateXDeviation": 0.5,
"poseEstimateYDeviation": 0.5,
"poseEstimateThetaDeviation": 0.26179939
},
"imageMode": {
"imageTopic": "/sharp"
}
},
"Gauge!1iofczz": {
"path": "/sharpness.x",
"minValue": 0,
"maxValue": 1,
"colorMap": "red-yellow-green",
"colorMode": "colormap",
"gradient": [
"#0000ff",
"#ff00ff"
],
"reverse": false
},
"Plot!1gy7vh9": {
"paths": [
{
"timestampMethod": "receiveTime",
"value": "/sharpness.x",
"enabled": true,
"color": "#4e98e2"
}
],
"showXAxisLabels": true,
"showYAxisLabels": true,
"showLegend": true,
"legendDisplay": "floating",
"showPlotValuesInLegend": false,
"isSynced": true,
"xAxisVal": "timestamp",
"sidebarDimension": 240
}
},
"globalVariables": {},
"userNodes": {},
"playbackConfig": {
"speed": 1
},
"layout": {
"first": {
"first": "Image!1dpphsz",
"second": "Image!2xvd0hl",
"direction": "row"
},
"second": {
"first": "Gauge!1iofczz",
"second": "Plot!1gy7vh9",
"direction": "row"
},
"direction": "column"
}
}
57 changes: 56 additions & 1 deletion dimos/msgs/sensor_msgs/Image.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@

import time
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from typing import Optional, Tuple

import cv2
import numpy as np
import reactivex as rx

# Import LCM types
from dimos_lcm.sensor_msgs.Image import Image as LCMImage
from dimos_lcm.std_msgs.Header import Header
from reactivex import operators as ops
from reactivex.observable import Observable
from reactivex.scheduler import ThreadPoolScheduler

from dimos.types.timestamped import Timestamped
from dimos.types.timestamped import Timestamped, TimestampedBufferCollection


class ImageFormat(Enum):
Expand Down Expand Up @@ -279,6 +284,40 @@ def crop(self, x: int, y: int, width: int, height: int) -> "Image":
ts=self.ts,
)

def sharpness(self) -> float:
    """
    Compute the Tenengrad focus measure for this image.

    Returns:
        A plain Python ``float`` in [0, 1], where 1 is sharpest.

    Uses adaptive, log-scale normalization of the mean Sobel gradient
    magnitude for better discrimination across different image types.
    """
    grayscale = self.to_grayscale()
    # Sobel gradients in x and y; CV_32F avoids uint8 saturation/overflow.
    sx = cv2.Sobel(grayscale.data, cv2.CV_32F, 1, 0, ksize=5)
    sy = cv2.Sobel(grayscale.data, cv2.CV_32F, 0, 1, ksize=5)

    # Per-pixel gradient magnitude sqrt(sx^2 + sy^2).
    magnitude = cv2.magnitude(sx, sy)

    mean_mag = magnitude.mean()

    # A perfectly flat image has no gradient energy at all.
    if mean_mag <= 0:
        return 0.0

    # Log-scale normalization for better discrimination.
    # This maps typical values more evenly across the 0-1 range:
    # - Blurry images (mean ~50-150): 0.15-0.35
    # - Medium sharp (mean ~150-500): 0.35-0.65
    # - Sharp images (mean ~500-2000): 0.65-0.85
    # - Very sharp (mean >2000): 0.85-1.0
    # log10(50) ≈ 1.7, log10(5000) ≈ 3.7, hence the offset and divisor.
    normalized = (np.log10(mean_mag + 1) - 1.7) / 2.0

    # Bug fix: np.clip on a float32 scalar yields a numpy scalar, not a
    # Python float, breaking the declared return type; cast explicitly.
    return float(np.clip(normalized, 0.0, 1.0))

def save(self, filepath: str) -> bool:
"""Save image to file."""
# Convert to OpenCV format for saving
Expand Down Expand Up @@ -456,3 +495,19 @@ def agent_encode(self) -> str:
base64_str = base64.b64encode(jpeg_bytes).decode("utf-8")

return base64_str


def sharpness_window(target_frequency: float, source: Observable[Image]) -> Observable[Image]:
    """
    Periodically emit the sharpest image observed in the recent window.

    Buffers images from ``source`` over a sliding window of
    ``1 / target_frequency`` seconds and, at that same rate, emits the
    buffered image with the highest sharpness score.

    Args:
        target_frequency: Emission rate in Hz; also sets the buffer span.
        source: Stream of images to select from.

    Returns:
        An observable emitting the sharpest Image per interval. Intervals
        where the buffer is empty emit nothing (previously a ``None``
        leaked through to subscribers, violating the declared type).
    """
    period = 1.0 / target_frequency
    window = TimestampedBufferCollection(period)
    source.subscribe(window.add)

    # Single worker: sharpness scoring is CPU-bound; keep it off the timer thread.
    thread_scheduler = ThreadPoolScheduler(max_workers=1)

    def find_best(_tick) -> Optional[Image]:
        # Snapshot first so concurrent adds don't mutate the buffer mid-scan.
        # NOTE(review): reaches into the private ``_items`` attribute —
        # consider a public accessor on TimestampedBufferCollection.
        items = list(window._items)
        if not items:
            return None
        return max(items, key=lambda image: image.sharpness())

    return rx.interval(period).pipe(
        ops.observe_on(thread_scheduler),
        ops.map(find_best),
        # Bug fix: drop empty-window ticks so subscribers never receive None.
        ops.filter(lambda image: image is not None),
    )
67 changes: 66 additions & 1 deletion dimos/msgs/sensor_msgs/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
import numpy as np
import pytest

from dimos.msgs.sensor_msgs.Image import Image, ImageFormat
from dimos.msgs.sensor_msgs.Image import Image, ImageFormat, sharpness_window
from dimos.utils.data import get_data
from dimos.utils.testing import TimedSensorReplay


@pytest.fixture
Expand Down Expand Up @@ -61,3 +62,67 @@ def test_opencv_conversion(img: Image):
# artificially patch timestamp
decoded_img.ts = img.ts
assert decoded_img == img


@pytest.mark.tool
def test_sharpness_detector():
    """Print sharpness scores for the first frames of the replay dataset."""
    get_data("unitree_office_walk")  # Preload data for testing
    video_store = TimedSensorReplay(
        "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb()
    )

    # Inspect only the first 31 frames, then stop.
    for index, frame in enumerate(video_store.iterate(), start=1):
        print(frame.sharpness())
        if index > 30:
            return


@pytest.mark.tool
def test_sharpness_sliding_window_foxglove():
    """
    Manual visualization harness: replay a video dataset over LCM so the
    sliding-window sharpness selection can be inspected in Foxglove.

    Publishes three topics: every frame on ``/all``, the per-window
    sharpest frame on ``/sharp``, and a ``/sharpness`` vector whose x is
    the frame's sharpness score and whose y blips to 1 for a few frames
    after each sharpest-frame selection, making picks visible on a plot.
    Runs for 120 seconds; intended to be run by hand, not in CI.
    """
    import time

    from dimos.msgs.geometry_msgs import Vector3
    from dimos.protocol.pubsub.lcmpubsub import LCM, Topic

    lcm = LCM()
    lcm.start()

    # Countdown shared between callbacks: set to 3 when a sharpest frame is
    # published, decremented per frame so the plot shows a short y=1 pulse.
    ping = 0
    sharp_topic = Topic("/sharp", Image)
    all_topic = Topic("/all", Image)
    sharpness_topic = Topic("/sharpness", Vector3)

    get_data("unitree_office_walk")  # Preload data for testing
    video_stream = TimedSensorReplay(
        "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb()
    ).stream()

    # Publish all images to all_topic
    video_stream.subscribe(lambda x: lcm.publish(all_topic, x))

    def sharpness_vector(x: Image) -> "Vector3":
        # Build the /sharpness sample: x = score, y = selection pulse.
        nonlocal ping
        sharpness = x.sharpness()
        if ping:
            y = 1
            ping = ping - 1
        else:
            y = 0

        return Vector3([sharpness, y, 0])

    video_stream.subscribe(lambda x: lcm.publish(sharpness_topic, sharpness_vector(x)))

    def pub_sharp(x: Image) -> None:
        # Forward the window winner and arm the 3-frame plot pulse.
        nonlocal ping
        ping = 3
        lcm.publish(sharp_topic, x)

    # 1 Hz sliding-window selection over the replayed stream.
    sharpness_window(
        1,
        source=video_stream,
    ).subscribe(pub_sharp)

    # Keep the subscriptions alive long enough to watch in Foxglove.
    time.sleep(120)
Loading