From 5e47da3c75b1462433678bf34bbf49c0efb1e592 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Mon, 24 Feb 2025 14:30:53 +0000 Subject: [PATCH 01/18] Make read only false when reconnecting --- poetry.lock | 50 +++++++++++++++++++++++++------------------------- simvue/run.py | 2 +- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/poetry.lock b/poetry.lock index ed998f4c..db892c52 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1320,23 +1320,23 @@ files = [ [[package]] name = "narwhals" -version = "1.27.1" +version = "1.28.0" description = "Extremely lightweight compatibility layer between dataframe libraries" optional = true python-versions = ">=3.8" groups = ["main"] markers = "python_version <= \"3.11\" and extra == \"plot\" or python_version >= \"3.12\" and extra == \"plot\"" files = [ - {file = "narwhals-1.27.1-py3-none-any.whl", hash = "sha256:71e4a126007886e3dd9d71d0d5921ebd2e8c1f9be9c405fe11850ece2b066c59"}, - {file = "narwhals-1.27.1.tar.gz", hash = "sha256:68505d0cee1e6c00382ac8b65e922f8b694a11cbe482a057fa63139de8d0ea03"}, + {file = "narwhals-1.28.0-py3-none-any.whl", hash = "sha256:45d909ad6240944d447b0dae38074c5a919830dff3868d57b05a5526c1f06fe4"}, + {file = "narwhals-1.28.0.tar.gz", hash = "sha256:a2213fa44a039f724278fb15609889319e7c240403413f2606cc856c8d8f708d"}, ] [package.extras] core = ["duckdb", "pandas", "polars", "pyarrow", "pyarrow-stubs"] cudf = ["cudf (>=24.10.0)"] dask = ["dask[dataframe] (>=2024.8)"] -dev = ["covdefaults", "hypothesis", "mypy (>=1.15.0,<1.16.0)", "pandas-stubs", "pre-commit", "pytest", "pytest-cov", "pytest-env", "pytest-randomly", "typing-extensions"] -docs = ["black", "duckdb", "jinja2", "markdown-exec[ansi]", "mkdocs", "mkdocs-autorefs", "mkdocs-material", "mkdocstrings[python]", "pandas", "polars (>=1.0.0)", "pyarrow"] +dev = ["covdefaults", "hypothesis", "mypy (>=1.15.0,<1.16.0)", "pandas-stubs", "pre-commit", "pyright", "pytest", "pytest-cov", "pytest-env", "pytest-randomly", "typing-extensions"] +docs = ["black", "duckdb", "jinja2", "markdown-exec[ansi]", "mkdocs", "mkdocs-autorefs", "mkdocs-material", "mkdocstrings-python (>=1.16)", "mkdocstrings[python]", "pandas", "polars (>=1.0.0)", "pyarrow"] duckdb = ["duckdb (>=1.0)"] extra = ["scikit-learn"] ibis = ["ibis-framework (>=6.0.0)", "packaging", "pyarrow-hotfix", "rich"] @@ -1346,7 +1346,7 @@ polars = ["polars (>=0.20.3)"] pyarrow = ["pyarrow (>=11.0.0)"] pyspark = ["pyspark (>=3.5.0)"] tests = ["covdefaults", "hypothesis", "pytest", "pytest-cov", "pytest-env", "pytest-randomly", "typing-extensions"] -typing = ["mypy (>=1.15.0,<1.16.0)", "pandas-stubs", "typing-extensions"] +typing = ["mypy (>=1.15.0,<1.16.0)", "pandas-stubs", "pyright", "typing-extensions"] [[package]] name = "numpy" @@ -2310,31 +2310,31 @@ jupyter = ["ipywidgets (>=7.5.1,<9)"] [[package]] name = "ruff" -version = "0.9.6" +version = "0.9.7" description = "An extremely fast Python linter and code formatter, written in Rust." optional = false python-versions = ">=3.7" groups = ["dev"] markers = "python_version <= \"3.11\" or python_version >= \"3.12\"" files = [ - {file = "ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba"}, - {file = "ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504"}, - {file = "ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5"}, - {file = "ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217"}, - {file = "ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6"}, - {file = "ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897"}, - {file = "ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08"}, - {file = "ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656"}, - {file = "ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d"}, - {file = "ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa"}, - {file = "ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a"}, - {file = "ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9"}, + {file = "ruff-0.9.7-py3-none-linux_armv6l.whl", hash = "sha256:99d50def47305fe6f233eb8dabfd60047578ca87c9dcb235c9723ab1175180f4"}, + {file = "ruff-0.9.7-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:d59105ae9c44152c3d40a9c40d6331a7acd1cdf5ef404fbe31178a77b174ea66"}, + {file = "ruff-0.9.7-py3-none-macosx_11_0_arm64.whl", hash = "sha256:f313b5800483770bd540cddac7c90fc46f895f427b7820f18fe1822697f1fec9"}, + {file = "ruff-0.9.7-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:042ae32b41343888f59c0a4148f103208bf6b21c90118d51dc93a68366f4e903"}, + {file = "ruff-0.9.7-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:87862589373b33cc484b10831004e5e5ec47dc10d2b41ba770e837d4f429d721"}, + {file = "ruff-0.9.7-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a17e1e01bee0926d351a1ee9bc15c445beae888f90069a6192a07a84af544b6b"}, + {file = "ruff-0.9.7-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:7c1f880ac5b2cbebd58b8ebde57069a374865c73f3bf41f05fe7a179c1c8ef22"}, + {file = "ruff-0.9.7-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e63fc20143c291cab2841dbb8260e96bafbe1ba13fd3d60d28be2c71e312da49"}, + {file = "ruff-0.9.7-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:91ff963baed3e9a6a4eba2a02f4ca8eaa6eba1cc0521aec0987da8d62f53cbef"}, + {file = "ruff-0.9.7-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:88362e3227c82f63eaebf0b2eff5b88990280fb1ecf7105523883ba8c3aaf6fb"}, + {file = "ruff-0.9.7-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:0372c5a90349f00212270421fe91874b866fd3626eb3b397ede06cd385f6f7e0"}, + {file = "ruff-0.9.7-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:d76b8ab60e99e6424cd9d3d923274a1324aefce04f8ea537136b8398bbae0a62"}, + {file = "ruff-0.9.7-py3-none-musllinux_1_2_i686.whl", hash = "sha256:0c439bdfc8983e1336577f00e09a4e7a78944fe01e4ea7fe616d00c3ec69a3d0"}, + {file = "ruff-0.9.7-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:115d1f15e8fdd445a7b4dc9a30abae22de3f6bcabeb503964904471691ef7606"}, + {file = "ruff-0.9.7-py3-none-win32.whl", hash = "sha256:e9ece95b7de5923cbf38893f066ed2872be2f2f477ba94f826c8defdd6ec6b7d"}, + {file = "ruff-0.9.7-py3-none-win_amd64.whl", hash = "sha256:3770fe52b9d691a15f0b87ada29c45324b2ace8f01200fb0c14845e499eb0c2c"}, + {file = "ruff-0.9.7-py3-none-win_arm64.whl", hash = "sha256:b075a700b2533feb7a01130ff656a4ec0d5f340bb540ad98759b8401c32c2037"}, + {file = "ruff-0.9.7.tar.gz", hash = "sha256:643757633417907510157b206e490c3aa11cab0c087c912f60e07fbafa87a4c6"}, ] [[package]] diff --git a/simvue/run.py b/simvue/run.py index 5bfda8b3..b316f053 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -931,7 +931,7 @@ def reconnect(self, run_id: str) -> bool: self._status = "running" self._id = run_id - self._sv_obj = RunObject(identifier=self._id) + self._sv_obj = RunObject(identifier=self._id, _read_only=False) self._start(reconnect=True) return True From 236f1d791f639ef96c7835bc3b6b5bb05f41bfb9 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Mon, 24 Feb 2025 14:56:08 +0000 Subject: [PATCH 02/18] Fixed resource metric collection to collect on startup, and to collect first measurement accurately --- simvue/metrics.py | 8 ++++++-- simvue/run.py | 15 +++++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/simvue/metrics.py b/simvue/metrics.py index fc345f56..6b224106 100644 --- a/simvue/metrics.py +++ b/simvue/metrics.py @@ -36,14 +36,18 @@ def get_process_memory(processes: list[psutil.Process]) -> int: return rss -def get_process_cpu(processes: list[psutil.Process]) -> int: +def get_process_cpu( + processes: list[psutil.Process], interval: float | None = None +) -> int: """ Get the CPU usage + + If first time being called, use a small interval to collect initial CPU metrics. """ cpu_percent: int = 0 for process in processes: with contextlib.suppress(Exception): - cpu_percent += process.cpu_percent() + cpu_percent += process.cpu_percent(interval=interval) return cpu_percent diff --git a/simvue/run.py b/simvue/run.py index b316f053..227bb62a 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -305,15 +305,20 @@ def processes(self) -> list[psutil.Process]: return list(set(process_list)) - def _get_sysinfo(self) -> dict[str, typing.Any]: + def _get_sysinfo(self, interval: float | None = None) -> dict[str, typing.Any]: """Retrieve system administration + Parameters + ---------- + interval : float | None + The interval to use for collection of CPU metrics, by default None (non blocking) + Returns ------- dict[str, typing.Any] retrieved system specifications """ - cpu = get_process_cpu(self.processes) + cpu = get_process_cpu(self.processes, interval=interval) memory = get_process_memory(self.processes) gpu = get_gpu_metrics(self.processes) data: dict[str, typing.Any] = {} @@ -348,6 +353,12 @@ def _heartbeat( if not heartbeat_trigger: raise RuntimeError("Expected initialisation of heartbeat") + # Add initial resource metrics + if self._resources_metrics_interval: + self._add_metrics_to_dispatch( + self._get_sysinfo(interval=0.1), join_on_fail=False + ) + last_heartbeat = time.time() last_res_metric_call = time.time() From 48a944a5bd2a84076a1f6ba469386a36bc922d2b Mon Sep 17 00:00:00 2001 From: Matt Field Date: Mon, 24 Feb 2025 15:25:48 +0000 Subject: [PATCH 03/18] Update parent process when pid is updated --- simvue/run.py | 1 + 1 file changed, 1 insertion(+) diff --git a/simvue/run.py b/simvue/run.py index 227bb62a..516cff8c 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -958,6 +958,7 @@ def set_pid(self, pid: int) -> None: PID of the process to be monitored """ self._pid = pid + self._parent_process = psutil.Process(self._pid) @skip_if_failed("_aborted", "_suppress_errors", False) @pydantic.validate_call From 6f35639585a1d363e65cfb525e8f7c5bc5889a90 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Mon, 24 Feb 2025 16:21:28 +0000 Subject: [PATCH 04/18] Add threading to heartbeat offline sender --- simvue/sender.py | 64 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 18 deletions(-) diff --git a/simvue/sender.py b/simvue/sender.py index 49751127..658cfe9f 100644 --- a/simvue/sender.py +++ b/simvue/sender.py @@ -116,6 +116,31 @@ def upload_cached_file( _logger.info(f"Run {_current_id} closed - deleting cached copies...") +def send_heartbeat( + file_path: pydantic.FilePath, + id_mapping: dict[str, str], + server_url: str, + headers: dict[str, str], +): + _offline_id = file_path.name.split(".")[0] + _online_id = id_mapping.get(_offline_id) + if not _online_id: + # Run has been closed - can just remove heartbeat and continue + file_path.unlink() + return + _logger.info(f"Sending heartbeat to run {_online_id}") + _response = requests.put( + f"{server_url}/runs/{_online_id}/heartbeat", + headers=headers, + ) + if _response.status_code == 200: + file_path.unlink() + else: + _logger.warning( + f"Attempting to send heartbeat to run {_online_id} returned status code {_response.status_code}." + ) + + @pydantic.validate_call def sender( cache_dir: pydantic.DirectoryPath | None = None, @@ -169,23 +194,26 @@ def sender( "Authorization": f"Bearer {_user_config.server.token.get_secret_value()}", "User-Agent": f"Simvue Python client {__version__}", } - - for _heartbeat_file in cache_dir.glob("runs/*.heartbeat"): - _offline_id = _heartbeat_file.name.split(".")[0] - _online_id = _id_mapping.get(_offline_id) - if not _online_id: - # Run has been closed - can just remove heartbeat and continue - _heartbeat_file.unlink() - continue - _logger.info(f"Sending heartbeat to run {_online_id}") - _response = requests.put( - f"{_user_config.server.url}/runs/{_online_id}/heartbeat", - headers=_headers, - ) - if _response.status_code == 200: - _heartbeat_file.unlink() - else: - _logger.warning( - f"Attempting to send heartbeat to run {_online_id} returned status code {_response.status_code}." + _heartbeat_files = list(cache_dir.glob("runs/*.heartbeat")) + if len(_heartbeat_files) < threading_threshold: + for _heartbeat_file in _heartbeat_files: + ( + send_heartbeat( + file_path=_heartbeat_file, + id_mapping=_id_mapping, + server_url=_user_config.server.url, + headers=_headers, + ), + ) + else: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + _results = executor.map( + lambda _heartbeat_file: send_heartbeat( + file_path=_heartbeat_file, + id_mapping=_id_mapping, + server_url=_user_config.server.url, + headers=_headers, + ), + _heartbeat_files, ) return _id_mapping From 454c141c817d1d5d7b7b69967e19b32a4a99aabe Mon Sep 17 00:00:00 2001 From: Matt Field Date: Mon, 24 Feb 2025 16:32:59 +0000 Subject: [PATCH 05/18] Get full list of files for upload at start --- simvue/sender.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/simvue/sender.py b/simvue/sender.py index 658cfe9f..6243f675 100644 --- a/simvue/sender.py +++ b/simvue/sender.py @@ -170,9 +170,14 @@ def sender( } _lock = threading.Lock() _upload_order = [item for item in UPLOAD_ORDER if item in objects_to_upload] + # Glob all files to look in at the start, to prevent extra files being written while other types are being uploaded + _all_offline_files = { + obj_type: list(cache_dir.glob(f"{obj_type}/*.json")) + for obj_type in _upload_order + } for _obj_type in _upload_order: - _offline_files = list(cache_dir.glob(f"{_obj_type}/*.json")) + _offline_files = _all_offline_files[_obj_type] if len(_offline_files) < threading_threshold: for file_path in _offline_files: upload_cached_file(cache_dir, _obj_type, file_path, _id_mapping, _lock) From b085d0ba0e75b57a880dae440d60da82e5000946 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Mon, 24 Feb 2025 18:21:33 +0000 Subject: [PATCH 06/18] Remove read only from set_folder_details --- simvue/run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/simvue/run.py b/simvue/run.py index 516cff8c..4ad4d44a 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -1614,7 +1614,6 @@ def set_folder_details( return False try: - self._folder.read_only(False) if metadata: self._folder.metadata = metadata if tags: From ac5b743c66a744d030896f023c2ae760f17580c7 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Mon, 24 Feb 2025 18:21:50 +0000 Subject: [PATCH 07/18] Remove read only from set_folder_details --- simvue/run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/simvue/run.py b/simvue/run.py index 4ad4d44a..1df56a85 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -1621,7 +1621,6 @@ def set_folder_details( if description: self._folder.description = description self._folder.commit() - self._folder.read_only(True) except (RuntimeError, ValueError, pydantic.ValidationError) as e: self._error(f"Failed to update folder '{self._folder.name}' details: {e}") return False From 8b45e558e4ec05c6469ddcde574dd24414c07e3a Mon Sep 17 00:00:00 2001 From: Matt Field Date: Mon, 24 Feb 2025 18:36:33 +0000 Subject: [PATCH 08/18] Add tests for reconnect --- tests/functional/test_run_class.py | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py index b447659b..0ed75bfc 100644 --- a/tests/functional/test_run_class.py +++ b/tests/functional/test_run_class.py @@ -929,3 +929,41 @@ def test_run_created_with_no_timeout() -> None: client = simvue.Client() assert client.get_run(run._id) +@pytest.mark.parametrize("mode", ("online", "offline"), ids=("online", "offline")) +@pytest.mark.run +def test_reconnect(mode, monkeypatch: pytest.MonkeyPatch) -> None: + if mode == "offline": + temp_d = tempfile.TemporaryDirectory() + monkeypatch.setenv("SIMVUE_OFFLINE_DIRECTORY", temp_d) + + with simvue.Run(mode=mode) as run: + run.init( + name="test_reconnect", + folder="/simvue_unit_testing", + retention_period="2 minutes", + timeout=None, + running=False + ) + run_id = run.id + if mode == "offline": + _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10) + run_id = _id_mapping.get(run_id) + + client = simvue.Client() + _created_run = client.get_run(run_id) + assert _created_run.status == "created" + time.sleep(1) + + with simvue.Run() as run: + run.reconnect(run_id) + run.log_metrics({"test_metric": 1}) + run.log_event("Testing!") + + if mode == "offline": + _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10) + + _reconnected_run = client.get_run(run_id) + assert dict(_reconnected_run.metrics)["test_metric"]["last"] == 1 + assert client.get_events(run_id)[0]["message"] == "Testing!" + + \ No newline at end of file From 4ae9597ebfefa66fb1e71da1ad9bbeb862ebb540 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Tue, 25 Feb 2025 08:57:57 +0000 Subject: [PATCH 09/18] Change created/updated in sender to be one line --- simvue/sender.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/simvue/sender.py b/simvue/sender.py index 6243f675..57cd19e1 100644 --- a/simvue/sender.py +++ b/simvue/sender.py @@ -89,10 +89,9 @@ def upload_cached_file( raise RuntimeError( f"Object of type '{obj_for_upload.__class__.__name__}' has no identifier" ) - if id_mapping.get(_current_id): - _logger.info(f"Updated {obj_for_upload.__class__.__name__} '{_new_id}'") - else: - _logger.info(f"Created {obj_for_upload.__class__.__name__} '{_new_id}'") + _logger.info( + f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {obj_for_upload.__class__.__name__} '{_new_id}'" + ) file_path.unlink(missing_ok=True) if issubclass(_instance_class, simvue.api.objects.ObjectArtifact): file_path.parent.joinpath(f"{_current_id}.object").unlink() From 5e9fae95665156e4b3c5e8460cafae4982bbd16a Mon Sep 17 00:00:00 2001 From: Matt Field Date: Tue, 25 Feb 2025 10:33:46 +0000 Subject: [PATCH 10/18] Add server ID to run staging and remove server ID mapping from these in sender --- simvue/run.py | 1 + simvue/sender.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/simvue/run.py b/simvue/run.py index 1df56a85..780eb7fa 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -710,6 +710,7 @@ def init( self._sv_obj.alerts = [] self._sv_obj.created = time.time() self._sv_obj.notifications = notification + self._sv_obj._staging["folder_id"] = self._folder.id if self._status == "running": self._sv_obj.system = get_system() diff --git a/simvue/sender.py b/simvue/sender.py index 57cd19e1..fe8783b7 100644 --- a/simvue/sender.py +++ b/simvue/sender.py @@ -92,6 +92,7 @@ def upload_cached_file( _logger.info( f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {obj_for_upload.__class__.__name__} '{_new_id}'" ) + file_path.unlink(missing_ok=True) if issubclass(_instance_class, simvue.api.objects.ObjectArtifact): file_path.parent.joinpath(f"{_current_id}.object").unlink() @@ -106,9 +107,11 @@ def upload_cached_file( obj_type == "runs" and cache_dir.joinpath(f"{obj_type}", f"{_current_id}.closed").exists() ): - # Get list of alerts created by this run - their IDs can be deleted + # Get alerts and folder created by this run - their IDs can be deleted for id in _data.get("alerts", []): cache_dir.joinpath("server_ids", f"{id}.txt").unlink() + if _folder_id := _data.get("folder_id"): + cache_dir.joinpath("server_ids", f"{_folder_id}.txt").unlink() cache_dir.joinpath("server_ids", f"{_current_id}.txt").unlink() cache_dir.joinpath(f"{obj_type}", f"{_current_id}.closed").unlink() From b93e6061fd5c027eaf751314bf08a78c58b2ec9d Mon Sep 17 00:00:00 2001 From: Matt Field Date: Tue, 25 Feb 2025 10:40:01 +0000 Subject: [PATCH 11/18] Add sender lock file --- simvue/sender.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/simvue/sender.py b/simvue/sender.py index fe8783b7..6124c405 100644 --- a/simvue/sender.py +++ b/simvue/sender.py @@ -165,6 +165,14 @@ def sender( """ _user_config = SimvueConfiguration.fetch() cache_dir = cache_dir or _user_config.offline.cache + + # Check that no other sender is already currently running... + if cache_dir.joinpath("sender.lock").exists(): + raise RuntimeError("A sender is already running for this cache!") + + # Create lock file to prevent other senders running while this one isn't finished + cache_dir.joinpath("sender.lock").touch() + cache_dir.joinpath("server_ids").mkdir(parents=True, exist_ok=True) _id_mapping: dict[str, str] = { file_path.name.split(".")[0]: file_path.read_text() @@ -223,4 +231,6 @@ def sender( ), _heartbeat_files, ) + # Remove lock file to allow another sender to start in the future + cache_dir.joinpath("sender.lock").unlink() return _id_mapping From c053936b7739b833131cd225ba70ddf663bac04f Mon Sep 17 00:00:00 2001 From: Matt Field Date: Wed, 26 Feb 2025 09:50:14 +0000 Subject: [PATCH 12/18] Add option to specify alert by name in log_alert --- simvue/executor.py | 6 ++-- simvue/run.py | 47 +++++++++++++++++++++++------- tests/functional/test_run_class.py | 2 +- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/simvue/executor.py b/simvue/executor.py index a6bc7c4f..df1dee37 100644 --- a/simvue/executor.py +++ b/simvue/executor.py @@ -348,9 +348,11 @@ def _update_alerts(self) -> None: if self._runner._dispatcher: self._runner._dispatcher.purge() - self._runner.log_alert(self._alert_ids[proc_id], "critical") + self._runner.log_alert( + identifier=self._alert_ids[proc_id], state="critical" + ) else: - self._runner.log_alert(self._alert_ids[proc_id], "ok") + self._runner.log_alert(identifier=self._alert_ids[proc_id], state="ok") _current_time: float = 0 while ( diff --git a/simvue/run.py b/simvue/run.py index 780eb7fa..230ae51b 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -1929,16 +1929,21 @@ def create_user_alert( @check_run_initialised @pydantic.validate_call def log_alert( - self, identifier: str, state: typing.Literal["ok", "critical"] + self, + identifier: str | None = None, + name: str | None = None, + state: typing.Literal["ok", "critical"] = "critical", ) -> bool: - """Set the state of an alert + """Set the state of an alert - either specify the alert by ID or name. Parameters ---------- - identifier : str - identifier of alert to update + identifier : str | None + ID of alert to update, by default None + name : str | None + Name of the alert to update, by default None state : Literal['ok', 'critical'] - state to set alert to + state to set alert to, by default 'critical' Returns ------- @@ -1949,13 +1954,33 @@ def log_alert( self._error('state must be either "ok" or "critical"') return False + if (identifier and name) or (not identifier and not name): + self._error("Please specify alert to update either by ID or by name.") + return False + + if name: + try: + if alerts := Alert.get(offline=self._user_config.run.mode == "offline"): + identifier = next( + (id for id, alert in alerts if alert.name == name), None + ) + else: + self._error("No existing alerts") + return False + except RuntimeError as e: + self._error(f"{e.args[0]}") + return False + + if not identifier: + self._error(f"Alert with name '{name}' could not be found.") + _alert = UserAlert(identifier=identifier) - # if not isinstance(_alert, UserAlert): - # self._error( - # f"Cannot update state for alert '{identifier}' " - # f"of type '{_alert.__class__.__name__.lower()}'" - # ) - # return False + if not isinstance(_alert, UserAlert): + self._error( + f"Cannot update state for alert '{identifier}' " + f"of type '{_alert.__class__.__name__.lower()}'" + ) + return False _alert.read_only(False) _alert.set_status(run_id=self._id, status=state) _alert.commit() diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py index 0ed75bfc..ceefb04c 100644 --- a/tests/functional/test_run_class.py +++ b/tests/functional/test_run_class.py @@ -885,7 +885,7 @@ def testing_exit(status: int) -> None: alert_id = run.create_user_alert("abort_test", trigger_abort=True) run.add_process(identifier="forever_long", executable="bash", c="sleep 10") time.sleep(2) - run.log_alert(alert_id, "critical") + run.log_alert(identifier=alert_id, state="critical") time.sleep(1) _alert = Alert(identifier=alert_id) assert _alert.get_status(run.id) == "critical" From dfa60c8debb433b17c4e13d0520d5c3342dd66bb Mon Sep 17 00:00:00 2001 From: Matt Field Date: Wed, 26 Feb 2025 10:18:18 +0000 Subject: [PATCH 13/18] Add test for log_alert --- tests/functional/test_run_class.py | 44 ++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py index ceefb04c..88acc319 100644 --- a/tests/functional/test_run_class.py +++ b/tests/functional/test_run_class.py @@ -806,6 +806,50 @@ def test_add_alerts() -> None: for _id in _expected_alerts: client.delete_alert(_id) +@pytest.mark.run +def test_log_alert() -> None: + _uuid = f"{uuid.uuid4()}".split("-")[0] + + run = sv_run.Run() + run.init( + name="test_log_alerts", + folder="/simvue_unit_tests", + retention_period="1 min", + tags=["test_add_alerts"], + visibility="tenant" + ) + _run_id = run._id + # Create a user alert + _id = run.create_user_alert( + name=f"user_alert_{_uuid}", + ) + + # Set alert state to critical by name + run.log_alert(name=f"user_alert_{_uuid}", state="critical") + time.sleep(1) + + client = sv_cl.Client() + _alert = client.get_alerts(run_id=_run_id, critical_only=False, names_only=False)[0] + assert _alert.get_status(_run_id) == "critical" + + # Set alert state to OK by ID + run.log_alert(identifier=_id, state="ok") + time.sleep(1) + + _alert.refresh() + assert _alert.get_status(_run_id) == "ok" + import pdb; pdb.set_trace() + + # Check invalid name throws sensible error + with pytest.raises(RuntimeError) as e: + run.log_alert(name="fake_name_1234321", state='critical') + assert "Alert with name 'fake_name_1234321' could not be found." in str(e.value) + + # Check you cannot specify both ID and name + with pytest.raises(RuntimeError) as e: + run.log_alert(identifier="myid", name="myname", state='critical') + assert "Please specify alert to update either by ID or by name." in str(e.value) + @pytest.mark.run def test_abort_on_alert_process(mocker: pytest_mock.MockerFixture) -> None: From 18923e72a888fd8dc0ac5ccbdcf6c57673918a41 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Wed, 26 Feb 2025 12:17:26 +0000 Subject: [PATCH 14/18] Added log alert test --- tests/functional/test_run_class.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py index 88acc319..e3654e06 100644 --- a/tests/functional/test_run_class.py +++ b/tests/functional/test_run_class.py @@ -834,11 +834,10 @@ def test_log_alert() -> None: # Set alert state to OK by ID run.log_alert(identifier=_id, state="ok") - time.sleep(1) + time.sleep(2) _alert.refresh() assert _alert.get_status(_run_id) == "ok" - import pdb; pdb.set_trace() # Check invalid name throws sensible error with pytest.raises(RuntimeError) as e: From 2742b53c69cf268e37dfe6934c24423818c3f766 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Wed, 26 Feb 2025 13:47:59 +0000 Subject: [PATCH 15/18] Fix bug where sender wont work if cache doesnt exist --- simvue/sender.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/simvue/sender.py b/simvue/sender.py index 6124c405..8a552f6e 100644 --- a/simvue/sender.py +++ b/simvue/sender.py @@ -166,6 +166,8 @@ def sender( _user_config = SimvueConfiguration.fetch() cache_dir = cache_dir or _user_config.offline.cache + cache_dir.joinpath("server_ids").mkdir(parents=True, exist_ok=True) + # Check that no other sender is already currently running... if cache_dir.joinpath("sender.lock").exists(): raise RuntimeError("A sender is already running for this cache!") @@ -173,7 +175,6 @@ def sender( # Create lock file to prevent other senders running while this one isn't finished cache_dir.joinpath("sender.lock").touch() - cache_dir.joinpath("server_ids").mkdir(parents=True, exist_ok=True) _id_mapping: dict[str, str] = { file_path.name.split(".")[0]: file_path.read_text() for file_path in cache_dir.glob("server_ids/*.txt") From ad7bd2401f9377bc80d2ccfb40e0d606c8852a5a Mon Sep 17 00:00:00 2001 From: Matt Field Date: Wed, 26 Feb 2025 15:38:45 +0000 Subject: [PATCH 16/18] Use same list of processes for all system metric collection and use small interval for CPU metrics as otherwise it reports 0 for fds every time --- simvue/run.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/simvue/run.py b/simvue/run.py index 230ae51b..35ac991d 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -318,9 +318,10 @@ def _get_sysinfo(self, interval: float | None = None) -> dict[str, typing.Any]: dict[str, typing.Any] retrieved system specifications """ - cpu = get_process_cpu(self.processes, interval=interval) - memory = get_process_memory(self.processes) - gpu = get_gpu_metrics(self.processes) + processes = self.processes + cpu = get_process_cpu(processes, interval=0.1) + memory = get_process_memory(processes) + gpu = get_gpu_metrics(processes) data: dict[str, typing.Any] = {} if memory is not None and cpu is not None: @@ -353,12 +354,6 @@ def _heartbeat( if not heartbeat_trigger: raise RuntimeError("Expected initialisation of heartbeat") - # Add initial resource metrics - if self._resources_metrics_interval: - self._add_metrics_to_dispatch( - self._get_sysinfo(interval=0.1), join_on_fail=False - ) - last_heartbeat = time.time() last_res_metric_call = time.time() From 0d35793971e025c0594f7f861f213baaa2050ef6 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Wed, 26 Feb 2025 16:44:55 +0000 Subject: [PATCH 17/18] Fixed get_alerts and improved tests - unstable due to server bug with user alerts --- simvue/client.py | 20 ++++++---- tests/functional/test_client.py | 69 +++++++++++++++++++++++++-------- 2 files changed, 65 insertions(+), 24 deletions(-) diff --git a/simvue/client.py b/simvue/client.py index c69e6b3a..4bc7888d 100644 --- a/simvue/client.py +++ b/simvue/client.py @@ -990,17 +990,23 @@ def get_alerts( RuntimeError if there was a failure retrieving data from the server """ - if not run_id: + if critical_only: + raise RuntimeError( + "critical_only is ambiguous when returning alerts with no run ID specified." + ) return [alert.name if names_only else alert for _, alert in Alert.get()] # type: ignore - return [ - alert.get("name") - if names_only - else Alert(identifier=alert.get("id"), **alert) + _alerts = [ + Alert(identifier=alert.get("id"), **alert) for alert in Run(identifier=run_id).get_alert_details() - if not critical_only or alert["status"].get("current") == "critical" - ] # type: ignore + ] + + return [ + alert.name if names_only else alert + for alert in _alerts + if not critical_only or alert.get_status(run_id) == "critical" + ] @prettify_pydantic @pydantic.validate_call diff --git a/tests/functional/test_client.py b/tests/functional/test_client.py index 37c1a1ae..16169dc5 100644 --- a/tests/functional/test_client.py +++ b/tests/functional/test_client.py @@ -12,7 +12,7 @@ from simvue.exception import ObjectNotFoundError import simvue.run as sv_run import simvue.api.objects as sv_api_obj - +from simvue.api.objects.alert.base import AlertBase @pytest.mark.dependency @pytest.mark.client @@ -24,25 +24,60 @@ def test_get_events(create_test_run: tuple[sv_run.Run, dict]) -> None: @pytest.mark.dependency @pytest.mark.client @pytest.mark.parametrize( - "from_run", (True, False) + "from_run", (True, False), ids=("from_run", "all_runs") ) -def test_get_alerts(create_test_run: tuple[sv_run.Run, dict], from_run: bool) -> None: - time.sleep(1.0) +@pytest.mark.parametrize( + "names_only", (True, False), ids=("names_only", "all_details") +) +@pytest.mark.parametrize( + "critical_only", (True, False), ids=("critical_only", "all_states") +) +def test_get_alerts(create_plain_run: tuple[sv_run.Run, dict], from_run: bool, names_only: bool, critical_only: bool) -> None: + run, run_data = create_plain_run + run_id = run.id + unique_id = f"{uuid.uuid4()}".split("-")[0] + _id_1 = run.create_user_alert( + name=f"user_alert_1_{unique_id}", + ) + _id_2 = run.create_user_alert( + name=f"user_alert_2_{unique_id}", + ) + _id_3 = run.create_user_alert( + name=f"user_alert_3_{unique_id}", + attach_to_run=False + ) + run.log_alert(identifier=_id_1, state="critical") + time.sleep(2) + run.close() + client = svc.Client() - _, run_data = create_test_run - if from_run: - triggered_alerts_full = client.get_alerts(run_id=create_test_run[1]["run_id"], critical_only=False, names_only=False) - assert len(triggered_alerts_full) == 7 - for alert in triggered_alerts_full: - if alert.name == "value_above_1": - assert alert["alert"]["status"]["current"] == "critical" + + if critical_only and not from_run: + with pytest.raises(RuntimeError) as e: + _alerts = client.get_alerts(run_id=run_id if from_run else None, critical_only=critical_only, names_only=names_only) + assert "critical_only is ambiguous when returning alerts with no run ID specified." in str(e.value) else: - assert (triggered_alerts_full := client.get_alerts(names_only=True, critical_only=False)) - - for alert in run_data["created_alerts"]: - assert alert in triggered_alerts_full, f"Alert '{alert}' was not triggered" - - + _alerts = client.get_alerts(run_id=run_id if from_run else None, critical_only=critical_only, names_only=names_only) + + if names_only: + assert all(isinstance(item, str) for item in _alerts) + else: + assert all(isinstance(item, AlertBase) for item in _alerts) + _alerts = [alert.name for alert in _alerts] + + assert f"user_alert_1_{unique_id}" in _alerts + + if not from_run: + assert len(_alerts) > 2 + assert f"user_alert_3_{unique_id}" in _alerts + else: + assert f"user_alert_3_{unique_id}" not in _alerts + if critical_only: + assert len(_alerts) == 1 + else: + assert len(_alerts) == 2 + assert f"user_alert_2_{unique_id}" in _alerts + @pytest.mark.dependency @pytest.mark.client def test_get_run_id_from_name(create_test_run: tuple[sv_run.Run, dict]) -> None: From 08a086dfa6609f352377cbce6af0f18f0da359f4 Mon Sep 17 00:00:00 2001 From: Matt Field Date: Thu, 27 Feb 2025 08:51:24 +0000 Subject: [PATCH 18/18] Fix resource metrics sending immediately again --- simvue/run.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/simvue/run.py b/simvue/run.py index 35ac991d..ec574013 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -305,7 +305,7 @@ def processes(self) -> list[psutil.Process]: return list(set(process_list)) - def _get_sysinfo(self, interval: float | None = None) -> dict[str, typing.Any]: + def _get_sysinfo(self) -> dict[str, typing.Any]: """Retrieve system administration Parameters @@ -357,6 +357,9 @@ def _heartbeat( last_heartbeat = time.time() last_res_metric_call = time.time() + if self._resources_metrics_interval: + self._add_metrics_to_dispatch(self._get_sysinfo(), join_on_fail=False) + while not heartbeat_trigger.is_set(): time.sleep(0.1)