From d67e633b7c056ab26d28fc20435db1ab4ef90520 Mon Sep 17 00:00:00 2001 From: Yu Qi Zhang Date: Fri, 26 Oct 2018 10:18:27 -0400 Subject: [PATCH] daemon: use informers to check for updates Change existing watch workflow to an informer instead, as watches are prone to breaking and restarting the MCD. The Run() function within MCD is changed from an infinite loop to instead block, and updates are handled by callbacks via the informer. Signed-off-by: Yu Qi Zhang --- cmd/machine-config-daemon/start.go | 18 +++- pkg/daemon/daemon.go | 149 ++++++++++++++++++----------- pkg/daemon/node.go | 52 ---------- 3 files changed, 111 insertions(+), 108 deletions(-) diff --git a/cmd/machine-config-daemon/start.go b/cmd/machine-config-daemon/start.go index eca61db35f..e98a58efdb 100644 --- a/cmd/machine-config-daemon/start.go +++ b/cmd/machine-config-daemon/start.go @@ -67,6 +67,10 @@ func runStartCmd(cmd *cobra.Command, args []string) { glog.Fatalf("unable to verify rootMount %s exists: %s", startOpts.rootMount, err) } + stopCh := make(chan struct{}) + defer close(stopCh) + + ctx := common.CreateControllerContext(cb, stopCh, componentName) // create the daemon instance. this also initializes kube client items // which need to come from the container and not the chroot. daemon, err := daemon.New( @@ -77,6 +81,7 @@ func runStartCmd(cmd *cobra.Command, args []string) { cb.MachineConfigClientOrDie(componentName), cb.KubeClientOrDie(componentName), daemon.NewFileSystemClient(), + ctx.KubeInformerFactory.Core().V1().Nodes(), ) if err != nil { glog.Fatalf("failed to initialize daemon: %v", err) @@ -92,8 +97,17 @@ func runStartCmd(cmd *cobra.Command, args []string) { glog.Fatalf("unable to change directory to /: %s", err) } - stopCh := make(chan struct{}) - defer close(stopCh) + glog.Info("Starting MachineConfigDaemon") + defer glog.Info("Shutting down MachineConfigDaemon") + + err = daemon.CheckStateOnBoot(stopCh) + if err != nil { + glog.Fatalf("error checking initial state of node: %v", err) + } + + ctx.KubeInformerFactory.Start(ctx.Stop) + close(ctx.KubeInformersStarted) + err = daemon.Run(stopCh) if err != nil { glog.Fatalf("failed to run: %v", err) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 4781cc6c0c..c9af83852b 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -17,6 +17,10 @@ import ( "github.com/vincent-petithory/dataurl" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + corev1 "k8s.io/api/core/v1" + coreinformersv1 "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/tools/cache" + corelisterv1 "k8s.io/client-go/listers/core/v1" ) // Daemon is the dispatch point for the functions of the agent on the @@ -47,6 +51,11 @@ type Daemon struct { // rootMount is the location for the MCD to chroot in rootMount string + + // nodeLister is used to watch for updates via the informer + nodeLister corelisterv1.NodeLister + + nodeListerSynced cache.InformerSynced } const ( @@ -68,6 +77,7 @@ func New( client mcfgclientset.Interface, kubeClient kubernetes.Interface, fileSystemClient FileSystemClient, + nodeInformer coreinformersv1.NodeInformer, ) (*Daemon, error) { loginClient, err := login1.New() if err != nil { @@ -84,7 +94,7 @@ func New( } glog.Infof("Booted osImageURL: %s (%s)", osImageURL, osVersion) - return &Daemon{ + dn := &Daemon{ name: nodeName, OperatingSystem: operatingSystem, NodeUpdaterClient: nodeUpdaterClient, @@ -94,17 +104,43 @@ func New( rootMount: rootMount, fileSystemClient: fileSystemClient, bootedOSImageURL: osImageURL, - }, nil + } + + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: dn.handleNodeUpdate, + }) + + dn.nodeLister = nodeInformer.Lister() + dn.nodeListerSynced = nodeInformer.Informer().HasSynced + + return dn, nil } -// Run watches the annotations on the machine until they indicate that we need -// an update. then it triggers an update of the machine. currently, the update -// function shouldn't return, and should just reboot the node, unless an error -// occurs, in which case it will return the error up the call stack. +// Run finishes informer setup and then blocks, and the informer will be +// responsible for triggering callbacks to handle updates. Successful +// updates shouldn't return, and should just reboot the node. func (dn *Daemon) Run(stop <-chan struct{}) error { - glog.Info("Starting MachineConfigDaemon") - defer glog.Info("Shutting down MachineConfigDaemon") + if !cache.WaitForCacheSync(stop, dn.nodeListerSynced) { + glog.Error("Marking degraded due to: failure to sync caches") + return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name) + } + <-stop + + // Run() should block on the above <-stop until an updated is detected, + // which is handled by the callbacks. + return nil +} + +// CheckStateOnBoot is responsible for checking whether the node has +// degraded, and if not, whether an update is required immediately. +// The flow goes something like this - +// 1. Sanity check if we're in a degraded state. If yes, handle appropriately. +// 2. we restarted for some reason. the happy path reason we restarted is +// because of a machine reboot. validate the current machine state is the +// desired machine state. if we aren't try updating again. if we are, update +// the current state annotation accordingly. +func (dn *Daemon) CheckStateOnBoot(stop <-chan struct{}) error { // sanity check we're not already in a degraded state if state, err := getNodeAnnotationExt(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey, true); err != nil { // try to set to degraded... because we failed to check if we're degraded @@ -121,64 +157,69 @@ func (dn *Daemon) Run(stop <-chan struct{}) error { } } - if err := dn.process(); err != nil { + // validate machine state + isDesired, dcAnnotation, err := dn.isDesiredMachineState() + if err != nil { glog.Errorf("Marking degraded due to: %v", err) return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name) } + if isDesired { + // we got the machine state we wanted. set the update complete! + if err := dn.completeUpdate(dcAnnotation); err != nil { + glog.Errorf("Marking degraded due to: %v", err) + return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name) + } + } else if err := dn.triggerUpdate(); err != nil { + return err + } + return nil } -// process is the main loop that actually does all the work. the flow goes -// something like this - -// 1. we restarted for some reason. the happy path reason we restarted is -// because of a machine reboot. validate the current machine state is the -// desired machine state. if we aren't try updating again. if we are, update -// the current state annotation accordingly. -// 2. watch the desired config annotation, waiting for an update to be -// requested by the controller. -// 3. if an update is requested by the controller, we assume that that means -// something changed and apply the desired config no matter what. -// 4. the update function doesn't return right now, but at some point in the -// future if a reboot isn't required for an update it will. if it returns, -// validate the machine state and set the update to done. -// -// the only reason this function will return is if an error occurs. otherwise it -// will keep trying to update the machine until it reboots. -func (dn *Daemon) process() error { - for { - // validate machine state - isDesired, dcAnnotation, err := dn.isDesiredMachineState() - if err != nil { - return err - } +// handleNodeUpdate is the handler for informer callbacks detecting node +// changes. If an update is requested by the controller, we assume that +// that means something changed and apply the desired config no matter what. +// Also note that we only care about node updates, not creation or deletion. +func (dn *Daemon) handleNodeUpdate(old, cur interface{}) { + node := cur.(*corev1.Node) + + // First check if the node that was updated is this daemon's node + if (node.Name != dn.name) { + // The node that was changed was not ours + return + } - if isDesired { - // we got the machine state we wanted. set the update complete! - if err := dn.completeUpdate(dcAnnotation); err != nil { - return err - } + // Then check we're not already in a degraded state. + if state, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey); err != nil { + // try to set to degraded... because we failed to check if we're degraded + glog.Errorf("Marking degraded due to: %v", err) + setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name) + return + } else if state == MachineConfigDaemonStateDegraded { + // Just return since we want to continue sleeping + return + } - // now wait until we need another one. - glog.V(2).Infof("Watching for node annotation updates on %q", dn.name) - if err := waitUntilUpdate(dn.kubeClient.CoreV1().Nodes(), dn.name); err != nil { - return err - } - } + // Detect if there is an update + if (node.Annotations[DesiredMachineConfigAnnotationKey] == node.Annotations[CurrentMachineConfigAnnotationKey]) { + // No actual update to the config + return + } - // either the machine state isn't what we wanted and we should try - // again, or the machine state is what we wanted, and now another update - // is was triggered. - if err := dn.triggerUpdate(); err != nil { - return err + // The desired machine config has changed, trigger update + if err := dn.triggerUpdate(); err != nil { + glog.Errorf("Marking degraded due to: %v", err) + if errSet := setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name); errSet != nil { + glog.Errorf("Futher error attempting to set the node to degraded: %v", errSet) } - - // we managed to update the machine without rebooting. in this case, we - // basically just restart the logic, but working under the assumption - // that everything is already initialized for us, so we just go to the - // top - glog.V(2).Infof("Successfully updated without reboot") + // reboot the node, which will catch the degraded state and sleep + dn.reboot() } + + // we managed to update the machine without rebooting. in this case, + // continue as usual waiting for the next update + glog.V(2).Infof("Successfully updated without reboot") } // completeUpdate does all the stuff required to finish an update. right now, it diff --git a/pkg/daemon/node.go b/pkg/daemon/node.go index 5e4f592b12..a343dcd6bb 100644 --- a/pkg/daemon/node.go +++ b/pkg/daemon/node.go @@ -7,8 +7,6 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/watch" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/retry" ) @@ -19,45 +17,6 @@ const ( InitialNodeAnnotationsFilePath = "/etc/machine-config-daemon/node-annotations.json" ) -// waitUntilUpdate blocks until the desiredConfig annotation doesn't match the -// currentConfig annotation, which indicates that there is an update available -// for the node. -func waitUntilUpdate(client corev1.NodeInterface, node string) error { - n, err := client.Get(node, metav1.GetOptions{}) - if err != nil { - return err - } - - watcher, err := client.Watch(metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", node).String(), - ResourceVersion: n.ResourceVersion, - }) - if err != nil { - return fmt.Errorf("Failed to watch self node (%q): %v", node, err) - } - - // make sure the condition isn't already true - dc, err := getNodeAnnotation(client, node, DesiredMachineConfigAnnotationKey) - if err != nil { - return err - } - cc, err := getNodeAnnotation(client, node, CurrentMachineConfigAnnotationKey) - if err != nil || cc == "" { - return err - } - if dc != cc { - return nil - } - - // for now, we wait forever. that might not be the best long-term strategy. - _, err = watch.Until(0, watcher, updateWatcher) - if err != nil { - return fmt.Errorf("Failed to watch for update request: %v", err) - } - - return nil -} - // setConfig sets the given annotation key, value pair. func setNodeAnnotations(client corev1.NodeInterface, node string, m map[string]string) error { return updateNodeRetry(client, node, func(node *v1.Node) { @@ -120,17 +79,6 @@ func getNodeAnnotationExt(client corev1.NodeInterface, node string, k string, al return v, nil } -// updateWatcher is the handler for the watch event. -func updateWatcher(event watch.Event) (bool, error) { - switch event.Type { - case watch.Modified: - node := event.Object.(*v1.Node) - return node.Annotations[DesiredMachineConfigAnnotationKey] != node.Annotations[CurrentMachineConfigAnnotationKey], nil - } - - return false, nil -} - // updateNodeRetry calls f to update a node object in Kubernetes. // It will attempt to update the node by applying f to it up to DefaultBackoff // number of times.