diff --git a/machine/jobs/async_scheduler.py b/machine/jobs/async_scheduler.py deleted file mode 100644 index e2ca15b3..00000000 --- a/machine/jobs/async_scheduler.py +++ /dev/null @@ -1,26 +0,0 @@ -import asyncio -import concurrent -import concurrent.futures -import threading -from typing import Set - - -class AsyncScheduler: - def __init__(self) -> None: - self._loop = asyncio.new_event_loop() - threading.Thread(target=self._start_background_loop, daemon=True).start() - self._tasks: Set[concurrent.futures.Future] = set() - - def _start_background_loop(self) -> None: - asyncio.set_event_loop(self._loop) - self._loop.run_forever() - - def schedule(self, coro) -> None: - task = asyncio.run_coroutine_threadsafe(coro, self._loop) - self._tasks.add(task) - task.add_done_callback(self._tasks.discard) - - def stop(self) -> None: - concurrent.futures.wait(self._tasks) - self._tasks.clear() - self._loop.stop() diff --git a/machine/jobs/build_clearml_helper.py b/machine/jobs/build_clearml_helper.py index 668a868c..e605e498 100644 --- a/machine/jobs/build_clearml_helper.py +++ b/machine/jobs/build_clearml_helper.py @@ -4,14 +4,12 @@ from datetime import datetime from typing import Callable, Optional, Union, cast -import aiohttp from clearml import Task from dynaconf.base import Settings from ..utils.canceled_error import CanceledError from ..utils.phased_progress_reporter import PhaseProgressStatus from ..utils.progress_status import ProgressStatus -from .async_scheduler import AsyncScheduler class ProgressInfo: @@ -37,7 +35,7 @@ def clearml_check_canceled() -> None: def get_clearml_progress_caller( - progress_info: ProgressInfo, task: Task, scheduler: AsyncScheduler, logger: logging.Logger + progress_info: ProgressInfo, task: Task, logger: logging.Logger ) -> Callable[[ProgressStatus], None]: def clearml_progress(progress_status: ProgressStatus) -> None: percent_completed: Optional[int] = None @@ -51,11 +49,8 @@ def clearml_progress(progress_status: ProgressStatus) -> None: progress_info.last_progress_time is None or (current_time - progress_info.last_progress_time).seconds > 1 ): - scheduler.schedule( - update_runtime_properties( - task, - create_runtime_properties(task, percent_completed, message, progress_status), - ) + report_clearml_progress( + task=task, percent_completed=percent_completed, message=message, progress_status=progress_status ) progress_info.last_progress_time = current_time progress_info.last_percent_completed = percent_completed @@ -64,6 +59,42 @@ def clearml_progress(progress_status: ProgressStatus) -> None: return clearml_progress +def report_clearml_progress( + task: Task, + percent_completed: Optional[int] = None, + message: Optional[str] = None, + progress_status: Optional[ProgressStatus] = None, +) -> None: + if percent_completed is not None: + task.set_progress(percent_completed) + props = [] + if message is not None: + props.append({"type": str, "name": "message", "description": "Build Message", "value": message}) + # Report the step within the phase + if progress_status is not None and isinstance(progress_status, PhaseProgressStatus): + if progress_status.phase_stage is not None: + if progress_status.phase_step is not None: + props.append( + { + "type": int, + "name": f"{progress_status.phase_stage}_step", + "description": "Phase Step", + "value": progress_status.phase_step, + } + ) + if progress_status.step_count is not None: + props.append( + { + "type": int, + "name": f"{progress_status.phase_stage}_step_count", + "description": "Maximum Phase Step", + "value": progress_status.step_count, + } + ) + if len(props) > 0: + task.set_user_properties(*props) + + def get_local_progress_caller(progress_info: ProgressInfo, logger: logging.Logger) -> Callable[[ProgressStatus], None]: def local_progress(progress_status: ProgressStatus) -> None: @@ -91,32 +122,3 @@ def update_settings(settings: Settings, args: dict): raise TypeError(f"Build options could not be parsed: {e}") from e settings.update({settings.model_type: build_options}) settings.data_dir = os.path.expanduser(cast(str, settings.data_dir)) - - -async def update_runtime_properties(task, runtime_props: dict) -> None: - current_runtime_properties = task.data.runtime or {} - current_runtime_properties.update(runtime_props) - async with aiohttp.ClientSession( - base_url=task.session.host, headers={"Authorization": f"Bearer {task.session.token}"} - ) as session: - json = {"task": task.id, "runtime": runtime_props, "force": True} - async with session.post("/tasks.edit", json=json) as response: - response.raise_for_status() - - -def create_runtime_properties( - task, percent_completed: Optional[int], message: Optional[str], status: Optional[ProgressStatus] -) -> dict: - runtime_props = task.data.runtime.copy() or {} - if percent_completed is not None: - runtime_props["progress"] = str(percent_completed) - if message is not None: - runtime_props["message"] = message - # Report the step within the phase - if status is not None and isinstance(status, PhaseProgressStatus): - if status.phase_stage is not None: - if status.phase_step is not None: - runtime_props[f"{status.phase_stage}_step"] = str(status.phase_step) - if status.step_count is not None: - runtime_props[f"{status.phase_stage}_step_count"] = str(status.step_count) - return runtime_props diff --git a/machine/jobs/build_nmt_engine.py b/machine/jobs/build_nmt_engine.py index b7d18fa0..8ba836cc 100644 --- a/machine/jobs/build_nmt_engine.py +++ b/machine/jobs/build_nmt_engine.py @@ -8,8 +8,7 @@ from ..utils.canceled_error import CanceledError from ..utils.progress_status import ProgressStatus -from .async_scheduler import AsyncScheduler -from .build_clearml_helper import create_runtime_properties, update_runtime_properties +from .build_clearml_helper import report_clearml_progress from .config import SETTINGS from .nmt_engine_build_job import NmtEngineBuildJob from .nmt_model_factory import NmtModelFactory @@ -31,7 +30,6 @@ def run(args: dict) -> None: task = None if args["clearml"]: task = Task.init() - scheduler = AsyncScheduler() def clearml_check_canceled() -> None: if task.get_status() == "stopped": @@ -41,11 +39,8 @@ def clearml_check_canceled() -> None: def clearml_progress(status: ProgressStatus) -> None: if status.percent_completed is not None: - scheduler.schedule( - update_runtime_properties( - task, - create_runtime_properties(task, round(status.percent_completed * 100), None, status), - ) + report_clearml_progress( + task=task, percent_completed=round(status.percent_completed * 100), progress_status=status ) progress = clearml_progress diff --git a/machine/jobs/build_smt_engine.py b/machine/jobs/build_smt_engine.py index 12e1b23d..45cdd81f 100644 --- a/machine/jobs/build_smt_engine.py +++ b/machine/jobs/build_smt_engine.py @@ -5,14 +5,11 @@ from clearml import Task from ..utils.progress_status import ProgressStatus -from .async_scheduler import AsyncScheduler from .build_clearml_helper import ( ProgressInfo, - create_runtime_properties, get_clearml_check_canceled, get_clearml_progress_caller, get_local_progress_caller, - update_runtime_properties, update_settings, ) from .config import SETTINGS @@ -34,17 +31,15 @@ def run(args: dict) -> None: progress: Callable[[ProgressStatus], None] check_canceled: Optional[Callable[[], None]] = None task = None - scheduler: Optional[AsyncScheduler] = None progress_info = ProgressInfo() if args["clearml"]: task = Task.init() - scheduler = AsyncScheduler() check_canceled = get_clearml_check_canceled(progress_info, task) task.reload() - progress = get_clearml_progress_caller(progress_info, task, scheduler, logger) + progress = get_clearml_progress_caller(progress_info, task, logger) else: progress = get_local_progress_caller(ProgressInfo(), logger) @@ -66,12 +61,10 @@ def run(args: dict) -> None: smt_engine_build_job = SmtEngineBuildJob(SETTINGS, smt_model_factory, shared_file_service) train_corpus_size, confidence = smt_engine_build_job.run(progress, check_canceled) - if scheduler is not None and task is not None: - scheduler.schedule( - update_runtime_properties( - task, - create_runtime_properties(task, 100, "Completed", None), - ) + if task is not None: + task.set_progress(100) + task.set_user_properties( + {"type": str, "name": "message", "description": "Build Message", "value": "Completed"} ) task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size) task.get_logger().report_single_value(name="confidence", value=round(confidence, 4)) @@ -83,9 +76,6 @@ def run(args: dict) -> None: else: task.mark_failed(status_reason=type(e).__name__, status_message=str(e)) raise e - finally: - if scheduler is not None: - scheduler.stop() def main() -> None: diff --git a/machine/jobs/build_word_alignment_model.py b/machine/jobs/build_word_alignment_model.py index 732b5422..eaeaefaa 100644 --- a/machine/jobs/build_word_alignment_model.py +++ b/machine/jobs/build_word_alignment_model.py @@ -5,14 +5,11 @@ from clearml import Task from ..utils.progress_status import ProgressStatus -from .async_scheduler import AsyncScheduler from .build_clearml_helper import ( ProgressInfo, - create_runtime_properties, get_clearml_check_canceled, get_clearml_progress_caller, get_local_progress_caller, - update_runtime_properties, update_settings, ) from .config import SETTINGS @@ -35,17 +32,15 @@ def run(args: dict): progress: Callable[[ProgressStatus], None] check_canceled: Optional[Callable[[], None]] = None task = None - scheduler: Optional[AsyncScheduler] = None progress_info = ProgressInfo() if args["clearml"]: task = Task.init() - scheduler = AsyncScheduler() check_canceled = get_clearml_check_canceled(progress_info, task) task.reload() - progress = get_clearml_progress_caller(progress_info, task, scheduler, logger) + progress = get_clearml_progress_caller(progress_info, task, logger) else: progress = get_local_progress_caller(ProgressInfo(), logger) @@ -67,12 +62,10 @@ def run(args: dict): SETTINGS, word_alignment_model_factory, word_alignment_file_service ) train_corpus_size = word_alignment_build_job.run(progress, check_canceled) - if scheduler is not None and task is not None: - scheduler.schedule( - update_runtime_properties( - task, - create_runtime_properties(task, 100, "Completed", None), - ) + if task is not None: + task.set_progress(100) + task.set_user_properties( + {"type": str, "name": "message", "description": "Build Message", "value": "Completed"} ) task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size) logger.info("Finished") @@ -83,9 +76,6 @@ def run(args: dict): else: task.mark_failed(status_reason=type(e).__name__, status_message=str(e)) raise e - finally: - if scheduler is not None: - scheduler.stop() return