diff --git a/README.md b/README.md index 63d0244..b5f5547 100644 --- a/README.md +++ b/README.md @@ -95,4 +95,4 @@ These publications provide insights into the practical applications and impact o ## Financial Support -`ezmsg` is supported by Johns Hopkins University (JHU), the JHU Applied Physics Laboratory (APL), and by the Wyss Center for Bio and Neuro Engineering. +`ezmsg` is supported by Johns Hopkins University (JHU), the JHU Applied Physics Laboratory (APL), Blackrock Neurotech and by the Wyss Center for Bio and Neuro Engineering. diff --git a/pyproject.toml b/pyproject.toml index 37154bc..f69689f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ezmsg" -version = "3.8.0" +version = "3.9.0" description = "A simple DAG-based computation model" authors = [ { name = "Griffin Milsap", email = "griffin.milsap@gmail.com" }, @@ -52,9 +52,6 @@ docs = [ axisarray = [ "numpy>=2.2.6", ] -perf = [ - "xarray", -] [project.scripts] ezmsg = "ezmsg.core.command:cmdline" @@ -63,6 +60,12 @@ ezmsg = "ezmsg.core.command:cmdline" axisarray = [ "numpy>=2.2.6", ] +perf = [ + "xarray>=2025.6.1", +] +dashboard = [ + "ezmsg-dashboard; python_version >= '3.11'", +] [tool.pytest.ini_options] addopts = ["--import-mode=importlib"] diff --git a/src/ezmsg/core/command.py b/src/ezmsg/core/command.py index f9c753d..61aa2f4 100644 --- a/src/ezmsg/core/command.py +++ b/src/ezmsg/core/command.py @@ -10,11 +10,17 @@ from .commands.start import handle_start from .netprotocol import ( Address, + DEFAULT_HOST, GRAPHSERVER_ADDR_ENV, GRAPHSERVER_PORT_DEFAULT, PUBLISHER_START_PORT_ENV, PUBLISHER_START_PORT_DEFAULT, ) +from .commands.dashboard import ( + DASHBOARD_ADDR_ENV, + DASHBOARD_INSTALL_HINT, + DASHBOARD_PORT_DEFAULT, +) def build_parser() -> argparse.ArgumentParser: @@ -30,6 +36,7 @@ def build_parser() -> argparse.ArgumentParser: epilog=f""" You can also change server configuration with environment variables. GraphServer will be hosted on ${GRAPHSERVER_ADDR_ENV} (default port: {GRAPHSERVER_PORT_DEFAULT}). + Dashboard will be hosted on ${DASHBOARD_ADDR_ENV} (default: {DEFAULT_HOST}:{DASHBOARD_PORT_DEFAULT}, or graph port + 1). Publishers will be assigned available ports starting from {PUBLISHER_START_PORT_DEFAULT}. (Change with ${PUBLISHER_START_PORT_ENV}) """, ) @@ -55,7 +62,12 @@ def cmdline(argv: list[str] | None = None) -> None: result = args._handler(args) if inspect.isawaitable(result): - asyncio.run(result) + try: + asyncio.run(result) + except KeyboardInterrupt: + # asyncio.run() re-raises KeyboardInterrupt after cancelling the main + # task on Ctrl+C, even when command cleanup has already completed. + pass async def run_command( @@ -64,8 +76,10 @@ async def run_command( target: str = "live", compact: int | None = None, nobrowser: bool = False, + dashboard: int | bool | None = None, ) -> None: handlers = { + "dashboard": None, "serve": handle_serve, "start": handle_start, "shutdown": handle_shutdown, @@ -74,11 +88,25 @@ async def run_command( } if cmd not in handlers: raise ValueError(f"Unknown ezmsg command '{cmd}'") + if cmd == "dashboard": + try: + from ezmsg.dashboard.server import handle_dashboard + except ImportError as exc: + raise RuntimeError(DASHBOARD_INSTALL_HINT) from exc + handlers["dashboard"] = handle_dashboard args = argparse.Namespace( command=cmd, address=str(graph_address), + graph_address=str(graph_address), target=target, compact=compact, nobrowser=nobrowser, + dashboard=dashboard, + host="127.0.0.1", + port=8000, + open_browser=False, + log_level="info", ) - await handlers[cmd](args) + result = handlers[cmd](args) + if inspect.isawaitable(result): + await result diff --git a/src/ezmsg/core/commands/__init__.py b/src/ezmsg/core/commands/__init__.py index 7177e52..175c3c9 100644 --- a/src/ezmsg/core/commands/__init__.py +++ b/src/ezmsg/core/commands/__init__.py @@ -1,5 +1,6 @@ import argparse +from .dashboard_cmd import setup_dashboard_cmdline from .graphviz import setup_graphviz_cmdline from .mermaid import setup_mermaid_cmdline from .serve import setup_serve_cmdline @@ -8,6 +9,7 @@ def setup_core_cmdline(subparsers: argparse._SubParsersAction) -> None: + setup_dashboard_cmdline(subparsers) setup_serve_cmdline(subparsers) setup_start_cmdline(subparsers) setup_shutdown_cmdline(subparsers) diff --git a/src/ezmsg/core/commands/common.py b/src/ezmsg/core/commands/common.py index 2b8eaf5..8fe8501 100644 --- a/src/ezmsg/core/commands/common.py +++ b/src/ezmsg/core/commands/common.py @@ -1,6 +1,7 @@ import argparse -from ..netprotocol import Address, GRAPHSERVER_PORT_DEFAULT +from ..graphserver import GraphService +from ..netprotocol import Address def add_address_argument(parser: argparse.ArgumentParser) -> None: @@ -21,5 +22,5 @@ def add_compact_argument(parser: argparse.ArgumentParser) -> None: def graph_address_from_args(args: argparse.Namespace) -> Address: if args.address is None: - return Address("127.0.0.1", GRAPHSERVER_PORT_DEFAULT) + return GraphService.default_address() return Address.from_string(args.address) diff --git a/src/ezmsg/core/commands/dashboard.py b/src/ezmsg/core/commands/dashboard.py new file mode 100644 index 0000000..3997dc6 --- /dev/null +++ b/src/ezmsg/core/commands/dashboard.py @@ -0,0 +1,77 @@ +import argparse +import os +from typing import Any + +from ..netprotocol import ( + Address, + DEFAULT_HOST, + GRAPHSERVER_ADDR_ENV, + GRAPHSERVER_PORT_DEFAULT, +) + +DASHBOARD_ADDR_ENV = "EZMSG_DASHBOARD_ADDR" +DASHBOARD_PORT_DEFAULT = GRAPHSERVER_PORT_DEFAULT + 1 +DASHBOARD_INSTALL_HINT = ( + "Dashboard support requires the optional `ezmsg-dashboard` package. " + "Install it with `pip install ezmsg-dashboard`." +) + + +class DashboardDependencyError(RuntimeError): + pass + + +def add_dashboard_argument(parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "--dashboard", + nargs="?", + const=True, + default=None, + type=int, + metavar="PORT", + help=( + "Serve the optional ezmsg dashboard alongside the graph server. " + "If PORT is omitted, ezmsg uses the configured dashboard address or graph port + 1." + ), + ) + + +def default_graph_address() -> Address: + address_str = os.environ.get( + GRAPHSERVER_ADDR_ENV, f"{DEFAULT_HOST}:{GRAPHSERVER_PORT_DEFAULT}" + ) + return Address.from_string(address_str) + + +def dashboard_address( + graph_address: Address | None = None, dashboard_port: int | None = None +) -> Address: + if DASHBOARD_ADDR_ENV in os.environ: + address = Address.from_string(os.environ[DASHBOARD_ADDR_ENV]) + else: + resolved_graph_address = graph_address or default_graph_address() + address = Address(resolved_graph_address.host, resolved_graph_address.port + 1) + + if dashboard_port is not None: + return Address(address.host, dashboard_port) + return address + + +def require_dashboard_dependency() -> Any: + try: + from ezmsg.dashboard.server import start_dashboard_server + except ImportError as exc: + raise DashboardDependencyError(DASHBOARD_INSTALL_HINT) from exc + return start_dashboard_server + + +def start_dashboard(graph_address: Address, dashboard_port: int | None = None) -> Any: + start_dashboard_server = require_dashboard_dependency() + + address = dashboard_address(graph_address, dashboard_port=dashboard_port) + return start_dashboard_server( + graph_address=graph_address, + host=address.host, + port=address.port, + log_level="warning", + ) diff --git a/src/ezmsg/core/commands/dashboard_cmd.py b/src/ezmsg/core/commands/dashboard_cmd.py new file mode 100644 index 0000000..0ffb5f5 --- /dev/null +++ b/src/ezmsg/core/commands/dashboard_cmd.py @@ -0,0 +1,43 @@ +import argparse +import logging + +from .dashboard import DASHBOARD_INSTALL_HINT + +logger = logging.getLogger("ezmsg") + + +def _warn_dashboard_dependency_missing(_: argparse.Namespace) -> None: + logger.warning(DASHBOARD_INSTALL_HINT) + + +def _setup_dashboard_fallback(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser( + "dashboard", + help="launch the optional ezmsg dashboard server", + description="Launch the optional ezmsg dashboard server.", + ) + parser.add_argument("--graph-address", default=None, help="Address of the ezmsg graph server.") + parser.add_argument("--host", default="127.0.0.1", help="HTTP bind host for the dashboard.") + parser.add_argument("--port", type=int, default=8000, help="HTTP bind port for the dashboard.") + parser.add_argument( + "--open-browser", + action="store_true", + help="Open the dashboard in a browser after startup.", + ) + parser.add_argument( + "--log-level", + default="info", + choices=["critical", "error", "warning", "info", "debug", "trace"], + help="Uvicorn log verbosity.", + ) + parser.set_defaults(_handler=_warn_dashboard_dependency_missing) + + +def setup_dashboard_cmdline(subparsers: argparse._SubParsersAction) -> None: + try: + from ezmsg.dashboard.server import setup_dashboard_cmdline as setup_optional_dashboard + except ImportError: + _setup_dashboard_fallback(subparsers) + return + + setup_optional_dashboard(subparsers) diff --git a/src/ezmsg/core/commands/serve.py b/src/ezmsg/core/commands/serve.py index 075bad1..9ed5b95 100644 --- a/src/ezmsg/core/commands/serve.py +++ b/src/ezmsg/core/commands/serve.py @@ -4,6 +4,11 @@ from ..graphserver import GraphService from .common import add_address_argument, graph_address_from_args +from .dashboard import ( + DashboardDependencyError, + add_dashboard_argument, + start_dashboard, +) logger = logging.getLogger("ezmsg") @@ -14,17 +19,29 @@ async def handle_serve(args: argparse.Namespace) -> None: logger.info(f"GraphServer Address: {graph_address}") graph_server = graph_service.create_server() + dashboard_server = None try: + if args.dashboard is not None: + dashboard_port = args.dashboard if type(args.dashboard) is int else None + dashboard_server = start_dashboard( + graph_service.address, dashboard_port=dashboard_port + ) + logger.info(f"Dashboard Address: {dashboard_server.url}") logger.info("Servers running...") await asyncio.to_thread(graph_server.join) - except KeyboardInterrupt: + except (KeyboardInterrupt, asyncio.CancelledError): logger.info("Interrupt detected; shutting down servers") + except DashboardDependencyError as exc: + logger.warning(str(exc)) finally: + if dashboard_server is not None: + dashboard_server.stop() graph_server.stop() def setup_serve_cmdline(subparsers: argparse._SubParsersAction) -> None: parser = subparsers.add_parser("serve") add_address_argument(parser) + add_dashboard_argument(parser) parser.set_defaults(_handler=handle_serve) diff --git a/src/ezmsg/core/commands/start.py b/src/ezmsg/core/commands/start.py index 8133b0d..eb040d8 100644 --- a/src/ezmsg/core/commands/start.py +++ b/src/ezmsg/core/commands/start.py @@ -7,6 +7,11 @@ from ..graphserver import GraphService from ..netprotocol import close_stream_writer from .common import add_address_argument, graph_address_from_args +from .dashboard import ( + DashboardDependencyError, + add_dashboard_argument, + require_dashboard_dependency, +) logger = logging.getLogger("ezmsg") @@ -14,10 +19,18 @@ async def handle_start(args: argparse.Namespace) -> None: graph_address = graph_address_from_args(args) graph_service = GraphService(graph_address) + cmd = [sys.executable, "-m", "ezmsg.core", "serve", f"--address={graph_address}"] + if args.dashboard is not None: + try: + require_dashboard_dependency() + except DashboardDependencyError as exc: + logger.warning(str(exc)) + return + cmd.append("--dashboard") + if type(args.dashboard) is int: + cmd.append(str(args.dashboard)) - popen = subprocess.Popen( - [sys.executable, "-m", "ezmsg.core", "serve", f"--address={graph_address}"] - ) + popen = subprocess.Popen(cmd) while True: try: @@ -33,4 +46,5 @@ async def handle_start(args: argparse.Namespace) -> None: def setup_start_cmdline(subparsers: argparse._SubParsersAction) -> None: parser = subparsers.add_parser("start") add_address_argument(parser) + add_dashboard_argument(parser) parser.set_defaults(_handler=handle_start) diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index d002ad7..bc88275 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -145,9 +145,8 @@ def benchmark( ) try: - communications = ( - DEFAULT_COMMS if comms is None else [Communication(c) for c in comms] - ) + communication_names = DEFAULT_COMMS if comms is None else list(comms) + communications = [Communication(c) for c in communication_names] except ValueError: ez.logger.error( f"Invalid test communications requested. Valid communications: {', '.join([c.value for c in Communication])}" diff --git a/tests/test_command.py b/tests/test_command.py index b74df4b..cee71c5 100644 --- a/tests/test_command.py +++ b/tests/test_command.py @@ -1,7 +1,8 @@ import pytest +import argparse from pathlib import Path -from ezmsg.core.command import build_parser +from ezmsg.core.command import build_parser, cmdline def test_mermaid_subparser_accepts_mermaid_specific_args(): @@ -121,8 +122,103 @@ def test_serve_subparser_rejects_visualization_args(): parser.parse_args(["serve", "--target", "play"]) +def test_serve_subparser_accepts_dashboard_flag(): + parser = build_parser() + + args = parser.parse_args(["serve", "--dashboard"]) + + assert args.command == "serve" + assert args.dashboard is True + + +def test_start_subparser_accepts_dashboard_flag(): + parser = build_parser() + + args = parser.parse_args(["start", "--dashboard"]) + + assert args.command == "start" + assert args.dashboard is True + + +def test_serve_subparser_accepts_dashboard_port(): + parser = build_parser() + + args = parser.parse_args(["serve", "--dashboard", "28000"]) + + assert args.command == "serve" + assert args.dashboard == 28000 + + +def test_dashboard_subparser_accepts_dashboard_args(): + parser = build_parser() + + args = parser.parse_args( + [ + "dashboard", + "--graph-address", + "127.0.0.1:4000", + "--host", + "0.0.0.0", + "--port", + "28000", + "--open-browser", + "--log-level", + "debug", + ] + ) + + assert args.command == "dashboard" + assert args.graph_address == "127.0.0.1:4000" + assert args.host == "0.0.0.0" + assert args.port == 28000 + assert args.open_browser is True + assert args.log_level == "debug" + + def test_perf_subparser_rejects_core_only_args(): parser = build_parser() with pytest.raises(SystemExit): parser.parse_args(["perf", "benchmark", "--address", "127.0.0.1:4000"]) + + +def test_cmdline_suppresses_keyboard_interrupt_from_asyncio_run(monkeypatch): + class DummyParser: + def parse_args(self, args=None): + return argparse.Namespace(_handler=lambda parsed_args: object()) + + monkeypatch.setattr("ezmsg.core.command.build_parser", lambda: DummyParser()) + monkeypatch.setattr("ezmsg.core.command.inspect.isawaitable", lambda result: True) + + def raise_keyboard_interrupt(result): + raise KeyboardInterrupt + + monkeypatch.setattr("ezmsg.core.command.asyncio.run", raise_keyboard_interrupt) + + cmdline([]) + + +def test_dashboard_subcommand_warns_when_optional_dependency_missing(monkeypatch, caplog): + real_import = __import__ + + def fake_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "ezmsg.dashboard.server": + raise ImportError("missing optional dashboard package") + return real_import(name, globals, locals, fromlist, level) + + monkeypatch.setattr("builtins.__import__", fake_import) + monkeypatch.delitem(__import__("sys").modules, "ezmsg.dashboard.server", raising=False) + monkeypatch.delitem(__import__("sys").modules, "ezmsg.core.commands.dashboard_cmd", raising=False) + + from ezmsg.core.commands.dashboard_cmd import setup_dashboard_cmdline + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(dest="command", required=True) + setup_dashboard_cmdline(subparsers) + + args = parser.parse_args(["dashboard"]) + + with caplog.at_level("WARNING"): + args._handler(args) + + assert "pip install ezmsg-dashboard" in caplog.text diff --git a/tests/test_dashboard_commands.py b/tests/test_dashboard_commands.py new file mode 100644 index 0000000..cd6b7d9 --- /dev/null +++ b/tests/test_dashboard_commands.py @@ -0,0 +1,207 @@ +import argparse +import sys +from types import SimpleNamespace + +import pytest + +from ezmsg.core.commands.dashboard import ( + DASHBOARD_ADDR_ENV, + DashboardDependencyError, + DASHBOARD_INSTALL_HINT, + dashboard_address, + require_dashboard_dependency, + start_dashboard, +) +from ezmsg.core.commands.start import handle_start +from ezmsg.core.commands.serve import handle_serve +from ezmsg.core.commands.common import graph_address_from_args +from ezmsg.core.netprotocol import Address +from ezmsg.core.graphserver import GraphService + + +def test_dashboard_address_defaults_to_graph_port_plus_one(): + graph_address = Address("127.0.0.1", 25978) + + assert dashboard_address(graph_address) == Address("127.0.0.1", 25979) + + +def test_dashboard_address_uses_environment_override(monkeypatch): + monkeypatch.setenv(DASHBOARD_ADDR_ENV, "0.0.0.0:4100") + + assert dashboard_address(Address("127.0.0.1", 25978)) == Address("0.0.0.0", 4100) + + +def test_dashboard_address_uses_explicit_port_with_graph_host(): + assert dashboard_address(Address("127.0.0.1", 30000), dashboard_port=30001) == Address( + "127.0.0.1", 30001 + ) + + +def test_dashboard_address_uses_explicit_port_with_env_host(monkeypatch): + monkeypatch.setenv(DASHBOARD_ADDR_ENV, "0.0.0.0:4100") + + assert dashboard_address(Address("127.0.0.1", 25978), dashboard_port=4101) == Address( + "0.0.0.0", 4101 + ) + + +def test_graph_address_from_args_uses_environment_override(monkeypatch): + monkeypatch.setenv("EZMSG_GRAPHSERVER_ADDR", "0.0.0.0:4101") + + assert graph_address_from_args(argparse.Namespace(address=None)) == Address( + "0.0.0.0", 4101 + ) + + +def test_require_dashboard_dependency_raises_helpful_error_when_package_missing(monkeypatch): + import builtins + + real_import = builtins.__import__ + + def fake_import(name, globals=None, locals=None, fromlist=(), level=0): + if name == "ezmsg.dashboard.server": + raise ImportError("missing optional dashboard package") + return real_import(name, globals, locals, fromlist, level) + + monkeypatch.setattr(builtins, "__import__", fake_import) + + with pytest.raises(RuntimeError, match="pip install ezmsg-dashboard"): + require_dashboard_dependency() + + +def test_start_dashboard_raises_helpful_error_when_package_missing(monkeypatch): + monkeypatch.setattr( + "ezmsg.core.commands.dashboard.require_dashboard_dependency", + lambda: (_ for _ in ()).throw(DashboardDependencyError(DASHBOARD_INSTALL_HINT)), + ) + + with pytest.raises(RuntimeError, match="pip install ezmsg-dashboard"): + start_dashboard(Address("127.0.0.1", 25978)) + + +@pytest.mark.asyncio +async def test_handle_start_forwards_dashboard_flag(monkeypatch): + popen_calls: list[list[str]] = [] + + class DummyPopen: + def __init__(self, cmd): + popen_calls.append(cmd) + self.pid = 1234 + + async def fake_open_connection(self): + return object(), SimpleNamespace() + + async def fake_close_stream_writer(writer): + return None + + monkeypatch.setattr("ezmsg.core.commands.start.subprocess.Popen", DummyPopen) + monkeypatch.setattr( + "ezmsg.core.commands.start.require_dashboard_dependency", + lambda: object(), + ) + monkeypatch.setattr( + "ezmsg.core.commands.start.GraphService.open_connection", fake_open_connection + ) + monkeypatch.setattr( + "ezmsg.core.commands.start.close_stream_writer", fake_close_stream_writer + ) + + args = argparse.Namespace(address="127.0.0.1:25978", dashboard=True) + await handle_start(args) + + assert popen_calls == [ + [ + sys.executable, + "-m", + "ezmsg.core", + "serve", + "--address=127.0.0.1:25978", + "--dashboard", + ] + ] + + +@pytest.mark.asyncio +async def test_handle_start_forwards_dashboard_port(monkeypatch): + popen_calls: list[list[str]] = [] + + class DummyPopen: + def __init__(self, cmd): + popen_calls.append(cmd) + self.pid = 1234 + + async def fake_open_connection(self): + return object(), SimpleNamespace() + + async def fake_close_stream_writer(writer): + return None + + monkeypatch.setattr("ezmsg.core.commands.start.subprocess.Popen", DummyPopen) + monkeypatch.setattr( + "ezmsg.core.commands.start.require_dashboard_dependency", + lambda: object(), + ) + monkeypatch.setattr( + "ezmsg.core.commands.start.GraphService.open_connection", fake_open_connection + ) + monkeypatch.setattr( + "ezmsg.core.commands.start.close_stream_writer", fake_close_stream_writer + ) + + args = argparse.Namespace(address="127.0.0.1:25978", dashboard=28123) + await handle_start(args) + + assert popen_calls == [ + [ + sys.executable, + "-m", + "ezmsg.core", + "serve", + "--address=127.0.0.1:25978", + "--dashboard", + "28123", + ] + ] + + +@pytest.mark.asyncio +async def test_handle_start_warns_when_dashboard_dependency_missing(monkeypatch, caplog): + monkeypatch.setattr( + "ezmsg.core.commands.start.require_dashboard_dependency", + lambda: (_ for _ in ()).throw(DashboardDependencyError(DASHBOARD_INSTALL_HINT)), + ) + + args = argparse.Namespace(address="127.0.0.1:25978", dashboard=True) + + with caplog.at_level("WARNING"): + await handle_start(args) + + assert "pip install ezmsg-dashboard" in caplog.text + + +@pytest.mark.asyncio +async def test_handle_serve_warns_when_dashboard_dependency_missing(monkeypatch, caplog): + class DummyGraphServer: + def join(self): + return None + + def stop(self): + return None + + monkeypatch.setattr( + "ezmsg.core.commands.serve.GraphService.create_server", + lambda self: DummyGraphServer(), + ) + monkeypatch.setattr( + "ezmsg.core.commands.serve.start_dashboard", + lambda graph_address, dashboard_port=None: (_ for _ in ()).throw( + DashboardDependencyError(DASHBOARD_INSTALL_HINT) + ), + ) + + args = argparse.Namespace(address="127.0.0.1:25978", dashboard=True) + + with caplog.at_level("WARNING"): + await handle_serve(args) + + assert "pip install ezmsg-dashboard" in caplog.text