diff --git a/docs/docs/reference/profiles.yml.md b/docs/docs/reference/profiles.yml.md index 5ee73e9dee..a6c7a4990b 100644 --- a/docs/docs/reference/profiles.yml.md +++ b/docs/docs/reference/profiles.yml.md @@ -46,8 +46,8 @@ The profile configuration supports many properties. See below. max_price: type: 'Optional[float]' -### `retry_policy` +### `retry` -#SCHEMA# dstack._internal.core.models.profiles.ProfileRetryPolicy +#SCHEMA# dstack._internal.core.models.profiles.ProfileRetry overrides: show_root_heading: false diff --git a/src/dstack/_internal/cli/utils/run.py b/src/dstack/_internal/cli/utils/run.py index ae1f321e88..6fa8e04da2 100644 --- a/src/dstack/_internal/cli/utils/run.py +++ b/src/dstack/_internal/cli/utils/run.py @@ -1,12 +1,13 @@ from typing import List +from rich.markup import escape from rich.table import Table from dstack._internal.cli.utils.common import add_row_from_dict, console from dstack._internal.core.models.instances import InstanceAvailability from dstack._internal.core.models.profiles import TerminationPolicy from dstack._internal.core.models.runs import RunPlan -from dstack._internal.utils.common import pretty_date +from dstack._internal.utils.common import format_pretty_duration, pretty_date from dstack.api import Run @@ -23,18 +24,18 @@ def print_run_plan(run_plan: RunPlan, offers_limit: int = 3): max_duration = ( f"{job_plan.job_spec.max_duration / 3600:g}h" if job_plan.job_spec.max_duration else "-" ) - retry_policy = job_plan.job_spec.retry_policy - retry_policy = ( - (f"{retry_policy.duration / 3600:g}h" if retry_policy.duration else "yes") - if retry_policy.retry - else "no" - ) + if job_plan.job_spec.retry is None: + retry = "no" + else: + retry = escape(job_plan.job_spec.retry.pretty_format()) + profile = run_plan.run_spec.merged_profile creation_policy = profile.creation_policy termination_policy = profile.termination_policy - termination_idle_time = f"{profile.termination_idle_time}s" if termination_policy == 
TerminationPolicy.DONT_DESTROY: termination_idle_time = "-" + else: + termination_idle_time = format_pretty_duration(profile.termination_idle_time) if req.spot is None: spot_policy = "auto" @@ -54,7 +55,7 @@ def th(s: str) -> str: props.add_row(th("Max price"), max_price) props.add_row(th("Max duration"), max_duration) props.add_row(th("Spot policy"), spot_policy) - props.add_row(th("Retry policy"), retry_policy) + props.add_row(th("Retry policy"), retry) props.add_row(th("Creation policy"), creation_policy) props.add_row(th("Termination policy"), termination_policy) props.add_row(th("Termination idle time"), termination_idle_time) diff --git a/src/dstack/_internal/core/backends/gcp/compute.py b/src/dstack/_internal/core/backends/gcp/compute.py index f7b73d213b..b1324b0792 100644 --- a/src/dstack/_internal/core/backends/gcp/compute.py +++ b/src/dstack/_internal/core/backends/gcp/compute.py @@ -15,7 +15,10 @@ ) from dstack._internal.core.backends.base.offers import get_catalog_offers from dstack._internal.core.backends.gcp.config import GCPConfig -from dstack._internal.core.errors import ComputeResourceNotFoundError, NoCapacityError +from dstack._internal.core.errors import ( + ComputeResourceNotFoundError, + NoCapacityError, +) from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.gateways import GatewayComputeConfiguration from dstack._internal.core.models.instances import ( @@ -96,7 +99,6 @@ def create_instance( instance_config: InstanceConfiguration, ) -> JobProvisioningData: instance_name = instance_config.instance_name - if not gcp_resources.is_valid_resource_name(instance_name): # In a rare case the instance name is invalid in GCP, # we better use a random instance name than fail provisioning. 
diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index a7a990379e..d9eca8fa96 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -53,14 +53,43 @@ class ProfileRetryPolicy(CoreModel): _validate_duration = validator("duration", pre=True, allow_reuse=True)(parse_duration) - @root_validator() - @classmethod - def _validate_fields(cls, field_values): - if field_values["retry"] and "duration" not in field_values: - field_values["duration"] = DEFAULT_RETRY_DURATION - if field_values.get("duration") is not None: - field_values["retry"] = True - return field_values + @root_validator + def _validate_fields(cls, values): + if values["retry"] and "duration" not in values: + values["duration"] = DEFAULT_RETRY_DURATION + if values.get("duration") is not None: + values["retry"] = True + return values + + +class RetryEvent(str, Enum): + NO_CAPACITY = "no-capacity" + INTERRUPTION = "interruption" + ERROR = "error" + + +class ProfileRetry(CoreModel): + on_events: Annotated[ + List[RetryEvent], + Field( + description=( + "The list of events that should be handled with retry." + " Supported events are `no-capacity`, `interruption`, and `error`" + ) + ), + ] + duration: Annotated[ + Optional[Union[int, str]], + Field(description="The maximum period of retrying the run, e.g., `4h` or `1d`"), + ] = None + + _validate_duration = validator("duration", pre=True, allow_reuse=True)(parse_duration) + + @root_validator + def _validate_fields(cls, values): + if len(values["on_events"]) == 0: + raise ValueError("`on_events` cannot be empty") + return values class ProfileParams(CoreModel): @@ -86,8 +115,13 @@ class ProfileParams(CoreModel): description="The policy for provisioning spot or on-demand instances: `spot`, `on-demand`, or `auto`" ), ] + retry: Annotated[ + Optional[Union[ProfileRetry, bool]], + Field(description="The policy for resubmitting the run. 
Defaults to `false`"), + ] retry_policy: Annotated[ - Optional[ProfileRetryPolicy], Field(description="The policy for re-submitting the run") + Optional[ProfileRetryPolicy], + Field(description="The policy for resubmitting the run. Deprecated in favor of `retry`"), ] max_duration: Annotated[ Optional[Union[Literal["off"], str, int]], diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 9b18fa29f3..9e40610e9e 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -22,13 +22,14 @@ CreationPolicy, Profile, ProfileParams, + RetryEvent, SpotPolicy, TerminationPolicy, ) from dstack._internal.core.models.repos import AnyRunRepoData from dstack._internal.core.models.resources import ResourcesSpec from dstack._internal.utils import common as common_utils -from dstack._internal.utils.common import pretty_resources +from dstack._internal.utils.common import format_pretty_duration, pretty_resources class AppSpec(CoreModel): @@ -58,9 +59,14 @@ def is_finished(self): return self in self.finished_statuses() -class RetryPolicy(CoreModel): - retry: bool - duration: Optional[int] +class Retry(CoreModel): + on_events: List[RetryEvent] + duration: int + + def pretty_format(self) -> str: + pretty_duration = format_pretty_duration(self.duration) + events = ", ".join(event.value for event in self.on_events) + return f"{pretty_duration}[{events}]" class RunTerminationReason(str, Enum): @@ -187,7 +193,7 @@ class JobSpec(CoreModel): max_duration: Optional[int] registry_auth: Optional[RegistryAuth] requirements: Requirements - retry_policy: RetryPolicy + retry: Optional[Retry] working_dir: Optional[str] @@ -225,6 +231,7 @@ class JobSubmission(CoreModel): id: UUID4 submission_num: int submitted_at: datetime + last_processed_at: datetime finished_at: Optional[datetime] status: JobStatus termination_reason: Optional[JobTerminationReason] @@ -323,6 +330,7 @@ class Run(CoreModel): project_name: str 
user: str submitted_at: datetime + last_processed_at: datetime status: RunStatus termination_reason: Optional[RunTerminationReason] run_spec: RunSpec diff --git a/src/dstack/_internal/core/services/profiles.py b/src/dstack/_internal/core/services/profiles.py new file mode 100644 index 0000000000..904ae7d1c0 --- /dev/null +++ b/src/dstack/_internal/core/services/profiles.py @@ -0,0 +1,32 @@ +from typing import Optional + +from dstack._internal.core.models.profiles import DEFAULT_RETRY_DURATION, Profile, RetryEvent +from dstack._internal.core.models.runs import Retry + + +def get_retry(profile: Profile) -> Optional[Retry]: + profile_retry = profile.retry + if profile_retry is None: + # Handle retry_policy before retry was introduced + # TODO: Remove once retry_policy no longer supported + profile_retry_policy = profile.retry_policy + if profile_retry_policy is None: + return None + if not profile_retry_policy.retry: + return None + duration = profile_retry_policy.duration or DEFAULT_RETRY_DURATION + return Retry( + on_events=[RetryEvent.NO_CAPACITY, RetryEvent.INTERRUPTION, RetryEvent.ERROR], + duration=duration, + ) + if isinstance(profile_retry, bool): + if profile_retry: + return Retry( + on_events=[RetryEvent.NO_CAPACITY, RetryEvent.INTERRUPTION, RetryEvent.ERROR], + duration=DEFAULT_RETRY_DURATION, + ) + return None + profile_retry = profile_retry.copy() + if profile_retry.duration is None: + profile_retry.duration = DEFAULT_RETRY_DURATION + return Retry.parse_obj(profile_retry) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index d8974cc09b..90756ba10c 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -36,8 +36,18 @@ InstanceRuntime, RemoteConnectionInfo, ) -from dstack._internal.core.models.profiles import Profile, TerminationPolicy -from 
dstack._internal.core.models.runs import InstanceStatus, JobProvisioningData, Requirements +from dstack._internal.core.models.profiles import ( + Profile, + RetryEvent, + TerminationPolicy, +) +from dstack._internal.core.models.runs import ( + InstanceStatus, + JobProvisioningData, + Requirements, + Retry, +) +from dstack._internal.core.services.profiles import get_retry from dstack._internal.server.db import get_session_ctx from dstack._internal.server.models import InstanceModel, ProjectModel from dstack._internal.server.schemas.runner import HealthcheckResponse @@ -341,24 +351,6 @@ async def create_instance(instance_id: UUID) -> None: ) ).one() - if instance.retry_policy and instance.retry_policy_duration is not None: - retry_duration_deadline = _get_retry_duration_deadline(instance) - if get_current_datetime() > retry_duration_deadline: - instance.status = InstanceStatus.TERMINATED - instance.deleted = True - instance.deleted_at = get_current_datetime() - instance.termination_reason = "Retry duration expired" - await session.commit() - logger.warning( - "Retry duration expired. 
Terminate instance %s", - instance.name, - extra={ - "instance_name": instance.name, - "instance_status": InstanceStatus.TERMINATED.value, - }, - ) - return - if instance.last_retry_at is not None: last_retry = instance.last_retry_at.replace(tzinfo=datetime.timezone.utc) if get_current_datetime() < last_retry + timedelta(minutes=1): @@ -386,10 +378,10 @@ async def create_instance(instance_id: UUID) -> None: return try: - profile = Profile.__response__.parse_raw(instance.profile) - requirements = Requirements.__response__.parse_raw(instance.requirements) - instance_configuration = InstanceConfiguration.__response__.parse_raw( - instance.instance_configuration + profile: Profile = Profile.__response__.parse_raw(instance.profile) + requirements: Requirements = Requirements.__response__.parse_raw(instance.requirements) + instance_configuration: InstanceConfiguration = ( + InstanceConfiguration.__response__.parse_raw(instance.instance_configuration) ) except ValidationError as e: instance.status = InstanceStatus.TERMINATED @@ -410,6 +402,27 @@ async def create_instance(instance_id: UUID) -> None: await session.commit() return + retry = get_retry(profile) + should_retry = retry is not None and RetryEvent.NO_CAPACITY in retry.on_events + + if retry is not None: + retry_duration_deadline = _get_retry_duration_deadline(instance, retry) + if get_current_datetime() > retry_duration_deadline: + instance.status = InstanceStatus.TERMINATED + instance.deleted = True + instance.deleted_at = get_current_datetime() + instance.termination_reason = "Retry duration expired" + await session.commit() + logger.warning( + "Retry duration expired. 
Terminate instance %s", + instance.name, + extra={ + "instance_name": instance.name, + "instance_status": InstanceStatus.TERMINATED.value, + }, + ) + return + offers = await get_create_instance_offers( project=instance.project, profile=profile, @@ -417,7 +430,7 @@ async def create_instance(instance_id: UUID) -> None: exclude_not_available=True, ) - if not offers and instance.retry_policy: + if not offers and should_retry: instance.last_retry_at = get_current_datetime() await session.commit() logger.debug( @@ -479,7 +492,7 @@ async def create_instance(instance_id: UUID) -> None: instance.last_retry_at = get_current_datetime() - if not instance.retry_policy: + if not should_retry: instance.status = InstanceStatus.TERMINATED instance.deleted = True instance.deleted_at = get_current_datetime() @@ -749,9 +762,9 @@ def _get_instance_idle_duration(instance: InstanceModel) -> datetime.timedelta: return get_current_datetime() - last_time -def _get_retry_duration_deadline(instance: InstanceModel) -> datetime.datetime: +def _get_retry_duration_deadline(instance: InstanceModel, retry: Retry) -> datetime.datetime: return instance.created_at.replace(tzinfo=datetime.timezone.utc) + timedelta( - seconds=instance.retry_policy_duration + seconds=retry.duration ) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 697f04e5d7..027f9427e0 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -11,17 +11,18 @@ import dstack._internal.server.services.gateways as gateways import dstack._internal.server.services.gateways.autoscalers as autoscalers from dstack._internal.core.errors import ServerError -from dstack._internal.core.models.instances import InstanceOffer -from dstack._internal.core.models.profiles import ProfileRetryPolicy +from dstack._internal.core.models.profiles import RetryEvent from 
dstack._internal.core.models.runs import ( + Job, JobStatus, JobTerminationReason, + Run, RunSpec, RunStatus, RunTerminationReason, ) from dstack._internal.server.db import get_session_ctx -from dstack._internal.server.models import InstanceModel, JobModel, RunModel +from dstack._internal.server.models import JobModel, RunModel from dstack._internal.server.services.jobs import ( RUNNING_PROCESSING_JOBS_IDS, RUNNING_PROCESSING_JOBS_LOCK, @@ -29,11 +30,11 @@ SUBMITTED_PROCESSING_JOBS_LOCK, TERMINATING_PROCESSING_JOBS_IDS, TERMINATING_PROCESSING_JOBS_LOCK, + find_job, get_jobs_from_run_spec, group_jobs_by_replica_latest, ) from dstack._internal.server.services.runs import ( - JOB_TERMINATION_REASONS_TO_RETRY, PROCESSING_RUNS_IDS, PROCESSING_RUNS_LOCK, create_job_model_for_new_submission, @@ -44,11 +45,12 @@ scale_run_replicas, ) from dstack._internal.server.utils.common import wait_unlock -from dstack._internal.utils.common import get_current_datetime +from dstack._internal.utils import common from dstack._internal.utils.logging import get_logger logger = get_logger(__name__) PROCESSING_INTERVAL = datetime.timedelta(seconds=2) +RETRY_DELAY = datetime.timedelta(seconds=15) async def process_runs(): @@ -57,7 +59,8 @@ async def process_runs(): res = await session.execute( sa.select(RunModel).where( RunModel.status.not_in(RunStatus.finished_statuses()), - RunModel.last_processed_at < get_current_datetime() - PROCESSING_INTERVAL, + RunModel.last_processed_at + < common.get_current_datetime() - PROCESSING_INTERVAL, RunModel.id.not_in(PROCESSING_RUNS_IDS), ) ) @@ -87,7 +90,15 @@ async def process_single_run(run_id: uuid.UUID, job_ids: List[uuid.UUID]) -> uui ) async with get_session_ctx() as session: - run = await session.get(RunModel, run_id) + res = await session.execute( + sa.select(RunModel) + .where(RunModel.id == run_id) + .execution_options(populate_existing=True) + .options(joinedload(RunModel.project)) + .options(joinedload(RunModel.user)) + 
.options(joinedload(RunModel.repo)) + ) + run = res.scalar() if run is None: logger.error(f"Run {run_id} not found") return run_id @@ -108,7 +119,7 @@ async def process_single_run(run_id: uuid.UUID, job_ids: List[uuid.UUID]) -> uui run.status = RunStatus.TERMINATING run.termination_reason = RunTerminationReason.SERVER_ERROR - run.last_processed_at = get_current_datetime() + run.last_processed_at = common.get_current_datetime() await session.commit() return run_id @@ -116,21 +127,18 @@ async def process_single_run(run_id: uuid.UUID, job_ids: List[uuid.UUID]) -> uui async def process_pending_run(session: AsyncSession, run_model: RunModel): """Jobs are not created yet""" - - # TODO(egor-s): consider retry delay - - await session.execute( - sa.select(RunModel) - .where(RunModel.id == run_model.id) - .execution_options(populate_existing=True) - .options(joinedload(RunModel.project)) - .options(joinedload(RunModel.user)) - .options(joinedload(RunModel.repo)) - ) run = run_model_to_run(run_model) + if run.latest_job_submission is None: + logger.error("%s: failed to retry: pending run has no job submissions.") + run_model.status = RunStatus.FAILED + run_model.termination_reason = RunTerminationReason.SERVER_ERROR + return - # TODO(egor-s) consolidate with `scale_run_replicas` if possible + if common.get_current_datetime() - run.latest_job_submission.last_processed_at < RETRY_DELAY: + logger.debug("%s: pending run is not yet ready for resubmission", fmt(run_model)) + return + # TODO(egor-s) consolidate with `scale_run_replicas` if possible replicas = 1 if run.run_spec.configuration.type == "service": replicas = run.run_spec.configuration.replicas.min or 0 # new default @@ -180,8 +188,8 @@ async def process_active_run(session: AsyncSession, run_model: RunModel): Run is submitted, provisioning, or running. We handle fails, scaling, and status changes. 
""" + run = run_model_to_run(run_model) run_spec = RunSpec.__response__.parse_raw(run_model.run_spec) - retry_policy = run_spec.merged_profile.retry_policy or ProfileRetryPolicy() retry_single_job = can_retry_single_job(run_spec) run_statuses: Set[RunStatus] = set() @@ -189,58 +197,62 @@ async def process_active_run(session: AsyncSession, run_model: RunModel): replicas_to_retry: List[Tuple[int, List[JobModel]]] = [] replicas_info: List[autoscalers.ReplicaInfo] = [] - for replica_num, jobs in group_jobs_by_replica_latest(run_model.jobs): + for replica_num, job_models in group_jobs_by_replica_latest(run_model.jobs): replica_statuses: Set[RunStatus] = set() replica_needs_retry = False replica_active = True - for job in jobs: - if job.status == JobStatus.DONE or ( - job.status == JobStatus.TERMINATING - and job.termination_reason == JobTerminationReason.DONE_BY_RUNNER + for job_model in job_models: + job = find_job(run.jobs, job_model.replica_num, job_model.job_num) + if job_model.status == JobStatus.DONE or ( + job_model.status == JobStatus.TERMINATING + and job_model.termination_reason == JobTerminationReason.DONE_BY_RUNNER ): # the job is done or going to be done replica_statuses.add(RunStatus.DONE) # for some reason the replica is done, it's not active replica_active = False - elif job.termination_reason == JobTerminationReason.SCALED_DOWN: + elif job_model.termination_reason == JobTerminationReason.SCALED_DOWN: # the job was scaled down replica_active = False - elif job.status == JobStatus.RUNNING: + elif job_model.status == JobStatus.RUNNING: # the job is running replica_statuses.add(RunStatus.RUNNING) - elif job.status in {JobStatus.PROVISIONING, JobStatus.PULLING}: + elif job_model.status in {JobStatus.PROVISIONING, JobStatus.PULLING}: # the job is provisioning replica_statuses.add(RunStatus.PROVISIONING) - elif job.status == JobStatus.SUBMITTED: + elif job_model.status == JobStatus.SUBMITTED: # the job is submitted replica_statuses.add(RunStatus.SUBMITTED) - 
elif job.status == JobStatus.FAILED or ( - job.status == JobStatus.TERMINATING - and job.termination_reason + elif job_model.status == JobStatus.FAILED or ( + job_model.status == JobStatus.TERMINATING + and job_model.termination_reason not in {JobTerminationReason.DONE_BY_RUNNER, JobTerminationReason.SCALED_DOWN} ): - if await is_retry_enabled(session, job, retry_policy): - if await is_retry_duration_exceeded(session, job, retry_policy): + current_duration = should_retry_job(run, job, job_model) + if current_duration is None: + replica_statuses.add(RunStatus.FAILED) + run_termination_reasons.add(RunTerminationReason.JOB_FAILED) + else: + if is_retry_duration_exceeded(job, current_duration): replica_statuses.add(RunStatus.FAILED) run_termination_reasons.add(RunTerminationReason.RETRY_LIMIT_EXCEEDED) else: - # do a retry replica_needs_retry = True - else: - # just failed - replica_statuses.add(RunStatus.FAILED) - run_termination_reasons.add(RunTerminationReason.JOB_FAILED) - elif job.status in {JobStatus.TERMINATING, JobStatus.TERMINATED, JobStatus.ABORTED}: + elif job_model.status in { + JobStatus.TERMINATING, + JobStatus.TERMINATED, + JobStatus.ABORTED, + }: pass # unexpected, but let's ignore it else: - raise ValueError(f"Unexpected job status {job.status}") + raise ValueError(f"Unexpected job status {job_model.status}") if RunStatus.FAILED in replica_statuses: run_statuses.add(RunStatus.FAILED) else: if replica_needs_retry: - replicas_to_retry.append((replica_num, jobs)) + replicas_to_retry.append((replica_num, job_models)) if not replica_needs_retry or retry_single_job: run_statuses.update(replica_statuses) @@ -249,7 +261,7 @@ async def process_active_run(session: AsyncSession, run_model: RunModel): replicas_info.append( autoscalers.ReplicaInfo( active=True, - timestamp=min(job.submitted_at for job in jobs).replace( + timestamp=min(job.submitted_at for job in job_models).replace( tzinfo=datetime.timezone.utc ), ) @@ -259,7 +271,7 @@ async def 
process_active_run(session: AsyncSession, run_model: RunModel): replicas_info.append( autoscalers.ReplicaInfo( active=False, - timestamp=max(job.last_processed_at for job in jobs).replace( + timestamp=max(job.last_processed_at for job in job_models).replace( tzinfo=datetime.timezone.utc ), ) @@ -286,6 +298,16 @@ async def process_active_run(session: AsyncSession, run_model: RunModel): else: new_status = RunStatus.PENDING + # Terminate active jobs if the run is to be resubmitted + if new_status == RunStatus.PENDING and not retry_single_job: + for _, replica_jobs in replicas_to_retry: + for job_model in replica_jobs: + if not ( + job_model.status.is_finished() or job_model.status == JobStatus.TERMINATING + ): + job_model.status = JobStatus.TERMINATING + job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER + if new_status not in {RunStatus.TERMINATING, RunStatus.PENDING}: # No need to retry if the run is terminating, # pending run will retry replicas in `process_pending_run` @@ -318,32 +340,63 @@ async def process_active_run(session: AsyncSession, run_model: RunModel): run_model.termination_reason = termination_reason -async def is_retry_enabled( - session: AsyncSession, job: JobModel, retry_policy: ProfileRetryPolicy -) -> bool: - # retry for spot instances only - if retry_policy.retry and job.termination_reason in JOB_TERMINATION_REASONS_TO_RETRY: - if job.used_instance_id is None: - return False - instance = await session.get(InstanceModel, job.used_instance_id) - if instance is None or instance.offer is None: - return False - instance_offer = InstanceOffer.__response__.parse_raw(instance.offer) - if instance_offer.instance.resources.spot: - return True - return False +def should_retry_job(run: Run, job: Job, job_model: JobModel) -> Optional[datetime.timedelta]: + """ + Checks if the job should be retried. + Returns the current duration of retrying if retry is enabled. 
+ """ + if job.job_spec.retry is None: + return None + + last_provisioned_submission = None + for job_submission in reversed(job.job_submissions): + if job_submission.job_provisioning_data is not None: + last_provisioned_submission = job_submission + break + if ( + job_model.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + and last_provisioned_submission is None + and RetryEvent.NO_CAPACITY in job.job_spec.retry.on_events + ): + return common.get_current_datetime() - run.submitted_at -async def is_retry_duration_exceeded( - session: AsyncSession, job: JobModel, retry_policy: ProfileRetryPolicy -) -> bool: - if retry_policy.duration is not None and get_current_datetime() - job.submitted_at.replace( - tzinfo=datetime.timezone.utc - ) > datetime.timedelta(seconds=retry_policy.duration): + if last_provisioned_submission is None: + return None + + if ( + last_provisioned_submission.termination_reason + == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY + and RetryEvent.INTERRUPTION in job.job_spec.retry.on_events + ): + return common.get_current_datetime() - last_provisioned_submission.last_processed_at + + if ( + last_provisioned_submission.termination_reason + in [ + JobTerminationReason.CONTAINER_EXITED_WITH_ERROR, + JobTerminationReason.CREATING_CONTAINER_ERROR, + JobTerminationReason.EXECUTOR_ERROR, + JobTerminationReason.GATEWAY_ERROR, + JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED, + JobTerminationReason.WAITING_RUNNER_LIMIT_EXCEEDED, + JobTerminationReason.PORTS_BINDING_FAILED, + ] + and RetryEvent.ERROR in job.job_spec.retry.on_events + ): + return common.get_current_datetime() - last_provisioned_submission.last_processed_at + + return None + + +def is_retry_duration_exceeded(job: Job, current_duration: datetime.timedelta) -> bool: + if job.job_spec.retry is None: return True - return False + return current_duration > datetime.timedelta(seconds=job.job_spec.retry.duration) def can_retry_single_job(run_spec: RunSpec) -> 
bool: - # TODO(egor-s): handle independent and interconnected clusters + # TODO: Currently, we terminate and retry the entire replica if one of the jobs fails. + # We could make partial retry in some multi-node cases. + # E.g. restarting a worker node, independent jobs. return False diff --git a/src/dstack/_internal/server/migrations/versions/29826f417010_remove_instancemodel_retry_policy.py b/src/dstack/_internal/server/migrations/versions/29826f417010_remove_instancemodel_retry_policy.py new file mode 100644 index 0000000000..c5aef03b1f --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/29826f417010_remove_instancemodel_retry_policy.py @@ -0,0 +1,34 @@ +"""Remove InstanceModel.retry_policy + +Revision ID: 29826f417010 +Revises: dfffd6a1165c +Create Date: 2024-05-29 10:40:37.904752 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "29826f417010" +down_revision = "dfffd6a1165c" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.drop_column("retry_policy") + batch_op.drop_column("retry_policy_duration") + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.add_column(sa.Column("retry_policy_duration", sa.INTEGER(), nullable=True)) + batch_op.add_column(sa.Column("retry_policy", sa.BOOLEAN(), nullable=False)) + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index 0b475dd0f2..fcbc9c42e5 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -323,6 +323,8 @@ class InstanceModel(BaseModel): finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime) # create instance + # TODO: Introduce a field that would store all resolved instance profile parameters, etc. (similar to job_spec). + # Currently, profile parameters are parsed every time they are accessed (e.g. see profile.retry). profile: Mapped[Optional[str]] = mapped_column(Text) requirements: Mapped[Optional[str]] = mapped_column(String(10_000)) instance_configuration: Mapped[Optional[str]] = mapped_column(Text) @@ -334,8 +336,6 @@ class InstanceModel(BaseModel): ) # retry policy - retry_policy: Mapped[bool] = mapped_column(Boolean, default=False) - retry_policy_duration: Mapped[Optional[int]] = mapped_column(Integer) last_retry_at: Mapped[Optional[datetime]] = mapped_column(DateTime) # instance termination handling diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 6a1bcb6e05..4a5521d944 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -82,13 +82,15 @@ def job_model_to_job_submission(job_model: JobModel) -> JobSubmission: ): backend_data = json.loads(job_provisioning_data.backend_data) job_provisioning_data.backend = backend_data["base_backend"] + last_processed_at = job_model.last_processed_at.replace(tzinfo=timezone.utc) finished_at = None if job_model.status.is_finished(): - finished_at = 
job_model.last_processed_at.replace(tzinfo=timezone.utc) + finished_at = last_processed_at return JobSubmission( id=job_model.id, submission_num=job_model.submission_num, submitted_at=job_model.submitted_at.replace(tzinfo=timezone.utc), + last_processed_at=last_processed_at, finished_at=finished_at, status=job_model.status, termination_reason=job_model.termination_reason, diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index ddfdb30118..c7b0716f2a 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -18,9 +18,10 @@ AppSpec, JobSpec, Requirements, - RetryPolicy, + Retry, RunSpec, ) +from dstack._internal.core.services.profiles import get_retry from dstack._internal.core.services.ssh.ports import filter_reserved_ports from dstack._internal.server.services.docker import ImageConfig, get_image_config from dstack._internal.server.utils.common import run_async @@ -61,10 +62,6 @@ def _shell_commands(self) -> List[str]: def _default_max_duration(self) -> Optional[int]: pass - @abstractmethod - def _retry_policy(self) -> RetryPolicy: - pass - @abstractmethod def _spot_policy(self) -> SpotPolicy: pass @@ -92,7 +89,7 @@ async def _get_job_spec( max_duration=self._max_duration(), registry_auth=self._registry_auth(), requirements=self._requirements(), - retry_policy=self._retry_policy(), + retry=self._retry(), working_dir=self._working_dir(), ) return job_spec @@ -166,6 +163,9 @@ def _requirements(self) -> Requirements: spot=None if spot_policy == SpotPolicy.AUTO else (spot_policy == SpotPolicy.SPOT), ) + def _retry(self) -> Optional[Retry]: + return get_retry(self.run_spec.merged_profile) + def _working_dir(self) -> Optional[str]: """ None means default working directory diff --git a/src/dstack/_internal/server/services/jobs/configurators/dev.py 
b/src/dstack/_internal/server/services/jobs/configurators/dev.py index 2dbb4e21ba..13d75a0f60 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/dev.py +++ b/src/dstack/_internal/server/services/jobs/configurators/dev.py @@ -1,8 +1,8 @@ from typing import List, Optional from dstack._internal.core.models.configurations import PortMapping, RunConfigurationType -from dstack._internal.core.models.profiles import ProfileRetryPolicy, SpotPolicy -from dstack._internal.core.models.runs import RetryPolicy, RunSpec +from dstack._internal.core.models.profiles import SpotPolicy +from dstack._internal.core.models.runs import RunSpec from dstack._internal.server.services.jobs.configurators.base import JobConfigurator from dstack._internal.server.services.jobs.configurators.extensions.vscode import VSCodeDesktop @@ -45,11 +45,6 @@ def _shell_commands(self) -> List[str]: def _default_max_duration(self) -> Optional[int]: return DEFAULT_MAX_DURATION_SECONDS - def _retry_policy(self) -> RetryPolicy: - return RetryPolicy.parse_obj( - self.run_spec.merged_profile.retry_policy or ProfileRetryPolicy() - ) - def _spot_policy(self) -> SpotPolicy: return self.run_spec.merged_profile.spot_policy or SpotPolicy.ONDEMAND diff --git a/src/dstack/_internal/server/services/jobs/configurators/service.py b/src/dstack/_internal/server/services/jobs/configurators/service.py index 718caabe0a..19e77f89b4 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/service.py +++ b/src/dstack/_internal/server/services/jobs/configurators/service.py @@ -1,8 +1,7 @@ from typing import List, Optional from dstack._internal.core.models.configurations import PortMapping, RunConfigurationType -from dstack._internal.core.models.profiles import ProfileRetryPolicy, SpotPolicy -from dstack._internal.core.models.runs import RetryPolicy +from dstack._internal.core.models.profiles import SpotPolicy from dstack._internal.server.services.jobs.configurators.base import JobConfigurator @@ -15,12 
+14,6 @@ def _shell_commands(self) -> List[str]: def _default_max_duration(self) -> Optional[int]: return None - def _retry_policy(self) -> RetryPolicy: - # convert ProfileRetryPolicy to RetryPolicy - return RetryPolicy.parse_obj( - self.run_spec.merged_profile.retry_policy or ProfileRetryPolicy() - ) - def _spot_policy(self) -> SpotPolicy: return self.run_spec.merged_profile.spot_policy or SpotPolicy.AUTO diff --git a/src/dstack/_internal/server/services/jobs/configurators/task.py b/src/dstack/_internal/server/services/jobs/configurators/task.py index de8fad9be3..3908b654f5 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/task.py +++ b/src/dstack/_internal/server/services/jobs/configurators/task.py @@ -1,8 +1,8 @@ from typing import List, Optional from dstack._internal.core.models.configurations import PortMapping, RunConfigurationType -from dstack._internal.core.models.profiles import ProfileRetryPolicy, SpotPolicy -from dstack._internal.core.models.runs import JobSpec, RetryPolicy +from dstack._internal.core.models.profiles import SpotPolicy +from dstack._internal.core.models.runs import JobSpec from dstack._internal.server.services.jobs.configurators.base import JobConfigurator DEFAULT_MAX_DURATION_SECONDS = 72 * 3600 @@ -28,12 +28,6 @@ def _shell_commands(self) -> List[str]: def _default_max_duration(self) -> Optional[int]: return DEFAULT_MAX_DURATION_SECONDS - def _retry_policy(self) -> RetryPolicy: - # convert ProfileRetryPolicy to RetryPolicy - return RetryPolicy.parse_obj( - self.run_spec.merged_profile.retry_policy or ProfileRetryPolicy() - ) - def _spot_policy(self) -> SpotPolicy: return self.run_spec.merged_profile.spot_policy or SpotPolicy.AUTO diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index f3ca97f61b..6cf3d8281a 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -38,7 +38,6 @@ Profile, SpotPolicy, 
TerminationPolicy, - parse_duration, ) from dstack._internal.core.models.runs import ( InstanceStatus, @@ -50,7 +49,6 @@ JobSubmission, JobTerminationReason, Requirements, - RetryPolicy, Run, RunPlan, RunSpec, @@ -589,11 +587,6 @@ async def create_instance( if termination_idle_time is None: termination_idle_time = DEFAULT_POOL_TERMINATION_IDLE_TIME - retry_policy = RetryPolicy(retry=False, duration=None) - if profile.retry_policy is not None: - retry_policy.retry = profile.retry_policy.retry - retry_policy.duration = parse_duration(profile.retry_policy.duration) - instance = InstanceModel( name=instance_name, project=project, @@ -605,8 +598,6 @@ async def create_instance( instance_configuration=None, termination_policy=termination_policy, termination_idle_time=termination_idle_time, - retry_policy=retry_policy.retry, - retry_policy_duration=retry_policy.duration, ) logger.info( "Added a new instance %s", @@ -688,6 +679,7 @@ def run_model_to_run( project_name=run_model.project.name, user=run_model.user.name, submitted_at=run_model.submitted_at.replace(tzinfo=timezone.utc), + last_processed_at=run_model.last_processed_at.replace(tzinfo=timezone.utc), status=run_model.status, termination_reason=run_model.termination_reason, run_spec=run_spec, @@ -907,14 +899,13 @@ async def retry_run_replica_jobs( session: AsyncSession, run_model: RunModel, latest_jobs: List[JobModel], *, only_failed: bool ): for job_model in latest_jobs: - if job_model.termination_reason not in JOB_TERMINATION_REASONS_TO_RETRY: + if not (job_model.status.is_finished() or job_model.status == JobStatus.TERMINATING): if only_failed: # No need to resubmit, skip continue - if not (job_model.status.is_finished() or job_model.status == JobStatus.TERMINATING): - # The job is not finished, but we have to retry all jobs. Terminate it - job_model.status = JobStatus.TERMINATING - job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER + # The job is not finished, but we have to retry all jobs. 
Terminate it + job_model.status = JobStatus.TERMINATING + job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER new_job_model = create_job_model_for_new_submission( run_model=run_model, diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index 5addb63a12..30dbb757de 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -253,6 +253,7 @@ def get_job_provisioning_data() -> JobProvisioningData: ), instance_id="instance_id", hostname="127.0.0.4", + internal_ip="127.0.0.4", region="us-east-1", price=10.5, username="ubuntu", @@ -413,10 +414,6 @@ async def create_instance( profile=profile.json(), requirements=requirements.json(), instance_configuration=instance_configuration.json(), - retry_policy=profile.retry_policy.retry if profile.retry_policy is not None else False, - retry_policy_duration=profile.retry_policy.duration - if profile.retry_policy is not None - else 123, ) session.add(im) await session.commit() diff --git a/src/dstack/_internal/utils/common.py b/src/dstack/_internal/utils/common.py index a3e724b92f..f127dbf40c 100644 --- a/src/dstack/_internal/utils/common.py +++ b/src/dstack/_internal/utils/common.py @@ -138,6 +138,22 @@ def parse_pretty_duration(duration: str) -> int: return amount * multiplier +def format_pretty_duration(seconds: int) -> str: + if seconds < 0: + raise ValueError("Seconds cannot be negative") + units = [ + ("w", 7 * 24 * 3600), + ("d", 24 * 3600), + ("h", 3600), + ("m", 60), + ("s", 1), + ] + for unit, multiplier in units: + if seconds % multiplier == 0: + return f"{seconds // multiplier}{unit}" + return f"{seconds}s" # Fallback to seconds if no larger unit fits perfectly + + def sizeof_fmt(num, suffix="B"): for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py 
b/src/tests/_internal/server/background/tasks/test_process_instances.py index e3b1f9824e..64b64e7016 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -11,12 +11,11 @@ InstanceType, Resources, ) -from dstack._internal.core.models.profiles import Profile, TerminationPolicy +from dstack._internal.core.models.profiles import Profile, ProfileRetryPolicy, TerminationPolicy from dstack._internal.core.models.runs import ( InstanceStatus, JobProvisioningData, JobStatus, - RetryPolicy, ) from dstack._internal.server.background.tasks.process_instances import ( HealthStatus, @@ -320,7 +319,9 @@ async def test_create_instance(self, test_db, session: AsyncSession): async def test_expire_retry_duration(self, test_db, session: AsyncSession): project = await create_project(session=session) pool = await create_pool(session, project) - profile = Profile(name="test_profile", retry_policy=RetryPolicy(retry=True, duration=123)) + profile = Profile( + name="test_profile", retry_policy=ProfileRetryPolicy(retry=True, duration=123) + ) instance = await create_instance( session, project, pool, profile=profile, status=InstanceStatus.TERMINATING ) @@ -333,7 +334,9 @@ async def test_expire_retry_duration(self, test_db, session: AsyncSession): async def test_retry_delay(self, test_db, session: AsyncSession): project = await create_project(session=session) pool = await create_pool(session, project) - profile = Profile(name="test_profile", retry_policy=RetryPolicy(retry=True, duration=123)) + profile = Profile( + name="test_profile", retry_policy=ProfileRetryPolicy(retry=True, duration=123) + ) instance = await create_instance( session, project, diff --git a/src/tests/_internal/server/background/tasks/test_process_runs.py b/src/tests/_internal/server/background/tasks/test_process_runs.py index c564479502..eb48b9c4f1 100644 --- 
a/src/tests/_internal/server/background/tasks/test_process_runs.py +++ b/src/tests/_internal/server/background/tasks/test_process_runs.py @@ -8,7 +8,7 @@ import dstack._internal.server.background.tasks.process_runs as process_runs from dstack._internal.core.models.configurations import ServiceConfiguration -from dstack._internal.core.models.profiles import Profile, ProfileRetryPolicy +from dstack._internal.core.models.profiles import Profile from dstack._internal.core.models.resources import Range from dstack._internal.core.models.runs import ( JobStatus, @@ -45,7 +45,7 @@ async def make_run( run_name = "test-run" profile = Profile( name="test-profile", - retry_policy=ProfileRetryPolicy(retry=True), + retry=True, ) run_spec = get_run_spec( repo_id=repo.name, @@ -136,10 +136,12 @@ async def test_retry_running_to_pending(self, test_db, session: AsyncSession): session=session, run=run, status=JobStatus.FAILED, + submitted_at=run.submitted_at, + last_processed_at=run.submitted_at, termination_reason=JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, instance=instance, + job_provisioning_data=get_job_provisioning_data(), ) - with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: datetime_mock.return_value = run.submitted_at + datetime.timedelta(minutes=3) await process_runs.process_single_run(run.id, []) @@ -210,22 +212,27 @@ async def test_all_no_capacity_to_pending(self, test_db, session: AsyncSession): run=run, status=JobStatus.TERMINATING, termination_reason=JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, + submitted_at=run.submitted_at, + last_processed_at=run.submitted_at, replica_num=0, instance=await create_instance( session, project=run.project, pool=run.project.default_pool, spot=True ), + job_provisioning_data=get_job_provisioning_data(), ) await create_job( session=session, run=run, status=JobStatus.TERMINATING, termination_reason=JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, + submitted_at=run.submitted_at, + 
last_processed_at=run.submitted_at, replica_num=1, instance=await create_instance( session, project=run.project, pool=run.project.default_pool, spot=True ), + job_provisioning_data=get_job_provisioning_data(), ) - with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: datetime_mock.return_value = run.submitted_at + datetime.timedelta(minutes=3) await process_runs.process_single_run(run.id, []) @@ -240,14 +247,26 @@ async def test_some_no_capacity_keep_running(self, test_db, session: AsyncSessio run=run, status=JobStatus.TERMINATING, termination_reason=JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, + submitted_at=run.submitted_at, + last_processed_at=run.last_processed_at, replica_num=0, instance=await create_instance( session, project=run.project, pool=run.project.default_pool, spot=True ), + job_provisioning_data=get_job_provisioning_data(), ) - await create_job(session=session, run=run, status=JobStatus.RUNNING, replica_num=1) - - await process_runs.process_single_run(run.id, []) + await create_job( + session=session, + run=run, + status=JobStatus.RUNNING, + submitted_at=run.submitted_at, + last_processed_at=run.last_processed_at, + replica_num=1, + job_provisioning_data=get_job_provisioning_data(), + ) + with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: + datetime_mock.return_value = run.submitted_at + datetime.timedelta(minutes=3) + await process_runs.process_single_run(run.id, []) await session.refresh(run) assert run.status == RunStatus.RUNNING assert len(run.jobs) == 3 diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index cf8fdf8d41..8804608f1d 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -88,7 +88,8 @@ def get_dev_env_run_plan_dict( "max_duration": "off", "max_price": None, "pool_name": DEFAULT_POOL_NAME, - "retry_policy": {"duration": None, "retry": False}, + 
"retry": None, + "retry_policy": None, "spot_policy": "spot", "termination_idle_time": 300, "termination_policy": None, @@ -105,7 +106,8 @@ def get_dev_env_run_plan_dict( "max_price": None, "name": "string", "pool_name": DEFAULT_POOL_NAME, - "retry_policy": {"duration": None, "retry": False}, + "retry": None, + "retry_policy": None, "spot_policy": "spot", "termination_idle_time": 300, "termination_policy": None, @@ -159,7 +161,7 @@ def get_dev_env_run_plan_dict( "max_price": None, "spot": True, }, - "retry_policy": {"duration": None, "retry": False}, + "retry": None, "working_dir": ".", }, "offers": [json.loads(o.json()) for o in offers], @@ -178,6 +180,7 @@ def get_dev_env_run_dict( run_name: str = "run_name", repo_id: str = "test_repo", submitted_at: str = "2023-01-02T03:04:00+00:00", + last_processed_at: str = "2023-01-02T03:04:00+00:00", finished_at: str = "2023-01-02T03:04:00+00:00", ) -> Dict: return { @@ -185,6 +188,7 @@ def get_dev_env_run_dict( "project_name": project_name, "user": username, "submitted_at": submitted_at, + "last_processed_at": last_processed_at, "status": "submitted", "run_spec": { "configuration": { @@ -215,7 +219,8 @@ def get_dev_env_run_dict( "max_duration": "off", "max_price": None, "pool_name": DEFAULT_POOL_NAME, - "retry_policy": {"duration": None, "retry": False}, + "retry": None, + "retry_policy": None, "spot_policy": "spot", "termination_idle_time": 300, "termination_policy": None, @@ -232,7 +237,8 @@ def get_dev_env_run_dict( "max_price": None, "name": "string", "pool_name": DEFAULT_POOL_NAME, - "retry_policy": {"duration": None, "retry": False}, + "retry": None, + "retry_policy": None, "spot_policy": "spot", "termination_idle_time": 300, "termination_policy": None, @@ -286,7 +292,7 @@ def get_dev_env_run_dict( "max_price": None, "spot": True, }, - "retry_policy": {"duration": None, "retry": False}, + "retry": None, "working_dir": ".", }, "job_submissions": [ @@ -294,6 +300,7 @@ def get_dev_env_run_dict( "id": job_id, 
"submission_num": 0, "submitted_at": submitted_at, + "last_processed_at": last_processed_at, "finished_at": finished_at, "status": "submitted", "termination_reason": None, @@ -307,6 +314,7 @@ def get_dev_env_run_dict( "id": job_id, "submission_num": 0, "submitted_at": submitted_at, + "last_processed_at": last_processed_at, "finished_at": finished_at, "status": "submitted", "termination_reason": None, @@ -373,6 +381,7 @@ async def test_lists_runs(self, test_db, session: AsyncSession): "project_name": project.name, "user": user.name, "submitted_at": run1_submitted_at.isoformat(), + "last_processed_at": run1_submitted_at.isoformat(), "status": "submitted", "run_spec": run1_spec.dict(), "jobs": [ @@ -382,7 +391,8 @@ async def test_lists_runs(self, test_db, session: AsyncSession): { "id": str(job.id), "submission_num": 0, - "submitted_at": "2023-01-02T03:04:00+00:00", + "submitted_at": run1_submitted_at.isoformat(), + "last_processed_at": run1_submitted_at.isoformat(), "finished_at": None, "status": "submitted", "termination_reason": None, @@ -395,7 +405,8 @@ async def test_lists_runs(self, test_db, session: AsyncSession): "latest_job_submission": { "id": str(job.id), "submission_num": 0, - "submitted_at": "2023-01-02T03:04:00+00:00", + "submitted_at": run1_submitted_at.isoformat(), + "last_processed_at": run1_submitted_at.isoformat(), "finished_at": None, "status": "submitted", "termination_reason_message": None, @@ -411,6 +422,7 @@ async def test_lists_runs(self, test_db, session: AsyncSession): "project_name": project.name, "user": user.name, "submitted_at": run2_submitted_at.isoformat(), + "last_processed_at": run2_submitted_at.isoformat(), "status": "submitted", "run_spec": run2_spec.dict(), "jobs": [], @@ -555,6 +567,7 @@ async def test_submits_run(self, test_db, session: AsyncSession): run_id = UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e") submitted_at = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc) submitted_at_formatted = "2023-01-02T03:04:00+00:00" + 
last_processed_at_formatted = submitted_at_formatted repo = await create_repo(session=session, project_id=project.id) run_dict = get_dev_env_run_dict( run_id=str(run_id), @@ -562,6 +575,7 @@ async def test_submits_run(self, test_db, session: AsyncSession): project_name=project.name, username=user.name, submitted_at=submitted_at_formatted, + last_processed_at=last_processed_at_formatted, finished_at=None, run_name="test-run", repo_id=repo.name,