From 9d5d7ac1f04df39982f351f172cfaafb57844a0a Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Tue, 9 Sep 2025 10:42:50 +0300 Subject: [PATCH 1/2] gstreamer camera --- dimos/hardware/README.md | 30 +- dimos/hardware/gstreamer_camera.py | 311 +++++++++++++++ .../hardware/gstreamer_camera_test_script.py | 132 +++++++ dimos/hardware/gstreamer_sender.py | 359 ++++++++++++++++++ 4 files changed, 831 insertions(+), 1 deletion(-) create mode 100644 dimos/hardware/gstreamer_camera.py create mode 100755 dimos/hardware/gstreamer_camera_test_script.py create mode 100755 dimos/hardware/gstreamer_sender.py diff --git a/dimos/hardware/README.md b/dimos/hardware/README.md index 0141ac89e9..fb598e82cf 100644 --- a/dimos/hardware/README.md +++ b/dimos/hardware/README.md @@ -1 +1,29 @@ -# UNDER DEVELOPMENT 🚧🚧🚧 \ No newline at end of file +# Hardware + +## Remote camera stream with timestamps + +### Required Ubuntu packages: + +```bash +sudo apt install gstreamer1.0-tools gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-libav python3-gi python3-gi-cairo gir1.2-gstreamer-1.0 gir1.2-gst-plugins-base-1.0 v4l-utils gstreamer1.0-vaapi +``` + +### Usage + +On sender machine (with the camera): + +```bash +python3 dimos/hardware/gstreamer_sender.py --device /dev/video0 --host 0.0.0.0 --port 5000 +``` + +If it's a stereo camera and you only want to send the left side (the left camera): + +```bash +python3 dimos/hardware/gstreamer_sender.py --device /dev/video0 --host 0.0.0.0 --port 5000 --single-camera +``` + +On receiver machine: + +```bash +python3 dimos/hardware/gstreamer_camera_test_script.py --host 10.0.0.227 --port 5000 +``` \ No newline at end of file diff --git a/dimos/hardware/gstreamer_camera.py b/dimos/hardware/gstreamer_camera.py new file mode 100644 index 0000000000..64360bd6f4 --- /dev/null +++ b/dimos/hardware/gstreamer_camera.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sys +import threading +import time + +import numpy as np + +from dimos.core import Module, Out, rpc +from dimos.msgs.sensor_msgs import Image, ImageFormat +from dimos.utils.logging_config import setup_logger + +# Add system path for gi module if needed +if "/usr/lib/python3/dist-packages" not in sys.path: + sys.path.insert(0, "/usr/lib/python3/dist-packages") + +import gi + +gi.require_version("Gst", "1.0") +gi.require_version("GstApp", "1.0") +from gi.repository import Gst, GLib + +logger = setup_logger("dimos.hardware.gstreamer_camera", level=logging.INFO) + +Gst.init(None) + + +class GstreamerCameraModule(Module): + """Module that captures frames from a remote camera using GStreamer TCP with absolute timestamps. + """ + + video: Out[Image] = None + + def __init__( + self, + host: str = "localhost", + port: int = 5000, + frame_id: str = "camera", + timestamp_offset: float = 0.0, + reconnect_interval: float = 5.0, + *args, + **kwargs, + ): + """Initialize the GStreamer TCP camera module. + + Args: + host: TCP server host to connect to + port: TCP server port + frame_id: Frame ID for the published images + timestamp_offset: Offset to add to timestamps (useful for clock synchronization) + reconnect_interval: Seconds to wait before attempting reconnection + """ + self.host = host + self.port = port + self.frame_id = frame_id + self.timestamp_offset = timestamp_offset + self.reconnect_interval = reconnect_interval + + self.pipeline = None + self.appsink = None + self.main_loop = None + self.main_loop_thread = None + self.running = False + self.should_reconnect = False + self.frame_count = 0 + self.last_log_time = time.time() + self.reconnect_timer_id = None + + Module.__init__(self, *args, **kwargs) + + @rpc + def start(self): + if self.running: + logger.warning("GStreamer camera module is already running") + return + + self.should_reconnect = True + self._connect() + + def _connect(self): + if not self.should_reconnect: + return + + try: + self._create_pipeline() + self._start_pipeline() + self.running = True + logger.info(f"GStreamer TCP camera module connected to {self.host}:{self.port}") + except Exception as e: + logger.error(f"Failed to connect to {self.host}:{self.port}: {e}") + self._schedule_reconnect() + + @rpc + def stop(self): + self.should_reconnect = False + self._cleanup_reconnect_timer() + + if not self.running: + return + + self.running = False + + if self.pipeline: + self.pipeline.set_state(Gst.State.NULL) + + if self.main_loop: + self.main_loop.quit() + + # Only join the thread if we're not calling from within it + if self.main_loop_thread and self.main_loop_thread != threading.current_thread(): + self.main_loop_thread.join(timeout=2.0) + + logger.info("GStreamer camera module stopped") + + def _cleanup_reconnect_timer(self): + if self.reconnect_timer_id: + GLib.source_remove(self.reconnect_timer_id) + self.reconnect_timer_id = None + + def _schedule_reconnect(self): + if not self.should_reconnect: + return + + self._cleanup_reconnect_timer() + logger.info(f"Scheduling reconnect in {self.reconnect_interval} seconds...") + self.reconnect_timer_id = GLib.timeout_add_seconds( + int(self.reconnect_interval), self._reconnect_timeout + ) + + def _reconnect_timeout(self): + self.reconnect_timer_id = None + if self.should_reconnect: + logger.info("Attempting to reconnect...") + self._connect() + return False # Don't repeat the timeout + + def _handle_disconnect(self): + if not self.should_reconnect: + return + + self.running = False + + if self.pipeline: + self.pipeline.set_state(Gst.State.NULL) + self.pipeline = None + + self.appsink = None + + logger.warning(f"Disconnected from {self.host}:{self.port}") + self._schedule_reconnect() + + def _create_pipeline(self): + # TCP client source with Matroska demuxer to extract absolute timestamps + pipeline_str = f""" + tcpclientsrc host={self.host} port={self.port} ! + matroskademux name=demux ! + h264parse ! + avdec_h264 ! + videoconvert ! + video/x-raw,format=BGR ! + appsink name=sink emit-signals=true sync=false max-buffers=1 drop=true + """ + + try: + self.pipeline = Gst.parse_launch(pipeline_str) + self.appsink = self.pipeline.get_by_name("sink") + self.appsink.connect("new-sample", self._on_new_sample) + except Exception as e: + logger.error(f"Failed to create GStreamer pipeline: {e}") + raise + + def _start_pipeline(self): + """Start the GStreamer pipeline and main loop.""" + self.main_loop = GLib.MainLoop() + + # Start the pipeline + ret = self.pipeline.set_state(Gst.State.PLAYING) + if ret == Gst.StateChangeReturn.FAILURE: + logger.error("Unable to set the pipeline to playing state") + raise RuntimeError("Failed to start GStreamer pipeline") + + # Run the main loop in a separate thread + self.main_loop_thread = threading.Thread(target=self._run_main_loop) + self.main_loop_thread.daemon = True + self.main_loop_thread.start() + + # Set up bus message handling + bus = self.pipeline.get_bus() + bus.add_signal_watch() + bus.connect("message", self._on_bus_message) + + def _run_main_loop(self): + try: + self.main_loop.run() + except Exception as e: + logger.error(f"Main loop error: {e}") + + def _on_bus_message(self, bus, message): + t = message.type + + if t == Gst.MessageType.EOS: + logger.info("End of stream - server disconnected") + self._handle_disconnect() + elif t == Gst.MessageType.ERROR: + err, debug = message.parse_error() + logger.error(f"GStreamer error: {err}, {debug}") + self._handle_disconnect() + elif t == Gst.MessageType.WARNING: + warn, debug = message.parse_warning() + logger.warning(f"GStreamer warning: {warn}, {debug}") + elif t == Gst.MessageType.STATE_CHANGED: + if message.src == self.pipeline: + old_state, new_state, pending_state = message.parse_state_changed() + if new_state == Gst.State.PLAYING: + logger.info("Pipeline is now playing - connected to TCP server") + + def _on_new_sample(self, appsink): + """Handle new video samples from the appsink.""" + sample = appsink.emit("pull-sample") + if sample is None: + return Gst.FlowReturn.OK + + buffer = sample.get_buffer() + caps = sample.get_caps() + + # Extract video format information + struct = caps.get_structure(0) + width = struct.get_value("width") + height = struct.get_value("height") + + # Get the absolute timestamp from the buffer + # Matroska preserves the absolute timestamps we set in the sender + if buffer.pts != Gst.CLOCK_TIME_NONE: + # Convert nanoseconds to seconds and add offset + # This is the absolute time from when the frame was captured + timestamp = (buffer.pts / 1e9) + self.timestamp_offset + + # Skip frames with invalid timestamps (before year 2000) + # This filters out initial gray frames with relative timestamps + year_2000_timestamp = 946684800.0 # January 1, 2000 00:00:00 UTC + if timestamp < year_2000_timestamp: + logger.debug(f"Skipping frame with invalid timestamp: {timestamp:.6f}") + return Gst.FlowReturn.OK + + else: + return Gst.FlowReturn.OK + + # Map the buffer to access the data + success, map_info = buffer.map(Gst.MapFlags.READ) + if not success: + logger.error("Failed to map buffer") + return Gst.FlowReturn.ERROR + + try: + # Convert buffer data to numpy array + # The videoconvert element outputs BGR format + data = np.frombuffer(map_info.data, dtype=np.uint8) + + # Reshape to image dimensions + # For BGR format, we have 3 channels + image_array = data.reshape((height, width, 3)) + + # Create an Image message with the absolute timestamp + image_msg = Image( + data=image_array.copy(), # Make a copy to ensure data persistence + format=ImageFormat.BGR, + frame_id=self.frame_id, + ts=timestamp, + ) + + # Publish the image + if self.video and self.running: + self.video.publish(image_msg) + + # Log statistics periodically + self.frame_count += 1 + current_time = time.time() + if current_time - self.last_log_time >= 5.0: + fps = self.frame_count / (current_time - self.last_log_time) + logger.debug( + f"Receiving frames - FPS: {fps:.1f}, Resolution: {width}x{height}, " + f"Absolute timestamp: {timestamp:.6f}" + ) + self.frame_count = 0 + self.last_log_time = current_time + + except Exception as e: + logger.error(f"Error processing frame: {e}") + + finally: + buffer.unmap(map_info) + + return Gst.FlowReturn.OK + + def __del__(self): + self.stop() diff --git a/dimos/hardware/gstreamer_camera_test_script.py b/dimos/hardware/gstreamer_camera_test_script.py new file mode 100755 index 0000000000..fd0e154904 --- /dev/null +++ b/dimos/hardware/gstreamer_camera_test_script.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging +import time + +from dimos.hardware.gstreamer_camera import GstreamerCameraModule +from dimos import core +from dimos.protocol import pubsub +from dimos.msgs.sensor_msgs import Image + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def main(): + parser = argparse.ArgumentParser(description="Test script for GStreamer TCP camera module") + + # Network options + parser.add_argument( + "--host", default="localhost", help="TCP server host to connect to (default: localhost)" + ) + parser.add_argument("--port", type=int, default=5000, help="TCP server port (default: 5000)") + + # Camera options + parser.add_argument( + "--frame-id", + default="zed_camera", + help="Frame ID for published images (default: zed_camera)", + ) + parser.add_argument( + "--reconnect-interval", + type=float, + default=5.0, + help="Seconds to wait before attempting reconnection (default: 5.0)", + ) + + # Logging options + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Initialize LCM + pubsub.lcm.autoconf() + + # Start dimos + logger.info("Starting dimos...") + dimos = core.start(8) + + # Deploy the GStreamer camera module + logger.info(f"Deploying GStreamer TCP camera module (connecting to {args.host}:{args.port})...") + camera = dimos.deploy( + GstreamerCameraModule, + host=args.host, + port=args.port, + frame_id=args.frame_id, + reconnect_interval=args.reconnect_interval, + ) + + # Set up LCM transport for the video output + camera.video.transport = core.LCMTransport("/zed/video", Image) + + # Counter for received frames + frame_count = [0] + last_log_time = [time.time()] + first_timestamp = [None] + + def on_frame(msg): + frame_count[0] += 1 + current_time = time.time() + + # Capture first timestamp to show absolute timestamps are preserved + if first_timestamp[0] is None: + first_timestamp[0] = msg.ts + logger.info(f"First frame absolute timestamp: {msg.ts:.6f}") + + # Log stats every 2 seconds + if current_time - last_log_time[0] >= 2.0: + fps = frame_count[0] / (current_time - last_log_time[0]) + timestamp_delta = msg.ts - first_timestamp[0] + logger.info( + f"Received {frame_count[0]} frames - FPS: {fps:.1f} - " + f"Resolution: {msg.width}x{msg.height} - " + f"Timestamp: {msg.ts:.3f} (delta: {timestamp_delta:.3f}s)" + ) + frame_count[0] = 0 + last_log_time[0] = current_time + + # Subscribe to video output for monitoring + camera.video.subscribe(on_frame) + + # Start the camera + logger.info("Starting GStreamer camera...") + camera.start() + + logger.info("GStreamer TCP camera module is running. Press Ctrl+C to stop.") + logger.info(f"Connecting to TCP server at {args.host}:{args.port}") + logger.info("Publishing frames to LCM topic: /zed/video") + logger.info("") + logger.info("To start the sender on the camera machine, run:") + logger.info( + f" python3 dimos/hardware/gstreamer_sender.py --device /dev/video0 --host 0.0.0.0 --port {args.port}" + ) + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("Shutting down...") + camera.stop() + logger.info("Stopped.") + + +if __name__ == "__main__": + main() diff --git a/dimos/hardware/gstreamer_sender.py b/dimos/hardware/gstreamer_sender.py new file mode 100755 index 0000000000..5b526609e1 --- /dev/null +++ b/dimos/hardware/gstreamer_sender.py @@ -0,0 +1,359 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging +import signal +import sys +import time + +# Add system path for gi module if needed +if "/usr/lib/python3/dist-packages" not in sys.path: + sys.path.insert(0, "/usr/lib/python3/dist-packages") + +import gi + +gi.require_version("Gst", "1.0") +gi.require_version("GstVideo", "1.0") +from gi.repository import GLib, Gst + +# Initialize GStreamer +Gst.init(None) + +# Setup logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("gstreamer_tcp_sender") + + +class GStreamerTCPSender: + def __init__( + self, + device: str = "/dev/video0", + width: int = 2560, + height: int = 720, + framerate: int = 60, + format_str: str = "YUY2", + bitrate: int = 5000, + host: str = "0.0.0.0", + port: int = 5000, + single_camera: bool = False, + ): + """Initialize the GStreamer TCP sender. + + Args: + device: Video device path + width: Video width in pixels + height: Video height in pixels + framerate: Frame rate in fps + format_str: Video format + bitrate: H264 encoding bitrate in kbps + host: Host to listen on (0.0.0.0 for all interfaces) + port: TCP port for listening + single_camera: If True, crop to left half (for stereo cameras) + """ + self.device = device + self.width = width + self.height = height + self.framerate = framerate + self.format = format_str + self.bitrate = bitrate + self.host = host + self.port = port + self.single_camera = single_camera + + self.pipeline = None + self.videosrc = None + self.encoder = None + self.mux = None + self.main_loop = None + self.running = False + self.start_time = None + self.frame_count = 0 + + def create_pipeline(self): + """Create the GStreamer pipeline with TCP server sink.""" + + # Create pipeline + self.pipeline = Gst.Pipeline.new("tcp-sender-pipeline") + + # Create elements + self.videosrc = Gst.ElementFactory.make("v4l2src", "source") + self.videosrc.set_property("device", self.device) + self.videosrc.set_property("do-timestamp", True) + logger.info(f"Using camera device: {self.device}") + + # Create caps filter for video format + capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter") + caps = Gst.Caps.from_string( + f"video/x-raw,width={self.width},height={self.height}," + f"format={self.format},framerate={self.framerate}/1" + ) + capsfilter.set_property("caps", caps) + + # Video converter + videoconvert = Gst.ElementFactory.make("videoconvert", "convert") + + # Crop element for single camera mode + videocrop = None + if self.single_camera: + videocrop = Gst.ElementFactory.make("videocrop", "crop") + # Crop to left half: for 2560x720 stereo, get left 1280x720 + videocrop.set_property("left", 0) + videocrop.set_property("right", self.width // 2) # Remove right half + videocrop.set_property("top", 0) + videocrop.set_property("bottom", 0) + + # H264 encoder + self.encoder = Gst.ElementFactory.make("x264enc", "encoder") + self.encoder.set_property("tune", "zerolatency") + self.encoder.set_property("bitrate", self.bitrate) + self.encoder.set_property("key-int-max", 30) + + # H264 parser + h264parse = Gst.ElementFactory.make("h264parse", "parser") + + # Use matroskamux which preserves timestamps better + self.mux = Gst.ElementFactory.make("matroskamux", "mux") + self.mux.set_property("streamable", True) + self.mux.set_property("writing-app", "gstreamer-tcp-sender") + + # TCP server sink + tcpserversink = Gst.ElementFactory.make("tcpserversink", "sink") + tcpserversink.set_property("host", self.host) + tcpserversink.set_property("port", self.port) + tcpserversink.set_property("sync", False) + + # Add elements to pipeline + self.pipeline.add(self.videosrc) + self.pipeline.add(capsfilter) + self.pipeline.add(videoconvert) + if videocrop: + self.pipeline.add(videocrop) + self.pipeline.add(self.encoder) + self.pipeline.add(h264parse) + self.pipeline.add(self.mux) + self.pipeline.add(tcpserversink) + + # Link elements + if not self.videosrc.link(capsfilter): + raise RuntimeError("Failed to link source to capsfilter") + if not capsfilter.link(videoconvert): + raise RuntimeError("Failed to link capsfilter to videoconvert") + + # Link through crop if in single camera mode + if videocrop: + if not videoconvert.link(videocrop): + raise RuntimeError("Failed to link videoconvert to videocrop") + if not videocrop.link(self.encoder): + raise RuntimeError("Failed to link videocrop to encoder") + else: + if not videoconvert.link(self.encoder): + raise RuntimeError("Failed to link videoconvert to encoder") + + if not self.encoder.link(h264parse): + raise RuntimeError("Failed to link encoder to h264parse") + if not h264parse.link(self.mux): + raise RuntimeError("Failed to link h264parse to mux") + if not self.mux.link(tcpserversink): + raise RuntimeError("Failed to link mux to tcpserversink") + + # Add probe to inject absolute timestamps + # Place probe after crop (if present) or after videoconvert + if videocrop: + probe_element = videocrop + else: + probe_element = videoconvert + probe_pad = probe_element.get_static_pad("src") + probe_pad.add_probe(Gst.PadProbeType.BUFFER, self._inject_absolute_timestamp, None) + + # Set up bus message handling + bus = self.pipeline.get_bus() + bus.add_signal_watch() + bus.connect("message", self._on_bus_message) + + def _inject_absolute_timestamp(self, pad, info, user_data): + buffer = info.get_buffer() + if buffer: + absolute_time = time.time() + absolute_time_ns = int(absolute_time * 1e9) + + # Set both PTS and DTS to the absolute time + # This will be preserved by matroskamux + buffer.pts = absolute_time_ns + buffer.dts = absolute_time_ns + + self.frame_count += 1 + return Gst.PadProbeReturn.OK + + def _on_bus_message(self, bus, message): + t = message.type + + if t == Gst.MessageType.EOS: + logger.info("End of stream") + self.stop() + elif t == Gst.MessageType.ERROR: + err, debug = message.parse_error() + logger.error(f"Pipeline error: {err}, {debug}") + self.stop() + elif t == Gst.MessageType.WARNING: + warn, debug = message.parse_warning() + logger.warning(f"Pipeline warning: {warn}, {debug}") + elif t == Gst.MessageType.STATE_CHANGED: + if message.src == self.pipeline: + old_state, new_state, pending_state = message.parse_state_changed() + logger.debug( + f"Pipeline state changed: {old_state.value_nick} -> {new_state.value_nick}" + ) + + def start(self): + if self.running: + logger.warning("Sender is already running") + return + + logger.info("Creating TCP pipeline with absolute timestamps...") + self.create_pipeline() + + logger.info("Starting pipeline...") + ret = self.pipeline.set_state(Gst.State.PLAYING) + if ret == Gst.StateChangeReturn.FAILURE: + logger.error("Failed to start pipeline") + raise RuntimeError("Failed to start GStreamer pipeline") + + self.running = True + self.start_time = time.time() + self.frame_count = 0 + + logger.info("TCP video sender started:") + logger.info(f" Source: {self.device}") + if self.single_camera: + output_width = self.width // 2 + logger.info(f" Input Resolution: {self.width}x{self.height} @ {self.framerate}fps") + logger.info( + f" Output Resolution: {output_width}x{self.height} @ {self.framerate}fps (left camera only)" + ) + else: + logger.info(f" Resolution: {self.width}x{self.height} @ {self.framerate}fps") + logger.info(f" Bitrate: {self.bitrate} kbps") + logger.info(f" TCP Server: {self.host}:{self.port}") + logger.info(" Container: Matroska (preserves absolute timestamps)") + logger.info(" Waiting for client connections...") + + self.main_loop = GLib.MainLoop() + try: + self.main_loop.run() + except KeyboardInterrupt: + logger.info("Interrupted by user") + finally: + self.stop() + + def stop(self): + if not self.running: + return + + self.running = False + + if self.pipeline: + logger.info("Stopping pipeline...") + self.pipeline.set_state(Gst.State.NULL) + + if self.main_loop and self.main_loop.is_running(): + self.main_loop.quit() + + if self.frame_count > 0 and self.start_time: + elapsed = time.time() - self.start_time + avg_fps = self.frame_count / elapsed + logger.info(f"Total frames sent: {self.frame_count}, Average FPS: {avg_fps:.1f}") + + logger.info("TCP video sender stopped") + + +def main(): + parser = argparse.ArgumentParser( + description="GStreamer TCP video sender with absolute timestamps" + ) + + # Video source options + parser.add_argument( + "--device", default="/dev/video0", help="Video device path (default: /dev/video0)" + ) + + # Video format options + parser.add_argument("--width", type=int, default=2560, help="Video width (default: 2560)") + parser.add_argument("--height", type=int, default=720, help="Video height (default: 720)") + parser.add_argument("--framerate", type=int, default=15, help="Frame rate in fps (default: 15)") + parser.add_argument("--format", default="YUY2", help="Video format (default: YUY2)") + + # Encoding options + parser.add_argument( + "--bitrate", type=int, default=5000, help="H264 bitrate in kbps (default: 5000)" + ) + + # Network options + parser.add_argument( + "--host", + default="0.0.0.0", + help="Host to listen on (default: 0.0.0.0 for all interfaces)", + ) + parser.add_argument("--port", type=int, default=5000, help="TCP port (default: 5000)") + + # Camera options + parser.add_argument( + "--single-camera", + action="store_true", + help="Extract left camera only from stereo feed (crops 2560x720 to 1280x720)", + ) + + # Logging options + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Create and start sender + sender = GStreamerTCPSender( + device=args.device, + width=args.width, + height=args.height, + framerate=args.framerate, + format_str=args.format, + bitrate=args.bitrate, + host=args.host, + port=args.port, + single_camera=args.single_camera, + ) + + # Handle signals gracefully + def signal_handler(sig, frame): + logger.info(f"Received signal {sig}, shutting down...") + sender.stop() + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + sender.start() + except Exception as e: + logger.error(f"Failed to start sender: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() From e5b29a12ee27c2f3a0ca319460477b80305b6aff Mon Sep 17 00:00:00 2001 From: paul-nechifor <1262969+paul-nechifor@users.noreply.github.com> Date: Thu, 11 Sep 2025 00:54:39 +0000 Subject: [PATCH 2/2] CI code cleanup --- dimos/hardware/gstreamer_camera.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dimos/hardware/gstreamer_camera.py b/dimos/hardware/gstreamer_camera.py index 64360bd6f4..b4d378ba29 100644 --- a/dimos/hardware/gstreamer_camera.py +++ b/dimos/hardware/gstreamer_camera.py @@ -41,8 +41,7 @@ class GstreamerCameraModule(Module): - """Module that captures frames from a remote camera using GStreamer TCP with absolute timestamps. - """ + """Module that captures frames from a remote camera using GStreamer TCP with absolute timestamps.""" video: Out[Image] = None