149 changes: 149 additions & 0 deletions src/deepstream/pipelines/deepstream_pipeline_cpu_gpu.py
@@ -0,0 +1,149 @@
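"""DeepStream RTSP classification pipeline (CPU/GPU variant).

Receives an H.264 RTSP stream, decodes it, batches it through nvstreammux,
runs an nvinfer classifier, and publishes the resulting plant events over
MQTT via nvmsgconv/nvmsgbroker. Two buffer probes sit on the streammux src
pad: a GPU frame-change detector that drops unchanged frames, and a
background-removal probe applied to the frames that are kept.
"""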
import os
from datetime import datetime
from typing import Any, Dict, List
import gi
import numpy as np
import cv2
import pyds

gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst

from src.frame_comparison.frame_change_detector import FrameChangeDetector
from src.model_conversion.onnx_to_trt import build_engine_if_missing

from src.deepstream.helpers.load_class_labels import load_class_labels
from src.deepstream.helpers.meta_tensor_extractor import TensorExtractor
from src.deepstream.helpers.softmax_topk_classifier import ClassificationPrediction, SoftmaxTopKClassifier
from src.deepstream.helpers.plant_msg_meta_builder import NvdsPlantEventBuilder, PlantEvent
from src.deepstream.helpers.pipeline_runner import run_pipeline

from src.deepstream.probes.background_removal.cpu.background_removal_probe import remove_background_probe
from src.deepstream.probes.db_message_meta_probe import DbMessageMetaProbe
from src.deepstream.probes.frame_comparison.gpu.frame_skipping_probe import frame_skipping_probe, GPUFrameChangeDetector

# Configuration
RTSP_PORT = os.environ.get("RTSP_PORT", "8554")
RTSP_URL = f"rtsp://127.0.0.1:{RTSP_PORT}/test"

CONFIG_FILE: str = "/workspace/configs/resnet18.txt"
MSGCONV_CONFIG: str = "/workspace/configs/nvmsgbroker_msgconv_config.txt"
MQTT_CONN_STR = "172.17.0.1;1883;agstream-client"  # host;port;client-id
MQTT_TOPIC = "deepstream/predictions"

CLASS_LABELS = load_class_labels()

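# Frame counters reported by the GPU frame-skipping probe below.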
stats: Dict[str, int] = {"total": 0, "skipped": 0, "processed": 0}

def gpu_frame_skip_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, detector: GPUFrameChangeDetector) -> Gst.PadProbeReturn:
    """Drop buffers whose frame content has not changed enough to be worth re-processing."""
    stats["total"] += 1
    # pyds convention: hash() of a Gst.Buffer yields the underlying C pointer.
    buffer_ptr: int = hash(info.get_buffer())
    batch_id: int = 0  # single-source pipeline (streammux batch-size is 1)
    should_process: bool = frame_skipping_probe(buffer_ptr, batch_id, detector)
    if should_process:
        stats["processed"] += 1
        print(f"✅ PROCESSING frame {stats['total']}")
        return Gst.PadProbeReturn.OK
    else:
        stats["skipped"] += 1
        print(f"⏭️ SKIPPING frame {stats['total']}")
        return Gst.PadProbeReturn.DROP

def build_pipeline() -> Gst.Pipeline:
"""Build DeepStream pipeline with background removal, frame skipping, and message broker."""
pipeline = Gst.Pipeline.new("final-cpu-pipeline")

# Elements
rtspsrc = Gst.ElementFactory.make("rtspsrc", "source")
depay = Gst.ElementFactory.make("rtph264depay", "depay")
parse = Gst.ElementFactory.make("h264parse", "parse")
decode = Gst.ElementFactory.make("decodebin", "decode")
convert = Gst.ElementFactory.make("videoconvert", "convert")
nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert")
capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter")
streammux = Gst.ElementFactory.make("nvstreammux", "streammux")
nvinfer = Gst.ElementFactory.make("nvinfer", "nvinfer")
nvmsgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsgconv")
nvmsgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsgbroker")

    for name, e in [
        ("rtspsrc", rtspsrc),
        ("rtph264depay", depay),
        ("h264parse", parse),
        ("decodebin", decode),
        ("videoconvert", convert),
        ("nvvideoconvert", nvvideoconvert),
        ("capsfilter", capsfilter),
        ("nvstreammux", streammux),
        ("nvinfer", nvinfer),
        ("nvmsgconv", nvmsgconv),
        ("nvmsgbroker", nvmsgbroker),
    ]:
        assert e is not None, f"Failed to create element {name}"
        pipeline.add(e)

# Configure elements
rtspsrc.set_property("location", RTSP_URL)
rtspsrc.set_property("latency", 200)
streammux.set_property("batch-size", 1)
streammux.set_property("width", 256)
streammux.set_property("height", 256)
nvinfer.set_property("config-file-path", CONFIG_FILE)
caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
capsfilter.set_property("caps", caps)
nvmsgconv.set_property("config", MSGCONV_CONFIG)
nvmsgconv.set_property("payload-type", 0)
nvmsgbroker.set_property("proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so")
nvmsgbroker.set_property("conn-str", MQTT_CONN_STR)
nvmsgbroker.set_property("topic", MQTT_TOPIC)
nvmsgbroker.set_property("sync", False)

    # Dynamic pad linking: rtspsrc and decodebin expose their source pads only
    # once the stream is negotiated, so link them in pad-added callbacks.
def on_pad_added_rtspsrc(src: Any, pad: Any) -> None:
sinkpad = depay.get_static_pad("sink")
if not sinkpad.is_linked():
pad.link(sinkpad)

rtspsrc.connect("pad-added", on_pad_added_rtspsrc)

def on_pad_added_decode(src: Any, pad: Any) -> None:
sinkpad = convert.get_static_pad("sink")
if not sinkpad.is_linked():
pad.link(sinkpad)

decode.connect("pad-added", on_pad_added_decode)

    # Static links, then capsfilter → streammux via a request pad
depay.link(parse)
parse.link(decode)
convert.link(nvvideoconvert)
nvvideoconvert.link(capsfilter)
srcpad = capsfilter.get_static_pad("src")
sinkpad = streammux.get_request_pad("sink_0")
srcpad.link(sinkpad)

streammux.link(nvinfer)
nvinfer.link(nvmsgconv)
nvmsgconv.link(nvmsgbroker)

detector = GPUFrameChangeDetector()

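    # Probes on the streammux src pad run in the order they are added:
    # frame skipping first, then background removal on the surviving frames.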
streammux_src_pad: Gst.Pad = streammux.get_static_pad("src")
streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, gpu_frame_skip_probe, detector)

streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, remove_background_probe)

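    # Post-inference helpers: extract the raw output tensor, run softmax top-k
    # classification, and build the plant event message published over MQTT.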
tensor_extractor = TensorExtractor()
classifier = SoftmaxTopKClassifier(CLASS_LABELS)
plant_event_builder = NvdsPlantEventBuilder()
db_message_meta_probe = DbMessageMetaProbe(tensor_extractor, classifier, plant_event_builder)

nvinfer_src_pad = nvinfer.get_static_pad("src")
nvinfer_src_pad.add_probe(Gst.PadProbeType.BUFFER, db_message_meta_probe.pad_probe)

return pipeline

if __name__ == "__main__":
Gst.init(None)
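    # Build the TensorRT engine from the ONNX model if one is not already present.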
build_engine_if_missing(CONFIG_FILE)
pipeline = build_pipeline()
run_pipeline(pipeline)