diff --git a/amorphouspy_api/src/amorphouspy_api/app.py b/amorphouspy_api/src/amorphouspy_api/app.py index c48c9ed9..4c7b08a4 100644 --- a/amorphouspy_api/src/amorphouspy_api/app.py +++ b/amorphouspy_api/src/amorphouspy_api/app.py @@ -107,41 +107,40 @@ def submit_to_executor(request_data: dict) -> dict: """ try: # Create fresh executor to properly detect cached results - exe = get_executor(cache_directory=MELTQUENCH_PROJECT_DIR) - - # Get LAMMPS-specific resource configuration - lammps_resource_dict = get_lammps_resource_dict() - - # Submit the workflow - this returns a future for the final result - future = run_meltquench_workflow( - executor=exe, - components=request_data["components"], - values=request_data["values"], - n_atoms=request_data["n_atoms"], - potential_type=request_data["potential_type"], - heating_rate=request_data["heating_rate"], - cooling_rate=request_data["cooling_rate"], - n_print=request_data["n_print"], - lammps_resource_dict=lammps_resource_dict, - ) + with get_executor(cache_directory=MELTQUENCH_PROJECT_DIR) as exe: + # Get LAMMPS-specific resource configuration + lammps_resource_dict = get_lammps_resource_dict() + + # Submit the workflow - this returns a future for the final result + future = run_meltquench_workflow( + executor=exe, + components=request_data["components"], + values=request_data["values"], + n_atoms=request_data["n_atoms"], + potential_type=request_data["potential_type"], + heating_rate=request_data["heating_rate"], + cooling_rate=request_data["cooling_rate"], + n_print=request_data["n_print"], + lammps_resource_dict=lammps_resource_dict, + ) - # Wait briefly for cache check to complete (happens in background thread) - # With wait=False, executorlib checks cache asynchronously - for _ in range(10): # Up to 1 second - if future.done(): - break - time.sleep(0.1) - - # Check if result is already available (from cache or completed) - if future.done() and not future.cancelled(): - try: - result = future.result() - # Serialize using MeltquenchResult to handle ASE Atoms objects - serialized_result = MeltquenchResult(**result).model_dump() - return {"state": "complete", "result": serialized_result} - except Exception as e: - logger.exception("Job failed with exception") - return {"state": "error", "error": str(e)} + # Wait briefly for cache check to complete (happens in background thread) + # With wait=False, executorlib checks cache asynchronously + for _ in range(10): # Up to 1 second + if future.done(): + break + time.sleep(0.1) + + # Check if result is already available (from cache or completed) + if future.done() and not future.cancelled(): + try: + result = future.result() + # Serialize using MeltquenchResult to handle ASE Atoms objects + serialized_result = MeltquenchResult(**result).model_dump() + return {"state": "complete", "result": serialized_result} + except Exception as e: + logger.exception("Job failed with exception") + return {"state": "error", "error": str(e)} # Job is running in background return {"state": "running"} diff --git a/amorphouspy_api/src/amorphouspy_api/database.py b/amorphouspy_api/src/amorphouspy_api/database.py index d5a1df13..fdd1c8af 100644 --- a/amorphouspy_api/src/amorphouspy_api/database.py +++ b/amorphouspy_api/src/amorphouspy_api/database.py @@ -252,6 +252,9 @@ def _task_to_dict(self, task: Task) -> dict[str, Any]: if task.error_message: task_dict["error"] = task.error_message + if task.request_data: + task_dict["request_data"] = task.request_data + return task_dict def _update_task_from_dict(self, task: Task, task_data: dict[str, Any]) -> None: diff --git a/amorphouspy_api/src/amorphouspy_api/jobs.py b/amorphouspy_api/src/amorphouspy_api/jobs.py index 21c5dd76..38ff79ab 100644 --- a/amorphouspy_api/src/amorphouspy_api/jobs.py +++ b/amorphouspy_api/src/amorphouspy_api/jobs.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - from executorlib.executor.single import TestClusterExecutor + from executorlib.api import TestClusterExecutor logger = logging.getLogger(__name__) @@ -40,7 +40,7 @@ def get_executor_class() -> type: else: # Use TestClusterExecutor for local - it supports wait=False # (SingleNodeExecutor does not support wait=False) - from executorlib.executor.single import TestClusterExecutor + from executorlib.api import TestClusterExecutor return TestClusterExecutor @@ -105,9 +105,4 @@ def get_executor(cache_directory: Path) -> "TestClusterExecutor": cache_directory, ) - executor = executor_class(cache_directory=cache_directory, **executor_config) - - # Enter context manager - executor.__enter__() - - return executor + return executor_class(cache_directory=cache_directory, **executor_config) diff --git a/amorphouspy_api/src/tests/test_meltquench.py b/amorphouspy_api/src/tests/test_meltquench.py index 8d767d00..6171747e 100644 --- a/amorphouspy_api/src/tests/test_meltquench.py +++ b/amorphouspy_api/src/tests/test_meltquench.py @@ -21,10 +21,14 @@ class MockFuture: def __init__(self, result: dict[str, Any]) -> None: """Initialize mock future with result.""" self._result = result + self._time = time.time() def done(self) -> bool: """Return True to indicate job is complete.""" - return True + if time.time() - self._time > 5: + return True + else: + return False def cancelled(self) -> bool: """Return False to indicate job was not cancelled.""" @@ -154,7 +158,16 @@ def test_check_running_then_complete() -> None: task_id, { "state": "running", - "request_data": {"components": ["SiO2"], "values": [100.0], "unit": "wt"}, + "request_data": { + "components": ["SiO2"], + "values": [100.0], + "unit": "wt", + "n_atoms": 3, + "potential_type": "test", + "heating_rate": 1e12, + "cooling_rate": 1e12, + "n_print": 100, + }, "request_hash": "test-hash-running", }, )