Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions dimos/core/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,25 @@ def start(self) -> None: ...

@abstractmethod
def stop(self) -> None: ...

def dispose(self) -> None:
"""
Makes a Resource disposable
So you can do a

from reactivex.disposable import CompositeDisposable

disposables = CompositeDisposable()

transport1 = LCMTransport(...)
transport2 = LCMTransport(...)

disposables.add(transport1)
disposables.add(transport2)

...

disposables.dispose()

"""
self.stop()
36 changes: 34 additions & 2 deletions dimos/perception/detection/detectors/test_bbox_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dimos_lcm.foxglove_msgs.ImageAnnotations import ImageAnnotations
import pytest
from reactivex.disposable import CompositeDisposable

from dimos.core import LCMTransport
from dimos.msgs.sensor_msgs import Image
from dimos.perception.detection.type import Detection2D, ImageDetections2D


Expand All @@ -24,9 +28,37 @@ def detector(request):


@pytest.fixture(scope="session")
def detections(detector, test_image):
def get_topic_annotations():
disposables = CompositeDisposable()

def topic_annotations(suffix: str = "unnamed"):
annotations: LCMTransport[ImageAnnotations] = LCMTransport(
f"/annotations_{suffix}", ImageAnnotations
)
disposables.add(annotations)
return annotations

yield topic_annotations
disposables.dispose()


@pytest.fixture(scope="session")
def detections(detector, test_image, topic_image, get_topic_annotations):
"""Get ImageDetections2D from any detector."""
return detector.process_image(test_image)
topic_image.publish(test_image)
detections = detector.process_image(test_image)
annotations = detections.to_foxglove_annotations()
print("annotations:", annotations)
topic_annotations = get_topic_annotations(detector.__class__.__name__)
topic_annotations.publish(annotations)
return detections


@pytest.fixture(scope="session")
def topic_image():
image: LCMTransport[Image] = LCMTransport("/color_image", Image)
yield image
image.lcm.stop()


def test_detection_basic(detections) -> None:
Expand Down