12 changes: 12 additions & 0 deletions deploy/deploy.go
@@ -224,6 +224,18 @@ func (d *Deployment) Deploy(ctx context.Context, kubecfg string) (rerr error) {
log.Warningf("Failed to start pod watcher: %v", err)
} else {
w.SetProgress(d.Progress)
// Restrict the watcher to the namespaces known to be managed during deployment,
// avoiding noise from unrelated user workloads in other namespaces.
w.AllowNamespaces(
"kube-system",
"metallb-system",
"meshnet",
"arista-ceoslab-operator-system",
"lemming-operator",
"srlinux-controller-system",
"ixiatg-op-system",
"cdnos-controller-system",
)
defer func() {
cancel()
rerr = w.Cleanup(rerr)
29 changes: 29 additions & 0 deletions pods/status.go
@@ -35,6 +35,7 @@ type Watcher struct {
progress bool
currentNamespace string
currentPod types.UID
allowedNS map[string]struct{}
}

// NewWatcher returns a Watcher on the provided client or an error. The cancel
@@ -78,6 +79,24 @@ func (w *Watcher) SetProgress(value bool) {
w.mu.Unlock()
}

// AllowNamespaces restricts the watcher to pods in the provided namespaces.
// Calling it with no namespaces clears the restriction, so all namespaces are considered.
func (w *Watcher) AllowNamespaces(namespaces ...string) {
w.mu.Lock()
defer w.mu.Unlock()
if len(namespaces) == 0 {
w.allowedNS = nil
return
}
w.allowedNS = make(map[string]struct{}, len(namespaces))
for _, ns := range namespaces {
if ns == "" {
continue
}
w.allowedNS[ns] = struct{}{}
}
}

func (w *Watcher) stop() {
w.mu.Lock()
stop := w.wstop
@@ -128,6 +147,16 @@ func (w *Watcher) display(format string, v ...any) {
}

func (w *Watcher) updatePod(s *PodStatus) bool {
// If allowed namespaces are configured, ignore pods outside them.
w.mu.Lock()
allowed := w.allowedNS
w.mu.Unlock()
if allowed != nil {
if _, ok := allowed[s.Namespace]; !ok {
return true
}
}

newNamespace := s.Namespace != w.currentNamespace
var newState string

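Both call sites in this PR (deploy/deploy.go above and topo/topo.go below) wire the filter in the same way. A minimal usage sketch, assuming w is the *pods.Watcher returned by the package's constructor (whose full signature is truncated in this diff):

w.SetProgress(true)
// Consider only pods in the listed namespaces; unrelated workloads are ignored.
w.AllowNamespaces("kube-system", "metallb-system")
// Calling with no arguments clears the filter, so all namespaces are watched again.
w.AllowNamespaces()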
249 changes: 248 additions & 1 deletion topo/node/drivenets/drivenets.go
@@ -26,16 +26,19 @@ import (
"os"
"path/filepath"
"strings"
"time"

"github.com/drivenets/cdnos-controller/api/v1/clientset"
"github.com/openconfig/kne/topo/node"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

cdnosv1 "github.com/drivenets/cdnos-controller/api/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
log "k8s.io/klog/v2"

@@ -255,9 +258,108 @@ func (n *Node) cdnosCreate(ctx context.Context) error {
if _, err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).Create(ctx, dut, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create cdnos: %v", err)
}
// Ensure the controller-created Service has required Azure LB annotations when on AKS.
// Creation will fail if annotations cannot be applied within the timeout.
if err := n.annotateCdnosService(ctx); err != nil {
return fmt.Errorf("failed to annotate service for %s: %v", n.Name(), err)
}
return nil
}

// annotateCdnosService waits for the controller-created Service named "service-<node>"
// and adds the Azure LoadBalancer annotations needed on AKS (internal LB plus per-port no-probe rules).
func (n *Node) annotateCdnosService(ctx context.Context) error {
if !isAzureAKS(n.KubeClient) {
log.V(1).Infof("Azure AKS not detected; skipping service annotation for %q", n.Name())
return nil
}
log.Infof("Azure AKS detected; annotating controller-managed Services for %q", n.Name())
deadline := time.Now().Add(10 * time.Minute)
desired := map[string]string{
"service.beta.kubernetes.io/azure-load-balancer-internal": "true",
}
// Build per-port no-probe rules from this node's services (the map is keyed by outside port).
for port := range n.Proto.Services {
key := fmt.Sprintf("service.beta.kubernetes.io/port_%d_no_probe_rule", port)
desired[key] = "true"
}
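// For illustration, a node exposing outside ports 22 and 9339 (hypothetical
// ports; the real keys come from n.Proto.Services) would yield:
//   service.beta.kubernetes.io/azure-load-balancer-internal: "true"
//   service.beta.kubernetes.io/port_22_no_probe_rule:        "true"
//   service.beta.kubernetes.io/port_9339_no_probe_rule:      "true"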
for {
if time.Now().After(deadline) {
return fmt.Errorf("timeout waiting to annotate services for %q", n.Name())
}
svcs, err := n.servicesForNode(ctx)
if err != nil || len(svcs) == 0 {
time.Sleep(1 * time.Second)
continue
}
allAnnotated := true
for i := range svcs {
s := &svcs[i]
changed := false
if s.Annotations == nil {
s.Annotations = map[string]string{}
changed = true
}
for k, v := range desired {
if s.Annotations[k] != v {
s.Annotations[k] = v
changed = true
}
}
if changed {
// Use a short-lived background context to avoid parent ctx cancellations.
updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err := n.KubeClient.CoreV1().Services(n.Namespace).Update(updateCtx, s, metav1.UpdateOptions{})
cancel()
if err != nil {
// Retry once on conflict with a fresh GET
if apierrors.IsConflict(err) {
getCtx, cancelGet := context.WithTimeout(context.Background(), 5*time.Second)
fresh, gerr := n.KubeClient.CoreV1().Services(n.Namespace).Get(getCtx, s.Name, metav1.GetOptions{})
cancelGet()
if gerr == nil {
if fresh.Annotations == nil {
fresh.Annotations = map[string]string{}
}
for k, v := range desired {
fresh.Annotations[k] = v
}
updateCtx2, cancelUpd2 := context.WithTimeout(context.Background(), 5*time.Second)
_, uerr := n.KubeClient.CoreV1().Services(n.Namespace).Update(updateCtx2, fresh, metav1.UpdateOptions{})
cancelUpd2()
if uerr == nil {
log.Infof("Annotated Service %q with Azure LB annotations (after conflict retry)", s.Name)
continue
}
}
}
allAnnotated = false
continue
}
log.Infof("Annotated Service %q with Azure LB annotations", s.Name)
}
// Verify the annotations actually persisted on the Service.
getCtx, cancelGet := context.WithTimeout(context.Background(), 5*time.Second)
got, err := n.KubeClient.CoreV1().Services(n.Namespace).Get(getCtx, s.Name, metav1.GetOptions{})
cancelGet()
if err != nil {
allAnnotated = false
continue
}
for k, v := range desired {
if got.Annotations[k] != v {
allAnnotated = false
break
}
}
}
if allAnnotated {
return nil
}
time.Sleep(500 * time.Millisecond)
}
}

func (n *Node) Status(ctx context.Context) (node.Status, error) {
if !isModelCdnos(n.Impl.Proto.Model) {
return node.StatusUnknown, fmt.Errorf("invalid model specified")
@@ -298,7 +400,112 @@ func (n *Node) cdnosDelete(ctx context.Context) error {
if err != nil {
return err
}
return cs.CdnosV1alpha1().Cdnoss(n.Namespace).Delete(ctx, n.Name(), metav1.DeleteOptions{})
// 1) Start teardown by deleting all Cdnos CRs in the namespace
// (controller will clean up owned objects for each).
list, err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
if len(list.Items) == 0 {
log.V(1).Infof("No Cdnos CRs found in namespace %q", n.Namespace)
} else {
var crNames []string
for _, item := range list.Items {
crNames = append(crNames, item.Name)
}
log.Infof("Deleting Cdnos CRs in %q: %v", n.Namespace, crNames)
for _, item := range list.Items {
if err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).Delete(ctx, item.Name, metav1.DeleteOptions{}); err != nil {
return err
}
}
}

// 2) Monitor Services associated with this node until the controller removes them.
svcs, _ := n.servicesForNode(ctx)
if len(svcs) == 0 {
log.V(1).Infof("No Services found for node %q", n.Name())
} else {
var svcNames []string
for _, s := range svcs {
svcNames = append(svcNames, s.Name)
}
log.Infof("Monitoring Services for %q to be removed by controller: %v", n.Name(), svcNames)
}
// Wait for Services to be removed (longer on AKS due to LoadBalancer cleanup).
waitDeadline := time.Now().Add(2 * time.Minute)
if isAzureAKS(n.KubeClient) {
waitDeadline = time.Now().Add(10 * time.Minute)
log.Infof("AKS detected; waiting up to %v for all Services to be removed", time.Until(waitDeadline).Truncate(time.Second))
} else {
log.V(1).Infof("Azure AKS not detected; waiting up to %v for all Services to be removed", time.Until(waitDeadline).Truncate(time.Second))
}
start := time.Now()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
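// The loop below polls roughly every 2s; the 10s ticker only rate-limits the progress log lines.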
for {
if time.Now().After(waitDeadline) {
log.Warningf("Timeout waiting for Services removal; continuing teardown")
break
}
svcs, _ = n.servicesForNode(ctx)
remaining := len(svcs)
if remaining == 0 {
log.Infof("All Services for %q removed after %v", n.Name(), time.Since(start).Truncate(time.Second))
break
}
select {
case <-ticker.C:
var names []string
for _, s := range svcs {
names = append(names, s.Name)
}
log.Infof("Waiting for Services removal for %q (%d remaining: %v, %v elapsed)", n.Name(), remaining, names, time.Since(start).Truncate(time.Second))
default:
}
time.Sleep(2 * time.Second)
}
return nil
}

// servicesForNode lists Services in the namespace that are associated with this node.
// It matches by:
// - name equals "service-<node>"
// - label "name" equals node name (per controller)
// - selector app == node name
// - ownerReference is Cdnos/<node>
func (n *Node) servicesForNode(ctx context.Context) ([]corev1.Service, error) {
// Use a short-lived background context for API calls to avoid parent ctx deadline cancellations.
_ = ctx // ctx is intentionally unused to avoid parent deadline cancellations
listCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
list, err := n.KubeClient.CoreV1().Services(n.Namespace).List(listCtx, metav1.ListOptions{})
if err != nil {
return nil, err
}
var out []corev1.Service
wantName := fmt.Sprintf("service-%s", n.Name())
for _, s := range list.Items {
if s.Name == wantName {
out = append(out, s)
continue
}
if s.Labels["name"] == n.Name() {
out = append(out, s)
continue
}
if s.Spec.Selector != nil && s.Spec.Selector["app"] == n.Name() {
out = append(out, s)
continue
}
for _, or := range s.OwnerReferences {
if or.Kind == "Cdnos" && or.Name == n.Name() {
out = append(out, s)
break
}
}
}
return out, nil
}

func (n *Node) ResetCfg(ctx context.Context) error {
@@ -380,6 +587,46 @@ func init() {
node.Vendor(tpb.Vendor_DRIVENETS, New)
}

// isAzureAKS attempts to detect whether the current cluster is Azure AKS.
// It returns true if any node has a providerID starting with "azure://"
// or has any label prefixed with "kubernetes.azure.com/".
func isAzureAKS(k kubernetes.Interface) bool {
// Allow manual override for environments where listing nodes is restricted.
if v := os.Getenv("KNE_FORCE_AKS"); v == "1" || strings.ToLower(v) == "true" {
log.V(1).Infof("AKS detection overridden via KNE_FORCE_AKS")
return true
}
if v := os.Getenv("KNE_FORCE_AZURE_ANNOTATIONS"); v == "1" || strings.ToLower(v) == "true" {
log.V(1).Infof("AKS detection overridden via KNE_FORCE_AZURE_ANNOTATIONS")
return true
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
nodes, err := k.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
log.V(1).Infof("AKS detection: failed to list nodes: %v", err)
return false
}
if len(nodes.Items) == 0 {
log.V(1).Infof("AKS detection: no nodes found in cluster")
return false
}
for _, n := range nodes.Items {
if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
log.V(1).Infof("AKS detection: node %q providerID %q indicates Azure", n.Name, n.Spec.ProviderID)
return true
}
for key := range n.Labels {
if strings.HasPrefix(key, "kubernetes.azure.com/") {
log.V(1).Infof("AKS detection: node %q has Azure label %q", n.Name, key)
return true
}
}
}
log.V(1).Infof("AKS detection: no Azure providerID or labels found on any node")
return false
}

func (n *Node) CreateConfig(ctx context.Context) (*corev1.Volume, error) {
pb := n.Proto
var data []byte
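Because listing cluster nodes may be RBAC-restricted, the KNE_FORCE_AKS / KNE_FORCE_AZURE_ANNOTATIONS environment overrides above are the escape hatch. A minimal sketch of exercising the override from a same-package test (the test itself is illustrative, not part of this PR; it uses client-go's fake clientset since the forced path never lists nodes):

func TestForcedAKSDetection(t *testing.T) {
	t.Setenv("KNE_FORCE_AKS", "true")
	// import "k8s.io/client-go/kubernetes/fake"
	if !isAzureAKS(fake.NewSimpleClientset()) {
		t.Fatal("expected forced AKS detection despite no Azure nodes")
	}
}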
2 changes: 2 additions & 0 deletions topo/topo.go
@@ -270,6 +270,8 @@ func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error
log.Warningf("Failed to start pod watcher: %v", err)
} else {
w.SetProgress(m.progress)
// Only watch pods in this topology's namespace to avoid unrelated failures.
w.AllowNamespaces(m.topo.Name)
defer func() {
cancel()
rerr = w.Cleanup(rerr)