From 950af28107320cc93074c23de4c64289da04cfcd Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Mon, 15 Oct 2018 11:07:54 -0400 Subject: [PATCH 01/12] daemon: Run once from a local or remote file Signed-off-by: Steve Milner --- cmd/machine-config-daemon/start.go | 3 +++ pkg/daemon/daemon.go | 28 ++++++++++++++++++++++------ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/cmd/machine-config-daemon/start.go b/cmd/machine-config-daemon/start.go index e98a58efdb..bfc7efc263 100644 --- a/cmd/machine-config-daemon/start.go +++ b/cmd/machine-config-daemon/start.go @@ -24,6 +24,7 @@ var ( kubeconfig string nodeName string rootMount string + onceFrom string } ) @@ -32,6 +33,7 @@ func init() { startCmd.PersistentFlags().StringVar(&startOpts.kubeconfig, "kubeconfig", "", "Kubeconfig file to access a remote cluster (testing only)") startCmd.PersistentFlags().StringVar(&startOpts.nodeName, "node-name", "", "kubernetes node name daemon is managing.") startCmd.PersistentFlags().StringVar(&startOpts.rootMount, "root-mount", "/rootfs", "where the nodes root filesystem is mounted for chroot and file manipulation.") + startCmd.PersistentFlags().StringVar(&startOpts.onceFrom, "once-from", "", "Runs the daemon once using a provided file path or URL endpoint as its machine config source") } func runStartCmd(cmd *cobra.Command, args []string) { @@ -82,6 +84,7 @@ func runStartCmd(cmd *cobra.Command, args []string) { cb.KubeClientOrDie(componentName), daemon.NewFileSystemClient(), ctx.KubeInformerFactory.Core().V1().Nodes(), + startOpts.onceFrom, ) if err != nil { glog.Fatalf("failed to initialize daemon: %v", err) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index a96b814502..b4d5a1f9a1 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -3,9 +3,9 @@ package daemon import ( "fmt" "io/ioutil" + "os" "path/filepath" "strings" - "os" "github.com/coreos/go-systemd/login1" ignv2_2types "github.com/coreos/ignition/config/v2_2/types" @@ -15,12 +15,12 @@ 
import ( mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" mcfgclientv1 "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned/typed/machineconfiguration.openshift.io/v1" "github.com/vincent-petithory/dataurl" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" coreinformersv1 "k8s.io/client-go/informers/core/v1" - "k8s.io/client-go/tools/cache" + "k8s.io/client-go/kubernetes" corelisterv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" ) // Daemon is the dispatch point for the functions of the agent on the @@ -56,6 +56,8 @@ type Daemon struct { nodeLister corelisterv1.NodeLister nodeListerSynced cache.InformerSynced + // onceFrom defines where the source config is to run the daemon once and exit + onceFrom string } const ( @@ -78,6 +80,7 @@ func New( kubeClient kubernetes.Interface, fileSystemClient FileSystemClient, nodeInformer coreinformersv1.NodeInformer, + onceFrom string, ) (*Daemon, error) { loginClient, err := login1.New() if err != nil { @@ -104,6 +107,7 @@ func New( rootMount: rootMount, fileSystemClient: fileSystemClient, bootedOSImageURL: osImageURL, + onceFrom: onceFrom, } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -185,7 +189,7 @@ func (dn *Daemon) handleNodeUpdate(old, cur interface{}) { node := cur.(*corev1.Node) // First check if the node that was updated is this daemon's node - if (node.Name != dn.name) { + if node.Name != dn.name { // The node that was changed was not ours return } @@ -202,7 +206,7 @@ func (dn *Daemon) handleNodeUpdate(old, cur interface{}) { } // Detect if there is an update - if (node.Annotations[DesiredMachineConfigAnnotationKey] == node.Annotations[CurrentMachineConfigAnnotationKey]) { + if node.Annotations[DesiredMachineConfigAnnotationKey] == node.Annotations[CurrentMachineConfigAnnotationKey] { // 
No actual update to the config return } @@ -419,3 +423,15 @@ func (dn *Daemon) Close() { func getMachineConfig(client mcfgclientv1.MachineConfigInterface, name string) (*mcfgv1.MachineConfig, error) { return client.Get(name, metav1.GetOptions{}) } + +// validPath attempts to see if the path provided is indeed an acceptable +// filesystem path. This function does not check if the path exists. +func validPath(path string) bool { + path = filepath.Clean(path) + for _, validStart := range []string{".", "..", "/"} { + if strings.HasPrefix(path, validStart) { + return true + } + } + return false +} From 88c4a8b680beaf4dbebd69a33b92a5666d763cb1 Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Thu, 18 Oct 2018 14:50:41 -0400 Subject: [PATCH 02/12] daemon: More methods for getting MCs Signed-off-by: Steve Milner --- pkg/daemon/daemon.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index b4d5a1f9a1..675e75bd23 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -3,6 +3,7 @@ package daemon import ( "fmt" "io/ioutil" + "net/http" "os" "path/filepath" "strings" @@ -11,6 +12,7 @@ import ( ignv2_2types "github.com/coreos/ignition/config/v2_2/types" "github.com/golang/glog" drain "github.com/openshift/kubernetes-drain" + "github.com/openshift/machine-config-operator/lib/resourceread" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" mcfgclientv1 "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned/typed/machineconfiguration.openshift.io/v1" @@ -420,6 +422,38 @@ func (dn *Daemon) Close() { dn.loginClient.Close() } +// getMachineConfigFromFile parses a valid machine config file in yaml format and returns +// a MachineConfig struct. 
+func (dn *Daemon) getMachineConfigFromFile(filePath string) (*mcfgv1.MachineConfig, error) { + data, err := dn.fileSystemClient.ReadFile(filePath) + if err != nil { + return nil, err + } + config := resourceread.ReadMachineConfigV1OrDie(data) + return config, nil +} + +// getMachineConfigFromURL reads a remote MC in yaml format and returns a MachineConfig struct. +func (dn *Daemon) getMachineConfigFromURL(url string) (*mcfgv1.MachineConfig, error) { + // Make a request to the remote URL + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Read the body content from the request + body, err := dn.fileSystemClient.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal the body into the machineConfig + config := resourceread.ReadMachineConfigV1OrDie(body) + + return config, nil +} + func getMachineConfig(client mcfgclientv1.MachineConfigInterface, name string) (*mcfgv1.MachineConfig, error) { return client.Get(name, metav1.GetOptions{}) } From aa6d05652181b118be4dc0843aed30b3d8f35299 Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Fri, 19 Oct 2018 14:32:47 -0400 Subject: [PATCH 03/12] daemon: Add runOnce for once-from usage Signed-off-by: Steve Milner --- pkg/daemon/daemon.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 675e75bd23..ccc271acd0 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -126,6 +126,12 @@ func New( // responsible for triggering callbacks to handle updates. Successful // updates shouldn't return, and should just reboot the node. func (dn *Daemon) Run(stop <-chan struct{}) error { + // Catch quickly if we've been asked to run once. 
+ if dn.onceFrom != "" { + glog.V(2).Info("Running once per request") + return dn.runOnce() + } + if !cache.WaitForCacheSync(stop, dn.nodeListerSynced) { glog.Error("Marking degraded due to: failure to sync caches") return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name) @@ -183,6 +189,41 @@ func (dn *Daemon) CheckStateOnBoot(stop <-chan struct{}) error { return nil } +// runOnce pulls the MachineConfig from either a file or remote URL +// executes one run, and exits. +// TODO: Revisit the Run/process methods and refactor and unify them with runOnce +func (dn *Daemon) runOnce() error { + var machineConfig *mcfgv1.MachineConfig + var err error + if strings.HasPrefix("http://", dn.onceFrom) || strings.HasPrefix("https://", dn.onceFrom) { + // If we sense a remote URL has been provided then request MC content + // from a remote URL and parse it + glog.V(2).Infof("Getting machine config content from %s", dn.onceFrom) + machineConfig, err = dn.getMachineConfigFromURL(dn.onceFrom) + if err != nil { + return err + } + } else if validPath(dn.onceFrom) { + // If we sense a local file has been provided parse it + absoluteOnceFrom, err := filepath.Abs(filepath.Clean(dn.onceFrom)) + if err != nil { + return err + } + machineConfig, err = dn.getMachineConfigFromFile(absoluteOnceFrom) + if err != nil { + return err + } + } else { + // Otherwise return an error as the input format is unsupported + return fmt.Errorf("%s is not a path nor url; can not run once", dn.onceFrom) + } + + // At this point we have a populated MachineConfig struct for our desired config + // and we run update using an empty machineConfig as there is no provided current state. + oldConfig := mcfgv1.MachineConfig{} + return dn.update(&oldConfig, machineConfig) +} + // handleNodeUpdate is the handler for informer callbacks detecting node // changes. If an update is requested by the controller, we assume that // that means something changed and apply the desired config no matter what. 
From e390ceb8d24ab819f70c0a19dc2cc2c920d5d5b5 Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Fri, 19 Oct 2018 14:42:49 -0400 Subject: [PATCH 04/12] daemon: Add ReadAll to FsClient Signed-off-by: Steve Milner --- pkg/daemon/fsclient.go | 7 +++++++ pkg/daemon/fsclient_test.go | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/daemon/fsclient.go b/pkg/daemon/fsclient.go index 973e4512a7..fae3c6160d 100644 --- a/pkg/daemon/fsclient.go +++ b/pkg/daemon/fsclient.go @@ -1,6 +1,7 @@ package daemon import ( + "io" "io/ioutil" "os" ) @@ -16,6 +17,7 @@ type FileSystemClient interface { Chmod(string, os.FileMode) error Chown(string, int, int) error WriteFile(filename string, data []byte, perm os.FileMode) error + ReadAll(reader io.Reader) ([]byte, error) ReadFile(filename string) ([]byte, error) } @@ -72,6 +74,11 @@ func (f FsClient) ReadFile(filename string) ([]byte, error) { return ioutil.ReadFile(filename) } +// ReadAll implements ioutil.ReadAll +func (f FsClient) ReadAll(reader io.Reader) ([]byte, error) { + return ioutil.ReadAll(reader) +} + // NewFileSystemClient creates a new file system client using the default // implementations provided by the os package. func NewFileSystemClient() FileSystemClient { diff --git a/pkg/daemon/fsclient_test.go b/pkg/daemon/fsclient_test.go index 09c3e1a88c..c61ce80fce 100644 --- a/pkg/daemon/fsclient_test.go +++ b/pkg/daemon/fsclient_test.go @@ -1,6 +1,9 @@ package daemon -import "os" +import ( + "io" + "os" +) // CreateReturn is a structure used for testing. It holds a single return value // set for a mocked Create call. 
@@ -34,6 +37,7 @@ type FsClientMock struct { ChmodReturns []error ChownReturns []error WriteFileReturns []error + ReadAllReturns []ReadFileReturn ReadFileReturns []ReadFileReturn } @@ -101,6 +105,15 @@ func (f FsClientMock) WriteFile(filename string, data []byte, perm os.FileMode) return updateErrorReturns(&f.WriteFileReturns) } +// ReadAll provides a mocked implementation +func (f FsClientMock) ReadAll(reader io.Reader) ([]byte, error) { + returnValues := f.ReadAllReturns[0] + if len(f.ReadAllReturns) > 1 { + f.ReadAllReturns = f.ReadAllReturns[1:] + } + return returnValues.Bytes, returnValues.Error +} + // ReadFile provides a mocked implemention func (f FsClientMock) ReadFile(filename string) ([]byte, error) { returnValues := f.ReadFileReturns[0] From 6f67c7049ce082e7404fd50f48125d506c09b281 Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Tue, 30 Oct 2018 13:23:25 -0400 Subject: [PATCH 05/12] daemon: Allow runOnce to provide desired MC - prepUpdateFromCluster and executeUpdateFromCluster* pulled out of handleNodeUpdate for reuse - triggerUpdateWithMachineConfig added for triggering with a provided desired config - triggerUpdate forwards to triggerUpdateWithMachineConfig(nil) - executeUpdateFromClusterWithMachineConfig added for updating with a provided desired config - executeUpdateFromCluster forwards to executeUpdateFromClusterWithMachineConfig(nil) Signed-off-by: Steve Milner --- pkg/daemon/daemon.go | 116 +++++++++++++++++++++++++++++++++---------- 1 file changed, 90 insertions(+), 26 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index ccc271acd0..18cd92b1cd 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -194,17 +194,38 @@ func (dn *Daemon) CheckStateOnBoot(stop <-chan struct{}) error { // TODO: Revisit the Run/process methods and refactor and unify them with runOnce func (dn *Daemon) runOnce() error { var machineConfig *mcfgv1.MachineConfig + var oldConfig mcfgv1.MachineConfig var err error + if
strings.HasPrefix("http://", dn.onceFrom) || strings.HasPrefix("https://", dn.onceFrom) { + // NOTE: This case expects a cluster to exist already. // If we sense a remote URL has been provided then request MC content - // from a remote URL and parse it + // from a remote URL and parse it. glog.V(2).Infof("Getting machine config content from %s", dn.onceFrom) machineConfig, err = dn.getMachineConfigFromURL(dn.onceFrom) if err != nil { return err } + + needUpdate, err := dn.prepUpdateFromCluster() + if err != nil { + glog.V(2).Infof("Unable to prep update: %s", err) + return err + } else if needUpdate == false { + glog.V(2).Infof("No update needed") + return nil + } + // At this point we have verified we need to update + if err := dn.executeUpdateFromClusterWithMachineConfig(machineConfig); err != nil { + glog.Warningf("Unable to update: %s", err) + return err + } + return nil + } else if validPath(dn.onceFrom) { - // If we sense a local file has been provided parse it + // NOTE: This case expects that the cluster is NOT CREATED YET. + // If we sense a local file has been provided parse it. + oldConfig = mcfgv1.MachineConfig{} absoluteOnceFrom, err := filepath.Abs(filepath.Clean(dn.onceFrom)) if err != nil { return err @@ -213,49 +234,79 @@ func (dn *Daemon) runOnce() error { if err != nil { return err } - } else { - // Otherwise return an error as the input format is unsupported - return fmt.Errorf("%s is not a path nor url; can not run once", dn.onceFrom) + // Execute update without hitting the cluster + return dn.update(&oldConfig, machineConfig) } - - // At this point we have a populated MachineConfig struct for our desired config - // and we run update using an empty machineConfig as there is no provided current state.
- oldConfig := mcfgv1.MachineConfig{} - return dn.update(&oldConfig, machineConfig) + // Otherwise return an error as the input format is unsupported + return fmt.Errorf("%s is not a path nor url; can not run once", dn.onceFrom) } -// handleNodeUpdate is the handler for informer callbacks detecting node -// changes. If an update is requested by the controller, we assume that -// that means something changed and apply the desired config no matter what. +// handleNodeUpdate is the gatekeeper handler for informer callbacks detecting +// node changes. If an update is requested by the controller, we assume that +// that means something changed and pass over to execution methods no matter what. // Also note that we only care about node updates, not creation or deletion. func (dn *Daemon) handleNodeUpdate(old, cur interface{}) { node := cur.(*corev1.Node) // First check if the node that was updated is this daemon's node - if node.Name != dn.name { - // The node that was changed was not ours - return + if node.Name == dn.name { + // Pass to the shared update prep method + needUpdate, err := dn.prepUpdateFromCluster() + if err != nil { + // On prepUpdateFromCluster error the node should already be marked degraded + glog.V(2).Infof("Unable to prep update: %s", err) + dn.reboot() + return + } + // Only executeUpdateFromCluster when we need to update + if needUpdate { + // Note that if executeUpdateFromCluster errors it will mark the node + // degraded and reboot. + if err = dn.executeUpdateFromCluster(); err != nil { + glog.V(2).Infof("Unable to execute update: %s", err) + return + } + } } + // The node that was changed was not ours, return out + return +} +// prepUpdateFromCluster handles the shared update prepping functionality for +// flows that expect the cluster to already be available. +func (dn *Daemon) prepUpdateFromCluster() (bool, error) { // Then check we're not already in a degraded state. 
if state, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey); err != nil { // try to set to degraded... because we failed to check if we're degraded glog.Errorf("Marking degraded due to: %v", err) setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name) - return + return false, err } else if state == MachineConfigDaemonStateDegraded { // Just return since we want to continue sleeping - return + return false, fmt.Errorf("state is already degraded") + } + + // Grab the node instance + node, err := dn.kubeClient.CoreV1().Nodes().Get(dn.name, metav1.GetOptions{}) + if err != nil { + return false, err } // Detect if there is an update if node.Annotations[DesiredMachineConfigAnnotationKey] == node.Annotations[CurrentMachineConfigAnnotationKey] { // No actual update to the config - return + glog.V(2).Info("No updating is required") + return false, nil } + return true, nil +} +// executeUpdateFromClusterWithMachineConfig starts the actual update process. The provided config +// will be used as the desired config, while the current config will be pulled from the cluster. If +// you want both pulled from the cluster please use executeUpdateFromCluster(). +func (dn *Daemon) executeUpdateFromClusterWithMachineConfig(desiredConfig *mcfgv1.MachineConfig) error { // The desired machine config has changed, trigger update - if err := dn.triggerUpdate(); err != nil { + if err := dn.triggerUpdateWithMachineConfig(desiredConfig); err != nil { glog.Errorf("Marking degraded due to: %v", err) if errSet := setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name); errSet != nil { glog.Errorf("Futher error attempting to set the node to degraded: %v", errSet) @@ -267,6 +318,12 @@ func (dn *Daemon) handleNodeUpdate(old, cur interface{}) { // we managed to update the machine without rebooting. 
in this case, // continue as usual waiting for the next update glog.V(2).Infof("Successfully updated without reboot") + return nil +} + +// executeUpdateFromCluster starts the actual update process using configs from the cluster. +func (dn *Daemon) executeUpdateFromCluster() error { + return dn.executeUpdateFromClusterWithMachineConfig(nil) } // completeUpdate does all the stuff required to finish an update. right now, it @@ -288,8 +345,9 @@ func (dn *Daemon) completeUpdate(dcAnnotation string) error { return nil } -// triggerUpdate starts the update using the current and the target config. -func (dn *Daemon) triggerUpdate() error { +// triggerUpdateWithMachineConfig starts the update using the desired config and queries the cluster for +// the current config. If all configs should be pulled from the cluster use triggerUpdate(). +func (dn *Daemon) triggerUpdateWithMachineConfig(desiredConfig *mcfgv1.MachineConfig) error { if err := setUpdateWorking(dn.kubeClient.CoreV1().Nodes(), dn.name); err != nil { return err } @@ -306,15 +364,21 @@ func (dn *Daemon) triggerUpdate() error { if err != nil { return err } - desiredConfig, err := getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), dcAnnotation) - if err != nil { - return err + if desiredConfig == nil { + desiredConfig, err = getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), dcAnnotation) + if err != nil { + return err + } } - // run the update process. this function doesn't currently return. return dn.update(currentConfig, desiredConfig) } +// triggerUpdate starts the update using the current and the target config. +func (dn *Daemon) triggerUpdate() error { + return dn.triggerUpdateWithMachineConfig(nil) +} + // isDesiredMachineState confirms that the node is actually in the state that it // wants to be in. It does this by looking at the elements in the target config // and checks if all are present on the node. 
Returns true iff there are no From fb67e0e6753dae209aaa9f08160d780a54c24729 Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Tue, 6 Nov 2018 10:03:52 -0500 Subject: [PATCH 06/12] daemon: validPath -> ValidPath Signed-off-by: Steve Milner --- pkg/daemon/daemon.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 18cd92b1cd..cdf52ee689 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -222,7 +222,7 @@ func (dn *Daemon) runOnce() error { } return nil - } else if validPath(dn.onceFrom) { + } else if ValidPath(dn.onceFrom) { // NOTE: This case expects that the cluster is NOT CREATED YET. // If we sense a local file has been provided parse it. oldConfig = mcfgv1.MachineConfig{} @@ -563,9 +563,9 @@ func getMachineConfig(client mcfgclientv1.MachineConfigInterface, name string) ( return client.Get(name, metav1.GetOptions{}) } -// validPath attempts to see if the path provided is indeed an acceptable +// ValidPath attempts to see if the path provided is indeed an acceptable // filesystem path. This function does not check if the path exists. -func validPath(path string) bool { +func ValidPath(path string) bool { path = filepath.Clean(path) for _, validStart := range []string{".", "..", "/"} { if strings.HasPrefix(path, validStart) { From 007cb7ce73e6562642720e734e8996b429ca1581 Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Tue, 6 Nov 2018 10:04:58 -0500 Subject: [PATCH 07/12] daemon: Split Daemon constructors - New: Base instance that works without the cluster. Used in NewClusterDrivenDaemon. - NewClusterDrivenDaemon: Builds on top of New. Works with cluster resources. 
Signed-off-by: Steve Milner --- pkg/daemon/daemon.go | 62 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index cdf52ee689..eb9334601c 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -12,6 +12,7 @@ import ( ignv2_2types "github.com/coreos/ignition/config/v2_2/types" "github.com/golang/glog" drain "github.com/openshift/kubernetes-drain" + "github.com/openshift/machine-config-operator/cmd/common" "github.com/openshift/machine-config-operator/lib/resourceread" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" @@ -19,7 +20,6 @@ import ( "github.com/vincent-petithory/dataurl" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - coreinformersv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -71,17 +71,13 @@ const ( pathDevNull = "/dev/null" ) -// New sets up the systemd and kubernetes connections needed to update the -// machine. 
+// New sets up the bare minimum local connections required to create a Daemon instance func New( rootMount string, nodeName string, operatingSystem string, nodeUpdaterClient NodeUpdaterClient, - client mcfgclientset.Interface, - kubeClient kubernetes.Interface, fileSystemClient FileSystemClient, - nodeInformer coreinformersv1.NodeInformer, onceFrom string, ) (*Daemon, error) { loginClient, err := login1.New() @@ -89,14 +85,11 @@ func New( return nil, fmt.Errorf("Error establishing connection to logind dbus: %v", err) } - if err = loadNodeAnnotations(kubeClient.CoreV1().Nodes(), nodeName); err != nil { - return nil, err - } - osImageURL, osVersion, err := nodeUpdaterClient.GetBootedOSImageURL(rootMount) if err != nil { return nil, fmt.Errorf("Error reading osImageURL from rpm-ostree: %v", err) } + glog.Infof("Booted osImageURL: %s (%s)", osImageURL, osVersion) dn := &Daemon{ @@ -104,14 +97,58 @@ func New( OperatingSystem: operatingSystem, NodeUpdaterClient: nodeUpdaterClient, loginClient: loginClient, - client: client, - kubeClient: kubeClient, rootMount: rootMount, fileSystemClient: fileSystemClient, bootedOSImageURL: osImageURL, onceFrom: onceFrom, } + return dn, nil +} + +// NewClusterDrivenDaemon sets up the systemd and kubernetes connections needed to update the +// machine. 
+func NewClusterDrivenDaemon( + rootMount string, + nodeName string, + operatingSystem string, + nodeUpdaterClient NodeUpdaterClient, + kubeconfig string, + fileSystemClient FileSystemClient, + onceFrom string, + stopCh chan (struct{}), + componentName string, +) (*Daemon, error) { + dn, err := New( + rootMount, + nodeName, + operatingSystem, + nodeUpdaterClient, + fileSystemClient, + onceFrom) + + if err != nil { + return nil, err + } + + cb, err := common.NewClientBuilder(kubeconfig) + if err != nil { + glog.Fatalf("error creating clients: %v", err) + } + + ctx := common.CreateControllerContext(cb, stopCh, componentName) + + ctx.KubeInformerFactory.Start(ctx.Stop) + close(ctx.KubeInformersStarted) + + dn.kubeClient = cb.KubeClientOrDie(componentName) + dn.client = cb.MachineConfigClientOrDie(componentName) + + if err = loadNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), nodeName); err != nil { + return nil, err + } + + nodeInformer := ctx.KubeInformerFactory.Core().V1().Nodes() nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: dn.handleNodeUpdate, }) @@ -206,7 +243,6 @@ func (dn *Daemon) runOnce() error { if err != nil { return err } - needUpdate, err := dn.prepUpdateFromCluster() if err != nil { glog.V(2).Infof("Unable to prep update: %s", err) From c4b9349b26af7c5ac60d88862567355de666170f Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Tue, 6 Nov 2018 10:06:31 -0500 Subject: [PATCH 08/12] mcd/start.go: Restructure for cluster/non-cluster use Signed-off-by: Steve Milner --- cmd/machine-config-daemon/start.go | 73 +++++++++++++++++------------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/cmd/machine-config-daemon/start.go b/cmd/machine-config-daemon/start.go index bfc7efc263..776b604cc9 100644 --- a/cmd/machine-config-daemon/start.go +++ b/cmd/machine-config-daemon/start.go @@ -6,7 +6,6 @@ import ( "syscall" "github.com/golang/glog" - "github.com/openshift/machine-config-operator/cmd/common" 
"github.com/openshift/machine-config-operator/pkg/daemon" "github.com/openshift/machine-config-operator/pkg/version" "github.com/spf13/cobra" @@ -40,6 +39,8 @@ func runStartCmd(cmd *cobra.Command, args []string) { flag.Set("logtostderr", "true") flag.Parse() + glog.V(2).Infof("Options parsed: %+v", startOpts) + // To help debugging, immediately log version glog.Infof("Version: %+v", version.Version) @@ -56,11 +57,6 @@ func runStartCmd(cmd *cobra.Command, args []string) { startOpts.nodeName = name } - cb, err := common.NewClientBuilder(startOpts.kubeconfig) - if err != nil { - glog.Fatalf("error creating clients: %v", err) - } - // Ensure that the rootMount exists if _, err := os.Stat(startOpts.rootMount); err != nil { if os.IsNotExist(err) { @@ -71,23 +67,44 @@ func runStartCmd(cmd *cobra.Command, args []string) { stopCh := make(chan struct{}) defer close(stopCh) - - ctx := common.CreateControllerContext(cb, stopCh, componentName) - // create the daemon instance. this also initializes kube client items - // which need to come from the container and not the chroot. 
- daemon, err := daemon.New( - startOpts.rootMount, - startOpts.nodeName, - operatingSystem, - daemon.NewNodeUpdaterClient(), - cb.MachineConfigClientOrDie(componentName), - cb.KubeClientOrDie(componentName), - daemon.NewFileSystemClient(), - ctx.KubeInformerFactory.Core().V1().Nodes(), - startOpts.onceFrom, - ) - if err != nil { - glog.Fatalf("failed to initialize daemon: %v", err) + var dn *daemon.Daemon + + // If we are asked to run once and it's a valid file system path use + // the bare Daemon + if startOpts.onceFrom != "" && daemon.ValidPath(startOpts.onceFrom) { + dn, err = daemon.New( + startOpts.rootMount, + startOpts.nodeName, + operatingSystem, + daemon.NewNodeUpdaterClient(), + daemon.NewFileSystemClient(), + startOpts.onceFrom, + ) + if err != nil { + glog.Fatalf("failed to initialize single run daemon: %v", err) + } + // Else we use the cluster driven daemon + } else { + // create the daemon instance. this also initializes kube client items + // which need to come from the container and not the chroot. 
+ dn, err = daemon.NewClusterDrivenDaemon( + startOpts.rootMount, + startOpts.nodeName, + operatingSystem, + daemon.NewNodeUpdaterClient(), + startOpts.kubeconfig, + daemon.NewFileSystemClient(), + startOpts.onceFrom, + stopCh, + componentName, + ) + if err != nil { + glog.Fatalf("failed to initialize daemon: %v", err) + } + err = dn.CheckStateOnBoot(stopCh) + if err != nil { + glog.Fatalf("error checking initial state of node: %v", err) + } } glog.Infof(`Calling chroot("%s")`, startOpts.rootMount) @@ -103,15 +120,7 @@ func runStartCmd(cmd *cobra.Command, args []string) { glog.Info("Starting MachineConfigDaemon") defer glog.Info("Shutting down MachineConfigDaemon") - err = daemon.CheckStateOnBoot(stopCh) - if err != nil { - glog.Fatalf("error checking initial state of node: %v", err) - } - - ctx.KubeInformerFactory.Start(ctx.Stop) - close(ctx.KubeInformersStarted) - - err = daemon.Run(stopCh) + err = dn.Run(stopCh) if err != nil { glog.Fatalf("failed to run: %v", err) } From 7291b6125fddc9195efbe79f07df8f9ac78bc947 Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Tue, 6 Nov 2018 17:10:46 -0500 Subject: [PATCH 09/12] daemon: Start informers after CheckStateOnBoot Split out the informers creation/start into StartInformer. Idea from @yuqi-zhang. 
Signed-off-by: Steve Milner --- cmd/machine-config-daemon/start.go | 3 +++ pkg/daemon/daemon.go | 21 +++++++++++++++------ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/cmd/machine-config-daemon/start.go b/cmd/machine-config-daemon/start.go index 776b604cc9..aa84b35308 100644 --- a/cmd/machine-config-daemon/start.go +++ b/cmd/machine-config-daemon/start.go @@ -105,6 +105,9 @@ func runStartCmd(cmd *cobra.Command, args []string) { if err != nil { glog.Fatalf("error checking initial state of node: %v", err) } + if err = dn.StartInformer(stopCh, startOpts.nodeName, componentName, startOpts.kubeconfig); err != nil { + glog.Fatalf("error starting kubernetes informers: %v", err) + } } glog.Infof(`Calling chroot("%s")`, startOpts.rootMount) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index eb9334601c..e3bba6e00e 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -136,17 +136,23 @@ func NewClusterDrivenDaemon( glog.Fatalf("error creating clients: %v", err) } - ctx := common.CreateControllerContext(cb, stopCh, componentName) - - ctx.KubeInformerFactory.Start(ctx.Stop) - close(ctx.KubeInformersStarted) - dn.kubeClient = cb.KubeClientOrDie(componentName) dn.client = cb.MachineConfigClientOrDie(componentName) if err = loadNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), nodeName); err != nil { return nil, err } + return dn, nil +} + +// StartInformer initializes and starts the informers. 
+func (dn *Daemon) StartInformer(stopCh chan (struct{}), nodeName, componentName, kubeconfig string) error { + cb, err := common.NewClientBuilder(kubeconfig) + if err != nil { + return err + } + + ctx := common.CreateControllerContext(cb, stopCh, componentName) nodeInformer := ctx.KubeInformerFactory.Core().V1().Nodes() nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -156,7 +162,10 @@ func NewClusterDrivenDaemon( dn.nodeLister = nodeInformer.Lister() dn.nodeListerSynced = nodeInformer.Informer().HasSynced - return dn, nil + ctx.KubeInformerFactory.Start(ctx.Stop) + close(ctx.KubeInformersStarted) + + return nil } // Run finishes informer setup and then blocks, and the informer will be From 10057300ec039b03733db85c98f5c7215e6c42ce Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Wed, 7 Nov 2018 12:07:24 -0500 Subject: [PATCH 10/12] daemon/update: Allow reconcile skip When we are in runOnce mode AND the previous MachineConfig does not have a Kind we can assume that there was no previous config to check against. Signed-off-by: Steve Milner --- pkg/daemon/update.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/daemon/update.go b/pkg/daemon/update.go index d37e69cb4b..109cbfcd38 100644 --- a/pkg/daemon/update.go +++ b/pkg/daemon/update.go @@ -80,6 +80,12 @@ func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) error { func (dn *Daemon) reconcilable(oldConfig, newConfig *mcfgv1.MachineConfig) (bool, error) { glog.Info("Checking if configs are reconcilable") + // We skip out of reconcilable if there is no Kind and we are in runOnce mode. The + // reason is that there is a good chance a previous state is not available to match against. + if oldConfig.Kind == "" && dn.onceFrom != "" { + glog.Infof("Missing kind in old config. 
Assuming reconcilable with new.") + return true, nil + } oldIgn := oldConfig.Spec.Config newIgn := newConfig.Spec.Config From 2db59b5d76eac06b7352d272fc7f72bbda4224fe Mon Sep 17 00:00:00 2001 From: Steve Milner Date: Thu, 8 Nov 2018 10:37:16 -0500 Subject: [PATCH 11/12] daemon/update: Don't drain the node without cluster Signed-off-by: Steve Milner --- pkg/daemon/update.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/pkg/daemon/update.go b/pkg/daemon/update.go index 109cbfcd38..0fc89e1fc4 100644 --- a/pkg/daemon/update.go +++ b/pkg/daemon/update.go @@ -48,24 +48,28 @@ func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) error { return err } - glog.Info("Update completed. Draining the node.") + // TODO: Change the logic to be clearer + // We need to skip draining of the node when we are running once + // and there is no cluster. + if dn.onceFrom != "" && !ValidPath(dn.onceFrom) { + glog.Info("Update completed. Draining the node.") - node, err := dn.kubeClient.CoreV1().Nodes().Get(dn.name, metav1.GetOptions{}) - if err != nil { - return err - } + node, err := dn.kubeClient.CoreV1().Nodes().Get(dn.name, metav1.GetOptions{}) + if err != nil { + return err + } - err = drain.Drain(dn.kubeClient, []*corev1.Node{node}, &drain.DrainOptions{ - DeleteLocalData: true, - Force: true, - GracePeriodSeconds: 600, - IgnoreDaemonsets: true, - }) - if err != nil { - return err + err = drain.Drain(dn.kubeClient, []*corev1.Node{node}, &drain.DrainOptions{ + DeleteLocalData: true, + Force: true, + GracePeriodSeconds: 600, + IgnoreDaemonsets: true, + }) + if err != nil { + return err + } + glog.V(2).Infof("Node successfully drained") } - glog.V(2).Infof("Node successfully drained") - // reboot. this function shouldn't actually return. 
return dn.reboot() } From d1e716516446b9072bb6ccbf9bf9bdb15f95ae7d Mon Sep 17 00:00:00 2001 From: Yu Qi Zhang Date: Thu, 8 Nov 2018 12:57:17 -0500 Subject: [PATCH 12/12] daemon: revert to old workflow for informer Remove StartInformer function, as the creation and start must follow the creation - chroot - check state - start workflow. Modify ClientBuilder creation to use old workflow as well. Signed-off-by: Yu Qi Zhang --- cmd/machine-config-daemon/start.go | 29 ++++++++++++++++--------- pkg/daemon/daemon.go | 35 ++++++------------------------ 2 files changed, 26 insertions(+), 38 deletions(-) diff --git a/cmd/machine-config-daemon/start.go b/cmd/machine-config-daemon/start.go index aa84b35308..c8366d8f12 100644 --- a/cmd/machine-config-daemon/start.go +++ b/cmd/machine-config-daemon/start.go @@ -6,6 +6,7 @@ import ( "syscall" "github.com/golang/glog" + "github.com/openshift/machine-config-operator/cmd/common" "github.com/openshift/machine-config-operator/pkg/daemon" "github.com/openshift/machine-config-operator/pkg/version" "github.com/spf13/cobra" @@ -68,6 +69,7 @@ func runStartCmd(cmd *cobra.Command, args []string) { stopCh := make(chan struct{}) defer close(stopCh) var dn *daemon.Daemon + var ctx *common.ControllerContext // If we are asked to run once and it's a valid file system path use // the bare Daemon @@ -85,6 +87,11 @@ func runStartCmd(cmd *cobra.Command, args []string) { } // Else we use the cluster driven daemon } else { + cb, err := common.NewClientBuilder(startOpts.kubeconfig) + if err != nil { + glog.Fatalf("failed to initialize daemon: %v", err) + } + ctx = common.CreateControllerContext(cb, stopCh, componentName) // create the daemon instance. this also initializes kube client items // which need to come from the container and not the chroot. 
dn, err = daemon.NewClusterDrivenDaemon( @@ -92,22 +99,15 @@ func runStartCmd(cmd *cobra.Command, args []string) { startOpts.nodeName, operatingSystem, daemon.NewNodeUpdaterClient(), - startOpts.kubeconfig, + cb.MachineConfigClientOrDie(componentName), + cb.KubeClientOrDie(componentName), daemon.NewFileSystemClient(), startOpts.onceFrom, - stopCh, - componentName, + ctx.KubeInformerFactory.Core().V1().Nodes(), ) if err != nil { glog.Fatalf("failed to initialize daemon: %v", err) } - err = dn.CheckStateOnBoot(stopCh) - if err != nil { - glog.Fatalf("error checking initial state of node: %v", err) - } - if err = dn.StartInformer(stopCh, startOpts.nodeName, componentName, startOpts.kubeconfig); err != nil { - glog.Fatalf("error starting kubernetes informers: %v", err) - } } glog.Infof(`Calling chroot("%s")`, startOpts.rootMount) @@ -120,6 +120,15 @@ func runStartCmd(cmd *cobra.Command, args []string) { glog.Fatalf("unable to change directory to /: %s", err) } + if startOpts.onceFrom == "" { + err = dn.CheckStateOnBoot(stopCh) + if err != nil { + glog.Fatalf("error checking initial state of node: %v", err) + } + ctx.KubeInformerFactory.Start(ctx.Stop) + close(ctx.KubeInformersStarted) + } + glog.Info("Starting MachineConfigDaemon") defer glog.Info("Shutting down MachineConfigDaemon") diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index e3bba6e00e..8733a8056b 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -12,7 +12,6 @@ import ( ignv2_2types "github.com/coreos/ignition/config/v2_2/types" "github.com/golang/glog" drain "github.com/openshift/kubernetes-drain" - "github.com/openshift/machine-config-operator/cmd/common" "github.com/openshift/machine-config-operator/lib/resourceread" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned" @@ -20,6 +19,7 @@ import ( "github.com/vincent-petithory/dataurl" corev1 
"k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coreinformersv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -113,11 +113,11 @@ func NewClusterDrivenDaemon( nodeName string, operatingSystem string, nodeUpdaterClient NodeUpdaterClient, - kubeconfig string, + client mcfgclientset.Interface, + kubeClient kubernetes.Interface, fileSystemClient FileSystemClient, onceFrom string, - stopCh chan (struct{}), - componentName string, + nodeInformer coreinformersv1.NodeInformer, ) (*Daemon, error) { dn, err := New( rootMount, @@ -131,41 +131,20 @@ func NewClusterDrivenDaemon( return nil, err } - cb, err := common.NewClientBuilder(kubeconfig) - if err != nil { - glog.Fatalf("error creating clients: %v", err) - } - - dn.kubeClient = cb.KubeClientOrDie(componentName) - dn.client = cb.MachineConfigClientOrDie(componentName) + dn.kubeClient = kubeClient + dn.client = client if err = loadNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), nodeName); err != nil { return nil, err } - return dn, nil -} - -// StartInformer initializes and starts the informers. -func (dn *Daemon) StartInformer(stopCh chan (struct{}), nodeName, componentName, kubeconfig string) error { - cb, err := common.NewClientBuilder(kubeconfig) - if err != nil { - return err - } - ctx := common.CreateControllerContext(cb, stopCh, componentName) - - nodeInformer := ctx.KubeInformerFactory.Core().V1().Nodes() nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: dn.handleNodeUpdate, }) - dn.nodeLister = nodeInformer.Lister() dn.nodeListerSynced = nodeInformer.Informer().HasSynced - ctx.KubeInformerFactory.Start(ctx.Stop) - close(ctx.KubeInformersStarted) - - return nil + return dn, nil } // Run finishes informer setup and then blocks, and the informer will be