diff --git a/docs/internal/adr_inbound_migration_limit.ru.md b/docs/internal/adr_inbound_migration_limit.ru.md new file mode 100644 index 0000000000..2d1c5fbf11 --- /dev/null +++ b/docs/internal/adr_inbound_migration_limit.ru.md @@ -0,0 +1,540 @@ +# ADR: ограничение входящих live migrations на target node + +## Статус + +Предложено. + +## Контекст + +В модуле virtualization live migration выполняется через KubeVirt `VirtualMachineInstanceMigration`. +Пользовательские и автоматические сценарии миграции в Deckhouse проходят через несколько уровней: + +1. `VirtualMachineOperation` (`VMOP`) создаётся пользователем, контроллером эвакуации, workload-updater или другим компонентом. +2. `vmop-migration-controller` создаёт KubeVirt-ресурс `VirtualMachineInstanceMigration`. +3. KubeVirt создаёт target pod для миграции. +4. Kubernetes scheduler назначает target pod на node. +5. KubeVirt выполняет live migration. +6. Контроллеры virtualization синхронизируют статус KubeVirt migration обратно в `VMOP` и `VirtualMachine`. + +Сейчас ограничение параллелизма задаётся через KubeVirt `MigrationConfiguration`: + +```yaml +parallelMigrationsPerCluster: +parallelOutboundMigrationsPerNode: +``` + +В KubeVirt нет симметричной настройки: + +```yaml +parallelInboundMigrationsPerNode: +``` + +Из-за этого платформа умеет ограничивать количество исходящих миграций с source node, но не умеет ограничивать количество входящих миграций на target node. На практике несколько VM могут одновременно мигрировать на одну и ту же target node, даже если для source nodes ограничение уже работает. + +Требование: контролировать, что входящих миграций на target node не более одной. Остальные миграции должны штатно ожидать свободный inbound slot, а не завершаться ошибкой. + +## Новые вводные + +В проекте уже есть механизм ожидания динамических параметров миграции: + +- `virtualization-controller` вычисляет параметры через `images/virtualization-artifact/pkg/livemigration/migration_configuration.go`; +- `livemigration-controller` патчит `KVVMI.status.migrationState.migrationConfiguration`; +- KubeVirt/virt-launcher ждёт `migrationConfiguration` перед продолжением миграции. + +Основная точка подключения: + +```text +images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler.go +DynamicSettingsHandler.Handle(ctx, kvvmi) +``` + +Сейчас handler выставляет: + +```go +kvvmi.Status.MigrationState.MigrationConfiguration = conf +``` + +Эту точку нужно использовать как gate для inbound migration limit: не выставлять `MigrationConfiguration`, пока target node не получила inbound slot. + +## Проблема + +Ограничение нельзя надёжно реализовать до создания `VirtualMachineInstanceMigration`, потому что target node становится известна только после создания target pod и его назначения scheduler-ом. + +Наш контроллер не должен заранее выбирать target node. Иначе ему пришлось бы повторять часть логики Kubernetes scheduler и KubeVirt placement: + +- учитывать `nodeSelector` из `VMOP.spec.migrate.nodeSelector`; +- учитывать placement самой `VirtualMachine`; +- учитывать taints/tolerations, affinities, resources, devices, storage constraints; +- учитывать динамические изменения node и pod scheduling state. + +Такой подход будет неполным и не даст гарантии, что KubeVirt и scheduler выберут именно ту node, которую предварительно проверил controller. + +Также ограничение должно применяться не только к миграциям, созданным через пользовательский `VMOP`, но и к другим источникам миграций: + +- eviction; +- node drain; +- workload updater; +- автоматические системные миграции; +- миграции, созданные напрямую через KubeVirt API. + +## Решение + +Реализовать inbound migration limit в `virtualization-controller` через задержку выдачи `MigrationConfiguration` до получения `Lease` на target node. + +Общий flow: + +1. `VirtualMachineInstanceMigration` создаётся как сейчас. +2. KubeVirt создаёт target pod. +3. Kubernetes scheduler назначает target pod на target node. +4. `livemigration-controller` определяет target node. +5. Перед patch-ем `KVVMI.status.migrationState.migrationConfiguration` controller пытается получить inbound slot через Kubernetes `Lease`. +6. Если slot получен, controller выставляет `MigrationConfiguration`, и миграция продолжается. +7. Если slot занят, controller не выставляет `MigrationConfiguration`, помечает VMI annotation-ом ожидания и requeue-ит reconcile. +8. KubeVirt patch в месте ожидания `migrationConfiguration` не должен считать timeout, пока VMI явно помечена как ожидающая inbound lease. +9. Lease освобождается при завершении миграции или через stale recovery. + +На первом этапе лимит фиксированный: + +```text +parallelInboundMigrationsPerNode = 1 +``` + +При этом механизм должен проектироваться как slot-based limiter: один `Lease` соответствует одному inbound slot на target node. Лимит `1` является частным случаем с одним slot. + +## Target node + +Target node выбирает Kubernetes scheduler, а не `virtualization-controller`. + +`livemigration-controller` определяет target node из доступного состояния KubeVirt: + +1. `kvvmi.Status.MigrationState.TargetNode`, если поле заполнено; +2. `kvvmi.Status.MigrationState.TargetPod` → `pod.spec.nodeName`, если target pod уже создан и назначен scheduler-ом. + +Если target node ещё неизвестна, inbound limiter не должен блокировать миграцию и не должен пытаться выбрать node самостоятельно. Controller ждёт следующего reconcile, когда KubeVirt/scheduler продвинут scheduling target pod. + +## Gate через MigrationConfiguration + +`MigrationConfiguration` становится точкой допуска миграции к активной фазе. + +Логика в `DynamicSettingsHandler.Handle`: + +```go +targetNode := resolveTargetNode(kvvmi) +if targetNode == "" { + return requeue +} + +acquired, err := inboundLimiter.TryAcquire(ctx, kvvmi, targetNode, limit) +if err != nil { + return err +} + +if !acquired { + markInboundSlotWaiting(kvvmi, targetNode) + return requeue +} + +clearInboundSlotWaiting(kvvmi) +kvvmi.Status.MigrationState.MigrationConfiguration = conf +``` + +Если lease занята, `MigrationConfiguration` не выставляется. Это удерживает миграцию в уже существующем KubeVirt flow ожидания параметров миграции. + +## Annotation model + +Для явного состояния ожидания используются annotations на VMI: + +```yaml +virtualization.deckhouse.io/inbound-migration-slot: waiting +virtualization.deckhouse.io/inbound-migration-target-node: +``` + +Правила: + +- если inbound slot не получен, controller выставляет `inbound-migration-slot=waiting` и target node; +- пока annotation имеет значение `waiting`, `MigrationConfiguration` не выставляется; +- когда lease получена, controller удаляет waiting annotations и выставляет `MigrationConfiguration`; +- отдельное состояние `acquired` не требуется: наличие `MigrationConfiguration` и отсутствие `waiting` достаточно для продолжения миграции. + +Эти annotations нужны не только для диагностики, но и для корректной работы timeout-а ожидания migration parameters в KubeVirt. + +## Timeout ожидания migration parameters + +В KubeVirt есть ожидание `migrationConfiguration`. Если параметры не появились за заданное время, миграция может быть завершена ошибкой. + +Для inbound limiter это поведение нужно изменить: + +```text +Если MigrationConfiguration == nil +и VMI имеет annotation virtualization.deckhouse.io/inbound-migration-slot=waiting, +то timeout ожидания migration parameters не должен тикать или не должен приводить к failed migration. +``` + +Иначе вторая и последующие миграции на ту же target node будут падать по timeout, хотя они штатно стоят в очереди на inbound slot. + +Если annotation `waiting` отсутствует, существующее поведение timeout-а сохраняется. Это важно, чтобы реальные проблемы с выдачей migration parameters не маскировались inbound limiter-ом. + +## Lease model + +Рекомендуемая реализация limiter-а — Kubernetes `Lease` из `coordination.k8s.io/v1`. + +Один `Lease` представляет один inbound slot target node. + +При лимите `1` для target node доступен один slot: + +```text +namespace: d8-virtualization +name: inbound-migration--0 +holderIdentity: // +``` + +При будущем лимите `5` для той же target node доступны пять независимых slots: + +```text +inbound-migration--0 +inbound-migration--1 +inbound-migration--2 +inbound-migration--3 +inbound-migration--4 +``` + +Правила: + +- если slot lease отсутствует, миграция может создать его со своим holder; +- если slot lease уже принадлежит текущей миграции/VMI, reconcile идемпотентно продолжает выполнение и обновляет `renewTime`; +- если slot lease принадлежит другой active migration, slot считается занятым; +- если владелец slot lease отсутствует, terminal или stale, slot можно перехватить через optimistic update; +- если все slots заняты другими active migrations, текущая миграция остаётся в ожидании `MigrationConfiguration`; +- release удаляет только lease, принадлежащий текущей миграции/VMI. + +Рекомендуемый объект: + +```yaml +apiVersion: coordination.k8s.io/v1 +kind: Lease +metadata: + namespace: d8-virtualization + name: inbound-migration-- + labels: + virtualization.deckhouse.io/component: inbound-migration-limiter + virtualization.deckhouse.io/target-node-hash: + virtualization.deckhouse.io/slot-index: "" + annotations: + virtualization.deckhouse.io/target-node: + virtualization.deckhouse.io/migration-namespace: + virtualization.deckhouse.io/migration-name: + virtualization.deckhouse.io/migration-uid: +spec: + holderIdentity: // + leaseDurationSeconds: 300 + acquireTime: + renewTime: +``` + +OwnerReference на `VirtualMachineInstanceMigration` добавлять не нужно, потому что migration namespaced, а lease хранится в namespace control plane. Cross-namespace owner reference для namespaced объектов некорректен. + +## TryAcquire + +`TryAcquire(ctx, kvvmi, targetNode, limit)` должен работать так: + +1. Построить список lease names по `targetNode` и текущему лимиту. +2. Сначала найти lease, который уже принадлежит текущей migration/VMI. +3. Если такой lease найден, обновить `renewTime` и вернуть `true`. +4. Если текущая migration ещё не владеет slot-ом, пройти по всем slots и попытаться занять первый доступный. +5. Если lease не найден, создать lease с holder текущей migration/VMI. +6. Если create завершился conflict/already exists, перечитать slot или перейти к следующему. +7. Если lease принадлежит другой migration, проверить владельца. +8. Если владелец существует и не terminal, считать slot занятым. +9. Если владелец отсутствует или terminal, перехватить slot через `Update` с текущим `resourceVersion`. +10. Если один из slots успешно создан или перехвачен, вернуть `true`. +11. Если все slots заняты активными владельцами, вернуть `false`. + +Операции `Get/Create/Update/Delete` для Lease желательно выполнять через non-cached client или APIReader, если это доступно в месте интеграции. Корректность должна опираться на optimistic concurrency Kubernetes API. + +## Release и stale recovery + +Lease должен освобождаться при завершении миграции: + +- `VirtualMachineInstanceMigration` перешла в terminal phase; +- migration завершилась `Completed`/`Failed` на уровне отслеживаемого состояния; +- VMI больше не находится в live migration state; +- владелец lease удалён или его UID не совпадает с UID в annotations. + +`Release(ctx, owner, targetNode)` должен быть идемпотентным: + +1. Найти lease, принадлежащий текущей migration/VMI. +2. Если lease отсутствует, завершиться успешно. +3. Если lease принадлежит текущему owner, удалить lease. +4. Если delete получил `NotFound`, завершиться успешно. + +Если лимит был уменьшен после того, как migration заняла slot с индексом за пределами нового лимита, release всё равно должен уметь найти и удалить её lease. Для этого release может дополнительно list-ить leases по labels `component=inbound-migration-limiter` и `target-node-hash=`, а затем фильтровать holder текущей migration/VMI. + +Stale recovery выполняется при `TryAcquire`: + +1. прочитать holder из annotations/`holderIdentity`; +2. найти соответствующую `VirtualMachineInstanceMigration` или VMI; +3. если owner отсутствует, UID отличается или migration terminal, считать lease stale; +4. перехватить lease через optimistic update с `resourceVersion`. + +`leaseDurationSeconds` и `renewTime` используются как диагностическая и safety-информация. Нельзя освобождать lease только по истечению времени, если migration-владелец всё ещё существует и не terminal: долгие live migrations допустимы. + +## Статусы и условия + +Ожидающая inbound slot миграция не должна считаться failed. + +На уровне KubeVirt migration и VMI желательно отражать ожидание через annotation: + +```text +virtualization.deckhouse.io/inbound-migration-slot=waiting +virtualization.deckhouse.io/inbound-migration-target-node= +``` + +На уровне `VirtualMachineOperation` можно использовать существующий pending mapping: + +```text +VMOP.status.phase: Pending +Completed condition: + status: False + reason: MigrationPending + message: The VirtualMachineOperation for migrating the virtual machine has been queued. Waiting for the queue to be processed and for this operation to be executed. +``` + +Для лучшей диагностики можно уточнить message, если VMI помечена как ожидающая inbound slot. Добавление нового API reason возможно позже, но для первого этапа не обязательно. + +## Конфигурация + +### Первый этап + +Лимит фиксированный: + +```text +parallelInboundMigrationsPerNode = 1 +``` + +Преимущества: + +- минимальные изменения публичного API; +- не требует новых ModuleConfig параметров; +- закрывает исходное требование; +- оставляет простой путь к будущему конфигурируемому лимиту. + +### Возможное развитие + +Позже можно сделать настройку конфигурируемой через ModuleConfig annotation и Helm values: + +```yaml +virtualization.deckhouse.io/parallel-inbound-migrations-per-node: "5" +``` + +Внутренний values path: + +```text +virtualization.internal.virtConfig.parallelInboundMigrationsPerNode +``` + +Так как upstream KubeVirt `MigrationConfiguration` не содержит такого поля, эта настройка будет Deckhouse-specific и должна применяться в логике `virtualization-controller`, а не как поле upstream `MigrationConfiguration`. + +## RBAC + +`virtualization-controller` должен получить права на leases в namespace `d8-virtualization`: + +```text +apiGroups: ["coordination.k8s.io"] +resources: ["leases"] +verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] +``` + +`list/watch` нужны для cleanup/stale recovery и для случаев, когда release должен найти slot за пределами текущего лимита. + +## Изменяемые компоненты + +Основные места реализации: + +- `images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler.go` — gate перед выставлением `MigrationConfiguration`; +- `images/virtualization-artifact/pkg/controller/livemigration/live_migration_reconciler.go` — release/stale cleanup на завершении миграции; +- `images/virtualization-artifact/pkg/livemigration/migration_configuration.go` — существующая генерация `MigrationConfiguration`; +- `images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go` — отображение ожидания в статус `VMOP`, если потребуется; +- KubeVirt patch в месте ожидания `migrationConfiguration` — исключить период `inbound-migration-slot=waiting` из timeout-а ожидания parameters; +- `images/hooks/pkg/hooks/migration-config/hook.go` и `templates/kubevirt/_kubevirt_helpers.tpl` — существующие места конфигурации миграций, без добавления upstream-поля `parallelInboundMigrationsPerNode`. + +## Альтернативы + +### Альтернатива 1: предварительно выбирать target node в `vmop-migration-controller` + +Суть: до создания `VirtualMachineInstanceMigration` выбрать target node и не создавать migration, если node занята. + +Недостатки: + +- target node должен выбирать Kubernetes scheduler; +- controller должен повторить scheduler logic; +- нет гарантии, что KubeVirt выберет проверенную node; +- не покрывает миграции, созданные не через `VMOP`; +- возможны гонки между несколькими VMOP. + +Решение отклонено. + +### Альтернатива 2: limiter внутри patched KubeVirt `virt-controller` + +Суть: встроить Lease limiter непосредственно в KubeVirt migration control loop. + +Преимущества: + +- близко к месту управления lifecycle KubeVirt migration; +- можно блокировать продвижение фаз напрямую. + +Недостатки: + +- больше patch surface в KubeVirt; +- сложнее поддерживать при обновлениях upstream; +- в проекте уже есть Deckhouse-specific gate ожидания `MigrationConfiguration`, который решает задачу меньшим изменением. + +Решение отклонено в пользу gate через `MigrationConfiguration`. + +### Альтернатива 3: простой подсчёт активных миграций без Lease + +Суть: перед выдачей `MigrationConfiguration` list-ить все migrations и считать active incoming на target node. + +Преимущества: + +- проще реализации; +- не требует дополнительных ресурсов. + +Недостатки: + +- нет строгой гарантии при concurrent reconcile; +- возможны race conditions; +- поведение зависит от cache freshness. + +Можно использовать как дополнительную диагностику, но не как основной механизм гарантии. + +Решение отклонено как основной вариант. + +### Альтернатива 4: mutating webhook и init container gate + +Суть: модифицировать target pod через webhook и удерживать его через init container до получения inbound slot. + +Недостатки: + +- сложнее операционно; +- требует вмешательства в pod lifecycle; +- уже есть более подходящая точка ожидания `MigrationConfiguration`; +- возможны побочные эффекты для KubeVirt target pod lifecycle. + +Решение отклонено. + +### Альтернатива 5: reactive abort/retry + +Суть: разрешить KubeVirt начать миграцию, а при превышении inbound limit abort-ить или retry-ить лишние миграции. + +Недостатки: + +- на короткое время лимит может быть превышен; +- создаёт лишние abort/retry циклы; +- хуже пользовательский опыт; +- сложнее отличать штатную очередь от ошибки. + +Решение отклонено. + +## Последствия + +### Положительные + +- На target node будет не более настроенного числа активных входящих live migrations; на первом этапе — не более одной. +- Target node по-прежнему выбирает Kubernetes scheduler. +- Не нужно реализовывать собственную scheduler logic в `virtualization-controller`. +- Остальные миграции будут ждать, а не падать. +- Ожидание использует уже существующий gate `MigrationConfiguration`. +- Ограничение будет работать независимо от источника миграции. +- Lease даёт строгую защиту от race condition между несколькими reconcile workers. +- Снижается риск перегрузки target node сетью, CPU, памятью и storage attach операциями. + +### Отрицательные + +- Требуется новый служебный ресурс `Lease` и логика очистки stale leases. +- Требуется patch KubeVirt timeout-а ожидания `migrationConfiguration`. +- Появляется Deckhouse-specific поведение, которое нужно учитывать при обновлении KubeVirt. +- Возможна меньшая скорость массовой эвакуации, если много VM мигрируют на одну target node. +- Нужно аккуратно синхронизировать annotations, lease ownership и status patch VMI. + +## План реализации + +### Шаг 1. Добавить inbound limiter в `virtualization-controller` + +Добавить компонент примерно такого вида: + +```go +type InboundMigrationLimiter interface { + TryAcquire(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode string, limit int) (bool, error) + Release(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode string, limit int) error +} +``` + +Реализация должна использовать `coordination.k8s.io/v1 Lease`. + +### Шаг 2. Интегрировать limiter в `DynamicSettingsHandler.Handle` + +Логика: + +1. определить target node из `kvvmi.Status.MigrationState.TargetNode` или target pod `spec.nodeName`; +2. если target node неизвестна, requeue без выбора node; +3. вызвать `TryAcquire`; +4. если slot не получен, выставить waiting annotations и не выставлять `MigrationConfiguration`; +5. если slot получен, удалить waiting annotations и выставить `MigrationConfiguration`. + +### Шаг 3. Изменить KubeVirt timeout ожидания parameters + +В KubeVirt patch-е ожидания `migrationConfiguration` добавить правило: + +```text +VMI with virtualization.deckhouse.io/inbound-migration-slot=waiting is waiting for inbound lease and must not fail by migration parameters timeout. +``` + +### Шаг 4. Добавить release/stale recovery + +Логика: + +1. при terminal migration удалить lease owner-а; +2. при failed/completed состояниях удалить lease; +3. при обнаружении stale holder-а во время `TryAcquire` перехватить slot; +4. сделать release идемпотентным. + +### Шаг 5. Синхронизировать диагностику в VMOP + +Если VMI имеет `inbound-migration-slot=waiting`, `vmop-migration-controller` должен отображать операцию как pending, а не failed. + +Минимальный вариант: + +- `VMOP.status.phase = Pending`; +- `Completed.reason = MigrationPending`; +- message содержит информацию про ожидание inbound slot на target node. + +### Шаг 6. Тесты + +Нужны unit/integration тесты: + +1. одна миграция на target node получает slot lease и получает `MigrationConfiguration`; +2. при лимите `1` вторая миграция на ту же target node получает waiting annotations и не получает `MigrationConfiguration`; +3. KubeVirt timeout ожидания parameters не fail-ит VMI с `inbound-migration-slot=waiting`; +4. migration без waiting annotation сохраняет существующее timeout-поведение; +5. миграция на другую target node получает свой lease и продолжается; +6. после завершения первой миграции ожидающая миграция получает освободившийся slot; +7. stale lease от отсутствующей или terminal migration перехватывается; +8. lease, принадлежащий текущей migration, не блокирует повторный reconcile; +9. concurrent `TryAcquire` не выдаёт один и тот же slot двум migrations одновременно; +10. release идемпотентен; +11. VMOP для ожидающей inbound slot миграции остаётся в `Pending`, а не переходит в `Failed`. + +## Нерешённые вопросы + +1. Достаточно ли хранить holder по `VirtualMachineInstanceMigration`, или удобнее привязывать lease к VMI и текущему migration UID из `MigrationState`? +2. Делать ли `parallelInboundMigrationsPerNode` публичной настройкой сразу или оставить фиксированным `1`? +3. Нужно ли добавлять новый API reason в `VMOP`, или достаточно существующего `MigrationPending` с уточнённым message? +4. Где именно в KubeVirt ожидании `migrationConfiguration` лучше исключить waiting period из timeout-а: останавливать timer или игнорировать timeout result при наличии annotation? + +## Рекомендация + +Реализовать inbound migration limit через задержку выдачи `MigrationConfiguration` в `virtualization-controller` до получения Lease на target node. + +Target node должен выбирать Kubernetes scheduler. `virtualization-controller` только читает результат scheduling-а из KubeVirt/VMI state и использует его для Lease-based limiter-а. + +На первом этапе использовать фиксированный лимит `1`, waiting annotations и patch KubeVirt timeout-а ожидания migration parameters. В `VMOP` отображать ожидание как `Pending`, не переводя операцию в `Failed`. diff --git a/images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler.go b/images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler.go index 6b98be0751..c11e2cd2ad 100644 --- a/images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler.go +++ b/images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler.go @@ -18,7 +18,9 @@ package internal import ( "context" + "time" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" virtv1 "kubevirt.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -29,28 +31,76 @@ import ( "github.com/deckhouse/virtualization/api/core/v1alpha2" ) -const dynamicSettingsHandlerName = "DynamicSettingsHandler" +const ( + dynamicSettingsHandlerName = "DynamicSettingsHandler" + inboundSlotRequeueDelay = 5 * time.Second +) + +type InboundMigrationLimiter interface { + TryAcquire(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode string, limit int) (bool, error) + Release(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode string, limit int) error +} func NewDynamicSettingsHandler(client client.Client) *DynamicSettingsHandler { + return NewDynamicSettingsHandlerWithLimiter(client, livemigration.NewInboundMigrationLimiter(client)) +} + +func NewDynamicSettingsHandlerWithLimiter(client client.Client, limiter InboundMigrationLimiter) *DynamicSettingsHandler { return &DynamicSettingsHandler{ - client: client, + client: client, + limiter: limiter, } } type DynamicSettingsHandler struct { - client client.Client + client client.Client + limiter InboundMigrationLimiter } func (h *DynamicSettingsHandler) Handle(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance) (reconcile.Result, error) { log := logger.FromContext(ctx).With(logger.SlogHandler(dynamicSettingsHandlerName)) + if kvvmi.Status.MigrationState != nil && (kvvmi.Status.MigrationState.Completed || kvvmi.Status.MigrationState.Failed) { + targetNode, err := h.resolveTargetNode(ctx, kvvmi) + if err != nil { + return reconcile.Result{}, err + } + if err := h.limiter.Release(ctx, kvvmi, targetNode, livemigration.ParallelInboundMigrationsPerNodeDefault); err != nil { + return reconcile.Result{}, err + } + livemigration.ClearInboundMigrationSlotWaiting(kvvmi) + return reconcile.Result{}, nil + } + if !h.shouldUpdateMigrationConfiguration(kvvmi) { return reconcile.Result{}, nil } + targetNode, err := h.resolveTargetNode(ctx, kvvmi) + if err != nil { + return reconcile.Result{}, err + } + if targetNode == "" { + log.Debug("Target node is not resolved yet, waiting before setting migrationConfiguration") + return reconcile.Result{RequeueAfter: inboundSlotRequeueDelay}, nil + } + + acquired, err := h.limiter.TryAcquire(ctx, kvvmi, targetNode, livemigration.ParallelInboundMigrationsPerNodeDefault) + if err != nil { + return reconcile.Result{}, err + } + if !acquired { + livemigration.MarkInboundMigrationSlotWaiting(kvvmi, targetNode) + log.Debug("Inbound migration slot is not acquired, waiting before setting migrationConfiguration", + "targetNode", targetNode, + ) + return reconcile.Result{RequeueAfter: inboundSlotRequeueDelay}, nil + } + livemigration.ClearInboundMigrationSlotWaiting(kvvmi) + var vm v1alpha2.VirtualMachine vmKey := client.ObjectKeyFromObject(kvvmi) - err := h.client.Get(ctx, vmKey, &vm) + err = h.client.Get(ctx, vmKey, &vm) if err != nil { return reconcile.Result{}, err } @@ -99,10 +149,32 @@ func (h *DynamicSettingsHandler) Name() string { // shouldUpdateMigrationConfiguration indicates if live migration controller should inject // migration configuration into KVVMI status: // 1. status.migrationState is created by the virt-controller. -// 2. migration is not in a Completed state. +// 2. migration is not in a terminal state and has no migration configuration yet. func (h *DynamicSettingsHandler) shouldUpdateMigrationConfiguration(kvvmi *virtv1.VirtualMachineInstance) bool { return kvvmi.Status.MigrationState != nil && - !kvvmi.Status.MigrationState.Completed + !kvvmi.Status.MigrationState.Completed && + !kvvmi.Status.MigrationState.Failed && + kvvmi.Status.MigrationState.MigrationConfiguration == nil +} + +func (h *DynamicSettingsHandler) resolveTargetNode(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance) (string, error) { + if kvvmi.Status.MigrationState == nil { + return "", nil + } + if kvvmi.Status.MigrationState.TargetNode != "" { + return kvvmi.Status.MigrationState.TargetNode, nil + } + if kvvmi.Status.MigrationState.TargetPod == "" { + return "", nil + } + + var pod corev1.Pod + err := h.client.Get(ctx, types.NamespacedName{Namespace: kvvmi.Namespace, Name: kvvmi.Status.MigrationState.TargetPod}, &pod) + if err != nil { + return "", client.IgnoreNotFound(err) + } + + return pod.Spec.NodeName, nil } // getVMOPInProgressForVM check if there is at least one VMOP for the same VM in progress phase. diff --git a/images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler_test.go b/images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler_test.go index 1652108564..7a380b280d 100644 --- a/images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler_test.go +++ b/images/virtualization-artifact/pkg/controller/livemigration/internal/dynamic_settings_handler_test.go @@ -20,11 +20,13 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" virtv1 "kubevirt.io/api/core/v1" vmbuilder "github.com/deckhouse/virtualization-controller/pkg/builder/vm" "github.com/deckhouse/virtualization-controller/pkg/common/testutil" + "github.com/deckhouse/virtualization-controller/pkg/livemigration" "github.com/deckhouse/virtualization/api/core/v1alpha2" ) @@ -57,6 +59,13 @@ var _ = Describe("TestDynamicSettingsHandler", func() { return vmi } + withMigrationState := func(kvvmi *virtv1.VirtualMachineInstance, migrationUID string) { + kvvmi.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{ + TargetNode: "node-a", + MigrationUID: types.UID(migrationUID), + } + } + newVMOPEvict := func(force *bool) *v1alpha2.VirtualMachineOperation { vmop := &v1alpha2.VirtualMachineOperation{ TypeMeta: metav1.TypeMeta{ @@ -98,8 +107,7 @@ var _ = Describe("TestDynamicSettingsHandler", func() { It("Should set migrationConfiguration", func() { vm := newVM() kvvmi := newKVVMI() - - kvvmi.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{} + withMigrationState(kvvmi, "migration-uid") fakeClient := setupEnvironment(kvvmi, vm, newKVConfig()) h := NewDynamicSettingsHandler(fakeClient) @@ -107,12 +115,38 @@ var _ = Describe("TestDynamicSettingsHandler", func() { Expect(err).NotTo(HaveOccurred()) Expect(kvvmi.Status.MigrationState.MigrationConfiguration).ShouldNot(BeNil(), "Should set migrationConfiguration") + Expect(kvvmi.Annotations).NotTo(HaveKey(livemigration.InboundMigrationSlotAnnotation)) + }) + + It("Should wait without migrationConfiguration when inbound slot is busy", func() { + vm := newVM() + kvvmi := newKVVMI() + withMigrationState(kvvmi, "migration-uid") + + otherKVVMI := newKVVMI() + otherKVVMI.Name = "other-vm" + withMigrationState(otherKVVMI, "other-migration-uid") + + fakeClient := setupEnvironment(kvvmi, vm, otherKVVMI, newKVConfig()) + limiter := livemigration.NewInboundMigrationLimiter(fakeClient) + acquired, err := limiter.TryAcquire(ctx, otherKVVMI, "node-a", livemigration.ParallelInboundMigrationsPerNodeDefault) + Expect(err).NotTo(HaveOccurred()) + Expect(acquired).To(BeTrue()) + + h := NewDynamicSettingsHandler(fakeClient) + res, err := h.Handle(ctx, kvvmi) + Expect(err).NotTo(HaveOccurred()) + + Expect(res.RequeueAfter).To(BeNumerically(">", 0)) + Expect(kvvmi.Status.MigrationState.MigrationConfiguration).Should(BeNil(), "Should not set migrationConfiguration") + Expect(kvvmi.Annotations).To(HaveKeyWithValue(livemigration.InboundMigrationSlotAnnotation, livemigration.InboundMigrationSlotWaiting)) + Expect(kvvmi.Annotations).To(HaveKeyWithValue(livemigration.InboundMigrationTargetNodeAnnotation, "node-a")) }) It("Should propagate DisableTLS from KubeVirt config", func() { vm := newVM() kvvmi := newKVVMI() - kvvmi.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{} + withMigrationState(kvvmi, "migration-uid") kvConfig := newKVConfig() kvConfig.Spec.Configuration.MigrationConfiguration = &virtv1.MigrationConfiguration{ @@ -154,7 +188,7 @@ var _ = Describe("TestDynamicSettingsHandler", func() { vm.Spec.LiveMigrationPolicy = policy kvvmi := newKVVMI() - kvvmi.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{} + withMigrationState(kvvmi, "migration-uid") vmop := newVMOPEvict(force) diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go index 9bb21ce859..af03fc10c2 100644 --- a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go @@ -68,6 +68,9 @@ const ( const ( reasonFailedAttachVolume = "FailedAttachVolume" reasonFailedMount = "FailedMount" + + reasonTargetNodeIncomingMigrationLimitExceeded = "TargetNodeIncomingMigrationLimitExceeded" + messageTargetNodeIncomingMigrationLimitExceeded = "Target node has no free inbound migration slots." ) type Base interface { @@ -578,6 +581,13 @@ func (h LifecycleHandler) getInProgressReasonAndMessage( case virtv1.MigrationPhaseUnset, virtv1.MigrationPending: reason = vmopcondition.ReasonMigrationPending message = messageMigrationPending + if _, found := conditions.GetKVVMIMCondition(virtv1.VirtualMachineInstanceMigrationConditionType(reasonTargetNodeIncomingMigrationLimitExceeded), mig.Status.Conditions); found { + message = messageTargetNodeIncomingMigrationLimitExceeded + } else if waiting, err := h.isWaitingForInboundMigrationSlot(ctx, mig); err != nil { + return reason, message, err + } else if waiting { + message = messageTargetNodeIncomingMigrationLimitExceeded + } case virtv1.MigrationScheduling: reason = vmopcondition.ReasonTargetScheduling message = messageTargetPodScheduling @@ -693,6 +703,20 @@ func humanizeMigrationFailedMessage(message string) string { return message } +func (h LifecycleHandler) isWaitingForInboundMigrationSlot(ctx context.Context, mig *virtv1.VirtualMachineInstanceMigration) (bool, error) { + if mig == nil || mig.Spec.VMIName == "" { + return false, nil + } + + var kvvmi virtv1.VirtualMachineInstance + err := h.client.Get(ctx, types.NamespacedName{Namespace: mig.Namespace, Name: mig.Spec.VMIName}, &kvvmi) + if err != nil { + return false, client.IgnoreNotFound(err) + } + + return livemigration.IsInboundMigrationSlotWaiting(&kvvmi), nil +} + func (h LifecycleHandler) getTargetPod(ctx context.Context, mig *virtv1.VirtualMachineInstanceMigration) (*corev1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ MatchLabels: map[string]string{ diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle_test.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle_test.go index e821efab49..45328e651d 100644 --- a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle_test.go +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle_test.go @@ -368,6 +368,26 @@ var _ = Describe("LifecycleHandler", func() { ), ) + It("should keep migration pending for inbound target node limit", func() { + mig := newSimpleMigration("vmop-test", name) + mig.Status.Phase = virtv1.MigrationPending + mig.Status.Conditions = []virtv1.VirtualMachineInstanceMigrationCondition{{ + Type: virtv1.VirtualMachineInstanceMigrationConditionType(reasonTargetNodeIncomingMigrationLimitExceeded), + Status: corev1.ConditionTrue, + Reason: reasonTargetNodeIncomingMigrationLimitExceeded, + Message: messageTargetNodeIncomingMigrationLimitExceeded, + }} + + fakeClient, err := testutil.NewFakeClientWithObjects(mig) + Expect(err).NotTo(HaveOccurred()) + + h := LifecycleHandler{client: fakeClient} + reason, msg, err := h.getInProgressReasonAndMessage(ctx, mig) + Expect(err).NotTo(HaveOccurred()) + Expect(reason).To(Equal(vmopcondition.ReasonMigrationPending)) + Expect(msg).To(Equal(messageTargetNodeIncomingMigrationLimitExceeded)) + }) + DescribeTable("should build in-progress reason and message", func( phase virtv1.VirtualMachineInstanceMigrationPhase, state *virtv1.VirtualMachineInstanceMigrationState, diff --git a/images/virtualization-artifact/pkg/livemigration/inbound_limiter.go b/images/virtualization-artifact/pkg/livemigration/inbound_limiter.go new file mode 100644 index 0000000000..3b3312c784 --- /dev/null +++ b/images/virtualization-artifact/pkg/livemigration/inbound_limiter.go @@ -0,0 +1,311 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package livemigration + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "strconv" + "strings" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + virtv1 "kubevirt.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + InboundMigrationSlotAnnotation = "virtualization.deckhouse.io/inbound-migration-slot" + InboundMigrationTargetNodeAnnotation = "virtualization.deckhouse.io/inbound-migration-target-node" + InboundMigrationSlotWaiting = "waiting" + + InboundMigrationLimiterComponentLabel = "virtualization.deckhouse.io/component" + InboundMigrationTargetNodeHashLabel = "virtualization.deckhouse.io/target-node-hash" + InboundMigrationSlotIndexLabel = "virtualization.deckhouse.io/slot-index" + + InboundMigrationLimiterComponent = "inbound-migration-limiter" + + InboundMigrationLeaseTargetNodeAnnotation = "virtualization.deckhouse.io/target-node" + InboundMigrationVMINamespaceAnnotation = "virtualization.deckhouse.io/vmi-namespace" + InboundMigrationVMINameAnnotation = "virtualization.deckhouse.io/vmi-name" + InboundMigrationMigrationUIDAnnotation = "virtualization.deckhouse.io/migration-uid" + InboundMigrationLeaseNamespace = "d8-virtualization" + ParallelInboundMigrationsPerNodeDefault = 1 + InboundMigrationLeaseDurationSeconds int32 = 300 +) + +type InboundMigrationLimiter struct { + client client.Client +} + +func NewInboundMigrationLimiter(client client.Client) *InboundMigrationLimiter { + return &InboundMigrationLimiter{client: client} +} + +func (l *InboundMigrationLimiter) TryAcquire(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode string, limit int) (bool, error) { + if limit < 1 { + limit = 1 + } + + slots := slotNames(targetNode, limit) + for _, slot := range slots { + lease, err := l.getLease(ctx, slot) + if apierrors.IsNotFound(err) { + continue + } + if err != nil { + return false, err + } + if leaseHeldByVMI(lease, kvvmi) { + return true, l.renewLease(ctx, lease) + } + } + + for slotIndex, slot := range slots { + acquired, err := l.tryAcquireSlot(ctx, kvvmi, targetNode, slot, slotIndex) + if apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) { + continue + } + if err != nil { + return false, err + } + if acquired { + return true, nil + } + } + + return false, nil +} + +func (l *InboundMigrationLimiter) Release(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode string, limit int) error { + if targetNode == "" { + return nil + } + if limit < 1 { + limit = 1 + } + + for _, slot := range slotNames(targetNode, limit) { + lease, err := l.getLease(ctx, slot) + if apierrors.IsNotFound(err) { + continue + } + if err != nil { + return err + } + if leaseHeldByVMI(lease, kvvmi) { + return client.IgnoreNotFound(l.client.Delete(ctx, lease)) + } + } + + var leases coordinationv1.LeaseList + err := l.client.List(ctx, &leases, + client.InNamespace(InboundMigrationLeaseNamespace), + client.MatchingLabels{ + InboundMigrationLimiterComponentLabel: InboundMigrationLimiterComponent, + InboundMigrationTargetNodeHashLabel: targetNodeHash(targetNode), + }, + ) + if err != nil { + return err + } + + for i := range leases.Items { + lease := &leases.Items[i] + if leaseHeldByVMI(lease, kvvmi) { + return client.IgnoreNotFound(l.client.Delete(ctx, lease)) + } + } + + return nil +} + +func (l *InboundMigrationLimiter) tryAcquireSlot(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode, slot string, slotIndex int) (bool, error) { + lease, err := l.getLease(ctx, slot) + if apierrors.IsNotFound(err) { + return true, l.client.Create(ctx, newInboundMigrationLease(kvvmi, targetNode, slot, slotIndex)) + } + if err != nil { + return false, err + } + if leaseHeldByVMI(lease, kvvmi) { + return true, l.renewLease(ctx, lease) + } + + active, err := l.leaseHolderIsActive(ctx, lease) + if err != nil { + return false, err + } + if active { + return false, nil + } + + updateLeaseHolder(lease, kvvmi, targetNode, slotIndex) + return true, l.client.Update(ctx, lease) +} + +func (l *InboundMigrationLimiter) getLease(ctx context.Context, name string) (*coordinationv1.Lease, error) { + lease := &coordinationv1.Lease{} + err := l.client.Get(ctx, types.NamespacedName{Namespace: InboundMigrationLeaseNamespace, Name: name}, lease) + return lease, err +} + +func (l *InboundMigrationLimiter) renewLease(ctx context.Context, lease *coordinationv1.Lease) error { + now := metav1.NewMicroTime(time.Now()) + lease.Spec.RenewTime = &now + return l.client.Update(ctx, lease) +} + +func (l *InboundMigrationLimiter) leaseHolderIsActive(ctx context.Context, lease *coordinationv1.Lease) (bool, error) { + vmiNamespace := lease.Annotations[InboundMigrationVMINamespaceAnnotation] + vmiName := lease.Annotations[InboundMigrationVMINameAnnotation] + migrationUID := lease.Annotations[InboundMigrationMigrationUIDAnnotation] + if vmiNamespace == "" || vmiName == "" || migrationUID == "" { + return false, nil + } + + var kvvmi virtv1.VirtualMachineInstance + err := l.client.Get(ctx, types.NamespacedName{Namespace: vmiNamespace, Name: vmiName}, &kvvmi) + if apierrors.IsNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + if kvvmi.Status.MigrationState == nil || kvvmi.Status.MigrationState.Completed || kvvmi.Status.MigrationState.Failed { + return false, nil + } + if string(kvvmi.Status.MigrationState.MigrationUID) != migrationUID { + return false, nil + } + + return true, nil +} + +func newInboundMigrationLease(kvvmi *virtv1.VirtualMachineInstance, targetNode, name string, slotIndex int) *coordinationv1.Lease { + now := metav1.NewMicroTime(time.Now()) + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: InboundMigrationLeaseNamespace, + Name: name, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Spec: coordinationv1.LeaseSpec{ + LeaseDurationSeconds: ptrInt32(InboundMigrationLeaseDurationSeconds), + AcquireTime: &now, + RenewTime: &now, + }, + } + updateLeaseHolder(lease, kvvmi, targetNode, slotIndex) + return lease +} + +func updateLeaseHolder(lease *coordinationv1.Lease, kvvmi *virtv1.VirtualMachineInstance, targetNode string, slotIndex int) { + if lease.Labels == nil { + lease.Labels = map[string]string{} + } + if lease.Annotations == nil { + lease.Annotations = map[string]string{} + } + + migrationUID := string(kvvmi.Status.MigrationState.MigrationUID) + lease.Labels[InboundMigrationLimiterComponentLabel] = InboundMigrationLimiterComponent + lease.Labels[InboundMigrationTargetNodeHashLabel] = targetNodeHash(targetNode) + lease.Labels[InboundMigrationSlotIndexLabel] = strconv.Itoa(slotIndex) + lease.Annotations[InboundMigrationLeaseTargetNodeAnnotation] = targetNode + lease.Annotations[InboundMigrationVMINamespaceAnnotation] = kvvmi.Namespace + lease.Annotations[InboundMigrationVMINameAnnotation] = kvvmi.Name + lease.Annotations[InboundMigrationMigrationUIDAnnotation] = migrationUID + lease.Spec.HolderIdentity = ptrString(fmt.Sprintf("%s/%s/%s", kvvmi.Namespace, kvvmi.Name, migrationUID)) + now := metav1.NewMicroTime(time.Now()) + lease.Spec.RenewTime = &now + if lease.Spec.AcquireTime == nil { + lease.Spec.AcquireTime = &now + } +} + +func leaseHeldByVMI(lease *coordinationv1.Lease, kvvmi *virtv1.VirtualMachineInstance) bool { + if lease.Annotations == nil || kvvmi.Status.MigrationState == nil { + return false + } + + return lease.Annotations[InboundMigrationVMINamespaceAnnotation] == kvvmi.Namespace && + lease.Annotations[InboundMigrationVMINameAnnotation] == kvvmi.Name && + lease.Annotations[InboundMigrationMigrationUIDAnnotation] == string(kvvmi.Status.MigrationState.MigrationUID) +} + +func slotNames(targetNode string, limit int) []string { + result := make([]string, 0, limit) + hash := targetNodeHash(targetNode) + for i := range limit { + result = append(result, fmt.Sprintf("inbound-migration-%s-%d", hash, i)) + } + return result +} + +func targetNodeHash(targetNode string) string { + sum := sha256.Sum256([]byte(targetNode)) + return hex.EncodeToString(sum[:])[:16] +} + +func ptrString(v string) *string { + return &v +} + +func ptrInt32(v int32) *int32 { + return &v +} + +func MarkInboundMigrationSlotWaiting(kvvmi *virtv1.VirtualMachineInstance, targetNode string) { + if kvvmi.Annotations == nil { + kvvmi.Annotations = map[string]string{} + } + kvvmi.Annotations[InboundMigrationSlotAnnotation] = InboundMigrationSlotWaiting + kvvmi.Annotations[InboundMigrationTargetNodeAnnotation] = targetNode +} + +func ClearInboundMigrationSlotWaiting(kvvmi *virtv1.VirtualMachineInstance) { + if kvvmi.Annotations == nil { + return + } + delete(kvvmi.Annotations, InboundMigrationSlotAnnotation) + delete(kvvmi.Annotations, InboundMigrationTargetNodeAnnotation) + if len(kvvmi.Annotations) == 0 { + kvvmi.Annotations = nil + } +} + +func IsInboundMigrationSlotWaiting(kvvmi *virtv1.VirtualMachineInstance) bool { + return kvvmi.Annotations[InboundMigrationSlotAnnotation] == InboundMigrationSlotWaiting +} + +func InboundMigrationWaitingTargetNode(kvvmi *virtv1.VirtualMachineInstance) string { + return kvvmi.Annotations[InboundMigrationTargetNodeAnnotation] +} + +func DumpInboundMigrationSlot(kvvmi *virtv1.VirtualMachineInstance) string { + if !IsInboundMigrationSlotWaiting(kvvmi) { + return "not waiting" + } + return strings.Join([]string{InboundMigrationSlotWaiting, InboundMigrationWaitingTargetNode(kvvmi)}, ":") +} diff --git a/images/virtualization-artifact/pkg/livemigration/inbound_limiter_test.go b/images/virtualization-artifact/pkg/livemigration/inbound_limiter_test.go new file mode 100644 index 0000000000..b68e287273 --- /dev/null +++ b/images/virtualization-artifact/pkg/livemigration/inbound_limiter_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package livemigration + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + coordinationv1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + virtv1 "kubevirt.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/deckhouse/virtualization-controller/pkg/common/testutil" +) + +var _ = Describe("InboundMigrationLimiter", func() { + const ( + namespace = "default" + targetNode = "node-a" + ) + + ctx := testutil.ContextBackgroundWithNoOpLogger() + + newKVVMI := func(name, migrationUID string) *virtv1.VirtualMachineInstance { + return &virtv1.VirtualMachineInstance{ + TypeMeta: metav1.TypeMeta{ + APIVersion: virtv1.SchemeGroupVersion.String(), + Kind: virtv1.VirtualMachineInstanceGroupVersionKind.Kind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Status: virtv1.VirtualMachineInstanceStatus{ + MigrationState: &virtv1.VirtualMachineInstanceMigrationState{ + TargetNode: targetNode, + MigrationUID: types.UID(migrationUID), + }, + }, + } + } + + It("Should acquire one slot only for the same target node", func() { + first := newKVVMI("first", "first-migration") + second := newKVVMI("second", "second-migration") + fakeClient, err := testutil.NewFakeClientWithObjects(first, second) + Expect(err).NotTo(HaveOccurred()) + + limiter := NewInboundMigrationLimiter(fakeClient) + acquired, err := limiter.TryAcquire(ctx, first, targetNode, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(acquired).To(BeTrue()) + + acquired, err = limiter.TryAcquire(ctx, second, targetNode, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(acquired).To(BeFalse()) + }) + + It("Should release acquired slot", func() { + first := newKVVMI("first", "first-migration") + second := newKVVMI("second", "second-migration") + fakeClient, err := testutil.NewFakeClientWithObjects(first, second) + Expect(err).NotTo(HaveOccurred()) + + limiter := NewInboundMigrationLimiter(fakeClient) + acquired, err := limiter.TryAcquire(ctx, first, targetNode, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(acquired).To(BeTrue()) + + Expect(limiter.Release(ctx, first, targetNode, 1)).To(Succeed()) + + acquired, err = limiter.TryAcquire(ctx, second, targetNode, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(acquired).To(BeTrue()) + }) + + It("Should steal stale slot", func() { + first := newKVVMI("first", "first-migration") + second := newKVVMI("second", "second-migration") + fakeClient, err := testutil.NewFakeClientWithObjects(first, second) + Expect(err).NotTo(HaveOccurred()) + + limiter := NewInboundMigrationLimiter(fakeClient) + acquired, err := limiter.TryAcquire(ctx, first, targetNode, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(acquired).To(BeTrue()) + + first.Status.MigrationState.Completed = true + Expect(fakeClient.Status().Update(ctx, first)).To(Succeed()) + + acquired, err = limiter.TryAcquire(ctx, second, targetNode, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(acquired).To(BeTrue()) + + var leases coordinationv1.LeaseList + Expect(fakeClient.List(ctx, &leases, client.InNamespace(InboundMigrationLeaseNamespace))).To(Succeed()) + Expect(leases.Items).To(HaveLen(1)) + Expect(leases.Items[0].Annotations).To(HaveKeyWithValue(InboundMigrationVMINameAnnotation, second.Name)) + }) +}) diff --git a/images/virtualization-artifact/pkg/livemigration/migration_configuration.go b/images/virtualization-artifact/pkg/livemigration/migration_configuration.go index 1b6618d0b9..7fce43e88f 100644 --- a/images/virtualization-artifact/pkg/livemigration/migration_configuration.go +++ b/images/virtualization-artifact/pkg/livemigration/migration_configuration.go @@ -103,20 +103,34 @@ func DumpKVVMIMigrationConfiguration(kvvmi *virtv1.VirtualMachineInstance) strin } func GenerateMigrationConfigurationPatch(current, changed *virtv1.VirtualMachineInstance) ([]byte, error) { - if current.Status.MigrationState == nil || changed.Status.MigrationState == nil { - return nil, nil + jsonPatch := patch.NewJSONPatch() + + if current.Status.MigrationState != nil && changed.Status.MigrationState != nil { + currentConf := current.Status.MigrationState.MigrationConfiguration + changedConf := changed.Status.MigrationState.MigrationConfiguration + if !equality.Semantic.DeepEqual(currentConf, changedConf) { + op := patch.PatchReplaceOp + if currentConf == nil { + op = patch.PatchAddOp + } + jsonPatch.Append(patch.NewJSONPatchOperation(op, "/status/migrationState/migrationConfiguration", changedConf)) + } } - currentConf := current.Status.MigrationState.MigrationConfiguration - changedConf := changed.Status.MigrationState.MigrationConfiguration - if equality.Semantic.DeepEqual(currentConf, changedConf) { - return nil, nil + if !equality.Semantic.DeepEqual(current.Annotations, changed.Annotations) { + switch { + case changed.Annotations == nil: + jsonPatch.Append(patch.NewJSONPatchOperation(patch.PatchRemoveOp, "/metadata/annotations", nil)) + case current.Annotations == nil: + jsonPatch.Append(patch.NewJSONPatchOperation(patch.PatchAddOp, "/metadata/annotations", changed.Annotations)) + default: + jsonPatch.Append(patch.NewJSONPatchOperation(patch.PatchReplaceOp, "/metadata/annotations", changed.Annotations)) + } } - op := patch.PatchReplaceOp - if currentConf == nil { - op = patch.PatchAddOp + if jsonPatch.Len() == 0 { + return nil, nil } - return patch.NewJSONPatch(patch.NewJSONPatchOperation(op, "/status/migrationState/migrationConfiguration", changedConf)).Bytes() + return jsonPatch.Bytes() }