Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dimos/core/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class GlobalConfig(BaseSettings):
robot_rotation_diameter: float = 0.6
planner_strategy: NavigationStrategy = "simple"
planner_robot_speed: float | None = None
dtop: bool = False

model_config = SettingsConfigDict(
env_file=".env",
Expand Down
30 changes: 22 additions & 8 deletions dimos/core/module_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
import time
import threading
from typing import TYPE_CHECKING, Any

from dimos.core.global_config import GlobalConfig, global_config
from dimos.core.module import Module, ModuleT
from dimos.core.resource import Resource
from dimos.core.worker_manager import WorkerManager
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from dimos.core.module import Module, ModuleT
from dimos.core.resource_monitor.monitor import StatsMonitor
from dimos.core.rpc_client import ModuleProxy

logger = setup_logger()
Expand All @@ -33,7 +36,8 @@ class ModuleCoordinator(Resource): # type: ignore[misc]
_global_config: GlobalConfig
_n: int | None = None
_memory_limit: str = "auto"
_deployed_modules: dict[type[Module], "ModuleProxy"]
_deployed_modules: dict[type[Module], ModuleProxy]
_stats_monitor: StatsMonitor | None = None

def __init__(
self,
Expand All @@ -50,7 +54,17 @@ def start(self) -> None:
self._client = WorkerManager(n_workers=n)
self._client.start()

if self._global_config.dtop:
from dimos.core.resource_monitor.monitor import StatsMonitor

self._stats_monitor = StatsMonitor(self._client)
self._stats_monitor.start()

def stop(self) -> None:
if self._stats_monitor is not None:
self._stats_monitor.stop()
self._stats_monitor = None

for module_class, module in reversed(self._deployed_modules.items()):
logger.info("Stopping module...", module=module_class.__name__)
try:
Expand All @@ -61,7 +75,7 @@ def stop(self) -> None:

self._client.close_all() # type: ignore[union-attr]

def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> "ModuleProxy": # type: ignore[no-untyped-def]
def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> ModuleProxy: # type: ignore[no-untyped-def]
if not self._client:
raise ValueError("Trying to dimos.deploy before the client has started")

Expand All @@ -71,7 +85,7 @@ def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> "ModuleProxy":

def deploy_parallel(
self, module_specs: list[tuple[type[ModuleT], tuple[Any, ...], dict[str, Any]]]
) -> list["ModuleProxy"]:
) -> list[ModuleProxy]:
if not self._client:
raise ValueError("Not started")

Expand All @@ -94,13 +108,13 @@ def start_all_modules(self) -> None:
if hasattr(module, "on_system_modules"):
module.on_system_modules(module_list)

def get_instance(self, module: type[ModuleT]) -> "ModuleProxy":
def get_instance(self, module: type[ModuleT]) -> ModuleProxy:
return self._deployed_modules.get(module) # type: ignore[return-value, no-any-return]

def loop(self) -> None:
stop = threading.Event()
try:
while True:
time.sleep(0.1)
stop.wait()
except KeyboardInterrupt:
return
finally:
Expand Down
17 changes: 17 additions & 0 deletions dimos/core/resource_monitor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dimos.core.resource_monitor.logger import (
LCMResourceLogger,
ResourceLogger,
StructlogResourceLogger,
)
from dimos.core.resource_monitor.monitor import StatsMonitor
from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats, collect_process_stats

__all__ = [
"LCMResourceLogger",
"ProcessStats",
"ResourceLogger",
"StatsMonitor",
"StructlogResourceLogger",
"WorkerStats",
"collect_process_stats",
]
75 changes: 75 additions & 0 deletions dimos/core/resource_monitor/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from dataclasses import asdict
from typing import TYPE_CHECKING, Any, Protocol

from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats

logger = setup_logger()


class ResourceLogger(Protocol):
def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: ...


class StructlogResourceLogger:
"""Default implementation — logs resource stats via structlog info."""

def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None:
logger.info(
"coordinator",
pid=coordinator.pid,
cpu_pct=coordinator.cpu_percent,
pss_mb=round(coordinator.pss / 1048576, 1),
threads=coordinator.num_threads,
)
for w in workers:
logger.info(
"worker",
worker_id=w.worker_id,
pid=w.pid,
alive=w.alive,
cpu_pct=w.cpu_percent,
pss_mb=round(w.pss / 1048576, 1),
threads=w.num_threads,
children=w.num_children,
fds=w.num_fds,
io_r_mb=round(w.io_read_bytes / 1048576, 1),
io_w_mb=round(w.io_write_bytes / 1048576, 1),
modules=w.modules,
)


class LCMResourceLogger:
"""Publishes resource stats as dicts over a pickle LCM channel."""

def __init__(self, topic: str = "/dimos/resource_stats") -> None:
from dimos.core.transport import pLCMTransport

self._transport: pLCMTransport[dict[str, Any]] = pLCMTransport(topic)

def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None:
self._transport.broadcast(
None,
{
"coordinator": asdict(coordinator),
"workers": [asdict(w) for w in workers],
},
)
126 changes: 126 additions & 0 deletions dimos/core/resource_monitor/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from dataclasses import asdict
import os
import threading
from typing import TYPE_CHECKING, Protocol, runtime_checkable

from dimos.core.resource import Resource
from dimos.core.resource_monitor.stats import (
WorkerStats,
collect_process_stats,
)
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from collections.abc import Sequence

from dimos.core.resource_monitor.logger import ResourceLogger

logger = setup_logger()


@runtime_checkable
class WorkerInfo(Protocol):
@property
def pid(self) -> int | None: ...

@property
def worker_id(self) -> int: ...

@property
def module_names(self) -> list[str]: ...


@runtime_checkable
class WorkerSource(Protocol):
@property
def workers(self) -> Sequence[WorkerInfo]: ...


class StatsMonitor(Resource):
"""Self-contained resource monitor that runs in a daemon thread.

Collects stats for the coordinator process and all workers, then
forwards them to a ``ResourceLogger``.
"""

def __init__(
self,
worker_source: WorkerSource,
resource_logger: ResourceLogger | None = None,
interval: float = 1.0,
coordinator_pid: int | None = None,
) -> None:
self._worker_source = worker_source
self._interval = interval
self._coordinator_pid = coordinator_pid or os.getpid()
self._stop = threading.Event()
self._thread: threading.Thread | None = None

if resource_logger is not None:
self._logger = resource_logger
else:
from dimos.core.resource_monitor.logger import LCMResourceLogger

self._logger = LCMResourceLogger()

def start(self) -> None:
"""Start the monitoring daemon thread."""
# Prime cpu_percent so the first real reading isn't 0.0.
collect_process_stats(self._coordinator_pid)

self._stop.clear()
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()

def stop(self) -> None:
"""Signal the monitor thread to stop and wait for it."""
self._stop.set()
if self._thread is not None:
self._thread.join(timeout=5.0)
self._thread = None

def _loop(self) -> None:
while not self._stop.wait(self._interval):
try:
self._collect_and_log()
except Exception:
logger.error("StatsMonitor collection failed", exc_info=True)

def _collect_and_log(self) -> None:
coordinator = collect_process_stats(self._coordinator_pid)

worker_stats: list[WorkerStats] = []
for w in self._worker_source.workers:
pid = w.pid
if pid is not None:
ps = collect_process_stats(pid)
worker_stats.append(
WorkerStats(**asdict(ps), worker_id=w.worker_id, modules=w.module_names)
)
else:
worker_stats.append(
WorkerStats(
pid=0,
alive=False,
worker_id=w.worker_id,
modules=w.module_names,
)
)

self._logger.log_stats(coordinator, worker_stats)
Loading
Loading