Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions machine/jobs/build_clearml_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dynaconf.base import Settings

from ..utils.canceled_error import CanceledError
from ..utils.phased_progress_reporter import PhaseProgressStatus
from ..utils.progress_status import ProgressStatus
from .async_scheduler import AsyncScheduler

Expand Down Expand Up @@ -50,15 +51,10 @@ def clearml_progress(progress_status: ProgressStatus) -> None:
progress_info.last_progress_time is None
or (current_time - progress_info.last_progress_time).seconds > 1
):
new_runtime_props = task.data.runtime.copy() or {} # type: ignore
new_runtime_props["progress"] = str(percent_completed)
new_runtime_props["message"] = message
scheduler.schedule(
update_runtime_properties(
task.id, # type: ignore
task.session.host,
task.session.token, # type: ignore
create_runtime_properties(task, percent_completed, message),
task,
create_runtime_properties(task, percent_completed, message, progress_status),
)
)
progress_info.last_progress_time = current_time
Expand Down Expand Up @@ -97,21 +93,30 @@ def update_settings(settings: Settings, args: dict):
settings.data_dir = os.path.expanduser(cast(str, settings.data_dir))


async def update_runtime_properties(task_id: str, base_url: str, token: str, runtime_props: dict) -> None:
async with aiohttp.ClientSession(base_url=base_url, headers={"Authorization": f"Bearer {token}"}) as session:
json = {"task": task_id, "runtime": runtime_props, "force": True}
async def update_runtime_properties(task, runtime_props: dict) -> None:
    """Send runtime properties for a ClearML task to the server.

    The properties are first merged into the task's locally held
    ``task.data.runtime`` mapping (when there is one), then posted to the
    ``/tasks.edit`` endpoint using the host and bearer token of the task's
    session.

    Args:
        task: Object exposing ``id``, ``data.runtime``, ``session.host`` and
            ``session.token``.
        runtime_props: Properties sent as the ``runtime`` field of the request.

    Raises:
        aiohttp.ClientResponseError: If the server replies with an error status
            (raised by ``response.raise_for_status()``).
    """
    local_runtime = task.data.runtime
    # Test against None instead of relying on truthiness: an empty mapping is
    # falsy, so `task.data.runtime or {}` would merge into a throwaway dict and
    # the locally held properties would never be brought up to date.
    if local_runtime is not None:
        local_runtime.update(runtime_props)

    auth_headers = {"Authorization": f"Bearer {task.session.token}"}
    # Named `payload` so the stdlib module name `json` is not shadowed.
    payload = {"task": task.id, "runtime": runtime_props, "force": True}
    async with aiohttp.ClientSession(base_url=task.session.host, headers=auth_headers) as session:
        async with session.post("/tasks.edit", json=payload) as response:
            response.raise_for_status()


def create_runtime_properties(task, percent_completed: Optional[int], message: Optional[str]) -> dict:
def create_runtime_properties(
    task, percent_completed: Optional[int], message: Optional[str], status: Optional["ProgressStatus"]
) -> dict:
    """Build the runtime-property dict to report for a task.

    Starts from a copy of the task's current ``task.data.runtime`` entries and
    overlays the supplied progress information. The task's own mapping is not
    modified.

    Args:
        task: Object exposing ``data.runtime`` (a mapping or ``None``).
        percent_completed: Stored as a string under ``"progress"``. ``None``
            leaves any existing entry untouched.
        message: Stored under ``"message"``. ``None`` leaves any existing entry
            untouched.
        status: When this is a ``PhaseProgressStatus`` with a ``phase_stage``,
            its step and step count are stored as strings under
            ``"<stage>_step"`` and ``"<stage>_step_count"``.

    Returns:
        A new dict containing the merged runtime properties.
    """
    # `runtime` may be None, in which case calling `.copy()` on it would raise;
    # dict() also guarantees the caller gets an independent copy.
    runtime_props = dict(task.data.runtime or {})

    # A None value means "nothing new to report", not "remove the entry":
    # deleting here would raise KeyError whenever the key was never set.
    if percent_completed is not None:
        runtime_props["progress"] = str(percent_completed)
    if message is not None:
        runtime_props["message"] = message

    # Report the step within the phase
    if status is not None and isinstance(status, PhaseProgressStatus):
        stage = status.phase_stage
        if stage is not None:
            if status.phase_step is not None:
                runtime_props[f"{stage}_step"] = str(status.phase_step)
            if status.step_count is not None:
                runtime_props[f"{stage}_step_count"] = str(status.step_count)
    return runtime_props
10 changes: 9 additions & 1 deletion machine/jobs/build_nmt_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from ..utils.canceled_error import CanceledError
from ..utils.progress_status import ProgressStatus
from .async_scheduler import AsyncScheduler
from .build_clearml_helper import create_runtime_properties, update_runtime_properties
from .config import SETTINGS
from .nmt_engine_build_job import NmtEngineBuildJob
from .nmt_model_factory import NmtModelFactory
Expand All @@ -29,6 +31,7 @@ def run(args: dict) -> None:
task = None
if args["clearml"]:
task = Task.init()
scheduler = AsyncScheduler()

def clearml_check_canceled() -> None:
if task.get_status() == "stopped":
Expand All @@ -38,7 +41,12 @@ def clearml_check_canceled() -> None:

def clearml_progress(status: ProgressStatus) -> None:
if status.percent_completed is not None:
task.set_progress(round(status.percent_completed * 100))
scheduler.schedule(
update_runtime_properties(
task,
create_runtime_properties(task, round(status.percent_completed * 100), None, status),
)
)

progress = clearml_progress

Expand Down
3 changes: 2 additions & 1 deletion machine/jobs/build_smt_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def run(args: dict) -> None:
if scheduler is not None and task is not None:
scheduler.schedule(
update_runtime_properties(
task.id, task.session.host, task.session.token, create_runtime_properties(task, 100, "Completed")
task,
create_runtime_properties(task, 100, "Completed", None),
)
)
task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size)
Expand Down
3 changes: 2 additions & 1 deletion machine/jobs/build_word_alignment_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def run(args: dict):
if scheduler is not None and task is not None:
scheduler.schedule(
update_runtime_properties(
task.id, task.session.host, task.session.token, create_runtime_properties(task, 100, "Completed")
task,
create_runtime_properties(task, 100, "Completed", None),
)
)
task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size)
Expand Down
12 changes: 6 additions & 6 deletions machine/jobs/nmt_engine_build_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@ def _get_progress_reporter(
if corpus_size > 0:
if self._config.align_pretranslations:
phases = [
Phase(message="Training NMT model", percentage=0.8),
Phase(message="Pretranslating segments", percentage=0.1),
Phase(message="Training NMT model", percentage=0.8, stage="train"),
Phase(message="Pretranslating segments", percentage=0.1, stage="inference"),
Phase(message="Aligning segments", percentage=0.1, report_steps=False),
]
else:
phases = [
Phase(message="Training NMT model", percentage=0.9),
Phase(message="Pretranslating segments", percentage=0.1),
Phase(message="Training NMT model", percentage=0.9, stage="train"),
Phase(message="Pretranslating segments", percentage=0.1, stage="inference"),
]
else:
if self._config.align_pretranslations:
phases = [
Phase(message="Pretranslating segments", percentage=0.9),
Phase(message="Pretranslating segments", percentage=0.9, stage="inference"),
Phase(message="Aligning segments", percentage=0.1, report_steps=False),
]
else:
phases = [Phase(message="Pretranslating segments", percentage=1.0)]
phases = [Phase(message="Pretranslating segments", percentage=1.0, stage="inference")]
return PhasedProgressReporter(progress, phases)

def _respond_to_no_training_corpus(self) -> Tuple[int, float]:
Expand Down
4 changes: 2 additions & 2 deletions machine/jobs/smt_engine_build_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ def _get_progress_reporter(
self, progress: Optional[Callable[[ProgressStatus], None]], corpus_size: int
) -> PhasedProgressReporter:
phases = [
Phase(message="Training SMT model", percentage=0.85),
Phase(message="Training SMT model", percentage=0.85, stage="train"),
Phase(message="Training truecaser", percentage=0.05),
Phase(message="Pretranslating segments", percentage=0.1),
Phase(message="Pretranslating segments", percentage=0.1, stage="inference"),
]
return PhasedProgressReporter(progress, phases)

Expand Down
4 changes: 2 additions & 2 deletions machine/jobs/word_alignment_build_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def run(

def _get_progress_reporter(self, progress: Optional[Callable[[ProgressStatus], None]]) -> PhasedProgressReporter:
    """Create the phased progress reporter for a word alignment build.

    Two phases are reported: training (90%, stage ``"train"``) and aligning
    segments (10%, stage ``"inference"``).

    Args:
        progress: Callback handed to the ``PhasedProgressReporter``; may be ``None``.

    Returns:
        A ``PhasedProgressReporter`` configured with the two phases.
    """
    # Each phase is listed once, with its stage label, so that the phase
    # percentages add up to 1.0.
    phases = [
        Phase(message="Training Word Alignment model", percentage=0.9, stage="train"),
        Phase(message="Aligning segments", percentage=0.1, stage="inference"),
    ]
    return PhasedProgressReporter(progress, phases)

Expand Down
18 changes: 17 additions & 1 deletion machine/utils/phased_progress_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ class Phase:
message: Optional[str] = None
percentage: float = 0
report_steps: bool = True
stage: Optional[str] = None


@dataclass(frozen=True)
class PhaseProgressStatus(ProgressStatus):
    """A ``ProgressStatus`` that additionally carries phase information.

    The two fields are declared after the inherited ones and both default to
    ``None``; their order matters to callers that construct instances
    positionally.
    """

    # Stage label of the phase this status belongs to, if any.
    phase_stage: Optional[str] = None

    # Step number within that phase, if any.
    phase_step: Optional[int] = None


class PhaseProgress(ContextManager[Callable[[ProgressStatus], None]]):
Expand Down Expand Up @@ -87,7 +94,16 @@ def _report(self, value: ProgressStatus) -> None:

percent_completed = self._percent_completed + (self._current_phase_percentage * (value.percent_completed or 0))
message = self._phases[self._current_phase_index].message if value.message is None else value.message
self._progress(ProgressStatus(self._step, percent_completed, message))
self._progress(
PhaseProgressStatus(
self._step,
percent_completed,
message,
value.step_count,
self.current_phase.stage if self.current_phase is not None else None,
value.step,
)
)

@property
def _current_phase_percentage(self) -> float:
Expand Down
3 changes: 2 additions & 1 deletion machine/utils/progress_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
class ProgressStatus:
@classmethod
def from_step(cls, step: int, step_count: int, message: Optional[str] = None) -> ProgressStatus:
return ProgressStatus(step, 1.0 if step_count == 0 else (step / step_count), message)
return ProgressStatus(step, 1.0 if step_count == 0 else (step / step_count), message, step_count)

step: int
percent_completed: Optional[float] = None
message: Optional[str] = None
step_count: Optional[int] = None