diff --git a/test/extended/machines/scale.go b/test/extended/machines/scale.go
new file mode 100644
index 000000000000..ab25fef63df3
--- /dev/null
+++ b/test/extended/machines/scale.go
@@ -0,0 +1,215 @@
+package operators
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+
+	"k8s.io/client-go/kubernetes"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/restmapper"
+	"k8s.io/client-go/scale"
+
+	g "github.com/onsi/ginkgo"
+	o "github.com/onsi/gomega"
+	"github.com/stretchr/objx"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+	e2e "k8s.io/kubernetes/test/e2e/framework"
+)
+
+const (
+	machineAPIGroup       = "machine.openshift.io"
+	machineSetOwningLabel = "machine.openshift.io/cluster-api-machineset"
+	scalingTime           = 7 * time.Minute
+)
+
+// machineSetClient returns a machineSet client scoped to the machine API namespace
+func machineSetClient(dc dynamic.Interface) dynamic.ResourceInterface {
+	machineSetClient := dc.Resource(schema.GroupVersionResource{Group: machineAPIGroup, Resource: "machinesets", Version: "v1beta1"})
+	return machineSetClient.Namespace(machineAPINamespace)
+}
+
+// listWorkerMachineSets lists all worker machineSets
+func listWorkerMachineSets(dc dynamic.Interface) ([]objx.Map, error) {
+	mc := machineSetClient(dc)
+	obj, err := mc.List(metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+	machineSets := []objx.Map{}
+	for _, ms := range objects(objx.Map(obj.UnstructuredContent()).Get("items")) {
+		e2e.Logf("Labels %v", ms.Get("spec.selector.matchLabels"))
+		labels := (*ms.Get("spec.selector.matchLabels")).Data().(map[string]interface{})
+		if val, ok := labels[machineLabelRole]; ok {
+			if val == "worker" {
+				machineSets = append(machineSets, ms)
+				continue
+			}
+		}
+	}
+	return machineSets, nil
+}
+
+func getMachineSetReplicaNumber(item objx.Map) int {
+	replicas, _ := strconv.Atoi(item.Get("spec.replicas").String())
+	return replicas
+}
+
+// getNodesFromMachineSet returns the nodes backed by machines owned by a given machineSet
+func getNodesFromMachineSet(c *kubernetes.Clientset, dc dynamic.Interface, machineSetName string) ([]*corev1.Node, error) {
+	machines, err := listMachines(dc, fmt.Sprintf("%s=%s", machineSetOwningLabel, machineSetName))
+	if err != nil {
+		return nil, fmt.Errorf("failed to list machines: %v", err)
+	}
+
+	// fetch all worker nodes and map them back to their owning machines
+	allWorkerNodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{
+		LabelSelector: nodeLabelSelectorWorker,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to list worker nodes: %v", err)
+	}
+
+	machineToNodes, match := mapMachineNameToNodeName(machines, allWorkerNodes.Items)
+	if !match {
+		return nil, fmt.Errorf("not all machines have a node reference: %v", machineToNodes)
+	}
+	var nodes []*corev1.Node
+	for machineName := range machineToNodes {
+		node, err := c.CoreV1().Nodes().Get(machineToNodes[machineName], metav1.GetOptions{})
+		if err != nil {
+			return nil, fmt.Errorf("failed to get worker node %q: %v", machineToNodes[machineName], err)
+		}
+		nodes = append(nodes, node)
+	}
+
+	return nodes, nil
+}
+
+func getScaleClient() (scale.ScalesGetter, error) {
+	cfg, err := e2e.LoadConfig()
+	if err != nil {
+		return nil, fmt.Errorf("error getting config: %v", err)
+	}
+
+	discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
+	if err != nil {
+		return nil, fmt.Errorf("error creating discovery client: %v", err)
+	}
+
+	groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
+	if err != nil {
+		return nil, fmt.Errorf("error getting API resources: %v", err)
+	}
+	restMapper := restmapper.NewDiscoveryRESTMapper(groupResources)
+	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(discoveryClient)
+
+	scaleClient, err := scale.NewForConfig(cfg, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
+	if err != nil {
+		return nil, fmt.Errorf("error creating scale client: %v", err)
+	}
+	return scaleClient, nil
+}
+
+// scaleMachineSet scales the machineSet with the given name to the given number of replicas
+func scaleMachineSet(name string, replicas int) error {
+	scaleClient, err := getScaleClient()
+	if err != nil {
+		return fmt.Errorf("error calling getScaleClient: %v", err)
+	}
+
+	scale, err := scaleClient.Scales(machineAPINamespace).Get(schema.GroupResource{Group: machineAPIGroup, Resource: "MachineSet"}, name)
+	if err != nil {
+		return fmt.Errorf("error calling scaleClient.Scales get: %v", err)
+	}
+
+	scaleUpdate := scale.DeepCopy()
+	scaleUpdate.Spec.Replicas = int32(replicas)
+	_, err = scaleClient.Scales(machineAPINamespace).Update(schema.GroupResource{Group: machineAPIGroup, Resource: "MachineSet"}, scaleUpdate)
+	if err != nil {
+		return fmt.Errorf("error calling scaleClient.Scales update while setting replicas to %d: %v", replicas, err)
+	}
+	return nil
+}
+
+var _ = g.Describe("[Feature:Machines][Serial] Managed cluster should", func() {
+	g.It("grow and decrease when scaling different machineSets simultaneously", func() {
+		// verifyNodeScalingFunc returns true once the machineSet is backed by expectedScaleOut ready nodes
+		verifyNodeScalingFunc := func(c *kubernetes.Clientset, dc dynamic.Interface, expectedScaleOut int, machineSet objx.Map) bool {
+			nodes, err := getNodesFromMachineSet(c, dc, machineName(machineSet))
+			if err != nil {
+				e2e.Logf("Error getting nodes from machineSet: %v", err)
+				return false
+			}
+			e2e.Logf("node count: %v, expected count: %v", len(nodes), expectedScaleOut)
+			for i := range nodes {
+				e2e.Logf("node: %v", nodes[i].Name)
+				if !isNodeReady(*nodes[i]) {
+					e2e.Logf("Node %q is not ready", nodes[i].Name)
+					return false
+				}
+			}
+			return len(nodes) == expectedScaleOut
+		}
+
+		cfg, err := e2e.LoadConfig()
+		o.Expect(err).NotTo(o.HaveOccurred())
+		c, err := e2e.LoadClientset()
+		o.Expect(err).NotTo(o.HaveOccurred())
+		dc, err := dynamic.NewForConfig(cfg)
+		o.Expect(err).NotTo(o.HaveOccurred())
+
+		g.By("fetching worker machineSets")
+		machineSets, err := listWorkerMachineSets(dc)
+		o.Expect(err).NotTo(o.HaveOccurred())
+		if len(machineSets) == 0 {
+			e2e.Skipf("Expected at least one worker MachineSet, found none")
Found none!!!") + } + + g.By("checking initial cluster workers size") + nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{ + LabelSelector: nodeLabelSelectorWorker, + }) + o.Expect(err).NotTo(o.HaveOccurred()) + + initialNumberOfWorkers := len(nodeList.Items) + + initialReplicasMachineSets := map[string]int{} + + for _, machineSet := range machineSets { + initialReplicasMachineSet := getMachineSetReplicaNumber(machineSet) + expectedScaleOut := initialReplicasMachineSet + 1 + initialReplicasMachineSets[machineName(machineSet)] = initialReplicasMachineSet + g.By(fmt.Sprintf("scaling %q from %d to %d replicas", machineName(machineSet), initialReplicasMachineSet, expectedScaleOut)) + err = scaleMachineSet(machineName(machineSet), expectedScaleOut) + o.Expect(err).NotTo(o.HaveOccurred()) + } + for _, machineSet := range machineSets { + expectedScaleOut := initialReplicasMachineSets[machineName(machineSet)] + 1 + o.Eventually(func() bool { + return verifyNodeScalingFunc(c, dc, expectedScaleOut, machineSet) + }, scalingTime, 5*time.Second).Should(o.BeTrue()) + } + + for _, machineSet := range machineSets { + scaledReplicasMachineSet := initialReplicasMachineSets[machineName(machineSet)] + 1 + g.By(fmt.Sprintf("scaling %q from %d to %d replicas", machineName(machineSet), scaledReplicasMachineSet, initialReplicasMachineSets[machineName(machineSet)])) + err = scaleMachineSet(machineName(machineSet), initialReplicasMachineSets[machineName(machineSet)]) + o.Expect(err).NotTo(o.HaveOccurred()) + } + + g.By(fmt.Sprintf("waiting for cluster to get back to original size. Final size should be %d worker nodes", initialNumberOfWorkers)) + o.Eventually(func() bool { + nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{ + LabelSelector: nodeLabelSelectorWorker, + }) + o.Expect(err).NotTo(o.HaveOccurred()) + return len(nodeList.Items) == initialNumberOfWorkers + }, 1*time.Minute, 5*time.Second).Should(o.BeTrue()) + }) +}) diff --git a/test/extended/machines/workers.go b/test/extended/machines/workers.go index 7afa4eb68fc7..a235434269f3 100644 --- a/test/extended/machines/workers.go +++ b/test/extended/machines/workers.go @@ -22,6 +22,7 @@ import ( const ( machineAPINamespace = "openshift-machine-api" nodeLabelSelectorWorker = "node-role.kubernetes.io/worker" + machineLabelRole = "machine.openshift.io/cluster-api-machine-role" // time after purge of machine to wait for replacement and ready node // TODO: tighten this further based on node lifecycle controller [appears to be ~5m30s] @@ -91,6 +92,20 @@ func mapNodeNameToMachine(nodes []corev1.Node, machines []objx.Map) (map[string] return result, len(nodes) == len(result) } +// mapMachineNameToNodeName returns a tuple (map node to machine by name, true if a match is found for every node) +func mapMachineNameToNodeName(machines []objx.Map, nodes []corev1.Node) (map[string]string, bool) { + result := map[string]string{} + for i := range machines { + for j := range nodes { + if nodes[j].Name == nodeNameFromNodeRef(machines[i]) { + result[machineName(machines[i])] = nodes[j].Name + break + } + } + } + return result, len(machines) == len(result) +} + func isNodeReady(node corev1.Node) bool { for _, c := range node.Status.Conditions { if c.Type == corev1.NodeReady {