21 changes: 0 additions & 21 deletions test/extended/dr/common.go
@@ -65,27 +65,6 @@ func masterNodes(oc *exutil.CLI) []*corev1.Node {
return nodes
}

func constructEtcdConnectionString(masters []string) string {
//TODO vrutkovs: replace this nonsense with `etcdctl member list -w json ...`
etcdConnectionString := ""
for _, master := range masters {
result, err := e2essh.SSH("cat /run/etcd/environment", master+":22", e2e.TestContext.Provider)
o.Expect(err).NotTo(o.HaveOccurred())
var entry string
for _, entry = range strings.Split(result.Stdout, "\n") {
if strings.HasPrefix(entry, "ETCD_DNS_NAME=") {
break
}
}
entries := strings.Split(entry, "=")
o.Expect(entries).To(o.HaveLen(2))
etcdDNSName := entries[1]
o.Expect(etcdDNSName).NotTo(o.BeEmpty())
etcdConnectionString = fmt.Sprintf("%setcd-member-%s=https://%s:2380,", etcdConnectionString, master, etcdDNSName)
}
return etcdConnectionString[:len(etcdConnectionString)-1]
}
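
The helper deleted above assembled the etcd peer list by scraping /run/etcd/environment over SSH; its own TODO pointed at `etcdctl member list -w json` as the saner source. A minimal sketch of that direction, assuming the standard etcd v3 JSON output shape (members[].name, members[].peerURLs) and a hypothetical raw payload already captured from the node:

import (
	"encoding/json"
	"fmt"
	"strings"
)

// connectionStringFromMemberList is a hypothetical replacement that parses
// `etcdctl member list -w json` output instead of shell-scraped env files.
func connectionStringFromMemberList(raw []byte) (string, error) {
	var resp struct {
		Members []struct {
			Name     string   `json:"name"`
			PeerURLs []string `json:"peerURLs"`
		} `json:"members"`
	}
	if err := json.Unmarshal(raw, &resp); err != nil {
		return "", err
	}
	parts := make([]string, 0, len(resp.Members))
	for _, m := range resp.Members {
		if m.Name == "" || len(m.PeerURLs) == 0 {
			return "", fmt.Errorf("etcd member %q is missing a name or peer URL", m.Name)
		}
		// Join as name=peerURL pairs, the same comma-separated format the
		// deleted code built by hand.
		parts = append(parts, fmt.Sprintf("%s=%s", m.Name, m.PeerURLs[0]))
	}
	return strings.Join(parts, ","), nil
}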

func waitForMastersToUpdate(oc *exutil.CLI, mcps dynamic.NamespaceableResourceInterface) {
e2elog.Logf("Waiting for MachineConfig master to finish rolling out")
err := wait.Poll(30*time.Second, 30*time.Minute, func() (done bool, err error) {
226 changes: 40 additions & 186 deletions test/extended/dr/quorum_restore.go
@@ -5,25 +5,24 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"strings"
"time"

"k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/route53"
g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/test/e2e/framework"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
@@ -36,10 +35,11 @@ import (

const (
machineAnnotationName = "machine.openshift.io/machine"
localEtcdSignerYaml = "/tmp/kube-etcd-cert-signer.yaml"
)

var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
defer g.GinkgoRecover()

f := framework.NewDefaultFramework("disaster-recovery")
f.SkipNamespaceCreation = true
f.SkipPrivilegedPSPBinding = true
@@ -103,8 +103,6 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
survivingMachineName := getMachineNameByNodeName(oc, survivingNodeName)
survivingMachine, err := ms.Get(context.Background(), survivingMachineName, metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
// Set etcd connection string before destroying masters, as ssh bastion may become unavailable
etcdConnectionString := constructEtcdConnectionString([]string{survivingNodeName})

framework.Logf("Destroy %d masters", len(masters)-1)
var masterMachines []string
@@ -147,9 +145,9 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
}

framework.Logf("Perform etcd backup on remaining machine %s (machine %s)", survivingNodeName, survivingMachineName)
expectSSH("sudo -i /bin/bash -c 'rm -f /root/assets/backup/snapshot*'; sudo -i /bin/bash -x /usr/local/bin/etcd-snapshot-backup.sh /root/assets/backup", survivingNode)
framework.Logf("Restore etcd on remaining node %s (machine %s)", survivingNodeName, survivingMachineName)
expectSSH(fmt.Sprintf("sudo -i /bin/bash -c '/bin/bash -x /usr/local/bin/etcd-snapshot-restore.sh /root/assets/backup/snapshot* %s'", etcdConnectionString), survivingNode)
expectSSH("sudo -i /bin/bash -cx 'rm -rf /home/core/backup; /usr/local/bin/cluster-backup.sh ~core/backup'", survivingNode)
framework.Logf("Restore etcd and control-plane on remaining node %s (machine %s)", survivingNodeName, survivingMachineName)
expectSSH("sudo -i /bin/bash -cx '/usr/local/bin/cluster-restore.sh /home/core/backup'", survivingNode)

framework.Logf("Wait for API server to come up")
time.Sleep(30 * time.Second)
@@ -202,24 +200,27 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
mastersList, err := ms.List(context.Background(), metav1.ListOptions{
LabelSelector: "machine.openshift.io/cluster-api-machine-role=master",
})
if err != nil || mastersList.Items == nil {
framework.Logf("return false - err %v mastersList.Items %v", err, mastersList.Items)
if err != nil {
return false, err
}
if mastersList.Items == nil {
return false, nil
}
return len(mastersList.Items) == expectedNumberOfMasters, nil
})
o.Expect(err).NotTo(o.HaveOccurred())

framework.Logf("Wait for masters to join as nodes and go ready")
err = wait.Poll(30*time.Second, 30*time.Minute, func() (done bool, err error) {
err = wait.Poll(30*time.Second, 50*time.Minute, func() (done bool, err error) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic", r)
}
}()
nodes, err := oc.AdminKubeClient().CoreV1().Nodes().List(context.Background(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/master="})
if err != nil {
return false, err
// scaling up to the second etcd member makes transient API errors inevitable
return false, nil
}
ready := countReady(nodes.Items)
if ready != expectedNumberOfMasters {
@@ -231,114 +232,53 @@ var _ = g.Describe("[sig-etcd][Feature:DisasterRecovery][Disruptive]", func() {
o.Expect(err).NotTo(o.HaveOccurred())
}

// reload all machine info
masterMachines = nil
masters = masterNodes(oc)
for _, node := range masters {
masterMachine := getMachineNameByNodeName(oc, node.Name)
masterMachines = append(masterMachines, masterMachine)
}

infra, err := oc.AdminConfigClient().ConfigV1().Infrastructures().Get(context.Background(), "cluster", metav1.GetOptions{})
o.Expect(infra.Status.EtcdDiscoveryDomain).NotTo(o.BeEmpty())
domain := infra.Status.EtcdDiscoveryDomain
framework.Logf("Etcd recovery domain: %s", domain)

var survivingMasterIP string
var masterEtcdDomains []string
var masterIPs []string
dnsUpdates := make(map[string]string)
for i, node := range masters {
// IMPORTANT: by convention the index is the last segment of the machine name;
// if this changes, these tests die horribly
machine := masterMachines[i]
last := strings.LastIndex(machine, "-")
o.Expect(last).ToNot(o.Equal(-1))
index := machine[last+1:]
o.Expect(index).ToNot(o.BeEmpty())
etcdName := fmt.Sprintf("etcd-%s.%s", index, domain)
masterEtcdDomains = append(masterEtcdDomains, etcdName)

var masterIP string
for _, address := range node.Status.Addresses {
if address.Type == "InternalIP" {
masterIP = address.Address
break
}
}
o.Expect(masterIP).ToNot(o.BeEmpty())
masterIPs = append(masterIPs, masterIP)

if node.Name == survivingNodeName {
survivingMasterIP = masterIP
continue
}
framework.Logf("Update DNS records for %s to %s", etcdName, masterIP)
dnsUpdates[etcdName] = masterIP
}
dnsErr := updateDNS(domain, dnsUpdates, framework.TestContext.Provider)
if dnsErr != nil {
framework.Logf("Could not complete DNS updates, continuing anyway: %v", dnsErr, err)
}

imagePullSecretPath := getPullSecret(oc)
defer os.Remove(imagePullSecretPath)
runPodSigner(oc, survivingNode, imagePullSecretPath)
framework.Logf("Force new revision of etcd-pod")
_, err = oc.AdminOperatorClient().OperatorV1().Etcds().Patch(context.Background(), "cluster", types.MergePatchType, []byte(`{"spec": {"forceRedeploymentReason": "recover-etcd"}}`), metav1.PatchOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

framework.Logf("Restore etcd on remaining masters")
setupEtcdEnvImage := getImagePullSpecFromRelease(oc, imagePullSecretPath, "machine-config-operator")
kubeClientAgent := getImagePullSpecFromRelease(oc, imagePullSecretPath, "kube-client-agent")
for i, node := range masters {
if node.Name == survivingNodeName {
framework.Logf("Skipping node as its the surviving master")
continue
}
err := wait.PollImmediate(15*time.Second, 5*time.Minute, func() (bool, error) {
res, err := ssh(fmt.Sprintf(`dig +short "%s"`, masterEtcdDomains[i]), node)
if err != nil {
framework.Logf("Retrieving DNS name for %s failed: %v", node.Name, err)
return false, nil
}
out := strings.TrimSpace(res.Stdout)
if out != masterIPs[i] {
framework.Logf("Expected DNS for %s to be %s, got %s", masterEtcdDomains[i], masterIPs[i], out)
return false, nil
}
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())
expectSSH(fmt.Sprintf("sudo -i env SETUP_ETCD_ENVIRONMENT=%s KUBE_CLIENT_AGENT=%s /bin/bash -x /usr/local/bin/etcd-member-recover.sh %s \"etcd-member-%s\"", setupEtcdEnvImage, kubeClientAgent, survivingMasterIP, node.GetName()), node)
}
framework.Logf("Force new revision of kube-apiserver")
_, err = oc.AdminOperatorClient().OperatorV1().KubeAPIServers().Patch(context.Background(), "cluster", types.MergePatchType, []byte(`{"spec": {"forceRedeploymentReason": "recover-kube-apiserver"}}`), metav1.PatchOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

framework.Logf("Wait for etcd pods to become available")
_, err = exutil.WaitForPods(
_, err = waitForPodsTolerateClientTimeout(
oc.AdminKubeClient().CoreV1().Pods("openshift-etcd"),
exutil.ParseLabelsOrDie("k8s-app=etcd"),
exutil.CheckPodIsReady,
expectedNumberOfMasters,
10*time.Minute,
40*time.Minute,
)
o.Expect(err).NotTo(o.HaveOccurred())

scaleEtcdQuorum(pollClient, expectedNumberOfMasters)

framework.Logf("Remove etcd signer")
err = oc.AdminKubeClient().CoreV1().Pods("openshift-config").Delete(context.Background(), "etcd-signer", metav1.DeleteOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

// Workaround for https://bugzilla.redhat.com/show_bug.cgi?id=1707006#
// SDN won't switch to Degraded mode when service is down after disaster recovery
// restartSDNPods(oc)
waitForMastersToUpdate(oc, mcps)
waitForOperatorsToSettle(coc)

// we shouldn't have errored, but try what we can
o.Expect(dnsErr).ToNot(o.HaveOccurred())
})
},
)
})

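// waitForPodsTolerateClientTimeout behaves like exutil.WaitForPods but treats
// every client error as transient (see TODO below), retrying until `count`
// pods matching `label` satisfy `predicate` or `timeout` expires.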
func waitForPodsTolerateClientTimeout(c corev1client.PodInterface, label labels.Selector, predicate func(corev1.Pod) bool, count int, timeout time.Duration) ([]string, error) {
var podNames []string
err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
p, e := exutil.GetPodNamesByFilter(c, label, predicate)
if e != nil {
// TODO tolerate transient etcd timeout only and fail other errors
return false, nil
}
if len(p) != count {
return false, nil
}
podNames = p
return true, nil
})
return podNames, err
}

func scaleEtcdQuorum(client kubernetes.Interface, replicas int) error {
etcdQGScale, err := client.AppsV1().Deployments("openshift-machine-config-operator").GetScale(context.Background(), "etcd-quorum-guard", metav1.GetOptions{})
if err != nil {
@@ -362,27 +302,6 @@ func scaleEtcdQuorum(client kubernetes.Interface, replicas int) error {
return nil
}

func runPodSigner(oc *exutil.CLI, survivingNode *corev1.Node, imagePullSecretPath string) {
framework.Logf("Run etcd signer pod")
nodeHostname := strings.Split(survivingNode.Name, ".")[0]

kubeEtcdSignerServerImage := getImagePullSpecFromRelease(oc, imagePullSecretPath, "kube-etcd-signer-server")
expectSSH(fmt.Sprintf("sudo -i env KUBE_ETCD_SIGNER_SERVER=%s /bin/bash -x /usr/local/bin/tokenize-signer.sh %s && sudo -i install -o core -g core /root/assets/manifests/kube-etcd-cert-signer.yaml /tmp/kube-etcd-cert-signer.yaml", kubeEtcdSignerServerImage, nodeHostname), survivingNode)
etcdSignerYaml := fetchFileContents(survivingNode, "/tmp/kube-etcd-cert-signer.yaml")
err := oc.Run("apply").InputString(etcdSignerYaml).Args("-f", "-").Execute()
o.Expect(err).NotTo(o.HaveOccurred())

framework.Logf("Wait for etcd signer pod to become Ready")
_, err = exutil.WaitForPods(
oc.AdminKubeClient().CoreV1().Pods("openshift-config"),
exutil.ParseLabelsOrDie("k8s-app=etcd"),
exutil.CheckPodIsReady,
1,
10*time.Minute,
)
o.Expect(err).NotTo(o.HaveOccurred())
}

func getPullSecret(oc *exutil.CLI) string {
framework.Logf("Saving image pull secret")
//TODO: copy of test/extended/operators/images.go, move this to a common func
Expand Down Expand Up @@ -425,71 +344,6 @@ func getImagePullSpecFromRelease(oc *exutil.CLI, imagePullSecretPath, imageName
return image
}

func updateDNS(domain string, dnsUpdates map[string]string, provider string) error {
if len(dnsUpdates) == 0 {
return nil
}

switch provider {
case "aws":
ssn := session.New()
r53 := route53.New(ssn)
zones, err := r53.ListHostedZonesByName(&route53.ListHostedZonesByNameInput{
DNSName: aws.String(domain),
MaxItems: aws.String("1"),
})
if err != nil {
return err
}
if len(zones.HostedZones) == 0 {
return fmt.Errorf("unable to find hosted zone for domain %q", domain)
}

// update route53 with the correct A records
input := &route53.ChangeResourceRecordSetsInput{
HostedZoneId: zones.HostedZones[0].Id,
ChangeBatch: &route53.ChangeBatch{
Comment: aws.String(fmt.Sprintf("update master addresses %v", dnsUpdates)),
},
}
for name, ip := range dnsUpdates {
input.ChangeBatch.Changes = append(input.ChangeBatch.Changes, &route53.Change{
Action: aws.String(route53.ChangeActionUpsert),
ResourceRecordSet: &route53.ResourceRecordSet{
Name: aws.String(name),
Type: aws.String("A"),
TTL: aws.Int64(60),
ResourceRecords: []*route53.ResourceRecord{{Value: aws.String(ip)}},
},
})
}
change, err := r53.ChangeResourceRecordSets(input)
if err != nil {
return err
}

// wait until we sync
var lastErr error
if err := wait.PollImmediate(15*time.Second, 5*time.Minute, func() (bool, error) {
status, err := r53.GetChange(&route53.GetChangeInput{Id: change.ChangeInfo.Id})
if err != nil {
return false, err
}
lastErr = fmt.Errorf("operation has status %s", *status.ChangeInfo.Status)
return *status.ChangeInfo.Status == route53.ChangeStatusInsync, nil
}); err != nil {
if err == wait.ErrWaitTimeout && lastErr != nil {
return lastErr
}
return err
}
return nil

default:
return fmt.Errorf("no DNS implementation available for provider %s, control plane cannot be restored", provider)
}
}

func getMachineNameByNodeName(oc *exutil.CLI, name string) string {
masterNode, err := oc.AdminKubeClient().CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())