From 70f037e8b0edfdcd4c5e5d15063fef6392f775c4 Mon Sep 17 00:00:00 2001 From: Max Schettler Date: Fri, 30 Jan 2026 12:06:41 +0100 Subject: [PATCH 1/2] Remove unused definition --- src/api/organization/project/branch/__init__.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/api/organization/project/branch/__init__.py b/src/api/organization/project/branch/__init__.py index 2c5b24f8e..ac0eeda33 100644 --- a/src/api/organization/project/branch/__init__.py +++ b/src/api/organization/project/branch/__init__.py @@ -68,7 +68,6 @@ BranchResizeStatusEntry, BranchServiceStatus, BranchSourceDeploymentParameters, - BranchStatus, BranchStatusPublic, BranchUpdate, CapaResizeKey, @@ -296,14 +295,6 @@ def _normalize_resize_statuses(branch: Branch) -> dict[str, BranchResizeStatusEn return normalized -_DEFAULT_SERVICE_STATUS = BranchStatus( - database=BranchServiceStatus.UNKNOWN, - storage=BranchServiceStatus.UNKNOWN, - meta=BranchServiceStatus.UNKNOWN, - rest=BranchServiceStatus.UNKNOWN, -) - - _PVC_TIMEOUT_SECONDS = float(600) _PVC_CLONE_TIMEOUT_SECONDS = float(10) _PVC_POLL_INTERVAL_SECONDS = float(2) From 6b722151a141142f6863bdcf12e12842288ddcda Mon Sep 17 00:00:00 2001 From: Max Schettler Date: Fri, 30 Jan 2026 14:12:56 +0100 Subject: [PATCH 2/2] Extract branch status management to dedicated module --- src/api/_util/resourcelimit.py | 4 +- src/api/backupmonitor.py | 2 +- .../organization/project/branch/__init__.py | 232 +++--------------- src/api/organization/project/branch/status.py | 229 +++++++++++++++++ src/api/resources.py | 4 +- src/check_branch_status.py | 35 --- src/deployment/health.py | 63 ----- src/deployment/monitors/resize/__init__.py | 16 -- 8 files changed, 262 insertions(+), 323 deletions(-) create mode 100644 src/api/organization/project/branch/status.py delete mode 100644 src/check_branch_status.py delete mode 100644 src/deployment/health.py diff --git a/src/api/_util/resourcelimit.py b/src/api/_util/resourcelimit.py index 12306eb6a..00fc304ab 100644 --- a/src/api/_util/resourcelimit.py +++ b/src/api/_util/resourcelimit.py @@ -548,11 +548,11 @@ async def _collect_branch_statuses( if not branch_ids: return {} - from ..organization.project import branch as branch_module + from ..organization.project.branch.status import refresh_branch_status statuses: dict[Identifier, BranchServiceStatus] = {} for branch_id in branch_ids: - statuses[branch_id] = await branch_module.refresh_branch_status(branch_id) + statuses[branch_id] = await refresh_branch_status(branch_id) return statuses diff --git a/src/api/backupmonitor.py b/src/api/backupmonitor.py index 8cebef383..720cc50c9 100644 --- a/src/api/backupmonitor.py +++ b/src/api/backupmonitor.py @@ -26,7 +26,7 @@ create_branch_snapshot, delete_branch_snapshot, ) -from .organization.project.branch import refresh_branch_status +from .organization.project.branch.status import refresh_branch_status from .settings import get_settings # --------------------------- diff --git a/src/api/organization/project/branch/__init__.py b/src/api/organization/project/branch/__init__.py index ac0eeda33..52c395794 100644 --- a/src/api/organization/project/branch/__init__.py +++ b/src/api/organization/project/branch/__init__.py @@ -3,7 +3,7 @@ import logging import secrets from collections.abc import Sequence -from datetime import UTC, datetime, timedelta +from datetime import UTC, datetime from typing import Annotated, Any, Literal, TypedDict, cast from urllib.parse import urlsplit, urlunsplit @@ -14,7 +14,6 @@ from fastapi.security import HTTPAuthorizationCredentials from keycloak.exceptions import KeycloakError from kubernetes_asyncio.client.exceptions import ApiException -from pydantic import ValidationError from sqlalchemy.exc import IntegrityError from sqlmodel import select @@ -38,10 +37,6 @@ update_branch_volume_iops, ) from .....deployment._util import deployment_namespace -from .....deployment.health import ( - collect_branch_service_health, - derive_branch_status_from_services, -) from .....deployment.kubernetes._util import core_v1_client from .....deployment.kubernetes.neonvm import PowerState as NeonVMPowerState from .....deployment.kubernetes.neonvm import set_virtualmachine_power_state @@ -64,8 +59,6 @@ BranchPgbouncerConfigUpdate, BranchPublic, BranchResizeService, - BranchResizeStatus, - BranchResizeStatusEntry, BranchServiceStatus, BranchSourceDeploymentParameters, BranchStatusPublic, @@ -98,56 +91,29 @@ from ....keycloak import realm_admin from ....settings import get_settings as get_api_settings from .auth import api as auth_api +from .status import ( + collect_branch_service_health, + normalize_resize_statuses, + parse_branch_status, + persist_branch_status, +) api = APIRouter(tags=["branch"]) logger = logging.getLogger(__name__) -_TRANSITIONAL_BRANCH_STATUSES: set[BranchServiceStatus] = { - BranchServiceStatus.CREATING, - BranchServiceStatus.STARTING, - BranchServiceStatus.STOPPING, - BranchServiceStatus.RESTARTING, - BranchServiceStatus.PAUSING, - BranchServiceStatus.RESUMING, - BranchServiceStatus.UPDATING, - BranchServiceStatus.DELETING, - BranchServiceStatus.RESIZING, -} -_PROTECTED_BRANCH_STATUSES: set[BranchServiceStatus] = {BranchServiceStatus.PAUSED} -_ACTIVE_RESIZE_STATUSES: set[BranchResizeStatus] = { - "PENDING", - "RESIZING", - "FILESYSTEM_RESIZE_PENDING", -} -_CREATING_STATUS_ERROR_GRACE_PERIOD = timedelta(minutes=5) -_STARTING_STATUS_ERROR_GRACE_PERIOD = timedelta(minutes=5) +_PVC_TIMEOUT_SECONDS = float(600) +_PVC_CLONE_TIMEOUT_SECONDS = float(10) +_PVC_POLL_INTERVAL_SECONDS = float(2) +_VOLUME_SNAPSHOT_CLASS = "simplyblock-csi-snapshotclass" -def _parse_branch_status(value: BranchServiceStatus | str | None) -> BranchServiceStatus: - if isinstance(value, BranchServiceStatus): - return value - if value: - # Normalize to the canonical representation expected by the enum ("STARTING", "STOPPED", etc.). - normalized_value = str(value).upper() - member = BranchServiceStatus._value2member_map_.get(normalized_value) - if member is not None: - return cast("BranchServiceStatus", member) - logger.warning("Encountered unknown branch status %s; defaulting to UNKNOWN", value) - return BranchServiceStatus.UNKNOWN - - -async def _persist_branch_status(branch_id: Identifier, status: BranchServiceStatus) -> None: - async with AsyncSessionLocal() as session: - branch = await session.get(Branch, branch_id) - if branch is None: - logger.warning("Branch %s missing while updating status to %s", branch_id, status) - return - if _parse_branch_status(branch.status) == status: - return - branch.set_status(status) - await session.commit() +_PGBOUNCER_ADMIN_USER = "pgbouncer_admin" +_PGBOUNCER_ADMIN_DATABASE = "pgbouncer" +_PGBOUNCER_SERVICE_PORT = 6432 +_PGBOUNCER_CONFIG_TEMPLATE_ERROR = "PgBouncer configuration template missing required entries." +_PGBOUNCER_CONFIG_UPDATE_ERROR = "Failed to update PgBouncer configuration." async def _cleanup_failed_branch_deployment(branch_id: Identifier) -> None: @@ -168,146 +134,6 @@ async def _cleanup_failed_branch_deployment(branch_id: Identifier) -> None: logger.exception("Failed to initialize cleanup session for branch %s", branch_id) -def _should_update_branch_status( - current: BranchServiceStatus, - derived: BranchServiceStatus, - resize_status: BranchResizeStatus, -) -> bool: - if current == derived: - return False - if current == BranchServiceStatus.RESIZING and resize_status in _ACTIVE_RESIZE_STATUSES: - # Keep the explicit RESIZING while a resize is still in progress - # unless we detect a hard failure. - return derived == BranchServiceStatus.ERROR - if current == BranchServiceStatus.STARTING and derived == BranchServiceStatus.STOPPED: - logger.debug("Ignoring STARTING -> STOPPED transition detected by branch status monitor") - return False - if current in _PROTECTED_BRANCH_STATUSES and derived not in { - BranchServiceStatus.ACTIVE_HEALTHY, - BranchServiceStatus.ERROR, - }: - return False - if ( - derived == BranchServiceStatus.STOPPED - and current in _TRANSITIONAL_BRANCH_STATUSES - and current != BranchServiceStatus.STOPPING - ): - return False - if derived in { - BranchServiceStatus.ACTIVE_HEALTHY, - BranchServiceStatus.ACTIVE_UNHEALTHY, - BranchServiceStatus.STOPPED, - BranchServiceStatus.ERROR, - }: - return True - if derived == BranchServiceStatus.UNKNOWN: - return current not in _TRANSITIONAL_BRANCH_STATUSES and current not in _PROTECTED_BRANCH_STATUSES - return True - - -def _adjust_derived_status_for_stuck_creation( - branch: Branch, current: BranchServiceStatus, derived: BranchServiceStatus -) -> BranchServiceStatus: - if derived != BranchServiceStatus.STOPPED: - return derived - - status_timestamp = branch.status_updated_at or branch.created_datetime - elapsed = datetime.now(UTC) - status_timestamp - - if current == BranchServiceStatus.CREATING and elapsed >= _CREATING_STATUS_ERROR_GRACE_PERIOD: - logger.warning( - "Branch %s still CREATING after %s with STOPPED services; marking ERROR", - branch.id, - elapsed, - ) - return BranchServiceStatus.ERROR - - if current == BranchServiceStatus.STARTING and elapsed >= _STARTING_STATUS_ERROR_GRACE_PERIOD: - logger.warning( - "Branch %s still STARTING after %s with STOPPED services; marking ERROR", - branch.id, - elapsed, - ) - return BranchServiceStatus.ERROR - - return derived - - -async def refresh_branch_status(branch_id: Identifier) -> BranchServiceStatus: - """ - Probe branch services, derive an overall lifecycle state, and persist it when appropriate. - """ - async with AsyncSessionLocal() as session: - branch = await session.get(Branch, branch_id) - if branch is None: - logger.warning("Branch %s not found while refreshing status", branch_id) - return BranchServiceStatus.UNKNOWN - - current_status = _parse_branch_status(branch.status) - try: - namespace, _ = get_autoscaler_vm_identity(branch.id) - service_status = await collect_branch_service_health(branch_id) - derived_status = derive_branch_status_from_services( - service_status, - storage_enabled=branch.enable_file_storage, - ) - except Exception: - logger.exception("Failed to refresh service status for branch %s", branch.id) - derived_status = BranchServiceStatus.UNKNOWN - - derived_status = _adjust_derived_status_for_stuck_creation( - branch, - current_status, - derived_status, - ) - - if _should_update_branch_status( - current_status, - derived_status, - resize_status=branch.resize_status, - ): - branch.set_status(derived_status) - await session.commit() - return derived_status - - await session.rollback() - return current_status - - -def _normalize_resize_statuses(branch: Branch) -> dict[str, BranchResizeStatusEntry]: - statuses = branch.resize_statuses or {} - if not statuses: - return {} - - normalized: dict[str, BranchResizeStatusEntry] = {} - for service, entry in statuses.items(): - if isinstance(entry, BranchResizeStatusEntry): - normalized[service] = entry - continue - try: - normalized[service] = BranchResizeStatusEntry.model_validate(entry) - except ValidationError: - logger.warning( - "Skipping invalid resize status entry for branch %s service %s", - branch.id, - service, - ) - return normalized - - -_PVC_TIMEOUT_SECONDS = float(600) -_PVC_CLONE_TIMEOUT_SECONDS = float(10) -_PVC_POLL_INTERVAL_SECONDS = float(2) -_VOLUME_SNAPSHOT_CLASS = "simplyblock-csi-snapshotclass" - - -_PGBOUNCER_ADMIN_USER = "pgbouncer_admin" -_PGBOUNCER_ADMIN_DATABASE = "pgbouncer" -_PGBOUNCER_SERVICE_PORT = 6432 -_PGBOUNCER_CONFIG_TEMPLATE_ERROR = "PgBouncer configuration template missing required entries." -_PGBOUNCER_CONFIG_UPDATE_ERROR = "Failed to update PgBouncer configuration." - - def generate_pgbouncer_password(length: int = 32) -> str: if length <= 0: raise ValueError("PgBouncer password length must be positive.") @@ -752,7 +578,7 @@ async def _deploy_branch_environment_task( pgbouncer_config: PgbouncerConfigSnapshot, pitr_enabled: bool, ) -> None: - await _persist_branch_status(branch_id, BranchServiceStatus.CREATING) + await persist_branch_status(branch_id, BranchServiceStatus.CREATING) try: await deploy_branch_environment( organization_id=organization_id, @@ -767,7 +593,7 @@ async def _deploy_branch_environment_task( pitr_enabled=pitr_enabled, ) except VelaError: - await _persist_branch_status(branch_id, BranchServiceStatus.ERROR) + await persist_branch_status(branch_id, BranchServiceStatus.ERROR) await _cleanup_failed_branch_deployment(branch_id) logging.exception( "Branch deployment failed for project_id=%s branch_id=%s branch_slug=%s", @@ -776,7 +602,7 @@ async def _deploy_branch_environment_task( branch_slug, ) return - await _persist_branch_status(branch_id, BranchServiceStatus.STARTING) + await persist_branch_status(branch_id, BranchServiceStatus.STARTING) async def _clone_branch_environment_task( @@ -794,7 +620,7 @@ async def _clone_branch_environment_task( pgbouncer_config: PgbouncerConfigSnapshot, pitr_enabled: bool, ) -> None: - await _persist_branch_status(branch_id, BranchServiceStatus.CREATING) + await persist_branch_status(branch_id, BranchServiceStatus.CREATING) storage_class_name: str | None = None if copy_data: try: @@ -812,7 +638,7 @@ async def _clone_branch_environment_task( pitr_enabled=pitr_enabled, ) except VelaError: - await _persist_branch_status(branch_id, BranchServiceStatus.ERROR) + await persist_branch_status(branch_id, BranchServiceStatus.ERROR) await _cleanup_failed_branch_deployment(branch_id) logging.exception( "Branch data clone failed for project_id=%s branch_id=%s branch_slug=%s", @@ -837,7 +663,7 @@ async def _clone_branch_environment_task( pitr_enabled=pitr_enabled, ) except VelaError: - await _persist_branch_status(branch_id, BranchServiceStatus.ERROR) + await persist_branch_status(branch_id, BranchServiceStatus.ERROR) await _cleanup_failed_branch_deployment(branch_id) logging.exception( "Branch deployment (clone) failed for project_id=%s branch_id=%s branch_slug=%s", @@ -846,7 +672,7 @@ async def _clone_branch_environment_task( branch_slug, ) return - await _persist_branch_status(branch_id, BranchServiceStatus.STARTING) + await persist_branch_status(branch_id, BranchServiceStatus.STARTING) async def _restore_branch_environment_task( @@ -866,7 +692,7 @@ async def _restore_branch_environment_task( pgbouncer_config: PgbouncerConfigSnapshot, pitr_enabled: bool, ) -> None: - await _persist_branch_status(branch_id, BranchServiceStatus.CREATING) + await persist_branch_status(branch_id, BranchServiceStatus.CREATING) storage_class_name: str | None = None try: storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops) @@ -885,7 +711,7 @@ async def _restore_branch_environment_task( database_size=parameters.database_size, ) except VelaError: - await _persist_branch_status(branch_id, BranchServiceStatus.ERROR) + await persist_branch_status(branch_id, BranchServiceStatus.ERROR) await _cleanup_failed_branch_deployment(branch_id) logging.exception( "Branch restore failed for project_id=%s branch_id=%s branch_slug=%s using snapshot %s/%s", @@ -912,7 +738,7 @@ async def _restore_branch_environment_task( pitr_enabled=pitr_enabled, ) except VelaError: - await _persist_branch_status(branch_id, BranchServiceStatus.ERROR) + await persist_branch_status(branch_id, BranchServiceStatus.ERROR) await _cleanup_failed_branch_deployment(branch_id) logging.exception( "Branch deployment (restore) failed for project_id=%s branch_id=%s branch_slug=%s", @@ -921,7 +747,7 @@ async def _restore_branch_environment_task( branch_slug, ) return - await _persist_branch_status(branch_id, BranchServiceStatus.STARTING) + await persist_branch_status(branch_id, BranchServiceStatus.STARTING) def _resolve_db_host(branch: Branch) -> str | None: @@ -1015,7 +841,7 @@ async def _public(branch: Branch) -> BranchPublic: ) used_resources = branch.resource_usage_snapshot() - branch_status = _parse_branch_status(branch.status) + branch_status = parse_branch_status(branch.status) api_keys = BranchApiKeys(anon=branch.anon_key, service_role=branch.service_key) @@ -1440,7 +1266,7 @@ async def status( _project: ProjectDep, branch: BranchDep, ) -> BranchStatusPublic: - normalized_resize_statuses = _normalize_resize_statuses(branch) + normalized_resize_statuses = normalize_resize_statuses(branch) service_status = await collect_branch_service_health(branch.id) return BranchStatusPublic( resize_status=branch.resize_status, diff --git a/src/api/organization/project/branch/status.py b/src/api/organization/project/branch/status.py new file mode 100644 index 000000000..ee88e0e3f --- /dev/null +++ b/src/api/organization/project/branch/status.py @@ -0,0 +1,229 @@ +import logging +from datetime import UTC, datetime, timedelta +from typing import cast + +from pydantic import ValidationError + +from ....._util import Identifier +from .....deployment.monitors.health import vm_monitor +from .....models.branch import Branch, BranchResizeStatus, BranchResizeStatusEntry, BranchServiceStatus, BranchStatus +from ....db import AsyncSessionLocal + +logger = logging.getLogger(__name__) + + +_CREATING_STATUS_ERROR_GRACE_PERIOD = timedelta(minutes=5) +_STARTING_STATUS_ERROR_GRACE_PERIOD = timedelta(minutes=5) +_TRANSITIONAL_BRANCH_STATUSES: set[BranchServiceStatus] = { + BranchServiceStatus.CREATING, + BranchServiceStatus.STARTING, + BranchServiceStatus.STOPPING, + BranchServiceStatus.RESTARTING, + BranchServiceStatus.PAUSING, + BranchServiceStatus.RESUMING, + BranchServiceStatus.UPDATING, + BranchServiceStatus.DELETING, + BranchServiceStatus.RESIZING, +} +_PROTECTED_BRANCH_STATUSES: set[BranchServiceStatus] = {BranchServiceStatus.PAUSED} +_ACTIVE_RESIZE_STATUSES: set[BranchResizeStatus] = { + "PENDING", + "RESIZING", + "FILESYSTEM_RESIZE_PENDING", +} + + +def parse_branch_status(value: BranchServiceStatus | str | None) -> BranchServiceStatus: + if isinstance(value, BranchServiceStatus): + return value + if value: + # Normalize to the canonical representation expected by the enum ("STARTING", "STOPPED", etc.). + normalized_value = str(value).upper() + member = BranchServiceStatus._value2member_map_.get(normalized_value) + if member is not None: + return cast("BranchServiceStatus", member) + logger.warning("Encountered unknown branch status %s; defaulting to UNKNOWN", value) + return BranchServiceStatus.UNKNOWN + + +async def persist_branch_status(branch_id: Identifier, status: BranchServiceStatus) -> None: + async with AsyncSessionLocal() as session: + branch = await session.get(Branch, branch_id) + if branch is None: + logger.warning("Branch %s missing while updating status to %s", branch_id, status) + return + if parse_branch_status(branch.status) == status: + return + branch.set_status(status) + await session.commit() + + +def _should_update_branch_status( + current: BranchServiceStatus, + derived: BranchServiceStatus, + resize_status: BranchResizeStatus, +) -> bool: + if current == derived: + return False + if current == BranchServiceStatus.RESIZING and resize_status in _ACTIVE_RESIZE_STATUSES: + # Keep the explicit RESIZING while a resize is still in progress + # unless we detect a hard failure. + return derived == BranchServiceStatus.ERROR + if current == BranchServiceStatus.STARTING and derived == BranchServiceStatus.STOPPED: + logger.debug("Ignoring STARTING -> STOPPED transition detected by branch status monitor") + return False + if current in _PROTECTED_BRANCH_STATUSES and derived not in { + BranchServiceStatus.ACTIVE_HEALTHY, + BranchServiceStatus.ERROR, + }: + return False + if ( + derived == BranchServiceStatus.STOPPED + and current in _TRANSITIONAL_BRANCH_STATUSES + and current != BranchServiceStatus.STOPPING + ): + return False + if derived in { + BranchServiceStatus.ACTIVE_HEALTHY, + BranchServiceStatus.ACTIVE_UNHEALTHY, + BranchServiceStatus.STOPPED, + BranchServiceStatus.ERROR, + }: + return True + if derived == BranchServiceStatus.UNKNOWN: + return current not in _TRANSITIONAL_BRANCH_STATUSES and current not in _PROTECTED_BRANCH_STATUSES + return True + + +def _adjust_derived_status_for_stuck_creation( + branch: Branch, current: BranchServiceStatus, derived: BranchServiceStatus +) -> BranchServiceStatus: + if derived != BranchServiceStatus.STOPPED: + return derived + + status_timestamp = branch.status_updated_at or branch.created_datetime + elapsed = datetime.now(UTC) - status_timestamp + + if current == BranchServiceStatus.CREATING and elapsed >= _CREATING_STATUS_ERROR_GRACE_PERIOD: + logger.warning( + "Branch %s still CREATING after %s with STOPPED services; marking ERROR", + branch.id, + elapsed, + ) + return BranchServiceStatus.ERROR + + if current == BranchServiceStatus.STARTING and elapsed >= _STARTING_STATUS_ERROR_GRACE_PERIOD: + logger.warning( + "Branch %s still STARTING after %s with STOPPED services; marking ERROR", + branch.id, + elapsed, + ) + return BranchServiceStatus.ERROR + + return derived + + +async def refresh_branch_status(branch_id: Identifier) -> BranchServiceStatus: + """ + Probe branch services, derive an overall lifecycle state, and persist it when appropriate. + """ + async with AsyncSessionLocal() as session: + branch = await session.get(Branch, branch_id) + if branch is None: + logger.warning("Branch %s not found while refreshing status", branch_id) + return BranchServiceStatus.UNKNOWN + + current_status = parse_branch_status(branch.status) + try: + service_status = await collect_branch_service_health(branch_id) + derived_status = derive_branch_status_from_services( + service_status, + storage_enabled=branch.enable_file_storage, + ) + except Exception: + logger.exception("Failed to refresh service status for branch %s", branch.id) + derived_status = BranchServiceStatus.UNKNOWN + + derived_status = _adjust_derived_status_for_stuck_creation( + branch, + current_status, + derived_status, + ) + + if _should_update_branch_status( + current_status, + derived_status, + resize_status=branch.resize_status, + ): + branch.set_status(derived_status) + await session.commit() + return derived_status + + await session.rollback() + return current_status + + +def normalize_resize_statuses(branch: Branch) -> dict[str, BranchResizeStatusEntry]: + statuses = branch.resize_statuses or {} + if not statuses: + return {} + + normalized: dict[str, BranchResizeStatusEntry] = {} + for service, entry in statuses.items(): + if isinstance(entry, BranchResizeStatusEntry): + normalized[service] = entry + continue + try: + normalized[service] = BranchResizeStatusEntry.model_validate(entry) + except ValidationError: + logger.warning( + "Skipping invalid resize status entry for branch %s service %s", + branch.id, + service, + ) + return normalized + + +def derive_branch_status_from_services( + service_status: BranchStatus, + *, + storage_enabled: bool, +) -> BranchServiceStatus: + statuses: list[BranchServiceStatus] = [ + service_status.database, + service_status.meta, + service_status.rest, + ] + if storage_enabled: + statuses.append(service_status.storage) + + if all(status == BranchServiceStatus.ACTIVE_HEALTHY for status in statuses): + return BranchServiceStatus.ACTIVE_HEALTHY + if any(status == BranchServiceStatus.ERROR for status in statuses): + return BranchServiceStatus.ERROR + if all(status == BranchServiceStatus.STOPPED for status in statuses): + return BranchServiceStatus.STOPPED + if any(status == BranchServiceStatus.UNKNOWN for status in statuses): + return BranchServiceStatus.UNKNOWN + return BranchServiceStatus.ACTIVE_UNHEALTHY + + +async def collect_branch_service_health(id_: Identifier) -> BranchStatus: + status = vm_monitor.status(id_) + if status is None or status.services is None: + return BranchStatus( + database=BranchServiceStatus.UNKNOWN, + storage=BranchServiceStatus.UNKNOWN, + meta=BranchServiceStatus.UNKNOWN, + rest=BranchServiceStatus.UNKNOWN, + ) + + services = status.services + return BranchStatus( + database=BranchServiceStatus.ACTIVE_HEALTHY if services.get("postgres", False) else BranchServiceStatus.STOPPED, + storage=BranchServiceStatus.ACTIVE_HEALTHY + if services.get("storageapi", False) + else BranchServiceStatus.STOPPED, + meta=BranchServiceStatus.ACTIVE_HEALTHY if services.get("meta", False) else BranchServiceStatus.STOPPED, + rest=BranchServiceStatus.ACTIVE_HEALTHY if services.get("rest", False) else BranchServiceStatus.STOPPED, + ) diff --git a/src/api/resources.py b/src/api/resources.py index 52dfb6993..b9ff61576 100644 --- a/src/api/resources.py +++ b/src/api/resources.py @@ -11,7 +11,6 @@ from sqlmodel import select from .._util import quantity_to_bytes, quantity_to_milli_cpu -from ..check_branch_status import get_branch_status from ..deployment import ( get_autoscaler_vm_identity, resolve_autoscaler_volume_identifiers, @@ -56,7 +55,7 @@ from .auth import authenticated_user from .db import SessionDep from .dependencies import OrganizationDep -from .organization.project.branch import refresh_branch_status +from .organization.project.branch.status import refresh_branch_status from .settings import get_settings router = APIRouter(dependencies=[Depends(authenticated_user)], tags=["resource"]) @@ -496,7 +495,6 @@ async def monitor_resources(): continue branch.store_resource_usage(usage) - status = await get_branch_status(branch.id) if status in [BranchServiceStatus.ACTIVE_HEALTHY, BranchServiceStatus.RESIZING]: prov_result = await db.execute( select(BranchProvisioning).where(BranchProvisioning.branch_id == branch.id) diff --git a/src/check_branch_status.py b/src/check_branch_status.py deleted file mode 100644 index a452631be..000000000 --- a/src/check_branch_status.py +++ /dev/null @@ -1,35 +0,0 @@ -import logging - -from sqlmodel.ext.asyncio.session import AsyncSession - -from ._util import Identifier -from .api.organization.project.branch import refresh_branch_status -from .models.branch import Branch, BranchServiceStatus - -logger = logging.getLogger(__name__) - - -async def _resolve_branch_status(session: AsyncSession, branch_id: Identifier) -> BranchServiceStatus: - branch = await session.get(Branch, branch_id) - if branch is None: - logger.warning("Branch %s not found while resolving status; returning UNKNOWN", branch_id) - return BranchServiceStatus.UNKNOWN - status = branch.status or BranchServiceStatus.UNKNOWN - if isinstance(status, BranchServiceStatus): - return status - member = BranchServiceStatus._value2member_map_.get(status) if status else None - if member is None: - logger.warning("Encountered unknown branch status %s; returning UNKNOWN", status) - return BranchServiceStatus.UNKNOWN - return member - - -async def get_branch_status( - branch_id: Identifier, - *, - session: AsyncSession | None = None, -) -> BranchServiceStatus: - if session is not None: - return await _resolve_branch_status(session, branch_id) - - return await refresh_branch_status(branch_id) diff --git a/src/deployment/health.py b/src/deployment/health.py deleted file mode 100644 index 25623ba8a..000000000 --- a/src/deployment/health.py +++ /dev/null @@ -1,63 +0,0 @@ -import logging - -from .._util import Identifier -from ..models.branch import BranchServiceStatus, BranchStatus -from .monitors.health import vm_monitor - -logger = logging.getLogger(__name__) - -SERVICE_PROBE_TIMEOUT_SECONDS = 2 - -BRANCH_SERVICE_ENDPOINTS: dict[str, tuple[str, int]] = { - "database": ("db", 5432), - "pgbouncer": ("pgbouncer", 6432), - "storage": ("storage", 5000), - "meta": ("meta", 8080), - "rest": ("rest", 3000), - "pgexporter": ("pgexporter", 9187), -} - - -def derive_branch_status_from_services( - service_status: BranchStatus, - *, - storage_enabled: bool, -) -> BranchServiceStatus: - statuses: list[BranchServiceStatus] = [ - service_status.database, - service_status.meta, - service_status.rest, - ] - if storage_enabled: - statuses.append(service_status.storage) - - if all(status == BranchServiceStatus.ACTIVE_HEALTHY for status in statuses): - return BranchServiceStatus.ACTIVE_HEALTHY - if any(status == BranchServiceStatus.ERROR for status in statuses): - return BranchServiceStatus.ERROR - if all(status == BranchServiceStatus.STOPPED for status in statuses): - return BranchServiceStatus.STOPPED - if any(status == BranchServiceStatus.UNKNOWN for status in statuses): - return BranchServiceStatus.UNKNOWN - return BranchServiceStatus.ACTIVE_UNHEALTHY - - -async def collect_branch_service_health(id_: Identifier) -> BranchStatus: - status = vm_monitor.status(id_) - if status is None or status.services is None: - return BranchStatus( - database=BranchServiceStatus.UNKNOWN, - storage=BranchServiceStatus.UNKNOWN, - meta=BranchServiceStatus.UNKNOWN, - rest=BranchServiceStatus.UNKNOWN, - ) - - services = status.services - return BranchStatus( - database=BranchServiceStatus.ACTIVE_HEALTHY if services.get("postgres", False) else BranchServiceStatus.STOPPED, - storage=BranchServiceStatus.ACTIVE_HEALTHY - if services.get("storageapi", False) - else BranchServiceStatus.STOPPED, - meta=BranchServiceStatus.ACTIVE_HEALTHY if services.get("meta", False) else BranchServiceStatus.STOPPED, - rest=BranchServiceStatus.ACTIVE_HEALTHY if services.get("rest", False) else BranchServiceStatus.STOPPED, - ) diff --git a/src/deployment/monitors/resize/__init__.py b/src/deployment/monitors/resize/__init__.py index 8eb219299..fee1051d9 100644 --- a/src/deployment/monitors/resize/__init__.py +++ b/src/deployment/monitors/resize/__init__.py @@ -49,7 +49,6 @@ should_transition_resize_status, ) from ....models.resources import ResourceLimitsPublic -from ...health import collect_branch_service_health, derive_branch_status_from_services from .pvc_resize import ( INITIAL_BACKOFF_SECONDS, VOLUME_SERVICE_MAP, @@ -103,11 +102,9 @@ async def _apply_volume_status( } branch.resize_statuses = statuses branch.resize_status = aggregate_resize_statuses(statuses) - await set_branch_status(branch.resize_status, branch) status_updated = True elif status is not None and should_transition_resize_status(branch.resize_status, status): branch.resize_status = status - await set_branch_status(branch.resize_status, branch) status_updated = True if status_updated and status == "COMPLETED" and capacity is not None: @@ -131,19 +128,6 @@ async def _apply_volume_status( await session.commit() -async def set_branch_status(status: BranchResizeStatus, branch: Branch) -> None: - if status not in {"FAILED", "COMPLETED"}: - return - - service_status = await collect_branch_service_health(branch.id) - branch.set_status( - derive_branch_status_from_services( - service_status, - storage_enabled=branch.enable_file_storage, - ) - ) - - async def _handle_pvc_event(core_v1: CoreV1Api, event: CoreV1Event) -> None: """Map a raw Kubernetes event into branch status updates and capacity changes.""" ref = event.involved_object