-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
wontfix — This will not be worked on
Description
- Move `frame_skip_probe` and `should_skip_frame` to a separate module:
src/
deepstream/
pipelines/
probes/
compare_frames.py # <- here
- Add unit tests for `FrameChangeDetector` to `/tests/unit/compare_frames/frame_change_detector.py`.
An example from ChatGPT (I prefer pytest):
import numpy as np
import cv2
import pytest
from frame_comparison.frame_change_detector import FrameChangeDetector
def make_frame(value: int = 128) -> np.ndarray:
    """Return a uniform 180x320, 3-channel uint8 frame filled with ``value``."""
    return np.full((180, 320, 3), value, np.uint8)
def test_singleton_behavior():
    """Two constructor calls must return the very same detector object."""
    a = FrameChangeDetector()
    b = FrameChangeDetector()
    assert a is b
def test_mse_and_ssim_values():
    """MSE is zero for identical frames; SSIM is ~1 for identical frames and lower otherwise."""
    det = FrameChangeDetector()
    f1, f2 = make_frame(100), make_frame(120)

    assert det.mse(f1, f1) == 0
    assert det.mse(f1, f2) > 0

    ssim_same = det.simple_ssim(f1, f1)
    ssim_diff = det.simple_ssim(f1, f2)
    assert ssim_same == pytest.approx(1.0, rel=1e-2)
    assert ssim_diff < ssim_same
def test_optical_flow_diff_changes():
    """A horizontally shifted frame must yield more flow than an identical frame."""
    det = FrameChangeDetector()

    # Rolling a uniform frame reproduces the same frame, so the comparison
    # below could never hold. Draw a bright square on a dark background to
    # give the shift visible content to move.
    f1 = make_frame(0)
    f1[60:120, 100:160] = 255
    f2 = np.roll(f1, 5, axis=1)

    flow_static = det.optical_flow_diff(f1, f1)
    flow_shift = det.optical_flow_diff(f1, f2)
    assert flow_static < flow_shift
def test_should_process_logic():
    """First frame is processed, an identical frame is skipped, a changed frame is processed."""
    # NOTE(review): test_singleton_behavior asserts FrameChangeDetector is a
    # singleton. If the shared instance keeps earlier thresholds or the last
    # seen frame, this test depends on execution order -- confirm, and reset
    # the detector state here if so.
    det = FrameChangeDetector(mse_thresh=1, ssim_thresh=0.99, flow_thresh=0.5)
    f1 = make_frame(50)
    f2 = make_frame(50)
    f3 = make_frame(200)

    should, _ = det.should_process(f1)
    assert should  # first always True

    should, _ = det.should_process(f2)
    assert not should  # identical → static

    should, _ = det.should_process(f3)
    assert should  # changed → dynamic
- Add functional tests for `should_skip_frame` and `frame_skip_probe` to `/tests/functional/compare_frames/test_frame_skip_probe.py`.
An example from ChatGPT:
import pytest
import gi
import os
from gi.repository import Gst, GLib
import pyds
from frame_comparison.frame_skip_probe import frame_skip_probe
# ---------- Setup ----------
@pytest.fixture(scope="module", autouse=True)
def gst_init():
    """Initialize GStreamer before the tests in this module run.

    The fixture is module-scoped and autouse, so it runs once for this test
    module without being requested explicitly.
    """
    Gst.init(None)
def make_pipeline(pattern: str, num_buffers: int = 10) -> Gst.Pipeline:
    """Build a small DeepStream-like test pipeline ending in an appsink.

    The probe is NOT attached here; call ``attach_probe`` on the result.

    Args:
        pattern: Value for the ``videotestsrc`` ``pattern`` property.
        num_buffers: Number of buffers ``videotestsrc`` emits before EOS.

    Returns:
        The parsed pipeline, with the muxer named ``m`` and the appsink
        named ``sink``.
    """
    desc = (
        f"videotestsrc num-buffers={num_buffers} pattern={pattern} ! "
        "video/x-raw,format=RGBA,width=320,height=180 ! "
        "nvvideoconvert ! "
        "m.sink_0 nvstreammux name=m batch-size=1 width=320 height=180 ! "
        "nvvideoconvert ! "
        "appsink name=sink emit-signals=true sync=false"
    )
    pipeline = Gst.parse_launch(desc)
    assert pipeline is not None, "Pipeline creation failed"
    return pipeline
def attach_probe(pipeline: Gst.Pipeline):
    """Attach the real ``frame_skip_probe`` to the src pad of the element named ``m``."""
    mux = pipeline.get_by_name("m")
    assert mux is not None, "nvstreammux not found"
    srcpad = mux.get_static_pad("src")
    # Fail with a clear message instead of an AttributeError on None.
    assert srcpad is not None, "nvstreammux src pad not found"
    srcpad.add_probe(Gst.PadProbeType.BUFFER, frame_skip_probe, None)
# ---------- Helpers ----------
def _run_pipeline_until_eos(pipeline: Gst.Pipeline, timeout_sec: float = 5.0) -> int:
    """Run ``pipeline`` until EOS and return the number of samples the appsink received.

    Args:
        pipeline: Pipeline containing an appsink named ``sink`` with
            ``emit-signals=true``.
        timeout_sec: Maximum time to wait for EOS before giving up.

    Returns:
        Number of samples pulled from the appsink.

    Raises:
        RuntimeError: The pipeline posted an ERROR message on its bus.
        TimeoutError: EOS was not reached within ``timeout_sec``.
    """
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()

    frame_count = 0
    failure = None
    timed_out = False

    def on_sample(sink, data):
        nonlocal frame_count
        # The sample must be pulled, otherwise it stays queued in the appsink.
        sample = sink.emit("pull-sample")
        if sample is not None:
            frame_count += 1
        return Gst.FlowReturn.OK

    def on_message(bus, msg):
        nonlocal failure
        t = msg.type
        if t == Gst.MessageType.ERROR:
            err, dbg = msg.parse_error()
            failure = f"ERROR: {err}, {dbg}"
            loop.quit()
        elif t == Gst.MessageType.EOS:
            loop.quit()

    def on_timeout():
        nonlocal timed_out
        timed_out = True
        loop.quit()
        return False  # one-shot: GLib removes the source after this call

    sink = pipeline.get_by_name("sink")
    sink.connect("new-sample", on_sample, None)
    bus.connect("message", on_message)

    # GLib.timeout_add takes milliseconds.
    timeout_id = GLib.timeout_add(int(timeout_sec * 1000), on_timeout)
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    finally:
        if not timed_out:
            # The source is still registered only if it never fired.
            GLib.source_remove(timeout_id)
        bus.remove_signal_watch()
        pipeline.set_state(Gst.State.NULL)

    # Surface failures: a broken pipeline yields 0 frames, which would
    # otherwise satisfy "few frames expected" assertions.
    if failure is not None:
        raise RuntimeError(failure)
    if timed_out:
        raise TimeoutError(f"Pipeline did not reach EOS within {timeout_sec} s")
    return frame_count
# ---------- Tests ----------
def test_probe_static_pattern_drops_frames():
    """A static test pattern should result in very few frames after the probe."""
    # videotestsrc's "snow" pattern is random noise that differs on every
    # frame; "smpte" is a fixed image, which is what this test needs.
    pipeline = make_pipeline("smpte", num_buffers=8)
    attach_probe(pipeline)
    out_frames = _run_pipeline_until_eos(pipeline)
    # Since the frames are identical, we expect almost all of them dropped.
    assert out_frames <= 2, f"Expected most frames dropped, got {out_frames}"
def test_probe_changing_pattern_allows_frames():
    """Dynamic 'ball' pattern should allow most frames through."""
    pipeline = make_pipeline("ball", num_buffers=8)
    attach_probe(pipeline)
    out_frames = _run_pipeline_until_eos(pipeline)
    # Expect most frames to be processed (not dropped).
    assert out_frames >= 6, f"Expected most frames to pass, got {out_frames}"
def test_pipeline_initializes_and_stops_cleanly():
    """A single-buffer pipeline with the probe attached runs to EOS and yields a frame.

    This is a smoke test for probe attachment and teardown; it does not
    measure resource leaks.
    """
    pipeline = make_pipeline("black", num_buffers=1)
    attach_probe(pipeline)
    frames = _run_pipeline_until_eos(pipeline)
    assert frames >= 1
Definition of Done (DoD):
- Unit and functional tests are added.
- A command (script) to run the tests is added, and the tests pass.
Metadata
Metadata
Assignees
Labels
wontfix — This will not be worked on