Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
449a3df
initial unitree split
leshy Jul 3, 2025
83735f0
sensor replay tooling
leshy Jul 3, 2025
eb81650
full foxglove replay works
leshy Jul 4, 2025
1114d93
better LCM system checks, fixes bin/lfs_push
leshy Jul 4, 2025
e95519f
Merge branch 'lcmchecks3' into multi-unitree
leshy Jul 4, 2025
48c2c06
lcm autoconf functionality
leshy Jul 4, 2025
dba47be
we won't execute commands on the system by default
leshy Jul 4, 2025
18ff70f
Merge branch 'lcmchecks3' into multi-unitree
leshy Jul 4, 2025
4ee4b9e
dockerfile bugfix
leshy Jul 4, 2025
a7c3c58
Merge branch 'lcmchecks3' into multi-unitree
leshy Jul 4, 2025
5d9504f
lcm test now autoconfs the system
leshy Jul 4, 2025
bf7c404
planner typefix
leshy Jul 4, 2025
ca59c80
lcmcheck autoconf
leshy Jul 4, 2025
1aea183
Merge branch 'lcmchecks3' into multi-unitree
leshy Jul 4, 2025
fc2acc0
timedstreamreply async fix
leshy Jul 4, 2025
165b1ee
pubsub tests fix
leshy Jul 4, 2025
20e452e
fixing tests
leshy Jul 4, 2025
d6ea66d
transport autoconf
leshy Jul 5, 2025
2fd6a18
Merge branch 'lcmchecks3' into multi-unitree
leshy Jul 5, 2025
fbc4134
bugfixes, transport setting for input
leshy Jul 5, 2025
3869926
work on full unitree build
leshy Jul 5, 2025
6bb7f91
dask issue identified
leshy Jul 8, 2025
b58c1f0
instance+class property rpcs
leshy Jul 8, 2025
a39ca28
lcm service pulled out of pubsub, rpc implementation
leshy Jul 8, 2025
93f9dce
initial generic RPC protocol implementation
leshy Jul 9, 2025
754d423
redis & lcm RPC implementation
leshy Jul 9, 2025
3378e92
module wide rpc autobind & tests
leshy Jul 9, 2025
e9528a6
implementation of async calls
leshy Jul 9, 2025
267e0db
RPC implementation finished
leshy Jul 9, 2025
89c2dbc
tests fixed
leshy Jul 9, 2025
cc98fb0
tests passing
leshy Jul 9, 2025
605c51c
heavy tests should run in CI
leshy Jul 9, 2025
acc3ebb
experiment with separate heavy tests
leshy Jul 9, 2025
83cc470
multiprocess rpc tags
leshy Jul 9, 2025
00e1864
Merge branch 'dev' into multi-unitree
leshy Jul 9, 2025
cbb57b5
lfs tests marked as heavy
leshy Jul 9, 2025
40086f9
pubsubrpc docs
leshy Jul 9, 2025
ae19b9c
RPC Actors work
leshy Jul 10, 2025
1b20b04
working unitree rpc build
leshy Jul 10, 2025
b8363ab
multiprocess works
leshy Jul 10, 2025
ccd6856
less verbose global planner
leshy Jul 10, 2025
f97c341
poseStamped implementation
leshy Jul 10, 2025
4d9b4c5
small cleanup
leshy Jul 10, 2025
74d62ea
module not using RPC if not deployed, actor tests disabled in CI
leshy Jul 10, 2025
6bea2d0
office walk
leshy Jul 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,18 @@ jobs:
}}
cmd: "pytest"
dev-image: dev:${{ needs.dev.result == 'success' && needs.check-changes.outputs.branch-tag || 'dev' }}

# we run in parallel with normal tests for speed
run-heavy-tests:
needs: [check-changes, dev]
if: always()
uses: ./.github/workflows/tests.yml
with:
should-run: ${{
needs.check-changes.result == 'success' &&
((needs.dev.result == 'success') ||
(needs.dev.result == 'skipped' &&
needs.check-changes.outputs.tests == 'true'))
}}
cmd: "pytest -m heavy"
dev-image: dev:${{ needs.dev.result == 'success' && needs.check-changes.outputs.branch-tag || 'dev' }}
2 changes: 1 addition & 1 deletion bin/lfs_push
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ for dir_path in data/*; do
compressed_dirs+=("$dir_name")

# Add the compressed file to git LFS tracking
git add "$compressed_file"
git add -f "$compressed_file"

echo -e " ${GREEN}✓${NC} git-add $compressed_file"

Expand Down
3 changes: 3 additions & 0 deletions data/.lfs/unitree_office_walk.tar.gz
Git LFS file not shown
1 change: 1 addition & 0 deletions dimos/agents/memory/test_image_embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from dimos.stream.video_provider import VideoProvider


@pytest.mark.heavy
class TestImageEmbedding:
"""Test class for CLIP image embedding functionality."""

Expand Down
99 changes: 80 additions & 19 deletions dimos/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,82 @@
from __future__ import annotations

import multiprocessing as mp
import time
from typing import Optional

import pytest
from dask.distributed import Client, LocalCluster
from rich.console import Console

import dimos.core.colors as colors
from dimos.core.core import In, Out, RemoteOut, rpc
from dimos.core.module_dask import Module
from dimos.core.module import Module, ModuleBase
from dimos.core.transport import LCMTransport, ZenohTransport, pLCMTransport
from dimos.protocol.rpc.lcmrpc import LCMRPC
from dimos.protocol.rpc.spec import RPC


def patch_actor(actor, cls): ...


class RPCClient:
def __init__(self, actor_instance, actor_class):
self.rpc = LCMRPC()
self.actor_class = actor_class
self.remote_name = actor_class.__name__
self.actor_instance = actor_instance
self.rpcs = actor_class.rpcs.keys()
self.rpc.start()

def __reduce__(self):
# Return the class and the arguments needed to reconstruct the object
return (
self.__class__,
(self.actor_instance, self.actor_class),
)

# passthrough
def __getattr__(self, name: str):
# Check if accessing a known safe attribute to avoid recursion
if name in {
"__class__",
"__init__",
"__dict__",
"__getattr__",
"rpcs",
"remote_name",
"remote_instance",
"actor_instance",
}:
raise AttributeError(f"{name} is not found.")

if name in self.rpcs:
return lambda *args: self.rpc.call_sync(f"{self.remote_name}/{name}", args)

# return super().__getattr__(name)
# Try to avoid recursion by directly accessing attributes that are known
return self.actor_instance.__getattr__(name)


def patchdask(dask_client: Client):
def deploy(actor_class, *args, **kwargs):
actor = dask_client.submit(
actor_class,
*args,
**kwargs,
actor=True,
).result()

actor.set_ref(actor).result()
print(colors.green(f"Subsystem deployed: {actor}"))
return actor
def deploy(
actor_class,
*args,
**kwargs,
):
console = Console()
with console.status(f"deploying [green]{actor_class.__name__}", spinner="arc"):
actor = dask_client.submit(
actor_class,
*args,
**kwargs,
actor=True,
).result()

worker = actor.set_ref(actor).result()
print((f"deployed: {colors.green(actor)} @ {colors.blue('worker ' + str(worker))}"))

return RPCClient(actor, actor_class)

dask_client.deploy = deploy
return dask_client
Expand All @@ -34,15 +90,20 @@ def dimos():
stop(client)


def start(n):
def start(n: Optional[int] = None) -> Client:
console = Console()
if not n:
n = mp.cpu_count()
print(colors.green(f"Initializing dimos local cluster with {n} workers"))
cluster = LocalCluster(
n_workers=n,
threads_per_worker=3,
)
client = Client(cluster)
with console.status(
f"[green]Initializing dimos local cluster with [bright_blue]{n} workers", spinner="arc"
) as status:
cluster = LocalCluster(
n_workers=n,
threads_per_worker=4,
)
client = Client(cluster)

console.print(f"[green]Initialized dimos local cluster with [bright_blue]{n} workers")
return patchdask(client)


Expand Down
22 changes: 19 additions & 3 deletions dimos/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class RemoteStream(Stream[T]):
def state(self) -> State: # noqa: D401
return State.UNBOUND if self.owner is None else State.READY

# this won't work but nvm
@property
def transport(self) -> Transport[T]:
return self._transport
Expand All @@ -204,6 +205,7 @@ def connect(self, other: RemoteIn[T]):

class In(Stream[T]):
connection: Optional[RemoteOut[T]] = None
_transport: Transport

def __str__(self):
mystr = super().__str__()
Expand All @@ -220,21 +222,35 @@ def __reduce__(self): # noqa: D401

@property
def transport(self) -> Transport[T]:
return self.connection.transport
if not self._transport:
self._transport = self.connection.transport
return self._transport

@property
def state(self) -> State: # noqa: D401
return State.UNBOUND if self.owner is None else State.READY

def subscribe(self, cb):
# print("SUBBING", self, self.connection._transport)
self.connection._transport.subscribe(self, cb)
self.transport.subscribe(self, cb)


class RemoteIn(RemoteStream[T]):
def connect(self, other: RemoteOut[T]) -> None:
return self.owner.connect_stream(self.name, other).result()

# this won't work but that's ok
@property
def transport(self) -> Transport[T]:
return self._transport

def publish(self, msg):
self.transport.broadcast(self, msg)

@transport.setter
def transport(self, value: Transport[T]) -> None:
self.owner.set_transport(self.name, value).result()
self._transport = value


def rpc(fn: Callable[..., Any]) -> Callable[..., Any]:
fn.__rpc__ = True # type: ignore[attr-defined]
Expand Down
139 changes: 99 additions & 40 deletions dimos/core/module_dask.py → dimos/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,112 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
from typing import (
Any,
Callable,
List,
get_args,
get_origin,
get_type_hints,
)

from dask.distributed import Actor
from dask.distributed import Actor, get_worker

from dimos.core import colors
from dimos.core.core import In, Out, RemoteIn, RemoteOut, T, Transport
from dimos.protocol.rpc.lcmrpc import LCMRPC


class ModuleBase:
def __init__(self, *args, **kwargs):
try:
get_worker()
self.rpc = LCMRPC()
self.rpc.serve_module_rpc(self)
self.rpc.start()
except ValueError:
return

@property
def outputs(self) -> dict[str, Out]:
return {
name: s
for name, s in self.__dict__.items()
if isinstance(s, Out) and not name.startswith("_")
}

@property
def inputs(self) -> dict[str, In]:
return {
name: s
for name, s in self.__dict__.items()
if isinstance(s, In) and not name.startswith("_")
}

@classmethod
@property
def rpcs(cls) -> dict[str, Callable]:
return {
name: getattr(cls, name)
for name in dir(cls)
if not name.startswith("_")
and name != "rpcs" # Exclude the rpcs property itself to prevent recursion
and callable(getattr(cls, name, None))
and hasattr(getattr(cls, name), "__rpc__")
}

def io(self) -> str:
def _box(name: str) -> str:
return [
f"┌┴" + "─" * (len(name) + 1) + "┐",
f"│ {name} │",
f"└┬" + "─" * (len(name) + 1) + "┘",
]

# can't modify __str__ on a function like we are doing for I/O
# so we have a separate repr function here
def repr_rpc(fn: Callable) -> str:
sig = inspect.signature(fn)
# Remove 'self' parameter
params = [p for name, p in sig.parameters.items() if name != "self"]

# Format parameters with colored types
param_strs = []
for param in params:
param_str = param.name
if param.annotation != inspect.Parameter.empty:
type_name = getattr(param.annotation, "__name__", str(param.annotation))
param_str += ": " + colors.green(type_name)
if param.default != inspect.Parameter.empty:
param_str += f" = {param.default}"
param_strs.append(param_str)

# Format return type
return_annotation = ""
if sig.return_annotation != inspect.Signature.empty:
return_type = getattr(sig.return_annotation, "__name__", str(sig.return_annotation))
return_annotation = " -> " + colors.green(return_type)

return (
"RPC " + colors.blue(fn.__name__) + f"({', '.join(param_strs)})" + return_annotation
)

ret = [
*(f" ├─ {stream}" for stream in self.inputs.values()),
*_box(self.__class__.__name__),
*(f" ├─ {stream}" for stream in self.outputs.values()),
" │",
*(f" ├─ {repr_rpc(rpc)}" for rpc in self.rpcs.values()),
]

return "\n".join(ret)

class Module:

class DaskModule(ModuleBase):
ref: Actor
worker: int

def __init__(self):
def __init__(self, *args, **kwargs):
self.ref = None

for name, ann in get_type_hints(self, include_extras=True).items():
Expand All @@ -42,9 +129,13 @@ def __init__(self):
inner, *_ = get_args(ann) or (Any,)
stream = In(inner, name, self)
setattr(self, name, stream)
super().__init__(*args, **kwargs)

def set_ref(self, ref):
def set_ref(self, ref) -> int:
worker = get_worker()
self.ref = ref
self.worker = worker.name
return worker.name

def __str__(self):
return f"{self.__class__.__name__}"
Expand Down Expand Up @@ -76,38 +167,6 @@ def dask_receive_msg(self, input_name: str, msg: Any):
def dask_register_subscriber(self, output_name: str, subscriber: RemoteIn[T]):
getattr(self, output_name).transport.dask_register_subscriber(subscriber)

@property
def outputs(self) -> dict[str, Out]:
return {
name: s
for name, s in self.__dict__.items()
if isinstance(s, Out) and not name.startswith("_")
}

@property
def inputs(self) -> dict[str, In]:
return {
name: s
for name, s in self.__dict__.items()
if isinstance(s, In) and not name.startswith("_")
}

@property
def rpcs(self) -> List[Callable]:
return [name for name in dir(self) if hasattr(getattr(self, name), "__rpc__")]

def io(self) -> str:
def _box(name: str) -> str:
return [
"┌┴" + "─" * (len(name) + 1) + "┐",
f"│ {name} │",
"└┬" + "─" * (len(name) + 1) + "┘",
]

ret = [
*(f" ├─ {stream}" for stream in self.inputs.values()),
*_box(self.__class__.__name__),
*(f" ├─ {stream}" for stream in self.outputs.values()),
]

return "\n".join(ret)
# global setting
Module = DaskModule
Loading