Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 35 additions & 30 deletions dimos/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,43 @@
T = TypeVar("T")


class ObservableMixin(Generic[T]):
# subscribes and returns the first value it receives
# might be nicer to write without rxpy but had this snippet ready
def get_next(self, timeout=10.0) -> T:
try:
return (
self.observable()
.pipe(ops.first(), *([ops.timeout(timeout)] if timeout is not None else []))
.run()
)
except Exception as e:
raise Exception(f"No value received after {timeout} seconds") from e

def hot_latest(self) -> Callable[[], T]:
return reactive.getter_streaming(self.observable())

def pure_observable(self):
def _subscribe(observer, scheduler=None):
unsubscribe = self.subscribe(observer.on_next)
return Disposable(unsubscribe)

return rx.create(_subscribe)

# default return is backpressured because most
# use cases will want this by default
def observable(self):
return backpressure(self.pure_observable())


class State(enum.Enum):
UNBOUND = "unbound" # descriptor defined but not bound
READY = "ready" # bound to owner but not yet connected
CONNECTED = "connected" # input bound to an output
FLOWING = "flowing" # runtime: data observed


class Transport(Protocol[T]):
class Transport(ObservableMixin[T]):
# used by local Output
def broadcast(self, selfstream: Out[T], value: T): ...

Expand Down Expand Up @@ -153,10 +182,13 @@ class RemoteOut(RemoteStream[T]):
def connect(self, other: RemoteIn[T]):
return other.connect(self)

def subscribe(self, cb) -> Callable[[], None]:
return self.transport.subscribe(cb, self)


# representation of Input
# as views from inside of the module
class In(Stream[T]):
class In(Stream[T], ObservableMixin[T]):
connection: Optional[RemoteOut[T]] = None
_transport: Transport

Expand All @@ -183,36 +215,9 @@ def transport(self) -> Transport[T]:
def state(self) -> State: # noqa: D401
return State.UNBOUND if self.owner is None else State.READY

# subscribes and returns the first value it receives
# might be nicer to write without rxpy but had this snippet ready
def get_next(self, timeout=10.0) -> T:
try:
return (
self.observable()
.pipe(ops.first(), *([ops.timeout(timeout)] if timeout is not None else []))
.run()
)
except Exception as e:
raise Exception(f"No value received after {timeout} seconds") from e

def hot_latest(self) -> Callable[[], T]:
return reactive.getter_streaming(self.observable())

def pure_observable(self):
def _subscribe(observer, scheduler=None):
unsubscribe = self.subscribe(observer.on_next)
return Disposable(unsubscribe)

return rx.create(_subscribe)

# default return is backpressured because most
# use cases will want this by default
def observable(self):
return backpressure(self.pure_observable())

# returns unsubscribe function
def subscribe(self, cb) -> Callable[[], None]:
return self.transport.subscribe(self, cb)
return self.transport.subscribe(cb, self)


# representation of input outside of module
Expand Down
8 changes: 4 additions & 4 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)

import dimos.core.colors as colors
from dimos.core.stream import In, Transport
from dimos.core.stream import In, RemoteIn, Transport
from dimos.protocol.pubsub.lcmpubsub import LCM, PickleLCM
from dimos.protocol.pubsub.lcmpubsub import Topic as LCMTopic

Expand Down Expand Up @@ -75,7 +75,7 @@ def broadcast(self, _, msg):

self.lcm.publish(self.topic, msg)

def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None:
def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:
if not self._started:
self.lcm.start()
self._started = True
Expand All @@ -99,7 +99,7 @@ def broadcast(self, _, msg):

self.lcm.publish(self.topic, msg)

def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None:
def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:
if not self._started:
self.lcm.start()
self._started = True
Expand Down Expand Up @@ -144,7 +144,7 @@ def dask_register_subscriber(self, remoteInput: RemoteIn[T]) -> None:
self.subscribers.append(remoteInput)

# for inputs
def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None:
def subscribe(self, callback: Callable[[T], None], selfstream: In[T]) -> None:
if not self._started:
selfstream.connection.owner.dask_register_subscriber(
selfstream.connection.name, selfstream
Expand Down
2 changes: 1 addition & 1 deletion dimos/robot/unitree_webrtc/type/lidar.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, **kwargs):
)

self.origin = kwargs.get("origin")
self.resolution = kwargs.get("resolution")
self.resolution = kwargs.get("resolution", 0.05)

@classmethod
def from_msg(cls: "LidarMessage", raw_message: RawLidarMsg) -> "LidarMessage":
Expand Down
1 change: 1 addition & 0 deletions dimos/robot/unitree_webrtc/type/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def add_frame(self, frame: LidarMessage) -> "Map":
"""Voxelise *frame* and splice it into the running map."""
new_pct = frame.pointcloud.voxel_down_sample(voxel_size=self.voxel_size)
self.pointcloud = splice_cylinder(self.pointcloud, new_pct, shrink=0.5)
return self

def consume(self, observable: Observable[LidarMessage]) -> Observable["Map"]:
"""Reactive operator that folds a stream of `LidarMessage` into the map."""
Expand Down
Loading