From 8dcb9a058eb9dc6b0ab2d4555c5a18ea08bb9ff9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 13 Feb 2026 13:09:37 +0100 Subject: [PATCH 1/5] Switch from SingleNodeExecutor to TestClusterExecutor --- amorphouspy_api/src/amorphouspy_api/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amorphouspy_api/src/amorphouspy_api/worker.py b/amorphouspy_api/src/amorphouspy_api/worker.py index 5e1d8fd0..aeddb898 100644 --- a/amorphouspy_api/src/amorphouspy_api/worker.py +++ b/amorphouspy_api/src/amorphouspy_api/worker.py @@ -65,7 +65,7 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, melt_quench_simulation, ) from amorphouspy.workflows.structural_analysis import analyze_structure - from executorlib import SingleNodeExecutor + from executorlib.api import TestClusterExecutor # Create composition string from request comp_parts = [] @@ -88,7 +88,7 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, logger.info(f"Task {task_id}: Using shared project directory: {project_path}") # Create executor for caching workflow results - with SingleNodeExecutor(cache_directory=project_path) as exe: + with TestClusterExecutor(cache_directory=project_path) as exe: atoms_dict = exe.submit( get_structure_dict, composition=composition, From 7b185f029549f2697502c58990032d8bd17369b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 13 Feb 2026 18:24:42 +0100 Subject: [PATCH 2/5] test with dependencies --- amorphouspy_api/src/amorphouspy_api/worker.py | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/amorphouspy_api/src/amorphouspy_api/worker.py b/amorphouspy_api/src/amorphouspy_api/worker.py index aeddb898..4c8a08d7 100644 --- a/amorphouspy_api/src/amorphouspy_api/worker.py +++ b/amorphouspy_api/src/amorphouspy_api/worker.py @@ -65,6 +65,7 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, melt_quench_simulation, ) from amorphouspy.workflows.structural_analysis import analyze_structure + from executorlib import get_item_from_future from executorlib.api import TestClusterExecutor # Create composition string from request @@ -89,23 +90,23 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, # Create executor for caching workflow results with TestClusterExecutor(cache_directory=project_path) as exe: - atoms_dict = exe.submit( + atoms_dict_future = exe.submit( get_structure_dict, composition=composition, # n_molecules=5000, # Default number of molecules target_atoms=request.n_atoms, - ).result() - logger.info(f"Task {task_id}: Structure dictionary created with {len(atoms_dict['atoms'])} atoms") + ) + # logger.info(f"Task {task_id}: Structure dictionary created with {len(atoms_dict['atoms'])} atoms") structure_future = exe.submit( get_ase_structure, - atoms_dict=atoms_dict, + atoms_dict=atoms_dict_future, ) logger.info(f"Task {task_id}: ASE structure created") potential_future = exe.submit( generate_potential, - atoms_dict=atoms_dict, + atoms_dict=atoms_dict_future, potential_type=request.potential_type, ) logger.info(f"Task {task_id}: Potential generated") @@ -123,7 +124,7 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, # Run meltquench simulation logger.info(f"Task {task_id}: Executing simulation workflow") - result = exe.submit( + result_future = exe.submit( melt_quench_simulation, structure=structure_future, potential=potential_future, @@ -133,7 +134,7 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, cooling_rate=request.cooling_rate, langevin=False, server_kwargs={}, - ).result() + ) logger.info(f"Task {task_id}: Simulation completed successfully") # Update task status for structural analysis @@ -143,15 +144,16 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, logger.info(f"Task {task_id}: Starting structural analysis") # Perform structural analysis on the final structure (includes density calculation) - final_structure = result["structure"] - logger.info(f"Task {task_id}: Analyzing structure with {len(final_structure)} atoms") + # final_structure = result["structure"] + # logger.info(f"Task {task_id}: Analyzing structure with {len(final_structure)} atoms") # Run structural analysis - structural_data = exe.submit( - analyze_structure, - atoms=final_structure, - ).result() - logger.info(f"Task {task_id}: Structural analysis completed successfully") + structural_data_future = exe.submit( + analyze_structure, atoms=get_item_from_future(result_future, key="structure"), + ) + logger.info(f"Task {task_id}: Structural analysis submitted") + structural_data = structural_data_future.result() + result = result_future.result() # Debug: Check what fields are present in the structural_data object logger.info(f"Task {task_id}: StructureData type: {type(structural_data)}") From b669586a6ff196ff825325ecdaf34d742e81240d Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 13 Feb 2026 22:52:39 +0100 Subject: [PATCH 3/5] Fix argument passing in structural analysis submission --- amorphouspy_api/src/amorphouspy_api/worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/amorphouspy_api/src/amorphouspy_api/worker.py b/amorphouspy_api/src/amorphouspy_api/worker.py index 4c8a08d7..de8d9810 100644 --- a/amorphouspy_api/src/amorphouspy_api/worker.py +++ b/amorphouspy_api/src/amorphouspy_api/worker.py @@ -149,7 +149,8 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, # Run structural analysis structural_data_future = exe.submit( - analyze_structure, atoms=get_item_from_future(result_future, key="structure"), + analyze_structure, + atoms=get_item_from_future(result_future, key="structure"), ) logger.info(f"Task {task_id}: Structural analysis submitted") structural_data = structural_data_future.result() From c1e8b59bf8e26f29a2a3be7961f4e9882f90b889 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 13 Feb 2026 22:54:45 +0100 Subject: [PATCH 4/5] Update executorlib version to 1.8.1 --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index ed75f161..9b915367 100644 --- a/environment.yml +++ b/environment.yml @@ -5,7 +5,7 @@ dependencies: - python =3.13 - ase >=3.25.0 - cryptography =45.0.7 -- executorlib =1.7.4 +- executorlib =1.8.1 - hatchling - jupyter - lammps =2024.08.29=*_openmpi_* From f59ae06673e2671bae789338c7772833119f8bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 14 Feb 2026 06:59:58 +0100 Subject: [PATCH 5/5] no more context --- amorphouspy_api/src/amorphouspy_api/worker.py | 133 +++++++++--------- 1 file changed, 67 insertions(+), 66 deletions(-) diff --git a/amorphouspy_api/src/amorphouspy_api/worker.py b/amorphouspy_api/src/amorphouspy_api/worker.py index de8d9810..9d68bd04 100644 --- a/amorphouspy_api/src/amorphouspy_api/worker.py +++ b/amorphouspy_api/src/amorphouspy_api/worker.py @@ -89,72 +89,73 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, logger.info(f"Task {task_id}: Using shared project directory: {project_path}") # Create executor for caching workflow results - with TestClusterExecutor(cache_directory=project_path) as exe: - atoms_dict_future = exe.submit( - get_structure_dict, - composition=composition, - # n_molecules=5000, # Default number of molecules - target_atoms=request.n_atoms, - ) - # logger.info(f"Task {task_id}: Structure dictionary created with {len(atoms_dict['atoms'])} atoms") - - structure_future = exe.submit( - get_ase_structure, - atoms_dict=atoms_dict_future, - ) - logger.info(f"Task {task_id}: ASE structure created") - - potential_future = exe.submit( - generate_potential, - atoms_dict=atoms_dict_future, - potential_type=request.potential_type, - ) - logger.info(f"Task {task_id}: Potential generated") - - # Update task status - current_task = task_store.get(task_id) or {"state": "processing"} - current_task["status"] = "Running meltquench simulation" - task_store.set(task_id, current_task) - logger.info(f"Task {task_id}: Starting meltquench simulation") - - # Use simulation parameters from the request - logger.info( - f"Task {task_id}: Using heating_rate={request.heating_rate}, cooling_rate={request.cooling_rate}, n_print={request.n_print}" - ) - - # Run meltquench simulation - logger.info(f"Task {task_id}: Executing simulation workflow") - result_future = exe.submit( - melt_quench_simulation, - structure=structure_future, - potential=potential_future, - n_print=request.n_print, - # tmp_working_directory=str(tmp_dir_base), # note: if provided needs to be static - or prevents caching at executor level - heating_rate=request.heating_rate, - cooling_rate=request.cooling_rate, - langevin=False, - server_kwargs={}, - ) - logger.info(f"Task {task_id}: Simulation completed successfully") - - # Update task status for structural analysis - current_task = task_store.get(task_id) or {"state": "processing"} - current_task["status"] = "Running structural analysis" - task_store.set(task_id, current_task) - logger.info(f"Task {task_id}: Starting structural analysis") - - # Perform structural analysis on the final structure (includes density calculation) - # final_structure = result["structure"] - # logger.info(f"Task {task_id}: Analyzing structure with {len(final_structure)} atoms") - - # Run structural analysis - structural_data_future = exe.submit( - analyze_structure, - atoms=get_item_from_future(result_future, key="structure"), - ) - logger.info(f"Task {task_id}: Structural analysis submitted") - structural_data = structural_data_future.result() - result = result_future.result() + exe = TestClusterExecutor(cache_directory=project_path) + atoms_dict_future = exe.submit( + get_structure_dict, + composition=composition, + # n_molecules=5000, # Default number of molecules + target_atoms=request.n_atoms, + ) + # logger.info(f"Task {task_id}: Structure dictionary created with {len(atoms_dict['atoms'])} atoms") + + structure_future = exe.submit( + get_ase_structure, + atoms_dict=atoms_dict_future, + ) + logger.info(f"Task {task_id}: ASE structure created") + + potential_future = exe.submit( + generate_potential, + atoms_dict=atoms_dict_future, + potential_type=request.potential_type, + ) + logger.info(f"Task {task_id}: Potential generated") + + # Update task status + current_task = task_store.get(task_id) or {"state": "processing"} + current_task["status"] = "Running meltquench simulation" + task_store.set(task_id, current_task) + logger.info(f"Task {task_id}: Starting meltquench simulation") + + # Use simulation parameters from the request + logger.info( + f"Task {task_id}: Using heating_rate={request.heating_rate}, cooling_rate={request.cooling_rate}, n_print={request.n_print}" + ) + + # Run meltquench simulation + logger.info(f"Task {task_id}: Executing simulation workflow") + result_future = exe.submit( + melt_quench_simulation, + structure=structure_future, + potential=potential_future, + n_print=request.n_print, + # tmp_working_directory=str(tmp_dir_base), # note: if provided needs to be static - or prevents caching at executor level + heating_rate=request.heating_rate, + cooling_rate=request.cooling_rate, + langevin=False, + server_kwargs={}, + ) + logger.info(f"Task {task_id}: Simulation completed successfully") + + # Update task status for structural analysis + current_task = task_store.get(task_id) or {"state": "processing"} + current_task["status"] = "Running structural analysis" + task_store.set(task_id, current_task) + logger.info(f"Task {task_id}: Starting structural analysis") + + # Perform structural analysis on the final structure (includes density calculation) + # final_structure = result["structure"] + # logger.info(f"Task {task_id}: Analyzing structure with {len(final_structure)} atoms") + + # Run structural analysis + structural_data_future = exe.submit( + analyze_structure, + atoms=get_item_from_future(result_future, key="structure"), + ) + logger.info(f"Task {task_id}: Structural analysis submitted") + exe.shutdown(wait=True, cancel_futures=False) + structural_data = structural_data_future.result() + result = result_future.result() # Debug: Check what fields are present in the structural_data object logger.info(f"Task {task_id}: StructureData type: {type(structural_data)}")