diff --git a/pyproject.toml b/pyproject.toml index 754f647d..6d796f65 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,7 +55,6 @@ axisarray = [ [project.scripts] ezmsg = "ezmsg.core.command:cmdline" -ezmsg-perf = "ezmsg.util.perf.command:command" [project.optional-dependencies] axisarray = [ diff --git a/src/ezmsg/core/command.py b/src/ezmsg/core/command.py index 09ce2dc3..61e76f97 100644 --- a/src/ezmsg/core/command.py +++ b/src/ezmsg/core/command.py @@ -1,33 +1,30 @@ import argparse import asyncio -import base64 -import json -import logging -import subprocess -import sys -import webbrowser -import zlib +import inspect -from .graphserver import GraphService +from ezmsg.util.perf.command import setup_perf_cmdline + +from .commands import setup_core_cmdline +from .commands.graphviz import handle_graphviz +from .commands.mermaid import handle_mermaid, mermaid_url as mm +from .commands.serve import handle_serve +from .commands.shutdown import handle_shutdown +from .commands.start import handle_start from .netprotocol import ( Address, GRAPHSERVER_ADDR_ENV, GRAPHSERVER_PORT_DEFAULT, PUBLISHER_START_PORT_ENV, PUBLISHER_START_PORT_DEFAULT, - close_stream_writer, ) -logger = logging.getLogger("ezmsg") - -def cmdline() -> None: +def build_parser() -> argparse.ArgumentParser: """ - Command-line interface for ezmsg core server management. + Build the ezmsg core command-line parser. - Provides commands for starting, stopping, and managing ezmsg server - processes including GraphServer and SHMServer, as well as utilities - for graph visualization. + Each command gets its own subparser so command-specific options are not + shared globally across unrelated commands. """ parser = argparse.ArgumentParser( "ezmsg.core", @@ -38,63 +35,27 @@ def cmdline() -> None: Publishers will be assigned available ports starting from {PUBLISHER_START_PORT_DEFAULT}. (Change with ${PUBLISHER_START_PORT_ENV}) """, ) + subparsers = parser.add_subparsers(dest="command", required=True, help="command for ezmsg") - parser.add_argument( - "command", - help="command for ezmsg", - choices=["serve", "start", "shutdown", "graphviz", "mermaid"], - ) - - parser.add_argument("--address", help="Address for GraphServer", default=None) - - parser.add_argument( - "--target", - help="Target for mermaid output. Options are 'ink', 'live', and 'play'.", - default="live", - ) - - parser.add_argument( - "-c", - "--compact", - help="""Use compact graph representation. Only used when `cmd` is 'mermaid' or 'graphviz'. - Removes the lowest level of detail (typically streams). Can be stacked (eg. '-cc'). - Warning: this will also prune the graph of proxy topics (nodes that are both sources and targets). - """, - action="count", - ) - - parser.add_argument( - "-n", - "--nobrowser", - help="Do not automatically open the browser for mermaid output. `--target` value will be ignored.", - action="store_true", - ) - - class Args: - command: str - address: str | None - target: str - compact: int | None - nobrowser: bool + setup_core_cmdline(subparsers) + setup_perf_cmdline(subparsers) + return parser - args = parser.parse_args(namespace=Args) - graph_address = Address("127.0.0.1", GRAPHSERVER_PORT_DEFAULT) - if args.address is not None: - graph_address = Address.from_string(args.address) +def cmdline(argv: list[str] | None = None) -> None: + """ + Command-line interface for ezmsg core server management. - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + Provides commands for starting, stopping, and managing ezmsg server + processes including GraphServer and SHMServer, as well as utilities + for graph visualization. + """ + parser = build_parser() + args = parser.parse_args(args=argv) - loop.run_until_complete( - run_command( - args.command, - graph_address, - args.target, - args.compact, - args.nobrowser, - ) - ) + result = args._handler(args) + if inspect.isawaitable(result): + asyncio.run(result) async def run_command( @@ -104,122 +65,20 @@ async def run_command( compact: int | None = None, nobrowser: bool = False, ) -> None: - """ - Run an ezmsg command with the specified parameters. - - This function handles various ezmsg commands like 'serve', 'start', 'shutdown', etc. - and manages the graph and shared memory services. - - :param cmd: The command to execute ('serve', 'start', 'shutdown', 'graphviz', 'mermaid') - :type cmd: str - :param graph_address: Address of the graph service - :type graph_address: Address - :param target: Target for visualization commands (default: 'live') - :type target: str - :param compact: Compactification level for visualization commands - :type compact: int | None - :param nobrowser: Whether to suppress browser opening for visualization - :type nobrowser: bool - """ - graph_service = GraphService(graph_address) - - if cmd == "serve": - logger.info(f"GraphServer Address: {graph_address}") - - graph_server = graph_service.create_server() - - try: - logger.info("Servers running...") - graph_server.join() - - except KeyboardInterrupt: - logger.info("Interrupt detected; shutting down servers") - - finally: - if graph_server is not None: - graph_server.stop() - - elif cmd == "start": - popen = subprocess.Popen( - [sys.executable, "-m", "ezmsg.core", "serve", f"--address={graph_address}"] - ) - - while True: - try: - _, writer = await graph_service.open_connection() - await close_stream_writer(writer) - break - except ConnectionRefusedError: - await asyncio.sleep(0.1) - - logger.info(f"Forked ezmsg servers in PID: {popen.pid}") - - elif cmd == "shutdown": - try: - await graph_service.shutdown() - logger.info( - f"Issued shutdown command to GraphServer @ {graph_service.address}" - ) - - except ConnectionRefusedError: - logger.warning( - f"Could not issue shutdown command to GraphServer @ {graph_service.address}; server not running?" - ) - - elif cmd in ["graphviz", "mermaid"]: - graph_out = await graph_service.get_formatted_graph( - fmt=cmd, compact_level=compact - ) - print(graph_out) - if cmd == "mermaid": - if not nobrowser: - if target == "live": - print( - "%% If the graph does not render immediately, try toggling the 'Pan & Zoom' button." - ) - webbrowser.open(mm(graph_out, target=target)) - - -def mm(graph: str, target="live") -> str: - """ - Generate a Mermaid visualization URL for the given graph. - - :param graph: Graph representation string to visualize. - :type graph: str - :param target: Target platform ('live' or 'ink'). - :type target: str - :return: URL for graph visualization. - :rtype: str - """ - if target != "ink": - jdict = { - "code": graph, - "mermaid": {"theme": "default"}, - "updateDiagram": True, - "autoSync": True, - "rough": False, - } - graph = json.dumps(jdict) - graphbytes: bytes = graph.encode("utf8") - - if target != "ink": - compress = zlib.compressobj(9, zlib.DEFLATED, 15, 8, zlib.Z_DEFAULT_STRATEGY) - graphbytes = compress.compress(graphbytes) - graphbytes += compress.flush() - - base64_bytes = base64.b64encode(graphbytes) - base64_string = base64_bytes.decode("ascii") - - if target == "ink": - prefix = "https://mermaid.ink/img/" - elif target in ["live", "play"]: - type_str = "pako" # or "base64" if we skip compression above. - if target == "live": - prefix = f"https://mermaid.live/edit#{type_str}:" - else: # "play" - prefix = f"https://www.mermaidchart.com/play#{type_str}:" - else: - raise ValueError( - f"Unknown mermaid target '{target}'. Available options are 'ink', 'live', or 'play'." - ) - return prefix + base64_string + handlers = { + "serve": handle_serve, + "start": handle_start, + "shutdown": handle_shutdown, + "graphviz": handle_graphviz, + "mermaid": handle_mermaid, + } + if cmd not in handlers: + raise ValueError(f"Unknown ezmsg command '{cmd}'") + args = argparse.Namespace( + command=cmd, + address=str(graph_address), + target=target, + compact=compact, + nobrowser=nobrowser, + ) + await handlers[cmd](args) diff --git a/src/ezmsg/core/commands/__init__.py b/src/ezmsg/core/commands/__init__.py new file mode 100644 index 00000000..7177e520 --- /dev/null +++ b/src/ezmsg/core/commands/__init__.py @@ -0,0 +1,15 @@ +import argparse + +from .graphviz import setup_graphviz_cmdline +from .mermaid import setup_mermaid_cmdline +from .serve import setup_serve_cmdline +from .shutdown import setup_shutdown_cmdline +from .start import setup_start_cmdline + + +def setup_core_cmdline(subparsers: argparse._SubParsersAction) -> None: + setup_serve_cmdline(subparsers) + setup_start_cmdline(subparsers) + setup_shutdown_cmdline(subparsers) + setup_graphviz_cmdline(subparsers) + setup_mermaid_cmdline(subparsers) diff --git a/src/ezmsg/core/commands/common.py b/src/ezmsg/core/commands/common.py new file mode 100644 index 00000000..2b8eaf51 --- /dev/null +++ b/src/ezmsg/core/commands/common.py @@ -0,0 +1,25 @@ +import argparse + +from ..netprotocol import Address, GRAPHSERVER_PORT_DEFAULT + + +def add_address_argument(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--address", help="Address for GraphServer", default=None) + + +def add_compact_argument(parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "-c", + "--compact", + help="""Use compact graph representation. + Removes the lowest level of detail (typically streams). Can be stacked (eg. '-cc'). + Warning: this will also prune the graph of proxy topics (nodes that are both sources and targets). + """, + action="count", + ) + + +def graph_address_from_args(args: argparse.Namespace) -> Address: + if args.address is None: + return Address("127.0.0.1", GRAPHSERVER_PORT_DEFAULT) + return Address.from_string(args.address) diff --git a/src/ezmsg/core/commands/graphviz.py b/src/ezmsg/core/commands/graphviz.py new file mode 100644 index 00000000..2c0c2e07 --- /dev/null +++ b/src/ezmsg/core/commands/graphviz.py @@ -0,0 +1,19 @@ +import argparse + +from ..graphserver import GraphService +from .common import add_address_argument, add_compact_argument, graph_address_from_args + + +async def handle_graphviz(args: argparse.Namespace) -> None: + graph_service = GraphService(graph_address_from_args(args)) + graph_out = await graph_service.get_formatted_graph( + fmt="graphviz", compact_level=args.compact + ) + print(graph_out) + + +def setup_graphviz_cmdline(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("graphviz") + add_address_argument(parser) + add_compact_argument(parser) + parser.set_defaults(_handler=handle_graphviz) diff --git a/src/ezmsg/core/commands/mermaid.py b/src/ezmsg/core/commands/mermaid.py new file mode 100644 index 00000000..dd59ad76 --- /dev/null +++ b/src/ezmsg/core/commands/mermaid.py @@ -0,0 +1,77 @@ +import argparse +import base64 +import json +import webbrowser +import zlib + +from ..graphserver import GraphService +from .common import add_address_argument, add_compact_argument, graph_address_from_args + + +def mermaid_url(graph: str, target: str = "live") -> str: + if target != "ink": + graph = json.dumps( + { + "code": graph, + "mermaid": {"theme": "default"}, + "updateDiagram": True, + "autoSync": True, + "rough": False, + } + ) + + graphbytes = graph.encode("utf8") + + if target != "ink": + compress = zlib.compressobj(9, zlib.DEFLATED, 15, 8, zlib.Z_DEFAULT_STRATEGY) + graphbytes = compress.compress(graphbytes) + compress.flush() + + base64_string = base64.b64encode(graphbytes).decode("ascii") + + if target == "ink": + prefix = "https://mermaid.ink/img/" + elif target == "live": + prefix = "https://mermaid.live/edit#pako:" + elif target == "play": + prefix = "https://www.mermaidchart.com/play#pako:" + else: + raise ValueError( + f"Unknown mermaid target '{target}'. Available options are 'ink', 'live', or 'play'." + ) + + return prefix + base64_string + + +async def handle_mermaid(args: argparse.Namespace) -> None: + graph_service = GraphService(graph_address_from_args(args)) + graph_out = await graph_service.get_formatted_graph( + fmt="mermaid", compact_level=args.compact + ) + print(graph_out) + + if args.nobrowser: + return + + if args.target == "live": + print( + "%% If the graph does not render immediately, try toggling the 'Pan & Zoom' button." + ) + webbrowser.open(mermaid_url(graph_out, target=args.target)) + + +def setup_mermaid_cmdline(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("mermaid") + add_address_argument(parser) + add_compact_argument(parser) + parser.add_argument( + "--target", + help="Target for mermaid output. Options are 'ink', 'live', and 'play'.", + default="live", + ) + parser.add_argument( + "-n", + "--nobrowser", + help="Do not automatically open the browser for mermaid output. `--target` value will be ignored.", + action="store_true", + ) + parser.set_defaults(_handler=handle_mermaid) diff --git a/src/ezmsg/core/commands/serve.py b/src/ezmsg/core/commands/serve.py new file mode 100644 index 00000000..075bad18 --- /dev/null +++ b/src/ezmsg/core/commands/serve.py @@ -0,0 +1,30 @@ +import argparse +import asyncio +import logging + +from ..graphserver import GraphService +from .common import add_address_argument, graph_address_from_args + +logger = logging.getLogger("ezmsg") + + +async def handle_serve(args: argparse.Namespace) -> None: + graph_address = graph_address_from_args(args) + graph_service = GraphService(graph_address) + + logger.info(f"GraphServer Address: {graph_address}") + graph_server = graph_service.create_server() + + try: + logger.info("Servers running...") + await asyncio.to_thread(graph_server.join) + except KeyboardInterrupt: + logger.info("Interrupt detected; shutting down servers") + finally: + graph_server.stop() + + +def setup_serve_cmdline(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("serve") + add_address_argument(parser) + parser.set_defaults(_handler=handle_serve) diff --git a/src/ezmsg/core/commands/shutdown.py b/src/ezmsg/core/commands/shutdown.py new file mode 100644 index 00000000..0f4dc77f --- /dev/null +++ b/src/ezmsg/core/commands/shutdown.py @@ -0,0 +1,26 @@ +import argparse +import logging + +from ..graphserver import GraphService +from .common import add_address_argument, graph_address_from_args + +logger = logging.getLogger("ezmsg") + + +async def handle_shutdown(args: argparse.Namespace) -> None: + graph_address = graph_address_from_args(args) + graph_service = GraphService(graph_address) + + try: + await graph_service.shutdown() + logger.info(f"Issued shutdown command to GraphServer @ {graph_service.address}") + except ConnectionRefusedError: + logger.warning( + f"Could not issue shutdown command to GraphServer @ {graph_service.address}; server not running?" + ) + + +def setup_shutdown_cmdline(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("shutdown") + add_address_argument(parser) + parser.set_defaults(_handler=handle_shutdown) diff --git a/src/ezmsg/core/commands/start.py b/src/ezmsg/core/commands/start.py new file mode 100644 index 00000000..8133b0db --- /dev/null +++ b/src/ezmsg/core/commands/start.py @@ -0,0 +1,36 @@ +import argparse +import asyncio +import logging +import subprocess +import sys + +from ..graphserver import GraphService +from ..netprotocol import close_stream_writer +from .common import add_address_argument, graph_address_from_args + +logger = logging.getLogger("ezmsg") + + +async def handle_start(args: argparse.Namespace) -> None: + graph_address = graph_address_from_args(args) + graph_service = GraphService(graph_address) + + popen = subprocess.Popen( + [sys.executable, "-m", "ezmsg.core", "serve", f"--address={graph_address}"] + ) + + while True: + try: + _, writer = await graph_service.open_connection() + await close_stream_writer(writer) + break + except ConnectionRefusedError: + await asyncio.sleep(0.1) + + logger.info(f"Forked ezmsg servers in PID: {popen.pid}") + + +def setup_start_cmdline(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("start") + add_address_argument(parser) + parser.set_defaults(_handler=handle_start) diff --git a/src/ezmsg/util/perf/command.py b/src/ezmsg/util/perf/command.py index 9dab1f8e..31f70451 100644 --- a/src/ezmsg/util/perf/command.py +++ b/src/ezmsg/util/perf/command.py @@ -1,4 +1,5 @@ import argparse +import sys from .ab import setup_ab_cmdline from .analysis import setup_summary_cmdline @@ -6,16 +7,30 @@ from .run import setup_run_cmdline -def command() -> None: +def setup_perf_cmdline(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser("perf", help="performance test utilities") + perf_subparsers = parser.add_subparsers(dest="perf_command", required=True) + + setup_run_cmdline(perf_subparsers) + setup_hotpath_cmdline(perf_subparsers) + setup_ab_cmdline(perf_subparsers) + setup_summary_cmdline(perf_subparsers) + + +def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="ezmsg perf test utility") subparsers = parser.add_subparsers(dest="command", required=True) + setup_perf_cmdline(subparsers) + return parser + + +def command(argv: list[str] | None = None) -> None: + parser = build_parser() - setup_run_cmdline(subparsers) - setup_hotpath_cmdline(subparsers) - setup_ab_cmdline(subparsers) - setup_summary_cmdline(subparsers) + if argv is None: + argv = ["perf", *sys.argv[1:]] - ns = parser.parse_args() + ns = parser.parse_args(argv) ns._handler(ns) diff --git a/tests/test_command.py b/tests/test_command.py new file mode 100644 index 00000000..6a737253 --- /dev/null +++ b/tests/test_command.py @@ -0,0 +1,68 @@ +import pytest + +from ezmsg.core.command import build_parser + + +def test_mermaid_subparser_accepts_mermaid_specific_args(): + parser = build_parser() + + args = parser.parse_args( + [ + "mermaid", + "--address", + "127.0.0.1:4000", + "--target", + "ink", + "-cc", + "--nobrowser", + ] + ) + + assert args.command == "mermaid" + assert args.address == "127.0.0.1:4000" + assert args.target == "ink" + assert args.compact == 2 + assert args.nobrowser is True + + +def test_perf_subparser_accepts_nested_perf_args(): + parser = build_parser() + + args = parser.parse_args( + [ + "perf", + "hotpath", + "--count", + "10", + "--samples", + "2", + "--quiet", + ] + ) + + assert args.command == "perf" + assert args.perf_command == "hotpath" + assert args.count == 10 + assert args.samples == 2 + assert args.quiet is True + + +def test_graphviz_subparser_rejects_mermaid_only_args(): + parser = build_parser() + + with pytest.raises(SystemExit): + parser.parse_args(["graphviz", "--nobrowser"]) + + +def test_serve_subparser_rejects_visualization_args(): + parser = build_parser() + + with pytest.raises(SystemExit): + parser.parse_args(["serve", "--target", "play"]) + + +def test_perf_subparser_rejects_core_only_args(): + parser = build_parser() + + with pytest.raises(SystemExit): + parser.parse_args(["perf", "hotpath", "--address", "127.0.0.1:4000"])