diff --git a/machine/jobs/build_clearml_helper.py b/machine/jobs/build_clearml_helper.py index c7373023..668a868c 100644 --- a/machine/jobs/build_clearml_helper.py +++ b/machine/jobs/build_clearml_helper.py @@ -9,6 +9,7 @@ from dynaconf.base import Settings from ..utils.canceled_error import CanceledError +from ..utils.phased_progress_reporter import PhaseProgressStatus from ..utils.progress_status import ProgressStatus from .async_scheduler import AsyncScheduler @@ -50,15 +51,10 @@ def clearml_progress(progress_status: ProgressStatus) -> None: progress_info.last_progress_time is None or (current_time - progress_info.last_progress_time).seconds > 1 ): - new_runtime_props = task.data.runtime.copy() or {} # type: ignore - new_runtime_props["progress"] = str(percent_completed) - new_runtime_props["message"] = message scheduler.schedule( update_runtime_properties( - task.id, # type: ignore - task.session.host, - task.session.token, # type: ignore - create_runtime_properties(task, percent_completed, message), + task, + create_runtime_properties(task, percent_completed, message, progress_status), ) ) progress_info.last_progress_time = current_time @@ -97,21 +93,30 @@ def update_settings(settings: Settings, args: dict): settings.data_dir = os.path.expanduser(cast(str, settings.data_dir)) -async def update_runtime_properties(task_id: str, base_url: str, token: str, runtime_props: dict) -> None: - async with aiohttp.ClientSession(base_url=base_url, headers={"Authorization": f"Bearer {token}"}) as session: - json = {"task": task_id, "runtime": runtime_props, "force": True} +async def update_runtime_properties(task, runtime_props: dict) -> None: + current_runtime_properties = task.data.runtime or {} + current_runtime_properties.update(runtime_props) + async with aiohttp.ClientSession( + base_url=task.session.host, headers={"Authorization": f"Bearer {task.session.token}"} + ) as session: + json = {"task": task.id, "runtime": runtime_props, "force": True} async with session.post("/tasks.edit", json=json) as response: response.raise_for_status() -def create_runtime_properties(task, percent_completed: Optional[int], message: Optional[str]) -> dict: +def create_runtime_properties( + task, percent_completed: Optional[int], message: Optional[str], status: Optional[ProgressStatus] +) -> dict: runtime_props = task.data.runtime.copy() or {} if percent_completed is not None: runtime_props["progress"] = str(percent_completed) - else: - del runtime_props["progress"] if message is not None: runtime_props["message"] = message - else: - del runtime_props["message"] + # Report the step within the phase + if status is not None and isinstance(status, PhaseProgressStatus): + if status.phase_stage is not None: + if status.phase_step is not None: + runtime_props[f"{status.phase_stage}_step"] = str(status.phase_step) + if status.step_count is not None: + runtime_props[f"{status.phase_stage}_step_count"] = str(status.step_count) return runtime_props diff --git a/machine/jobs/build_nmt_engine.py b/machine/jobs/build_nmt_engine.py index 93841d9b..b7d18fa0 100644 --- a/machine/jobs/build_nmt_engine.py +++ b/machine/jobs/build_nmt_engine.py @@ -8,6 +8,8 @@ from ..utils.canceled_error import CanceledError from ..utils.progress_status import ProgressStatus +from .async_scheduler import AsyncScheduler +from .build_clearml_helper import create_runtime_properties, update_runtime_properties from .config import SETTINGS from .nmt_engine_build_job import NmtEngineBuildJob from .nmt_model_factory import NmtModelFactory @@ -29,6 +31,7 @@ def run(args: dict) -> None: task = None if args["clearml"]: task = Task.init() + scheduler = AsyncScheduler() def clearml_check_canceled() -> None: if task.get_status() == "stopped": @@ -38,7 +41,12 @@ def clearml_check_canceled() -> None: def clearml_progress(status: ProgressStatus) -> None: if status.percent_completed is not None: - task.set_progress(round(status.percent_completed * 100)) + scheduler.schedule( + update_runtime_properties( + task, + create_runtime_properties(task, round(status.percent_completed * 100), None, status), + ) + ) progress = clearml_progress diff --git a/machine/jobs/build_smt_engine.py b/machine/jobs/build_smt_engine.py index 56a4caa0..12e1b23d 100644 --- a/machine/jobs/build_smt_engine.py +++ b/machine/jobs/build_smt_engine.py @@ -69,7 +69,8 @@ def run(args: dict) -> None: if scheduler is not None and task is not None: scheduler.schedule( update_runtime_properties( - task.id, task.session.host, task.session.token, create_runtime_properties(task, 100, "Completed") + task, + create_runtime_properties(task, 100, "Completed", None), ) ) task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size) diff --git a/machine/jobs/build_word_alignment_model.py b/machine/jobs/build_word_alignment_model.py index d2c9b3a9..732b5422 100644 --- a/machine/jobs/build_word_alignment_model.py +++ b/machine/jobs/build_word_alignment_model.py @@ -70,7 +70,8 @@ def run(args: dict): if scheduler is not None and task is not None: scheduler.schedule( update_runtime_properties( - task.id, task.session.host, task.session.token, create_runtime_properties(task, 100, "Completed") + task, + create_runtime_properties(task, 100, "Completed", None), ) ) task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size) diff --git a/machine/jobs/nmt_engine_build_job.py b/machine/jobs/nmt_engine_build_job.py index b8b74594..21d94cae 100644 --- a/machine/jobs/nmt_engine_build_job.py +++ b/machine/jobs/nmt_engine_build_job.py @@ -31,23 +31,23 @@ def _get_progress_reporter( if corpus_size > 0: if self._config.align_pretranslations: phases = [ - Phase(message="Training NMT model", percentage=0.8), - Phase(message="Pretranslating segments", percentage=0.1), + Phase(message="Training NMT model", percentage=0.8, stage="train"), + Phase(message="Pretranslating segments", percentage=0.1, stage="inference"), Phase(message="Aligning segments", percentage=0.1, report_steps=False), ] else: phases = [ - Phase(message="Training NMT model", percentage=0.9), - Phase(message="Pretranslating segments", percentage=0.1), + Phase(message="Training NMT model", percentage=0.9, stage="train"), + Phase(message="Pretranslating segments", percentage=0.1, stage="inference"), ] else: if self._config.align_pretranslations: phases = [ - Phase(message="Pretranslating segments", percentage=0.9), + Phase(message="Pretranslating segments", percentage=0.9, stage="inference"), Phase(message="Aligning segments", percentage=0.1, report_steps=False), ] else: - phases = [Phase(message="Pretranslating segments", percentage=1.0)] + phases = [Phase(message="Pretranslating segments", percentage=1.0, stage="inference")] return PhasedProgressReporter(progress, phases) def _respond_to_no_training_corpus(self) -> Tuple[int, float]: diff --git a/machine/jobs/smt_engine_build_job.py b/machine/jobs/smt_engine_build_job.py index 452810f8..286e5425 100644 --- a/machine/jobs/smt_engine_build_job.py +++ b/machine/jobs/smt_engine_build_job.py @@ -31,9 +31,9 @@ def _get_progress_reporter( self, progress: Optional[Callable[[ProgressStatus], None]], corpus_size: int ) -> PhasedProgressReporter: phases = [ - Phase(message="Training SMT model", percentage=0.85), + Phase(message="Training SMT model", percentage=0.85, stage="train"), Phase(message="Training truecaser", percentage=0.05), - Phase(message="Pretranslating segments", percentage=0.1), + Phase(message="Pretranslating segments", percentage=0.1, stage="inference"), ] return PhasedProgressReporter(progress, phases) diff --git a/machine/jobs/word_alignment_build_job.py b/machine/jobs/word_alignment_build_job.py index 811350cd..0f74ab40 100644 --- a/machine/jobs/word_alignment_build_job.py +++ b/machine/jobs/word_alignment_build_job.py @@ -59,8 +59,8 @@ def run( def _get_progress_reporter(self, progress: Optional[Callable[[ProgressStatus], None]]) -> PhasedProgressReporter: phases = [ - Phase(message="Training Word Alignment model", percentage=0.9), - Phase(message="Aligning segments", percentage=0.1), + Phase(message="Training Word Alignment model", percentage=0.9, stage="train"), + Phase(message="Aligning segments", percentage=0.1, stage="inference"), ] return PhasedProgressReporter(progress, phases) diff --git a/machine/utils/phased_progress_reporter.py b/machine/utils/phased_progress_reporter.py index 9be8fa4d..a20553e0 100644 --- a/machine/utils/phased_progress_reporter.py +++ b/machine/utils/phased_progress_reporter.py @@ -12,6 +12,13 @@ class Phase: message: Optional[str] = None percentage: float = 0 report_steps: bool = True + stage: Optional[str] = None + + +@dataclass(frozen=True) +class PhaseProgressStatus(ProgressStatus): + phase_stage: Optional[str] = None + phase_step: Optional[int] = None class PhaseProgress(ContextManager[Callable[[ProgressStatus], None]]): @@ -87,7 +94,16 @@ def _report(self, value: ProgressStatus) -> None: percent_completed = self._percent_completed + (self._current_phase_percentage * (value.percent_completed or 0)) message = self._phases[self._current_phase_index].message if value.message is None else value.message - self._progress(ProgressStatus(self._step, percent_completed, message)) + self._progress( + PhaseProgressStatus( + self._step, + percent_completed, + message, + value.step_count, + self.current_phase.stage if self.current_phase is not None else None, + value.step, + ) + ) @property def _current_phase_percentage(self) -> float: diff --git a/machine/utils/progress_status.py b/machine/utils/progress_status.py index aba1ae1f..e7285306 100644 --- a/machine/utils/progress_status.py +++ b/machine/utils/progress_status.py @@ -8,8 +8,9 @@ class ProgressStatus: @classmethod def from_step(cls, step: int, step_count: int, message: Optional[str] = None) -> ProgressStatus: - return ProgressStatus(step, 1.0 if step_count == 0 else (step / step_count), message) + return ProgressStatus(step, 1.0 if step_count == 0 else (step / step_count), message, step_count) step: int percent_completed: Optional[float] = None message: Optional[str] = None + step_count: Optional[int] = None