diff --git a/amorphouspy_api/src/amorphouspy_api/worker.py b/amorphouspy_api/src/amorphouspy_api/worker.py index 5e1d8fd0..89120e70 100644 --- a/amorphouspy_api/src/amorphouspy_api/worker.py +++ b/amorphouspy_api/src/amorphouspy_api/worker.py @@ -65,7 +65,7 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, melt_quench_simulation, ) from amorphouspy.workflows.structural_analysis import analyze_structure - from executorlib import SingleNodeExecutor + from executorlib import SingleNodeExecutor, get_item_from_future # Create composition string from request comp_parts = [] @@ -88,70 +88,73 @@ def meltquench_worker(task_id: str, request_dict: dict[str, Any], db_path: str, logger.info(f"Task {task_id}: Using shared project directory: {project_path}") # Create executor for caching workflow results - with SingleNodeExecutor(cache_directory=project_path) as exe: - atoms_dict = exe.submit( - get_structure_dict, - composition=composition, - # n_molecules=5000, # Default number of molecules - target_atoms=request.n_atoms, - ).result() - logger.info(f"Task {task_id}: Structure dictionary created with {len(atoms_dict['atoms'])} atoms") - - structure_future = exe.submit( - get_ase_structure, - atoms_dict=atoms_dict, - ) - logger.info(f"Task {task_id}: ASE structure created") - - potential_future = exe.submit( - generate_potential, - atoms_dict=atoms_dict, - potential_type=request.potential_type, - ) - logger.info(f"Task {task_id}: Potential generated") - - # Update task status - current_task = task_store.get(task_id) or {"state": "processing"} - current_task["status"] = "Running meltquench simulation" - task_store.set(task_id, current_task) - logger.info(f"Task {task_id}: Starting meltquench simulation") - - # Use simulation parameters from the request - logger.info( - f"Task {task_id}: Using heating_rate={request.heating_rate}, cooling_rate={request.cooling_rate}, n_print={request.n_print}" - ) - - # Run meltquench simulation - logger.info(f"Task {task_id}: Executing simulation workflow") - result = exe.submit( - melt_quench_simulation, - structure=structure_future, - potential=potential_future, - n_print=request.n_print, - # tmp_working_directory=str(tmp_dir_base), # note: if provided needs to be static - or prevents caching at executor level - heating_rate=request.heating_rate, - cooling_rate=request.cooling_rate, - langevin=False, - server_kwargs={}, - ).result() - logger.info(f"Task {task_id}: Simulation completed successfully") - - # Update task status for structural analysis - current_task = task_store.get(task_id) or {"state": "processing"} - current_task["status"] = "Running structural analysis" - task_store.set(task_id, current_task) - logger.info(f"Task {task_id}: Starting structural analysis") - - # Perform structural analysis on the final structure (includes density calculation) - final_structure = result["structure"] - logger.info(f"Task {task_id}: Analyzing structure with {len(final_structure)} atoms") - - # Run structural analysis - structural_data = exe.submit( - analyze_structure, - atoms=final_structure, - ).result() - logger.info(f"Task {task_id}: Structural analysis completed successfully") + exe = SingleNodeExecutor(cache_directory=project_path) + atoms_dict_future = exe.submit( + get_structure_dict, + composition=composition, + # n_molecules=5000, # Default number of molecules + target_atoms=request.n_atoms, + ) + # logger.info(f"Task {task_id}: Structure dictionary created with {len(atoms_dict['atoms'])} atoms") + + structure_future = exe.submit( + get_ase_structure, + atoms_dict=atoms_dict_future, + ) + logger.info(f"Task {task_id}: ASE structure created") + + potential_future = exe.submit( + generate_potential, + atoms_dict=atoms_dict_future, + potential_type=request.potential_type, + ) + logger.info(f"Task {task_id}: Potential generated") + + # Update task status + current_task = task_store.get(task_id) or {"state": "processing"} + current_task["status"] = "Running meltquench simulation" + task_store.set(task_id, current_task) + logger.info(f"Task {task_id}: Starting meltquench simulation") + + # Use simulation parameters from the request + logger.info( + f"Task {task_id}: Using heating_rate={request.heating_rate}, cooling_rate={request.cooling_rate}, n_print={request.n_print}" + ) + + # Run meltquench simulation + logger.info(f"Task {task_id}: Executing simulation workflow") + result_future = exe.submit( + melt_quench_simulation, + structure=structure_future, + potential=potential_future, + n_print=request.n_print, + # tmp_working_directory=str(tmp_dir_base), # note: if provided needs to be static - or prevents caching at executor level + heating_rate=request.heating_rate, + cooling_rate=request.cooling_rate, + langevin=False, + server_kwargs={}, + ) + logger.info(f"Task {task_id}: Simulation completed successfully") + + # Update task status for structural analysis + current_task = task_store.get(task_id) or {"state": "processing"} + current_task["status"] = "Running structural analysis" + task_store.set(task_id, current_task) + logger.info(f"Task {task_id}: Starting structural analysis") + + # Perform structural analysis on the final structure (includes density calculation) + # final_structure = get_item_from_future(result_future, key="structure") + # logger.info(f"Task {task_id}: Analyzing structure with {len(final_structure)} atoms") + + # Run structural analysis + structural_data_future = exe.submit( + analyze_structure, atoms=get_item_from_future(result_future, key="structure") + ) + logger.info(f"Task {task_id}: Structural analysis completed successfully") + + exe.shutdown(wait=False, cancel_futures=False) + result = result_future.result() + structural_data = structural_data_future.result() # Debug: Check what fields are present in the structural_data object logger.info(f"Task {task_id}: StructureData type: {type(structural_data)}") diff --git a/environment.yml b/environment.yml index ed75f161..9b915367 100644 --- a/environment.yml +++ b/environment.yml @@ -5,7 +5,7 @@ dependencies: - python =3.13 - ase >=3.25.0 - cryptography =45.0.7 -- executorlib =1.7.4 +- executorlib =1.8.1 - hatchling - jupyter - lammps =2024.08.29=*_openmpi_*