Skip to content

Commit 9f7fe00

Browse files
enxebrevikaschoudhary16
authored andcommitted
Add test for scaling machineSets
1 parent 29359bd commit 9f7fe00

2 files changed

Lines changed: 230 additions & 0 deletions

File tree

test/extended/machines/scale.go

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package operators
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"time"
7+
8+
"k8s.io/client-go/kubernetes"
9+
10+
corev1 "k8s.io/api/core/v1"
11+
"k8s.io/client-go/discovery"
12+
"k8s.io/client-go/restmapper"
13+
"k8s.io/client-go/scale"
14+
15+
g "github.com/onsi/ginkgo"
16+
o "github.com/onsi/gomega"
17+
"github.com/stretchr/objx"
18+
19+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/runtime/schema"
21+
"k8s.io/client-go/dynamic"
22+
e2e "k8s.io/kubernetes/test/e2e/framework"
23+
)
24+
25+
const (
26+
machineAPIGroup = "machine.openshift.io"
27+
machineSetOwningLabel = "machine.openshift.io/cluster-api-machineset"
28+
scalingTime = 7 * time.Minute
29+
)
30+
31+
// machineSetClient returns a client for machines scoped to the proper namespace
32+
func machineSetClient(dc dynamic.Interface) dynamic.ResourceInterface {
33+
machineSetClient := dc.Resource(schema.GroupVersionResource{Group: machineAPIGroup, Resource: "machinesets", Version: "v1beta1"})
34+
return machineSetClient.Namespace(machineAPINamespace)
35+
}
36+
37+
// listWorkerMachineSets list all worker machineSets
38+
func listWorkerMachineSets(dc dynamic.Interface) ([]objx.Map, error) {
39+
mc := machineSetClient(dc)
40+
obj, err := mc.List(metav1.ListOptions{})
41+
if err != nil {
42+
return nil, err
43+
}
44+
machineSets := []objx.Map{}
45+
for _, ms := range objects(objx.Map(obj.UnstructuredContent()).Get("items")) {
46+
e2e.Logf("Labels %v", ms.Get("spec.selector.matchLabels"))
47+
labels := (*ms.Get("spec.selector.matchLabels")).Data().(map[string]interface{})
48+
if val, ok := labels[machineLabelRole]; ok {
49+
if val == "worker" {
50+
machineSets = append(machineSets, ms)
51+
continue
52+
}
53+
}
54+
}
55+
return machineSets, nil
56+
}
57+
58+
func getMachineSetReplicaNumber(item objx.Map) int {
59+
replicas, _ := strconv.Atoi(item.Get("spec.replicas").String())
60+
return replicas
61+
}
62+
63+
// getNodesFromMachineSet returns an array of nodes backed by machines owned by a given machineSet
64+
func getNodesFromMachineSet(c *kubernetes.Clientset, dc dynamic.Interface, machineSetName string) ([]*corev1.Node, error) {
65+
machines, err := listMachines(dc, fmt.Sprintf("%s=%s", machineSetOwningLabel, machineSetName))
66+
if err != nil {
67+
return nil, fmt.Errorf("failed to list machines: %v", err)
68+
}
69+
70+
// fetch nodes
71+
allWorkerNodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{
72+
LabelSelector: nodeLabelSelectorWorker,
73+
})
74+
if err != nil {
75+
return nil, fmt.Errorf("failed to list worker nodes: %v", err)
76+
}
77+
78+
machineToNodes, match := mapMachineNameToNodeName(machines, allWorkerNodes.Items)
79+
if !match {
80+
return nil, fmt.Errorf("not all machines have a node reference: %v", machineToNodes)
81+
}
82+
var nodes []*corev1.Node
83+
for machineName := range machineToNodes {
84+
node, err := c.CoreV1().Nodes().Get(machineToNodes[machineName], metav1.GetOptions{})
85+
if err != nil {
86+
return nil, fmt.Errorf("failed to get worker nodes %q: %v", machineToNodes[machineName], err)
87+
}
88+
nodes = append(nodes, node)
89+
}
90+
91+
return nodes, nil
92+
}
93+
94+
func getScaleClient() (scale.ScalesGetter, error) {
95+
cfg, err := e2e.LoadConfig()
96+
if err != nil {
97+
return nil, fmt.Errorf("error getting config: %v", err)
98+
}
99+
100+
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
101+
if err != nil {
102+
return nil, fmt.Errorf("error discovering client: %v", err)
103+
}
104+
105+
groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
106+
if err != nil {
107+
return nil, fmt.Errorf("error getting API resources: %v", err)
108+
}
109+
restMapper := restmapper.NewDiscoveryRESTMapper(groupResources)
110+
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(discoveryClient)
111+
112+
scaleClient, err := scale.NewForConfig(cfg, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
113+
if err != nil {
114+
return nil, fmt.Errorf("error creating scale client: %v", err)
115+
}
116+
return scaleClient, nil
117+
}
118+
119+
// scaleMachineSet scales a machineSet with a given name to the given number of replicas
120+
func scaleMachineSet(name string, replicas int) error {
121+
scaleClient, err := getScaleClient()
122+
if err != nil {
123+
return fmt.Errorf("error calling getScaleClient: %v", err)
124+
}
125+
126+
scale, err := scaleClient.Scales(machineAPINamespace).Get(schema.GroupResource{Group: machineAPIGroup, Resource: "MachineSet"}, name)
127+
if err != nil {
128+
return fmt.Errorf("error calling scaleClient.Scales get: %v", err)
129+
}
130+
131+
scaleUpdate := scale.DeepCopy()
132+
scaleUpdate.Spec.Replicas = int32(replicas)
133+
_, err = scaleClient.Scales(machineAPINamespace).Update(schema.GroupResource{Group: machineAPIGroup, Resource: "MachineSet"}, scaleUpdate)
134+
if err != nil {
135+
return fmt.Errorf("error calling scaleClient.Scales update while setting replicas to %d: %v", err, replicas)
136+
}
137+
return nil
138+
}
139+
140+
var _ = g.Describe("[Feature:Machines][Serial] Managed cluster should", func() {
141+
g.It("grow and decrease when scaling different machineSets simultaneously", func() {
142+
// expect new nodes to come up for machineSet
143+
verifyNodeScalingFunc := func(c *kubernetes.Clientset, dc dynamic.Interface, expectedScaleOut int, machineSet objx.Map) bool {
144+
nodes, err := getNodesFromMachineSet(c, dc, machineName(machineSet))
145+
if err != nil {
146+
e2e.Logf("Error getting nodes from machineSet: %v", err)
147+
return false
148+
}
149+
e2e.Logf("node count : %v, expectedCount %v", len(nodes), expectedScaleOut)
150+
for i := range nodes {
151+
e2e.Logf("node: %v", nodes[i].Name)
152+
if !isNodeReady(*nodes[i]) {
153+
e2e.Logf("Node %q is not ready", nodes[i].Name)
154+
return false
155+
}
156+
}
157+
return len(nodes) == expectedScaleOut
158+
}
159+
160+
cfg, err := e2e.LoadConfig()
161+
o.Expect(err).NotTo(o.HaveOccurred())
162+
c, err := e2e.LoadClientset()
163+
o.Expect(err).NotTo(o.HaveOccurred())
164+
dc, err := dynamic.NewForConfig(cfg)
165+
o.Expect(err).NotTo(o.HaveOccurred())
166+
167+
g.By("fetching worker machineSets")
168+
machineSets, err := listWorkerMachineSets(dc)
169+
o.Expect(err).NotTo(o.HaveOccurred())
170+
if len(machineSets) == 0 {
171+
e2e.Skipf("Expects at least one worker machineset. Found none!!!")
172+
}
173+
174+
g.By("checking initial cluster workers size")
175+
nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{
176+
LabelSelector: nodeLabelSelectorWorker,
177+
})
178+
o.Expect(err).NotTo(o.HaveOccurred())
179+
180+
initialNumberOfWorkers := len(nodeList.Items)
181+
182+
initialReplicasMachineSets := map[string]int{}
183+
184+
for _, machineSet := range machineSets {
185+
initialReplicasMachineSet := getMachineSetReplicaNumber(machineSet)
186+
expectedScaleOut := initialReplicasMachineSet + 1
187+
initialReplicasMachineSets[machineName(machineSet)] = initialReplicasMachineSet
188+
g.By(fmt.Sprintf("scaling %q from %d to %d replicas", machineName(machineSet), initialReplicasMachineSet, expectedScaleOut))
189+
err = scaleMachineSet(machineName(machineSet), expectedScaleOut)
190+
o.Expect(err).NotTo(o.HaveOccurred())
191+
}
192+
for _, machineSet := range machineSets {
193+
expectedScaleOut := initialReplicasMachineSets[machineName(machineSet)] + 1
194+
o.Eventually(func() bool {
195+
return verifyNodeScalingFunc(c, dc, expectedScaleOut, machineSet)
196+
}, scalingTime, 5*time.Second).Should(o.BeTrue())
197+
}
198+
199+
for _, machineSet := range machineSets {
200+
scaledReplicasMachineSet := initialReplicasMachineSets[machineName(machineSet)] + 1
201+
g.By(fmt.Sprintf("scaling %q from %d to %d replicas", machineName(machineSet), scaledReplicasMachineSet, initialReplicasMachineSets[machineName(machineSet)]))
202+
err = scaleMachineSet(machineName(machineSet), initialReplicasMachineSets[machineName(machineSet)])
203+
o.Expect(err).NotTo(o.HaveOccurred())
204+
}
205+
206+
g.By(fmt.Sprintf("waiting for cluster to get back to original size. Final size should be %d worker nodes", initialNumberOfWorkers))
207+
o.Eventually(func() bool {
208+
nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{
209+
LabelSelector: nodeLabelSelectorWorker,
210+
})
211+
o.Expect(err).NotTo(o.HaveOccurred())
212+
return len(nodeList.Items) == initialNumberOfWorkers
213+
}, 1*time.Minute, 5*time.Second).Should(o.BeTrue())
214+
})
215+
})

test/extended/machines/workers.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
const (
2323
machineAPINamespace = "openshift-machine-api"
2424
nodeLabelSelectorWorker = "node-role.kubernetes.io/worker"
25+
machineLabelRole = "machine.openshift.io/cluster-api-machine-role"
2526

2627
// time after purge of machine to wait for replacement and ready node
2728
// TODO: tighten this further based on node lifecycle controller [appears to be ~5m30s]
@@ -91,6 +92,20 @@ func mapNodeNameToMachine(nodes []corev1.Node, machines []objx.Map) (map[string]
9192
return result, len(nodes) == len(result)
9293
}
9394

95+
// mapMachineNameToNodeName returns a tuple (map node to machine by name, true if a match is found for every node)
96+
func mapMachineNameToNodeName(machines []objx.Map, nodes []corev1.Node) (map[string]string, bool) {
97+
result := map[string]string{}
98+
for i := range machines {
99+
for j := range nodes {
100+
if nodes[j].Name == nodeNameFromNodeRef(machines[i]) {
101+
result[machineName(machines[i])] = nodes[j].Name
102+
break
103+
}
104+
}
105+
}
106+
return result, len(machines) == len(result)
107+
}
108+
94109
func isNodeReady(node corev1.Node) bool {
95110
for _, c := range node.Status.Conditions {
96111
if c.Type == corev1.NodeReady {

0 commit comments

Comments
 (0)