Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
540 changes: 540 additions & 0 deletions docs/internal/adr_inbound_migration_limit.ru.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package internal

import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
virtv1 "kubevirt.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -29,28 +31,76 @@ import (
"github.com/deckhouse/virtualization/api/core/v1alpha2"
)

const dynamicSettingsHandlerName = "DynamicSettingsHandler"
const (
dynamicSettingsHandlerName = "DynamicSettingsHandler"
inboundSlotRequeueDelay = 5 * time.Second
)

type InboundMigrationLimiter interface {
TryAcquire(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode string, limit int) (bool, error)
Release(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance, targetNode string, limit int) error
}

func NewDynamicSettingsHandler(client client.Client) *DynamicSettingsHandler {
return NewDynamicSettingsHandlerWithLimiter(client, livemigration.NewInboundMigrationLimiter(client))
}

func NewDynamicSettingsHandlerWithLimiter(client client.Client, limiter InboundMigrationLimiter) *DynamicSettingsHandler {
return &DynamicSettingsHandler{
client: client,
client: client,
limiter: limiter,
}
}

type DynamicSettingsHandler struct {
client client.Client
client client.Client
limiter InboundMigrationLimiter
}

func (h *DynamicSettingsHandler) Handle(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance) (reconcile.Result, error) {
log := logger.FromContext(ctx).With(logger.SlogHandler(dynamicSettingsHandlerName))

if kvvmi.Status.MigrationState != nil && (kvvmi.Status.MigrationState.Completed || kvvmi.Status.MigrationState.Failed) {
targetNode, err := h.resolveTargetNode(ctx, kvvmi)
if err != nil {
return reconcile.Result{}, err
}
if err := h.limiter.Release(ctx, kvvmi, targetNode, livemigration.ParallelInboundMigrationsPerNodeDefault); err != nil {
return reconcile.Result{}, err
}
livemigration.ClearInboundMigrationSlotWaiting(kvvmi)
return reconcile.Result{}, nil
}

if !h.shouldUpdateMigrationConfiguration(kvvmi) {
return reconcile.Result{}, nil
}

targetNode, err := h.resolveTargetNode(ctx, kvvmi)
if err != nil {
return reconcile.Result{}, err
}
if targetNode == "" {
log.Debug("Target node is not resolved yet, waiting before setting migrationConfiguration")
return reconcile.Result{RequeueAfter: inboundSlotRequeueDelay}, nil
}

acquired, err := h.limiter.TryAcquire(ctx, kvvmi, targetNode, livemigration.ParallelInboundMigrationsPerNodeDefault)
if err != nil {
return reconcile.Result{}, err
}
if !acquired {
livemigration.MarkInboundMigrationSlotWaiting(kvvmi, targetNode)
log.Debug("Inbound migration slot is not acquired, waiting before setting migrationConfiguration",
"targetNode", targetNode,
)
return reconcile.Result{RequeueAfter: inboundSlotRequeueDelay}, nil
}
livemigration.ClearInboundMigrationSlotWaiting(kvvmi)

var vm v1alpha2.VirtualMachine
vmKey := client.ObjectKeyFromObject(kvvmi)
err := h.client.Get(ctx, vmKey, &vm)
err = h.client.Get(ctx, vmKey, &vm)
if err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -99,10 +149,32 @@ func (h *DynamicSettingsHandler) Name() string {
// shouldUpdateMigrationConfiguration indicates if live migration controller should inject
// migration configuration into KVVMI status:
// 1. status.migrationState is created by the virt-controller.
// 2. migration is not in a Completed state.
// 2. migration is not in a terminal state and has no migration configuration yet.
func (h *DynamicSettingsHandler) shouldUpdateMigrationConfiguration(kvvmi *virtv1.VirtualMachineInstance) bool {
return kvvmi.Status.MigrationState != nil &&
!kvvmi.Status.MigrationState.Completed
!kvvmi.Status.MigrationState.Completed &&
!kvvmi.Status.MigrationState.Failed &&
kvvmi.Status.MigrationState.MigrationConfiguration == nil
}

func (h *DynamicSettingsHandler) resolveTargetNode(ctx context.Context, kvvmi *virtv1.VirtualMachineInstance) (string, error) {
if kvvmi.Status.MigrationState == nil {
return "", nil
}
if kvvmi.Status.MigrationState.TargetNode != "" {
return kvvmi.Status.MigrationState.TargetNode, nil
}
if kvvmi.Status.MigrationState.TargetPod == "" {
return "", nil
}

var pod corev1.Pod
err := h.client.Get(ctx, types.NamespacedName{Namespace: kvvmi.Namespace, Name: kvvmi.Status.MigrationState.TargetPod}, &pod)
if err != nil {
return "", client.IgnoreNotFound(err)
}

return pod.Spec.NodeName, nil
}

// getVMOPInProgressForVM check if there is at least one VMOP for the same VM in progress phase.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
virtv1 "kubevirt.io/api/core/v1"

vmbuilder "github.com/deckhouse/virtualization-controller/pkg/builder/vm"
"github.com/deckhouse/virtualization-controller/pkg/common/testutil"
"github.com/deckhouse/virtualization-controller/pkg/livemigration"
"github.com/deckhouse/virtualization/api/core/v1alpha2"
)

Expand Down Expand Up @@ -57,6 +59,13 @@ var _ = Describe("TestDynamicSettingsHandler", func() {
return vmi
}

withMigrationState := func(kvvmi *virtv1.VirtualMachineInstance, migrationUID string) {
kvvmi.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{
TargetNode: "node-a",
MigrationUID: types.UID(migrationUID),
}
}

newVMOPEvict := func(force *bool) *v1alpha2.VirtualMachineOperation {
vmop := &v1alpha2.VirtualMachineOperation{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -98,21 +107,46 @@ var _ = Describe("TestDynamicSettingsHandler", func() {
It("Should set migrationConfiguration", func() {
vm := newVM()
kvvmi := newKVVMI()

kvvmi.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{}
withMigrationState(kvvmi, "migration-uid")

fakeClient := setupEnvironment(kvvmi, vm, newKVConfig())
h := NewDynamicSettingsHandler(fakeClient)
_, err := h.Handle(ctx, kvvmi)
Expect(err).NotTo(HaveOccurred())

Expect(kvvmi.Status.MigrationState.MigrationConfiguration).ShouldNot(BeNil(), "Should set migrationConfiguration")
Expect(kvvmi.Annotations).NotTo(HaveKey(livemigration.InboundMigrationSlotAnnotation))
})

It("Should wait without migrationConfiguration when inbound slot is busy", func() {
vm := newVM()
kvvmi := newKVVMI()
withMigrationState(kvvmi, "migration-uid")

otherKVVMI := newKVVMI()
otherKVVMI.Name = "other-vm"
withMigrationState(otherKVVMI, "other-migration-uid")

fakeClient := setupEnvironment(kvvmi, vm, otherKVVMI, newKVConfig())
limiter := livemigration.NewInboundMigrationLimiter(fakeClient)
acquired, err := limiter.TryAcquire(ctx, otherKVVMI, "node-a", livemigration.ParallelInboundMigrationsPerNodeDefault)
Expect(err).NotTo(HaveOccurred())
Expect(acquired).To(BeTrue())

h := NewDynamicSettingsHandler(fakeClient)
res, err := h.Handle(ctx, kvvmi)
Expect(err).NotTo(HaveOccurred())

Expect(res.RequeueAfter).To(BeNumerically(">", 0))
Expect(kvvmi.Status.MigrationState.MigrationConfiguration).Should(BeNil(), "Should not set migrationConfiguration")
Expect(kvvmi.Annotations).To(HaveKeyWithValue(livemigration.InboundMigrationSlotAnnotation, livemigration.InboundMigrationSlotWaiting))
Expect(kvvmi.Annotations).To(HaveKeyWithValue(livemigration.InboundMigrationTargetNodeAnnotation, "node-a"))
})

It("Should propagate DisableTLS from KubeVirt config", func() {
vm := newVM()
kvvmi := newKVVMI()
kvvmi.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{}
withMigrationState(kvvmi, "migration-uid")

kvConfig := newKVConfig()
kvConfig.Spec.Configuration.MigrationConfiguration = &virtv1.MigrationConfiguration{
Expand Down Expand Up @@ -154,7 +188,7 @@ var _ = Describe("TestDynamicSettingsHandler", func() {
vm.Spec.LiveMigrationPolicy = policy

kvvmi := newKVVMI()
kvvmi.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{}
withMigrationState(kvvmi, "migration-uid")

vmop := newVMOPEvict(force)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ const (
const (
reasonFailedAttachVolume = "FailedAttachVolume"
reasonFailedMount = "FailedMount"

reasonTargetNodeIncomingMigrationLimitExceeded = "TargetNodeIncomingMigrationLimitExceeded"
messageTargetNodeIncomingMigrationLimitExceeded = "Target node has no free inbound migration slots."
)

type Base interface {
Expand Down Expand Up @@ -578,6 +581,13 @@ func (h LifecycleHandler) getInProgressReasonAndMessage(
case virtv1.MigrationPhaseUnset, virtv1.MigrationPending:
reason = vmopcondition.ReasonMigrationPending
message = messageMigrationPending
if _, found := conditions.GetKVVMIMCondition(virtv1.VirtualMachineInstanceMigrationConditionType(reasonTargetNodeIncomingMigrationLimitExceeded), mig.Status.Conditions); found {
message = messageTargetNodeIncomingMigrationLimitExceeded
} else if waiting, err := h.isWaitingForInboundMigrationSlot(ctx, mig); err != nil {
return reason, message, err
} else if waiting {
message = messageTargetNodeIncomingMigrationLimitExceeded
}
case virtv1.MigrationScheduling:
reason = vmopcondition.ReasonTargetScheduling
message = messageTargetPodScheduling
Expand Down Expand Up @@ -693,6 +703,20 @@ func humanizeMigrationFailedMessage(message string) string {
return message
}

func (h LifecycleHandler) isWaitingForInboundMigrationSlot(ctx context.Context, mig *virtv1.VirtualMachineInstanceMigration) (bool, error) {
if mig == nil || mig.Spec.VMIName == "" {
return false, nil
}

var kvvmi virtv1.VirtualMachineInstance
err := h.client.Get(ctx, types.NamespacedName{Namespace: mig.Namespace, Name: mig.Spec.VMIName}, &kvvmi)
if err != nil {
return false, client.IgnoreNotFound(err)
}

return livemigration.IsInboundMigrationSlotWaiting(&kvvmi), nil
}

func (h LifecycleHandler) getTargetPod(ctx context.Context, mig *virtv1.VirtualMachineInstanceMigration) (*corev1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,26 @@ var _ = Describe("LifecycleHandler", func() {
),
)

It("should keep migration pending for inbound target node limit", func() {
mig := newSimpleMigration("vmop-test", name)
mig.Status.Phase = virtv1.MigrationPending
mig.Status.Conditions = []virtv1.VirtualMachineInstanceMigrationCondition{{
Type: virtv1.VirtualMachineInstanceMigrationConditionType(reasonTargetNodeIncomingMigrationLimitExceeded),
Status: corev1.ConditionTrue,
Reason: reasonTargetNodeIncomingMigrationLimitExceeded,
Message: messageTargetNodeIncomingMigrationLimitExceeded,
}}

fakeClient, err := testutil.NewFakeClientWithObjects(mig)
Expect(err).NotTo(HaveOccurred())

h := LifecycleHandler{client: fakeClient}
reason, msg, err := h.getInProgressReasonAndMessage(ctx, mig)
Expect(err).NotTo(HaveOccurred())
Expect(reason).To(Equal(vmopcondition.ReasonMigrationPending))
Expect(msg).To(Equal(messageTargetNodeIncomingMigrationLimitExceeded))
})

DescribeTable("should build in-progress reason and message", func(
phase virtv1.VirtualMachineInstanceMigrationPhase,
state *virtv1.VirtualMachineInstanceMigrationState,
Expand Down
Loading
Loading