From fcd8f843aada01e0ab9751d9581acc4303cd399d Mon Sep 17 00:00:00 2001 From: leshy Date: Thu, 9 Oct 2025 00:23:24 +0300 Subject: [PATCH] Revert "Image upgrades! Impls for CUDA + numpy, along with an abstraction and full backwards compatibility" --- dimos/msgs/sensor_msgs/Image.py | 823 ++++++++-------- .../sensor_msgs/image_impls/AbstractImage.py | 210 ---- .../msgs/sensor_msgs/image_impls/CudaImage.py | 895 ------------------ .../sensor_msgs/image_impls/NumpyImage.py | 214 ----- dimos/perception/common/utils.py | 478 +++------- pyproject.toml | 3 +- tests/test_image_backend_utils.py | 279 ------ tests/test_image_backends.py | 534 ----------- 8 files changed, 546 insertions(+), 2890 deletions(-) delete mode 100644 dimos/msgs/sensor_msgs/image_impls/AbstractImage.py delete mode 100644 dimos/msgs/sensor_msgs/image_impls/CudaImage.py delete mode 100644 dimos/msgs/sensor_msgs/image_impls/NumpyImage.py delete mode 100644 tests/test_image_backend_utils.py delete mode 100644 tests/test_image_backends.py diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 7f77d71e70..30c74fd243 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -12,37 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations - import base64 import functools import time -from typing import Optional +from dataclasses import dataclass, field +from enum import Enum +from typing import Literal, Optional, Tuple, TypedDict import cv2 import numpy as np import reactivex as rx -from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ( - AbstractImage, - HAS_CUDA, - HAS_NVIMGCODEC, - ImageFormat, - NVIMGCODEC_LAST_USED, -) -from dimos.msgs.sensor_msgs.image_impls.CudaImage import CudaImage -from dimos.msgs.sensor_msgs.image_impls.NumpyImage import NumpyImage from dimos_lcm.sensor_msgs.Image import Image as LCMImage from dimos_lcm.std_msgs.Header import Header -from dimos.types.timestamped import TimestampedBufferCollection, to_human_readable -from dimos.utils.reactive import quality_barrier from reactivex import operators as ops from reactivex.observable import Observable from reactivex.scheduler import ThreadPoolScheduler -try: - import cupy as cp # type: ignore -except Exception: - cp = None # type: ignore +from dimos.types.timestamped import Timestamped, TimestampedBufferCollection, to_human_readable +from dimos.utils.reactive import quality_barrier try: from sensor_msgs.msg import Image as ROSImage @@ -50,366 +37,470 @@ ROSImage = None -class Image: +class ImageFormat(Enum): + """Supported image formats for internal representation.""" + + BGR = "BGR" # 8-bit Blue-Green-Red color + RGB = "RGB" # 8-bit Red-Green-Blue color + RGBA = "RGBA" # 8-bit RGB with Alpha + BGRA = "BGRA" # 8-bit BGR with Alpha + GRAY = "GRAY" # 8-bit Grayscale + GRAY16 = "GRAY16" # 16-bit Grayscale + DEPTH = "DEPTH" # 32-bit Float Depth + DEPTH16 = "DEPTH16" # 16-bit Integer Depth (millimeters) + + +class AgentImageMessage(TypedDict): + """Type definition for agent-compatible image representation.""" + + type: Literal["image"] + source_type: Literal["base64"] + mime_type: Literal["image/jpeg", "image/png"] + data: str # Base64 encoded image data + + +@dataclass +class Image(Timestamped): + """Standardized image type with LCM integration.""" + msg_name = "sensor_msgs.Image" + data: np.ndarray + format: ImageFormat = field(default=ImageFormat.BGR) + frame_id: str = field(default="") + ts: float = field(default_factory=time.time) - def __init__( - self, - impl: AbstractImage | None = None, - *, - data=None, - format: ImageFormat | None = None, - frame_id: str | None = None, - ts: float | None = None, - ): - """Construct an Image facade. - - Usage: - - Image(impl=) - - Image(data=, format=ImageFormat.RGB, frame_id=str, ts=float) - - Notes: - - When constructed from `data`, uses CudaImage if `data` is a CuPy array and CUDA is available; otherwise NumpyImage. - - `format` defaults to ImageFormat.RGB; `frame_id` defaults to ""; `ts` defaults to `time.time()`. - """ - # Disallow mixing impl with raw kwargs - if impl is not None and any(x is not None for x in (data, format, frame_id, ts)): - raise TypeError( - "Provide either 'impl' or ('data', 'format', 'frame_id', 'ts'), not both" - ) + def __str__(self): + return f"Image(shape={self.shape}, format={self.format}, dtype={self.dtype}, ts={to_human_readable(self.ts)})" - if impl is not None: - self._impl = impl - return + def __post_init__(self): + """Validate image data and format.""" + if self.data is None: + raise ValueError("Image data cannot be None") - # Raw constructor path - if data is None: - raise TypeError("'data' is required when constructing Image without 'impl'") - fmt = format if format is not None else ImageFormat.RGB - fid = frame_id if frame_id is not None else "" - tstamp = ts if ts is not None else time.time() + if not isinstance(self.data, np.ndarray): + raise ValueError("Image data must be a numpy array") - # Detect CuPy array without a hard dependency - is_cu = False - try: - import cupy as _cp # type: ignore + if len(self.data.shape) < 2: + raise ValueError("Image data must be at least 2D") - is_cu = isinstance(data, _cp.ndarray) - except Exception: - is_cu = False + # Ensure data is contiguous for efficient operations + if not self.data.flags["C_CONTIGUOUS"]: + self.data = np.ascontiguousarray(self.data) + + @property + def height(self) -> int: + """Get image height.""" + return self.data.shape[0] + + @property + def width(self) -> int: + """Get image width.""" + return self.data.shape[1] - if is_cu and HAS_CUDA: - self._impl = CudaImage(data, fmt, fid, tstamp) # type: ignore + @property + def channels(self) -> int: + """Get number of channels.""" + if len(self.data.shape) == 2: + return 1 + elif len(self.data.shape) == 3: + return self.data.shape[2] else: - self._impl = NumpyImage(np.asarray(data), fmt, fid, tstamp) + raise ValueError("Invalid image dimensions") - def __str__(self) -> str: - dev = "cuda" if self.is_cuda else "cpu" - return ( - f"Image(shape={self.shape}, format={self.format.value}, dtype={self.dtype}, " - f"dev={dev}, ts={to_human_readable(self.ts)})" + @property + def shape(self) -> Tuple[int, ...]: + """Get image shape.""" + return self.data.shape + + @property + def dtype(self) -> np.dtype: + """Get image data type.""" + return self.data.dtype + + def copy(self) -> "Image": + """Create a deep copy of the image.""" + return self.__class__( + data=self.data.copy(), + format=self.format, + frame_id=self.frame_id, + ts=self.ts, ) @classmethod - def from_impl(cls, impl: AbstractImage) -> "Image": - return cls(impl) + def from_opencv( + cls, cv_image: np.ndarray, format: ImageFormat = ImageFormat.BGR, **kwargs + ) -> "Image": + """Create Image from OpenCV image array.""" + return cls(data=cv_image, format=format, **kwargs) @classmethod def from_numpy( - cls, - np_image: np.ndarray, - format: ImageFormat = ImageFormat.RGB, - to_cuda: bool = False, - **kwargs, + cls, np_image: np.ndarray, format: ImageFormat = ImageFormat.BGR, **kwargs ) -> "Image": - if kwargs.pop("to_gpu", False): - to_cuda = True - if to_cuda and HAS_CUDA: - return cls( - CudaImage( - np_image if hasattr(np_image, "shape") else np.asarray(np_image), - format, - kwargs.get("frame_id", ""), - kwargs.get("ts", time.time()), - ) - ) # type: ignore - return cls( - NumpyImage( - np.asarray(np_image), - format, - kwargs.get("frame_id", ""), - kwargs.get("ts", time.time()), - ) - ) + """Create Image from numpy array.""" + return cls(data=np_image, format=format, **kwargs) @classmethod - def from_file( - cls, filepath: str, format: ImageFormat = ImageFormat.RGB, to_cuda: bool = False, **kwargs - ) -> "Image": - if kwargs.pop("to_gpu", False): - to_cuda = True - arr = cv2.imread(filepath, cv2.IMREAD_UNCHANGED) - if arr is None: + def from_file(cls, filepath: str, format: ImageFormat = ImageFormat.BGR) -> "Image": + """Load image from file.""" + # OpenCV loads as BGR by default + cv_image = cv2.imread(filepath, cv2.IMREAD_UNCHANGED) + if cv_image is None: raise ValueError(f"Could not load image from {filepath}") - if arr.ndim == 2: - detected = ImageFormat.GRAY16 if arr.dtype == np.uint16 else ImageFormat.GRAY - elif arr.shape[2] == 3: - detected = ImageFormat.BGR # OpenCV default - elif arr.shape[2] == 4: - detected = ImageFormat.BGRA # OpenCV default + + # Detect format based on channels and data type + if len(cv_image.shape) == 2: + if cv_image.dtype == np.uint16: + detected_format = ImageFormat.GRAY16 + else: + detected_format = ImageFormat.GRAY + elif cv_image.shape[2] == 3: + detected_format = ImageFormat.BGR # OpenCV default + elif cv_image.shape[2] == 4: + detected_format = ImageFormat.BGRA else: - detected = format - return cls(CudaImage(arr, detected) if to_cuda and HAS_CUDA else NumpyImage(arr, detected)) # type: ignore + detected_format = format + + return cls(data=cv_image, format=detected_format) @classmethod - def from_opencv( - cls, cv_image: np.ndarray, format: ImageFormat = ImageFormat.BGR, **kwargs - ) -> "Image": - """Construct from an OpenCV image (NumPy array).""" + def from_depth(cls, depth_data: np.ndarray, frame_id: str = "", ts: float = None) -> "Image": + """Create Image from depth data (float32 array).""" + if depth_data.dtype != np.float32: + depth_data = depth_data.astype(np.float32) + return cls( - NumpyImage(cv_image, format, kwargs.get("frame_id", ""), kwargs.get("ts", time.time())) + data=depth_data, + format=ImageFormat.DEPTH, + frame_id=frame_id, + ts=ts if ts is not None else time.time(), ) - @classmethod - def from_depth( - cls, depth_data, frame_id: str = "", ts: float = None, to_cuda: bool = False - ) -> "Image": - arr = np.asarray(depth_data) - if arr.dtype != np.float32: - arr = arr.astype(np.float32) - impl = ( - CudaImage(arr, ImageFormat.DEPTH, frame_id, time.time() if ts is None else ts) - if to_cuda and HAS_CUDA - else NumpyImage(arr, ImageFormat.DEPTH, frame_id, time.time() if ts is None else ts) - ) # type: ignore - return cls(impl) - - # Delegation - @property - def is_cuda(self) -> bool: - return self._impl.is_cuda - - @property - def data(self): - return self._impl.data - - @data.setter - def data(self, value) -> None: - # Preserve backend semantics: ensure array type matches implementation - if isinstance(self._impl, NumpyImage): - self._impl.data = np.asarray(value) - elif isinstance(self._impl, CudaImage): # type: ignore - if cp is None: - raise RuntimeError("CuPy not available to set CUDA image data") - self._impl.data = cp.asarray(value) # type: ignore + def to_opencv(self) -> np.ndarray: + """Convert to OpenCV-compatible array (BGR format).""" + if self.format == ImageFormat.BGR: + return self.data + elif self.format == ImageFormat.RGB: + return cv2.cvtColor(self.data, cv2.COLOR_RGB2BGR) + elif self.format == ImageFormat.RGBA: + return cv2.cvtColor(self.data, cv2.COLOR_RGBA2BGR) + elif self.format == ImageFormat.BGRA: + return cv2.cvtColor(self.data, cv2.COLOR_BGRA2BGR) + elif self.format == ImageFormat.GRAY: + return self.data + elif self.format == ImageFormat.GRAY16: + return self.data + elif self.format == ImageFormat.DEPTH: + return self.data # Depth images are already in the correct format + elif self.format == ImageFormat.DEPTH16: + return self.data # 16-bit depth images are already in the correct format else: - self._impl.data = value + raise ValueError(f"Unsupported format conversion: {self.format}") - @property - def format(self) -> ImageFormat: - return self._impl.format - - @format.setter - def format(self, value) -> None: - if isinstance(value, ImageFormat): - self._impl.format = value - elif isinstance(value, str): - try: - self._impl.format = ImageFormat[value] - except KeyError as e: - raise ValueError(f"Invalid ImageFormat: {value}") from e + def to_rgb(self) -> "Image": + """Convert image to RGB format.""" + if self.format == ImageFormat.RGB: + return self.copy() + elif self.format == ImageFormat.BGR: + rgb_data = cv2.cvtColor(self.data, cv2.COLOR_BGR2RGB) + elif self.format == ImageFormat.RGBA: + return self.copy() # Already RGB with alpha + elif self.format == ImageFormat.BGRA: + rgb_data = cv2.cvtColor(self.data, cv2.COLOR_BGRA2RGBA) + elif self.format == ImageFormat.GRAY: + rgb_data = cv2.cvtColor(self.data, cv2.COLOR_GRAY2RGB) + elif self.format == ImageFormat.GRAY16: + # Convert 16-bit grayscale to 8-bit then to RGB + gray8 = (self.data / 256).astype(np.uint8) + rgb_data = cv2.cvtColor(gray8, cv2.COLOR_GRAY2RGB) else: - raise TypeError("format must be ImageFormat or str name") - - @property - def frame_id(self) -> str: - return self._impl.frame_id + raise ValueError(f"Unsupported format conversion from {self.format} to RGB") - @frame_id.setter - def frame_id(self, value: str) -> None: - self._impl.frame_id = str(value) - - @property - def ts(self) -> float: - return self._impl.ts - - @ts.setter - def ts(self, value: float) -> None: - self._impl.ts = float(value) - - @property - def height(self) -> int: - return self._impl.height + return self.__class__( + data=rgb_data, + format=ImageFormat.RGB if self.format != ImageFormat.BGRA else ImageFormat.RGBA, + frame_id=self.frame_id, + ts=self.ts, + ) - @property - def width(self) -> int: - return self._impl.width + def to_bgr(self) -> "Image": + """Convert image to BGR format.""" + if self.format == ImageFormat.BGR: + return self.copy() + elif self.format == ImageFormat.RGB: + bgr_data = cv2.cvtColor(self.data, cv2.COLOR_RGB2BGR) + elif self.format == ImageFormat.RGBA: + bgr_data = cv2.cvtColor(self.data, cv2.COLOR_RGBA2BGR) + elif self.format == ImageFormat.BGRA: + bgr_data = cv2.cvtColor(self.data, cv2.COLOR_BGRA2BGR) + elif self.format == ImageFormat.GRAY: + bgr_data = cv2.cvtColor(self.data, cv2.COLOR_GRAY2BGR) + elif self.format == ImageFormat.GRAY16: + # Convert 16-bit grayscale to 8-bit then to BGR + gray8 = (self.data / 256).astype(np.uint8) + bgr_data = cv2.cvtColor(gray8, cv2.COLOR_GRAY2BGR) + else: + raise ValueError(f"Unsupported format conversion from {self.format} to BGR") - @property - def channels(self) -> int: - return self._impl.channels + return self.__class__( + data=bgr_data, + format=ImageFormat.BGR, + frame_id=self.frame_id, + ts=self.ts, + ) - @property - def shape(self): - return self._impl.shape + def to_grayscale(self) -> "Image": + """Convert image to grayscale.""" + if self.format == ImageFormat.GRAY: + return self.copy() + elif self.format == ImageFormat.GRAY16: + return self.copy() + elif self.format == ImageFormat.BGR: + gray_data = cv2.cvtColor(self.data, cv2.COLOR_BGR2GRAY) + elif self.format == ImageFormat.RGB: + gray_data = cv2.cvtColor(self.data, cv2.COLOR_RGB2GRAY) + elif self.format == ImageFormat.RGBA: + gray_data = cv2.cvtColor(self.data, cv2.COLOR_RGBA2GRAY) + elif self.format == ImageFormat.BGRA: + gray_data = cv2.cvtColor(self.data, cv2.COLOR_BGRA2GRAY) + else: + raise ValueError(f"Unsupported format conversion from {self.format} to grayscale") - @property - def dtype(self): - return self._impl.dtype + return self.__class__( + data=gray_data, + format=ImageFormat.GRAY, + frame_id=self.frame_id, + ts=self.ts, + ) - def copy(self) -> "Image": - return Image(self._impl.copy()) + def resize(self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR) -> "Image": + """Resize the image to the specified dimensions.""" + resized_data = cv2.resize(self.data, (width, height), interpolation=interpolation) + + return self.__class__( + data=resized_data, + format=self.format, + frame_id=self.frame_id, + ts=self.ts, + ) - def to_cpu(self) -> "Image": - if isinstance(self._impl, NumpyImage): - return self.copy() - return Image( - NumpyImage( - np.asarray(self._impl.to_opencv()), - self._impl.format, - self._impl.frame_id, - self._impl.ts, - ) + def crop(self, x: int, y: int, width: int, height: int) -> "Image": + """Crop the image to the specified region.""" + # Ensure crop region is within image bounds + x = max(0, min(x, self.width)) + y = max(0, min(y, self.height)) + x2 = min(x + width, self.width) + y2 = min(y + height, self.height) + + cropped_data = self.data[y:y2, x:x2] + + return self.__class__( + data=cropped_data, + format=self.format, + frame_id=self.frame_id, + ts=self.ts, ) - def to_cupy(self) -> "Image": - if isinstance(self._impl, CudaImage): - return self.copy() - return Image( - CudaImage( - np.asarray(self._impl.data), self._impl.format, self._impl.frame_id, self._impl.ts - ) - ) # type: ignore + @functools.cached_property + def sharpness(self) -> float: + """ + Compute the Tenengrad focus measure for an image. + Returns a normalized value between 0 and 1, where 1 is sharpest. - def to_opencv(self) -> np.ndarray: - return self._impl.to_opencv() + Uses adaptive normalization based on image statistics for better + discrimination across different image types. + """ + grayscale = self.to_grayscale() + # Sobel gradient computation in x and y directions + sx = cv2.Sobel(grayscale.data, cv2.CV_32F, 1, 0, ksize=5) + sy = cv2.Sobel(grayscale.data, cv2.CV_32F, 0, 1, ksize=5) - def to_rgb(self) -> "Image": - return Image(self._impl.to_rgb()) + # Compute gradient magnitude + magnitude = cv2.magnitude(sx, sy) - def to_bgr(self) -> "Image": - return Image(self._impl.to_bgr()) + mean_mag = magnitude.mean() - def to_grayscale(self) -> "Image": - return Image(self._impl.to_grayscale()) + # Use log-scale normalization for better discrimination + # This maps typical values more evenly across the 0-1 range: + # - Blurry images (mean ~50-150): 0.15-0.35 + # - Medium sharp (mean ~150-500): 0.35-0.65 + # - Sharp images (mean ~500-2000): 0.65-0.85 + # - Very sharp (mean >2000): 0.85-1.0 - def resize(self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR) -> "Image": - return Image(self._impl.resize(width, height, interpolation)) + if mean_mag <= 0: + return 0.0 + + # Log scale with offset to handle the full range + # log10(50) ≈ 1.7, log10(5000) ≈ 3.7 + normalized = (np.log10(mean_mag + 1) - 1.7) / 2.0 - def sharpness(self): - """Return sharpness score as a callable float for backward compatibility.""" - return self._impl.sharpness() + return np.clip(normalized, 0.0, 1.0) def save(self, filepath: str) -> bool: - return self._impl.save(filepath) + """Save image to file.""" + # Convert to OpenCV format for saving + cv_image = self.to_opencv() + return cv2.imwrite(filepath, cv_image) - def to_base64( - self, - quality: int = 80, - *, - max_width: Optional[int] = None, - max_height: Optional[int] = None, - ) -> str: - """Encode the image as a base64 JPEG string. + def to_base64(self, max_width: int = 640, max_height: int = 480) -> str: + """Encode image to base64 JPEG format for agent processing. Args: - quality: JPEG quality (0-100). - max_width: Optional maximum width to constrain the encoded image. - max_height: Optional maximum height to constrain the encoded image. + max_width: Maximum width for resizing (default 640) + max_height: Maximum height for resizing (default 480) Returns: - Base64-encoded JPEG representation of the image. + Base64 encoded JPEG string suitable for LLM/agent consumption. """ - bgr_image = self.to_bgr().to_opencv() - height, width = bgr_image.shape[:2] - - scale = 1.0 - if max_width is not None and width > max_width: - scale = min(scale, max_width / width) - if max_height is not None and height > max_height: - scale = min(scale, max_height / height) - - if scale < 1.0: - new_width = max(1, int(round(width * scale))) - new_height = max(1, int(round(height * scale))) - bgr_image = cv2.resize(bgr_image, (new_width, new_height), interpolation=cv2.INTER_AREA) - - encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), int(np.clip(quality, 0, 100))] - success, buffer = cv2.imencode(".jpg", bgr_image, encode_param) + bgr_image = self.to_bgr() + + # Encode as JPEG + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] # 80% quality + success, buffer = cv2.imencode(".jpg", bgr_image.data, encode_param) + if not success: raise ValueError("Failed to encode image as JPEG") - return base64.b64encode(buffer.tobytes()).decode("utf-8") + # Convert to base64 + + jpeg_bytes = buffer.tobytes() + base64_str = base64.b64encode(jpeg_bytes).decode("utf-8") - def agent_encode(self, quality: int = 80) -> str: - """Return a base64-encoded JPEG suitable for agent pipelines.""" - return self.to_base64(quality=quality) + return base64_str - # LCM encode/decode - def lcm_encode(self, frame_id: Optional[str] = None) -> bytes: + def agent_encode(self) -> AgentImageMessage: + return [ + { + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{self.to_base64()}"}, + } + ] + + def lcm_encode(self, frame_id: Optional[str] = None) -> LCMImage: + """Convert to LCM Image message.""" msg = LCMImage() + + # Header msg.header = Header() - msg.header.seq = 0 + msg.header.seq = 0 # Initialize sequence number msg.header.frame_id = frame_id or self.frame_id + + # Set timestamp properly as Time object if self.ts is not None: msg.header.stamp.sec = int(self.ts) msg.header.stamp.nsec = int((self.ts - int(self.ts)) * 1e9) else: - now = time.time() - msg.header.stamp.sec = int(now) - msg.header.stamp.nsec = int((now - int(now)) * 1e9) - - arr = ( - self.to_opencv() - if self.format in (ImageFormat.BGR, ImageFormat.RGB, ImageFormat.RGBA, ImageFormat.BGRA) - else self.to_opencv() - ) - msg.height = int(arr.shape[0]) - msg.width = int(arr.shape[1]) - msg.encoding = _get_lcm_encoding(self.format, arr.dtype) - msg.is_bigendian = False - channels = 1 if arr.ndim == 2 else int(arr.shape[2]) - msg.step = int(arr.shape[1] * arr.dtype.itemsize * channels) - img_bytes = arr.tobytes() - msg.data_length = len(img_bytes) - msg.data = img_bytes + current_time = time.time() + msg.header.stamp.sec = int(current_time) + msg.header.stamp.nsec = int((current_time - int(current_time)) * 1e9) + + # Image properties + msg.height = self.height + msg.width = self.width + msg.encoding = self._get_lcm_encoding() # Convert format to LCM encoding + msg.is_bigendian = False # Use little endian + msg.step = self._get_row_step() + + # Image data + image_bytes = self.data.tobytes() + msg.data_length = len(image_bytes) + msg.data = image_bytes + return msg.lcm_encode() @classmethod def lcm_decode(cls, data: bytes, **kwargs) -> "Image": + """Create Image from LCM Image message.""" + # Parse encoding to determine format and data type msg = LCMImage.lcm_decode(data) - fmt, dtype, channels = _parse_lcm_encoding(msg.encoding) - arr = np.frombuffer(msg.data, dtype=dtype) - if channels == 1: - arr = arr.reshape((msg.height, msg.width)) + format_info = cls._parse_encoding(msg.encoding) + + # Convert bytes back to numpy array + data = np.frombuffer(msg.data, dtype=format_info["dtype"]) + + # Reshape to image dimensions + if format_info["channels"] == 1: + data = data.reshape((msg.height, msg.width)) else: - arr = arr.reshape((msg.height, msg.width, channels)) + data = data.reshape((msg.height, msg.width, format_info["channels"])) + return cls( - NumpyImage( - arr, - fmt, - msg.header.frame_id if hasattr(msg, "header") else "", - msg.header.stamp.sec + msg.header.stamp.nsec / 1e9 - if hasattr(msg, "header") and getattr(msg.header, "stamp", None) - else time.time(), - ) + data=data, + format=format_info["format"], + frame_id=msg.header.frame_id if hasattr(msg, "header") else "", + ts=msg.header.stamp.sec + msg.header.stamp.nsec / 1e9 + if hasattr(msg, "header") and msg.header.stamp.sec > 0 + else time.time(), + **kwargs, ) - # PnP wrappers - def solve_pnp(self, *args, **kwargs): - return self._impl.solve_pnp(*args, **kwargs) # type: ignore - - def solve_pnp_ransac(self, *args, **kwargs): - return self._impl.solve_pnp_ransac(*args, **kwargs) # type: ignore + def _get_row_step(self) -> int: + """Calculate row step (bytes per row).""" + bytes_per_pixel = self._get_bytes_per_pixel() + return self.width * bytes_per_pixel + + def _get_bytes_per_pixel(self) -> int: + """Calculate bytes per pixel based on format and data type.""" + bytes_per_element = self.data.dtype.itemsize + return self.channels * bytes_per_element + + def _get_lcm_encoding(self) -> str: + """Get LCM encoding string from internal format and data type.""" + # Map internal format to LCM encoding based on format and dtype + if self.format == ImageFormat.GRAY: + if self.dtype == np.uint8: + return "mono8" + elif self.dtype == np.uint16: + return "mono16" + elif self.format == ImageFormat.GRAY16: + return "mono16" + elif self.format == ImageFormat.RGB: + return "rgb8" + elif self.format == ImageFormat.RGBA: + return "rgba8" + elif self.format == ImageFormat.BGR: + return "bgr8" + elif self.format == ImageFormat.BGRA: + return "bgra8" + elif self.format == ImageFormat.DEPTH: + if self.dtype == np.float32: + return "32FC1" + elif self.dtype == np.float64: + return "64FC1" + elif self.format == ImageFormat.DEPTH16: + if self.dtype == np.uint16: + return "16UC1" # 16-bit unsigned depth + elif self.dtype == np.int16: + return "16SC1" # 16-bit signed depth + + raise ValueError( + f"Cannot determine LCM encoding for format={self.format}, dtype={self.dtype}" + ) - def solve_pnp_batch(self, *args, **kwargs): - return self._impl.solve_pnp_batch(*args, **kwargs) # type: ignore + @staticmethod + def _parse_encoding(encoding: str) -> dict: + """Parse LCM image encoding string to determine format and data type.""" + # Standard encodings + encoding_map = { + "mono8": {"format": ImageFormat.GRAY, "dtype": np.uint8, "channels": 1}, + "mono16": {"format": ImageFormat.GRAY16, "dtype": np.uint16, "channels": 1}, + "rgb8": {"format": ImageFormat.RGB, "dtype": np.uint8, "channels": 3}, + "rgba8": {"format": ImageFormat.RGBA, "dtype": np.uint8, "channels": 4}, + "bgr8": {"format": ImageFormat.BGR, "dtype": np.uint8, "channels": 3}, + "bgra8": {"format": ImageFormat.BGRA, "dtype": np.uint8, "channels": 4}, + # Depth/float encodings + "32FC1": {"format": ImageFormat.DEPTH, "dtype": np.float32, "channels": 1}, + "32FC3": {"format": ImageFormat.RGB, "dtype": np.float32, "channels": 3}, + "64FC1": {"format": ImageFormat.DEPTH, "dtype": np.float64, "channels": 1}, + # 16-bit depth encodings + "16UC1": {"format": ImageFormat.DEPTH16, "dtype": np.uint16, "channels": 1}, + "16SC1": {"format": ImageFormat.DEPTH16, "dtype": np.int16, "channels": 1}, + } - def create_csrt_tracker(self, *args, **kwargs): - return self._impl.create_csrt_tracker(*args, **kwargs) # type: ignore + if encoding not in encoding_map: + raise ValueError(f"Unsupported encoding: {encoding}") - def csrt_update(self, *args, **kwargs): - return self._impl.csrt_update(*args, **kwargs) # type: ignore + return encoding_map[encoding] @classmethod def from_ros_msg(cls, ros_msg: ROSImage) -> "Image": @@ -472,135 +563,33 @@ def from_ros_msg(cls, ros_msg: ROSImage) -> "Image": ts=ts, ) - @staticmethod - def _parse_encoding(encoding: str) -> dict: - """Translate ROS encoding strings into format metadata.""" - encoding_map = { - "mono8": {"format": ImageFormat.GRAY, "dtype": np.uint8, "channels": 1}, - "mono16": {"format": ImageFormat.GRAY16, "dtype": np.uint16, "channels": 1}, - "rgb8": {"format": ImageFormat.RGB, "dtype": np.uint8, "channels": 3}, - "rgba8": {"format": ImageFormat.RGBA, "dtype": np.uint8, "channels": 4}, - "bgr8": {"format": ImageFormat.BGR, "dtype": np.uint8, "channels": 3}, - "bgra8": {"format": ImageFormat.BGRA, "dtype": np.uint8, "channels": 4}, - "32FC1": {"format": ImageFormat.DEPTH, "dtype": np.float32, "channels": 1}, - "32FC3": {"format": ImageFormat.RGB, "dtype": np.float32, "channels": 3}, - "64FC1": {"format": ImageFormat.DEPTH, "dtype": np.float64, "channels": 1}, - "16UC1": {"format": ImageFormat.DEPTH16, "dtype": np.uint16, "channels": 1}, - "16SC1": {"format": ImageFormat.DEPTH16, "dtype": np.int16, "channels": 1}, - } - - key = encoding.strip() - for candidate in (key, key.lower(), key.upper()): - if candidate in encoding_map: - return dict(encoding_map[candidate]) - - raise ValueError(f"Unsupported encoding: {encoding}") - def __repr__(self) -> str: - dev = "cuda" if self.is_cuda else "cpu" - return f"Image(shape={self.shape}, format={self.format.value}, dtype={self.dtype}, dev={dev}, frame_id='{self.frame_id}', ts={self.ts})" + """String representation.""" + return ( + f"Image(shape={self.shape}, format={self.format.value}, " + f"dtype={self.dtype}, frame_id='{self.frame_id}', ts={self.ts})" + ) def __eq__(self, other) -> bool: + """Check equality with another Image.""" if not isinstance(other, Image): return False + return ( - np.array_equal(self.to_opencv(), other.to_opencv()) + np.array_equal(self.data, other.data) and self.format == other.format and self.frame_id == other.frame_id and abs(self.ts - other.ts) < 1e-6 ) def __len__(self) -> int: - return int(self.height * self.width) - - -# Re-exports for tests -HAS_CUDA = HAS_CUDA -ImageFormat = ImageFormat -NVIMGCODEC_LAST_USED = NVIMGCODEC_LAST_USED -HAS_NVIMGCODEC = HAS_NVIMGCODEC -__all__ = [ - "HAS_CUDA", - "ImageFormat", - "NVIMGCODEC_LAST_USED", - "HAS_NVIMGCODEC", - "sharpness_window", - "sharpness_barrier", -] + """Return total number of pixels.""" + return self.height * self.width def sharpness_window(target_frequency: float, source: Observable[Image]) -> Observable[Image]: - """Emit the sharpest Image seen within each sliding time window.""" - if target_frequency <= 0: - raise ValueError("target_frequency must be positive") - - window = TimestampedBufferCollection(1.0 / target_frequency) - source.subscribe(window.add) - - thread_scheduler = ThreadPoolScheduler(max_workers=1) - - def find_best(*_args): - if not window._items: - return None - return max(window._items, key=lambda img: img.sharpness) - - return rx.interval(1.0 / target_frequency).pipe( - ops.observe_on(thread_scheduler), - ops.map(find_best), - ops.filter(lambda img: img is not None), - ) + raise NotImplementedError("use sharpness_barrier instead") def sharpness_barrier(target_frequency: float): - """Select the sharpest Image within each time window.""" - if target_frequency <= 0: - raise ValueError("target_frequency must be positive") - return quality_barrier(lambda image: image.sharpness, target_frequency) - - -def _get_lcm_encoding(fmt: ImageFormat, dtype: np.dtype) -> str: - if fmt == ImageFormat.GRAY: - if dtype == np.uint8: - return "mono8" - if dtype == np.uint16: - return "mono16" - if fmt == ImageFormat.GRAY16: - return "mono16" - if fmt == ImageFormat.RGB: - return "rgb8" - if fmt == ImageFormat.RGBA: - return "rgba8" - if fmt == ImageFormat.BGR: - return "bgr8" - if fmt == ImageFormat.BGRA: - return "bgra8" - if fmt == ImageFormat.DEPTH: - if dtype == np.float32: - return "32FC1" - if dtype == np.float64: - return "64FC1" - if fmt == ImageFormat.DEPTH16: - if dtype == np.uint16: - return "16UC1" - if dtype == np.int16: - return "16SC1" - raise ValueError(f"Unsupported LCM encoding for fmt={fmt}, dtype={dtype}") - - -def _parse_lcm_encoding(enc: str): - m = { - "mono8": (ImageFormat.GRAY, np.uint8, 1), - "mono16": (ImageFormat.GRAY16, np.uint16, 1), - "rgb8": (ImageFormat.RGB, np.uint8, 3), - "rgba8": (ImageFormat.RGBA, np.uint8, 4), - "bgr8": (ImageFormat.BGR, np.uint8, 3), - "bgra8": (ImageFormat.BGRA, np.uint8, 4), - "32FC1": (ImageFormat.DEPTH, np.float32, 1), - "32FC3": (ImageFormat.RGB, np.float32, 3), - "64FC1": (ImageFormat.DEPTH, np.float64, 1), - "16UC1": (ImageFormat.DEPTH16, np.uint16, 1), - "16SC1": (ImageFormat.DEPTH16, np.int16, 1), - } - if enc not in m: - raise ValueError(f"Unsupported encoding: {enc}") - return m[enc] + return quality_barrier(lambda x: x.sharpness, target_frequency) diff --git a/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py b/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py deleted file mode 100644 index 2f7da1d0d9..0000000000 --- a/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py +++ /dev/null @@ -1,210 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import base64 -import os -from abc import ABC, abstractmethod -from enum import Enum -from typing import Any - -import cv2 -import numpy as np - -try: - import cupy as cp # type: ignore - - HAS_CUDA = True -except Exception: # pragma: no cover - optional dependency - cp = None # type: ignore - HAS_CUDA = False - -# Optional nvImageCodec (preferred GPU codec) -USE_NVIMGCODEC = os.environ.get("USE_NVIMGCODEC", "0") == "1" -NVIMGCODEC_LAST_USED = False -try: # pragma: no cover - optional dependency - if HAS_CUDA and USE_NVIMGCODEC: - from nvidia import nvimgcodec # type: ignore - - try: - _enc_probe = nvimgcodec.Encoder() # type: ignore[attr-defined] - HAS_NVIMGCODEC = True - except Exception: - nvimgcodec = None # type: ignore - HAS_NVIMGCODEC = False - else: - nvimgcodec = None # type: ignore - HAS_NVIMGCODEC = False -except Exception: # pragma: no cover - optional dependency - nvimgcodec = None # type: ignore - HAS_NVIMGCODEC = False - - -class ImageFormat(Enum): - BGR = "BGR" - RGB = "RGB" - RGBA = "RGBA" - BGRA = "BGRA" - GRAY = "GRAY" - GRAY16 = "GRAY16" - DEPTH = "DEPTH" - DEPTH16 = "DEPTH16" - - -def _is_cu(x) -> bool: - return HAS_CUDA and cp is not None and isinstance(x, cp.ndarray) # type: ignore - - -def _ascontig(x): - if _is_cu(x): - return x if x.flags["C_CONTIGUOUS"] else cp.ascontiguousarray(x) # type: ignore - return x if x.flags["C_CONTIGUOUS"] else np.ascontiguousarray(x) - - -def _to_cpu(x): - return cp.asnumpy(x) if _is_cu(x) else x # type: ignore - - -def _to_cu(x): - if HAS_CUDA and cp is not None and isinstance(x, np.ndarray): # type: ignore - return cp.asarray(x) # type: ignore - return x - - -def _encode_nvimgcodec_cuda(bgr_cu, quality: int = 80) -> bytes: # pragma: no cover - optional - if not HAS_NVIMGCODEC or nvimgcodec is None: - raise RuntimeError("nvimgcodec not available") - if bgr_cu.ndim != 3 or bgr_cu.shape[2] != 3: - raise RuntimeError("nvimgcodec expects HxWx3 image") - if bgr_cu.dtype != cp.uint8: # type: ignore[attr-defined] - raise RuntimeError("nvimgcodec requires uint8 input") - if not bgr_cu.flags["C_CONTIGUOUS"]: - bgr_cu = cp.ascontiguousarray(bgr_cu) # type: ignore[attr-defined] - encoder = nvimgcodec.Encoder() # type: ignore[attr-defined] - try: - img = nvimgcodec.Image(bgr_cu, nvimgcodec.PixelFormat.BGR) # type: ignore[attr-defined] - except Exception: - img = nvimgcodec.Image(cp.asnumpy(bgr_cu), nvimgcodec.PixelFormat.BGR) # type: ignore[attr-defined] - if hasattr(nvimgcodec, "EncodeParams"): - params = nvimgcodec.EncodeParams(quality=quality) # type: ignore[attr-defined] - bitstreams = encoder.encode([img], [params]) - else: - bitstreams = encoder.encode([img]) - bs0 = bitstreams[0] - if hasattr(bs0, "buf"): - return bytes(bs0.buf) - return bytes(bs0) - - -class AbstractImage(ABC): - data: Any - format: ImageFormat - frame_id: str - ts: float - - @property - @abstractmethod - def is_cuda(self) -> bool: # pragma: no cover - abstract - ... - - @property - def height(self) -> int: - return int(self.data.shape[0]) - - @property - def width(self) -> int: - return int(self.data.shape[1]) - - @property - def channels(self) -> int: - if getattr(self.data, "ndim", 0) == 2: - return 1 - if getattr(self.data, "ndim", 0) == 3: - return int(self.data.shape[2]) - raise ValueError("Invalid image dimensions") - - @property - def shape(self): - return tuple(self.data.shape) - - @property - def dtype(self): - return self.data.dtype - - @abstractmethod - def to_opencv(self) -> np.ndarray: # pragma: no cover - abstract - ... - - @abstractmethod - def to_rgb(self) -> "AbstractImage": # pragma: no cover - abstract - ... - - @abstractmethod - def to_bgr(self) -> "AbstractImage": # pragma: no cover - abstract - ... - - @abstractmethod - def to_grayscale(self) -> "AbstractImage": # pragma: no cover - abstract - ... - - @abstractmethod - def resize( - self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR - ) -> "AbstractImage": # pragma: no cover - abstract - ... - - @abstractmethod - def sharpness(self) -> float: # pragma: no cover - abstract - ... - - def copy(self) -> "AbstractImage": - return self.__class__( - data=self.data.copy(), format=self.format, frame_id=self.frame_id, ts=self.ts - ) # type: ignore - - def save(self, filepath: str) -> bool: - global NVIMGCODEC_LAST_USED - if self.is_cuda and HAS_NVIMGCODEC and nvimgcodec is not None: - try: - bgr = self.to_bgr() - if _is_cu(bgr.data): - jpeg = _encode_nvimgcodec_cuda(bgr.data) - NVIMGCODEC_LAST_USED = True - with open(filepath, "wb") as f: - f.write(jpeg) - return True - except Exception: - NVIMGCODEC_LAST_USED = False - arr = self.to_opencv() - return cv2.imwrite(filepath, arr) - - def to_base64(self, quality: int = 80) -> str: - global NVIMGCODEC_LAST_USED - if self.is_cuda and HAS_NVIMGCODEC and nvimgcodec is not None: - try: - bgr = self.to_bgr() - if _is_cu(bgr.data): - jpeg = _encode_nvimgcodec_cuda(bgr.data, quality=quality) - NVIMGCODEC_LAST_USED = True - return base64.b64encode(jpeg).decode("utf-8") - except Exception: - NVIMGCODEC_LAST_USED = False - bgr = self.to_bgr() - success, buffer = cv2.imencode( - ".jpg", _to_cpu(bgr.data), [int(cv2.IMWRITE_JPEG_QUALITY), int(quality)] - ) - if not success: - raise ValueError("Failed to encode image as JPEG") - return base64.b64encode(buffer.tobytes()).decode("utf-8") diff --git a/dimos/msgs/sensor_msgs/image_impls/CudaImage.py b/dimos/msgs/sensor_msgs/image_impls/CudaImage.py deleted file mode 100644 index fc8577f0ac..0000000000 --- a/dimos/msgs/sensor_msgs/image_impls/CudaImage.py +++ /dev/null @@ -1,895 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import time -from dataclasses import dataclass, field -from typing import Optional, Tuple - -import cv2 -import numpy as np - -from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ( - AbstractImage, - ImageFormat, - HAS_CUDA, - _is_cu, - _to_cpu, - _ascontig, -) -from dimos.msgs.sensor_msgs.image_impls.NumpyImage import NumpyImage - -try: - import cupy as cp # type: ignore - from cupyx.scipy import ndimage as cndimage # type: ignore - from cupyx.scipy import signal as csignal # type: ignore -except Exception: # pragma: no cover - cp = None # type: ignore - cndimage = None # type: ignore - csignal = None # type: ignore - - -_CUDA_SRC = r""" -extern "C" { - -__device__ __forceinline__ void rodrigues_R(const float r[3], float R[9]){ - float theta = sqrtf(r[0]*r[0] + r[1]*r[1] + r[2]*r[2]); - if(theta < 1e-8f){ - R[0]=1.f; R[1]=0.f; R[2]=0.f; - R[3]=0.f; R[4]=1.f; R[5]=0.f; - R[6]=0.f; R[7]=0.f; R[8]=1.f; - return; - } - float kx=r[0]/theta, ky=r[1]/theta, kz=r[2]/theta; - float c=cosf(theta), s=sinf(theta), v=1.f-c; - R[0]=kx*kx*v + c; R[1]=kx*ky*v - kz*s; R[2]=kx*kz*v + ky*s; - R[3]=ky*kx*v + kz*s; R[4]=ky*ky*v + c; R[5]=ky*kz*v - kx*s; - R[6]=kz*kx*v - ky*s; R[7]=kz*ky*v + kx*s; R[8]=kz*kz*v + c; -} - -__device__ __forceinline__ void mat3x3_vec3(const float R[9], const float x[3], float y[3]){ - y[0] = R[0]*x[0] + R[1]*x[1] + R[2]*x[2]; - y[1] = R[3]*x[0] + R[4]*x[1] + R[5]*x[2]; - y[2] = R[6]*x[0] + R[7]*x[1] + R[8]*x[2]; -} - -__device__ __forceinline__ void cross_mat(const float v[3], float S[9]){ - S[0]=0.f; S[1]=-v[2]; S[2]= v[1]; - S[3]= v[2]; S[4]=0.f; S[5]=-v[0]; - S[6]=-v[1]; S[7]= v[0]; S[8]=0.f; -} - -// Solve a 6x6 system (JTJ * x = JTr) with Gauss–Jordan; JTJ is SPD after damping. -__device__ void solve6_gauss_jordan(float A[36], float b[6], float x[6]){ - float M[6][7]; - #pragma unroll - for(int r=0;r<6;++r){ - #pragma unroll - for(int c=0;c<6;++c) M[r][c] = A[r*6 + c]; - M[r][6] = b[r]; - } - for(int piv=0;piv<6;++piv){ - float invd = 1.f / M[piv][piv]; - for(int c=piv;c<7;++c) M[piv][c] *= invd; - for(int r=0;r<6;++r){ - if(r==piv) continue; - float f = M[r][piv]; - if(fabsf(f) < 1e-20f) continue; - for(int c=piv;c<7;++c) M[r][c] -= f * M[piv][c]; - } - } - #pragma unroll - for(int r=0;r<6;++r) x[r] = M[r][6]; -} - -// One block solves one pose; dynamic shared memory holds per-thread accumulators. -__global__ void pnp_gn_batch( - const float* __restrict__ obj, // (B,N,3) - const float* __restrict__ img, // (B,N,2) - const int N, - const float* __restrict__ intr, // (B,4) -> fx, fy, cx, cy - const int max_iters, - const float damping, - float* __restrict__ rvec_out, // (B,3) - float* __restrict__ tvec_out // (B,3) -){ - if(N <= 0) return; - int b = blockIdx.x; - const float* obj_b = obj + b * N * 3; - const float* img_b = img + b * N * 2; - float fx = intr[4*b + 0]; - float fy = intr[4*b + 1]; - float cx = intr[4*b + 2]; - float cy = intr[4*b + 3]; - - __shared__ float s_R[9]; - __shared__ float s_rvec[3]; - __shared__ float s_tvec[3]; - __shared__ float s_JTJ[36]; - __shared__ float s_JTr[6]; - __shared__ int s_done; - - extern __shared__ float scratch[]; - float* sh_JTJ = scratch; - float* sh_JTr = scratch + 36 * blockDim.x; - - if(threadIdx.x==0){ - s_rvec[0]=0.f; s_rvec[1]=0.f; s_rvec[2]=0.f; - s_tvec[0]=0.f; s_tvec[1]=0.f; s_tvec[2]=2.f; - } - __syncthreads(); - - for(int it=0; itmatrix) for NumPy/CuPy arrays.""" - - if cp is not None and ( - isinstance(x, cp.ndarray) # type: ignore[arg-type] - or getattr(x, "__cuda_array_interface__", None) is not None - ): - xp = cp - else: - xp = np - arr = xp.asarray(x, dtype=xp.float64) - - if not inverse and arr.ndim >= 2 and arr.shape[-2:] == (3, 3): - inverse = True - - if not inverse: - vec = arr - if vec.ndim >= 2 and vec.shape[-1] == 1: - vec = vec[..., 0] - if vec.shape[-1] != 3: - raise ValueError("Rodrigues expects vectors of shape (..., 3)") - orig_shape = vec.shape[:-1] - vec = vec.reshape(-1, 3) - n = vec.shape[0] - theta = xp.linalg.norm(vec, axis=1) - small = theta < 1e-12 - - def _skew(v): - vx, vy, vz = v[:, 0], v[:, 1], v[:, 2] - O = xp.zeros_like(vx) - return xp.stack( - [ - xp.stack([O, -vz, vy], axis=-1), - xp.stack([vz, O, -vx], axis=-1), - xp.stack([-vy, vx, O], axis=-1), - ], - axis=-2, - ) - - K = _skew(vec) - theta2 = theta * theta - theta4 = theta2 * theta2 - theta_safe = xp.where(small, 1.0, theta) - theta2_safe = xp.where(small, 1.0, theta2) - A = xp.where(small, 1.0 - theta2 / 6.0 + theta4 / 120.0, xp.sin(theta) / theta_safe)[ - :, None, None - ] - B = xp.where( - small, - 0.5 - theta2 / 24.0 + theta4 / 720.0, - (1.0 - xp.cos(theta)) / theta2_safe, - )[:, None, None] - I = xp.eye(3, dtype=arr.dtype) - I = I[None, :, :] if n == 1 else xp.broadcast_to(I, (n, 3, 3)) - KK = xp.matmul(K, K) - out = I + A * K + B * KK - return out.reshape(orig_shape + (3, 3)) if orig_shape else out[0] - - mat = arr - if mat.shape[-2:] != (3, 3): - raise ValueError("Rodrigues expects rotation matrices of shape (..., 3, 3)") - orig_shape = mat.shape[:-2] - mat = mat.reshape(-1, 3, 3) - trace = xp.trace(mat, axis1=1, axis2=2) - trace = xp.clip((trace - 1.0) / 2.0, -1.0, 1.0) - theta = xp.arccos(trace) - v = xp.stack( - [ - mat[:, 2, 1] - mat[:, 1, 2], - mat[:, 0, 2] - mat[:, 2, 0], - mat[:, 1, 0] - mat[:, 0, 1], - ], - axis=1, - ) - norm_v = xp.linalg.norm(v, axis=1) - small = theta < 1e-7 - eps = 1e-8 - norm_safe = xp.where(norm_v < eps, 1.0, norm_v) - r_general = theta[:, None] * v / norm_safe[:, None] - r_small = 0.5 * v - r = xp.where(small[:, None], r_small, r_general) - pi_mask = xp.abs(theta - xp.pi) < 1e-4 - if np.any(pi_mask) if xp is np else bool(cp.asnumpy(pi_mask).any()): - diag = xp.diagonal(mat, axis1=1, axis2=2) - axis_candidates = xp.clip((diag + 1.0) / 2.0, 0.0, None) - axis = xp.sqrt(axis_candidates) - signs = xp.sign(v) - axis = xp.where(signs == 0, axis, xp.copysign(axis, signs)) - axis_norm = xp.linalg.norm(axis, axis=1) - axis_norm = xp.where(axis_norm < eps, 1.0, axis_norm) - axis = axis / axis_norm[:, None] - r_pi = theta[:, None] * axis - r = xp.where(pi_mask[:, None], r_pi, r) - out = r.reshape(orig_shape + (3,)) if orig_shape else r[0] - return out - - -def _undistort_points_cuda( - img_px: "cp.ndarray", K: "cp.ndarray", dist: "cp.ndarray", iterations: int = 8 -) -> "cp.ndarray": - """Iteratively undistort pixel coordinates on device (Brown–Conrady). - - Returns pixel coordinates after undistortion (fx*xu+cx, fy*yu+cy). - """ - N = img_px.shape[0] - ones = cp.ones((N, 1), dtype=cp.float64) - uv1 = cp.concatenate([img_px.astype(cp.float64), ones], axis=1) - Kinv = cp.linalg.inv(K) - xdyd1 = uv1 @ Kinv.T - xd = xdyd1[:, 0] - yd = xdyd1[:, 1] - xu = xd.copy() - yu = yd.copy() - k1 = dist[0] - k2 = dist[1] if dist.size > 1 else 0.0 - p1 = dist[2] if dist.size > 2 else 0.0 - p2 = dist[3] if dist.size > 3 else 0.0 - k3 = dist[4] if dist.size > 4 else 0.0 - for _ in range(iterations): - r2 = xu * xu + yu * yu - r4 = r2 * r2 - r6 = r4 * r2 - radial = 1.0 + k1 * r2 + k2 * r4 + k3 * r6 - delta_x = 2.0 * p1 * xu * yu + p2 * (r2 + 2.0 * xu * xu) - delta_y = p1 * (r2 + 2.0 * yu * yu) + 2.0 * p2 * xu * yu - xu = (xd - delta_x) / radial - yu = (yd - delta_y) / radial - fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2] - return cp.stack([fx * xu + cx, fy * yu + cy], axis=1) - - -@dataclass -class CudaImage(AbstractImage): - data: any # cupy.ndarray - format: ImageFormat = field(default=ImageFormat.BGR) - frame_id: str = field(default="") - ts: float = field(default_factory=time.time) - - def __post_init__(self): - if not HAS_CUDA or cp is None: - raise RuntimeError("CuPy/CUDA not available") - if not _is_cu(self.data): - # Accept NumPy arrays and move to device automatically - try: - self.data = cp.asarray(self.data) - except Exception as e: - raise ValueError("CudaImage requires a CuPy array") from e - if self.data.ndim < 2: - raise ValueError("Image data must be at least 2D") - self.data = _ascontig(self.data) - - @property - def is_cuda(self) -> bool: - return True - - def to_opencv(self) -> np.ndarray: - if self.format in (ImageFormat.BGR, ImageFormat.RGB, ImageFormat.RGBA, ImageFormat.BGRA): - return _to_cpu(self.to_bgr().data) - return _to_cpu(self.data) - - def to_rgb(self) -> "CudaImage": - if self.format == ImageFormat.RGB: - return self.copy() # type: ignore - if self.format == ImageFormat.BGR: - return CudaImage(_bgr_to_rgb_cuda(self.data), ImageFormat.RGB, self.frame_id, self.ts) - if self.format == ImageFormat.RGBA: - return self.copy() # type: ignore - if self.format == ImageFormat.BGRA: - return CudaImage( - _bgra_to_rgba_cuda(self.data), ImageFormat.RGBA, self.frame_id, self.ts - ) - if self.format == ImageFormat.GRAY: - return CudaImage(_gray_to_rgb_cuda(self.data), ImageFormat.RGB, self.frame_id, self.ts) - if self.format in (ImageFormat.GRAY16, ImageFormat.DEPTH16): - gray8 = (self.data.astype(cp.float32) / 256.0).clip(0, 255).astype(cp.uint8) # type: ignore - return CudaImage(_gray_to_rgb_cuda(gray8), ImageFormat.RGB, self.frame_id, self.ts) - return self.copy() # type: ignore - - def to_bgr(self) -> "CudaImage": - if self.format == ImageFormat.BGR: - return self.copy() # type: ignore - if self.format == ImageFormat.RGB: - return CudaImage(_rgb_to_bgr_cuda(self.data), ImageFormat.BGR, self.frame_id, self.ts) - if self.format == ImageFormat.RGBA: - return CudaImage( - _rgba_to_bgra_cuda(self.data)[..., :3], ImageFormat.BGR, self.frame_id, self.ts - ) - if self.format == ImageFormat.BGRA: - return CudaImage(self.data[..., :3], ImageFormat.BGR, self.frame_id, self.ts) - if self.format in (ImageFormat.GRAY, ImageFormat.DEPTH): - return CudaImage( - _rgb_to_bgr_cuda(_gray_to_rgb_cuda(self.data)), - ImageFormat.BGR, - self.frame_id, - self.ts, - ) - if self.format in (ImageFormat.GRAY16, ImageFormat.DEPTH16): - gray8 = (self.data.astype(cp.float32) / 256.0).clip(0, 255).astype(cp.uint8) # type: ignore - return CudaImage( - _rgb_to_bgr_cuda(_gray_to_rgb_cuda(gray8)), ImageFormat.BGR, self.frame_id, self.ts - ) - return self.copy() # type: ignore - - def to_grayscale(self) -> "CudaImage": - if self.format in (ImageFormat.GRAY, ImageFormat.GRAY16, ImageFormat.DEPTH): - return self.copy() # type: ignore - if self.format == ImageFormat.BGR: - return CudaImage( - _rgb_to_gray_cuda(_bgr_to_rgb_cuda(self.data)), - ImageFormat.GRAY, - self.frame_id, - self.ts, - ) - if self.format == ImageFormat.RGB: - return CudaImage(_rgb_to_gray_cuda(self.data), ImageFormat.GRAY, self.frame_id, self.ts) - if self.format in (ImageFormat.RGBA, ImageFormat.BGRA): - rgb = ( - self.data[..., :3] - if self.format == ImageFormat.RGBA - else _bgra_to_rgba_cuda(self.data)[..., :3] - ) - return CudaImage(_rgb_to_gray_cuda(rgb), ImageFormat.GRAY, self.frame_id, self.ts) - raise ValueError(f"Unsupported format: {self.format}") - - def resize(self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR) -> "CudaImage": - return CudaImage( - _resize_bilinear_hwc_cuda(self.data, height, width), self.format, self.frame_id, self.ts - ) - - def sharpness(self) -> float: - if cp is None: - return 0.0 - try: - from cupyx.scipy import ndimage as cndimage # type: ignore - - gray = self.to_grayscale().data.astype(cp.float32) - deriv5 = cp.asarray([1, 2, 0, -2, -1], dtype=cp.float32) - smooth5 = cp.asarray([1, 4, 6, 4, 1], dtype=cp.float32) - gx = cndimage.convolve1d(gray, deriv5, axis=1, mode="reflect") # type: ignore - gx = cndimage.convolve1d(gx, smooth5, axis=0, mode="reflect") # type: ignore - gy = cndimage.convolve1d(gray, deriv5, axis=0, mode="reflect") # type: ignore - gy = cndimage.convolve1d(gy, smooth5, axis=1, mode="reflect") # type: ignore - magnitude = cp.hypot(gx, gy) # type: ignore - mean_mag = float(cp.asnumpy(magnitude.mean())) # type: ignore - except Exception: - return 0.0 - if mean_mag <= 0: - return 0.0 - return float(np.clip((np.log10(mean_mag + 1) - 1.7) / 2.0, 0.0, 1.0)) - - # CUDA tracker (template NCC with small scale pyramid) - @dataclass - class BBox: - x: int - y: int - w: int - h: int - - def create_csrt_tracker(self, bbox: BBox): - if csignal is None: - raise RuntimeError("cupyx.scipy.signal not available for CUDA tracker") - x, y, w, h = map(int, bbox) - gray = self.to_grayscale().data.astype(cp.float32) - tmpl = gray[y : y + h, x : x + w] - if tmpl.size == 0: - raise ValueError("Invalid bbox for CUDA tracker") - return _CudaTemplateTracker(tmpl, x0=x, y0=y) - - def csrt_update(self, tracker) -> Tuple[bool, Tuple[int, int, int, int]]: - if not isinstance(tracker, _CudaTemplateTracker): - raise TypeError("Expected CUDA tracker instance") - gray = self.to_grayscale().data.astype(cp.float32) - x, y, w, h = tracker.update(gray) - return True, (int(x), int(y), int(w), int(h)) - - # PnP – Gauss–Newton (no distortion in batch), iterative per-instance - def solve_pnp( - self, - object_points: np.ndarray, - image_points: np.ndarray, - camera_matrix: np.ndarray, - dist_coeffs: Optional[np.ndarray] = None, - flags: int = cv2.SOLVEPNP_ITERATIVE, - ) -> Tuple[bool, np.ndarray, np.ndarray]: - if not HAS_CUDA or cp is None or (dist_coeffs is not None and np.any(dist_coeffs)): - obj = np.asarray(object_points, dtype=np.float32).reshape(-1, 3) - img = np.asarray(image_points, dtype=np.float32).reshape(-1, 2) - K = np.asarray(camera_matrix, dtype=np.float64) - dist = None if dist_coeffs is None else np.asarray(dist_coeffs, dtype=np.float64) - ok, rvec, tvec = cv2.solvePnP(obj, img, K, dist, flags=flags) - return bool(ok), rvec.astype(np.float64), tvec.astype(np.float64) - - rvec, tvec = _solve_pnp_cuda_kernel(object_points, image_points, camera_matrix) - ok = np.isfinite(rvec).all() and np.isfinite(tvec).all() - return ok, rvec, tvec - - def solve_pnp_batch( - self, - object_points_batch: np.ndarray, - image_points_batch: np.ndarray, - camera_matrix: np.ndarray, - dist_coeffs: Optional[np.ndarray] = None, - iterations: int = 15, - damping: float = 1e-6, - ) -> Tuple[np.ndarray, np.ndarray]: - """Batched PnP (each block = one instance).""" - if not HAS_CUDA or cp is None or (dist_coeffs is not None and np.any(dist_coeffs)): - obj = np.asarray(object_points_batch, dtype=np.float32) - img = np.asarray(image_points_batch, dtype=np.float32) - if obj.ndim != 3 or img.ndim != 3 or obj.shape[:2] != img.shape[:2]: - raise ValueError( - "Batched object/image arrays must be shaped (B,N,...) with matching sizes" - ) - K = np.asarray(camera_matrix, dtype=np.float64) - dist = None if dist_coeffs is None else np.asarray(dist_coeffs, dtype=np.float64) - B = obj.shape[0] - r_list = np.empty((B, 3, 1), dtype=np.float64) - t_list = np.empty((B, 3, 1), dtype=np.float64) - for b in range(B): - K_b = K if K.ndim == 2 else K[b] - dist_b = None - if dist is not None: - if dist.ndim == 1: - dist_b = dist - elif dist.ndim == 2: - dist_b = dist[b] - else: - raise ValueError("dist_coeffs must be 1D or batched 2D") - ok, rvec, tvec = cv2.solvePnP( - obj[b], img[b], K_b, dist_b, flags=cv2.SOLVEPNP_ITERATIVE - ) - if not ok: - raise RuntimeError(f"cv2.solvePnP failed for batch index {b}") - r_list[b] = rvec.astype(np.float64) - t_list[b] = tvec.astype(np.float64) - return r_list, t_list - - return _solve_pnp_cuda_kernel( - object_points_batch, - image_points_batch, - camera_matrix, - iterations=iterations, - damping=damping, - ) - - def solve_pnp_ransac( - self, - object_points: np.ndarray, - image_points: np.ndarray, - camera_matrix: np.ndarray, - dist_coeffs: Optional[np.ndarray] = None, - iterations_count: int = 100, - reprojection_error: float = 3.0, - confidence: float = 0.99, - min_sample: int = 6, - ) -> Tuple[bool, np.ndarray, np.ndarray, np.ndarray]: - """RANSAC with CUDA PnP solver.""" - if not HAS_CUDA or cp is None or (dist_coeffs is not None and np.any(dist_coeffs)): - obj = np.asarray(object_points, dtype=np.float32) - img = np.asarray(image_points, dtype=np.float32) - K = np.asarray(camera_matrix, dtype=np.float64) - dist = None if dist_coeffs is None else np.asarray(dist_coeffs, dtype=np.float64) - ok, rvec, tvec, mask = cv2.solvePnPRansac( - obj, - img, - K, - dist, - iterationsCount=int(iterations_count), - reprojectionError=float(reprojection_error), - confidence=float(confidence), - flags=cv2.SOLVEPNP_ITERATIVE, - ) - mask_flat = np.zeros((obj.shape[0],), dtype=np.uint8) - if mask is not None and len(mask) > 0: - mask_flat[mask.flatten()] = 1 - return bool(ok), rvec.astype(np.float64), tvec.astype(np.float64), mask_flat - - obj = cp.asarray(object_points, dtype=cp.float32) - img = cp.asarray(image_points, dtype=cp.float32) - camera_matrix_np = np.asarray(_to_cpu(camera_matrix), dtype=np.float32) - fx = float(camera_matrix_np[0, 0]) - fy = float(camera_matrix_np[1, 1]) - cx = float(camera_matrix_np[0, 2]) - cy = float(camera_matrix_np[1, 2]) - N = obj.shape[0] - rng = cp.random.RandomState(1234) - best_inliers = -1 - best_r, best_t, best_mask = None, None, None - - for _ in range(iterations_count): - idx = rng.choice(N, size=min_sample, replace=False) - rvec, tvec = _solve_pnp_cuda_kernel(obj[idx], img[idx], camera_matrix_np) - R = _rodrigues(cp.asarray(rvec.flatten())) - Xc = obj @ R.T + cp.asarray(tvec.flatten()) - invZ = 1.0 / cp.clip(Xc[:, 2], 1e-6, None) - u_hat = fx * Xc[:, 0] * invZ + cx - v_hat = fy * Xc[:, 1] * invZ + cy - err = cp.sqrt((img[:, 0] - u_hat) ** 2 + (img[:, 1] - v_hat) ** 2) - mask = (err < reprojection_error).astype(cp.uint8) - inliers = int(mask.sum()) - if inliers > best_inliers: - best_inliers, best_r, best_t, best_mask = inliers, rvec, tvec, mask - if inliers >= int(confidence * N): - break - - if best_inliers <= 0: - return False, np.zeros((3, 1)), np.zeros((3, 1)), np.zeros((N,), dtype=np.uint8) - in_idx = cp.nonzero(best_mask)[0] - rvec, tvec = _solve_pnp_cuda_kernel(obj[in_idx], img[in_idx], camera_matrix_np) - return True, rvec, tvec, cp.asnumpy(best_mask) - - -class _CudaTemplateTracker: - def __init__( - self, - tmpl: "cp.ndarray", - scale_step: float = 1.05, - lr: float = 0.1, - search_radius: int = 16, - x0: int = 0, - y0: int = 0, - ): - self.tmpl = tmpl.astype(cp.float32) - self.h, self.w = int(tmpl.shape[0]), int(tmpl.shape[1]) - self.scale_step = float(scale_step) - self.lr = float(lr) - self.search_radius = int(search_radius) - # Cosine window - wy = cp.hanning(self.h).astype(cp.float32) - wx = cp.hanning(self.w).astype(cp.float32) - self.window = wy[:, None] * wx[None, :] - self.tmpl = self.tmpl * self.window - self.y = int(y0) - self.x = int(x0) - - def update(self, gray: "cp.ndarray"): - H, W = int(gray.shape[0]), int(gray.shape[1]) - r = self.search_radius - x0 = max(0, self.x - r) - y0 = max(0, self.y - r) - x1 = min(W, self.x + self.w + r) - y1 = min(H, self.y + self.h + r) - search = gray[y0:y1, x0:x1] - if search.shape[0] < self.h or search.shape[1] < self.w: - search = gray - x0 = y0 = 0 - best = (self.x, self.y, self.w, self.h) - best_score = -1e9 - for s in (1.0 / self.scale_step, 1.0, self.scale_step): - th = max(1, int(round(self.h * s))) - tw = max(1, int(round(self.w * s))) - tmpl_s = _resize_bilinear_hwc_cuda(self.tmpl, th, tw) - if tmpl_s.ndim == 3: - tmpl_s = tmpl_s[..., 0] - tmpl_s = tmpl_s.astype(cp.float32) - tmpl_zm = tmpl_s - tmpl_s.mean() - tmpl_energy = cp.sqrt(cp.sum(tmpl_zm * tmpl_zm)) + 1e-6 - # NCC via correlate2d and local std - ones = cp.ones((th, tw), dtype=cp.float32) - num = csignal.correlate2d(search, tmpl_zm, mode="valid") # type: ignore - sumS = csignal.correlate2d(search, ones, mode="valid") # type: ignore - sumS2 = csignal.correlate2d(search * search, ones, mode="valid") # type: ignore - n = float(th * tw) - meanS = sumS / n - varS = cp.clip(sumS2 - n * meanS * meanS, 0.0, None) - stdS = cp.sqrt(varS) + 1e-6 - res = num / (stdS * tmpl_energy) - ij = cp.unravel_index(cp.argmax(res), res.shape) - dy, dx = int(ij[0].get()), int(ij[1].get()) # type: ignore - score = float(res[ij].get()) # type: ignore - if score > best_score: - best_score = score - best = (x0 + dx, y0 + dy, tw, th) - x, y, w, h = best - patch = gray[y : y + h, x : x + w] - if patch.shape[0] != self.h or patch.shape[1] != self.w: - patch = _resize_bilinear_hwc_cuda(patch, self.h, self.w) - if patch.ndim == 3: - patch = patch[..., 0] - patch = patch.astype(cp.float32) * self.window - self.tmpl = (1.0 - self.lr) * self.tmpl + self.lr * patch - self.x, self.y, self.w, self.h = x, y, w, h - return x, y, w, h diff --git a/dimos/msgs/sensor_msgs/image_impls/NumpyImage.py b/dimos/msgs/sensor_msgs/image_impls/NumpyImage.py deleted file mode 100644 index da04005d4f..0000000000 --- a/dimos/msgs/sensor_msgs/image_impls/NumpyImage.py +++ /dev/null @@ -1,214 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import time -from dataclasses import dataclass, field -from typing import Optional, Tuple - -import cv2 -import numpy as np - -from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ( - AbstractImage, - ImageFormat, -) - - -@dataclass -class NumpyImage(AbstractImage): - data: np.ndarray - format: ImageFormat = field(default=ImageFormat.BGR) - frame_id: str = field(default="") - ts: float = field(default_factory=time.time) - - def __post_init__(self): - if not isinstance(self.data, np.ndarray) or self.data.ndim < 2: - raise ValueError("NumpyImage requires a 2D/3D NumPy array") - - @property - def is_cuda(self) -> bool: - return False - - def to_opencv(self) -> np.ndarray: - arr = self.data - if self.format == ImageFormat.BGR: - return arr - if self.format == ImageFormat.RGB: - return cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) - if self.format == ImageFormat.RGBA: - return cv2.cvtColor(arr, cv2.COLOR_RGBA2BGR) - if self.format == ImageFormat.BGRA: - return cv2.cvtColor(arr, cv2.COLOR_BGRA2BGR) - if self.format in ( - ImageFormat.GRAY, - ImageFormat.GRAY16, - ImageFormat.DEPTH, - ImageFormat.DEPTH16, - ): - return arr - raise ValueError(f"Unsupported format: {self.format}") - - def to_rgb(self) -> "NumpyImage": - if self.format == ImageFormat.RGB: - return self.copy() # type: ignore - arr = self.data - if self.format == ImageFormat.BGR: - return NumpyImage( - cv2.cvtColor(arr, cv2.COLOR_BGR2RGB), ImageFormat.RGB, self.frame_id, self.ts - ) - if self.format == ImageFormat.RGBA: - return self.copy() # RGBA contains RGB + alpha - if self.format == ImageFormat.BGRA: - rgba = cv2.cvtColor(arr, cv2.COLOR_BGRA2RGBA) - return NumpyImage(rgba, ImageFormat.RGBA, self.frame_id, self.ts) - if self.format in (ImageFormat.GRAY, ImageFormat.GRAY16, ImageFormat.DEPTH16): - gray8 = (arr / 256).astype(np.uint8) if self.format != ImageFormat.GRAY else arr - rgb = cv2.cvtColor(gray8, cv2.COLOR_GRAY2RGB) - return NumpyImage(rgb, ImageFormat.RGB, self.frame_id, self.ts) - return self.copy() # type: ignore - - def to_bgr(self) -> "NumpyImage": - if self.format == ImageFormat.BGR: - return self.copy() # type: ignore - arr = self.data - if self.format == ImageFormat.RGB: - return NumpyImage( - cv2.cvtColor(arr, cv2.COLOR_RGB2BGR), ImageFormat.BGR, self.frame_id, self.ts - ) - if self.format == ImageFormat.RGBA: - return NumpyImage( - cv2.cvtColor(arr, cv2.COLOR_RGBA2BGR), ImageFormat.BGR, self.frame_id, self.ts - ) - if self.format == ImageFormat.BGRA: - return NumpyImage( - cv2.cvtColor(arr, cv2.COLOR_BGRA2BGR), ImageFormat.BGR, self.frame_id, self.ts - ) - if self.format in (ImageFormat.GRAY, ImageFormat.GRAY16, ImageFormat.DEPTH16): - gray8 = (arr / 256).astype(np.uint8) if self.format != ImageFormat.GRAY else arr - return NumpyImage( - cv2.cvtColor(gray8, cv2.COLOR_GRAY2BGR), ImageFormat.BGR, self.frame_id, self.ts - ) - return self.copy() # type: ignore - - def to_grayscale(self) -> "NumpyImage": - if self.format in (ImageFormat.GRAY, ImageFormat.GRAY16, ImageFormat.DEPTH): - return self.copy() # type: ignore - if self.format == ImageFormat.BGR: - return NumpyImage( - cv2.cvtColor(self.data, cv2.COLOR_BGR2GRAY), - ImageFormat.GRAY, - self.frame_id, - self.ts, - ) - if self.format == ImageFormat.RGB: - return NumpyImage( - cv2.cvtColor(self.data, cv2.COLOR_RGB2GRAY), - ImageFormat.GRAY, - self.frame_id, - self.ts, - ) - if self.format in (ImageFormat.RGBA, ImageFormat.BGRA): - code = cv2.COLOR_RGBA2GRAY if self.format == ImageFormat.RGBA else cv2.COLOR_BGRA2GRAY - return NumpyImage( - cv2.cvtColor(self.data, code), ImageFormat.GRAY, self.frame_id, self.ts - ) - raise ValueError(f"Unsupported format: {self.format}") - - def resize( - self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR - ) -> "NumpyImage": - return NumpyImage( - cv2.resize(self.data, (width, height), interpolation=interpolation), - self.format, - self.frame_id, - self.ts, - ) - - def sharpness(self) -> float: - gray = self.to_grayscale() - sx = cv2.Sobel(gray.data, cv2.CV_32F, 1, 0, ksize=5) - sy = cv2.Sobel(gray.data, cv2.CV_32F, 0, 1, ksize=5) - magnitude = cv2.magnitude(sx, sy) - mean_mag = float(magnitude.mean()) - if mean_mag <= 0: - return 0.0 - return float(np.clip((np.log10(mean_mag + 1) - 1.7) / 2.0, 0.0, 1.0)) - - # PnP wrappers - def solve_pnp( - self, - object_points: np.ndarray, - image_points: np.ndarray, - camera_matrix: np.ndarray, - dist_coeffs: Optional[np.ndarray] = None, - flags: int = cv2.SOLVEPNP_ITERATIVE, - ) -> Tuple[bool, np.ndarray, np.ndarray]: - obj = np.asarray(object_points, dtype=np.float32).reshape(-1, 3) - img = np.asarray(image_points, dtype=np.float32).reshape(-1, 2) - K = np.asarray(camera_matrix, dtype=np.float64) - dist = None if dist_coeffs is None else np.asarray(dist_coeffs, dtype=np.float64) - ok, rvec, tvec = cv2.solvePnP(obj, img, K, dist, flags=flags) - return bool(ok), rvec.astype(np.float64), tvec.astype(np.float64) - - def create_csrt_tracker(self, bbox: Tuple[int, int, int, int]): - tracker = None - if hasattr(cv2, "legacy") and hasattr(cv2.legacy, "TrackerCSRT_create"): - tracker = cv2.legacy.TrackerCSRT_create() - elif hasattr(cv2, "TrackerCSRT_create"): - tracker = cv2.TrackerCSRT_create() - else: - raise RuntimeError("OpenCV CSRT tracker not available") - ok = tracker.init(self.to_bgr().to_opencv(), tuple(map(int, bbox))) - if not ok: - raise RuntimeError("Failed to initialize CSRT tracker") - return tracker - - def csrt_update(self, tracker) -> Tuple[bool, Tuple[int, int, int, int]]: - ok, box = tracker.update(self.to_bgr().to_opencv()) - if not ok: - return False, (0, 0, 0, 0) - x, y, w, h = map(int, box) - return True, (x, y, w, h) - - def solve_pnp_ransac( - self, - object_points: np.ndarray, - image_points: np.ndarray, - camera_matrix: np.ndarray, - dist_coeffs: Optional[np.ndarray] = None, - iterations_count: int = 100, - reprojection_error: float = 3.0, - confidence: float = 0.99, - min_sample: int = 6, - ) -> Tuple[bool, np.ndarray, np.ndarray, np.ndarray]: - obj = np.asarray(object_points, dtype=np.float32).reshape(-1, 3) - img = np.asarray(image_points, dtype=np.float32).reshape(-1, 2) - K = np.asarray(camera_matrix, dtype=np.float64) - dist = None if dist_coeffs is None else np.asarray(dist_coeffs, dtype=np.float64) - ok, rvec, tvec, inliers = cv2.solvePnPRansac( - obj, - img, - K, - dist, - iterationsCount=int(iterations_count), - reprojectionError=float(reprojection_error), - confidence=float(confidence), - flags=cv2.SOLVEPNP_ITERATIVE, - ) - mask = np.zeros((obj.shape[0],), dtype=np.uint8) - if inliers is not None and len(inliers) > 0: - mask[inliers.flatten()] = 1 - return bool(ok), rvec.astype(np.float64), tvec.astype(np.float64), mask diff --git a/dimos/perception/common/utils.py b/dimos/perception/common/utils.py index 1ce3931c2f..f4bad0645e 100644 --- a/dimos/perception/common/utils.py +++ b/dimos/perception/common/utils.py @@ -28,32 +28,6 @@ logger = setup_logger("dimos.perception.common.utils") -# Optional CuPy support -try: # pragma: no cover - optional dependency - import cupy as cp # type: ignore - - _HAS_CUDA = True -except Exception: # pragma: no cover - optional dependency - cp = None # type: ignore - _HAS_CUDA = False - - -def _is_cu_array(x) -> bool: - return _HAS_CUDA and cp is not None and isinstance(x, cp.ndarray) # type: ignore - - -def _to_numpy(x): - return cp.asnumpy(x) if _is_cu_array(x) else x # type: ignore - - -def _to_cupy(x): - if _HAS_CUDA and cp is not None and isinstance(x, np.ndarray): # type: ignore - try: - return cp.asarray(x) # type: ignore - except Exception: - return x - return x - def load_camera_info(yaml_path: str, frame_id: str = "camera_link") -> CameraInfo: """ @@ -141,135 +115,6 @@ def load_camera_info_opencv(yaml_path: str) -> Tuple[np.ndarray, np.ndarray]: return K, dist -def rectify_image_cpu(image: Image, camera_matrix: np.ndarray, dist_coeffs: np.ndarray) -> Image: - """CPU rectification using OpenCV. Preserves backend by caller. - - Returns an Image with numpy or cupy data depending on caller choice. - """ - src = _to_numpy(image.data) - rect = cv2.undistort(src, camera_matrix, dist_coeffs) - # Caller decides whether to convert back to GPU. - return Image(data=rect, format=image.format, frame_id=image.frame_id, ts=image.ts) - - -def rectify_image_cuda(image: Image, camera_matrix: np.ndarray, dist_coeffs: np.ndarray) -> Image: - """GPU rectification using CuPy bilinear sampling. - - Generates an undistorted output grid and samples from the distorted source. - Falls back to CPU if CUDA not available. - """ - if not _HAS_CUDA or cp is None or not image.is_cuda: # type: ignore - return rectify_image_cpu(image, camera_matrix, dist_coeffs) - - xp = cp # type: ignore - - # Source (distorted) image on device - src = image.data - if src.ndim not in (2, 3): - raise ValueError("Unsupported image rank for rectification") - H, W = int(src.shape[0]), int(src.shape[1]) - - # Extract intrinsics and distortion as float64 - K = xp.asarray(camera_matrix, dtype=xp.float64) - dist = xp.asarray(dist_coeffs, dtype=xp.float64).reshape(-1) - fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2] - k1 = dist[0] if dist.size > 0 else 0.0 - k2 = dist[1] if dist.size > 1 else 0.0 - p1 = dist[2] if dist.size > 2 else 0.0 - p2 = dist[3] if dist.size > 3 else 0.0 - k3 = dist[4] if dist.size > 4 else 0.0 - - # Build undistorted target grid (pixel coords) - u = xp.arange(W, dtype=xp.float64) - v = xp.arange(H, dtype=xp.float64) - uu, vv = xp.meshgrid(u, v, indexing="xy") - - # Convert to normalized undistorted coords - xu = (uu - cx) / fx - yu = (vv - cy) / fy - - # Apply forward distortion model to get distorted normalized coords - r2 = xu * xu + yu * yu - r4 = r2 * r2 - r6 = r4 * r2 - radial = 1.0 + k1 * r2 + k2 * r4 + k3 * r6 - delta_x = 2.0 * p1 * xu * yu + p2 * (r2 + 2.0 * xu * xu) - delta_y = p1 * (r2 + 2.0 * yu * yu) + 2.0 * p2 * xu * yu - xd = xu * radial + delta_x - yd = yu * radial + delta_y - - # Back to pixel coordinates in the source (distorted) image - us = fx * xd + cx - vs = fy * yd + cy - - # Bilinear sample from src at (vs, us) - def _bilinear_sample_cuda(img, x_src, y_src): - h, w = int(img.shape[0]), int(img.shape[1]) - # Base integer corners (not clamped) - x0i = xp.floor(x_src).astype(xp.int32) - y0i = xp.floor(y_src).astype(xp.int32) - x1i = x0i + 1 - y1i = y0i + 1 - - # Masks for in-bounds neighbors (BORDER_CONSTANT behavior) - m00 = (x0i >= 0) & (x0i < w) & (y0i >= 0) & (y0i < h) - m10 = (x1i >= 0) & (x1i < w) & (y0i >= 0) & (y0i < h) - m01 = (x0i >= 0) & (x0i < w) & (y1i >= 0) & (y1i < h) - m11 = (x1i >= 0) & (x1i < w) & (y1i >= 0) & (y1i < h) - - # Clamp indices for safe gather, but multiply contributions by masks - x0 = xp.clip(x0i, 0, w - 1) - y0 = xp.clip(y0i, 0, h - 1) - x1 = xp.clip(x1i, 0, w - 1) - y1 = xp.clip(y1i, 0, h - 1) - - # Weights - wx = (x_src - x0i).astype(xp.float64) - wy = (y_src - y0i).astype(xp.float64) - w00 = (1.0 - wx) * (1.0 - wy) - w10 = wx * (1.0 - wy) - w01 = (1.0 - wx) * wy - w11 = wx * wy - - # Cast masks for arithmetic - m00f = m00.astype(xp.float64) - m10f = m10.astype(xp.float64) - m01f = m01.astype(xp.float64) - m11f = m11.astype(xp.float64) - - if img.ndim == 2: - Ia = img[y0, x0].astype(xp.float64) - Ib = img[y0, x1].astype(xp.float64) - Ic = img[y1, x0].astype(xp.float64) - Id = img[y1, x1].astype(xp.float64) - out = w00 * m00f * Ia + w10 * m10f * Ib + w01 * m01f * Ic + w11 * m11f * Id - else: - Ia = img[y0, x0].astype(xp.float64) - Ib = img[y0, x1].astype(xp.float64) - Ic = img[y1, x0].astype(xp.float64) - Id = img[y1, x1].astype(xp.float64) - # Expand weights and masks for channel broadcasting - w00e = (w00 * m00f)[..., None] - w10e = (w10 * m10f)[..., None] - w01e = (w01 * m01f)[..., None] - w11e = (w11 * m11f)[..., None] - out = w00e * Ia + w10e * Ib + w01e * Ic + w11e * Id - - # Cast back to original dtype with clipping for integers - if img.dtype == xp.uint8: - out = xp.clip(xp.rint(out), 0, 255).astype(xp.uint8) - elif img.dtype == xp.uint16: - out = xp.clip(xp.rint(out), 0, 65535).astype(xp.uint16) - elif img.dtype == xp.int16: - out = xp.clip(xp.rint(out), -32768, 32767).astype(xp.int16) - else: - out = out.astype(img.dtype, copy=False) - return out - - rect = _bilinear_sample_cuda(src, us, vs) - return Image(data=rect, format=image.format, frame_id=image.frame_id, ts=image.ts) - - def rectify_image(image: Image, camera_matrix: np.ndarray, dist_coeffs: np.ndarray) -> Image: """ Rectify (undistort) an image using camera calibration parameters. @@ -282,52 +127,16 @@ def rectify_image(image: Image, camera_matrix: np.ndarray, dist_coeffs: np.ndarr Returns: Image: Rectified Image object with same format and metadata """ - if image.is_cuda and _HAS_CUDA: - return rectify_image_cuda(image, camera_matrix, dist_coeffs) - return rectify_image_cpu(image, camera_matrix, dist_coeffs) - - -def project_3d_points_to_2d_cuda( - points_3d: "cp.ndarray", camera_intrinsics: Union[List[float], "cp.ndarray"] -) -> "cp.ndarray": - xp = cp # type: ignore - pts = points_3d.astype(xp.float64, copy=False) - mask = pts[:, 2] > 0 - if not bool(xp.any(mask)): - return xp.zeros((0, 2), dtype=xp.int32) - valid = pts[mask] - if isinstance(camera_intrinsics, list) and len(camera_intrinsics) == 4: - fx, fy, cx, cy = [xp.asarray(v, dtype=xp.float64) for v in camera_intrinsics] - else: - K = camera_intrinsics.astype(xp.float64, copy=False) - fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2] - u = (valid[:, 0] * fx / valid[:, 2]) + cx - v = (valid[:, 1] * fy / valid[:, 2]) + cy - return xp.stack([u, v], axis=1).astype(xp.int32) + # Apply undistortion using OpenCV + rectified_data = cv2.undistort(image.data, camera_matrix, dist_coeffs) - -def project_3d_points_to_2d_cpu( - points_3d: np.ndarray, camera_intrinsics: Union[List[float], np.ndarray] -) -> np.ndarray: - pts = np.asarray(points_3d, dtype=np.float64) - valid_mask = pts[:, 2] > 0 - if not np.any(valid_mask): - return np.zeros((0, 2), dtype=np.int32) - valid_points = pts[valid_mask] - if isinstance(camera_intrinsics, list) and len(camera_intrinsics) == 4: - fx, fy, cx, cy = [float(v) for v in camera_intrinsics] - else: - K = np.array(camera_intrinsics, dtype=np.float64) - fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2] - u = (valid_points[:, 0] * fx / valid_points[:, 2]) + cx - v = (valid_points[:, 1] * fy / valid_points[:, 2]) + cy - return np.column_stack([u, v]).astype(np.int32) + # Create new Image object with rectified data, preserving all other properties + return Image(data=rectified_data, format=image.format, frame_id=image.frame_id, ts=image.ts) def project_3d_points_to_2d( - points_3d: Union[np.ndarray, "cp.ndarray"], - camera_intrinsics: Union[List[float], np.ndarray, "cp.ndarray"], -) -> Union[np.ndarray, "cp.ndarray"]: + points_3d: np.ndarray, camera_intrinsics: Union[List[float], np.ndarray] +) -> np.ndarray: """ Project 3D points to 2D image coordinates using camera intrinsics. @@ -339,75 +148,40 @@ def project_3d_points_to_2d( Nx2 array of 2D image coordinates (u, v) """ if len(points_3d) == 0: - return ( - cp.zeros((0, 2), dtype=cp.int32) - if _is_cu_array(points_3d) - else np.zeros((0, 2), dtype=np.int32) - ) + return np.zeros((0, 2), dtype=np.int32) # Filter out points with zero or negative depth - if _is_cu_array(points_3d) or _is_cu_array(camera_intrinsics): - xp = cp # type: ignore - pts = points_3d if _is_cu_array(points_3d) else xp.asarray(points_3d) - K = camera_intrinsics if _is_cu_array(camera_intrinsics) else camera_intrinsics - return project_3d_points_to_2d_cuda(pts, K) # type: ignore[arg-type] - return project_3d_points_to_2d_cpu(np.asarray(points_3d), np.asarray(camera_intrinsics)) - - -def project_2d_points_to_3d_cuda( - points_2d: "cp.ndarray", - depth_values: "cp.ndarray", - camera_intrinsics: Union[List[float], "cp.ndarray"], -) -> "cp.ndarray": - xp = cp # type: ignore - pts = points_2d.astype(xp.float64, copy=False) - depths = depth_values.astype(xp.float64, copy=False) - valid = depths > 0 - if not bool(xp.any(valid)): - return xp.zeros((0, 3), dtype=xp.float32) - uv = pts[valid] - Z = depths[valid] - if isinstance(camera_intrinsics, list) and len(camera_intrinsics) == 4: - fx, fy, cx, cy = [xp.asarray(v, dtype=xp.float64) for v in camera_intrinsics] - else: - K = camera_intrinsics.astype(xp.float64, copy=False) - fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2] - X = (uv[:, 0] - cx) * Z / fx - Y = (uv[:, 1] - cy) * Z / fy - return xp.stack([X, Y, Z], axis=1).astype(xp.float32) + valid_mask = points_3d[:, 2] > 0 + if not np.any(valid_mask): + return np.zeros((0, 2), dtype=np.int32) + valid_points = points_3d[valid_mask] -def project_2d_points_to_3d_cpu( - points_2d: np.ndarray, - depth_values: np.ndarray, - camera_intrinsics: Union[List[float], np.ndarray], -) -> np.ndarray: - pts = np.asarray(points_2d, dtype=np.float64) - depths = np.asarray(depth_values, dtype=np.float64) - valid_mask = depths > 0 - if not np.any(valid_mask): - return np.zeros((0, 3), dtype=np.float32) - valid_points_2d = pts[valid_mask] - valid_depths = depths[valid_mask] + # Extract camera parameters if isinstance(camera_intrinsics, list) and len(camera_intrinsics) == 4: - fx, fy, cx, cy = [float(v) for v in camera_intrinsics] + fx, fy, cx, cy = camera_intrinsics else: - camera_matrix = np.array(camera_intrinsics, dtype=np.float64) + camera_matrix = np.array(camera_intrinsics) fx = camera_matrix[0, 0] fy = camera_matrix[1, 1] cx = camera_matrix[0, 2] cy = camera_matrix[1, 2] - X = (valid_points_2d[:, 0] - cx) * valid_depths / fx - Y = (valid_points_2d[:, 1] - cy) * valid_depths / fy - Z = valid_depths - return np.column_stack([X, Y, Z]).astype(np.float32) + + # Project to image coordinates + u = (valid_points[:, 0] * fx / valid_points[:, 2]) + cx + v = (valid_points[:, 1] * fy / valid_points[:, 2]) + cy + + # Round to integer pixel coordinates + points_2d = np.column_stack([u, v]).astype(np.int32) + + return points_2d def project_2d_points_to_3d( - points_2d: Union[np.ndarray, "cp.ndarray"], - depth_values: Union[np.ndarray, "cp.ndarray"], - camera_intrinsics: Union[List[float], np.ndarray, "cp.ndarray"], -) -> Union[np.ndarray, "cp.ndarray"]: + points_2d: np.ndarray, + depth_values: np.ndarray, + camera_intrinsics: Union[List[float], np.ndarray], +) -> np.ndarray: """ Project 2D image points to 3D coordinates using depth values and camera intrinsics. @@ -420,27 +194,46 @@ def project_2d_points_to_3d( Nx3 array of 3D points (X, Y, Z) """ if len(points_2d) == 0: - return ( - cp.zeros((0, 3), dtype=cp.float32) - if _is_cu_array(points_2d) - else np.zeros((0, 3), dtype=np.float32) - ) + return np.zeros((0, 3), dtype=np.float32) # Ensure depth_values is a numpy array - if _is_cu_array(points_2d) or _is_cu_array(depth_values) or _is_cu_array(camera_intrinsics): - xp = cp # type: ignore - pts = points_2d if _is_cu_array(points_2d) else xp.asarray(points_2d) - depths = depth_values if _is_cu_array(depth_values) else xp.asarray(depth_values) - K = camera_intrinsics if _is_cu_array(camera_intrinsics) else camera_intrinsics - return project_2d_points_to_3d_cuda(pts, depths, K) # type: ignore[arg-type] - return project_2d_points_to_3d_cpu( - np.asarray(points_2d), np.asarray(depth_values), np.asarray(camera_intrinsics) - ) + depth_values = np.asarray(depth_values) + + # Filter out points with zero or negative depth + valid_mask = depth_values > 0 + if not np.any(valid_mask): + return np.zeros((0, 3), dtype=np.float32) + + valid_points_2d = points_2d[valid_mask] + valid_depths = depth_values[valid_mask] + + # Extract camera parameters + if isinstance(camera_intrinsics, list) and len(camera_intrinsics) == 4: + fx, fy, cx, cy = camera_intrinsics + else: + camera_matrix = np.array(camera_intrinsics) + fx = camera_matrix[0, 0] + fy = camera_matrix[1, 1] + cx = camera_matrix[0, 2] + cy = camera_matrix[1, 2] + + # Back-project to 3D coordinates + # X = (u - cx) * Z / fx + # Y = (v - cy) * Z / fy + # Z = depth + X = (valid_points_2d[:, 0] - cx) * valid_depths / fx + Y = (valid_points_2d[:, 1] - cy) * valid_depths / fy + Z = valid_depths + + # Stack into 3D points + points_3d = np.column_stack([X, Y, Z]).astype(np.float32) + + return points_3d def colorize_depth( - depth_img: Union[np.ndarray, "cp.ndarray"], max_depth: float = 5.0, overlay_stats: bool = True -) -> Optional[Union[np.ndarray, "cp.ndarray"]]: + depth_img: np.ndarray, max_depth: float = 5.0, overlay_stats: bool = True +) -> Optional[np.ndarray]: """ Normalize and colorize depth image using COLORMAP_JET with optional statistics overlay. @@ -455,57 +248,57 @@ def colorize_depth( if depth_img is None: return None - was_cu = _is_cu_array(depth_img) - xp = cp if was_cu else np # type: ignore - depth = depth_img if was_cu else np.asarray(depth_img) - - valid_mask = xp.isfinite(depth) & (depth > 0) - depth_norm = xp.zeros_like(depth, dtype=xp.float32) - if bool(valid_mask.any() if not was_cu else xp.any(valid_mask)): - depth_norm = xp.where(valid_mask, xp.clip(depth / max_depth, 0, 1), depth_norm) - - # Use CPU for colormap/text; convert back to GPU if needed - depth_norm_np = _to_numpy(depth_norm) - depth_colored = cv2.applyColorMap((depth_norm_np * 255).astype(np.uint8), cv2.COLORMAP_JET) - depth_rgb_np = cv2.cvtColor(depth_colored, cv2.COLOR_BGR2RGB) - depth_rgb_np = (depth_rgb_np * 0.6).astype(np.uint8) - - if overlay_stats and (np.any(_to_numpy(valid_mask))): - valid_depths = _to_numpy(depth)[_to_numpy(valid_mask)] - min_depth = float(np.min(valid_depths)) - max_depth_actual = float(np.max(valid_depths)) - h, w = depth_rgb_np.shape[:2] + valid_mask = np.isfinite(depth_img) & (depth_img > 0) + depth_norm = np.zeros_like(depth_img) + depth_norm[valid_mask] = np.clip(depth_img[valid_mask] / max_depth, 0, 1) + depth_colored = cv2.applyColorMap((depth_norm * 255).astype(np.uint8), cv2.COLORMAP_JET) + depth_rgb = cv2.cvtColor(depth_colored, cv2.COLOR_BGR2RGB) + + # Make the depth image less bright by scaling down the values + depth_rgb = (depth_rgb * 0.6).astype(np.uint8) + + if overlay_stats and valid_mask.any(): + # Calculate statistics + valid_depths = depth_img[valid_mask] + min_depth = np.min(valid_depths) + max_depth_actual = np.max(valid_depths) + + # Get center depth + h, w = depth_img.shape center_y, center_x = h // 2, w // 2 - center_region = _to_numpy(depth)[ + # Sample a small region around center for robustness + center_region = depth_img[ max(0, center_y - 2) : min(h, center_y + 3), max(0, center_x - 2) : min(w, center_x + 3) ] center_mask = np.isfinite(center_region) & (center_region > 0) if center_mask.any(): - center_depth = float(np.median(center_region[center_mask])) + center_depth = np.median(center_region[center_mask]) else: - depth_np = _to_numpy(depth) - vm_np = _to_numpy(valid_mask) - center_depth = float(depth_np[center_y, center_x]) if vm_np[center_y, center_x] else 0.0 + center_depth = depth_img[center_y, center_x] if valid_mask[center_y, center_x] else 0.0 + # Prepare text overlays font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.6 thickness = 1 line_type = cv2.LINE_AA - text_color = (255, 255, 255) - bg_color = (0, 0, 0) + + # Text properties + text_color = (255, 255, 255) # White + bg_color = (0, 0, 0) # Black background padding = 5 + # Min depth (top-left) min_text = f"Min: {min_depth:.2f}m" (text_w, text_h), _ = cv2.getTextSize(min_text, font, font_scale, thickness) cv2.rectangle( - depth_rgb_np, + depth_rgb, (padding, padding), (padding + text_w + 4, padding + text_h + 6), bg_color, -1, ) cv2.putText( - depth_rgb_np, + depth_rgb, min_text, (padding + 2, padding + text_h + 2), font, @@ -515,17 +308,18 @@ def colorize_depth( line_type, ) + # Max depth (top-right) max_text = f"Max: {max_depth_actual:.2f}m" (text_w, text_h), _ = cv2.getTextSize(max_text, font, font_scale, thickness) cv2.rectangle( - depth_rgb_np, + depth_rgb, (w - padding - text_w - 4, padding), (w - padding, padding + text_h + 6), bg_color, -1, ) cv2.putText( - depth_rgb_np, + depth_rgb, max_text, (w - padding - text_w - 2, padding + text_h + 2), font, @@ -535,36 +329,41 @@ def colorize_depth( line_type, ) + # Center depth (center) if center_depth > 0: center_text = f"{center_depth:.2f}m" (text_w, text_h), _ = cv2.getTextSize(center_text, font, font_scale, thickness) center_text_x = center_x - text_w // 2 center_text_y = center_y + text_h // 2 + + # Draw crosshair cross_size = 10 cross_color = (255, 255, 255) cv2.line( - depth_rgb_np, + depth_rgb, (center_x - cross_size, center_y), (center_x + cross_size, center_y), cross_color, 1, ) cv2.line( - depth_rgb_np, + depth_rgb, (center_x, center_y - cross_size), (center_x, center_y + cross_size), cross_color, 1, ) + + # Draw center depth text with background cv2.rectangle( - depth_rgb_np, + depth_rgb, (center_text_x - 2, center_text_y - text_h - 2), (center_text_x + text_w + 2, center_text_y + 2), bg_color, -1, ) cv2.putText( - depth_rgb_np, + depth_rgb, center_text, (center_text_x, center_text_y), font, @@ -574,11 +373,11 @@ def colorize_depth( line_type, ) - return _to_cupy(depth_rgb_np) if was_cu else depth_rgb_np + return depth_rgb def draw_bounding_box( - image: Union[np.ndarray, "cp.ndarray"], + image: np.ndarray, bbox: List[float], color: Tuple[int, int, int] = (0, 255, 0), thickness: int = 2, @@ -586,7 +385,7 @@ def draw_bounding_box( confidence: Optional[float] = None, object_id: Optional[int] = None, font_scale: float = 0.6, -) -> Union[np.ndarray, "cp.ndarray"]: +) -> np.ndarray: """ Draw a bounding box with optional label on an image. @@ -603,10 +402,10 @@ def draw_bounding_box( Returns: Image with bounding box drawn """ - was_cu = _is_cu_array(image) - img_np = _to_numpy(image) x1, y1, x2, y2 = map(int, bbox) - cv2.rectangle(img_np, (x1, y1), (x2, y2), color, thickness) + + # Draw bounding box + cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness) # Create label text text_parts = [] @@ -623,7 +422,7 @@ def draw_bounding_box( # Draw text background text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 1)[0] cv2.rectangle( - img_np, + image, (x1, y1 - text_size[1] - 5), (x1 + text_size[0], y1), (0, 0, 0), @@ -632,7 +431,7 @@ def draw_bounding_box( # Draw text cv2.putText( - img_np, + image, text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, @@ -641,17 +440,17 @@ def draw_bounding_box( 1, ) - return _to_cupy(img_np) if was_cu else img_np + return image def draw_segmentation_mask( - image: Union[np.ndarray, "cp.ndarray"], - mask: Union[np.ndarray, "cp.ndarray"], + image: np.ndarray, + mask: np.ndarray, color: Tuple[int, int, int] = (0, 200, 200), alpha: float = 0.5, draw_contours: bool = True, contour_thickness: int = 2, -) -> Union[np.ndarray, "cp.ndarray"]: +) -> np.ndarray: """ Draw segmentation mask overlay on an image. @@ -669,35 +468,39 @@ def draw_segmentation_mask( if mask is None: return image - was_cu = _is_cu_array(image) - img_np = _to_numpy(image) - mask_np = _to_numpy(mask) - try: - mask_np = mask_np.astype(np.uint8) - colored_mask = np.zeros_like(img_np) - colored_mask[mask_np > 0] = color - mask_area = mask_np > 0 - img_np[mask_area] = cv2.addWeighted( - img_np[mask_area], 1 - alpha, colored_mask[mask_area], alpha, 0 + # Ensure mask is uint8 + mask = mask.astype(np.uint8) + + # Create colored mask overlay + colored_mask = np.zeros_like(image) + colored_mask[mask > 0] = color + + # Apply the mask with transparency + mask_area = mask > 0 + image[mask_area] = cv2.addWeighted( + image[mask_area], 1 - alpha, colored_mask[mask_area], alpha, 0 ) + + # Draw mask contours if requested if draw_contours: - contours, _ = cv2.findContours(mask_np, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - cv2.drawContours(img_np, contours, -1, color, contour_thickness) + contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + cv2.drawContours(image, contours, -1, color, contour_thickness) + except Exception as e: logger.warning(f"Error drawing segmentation mask: {e}") - return _to_cupy(img_np) if was_cu else img_np + return image def draw_object_detection_visualization( - image: Union[np.ndarray, "cp.ndarray"], + image: np.ndarray, objects: List[ObjectData], draw_masks: bool = False, bbox_color: Tuple[int, int, int] = (0, 255, 0), mask_color: Tuple[int, int, int] = (0, 200, 200), font_scale: float = 0.6, -) -> Union[np.ndarray, "cp.ndarray"]: +) -> np.ndarray: """ Create object detection visualization with bounding boxes and optional masks. @@ -712,8 +515,7 @@ def draw_object_detection_visualization( Returns: Image with detection visualization """ - was_cu = _is_cu_array(image) - viz_image = _to_numpy(image).copy() + viz_image = image.copy() for obj in objects: try: @@ -747,7 +549,7 @@ def draw_object_detection_visualization( except Exception as e: logger.warning(f"Error drawing object visualization: {e}") - return _to_cupy(viz_image) if was_cu else viz_image + return viz_image def detection_results_to_object_data( @@ -833,12 +635,11 @@ def combine_object_data( # Check mask overlap mask2 = obj2.get("segmentation_mask") - m2 = _to_numpy(mask2) if mask2 is not None else None - if m2 is None or np.sum(m2 > 0) == 0: + if mask2 is None or np.sum(mask2 > 0) == 0: combined.append(obj_copy) continue - mask2_area = np.sum(m2 > 0) + mask2_area = np.sum(mask2 > 0) is_duplicate = False for obj1 in list1: @@ -846,8 +647,7 @@ def combine_object_data( if mask1 is None: continue - m1 = _to_numpy(mask1) - intersection = np.sum((m1 > 0) & (m2 > 0)) + intersection = np.sum((mask1 > 0) & (mask2 > 0)) if intersection / mask2_area >= overlap_threshold: is_duplicate = True break diff --git a/pyproject.toml b/pyproject.toml index f6dc613fd2..670f98a0b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -148,8 +148,7 @@ cpu = [ ] cuda = [ - "cupy-cuda12x==13.6.0", - "nvidia-nvimgcodec-cu12[all]", + "pycuda", "onnxruntime-gpu>=1.17.1", # Only versions supporting both cuda11 and cuda12 "ctransformers[cuda]==0.2.27", "mmengine>=0.10.3", diff --git a/tests/test_image_backend_utils.py b/tests/test_image_backend_utils.py deleted file mode 100644 index 3adde5b631..0000000000 --- a/tests/test_image_backend_utils.py +++ /dev/null @@ -1,279 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import numpy as np -import pytest - -from dimos.msgs.sensor_msgs import Image, ImageFormat -from dimos.perception.common.utils import ( - rectify_image, - project_3d_points_to_2d, - project_2d_points_to_3d, - colorize_depth, - draw_bounding_box, - draw_segmentation_mask, - draw_object_detection_visualization, -) - - -def _has_cupy() -> bool: - try: - import cupy as cp # type: ignore - - try: - ndev = cp.cuda.runtime.getDeviceCount() # type: ignore[attr-defined] - if ndev <= 0: - return False - x = cp.array([1, 2, 3]) - _ = int(x.sum().get()) - return True - except Exception: - return False - except Exception: - return False - - -@pytest.mark.parametrize( - "shape,fmt", [((64, 64, 3), ImageFormat.BGR), ((64, 64), ImageFormat.GRAY)] -) -def test_rectify_image_cpu(shape, fmt): - arr = (np.random.rand(*shape) * (255 if fmt != ImageFormat.GRAY else 65535)).astype( - np.uint8 if fmt != ImageFormat.GRAY else np.uint16 - ) - img = Image(data=arr, format=fmt, frame_id="cam", ts=123.456) - K = np.array( - [[100.0, 0, arr.shape[1] / 2], [0, 100.0, arr.shape[0] / 2], [0, 0, 1]], dtype=np.float64 - ) - D = np.zeros(5, dtype=np.float64) - out = rectify_image(img, K, D) - assert out.shape[:2] == arr.shape[:2] - assert out.format == fmt - assert out.frame_id == "cam" - assert abs(out.ts - 123.456) < 1e-9 - # With zero distortion, pixels should match - np.testing.assert_array_equal(out.data, arr) - - -@pytest.mark.skipif(not _has_cupy(), reason="CuPy/CUDA not available") -@pytest.mark.parametrize( - "shape,fmt", [((32, 32, 3), ImageFormat.BGR), ((32, 32), ImageFormat.GRAY)] -) -def test_rectify_image_gpu_parity(shape, fmt): - import cupy as cp # type: ignore - - arr_np = (np.random.rand(*shape) * (255 if fmt != ImageFormat.GRAY else 65535)).astype( - np.uint8 if fmt != ImageFormat.GRAY else np.uint16 - ) - arr_cu = cp.asarray(arr_np) - img = Image(data=arr_cu, format=fmt, frame_id="cam", ts=1.23) - K = np.array( - [[80.0, 0, arr_np.shape[1] / 2], [0, 80.0, arr_np.shape[0] / 2], [0, 0, 1.0]], - dtype=np.float64, - ) - D = np.zeros(5, dtype=np.float64) - out = rectify_image(img, K, D) - # Zero distortion parity and backend preservation - assert out.format == fmt - assert out.frame_id == "cam" - assert abs(out.ts - 1.23) < 1e-9 - assert out.data.__class__.__module__.startswith("cupy") - np.testing.assert_array_equal(cp.asnumpy(out.data), arr_np) - - -@pytest.mark.skipif(not _has_cupy(), reason="CuPy/CUDA not available") -def test_rectify_image_gpu_nonzero_dist_close(): - import cupy as cp # type: ignore - - H, W = 64, 96 - # Structured pattern to make interpolation deterministic enough - x = np.linspace(0, 255, W, dtype=np.float32) - y = np.linspace(0, 255, H, dtype=np.float32) - xv, yv = np.meshgrid(x, y) - arr_np = np.stack( - [ - xv.astype(np.uint8), - yv.astype(np.uint8), - ((xv + yv) / 2).astype(np.uint8), - ], - axis=2, - ) - img_cpu = Image(data=arr_np, format=ImageFormat.BGR, frame_id="cam", ts=0.5) - img_gpu = Image(data=cp.asarray(arr_np), format=ImageFormat.BGR, frame_id="cam", ts=0.5) - - fx, fy = 120.0, 125.0 - cx, cy = W / 2.0, H / 2.0 - K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1.0]], dtype=np.float64) - D = np.array([0.05, -0.02, 0.001, -0.001, 0.0], dtype=np.float64) - - out_cpu = rectify_image(img_cpu, K, D) - out_gpu = rectify_image(img_gpu, K, D) - # Compare within a small tolerance - # Small numeric differences may remain due to model and casting; keep tight tolerance - np.testing.assert_allclose( - cp.asnumpy(out_gpu.data).astype(np.int16), out_cpu.data.astype(np.int16), atol=4 - ) - - -def test_project_roundtrip_cpu(): - pts3d = np.array([[0.1, 0.2, 1.0], [0.0, 0.0, 2.0], [0.5, -0.3, 3.0]], dtype=np.float32) - fx, fy, cx, cy = 200.0, 220.0, 64.0, 48.0 - K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1.0]], dtype=np.float64) - uv = project_3d_points_to_2d(pts3d, K) - assert uv.shape == (3, 2) - Z = pts3d[:, 2] - pts3d_back = project_2d_points_to_3d(uv.astype(np.float32), Z.astype(np.float32), K) - # Allow small rounding differences due to int rounding in 2D - assert pts3d_back.shape == (3, 3) - assert np.all(pts3d_back[:, 2] > 0) - - -@pytest.mark.skipif(not _has_cupy(), reason="CuPy/CUDA not available") -def test_project_parity_gpu_cpu(): - import cupy as cp # type: ignore - - pts3d_np = np.array([[0.1, 0.2, 1.0], [0.0, 0.0, 2.0], [0.5, -0.3, 3.0]], dtype=np.float32) - fx, fy, cx, cy = 200.0, 220.0, 64.0, 48.0 - K_np = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1.0]], dtype=np.float64) - uv_cpu = project_3d_points_to_2d(pts3d_np, K_np) - uv_gpu = project_3d_points_to_2d(cp.asarray(pts3d_np), cp.asarray(K_np)) - np.testing.assert_array_equal(cp.asnumpy(uv_gpu), uv_cpu) - - Z_np = pts3d_np[:, 2] - pts3d_cpu = project_2d_points_to_3d(uv_cpu.astype(np.float32), Z_np.astype(np.float32), K_np) - pts3d_gpu = project_2d_points_to_3d( - cp.asarray(uv_cpu.astype(np.float32)), cp.asarray(Z_np.astype(np.float32)), cp.asarray(K_np) - ) - assert pts3d_cpu.shape == cp.asnumpy(pts3d_gpu).shape - - -@pytest.mark.skipif(not _has_cupy(), reason="CuPy/CUDA not available") -def test_project_parity_gpu_cpu_random(): - import cupy as cp # type: ignore - - rng = np.random.RandomState(0) - N = 1000 - Z = rng.uniform(0.1, 5.0, size=(N, 1)).astype(np.float32) - XY = rng.uniform(-1.0, 1.0, size=(N, 2)).astype(np.float32) - pts3d_np = np.concatenate([XY, Z], axis=1) - - fx, fy = 300.0, 320.0 - cx, cy = 128.0, 96.0 - K_np = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1.0]], dtype=np.float64) - - uv_cpu = project_3d_points_to_2d(pts3d_np, K_np) - uv_gpu = project_3d_points_to_2d(cp.asarray(pts3d_np), cp.asarray(K_np)) - np.testing.assert_array_equal(cp.asnumpy(uv_gpu), uv_cpu) - - # Roundtrip - Z_flat = pts3d_np[:, 2] - pts3d_cpu = project_2d_points_to_3d(uv_cpu.astype(np.float32), Z_flat.astype(np.float32), K_np) - pts3d_gpu = project_2d_points_to_3d( - cp.asarray(uv_cpu.astype(np.float32)), - cp.asarray(Z_flat.astype(np.float32)), - cp.asarray(K_np), - ) - assert pts3d_cpu.shape == cp.asnumpy(pts3d_gpu).shape - - -def test_colorize_depth_cpu(): - depth = np.zeros((32, 48), dtype=np.float32) - depth[8:16, 12:24] = 1.5 - out = colorize_depth(depth, max_depth=3.0, overlay_stats=False) - assert isinstance(out, np.ndarray) - assert out.shape == (32, 48, 3) - assert out.dtype == np.uint8 - - -@pytest.mark.skipif(not _has_cupy(), reason="CuPy/CUDA not available") -def test_colorize_depth_gpu_parity(): - import cupy as cp # type: ignore - - depth_np = np.zeros((16, 20), dtype=np.float32) - depth_np[4:8, 5:15] = 2.0 - out_cpu = colorize_depth(depth_np, max_depth=4.0, overlay_stats=False) - out_gpu = colorize_depth(cp.asarray(depth_np), max_depth=4.0, overlay_stats=False) - np.testing.assert_array_equal(cp.asnumpy(out_gpu), out_cpu) - - -def test_draw_bounding_box_cpu(): - img = np.zeros((20, 30, 3), dtype=np.uint8) - out = draw_bounding_box(img, [2, 3, 10, 12], color=(255, 0, 0), thickness=1) - assert isinstance(out, np.ndarray) - assert out.shape == img.shape - assert out.dtype == img.dtype - - -@pytest.mark.skipif(not _has_cupy(), reason="CuPy/CUDA not available") -def test_draw_bounding_box_gpu_parity(): - import cupy as cp # type: ignore - - img_np = np.zeros((20, 30, 3), dtype=np.uint8) - out_cpu = draw_bounding_box(img_np.copy(), [2, 3, 10, 12], color=(0, 255, 0), thickness=2) - img_cu = cp.asarray(img_np) - out_gpu = draw_bounding_box(img_cu, [2, 3, 10, 12], color=(0, 255, 0), thickness=2) - np.testing.assert_array_equal(cp.asnumpy(out_gpu), out_cpu) - - -def test_draw_segmentation_mask_cpu(): - img = np.zeros((20, 30, 3), dtype=np.uint8) - mask = np.zeros((20, 30), dtype=np.uint8) - mask[5:10, 8:15] = 1 - out = draw_segmentation_mask(img, mask, color=(0, 200, 200), alpha=0.5) - assert out.shape == img.shape - - -@pytest.mark.skipif(not _has_cupy(), reason="CuPy/CUDA not available") -def test_draw_segmentation_mask_gpu_parity(): - import cupy as cp # type: ignore - - img_np = np.zeros((20, 30, 3), dtype=np.uint8) - mask_np = np.zeros((20, 30), dtype=np.uint8) - mask_np[2:12, 3:20] = 1 - out_cpu = draw_segmentation_mask(img_np.copy(), mask_np, color=(100, 50, 200), alpha=0.4) - out_gpu = draw_segmentation_mask( - cp.asarray(img_np), cp.asarray(mask_np), color=(100, 50, 200), alpha=0.4 - ) - np.testing.assert_array_equal(cp.asnumpy(out_gpu), out_cpu) - - -def test_draw_object_detection_visualization_cpu(): - img = np.zeros((30, 40, 3), dtype=np.uint8) - objects = [ - { - "object_id": 1, - "bbox": [5, 6, 20, 25], - "label": "box", - "confidence": 0.9, - } - ] - out = draw_object_detection_visualization(img, objects) - assert out.shape == img.shape - - -@pytest.mark.skipif(not _has_cupy(), reason="CuPy/CUDA not available") -def test_draw_object_detection_visualization_gpu_parity(): - import cupy as cp # type: ignore - - img_np = np.zeros((30, 40, 3), dtype=np.uint8) - objects = [ - { - "object_id": 1, - "bbox": [5, 6, 20, 25], - "label": "box", - "confidence": 0.9, - } - ] - out_cpu = draw_object_detection_visualization(img_np.copy(), objects) - out_gpu = draw_object_detection_visualization(cp.asarray(img_np), objects) - np.testing.assert_array_equal(cp.asnumpy(out_gpu), out_cpu) diff --git a/tests/test_image_backends.py b/tests/test_image_backends.py deleted file mode 100644 index 59217719dc..0000000000 --- a/tests/test_image_backends.py +++ /dev/null @@ -1,534 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time - -import cv2 -import numpy as np -import pytest - -from dimos.msgs.sensor_msgs.Image import Image, ImageFormat, HAS_CUDA -from dimos.utils.data import get_data - -IMAGE_PATH = get_data("chair-image.png") - - -def _load_chair_image() -> np.ndarray: - img = cv2.imread(IMAGE_PATH, cv2.IMREAD_UNCHANGED) - if img is None: - raise FileNotFoundError(f"unable to load test image at {IMAGE_PATH}") - return img - - -_CHAIR_BGRA = _load_chair_image() - - -def _prepare_image(fmt: ImageFormat, shape=None) -> np.ndarray: - base = _CHAIR_BGRA - if fmt == ImageFormat.BGR: - arr = cv2.cvtColor(base, cv2.COLOR_BGRA2BGR) - elif fmt == ImageFormat.RGB: - arr = cv2.cvtColor(base, cv2.COLOR_BGRA2RGB) - elif fmt == ImageFormat.BGRA: - arr = base.copy() - elif fmt == ImageFormat.GRAY: - arr = cv2.cvtColor(base, cv2.COLOR_BGRA2GRAY) - else: - raise ValueError(f"unsupported image format {fmt}") - - if shape is None: - return arr.copy() - - if len(shape) == 2: - height, width = shape - orig_h, orig_w = arr.shape[:2] - interp = cv2.INTER_AREA if height <= orig_h and width <= orig_w else cv2.INTER_LINEAR - resized = cv2.resize(arr, (width, height), interpolation=interp) - return resized.copy() - - if len(shape) == 3: - height, width, channels = shape - orig_h, orig_w = arr.shape[:2] - interp = cv2.INTER_AREA if height <= orig_h and width <= orig_w else cv2.INTER_LINEAR - resized = cv2.resize(arr, (width, height), interpolation=interp) - if resized.ndim == 2: - resized = np.repeat(resized[:, :, None], channels, axis=2) - elif resized.shape[2] != channels: - if channels == 4 and resized.shape[2] == 3: - alpha = np.full((height, width, 1), 255, dtype=resized.dtype) - resized = np.concatenate([resized, alpha], axis=2) - elif channels == 3 and resized.shape[2] == 4: - resized = resized[:, :, :3] - else: - raise ValueError(f"cannot adjust image to {channels} channels") - return resized.copy() - - raise ValueError("shape must be a tuple of length 2 or 3") - - -@pytest.fixture -def alloc_timer(request): - def _alloc( - arr: np.ndarray, fmt: ImageFormat, *, to_cuda: bool = True, label: str | None = None - ): - tag = label or request.node.name - start = time.perf_counter() - cpu = Image.from_numpy(arr, format=fmt) - cpu_time = time.perf_counter() - start - - gpu = None - gpu_time = None - if to_cuda: - arr_gpu = np.array(arr, copy=True) - start = time.perf_counter() - gpu = Image.from_numpy(arr_gpu, format=fmt, to_cuda=True) - gpu_time = time.perf_counter() - start - - if gpu_time is not None: - print(f"[alloc {tag}] cpu={cpu_time:.6f}s gpu={gpu_time:.6f}s") - else: - print(f"[alloc {tag}] cpu={cpu_time:.6f}s") - return cpu, gpu, cpu_time, gpu_time - - return _alloc - - -@pytest.mark.parametrize( - "shape,fmt", - [ - ((64, 64, 3), ImageFormat.BGR), - ((64, 64, 4), ImageFormat.BGRA), - ((64, 64, 3), ImageFormat.RGB), - ((64, 64), ImageFormat.GRAY), - ], -) -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_color_conversions_parity(shape, fmt, alloc_timer): - arr = _prepare_image(fmt, shape) - # Build CPU and CUDA images with same logical content (timed allocations) - cpu, gpu, _, _ = alloc_timer(arr, fmt) - - # Test to_rgb -> to_bgr parity - cpu_round = cpu.to_rgb().to_bgr().to_opencv() - gpu_round = gpu.to_rgb().to_bgr().to_opencv() - - assert cpu_round.shape == gpu_round.shape - assert cpu_round.dtype == gpu_round.dtype - # Exact match for uint8 color ops - assert np.array_equal(cpu_round, gpu_round) - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_grayscale_parity(alloc_timer): - arr = _prepare_image(ImageFormat.BGR, (48, 32, 3)) - cpu, gpu, _, _ = alloc_timer(arr, ImageFormat.BGR) - - cpu_gray = cpu.to_grayscale().to_opencv() - gpu_gray = gpu.to_grayscale().to_opencv() - - assert cpu_gray.shape == gpu_gray.shape - assert cpu_gray.dtype == gpu_gray.dtype - # Allow tiny rounding differences (<=1 LSB) — visually indistinguishable - diff = np.abs(cpu_gray.astype(np.int16) - gpu_gray.astype(np.int16)) - assert diff.max() <= 1 - - -@pytest.mark.parametrize("fmt", [ImageFormat.BGR, ImageFormat.RGB, ImageFormat.BGRA]) -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_resize_parity(fmt, alloc_timer): - shape = (60, 80, 3) if fmt in (ImageFormat.BGR, ImageFormat.RGB) else (60, 80, 4) - arr = _prepare_image(fmt, shape) - cpu, gpu, _, _ = alloc_timer(arr, fmt) - - new_w, new_h = 37, 53 - cpu_res = cpu.resize(new_w, new_h).to_opencv() - gpu_res = gpu.resize(new_w, new_h).to_opencv() - - assert cpu_res.shape == gpu_res.shape - assert cpu_res.dtype == gpu_res.dtype - # Allow small tolerance due to float interpolation differences - assert np.max(np.abs(cpu_res.astype(np.int16) - gpu_res.astype(np.int16))) <= 1 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_perf_compare_alloc(alloc_timer): - arr = _prepare_image(ImageFormat.BGR, (480, 640, 3)) - alloc_timer(arr, ImageFormat.BGR, label="test_perf_compare_alloc-setup") - - runs = 5 - t0 = time.perf_counter() - for _ in range(runs): - _ = Image.from_numpy(arr, format=ImageFormat.BGR) - cpu_t = (time.perf_counter() - t0) / runs - t0 = time.perf_counter() - for _ in range(runs): - _ = Image.from_numpy(arr, format=ImageFormat.BGR, to_cuda=True) - gpu_t = (time.perf_counter() - t0) / runs - print(f"alloc (avg per call) cpu={cpu_t:.6f}s gpu={gpu_t:.6f}s") - assert cpu_t > 0 and gpu_t > 0 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_sharpness_parity(alloc_timer): - arr = _prepare_image(ImageFormat.BGR, (64, 64, 3)) - cpu, gpu, _, _ = alloc_timer(arr, ImageFormat.BGR) - - s_cpu = cpu.sharpness() - s_gpu = gpu.sharpness() - - # Values should be very close; minor border/rounding differences allowed - assert abs(s_cpu - s_gpu) < 5e-2 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_to_opencv_parity(alloc_timer): - # BGRA should drop alpha and produce BGR - arr = _prepare_image(ImageFormat.BGRA, (32, 32, 4)) - cpu, gpu, _, _ = alloc_timer(arr, ImageFormat.BGRA) - - cpu_bgr = cpu.to_opencv() - gpu_bgr = gpu.to_opencv() - - assert cpu_bgr.shape == (32, 32, 3) - assert gpu_bgr.shape == (32, 32, 3) - assert np.array_equal(cpu_bgr, gpu_bgr) - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_solve_pnp_parity(alloc_timer): - # Synthetic camera and 3D points - K = np.array([[400.0, 0.0, 32.0], [0.0, 400.0, 24.0], [0.0, 0.0, 1.0]], dtype=np.float64) - dist = None - obj = np.array( - [ - [-0.5, -0.5, 0.0], - [0.5, -0.5, 0.0], - [0.5, 0.5, 0.0], - [-0.5, 0.5, 0.0], - [0.0, 0.0, 0.5], - [0.0, 0.0, 1.0], - ], - dtype=np.float32, - ) - - rvec_true = np.zeros((3, 1), dtype=np.float64) - tvec_true = np.array([[0.0], [0.0], [2.0]], dtype=np.float64) - img_pts, _ = cv2.projectPoints(obj, rvec_true, tvec_true, K, dist) - img_pts = img_pts.reshape(-1, 2).astype(np.float32) - - # Build images using deterministic fixture content - base_bgr = _prepare_image(ImageFormat.BGR, (48, 64, 3)) - cpu, gpu, _, _ = alloc_timer(base_bgr, ImageFormat.BGR) - - ok_cpu, r_cpu, t_cpu = cpu.solve_pnp(obj, img_pts, K, dist) - ok_gpu, r_gpu, t_gpu = gpu.solve_pnp(obj, img_pts, K, dist) - - assert ok_cpu and ok_gpu - # Validate reprojection error for CUDA solver - proj_cpu, _ = cv2.projectPoints(obj, r_cpu, t_cpu, K, dist) - proj_gpu, _ = cv2.projectPoints(obj, r_gpu, t_gpu, K, dist) - proj_cpu = proj_cpu.reshape(-1, 2) - proj_gpu = proj_gpu.reshape(-1, 2) - err_gpu = np.linalg.norm(proj_gpu - img_pts, axis=1) - assert err_gpu.mean() < 1e-3 - assert err_gpu.max() < 1e-2 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_perf_compare_grayscale(alloc_timer): - arr = _prepare_image(ImageFormat.BGR, (480, 640, 3)) - cpu, gpu, _, _ = alloc_timer(arr, ImageFormat.BGR, label="test_perf_compare_grayscale-setup") - - runs = 10 - t0 = time.perf_counter() - for _ in range(runs): - _ = cpu.to_grayscale() - cpu_t = (time.perf_counter() - t0) / runs - t0 = time.perf_counter() - for _ in range(runs): - _ = gpu.to_grayscale() - gpu_t = (time.perf_counter() - t0) / runs - print(f"grayscale (avg per call) cpu={cpu_t:.6f}s gpu={gpu_t:.6f}s") - assert cpu_t > 0 and gpu_t > 0 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_perf_compare_resize(alloc_timer): - arr = _prepare_image(ImageFormat.BGR, (480, 640, 3)) - cpu, gpu, _, _ = alloc_timer(arr, ImageFormat.BGR, label="test_perf_compare_resize-setup") - - runs = 5 - t0 = time.perf_counter() - for _ in range(runs): - _ = cpu.resize(320, 240) - cpu_t = (time.perf_counter() - t0) / runs - t0 = time.perf_counter() - for _ in range(runs): - _ = gpu.resize(320, 240) - gpu_t = (time.perf_counter() - t0) / runs - print(f"resize (avg per call) cpu={cpu_t:.6f}s gpu={gpu_t:.6f}s") - assert cpu_t > 0 and gpu_t > 0 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_perf_compare_sharpness(alloc_timer): - arr = _prepare_image(ImageFormat.BGR, (480, 640, 3)) - cpu, gpu, _, _ = alloc_timer(arr, ImageFormat.BGR, label="test_perf_compare_sharpness-setup") - - runs = 3 - t0 = time.perf_counter() - for _ in range(runs): - _ = cpu.sharpness() - cpu_t = (time.perf_counter() - t0) / runs - t0 = time.perf_counter() - for _ in range(runs): - _ = gpu.sharpness() - gpu_t = (time.perf_counter() - t0) / runs - print(f"sharpness (avg per call) cpu={cpu_t:.6f}s gpu={gpu_t:.6f}s") - assert cpu_t > 0 and gpu_t > 0 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_perf_compare_solvepnp(alloc_timer): - K = np.array([[600.0, 0.0, 320.0], [0.0, 600.0, 240.0], [0.0, 0.0, 1.0]], dtype=np.float64) - dist = None - rng = np.random.default_rng(123) - obj = rng.standard_normal((200, 3)).astype(np.float32) - rvec_true = np.array([[0.1], [-0.2], [0.05]]) - tvec_true = np.array([[0.0], [0.0], [3.0]]) - img_pts, _ = cv2.projectPoints(obj, rvec_true, tvec_true, K, dist) - img_pts = img_pts.reshape(-1, 2).astype(np.float32) - base_bgr = _prepare_image(ImageFormat.BGR, (480, 640, 3)) - cpu, gpu, _, _ = alloc_timer( - base_bgr, ImageFormat.BGR, label="test_perf_compare_solvepnp-setup" - ) - - runs = 5 - t0 = time.perf_counter() - for _ in range(runs): - _ = cpu.solve_pnp(obj, img_pts, K, dist) - cpu_t = (time.perf_counter() - t0) / runs - t0 = time.perf_counter() - for _ in range(runs): - _ = gpu.solve_pnp(obj, img_pts, K, dist) - gpu_t = (time.perf_counter() - t0) / runs - print(f"solvePnP (avg per call) cpu={cpu_t:.6f}s gpu={gpu_t:.6f}s") - assert cpu_t > 0 and gpu_t > 0 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_perf_compare_tracker(alloc_timer): - H, W = 240, 320 - img_base = _prepare_image(ImageFormat.BGR, (H, W, 3)) - img1 = img_base.copy() - img2 = img_base.copy() - bbox0 = (80, 60, 40, 30) - x0, y0, w0, h0 = bbox0 - cv2.rectangle(img1, (x0, y0), (x0 + w0, y0 + h0), (255, 255, 255), thickness=-1) - dx, dy = 8, 5 - cv2.rectangle( - img2, - (x0 + dx, y0 + dy), - (x0 + dx + w0, y0 + dy + h0), - (255, 255, 255), - thickness=-1, - ) - cpu1, gpu1, _, _ = alloc_timer(img1, ImageFormat.BGR, label="test_perf_compare_tracker-frame1") - cpu2, gpu2, _, _ = alloc_timer(img2, ImageFormat.BGR, label="test_perf_compare_tracker-frame2") - trk_cpu = cpu1.create_csrt_tracker(bbox0) - trk_gpu = gpu1.create_csrt_tracker(bbox0) - - runs = 10 - t0 = time.perf_counter() - for _ in range(runs): - _ = cpu2.csrt_update(trk_cpu) - cpu_t = (time.perf_counter() - t0) / runs - t0 = time.perf_counter() - for _ in range(runs): - _ = gpu2.csrt_update(trk_gpu) - gpu_t = (time.perf_counter() - t0) / runs - print(f"tracker (avg per call) cpu={cpu_t:.6f}s gpu={gpu_t:.6f}s") - assert cpu_t > 0 and gpu_t > 0 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_csrt_tracker_parity(alloc_timer): - # Check tracker availability - has_csrt = False - if hasattr(cv2, "legacy") and hasattr(cv2.legacy, "TrackerCSRT_create"): - has_csrt = True - elif hasattr(cv2, "TrackerCSRT_create"): - has_csrt = True - if not has_csrt: - pytest.skip("OpenCV CSRT tracker not available") - - H, W = 100, 100 - # Create two frames with a moving rectangle - img_base = _prepare_image(ImageFormat.BGR, (H, W, 3)) - img1 = img_base.copy() - img2 = img_base.copy() - bbox0 = (30, 30, 20, 15) - x0, y0, w0, h0 = bbox0 - # draw rect in img1 - cv2.rectangle(img1, (x0, y0), (x0 + w0, y0 + h0), (255, 255, 255), thickness=-1) - # shift by (dx,dy) - dx, dy = 5, 3 - cv2.rectangle( - img2, - (x0 + dx, y0 + dy), - (x0 + dx + w0, y0 + dy + h0), - (255, 255, 255), - thickness=-1, - ) - - cpu1, gpu1, _, _ = alloc_timer(img1, ImageFormat.BGR, label="test_csrt_tracker_parity-frame1") - cpu2, gpu2, _, _ = alloc_timer(img2, ImageFormat.BGR, label="test_csrt_tracker_parity-frame2") - - trk_cpu = cpu1.create_csrt_tracker(bbox0) - ok_cpu, bbox_cpu = cpu2.csrt_update(trk_cpu) - trk_gpu = gpu1.create_csrt_tracker(bbox0) - ok_gpu, bbox_gpu = gpu2.csrt_update(trk_gpu) - - assert ok_cpu and ok_gpu - # Compare both to ground-truth expected bbox - expected = (x0 + dx, y0 + dy, w0, h0) - err_cpu = sum(abs(a - b) for a, b in zip(bbox_cpu, expected)) - err_gpu = sum(abs(a - b) for a, b in zip(bbox_gpu, expected)) - assert err_cpu <= 8 - assert err_gpu <= 10 # allow some slack for scale/window effects - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_solve_pnp_ransac_with_outliers_and_distortion(alloc_timer): - # Camera with distortion - K = np.array([[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]], dtype=np.float64) - dist = np.array([0.1, -0.05, 0.001, 0.001, 0.0], dtype=np.float64) - rng = np.random.default_rng(202) - obj = rng.uniform(-1.0, 1.0, size=(200, 3)).astype(np.float32) - obj[:, 2] = np.abs(obj[:, 2]) + 2.0 # keep in front of camera - rvec_true = np.array([[0.1], [-0.15], [0.05]], dtype=np.float64) - tvec_true = np.array([[0.2], [-0.1], [3.0]], dtype=np.float64) - img_pts, _ = cv2.projectPoints(obj, rvec_true, tvec_true, K, dist) - img_pts = img_pts.reshape(-1, 2) - # Add outliers - n_out = 20 - idx = rng.choice(len(img_pts), size=n_out, replace=False) - img_pts[idx] += rng.uniform(-50, 50, size=(n_out, 2)) - img_pts = img_pts.astype(np.float32) - - base_bgr = _prepare_image(ImageFormat.BGR, (480, 640, 3)) - _, gpu, _, _ = alloc_timer(base_bgr, ImageFormat.BGR, label="test_solve_pnp_ransac-setup") - - ok_gpu, r_gpu, t_gpu, mask_gpu = gpu.solve_pnp_ransac( - obj, img_pts, K, dist, iterations_count=150, reprojection_error=3.0 - ) - assert ok_gpu - inlier_ratio = mask_gpu.mean() - assert inlier_ratio > 0.7 - # Reprojection error on inliers - in_idx = np.nonzero(mask_gpu)[0] - proj_gpu, _ = cv2.projectPoints(obj[in_idx], r_gpu, t_gpu, K, dist) - proj_gpu = proj_gpu.reshape(-1, 2) - err = np.linalg.norm(proj_gpu - img_pts[in_idx], axis=1) - assert err.mean() < 1.5 - assert err.max() < 4.0 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_solve_pnp_batch_correctness_and_perf(alloc_timer): - # Generate batched problems - B, N = 8, 50 - rng = np.random.default_rng(99) - obj = rng.uniform(-1.0, 1.0, size=(B, N, 3)).astype(np.float32) - obj[:, :, 2] = np.abs(obj[:, :, 2]) + 2.0 - K = np.array([[600.0, 0.0, 320.0], [0.0, 600.0, 240.0], [0.0, 0.0, 1.0]], dtype=np.float64) - r_true = np.zeros((B, 3, 1), dtype=np.float64) - t_true = np.tile(np.array([[0.0], [0.0], [3.0]], dtype=np.float64), (B, 1, 1)) - img = [] - for b in range(B): - ip, _ = cv2.projectPoints(obj[b], r_true[b], t_true[b], K, None) - img.append(ip.reshape(-1, 2)) - img = np.stack(img, axis=0).astype(np.float32) - - base_bgr = _prepare_image(ImageFormat.BGR, (10, 10, 3)) - cpu, gpu, _, _ = alloc_timer(base_bgr, ImageFormat.BGR, label="test_solve_pnp_batch-setup") - - # CPU loop - - t0 = time.perf_counter() - r_list = [] - t_list = [] - for b in range(B): - ok, r, t = cpu.solve_pnp(obj[b], img[b], K, None) - assert ok - r_list.append(r) - t_list.append(t) - cpu_total = time.perf_counter() - t0 - cpu_t = cpu_total / B - - # CUDA batched - t0 = time.perf_counter() - r_b, t_b = gpu.solve_pnp_batch(obj, img, K) - gpu_total = time.perf_counter() - t0 - gpu_t = gpu_total / B - print(f"solvePnP-batch (avg per pose) cpu={cpu_t:.6f}s gpu={gpu_t:.6f}s (B={B}, N={N})") - - # Check reprojection for a couple of batches - for b in range(min(B, 4)): - proj, _ = cv2.projectPoints(obj[b], r_b[b], t_b[b], K, None) - err = np.linalg.norm(proj.reshape(-1, 2) - img[b], axis=1) - assert err.mean() < 1e-2 - assert err.max() < 1e-1 - - -def test_nvimgcodec_flag_and_fallback(monkeypatch): - # Force nvimgcodec flag on, then reload Image and ensure fallback works - monkeypatch.setenv("USE_NVIMGCODEC", "1") - import importlib as _importlib - - ImageMod = _importlib.import_module("dimos.msgs.sensor_msgs.Image") - _importlib.reload(ImageMod) - # Even if nvimgcodec missing, to_base64 should work (fallback) - arr = _prepare_image(ImageFormat.BGR, (32, 32, 3)) - img = ImageMod.Image.from_numpy( - arr, format=ImageMod.ImageFormat.BGR, to_cuda=bool(ImageMod.HAS_CUDA) - ) - b64 = img.to_base64() - assert isinstance(b64, str) and len(b64) > 0 - # Turn flag off and reload - monkeypatch.setenv("USE_NVIMGCODEC", "0") - _importlib.reload(ImageMod) - img2 = ImageMod.Image.from_numpy(arr, format=ImageMod.ImageFormat.BGR) - b64_2 = img2.to_base64() - assert isinstance(b64_2, str) and len(b64_2) > 0 - - -@pytest.mark.skipif(not HAS_CUDA, reason="CuPy/CUDA not available") -def test_nvimgcodec_gpu_path(monkeypatch): - # Enable flag and reload; skip if nvimgcodec not present - monkeypatch.setenv("USE_NVIMGCODEC", "1") - import importlib as _importlib - - ImageMod = _importlib.import_module("dimos.msgs.sensor_msgs.Image") - _importlib.reload(ImageMod) - if not ImageMod.HAS_NVIMGCODEC: - pytest.skip("nvimgcodec library not available") - # Create a CUDA image and encode - arr = _prepare_image(ImageFormat.BGR, (32, 32, 3)) - img = ImageMod.Image.from_numpy(arr, format=ImageMod.ImageFormat.BGR, to_cuda=True) - b64 = img.to_base64() - assert isinstance(b64, str) and len(b64) > 0 - # Some builds may import nvimgcodec but not support CuPy device buffers; allow skip - if not getattr(ImageMod, "NVIMGCODEC_LAST_USED", False): - pytest.skip("nvimgcodec present but encode fell back to CPU in this environment")