From 27520c2911aa9ac080aa77660eb07090e5892b3c Mon Sep 17 00:00:00 2001 From: Sarah Gershuni Date: Mon, 8 Dec 2025 11:11:30 +0200 Subject: [PATCH] Add hybrid CPU-GPU DeepStream pipeline --- .../pipelines/deepstream_pipeline_cpu_gpu.py | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py diff --git a/src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py b/src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py new file mode 100644 index 0000000..14dded4 --- /dev/null +++ b/src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py @@ -0,0 +1,149 @@ +import os +from datetime import datetime +from typing import Any, Dict, List +import gi +import numpy as np +import cv2 +import pyds + +gi.require_version("Gst", "1.0") +from gi.repository import GLib, Gst + +from src.frame_comparison.frame_change_detector import FrameChangeDetector +from src.model_conversion.onnx_to_trt import build_engine_if_missing + +from src.deepstream.helpers.load_class_labels import load_class_labels +from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor +from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier +from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent +from src.deepstream.helpers.pipeline_runner import run_pipeline + +from src.deepstream.probes.background_removal.cpu.background_removal_probe import remove_background_probe +from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe +from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import frame_skipping_probe, GPUFrameChangeDetector + +# Configuration +RTSP_PORT = os.environ.get("RTSP_PORT", "8554") +RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test" + +CONFIG_FILE: str = "/workspace/configs/resnet18.txt" +MSGCONV_CONFIG: str = "/workspace/configs/nvmsgbroker_msgconv_config.txt" +MQTT_CONN_STR = "172.17.0.1;1883;agstream-client" +MQTT_TOPIC = "deepstream/predictions" + +CLASS_LABELS = load_class_labels() + +stats: Dict[str, int] = {"total": 0, "skipped": 0, "processed": 0} + +def gpu_frame_skip_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, detector: GPUFrameChangeDetector) -> Gst.PadProbeReturn: + buffer_ptr: int = hash(info.get_buffer()) + batch_id: int = 0 + should_process: bool = frame_skipping_probe(buffer_ptr, batch_id, detector) + if should_process: + stats["processed"] += 1 + print(f"✅ PROCESSING frame {stats['total']}") + return Gst.PadProbeReturn.OK + else: + stats["skipped"] += 1 + print(f"⏭️ SKIPPING frame {stats['total']}") + return Gst.PadProbeReturn.DROP + +def build_pipeline() -> Gst.Pipeline: + """Build DeepStream pipeline with background removal, frame skipping, and message broker.""" + pipeline = Gst.Pipeline.new("final-cpu-pipeline") + + # Elements + rtspsrc = Gst.ElementFactory.make("rtspsrc", "source") + depay = Gst.ElementFactory.make("rtph264depay", "depay") + parse = Gst.ElementFactory.make("h264parse", "parse") + decode = Gst.ElementFactory.make("decodebin", "decode") + convert = Gst.ElementFactory.make("videoconvert", "convert") + nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert") + capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter") + streammux = Gst.ElementFactory.make("nvstreammux", "streammux") + nvinfer = Gst.ElementFactory.make("nvinfer", "nvinfer") + nvmsgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsgconv") + nvmsgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsgbroker") + + for e in [ + rtspsrc, + depay, + parse, + decode, + convert, + nvvideoconvert, + capsfilter, + streammux, + nvinfer, + nvmsgconv, + nvmsgbroker, + ]: + assert e is not None, f"Failed to create element {e}" + pipeline.add(e) + + # Configure elements + rtspsrc.set_property("location", RTSP_URL) + rtspsrc.set_property("latency", 200) + streammux.set_property("batch-size", 1) + streammux.set_property("width", 256) + streammux.set_property("height", 256) + nvinfer.set_property("config-file-path", CONFIG_FILE) + caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA") + capsfilter.set_property("caps", caps) + nvmsgconv.set_property("config", MSGCONV_CONFIG) + nvmsgconv.set_property("payload-type", 0) + nvmsgbroker.set_property("proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so") + nvmsgbroker.set_property("conn-str", MQTT_CONN_STR) + nvmsgbroker.set_property("topic", MQTT_TOPIC) + nvmsgbroker.set_property("sync", False) + + # Dynamic pad linking + def on_pad_added_rtspsrc(src: Any, pad: Any) -> None: + sinkpad = depay.get_static_pad("sink") + if not sinkpad.is_linked(): + pad.link(sinkpad) + + rtspsrc.connect("pad-added", on_pad_added_rtspsrc) + + def on_pad_added_decode(src: Any, pad: Any) -> None: + sinkpad = convert.get_static_pad("sink") + if not sinkpad.is_linked(): + pad.link(sinkpad) + + decode.connect("pad-added", on_pad_added_decode) + + # Link capsfilter → streammux + depay.link(parse) + parse.link(decode) + convert.link(nvvideoconvert) + nvvideoconvert.link(capsfilter) + srcpad = capsfilter.get_static_pad("src") + sinkpad = streammux.get_request_pad("sink_0") + srcpad.link(sinkpad) + + streammux.link(nvinfer) + nvinfer.link(nvmsgconv) + nvmsgconv.link(nvmsgbroker) + + detector = GPUFrameChangeDetector() + + streammux_src_pad: Gst.Pad = streammux.get_static_pad("src") + streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, gpu_frame_skip_probe, detector) + + streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, remove_background_probe) + + tensor_extractor = TensorExtractor() + classifier = SoftmaxTopKClassifier(CLASS_LABELS) + plant_event_builder = NvdsPlantEventBuilder() + db_message_meta_probe = DbMessageMetaProbe(tensor_extractor, classifier, plant_event_builder) + + nvinfer_src_pad = nvinfer.get_static_pad("src") + nvinfer_src_pad.add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.pad_probe) + + return pipeline + +if __name__ == "__main__": + Gst.init(None) + build_engine_if_missing(CONFIG_FILE) + pipeline = build_pipeline() + run_pipeline(pipeline)