215 changes: 215 additions & 0 deletions test/extended/machines/scale.go
@@ -0,0 +1,215 @@
package operators

import (
"fmt"
"strconv"
"time"

"k8s.io/client-go/kubernetes"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"

g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"
"github.com/stretchr/objx"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
e2e "k8s.io/kubernetes/test/e2e/framework"
)

const (
	machineAPIGroup       = "machine.openshift.io"
	machineSetOwningLabel = "machine.openshift.io/cluster-api-machineset"
	scalingTime           = 7 * time.Minute
)

// machineSetClient returns a client for machines scoped to the proper namespace
func machineSetClient(dc dynamic.Interface) dynamic.ResourceInterface {
	machineSetClient := dc.Resource(schema.GroupVersionResource{Group: machineAPIGroup, Resource: "machinesets", Version: "v1beta1"})
	return machineSetClient.Namespace(machineAPINamespace)
}

// listWorkerMachineSets lists all worker machineSets
func listWorkerMachineSets(dc dynamic.Interface) ([]objx.Map, error) {
	mc := machineSetClient(dc)
	obj, err := mc.List(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	machineSets := []objx.Map{}
	for _, ms := range objects(objx.Map(obj.UnstructuredContent()).Get("items")) {
		e2e.Logf("Labels %v", ms.Get("spec.selector.matchLabels"))
		labels := (*ms.Get("spec.selector.matchLabels")).Data().(map[string]interface{})
		if val, ok := labels[machineLabelRole]; ok {
			if val == "worker" {
				machineSets = append(machineSets, ms)
				continue
			}
		}
	}
	return machineSets, nil
}

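// getMachineSetReplicaNumber returns the spec.replicas value of the given machineSet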
func getMachineSetReplicaNumber(item objx.Map) int {
	replicas, _ := strconv.Atoi(item.Get("spec.replicas").String())
	return replicas
}

// getNodesFromMachineSet returns an array of nodes backed by machines owned by a given machineSet
func getNodesFromMachineSet(c *kubernetes.Clientset, dc dynamic.Interface, machineSetName string) ([]*corev1.Node, error) {
	machines, err := listMachines(dc, fmt.Sprintf("%s=%s", machineSetOwningLabel, machineSetName))
	if err != nil {
		return nil, fmt.Errorf("failed to list machines: %v", err)
	}

	// fetch nodes
	allWorkerNodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{
		LabelSelector: nodeLabelSelectorWorker,
Contributor @smarterclayton (Jul 3, 2019):
It is not required that a cluster have worker selector nodes. Will this e2e test basically only run if you have worker nodes? Or will it fail if you don't?

Contributor Author:
Today we have machineSets only for the worker nodes in a newly created cluster by default; that is the assumption here. Omitting the worker label selector in the listing would also be fine. Yes, the test will fail if there are no worker nodes in the cluster.

Contributor:
I think you should skip if there is no worker machine set with a clear message, rather than fail.
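A minimal sketch of that suggestion, reusing the listWorkerMachineSets helper and the framework's Skipf; the test body later in this diff adopts this same pattern:

machineSets, err := listWorkerMachineSets(dc)
o.Expect(err).NotTo(o.HaveOccurred())
if len(machineSets) == 0 {
	e2e.Skipf("Expected at least one worker MachineSet, found none")
}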

	})
	if err != nil {
		return nil, fmt.Errorf("failed to list worker nodes: %v", err)
	}

	machineToNodes, match := mapMachineNameToNodeName(machines, allWorkerNodes.Items)
	if !match {
		return nil, fmt.Errorf("not all machines have a node reference: %v", machineToNodes)
	}
	var nodes []*corev1.Node
	for machineName := range machineToNodes {
		node, err := c.CoreV1().Nodes().Get(machineToNodes[machineName], metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get worker nodes %q: %v", machineToNodes[machineName], err)
		}
		nodes = append(nodes, node)
	}

	return nodes, nil
}

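// getScaleClient builds a scale client from the e2e framework config, using a discovery-based REST mapper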
func getScaleClient() (scale.ScalesGetter, error) {
	cfg, err := e2e.LoadConfig()
	if err != nil {
		return nil, fmt.Errorf("error getting config: %v", err)
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		return nil, fmt.Errorf("error discovering client: %v", err)
	}

	groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
	if err != nil {
		return nil, fmt.Errorf("error getting API resources: %v", err)
	}
	restMapper := restmapper.NewDiscoveryRESTMapper(groupResources)
	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(discoveryClient)

	scaleClient, err := scale.NewForConfig(cfg, restMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
	if err != nil {
		return nil, fmt.Errorf("error creating scale client: %v", err)
	}
	return scaleClient, nil
}

// scaleMachineSet scales a machineSet with a given name to the given number of replicas
func scaleMachineSet(name string, replicas int) error {
	scaleClient, err := getScaleClient()
	if err != nil {
		return fmt.Errorf("error calling getScaleClient: %v", err)
	}

	scale, err := scaleClient.Scales(machineAPINamespace).Get(schema.GroupResource{Group: machineAPIGroup, Resource: "MachineSet"}, name)
	if err != nil {
		return fmt.Errorf("error calling scaleClient.Scales get: %v", err)
	}

	scaleUpdate := scale.DeepCopy()
	scaleUpdate.Spec.Replicas = int32(replicas)
	_, err = scaleClient.Scales(machineAPINamespace).Update(schema.GroupResource{Group: machineAPIGroup, Resource: "MachineSet"}, scaleUpdate)
	if err != nil {
		return fmt.Errorf("error calling scaleClient.Scales update while setting replicas to %d: %v", replicas, err)
	}
	return nil
}

var _ = g.Describe("[Feature:Machines][Serial] Managed cluster should", func() {
	g.It("grow and decrease when scaling different machineSets simultaneously", func() {
		// expect new nodes to come up for machineSet
Contributor:
You need a skip for platforms which don’t support scaling
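One possible shape for such a guard, assuming the vendored e2e framework still exposes SkipUnlessProviderIs and that the listed providers are the ones where MachineSet scaling is supported (both are assumptions, not part of this PR):

// Hypothetical guard near the top of the It block; the provider list is illustrative only.
e2e.SkipUnlessProviderIs("aws", "azure", "gcp", "openstack")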

		verifyNodeScalingFunc := func(c *kubernetes.Clientset, dc dynamic.Interface, expectedScaleOut int, machineSet objx.Map) bool {
			nodes, err := getNodesFromMachineSet(c, dc, machineName(machineSet))
			if err != nil {
				e2e.Logf("Error getting nodes from machineSet: %v", err)
				return false
			}
			e2e.Logf("node count: %v, expected count: %v", len(nodes), expectedScaleOut)
			for i := range nodes {
				e2e.Logf("node: %v", nodes[i].Name)
				if !isNodeReady(*nodes[i]) {
					e2e.Logf("Node %q is not ready", nodes[i].Name)
					return false
				}
			}
			return len(nodes) == expectedScaleOut
		}

		cfg, err := e2e.LoadConfig()
		o.Expect(err).NotTo(o.HaveOccurred())
		c, err := e2e.LoadClientset()
		o.Expect(err).NotTo(o.HaveOccurred())
		dc, err := dynamic.NewForConfig(cfg)
		o.Expect(err).NotTo(o.HaveOccurred())

		g.By("fetching worker machineSets")
		machineSets, err := listWorkerMachineSets(dc)
		o.Expect(err).NotTo(o.HaveOccurred())
		if len(machineSets) == 0 {
			e2e.Skipf("Expected at least one worker MachineSet, found none")
		}

g.By("checking initial cluster workers size")
nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{
LabelSelector: nodeLabelSelectorWorker,
})
o.Expect(err).NotTo(o.HaveOccurred())
Contributor:
If this is empty, I would expect this test to be skipped. Also, why aren't you passing nodeLabelSelector to getNodesFromMachineSet?

Contributor Author @vikaschoudhary16 (Jul 3, 2019):
Is this OK if there are no worker nodes in a newly created cluster? Is there a job that creates such a worker-less cluster?

Contributor:
There will be. We will be adding jobs that create three-master clusters that run the e2e tests. In that scenario this test should (probably) be skipped, or we can change the logic here when we add that job.


		initialNumberOfWorkers := len(nodeList.Items)

		initialReplicasMachineSets := map[string]int{}

		for _, machineSet := range machineSets {
			initialReplicasMachineSet := getMachineSetReplicaNumber(machineSet)
			expectedScaleOut := initialReplicasMachineSet + 1
			initialReplicasMachineSets[machineName(machineSet)] = initialReplicasMachineSet
			g.By(fmt.Sprintf("scaling %q from %d to %d replicas", machineName(machineSet), initialReplicasMachineSet, expectedScaleOut))
			err = scaleMachineSet(machineName(machineSet), expectedScaleOut)
Member:
Can we avoid doing this inside the for loop, so we scale both sets out immediately? Otherwise the second one only scales up after the first one has finished all its validations.

Contributor Author:
@enxebre done!

			o.Expect(err).NotTo(o.HaveOccurred())
		}
		for _, machineSet := range machineSets {
			expectedScaleOut := initialReplicasMachineSets[machineName(machineSet)] + 1
			o.Eventually(func() bool {
Contributor:
Running time is now linear again?

Contributor:
OK, this is just verification which is fine. We want scale up in parallel (which we have).

				return verifyNodeScalingFunc(c, dc, expectedScaleOut, machineSet)
			}, scalingTime, 5*time.Second).Should(o.BeTrue())
		}

		for _, machineSet := range machineSets {
			scaledReplicasMachineSet := initialReplicasMachineSets[machineName(machineSet)] + 1
			g.By(fmt.Sprintf("scaling %q from %d to %d replicas", machineName(machineSet), scaledReplicasMachineSet, initialReplicasMachineSets[machineName(machineSet)]))
			err = scaleMachineSet(machineName(machineSet), initialReplicasMachineSets[machineName(machineSet)])
			o.Expect(err).NotTo(o.HaveOccurred())
		}

		g.By(fmt.Sprintf("waiting for cluster to get back to original size. Final size should be %d worker nodes", initialNumberOfWorkers))
		o.Eventually(func() bool {
			nodeList, err := c.CoreV1().Nodes().List(metav1.ListOptions{
				LabelSelector: nodeLabelSelectorWorker,
			})
			o.Expect(err).NotTo(o.HaveOccurred())
			return len(nodeList.Items) == initialNumberOfWorkers
		}, 1*time.Minute, 5*time.Second).Should(o.BeTrue())
	})
})
15 changes: 15 additions & 0 deletions test/extended/machines/workers.go
@@ -22,6 +22,7 @@ import (
const (
machineAPINamespace = "openshift-machine-api"
nodeLabelSelectorWorker = "node-role.kubernetes.io/worker"
machineLabelRole = "machine.openshift.io/cluster-api-machine-role"

// time after purge of machine to wait for replacement and ready node
// TODO: tighten this further based on node lifecycle controller [appears to be ~5m30s]
@@ -91,6 +92,20 @@ func mapNodeNameToMachine(nodes []corev1.Node, machines []objx.Map) (map[string]
	return result, len(nodes) == len(result)
}

// mapMachineNameToNodeName returns a map of machine names to node names, and true if a node was found for every machine
func mapMachineNameToNodeName(machines []objx.Map, nodes []corev1.Node) (map[string]string, bool) {
	result := map[string]string{}
	for i := range machines {
		for j := range nodes {
			if nodes[j].Name == nodeNameFromNodeRef(machines[i]) {
				result[machineName(machines[i])] = nodes[j].Name
				break
			}
		}
	}
	return result, len(machines) == len(result)
}

func isNodeReady(node corev1.Node) bool {
	for _, c := range node.Status.Conditions {
		if c.Type == corev1.NodeReady {