diff --git a/cmd/machine-config-daemon/start.go b/cmd/machine-config-daemon/start.go
index e98a58efdb..c8366d8f12 100644
--- a/cmd/machine-config-daemon/start.go
+++ b/cmd/machine-config-daemon/start.go
@@ -24,6 +24,7 @@ var (
 		kubeconfig string
 		nodeName   string
 		rootMount  string
+		onceFrom   string
 	}
 )
@@ -32,12 +33,15 @@ func init() {
 	startCmd.PersistentFlags().StringVar(&startOpts.kubeconfig, "kubeconfig", "", "Kubeconfig file to access a remote cluster (testing only)")
 	startCmd.PersistentFlags().StringVar(&startOpts.nodeName, "node-name", "", "kubernetes node name daemon is managing.")
 	startCmd.PersistentFlags().StringVar(&startOpts.rootMount, "root-mount", "/rootfs", "where the nodes root filesystem is mounted for chroot and file manipulation.")
+	startCmd.PersistentFlags().StringVar(&startOpts.onceFrom, "once-from", "", "Runs the daemon once using a provided file path or URL endpoint as its machine config source")
 }
 
 func runStartCmd(cmd *cobra.Command, args []string) {
 	flag.Set("logtostderr", "true")
 	flag.Parse()
 
+	glog.V(2).Infof("Options parsed: %+v", startOpts)
+
 	// To help debugging, immediately log version
 	glog.Infof("Version: %+v", version.Version)
@@ -54,11 +58,6 @@ func runStartCmd(cmd *cobra.Command, args []string) {
 		startOpts.nodeName = name
 	}
 
-	cb, err := common.NewClientBuilder(startOpts.kubeconfig)
-	if err != nil {
-		glog.Fatalf("error creating clients: %v", err)
-	}
-
 	// Ensure that the rootMount exists
 	if _, err := os.Stat(startOpts.rootMount); err != nil {
 		if os.IsNotExist(err) {
@@ -69,22 +68,47 @@ func runStartCmd(cmd *cobra.Command, args []string) {
 
 	stopCh := make(chan struct{})
 	defer close(stopCh)
-
-	ctx := common.CreateControllerContext(cb, stopCh, componentName)
-	// create the daemon instance. this also initializes kube client items
-	// which need to come from the container and not the chroot.
-	daemon, err := daemon.New(
-		startOpts.rootMount,
-		startOpts.nodeName,
-		operatingSystem,
-		daemon.NewNodeUpdaterClient(),
-		cb.MachineConfigClientOrDie(componentName),
-		cb.KubeClientOrDie(componentName),
-		daemon.NewFileSystemClient(),
-		ctx.KubeInformerFactory.Core().V1().Nodes(),
-	)
-	if err != nil {
-		glog.Fatalf("failed to initialize daemon: %v", err)
+	var dn *daemon.Daemon
+	var ctx *common.ControllerContext
+	var err error
+
+	// If we are asked to run once and it's a valid file system path, use
+	// the bare Daemon
+	if startOpts.onceFrom != "" && daemon.ValidPath(startOpts.onceFrom) {
+		dn, err = daemon.New(
+			startOpts.rootMount,
+			startOpts.nodeName,
+			operatingSystem,
+			daemon.NewNodeUpdaterClient(),
+			daemon.NewFileSystemClient(),
+			startOpts.onceFrom,
+		)
+		if err != nil {
+			glog.Fatalf("failed to initialize single run daemon: %v", err)
+		}
+	} else {
+		// Otherwise use the cluster driven daemon
+		cb, err := common.NewClientBuilder(startOpts.kubeconfig)
+		if err != nil {
+			glog.Fatalf("error creating clients: %v", err)
+		}
+		ctx = common.CreateControllerContext(cb, stopCh, componentName)
+		// create the daemon instance. this also initializes kube client items
+		// which need to come from the container and not the chroot.
+		dn, err = daemon.NewClusterDrivenDaemon(
+			startOpts.rootMount,
+			startOpts.nodeName,
+			operatingSystem,
+			daemon.NewNodeUpdaterClient(),
+			cb.MachineConfigClientOrDie(componentName),
+			cb.KubeClientOrDie(componentName),
+			daemon.NewFileSystemClient(),
+			startOpts.onceFrom,
+			ctx.KubeInformerFactory.Core().V1().Nodes(),
+		)
+		if err != nil {
+			glog.Fatalf("failed to initialize daemon: %v", err)
+		}
 	}
 
 	glog.Infof(`Calling chroot("%s")`, startOpts.rootMount)
@@ -97,18 +121,19 @@ func runStartCmd(cmd *cobra.Command, args []string) {
 		glog.Fatalf("unable to change directory to /: %s", err)
 	}
 
-	glog.Info("Starting MachineConfigDaemon")
-	defer glog.Info("Shutting down MachineConfigDaemon")
-
-	err = daemon.CheckStateOnBoot(stopCh)
-	if err != nil {
-		glog.Fatalf("error checking initial state of node: %v", err)
+	if startOpts.onceFrom == "" {
+		err = dn.CheckStateOnBoot(stopCh)
+		if err != nil {
+			glog.Fatalf("error checking initial state of node: %v", err)
+		}
+		ctx.KubeInformerFactory.Start(ctx.Stop)
+		close(ctx.KubeInformersStarted)
 	}
-	ctx.KubeInformerFactory.Start(ctx.Stop)
-	close(ctx.KubeInformersStarted)
+	glog.Info("Starting MachineConfigDaemon")
+	defer glog.Info("Shutting down MachineConfigDaemon")
 
-	err = daemon.Run(stopCh)
+	err = dn.Run(stopCh)
 	if err != nil {
 		glog.Fatalf("failed to run: %v", err)
 	}
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index a96b814502..8733a8056b 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -3,24 +3,26 @@ package daemon
 import (
 	"fmt"
 	"io/ioutil"
+	"net/http"
+	"os"
 	"path/filepath"
 	"strings"
-	"os"
 
 	"github.com/coreos/go-systemd/login1"
 	ignv2_2types "github.com/coreos/ignition/config/v2_2/types"
 	"github.com/golang/glog"
 	drain "github.com/openshift/kubernetes-drain"
+	"github.com/openshift/machine-config-operator/lib/resourceread"
 	mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
 	mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
 	mcfgclientv1 "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned/typed/machineconfiguration.openshift.io/v1"
 	"github.com/vincent-petithory/dataurl"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes"
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	coreinformersv1 "k8s.io/client-go/informers/core/v1"
-	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/kubernetes"
 	corelisterv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
 )
 
 // Daemon is the dispatch point for the functions of the agent on the
@@ -56,6 +58,8 @@ type Daemon struct {
 	nodeLister corelisterv1.NodeLister
 	nodeListerSynced cache.InformerSynced
+	// onceFrom is the machine config source (file path or URL) used when the daemon is to run once and exit
+	onceFrom string
 }
@@ -67,31 +71,25 @@ const (
 	pathDevNull = "/dev/null"
 )
 
-// New sets up the systemd and kubernetes connections needed to update the
-// machine.
+// New sets up the bare minimum local connections required to create a Daemon instance.
 func New(
 	rootMount string,
 	nodeName string,
 	operatingSystem string,
 	nodeUpdaterClient NodeUpdaterClient,
-	client mcfgclientset.Interface,
-	kubeClient kubernetes.Interface,
 	fileSystemClient FileSystemClient,
-	nodeInformer coreinformersv1.NodeInformer,
+	onceFrom string,
 ) (*Daemon, error) {
 	loginClient, err := login1.New()
 	if err != nil {
 		return nil, fmt.Errorf("Error establishing connection to logind dbus: %v", err)
 	}
 
-	if err = loadNodeAnnotations(kubeClient.CoreV1().Nodes(), nodeName); err != nil {
-		return nil, err
-	}
-
 	osImageURL, osVersion, err := nodeUpdaterClient.GetBootedOSImageURL(rootMount)
 	if err != nil {
 		return nil, fmt.Errorf("Error reading osImageURL from rpm-ostree: %v", err)
 	}
+
 	glog.Infof("Booted osImageURL: %s (%s)", osImageURL, osVersion)
 
 	dn := &Daemon{
@@ -99,17 +97,50 @@ func New(
 		OperatingSystem:   operatingSystem,
 		NodeUpdaterClient: nodeUpdaterClient,
 		loginClient:       loginClient,
-		client:            client,
-		kubeClient:        kubeClient,
 		rootMount:         rootMount,
 		fileSystemClient:  fileSystemClient,
 		bootedOSImageURL:  osImageURL,
+		onceFrom:          onceFrom,
+	}
+
+	return dn, nil
+}
+
+// NewClusterDrivenDaemon sets up the systemd and kubernetes connections needed to update the
+// machine.
+func NewClusterDrivenDaemon(
+	rootMount string,
+	nodeName string,
+	operatingSystem string,
+	nodeUpdaterClient NodeUpdaterClient,
+	client mcfgclientset.Interface,
+	kubeClient kubernetes.Interface,
+	fileSystemClient FileSystemClient,
+	onceFrom string,
+	nodeInformer coreinformersv1.NodeInformer,
+) (*Daemon, error) {
+	dn, err := New(
+		rootMount,
+		nodeName,
+		operatingSystem,
+		nodeUpdaterClient,
+		fileSystemClient,
+		onceFrom)
+
+	if err != nil {
+		return nil, err
+	}
+
+	dn.kubeClient = kubeClient
+	dn.client = client
+
+	if err = loadNodeAnnotations(dn.kubeClient.CoreV1().Nodes(), nodeName); err != nil {
+		return nil, err
 	}
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		UpdateFunc: dn.handleNodeUpdate,
 	})
-
 	dn.nodeLister = nodeInformer.Lister()
 	dn.nodeListerSynced = nodeInformer.Informer().HasSynced
@@ -120,6 +151,12 @@
 // responsible for triggering callbacks to handle updates. Successful
 // updates shouldn't return, and should just reboot the node.
 func (dn *Daemon) Run(stop <-chan struct{}) error {
+	// Return early if we have been asked to run once.
+	if dn.onceFrom != "" {
+		glog.V(2).Info("Running once as requested")
+		return dn.runOnce()
+	}
+
 	if !cache.WaitForCacheSync(stop, dn.nodeListerSynced) {
 		glog.Error("Marking degraded due to: failure to sync caches")
 		return setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
@@ -177,38 +214,123 @@ func (dn *Daemon) CheckStateOnBoot(stop <-chan struct{}) error {
 	return nil
 }
 
-// handleNodeUpdate is the handler for informer callbacks detecting node
-// changes. If an update is requested by the controller, we assume that
-// that means something changed and apply the desired config no matter what.
+// runOnce pulls the MachineConfig from either a file or a remote URL,
+// executes one run, and exits.
+// TODO: Revisit the Run/process methods and refactor and unify them with runOnce
+func (dn *Daemon) runOnce() error {
+	var machineConfig *mcfgv1.MachineConfig
+	var oldConfig mcfgv1.MachineConfig
+	var err error
+
+	if strings.HasPrefix(dn.onceFrom, "http://") || strings.HasPrefix(dn.onceFrom, "https://") {
+		// NOTE: This case expects a cluster to exist already.
+		// If a remote URL was provided, request the machine config
+		// content from it and parse it.
+		glog.V(2).Infof("Getting machine config content from %s", dn.onceFrom)
+		machineConfig, err = dn.getMachineConfigFromURL(dn.onceFrom)
+		if err != nil {
+			return err
+		}
+		needUpdate, err := dn.prepUpdateFromCluster()
+		if err != nil {
+			glog.V(2).Infof("Unable to prep update: %s", err)
+			return err
+		} else if !needUpdate {
+			glog.V(2).Infof("No update needed")
+			return nil
+		}
+		// At this point we have verified we need to update
+		if err := dn.executeUpdateFromClusterWithMachineConfig(machineConfig); err != nil {
+			glog.Warningf("Unable to update: %s", err)
+			return err
+		}
+		return nil
+
+	} else if ValidPath(dn.onceFrom) {
+		// NOTE: This case expects that the cluster is NOT CREATED YET.
+		// If a local file was provided, parse it.
+		oldConfig = mcfgv1.MachineConfig{}
+		absoluteOnceFrom, err := filepath.Abs(filepath.Clean(dn.onceFrom))
+		if err != nil {
+			return err
+		}
+		machineConfig, err = dn.getMachineConfigFromFile(absoluteOnceFrom)
+		if err != nil {
+			return err
+		}
+		// Execute the update without hitting the cluster
+		return dn.update(&oldConfig, machineConfig)
+	}
+	// Otherwise return an error, as the input format is unsupported
+	return fmt.Errorf("%s is neither a path nor a URL; cannot run once", dn.onceFrom)
+}
+
+// handleNodeUpdate is the gatekeeper handler for informer callbacks detecting
+// node changes. If an update is requested by the controller, we assume that
+// means something changed and hand off to the execution methods no matter what.
 // Also note that we only care about node updates, not creation or deletion.
 func (dn *Daemon) handleNodeUpdate(old, cur interface{}) {
 	node := cur.(*corev1.Node)
 
 	// First check if the node that was updated is this daemon's node
-	if (node.Name != dn.name) {
-		// The node that was changed was not ours
-		return
+	if node.Name == dn.name {
+		// Pass to the shared update prep method
+		needUpdate, err := dn.prepUpdateFromCluster()
+		if err != nil {
+			// On prepUpdateFromCluster error the node should already be marked degraded
+			glog.V(2).Infof("Unable to prep update: %s", err)
+			dn.reboot()
+			return
+		}
+		// Only executeUpdateFromCluster when we need to update
+		if needUpdate {
+			// Note that if executeUpdateFromCluster errors it will mark the node
+			// degraded and reboot.
+			if err = dn.executeUpdateFromCluster(); err != nil {
+				glog.V(2).Infof("Unable to execute update: %s", err)
+				return
+			}
+		}
 	}
+	// Either the update was handled above or the changed node was not ours
+	return
+}
+
+// prepUpdateFromCluster handles the shared update prepping functionality for
+// flows that expect the cluster to already be available.
+func (dn *Daemon) prepUpdateFromCluster() (bool, error) {
 	// Then check we're not already in a degraded state.
 	if state, err := getNodeAnnotation(dn.kubeClient.CoreV1().Nodes(), dn.name, MachineConfigDaemonStateAnnotationKey); err != nil {
 		// try to set to degraded... because we failed to check if we're degraded
 		glog.Errorf("Marking degraded due to: %v", err)
 		setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name)
-		return
+		return false, err
 	} else if state == MachineConfigDaemonStateDegraded {
 		// Just return since we want to continue sleeping
-		return
+		return false, fmt.Errorf("state is already degraded")
+	}
+
+	// Grab the node instance
+	node, err := dn.kubeClient.CoreV1().Nodes().Get(dn.name, metav1.GetOptions{})
+	if err != nil {
+		return false, err
 	}
 
 	// Detect if there is an update
-	if (node.Annotations[DesiredMachineConfigAnnotationKey] == node.Annotations[CurrentMachineConfigAnnotationKey]) {
+	if node.Annotations[DesiredMachineConfigAnnotationKey] == node.Annotations[CurrentMachineConfigAnnotationKey] {
 		// No actual update to the config
-		return
+		glog.V(2).Info("No update is required")
+		return false, nil
 	}
+	return true, nil
+}
 
+// executeUpdateFromClusterWithMachineConfig starts the actual update process. The provided config
+// is used as the desired config, while the current config is pulled from the cluster. If both
+// should be pulled from the cluster, use executeUpdateFromCluster().
+func (dn *Daemon) executeUpdateFromClusterWithMachineConfig(desiredConfig *mcfgv1.MachineConfig) error {
 	// The desired machine config has changed, trigger update
-	if err := dn.triggerUpdate(); err != nil {
+	if err := dn.triggerUpdateWithMachineConfig(desiredConfig); err != nil {
 		glog.Errorf("Marking degraded due to: %v", err)
 		if errSet := setUpdateDegraded(dn.kubeClient.CoreV1().Nodes(), dn.name); errSet != nil {
 			glog.Errorf("Futher error attempting to set the node to degraded: %v", errSet)
@@ -220,6 +342,12 @@ func (dn *Daemon) handleNodeUpdate(old, cur interface{}) {
 	// we managed to update the machine without rebooting. in this case,
 	// continue as usual waiting for the next update
 	glog.V(2).Infof("Successfully updated without reboot")
+	return nil
+}
+
+// executeUpdateFromCluster starts the actual update process using configs from the cluster.
+func (dn *Daemon) executeUpdateFromCluster() error {
+	return dn.executeUpdateFromClusterWithMachineConfig(nil)
 }
 
 // completeUpdate does all the stuff required to finish an update. right now, it
@@ -241,8 +369,9 @@ func (dn *Daemon) completeUpdate(dcAnnotation string) error {
 	return nil
 }
 
-// triggerUpdate starts the update using the current and the target config.
-func (dn *Daemon) triggerUpdate() error {
+// triggerUpdateWithMachineConfig starts the update using the desired config and queries the cluster for
+// the current config. If all configs should be pulled from the cluster, use triggerUpdate().
+func (dn *Daemon) triggerUpdateWithMachineConfig(desiredConfig *mcfgv1.MachineConfig) error {
 	if err := setUpdateWorking(dn.kubeClient.CoreV1().Nodes(), dn.name); err != nil {
 		return err
 	}
@@ -259,15 +388,21 @@
 	if err != nil {
 		return err
 	}
-	desiredConfig, err := getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), dcAnnotation)
-	if err != nil {
-		return err
+	if desiredConfig == nil {
+		desiredConfig, err = getMachineConfig(dn.client.MachineconfigurationV1().MachineConfigs(), dcAnnotation)
+		if err != nil {
+			return err
+		}
 	}
-
-	// run the update process. this function doesn't currently return.
 	return dn.update(currentConfig, desiredConfig)
 }
 
+// triggerUpdate starts the update using the current and the target config.
+func (dn *Daemon) triggerUpdate() error {
+	return dn.triggerUpdateWithMachineConfig(nil)
+}
+
 // isDesiredMachineState confirms that the node is actually in the state that it
 // wants to be in. It does this by looking at the elements in the target config
 // and checks if all are present on the node. Returns true iff there are no
@@ -416,6 +551,55 @@ func (dn *Daemon) Close() {
 	dn.loginClient.Close()
 }
 
+// getMachineConfigFromFile parses a machine config file in YAML format and returns
+// a MachineConfig struct.
+func (dn *Daemon) getMachineConfigFromFile(filePath string) (*mcfgv1.MachineConfig, error) {
+	data, err := dn.fileSystemClient.ReadFile(filePath)
+	if err != nil {
+		return nil, err
+	}
+	config := resourceread.ReadMachineConfigV1OrDie(data)
+	return config, nil
+}
+
+// getMachineConfigFromURL reads a remote machine config in YAML format and returns a MachineConfig struct.
+func (dn *Daemon) getMachineConfigFromURL(url string) (*mcfgv1.MachineConfig, error) {
+	// Make a request to the remote URL
+	resp, err := http.Get(url)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	// Refuse anything but 200 OK so we don't hand an error page to the parser below
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("unexpected HTTP status %s while fetching %s", resp.Status, url)
+	}
+
+	// Read the body content from the response
+	body, err := dn.fileSystemClient.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	// Unmarshal the body into the machineConfig
+	config := resourceread.ReadMachineConfigV1OrDie(body)
+
+	return config, nil
+}
+
 func getMachineConfig(client mcfgclientv1.MachineConfigInterface, name string) (*mcfgv1.MachineConfig, error) {
 	return client.Get(name, metav1.GetOptions{})
 }
+
+// ValidPath attempts to see if the path provided is indeed an acceptable
+// filesystem path. This function does not check if the path exists.
+func ValidPath(path string) bool {
+	// Check the raw input; filepath.Clean would strip a leading "./"
+	for _, validStart := range []string{".", "..", "/"} {
+		if strings.HasPrefix(path, validStart) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/daemon/fsclient.go b/pkg/daemon/fsclient.go
index 973e4512a7..fae3c6160d 100644
--- a/pkg/daemon/fsclient.go
+++ b/pkg/daemon/fsclient.go
@@ -1,6 +1,7 @@
 package daemon
 
 import (
+	"io"
 	"io/ioutil"
 	"os"
 )
@@ -16,6 +17,7 @@ type FileSystemClient interface {
 	Chmod(string, os.FileMode) error
 	Chown(string, int, int) error
 	WriteFile(filename string, data []byte, perm os.FileMode) error
+	ReadAll(reader io.Reader) ([]byte, error)
 	ReadFile(filename string) ([]byte, error)
 }
 
@@ -72,6 +74,11 @@ func (f FsClient) ReadFile(filename string) ([]byte, error) {
 	return ioutil.ReadFile(filename)
 }
 
+// ReadAll implements ioutil.ReadAll
+func (f FsClient) ReadAll(reader io.Reader) ([]byte, error) {
+	return ioutil.ReadAll(reader)
+}
+
 // NewFileSystemClient creates a new file system client using the default
 // implementations provided by the os package.
 func NewFileSystemClient() FileSystemClient {
diff --git a/pkg/daemon/fsclient_test.go b/pkg/daemon/fsclient_test.go
index 09c3e1a88c..c61ce80fce 100644
--- a/pkg/daemon/fsclient_test.go
+++ b/pkg/daemon/fsclient_test.go
@@ -1,6 +1,9 @@
 package daemon
 
-import "os"
+import (
+	"io"
+	"os"
+)
 
 // CreateReturn is a structure used for testing. It holds a single return value
 // set for a mocked Create call.
@@ -34,6 +37,7 @@ type FsClientMock struct {
 	ChmodReturns []error
 	ChownReturns []error
 	WriteFileReturns []error
+	ReadAllReturns []ReadFileReturn
 	ReadFileReturns []ReadFileReturn
 }
 
@@ -101,6 +105,15 @@ func (f FsClientMock) WriteFile(filename string, data []byte, perm os.FileMode)
 	return updateErrorReturns(&f.WriteFileReturns)
 }
 
+// ReadAll provides a mocked implementation
+func (f FsClientMock) ReadAll(reader io.Reader) ([]byte, error) {
+	returnValues := f.ReadAllReturns[0]
+	if len(f.ReadAllReturns) > 1 {
+		f.ReadAllReturns = f.ReadAllReturns[1:]
+	}
+	return returnValues.Bytes, returnValues.Error
+}
+
 // ReadFile provides a mocked implemention
 func (f FsClientMock) ReadFile(filename string) ([]byte, error) {
 	returnValues := f.ReadFileReturns[0]
diff --git a/pkg/daemon/update.go b/pkg/daemon/update.go
index d37e69cb4b..0fc89e1fc4 100644
--- a/pkg/daemon/update.go
+++ b/pkg/daemon/update.go
@@ -48,24 +48,28 @@ func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) error {
 		return err
 	}
 
-	glog.Info("Update completed. Draining the node.")
+	// TODO: Change the logic to be clearer
+	// We need to skip draining of the node when we are running once
+	// from a local file, i.e. when there is no cluster.
+	if dn.onceFrom == "" || !ValidPath(dn.onceFrom) {
+		glog.Info("Update completed. Draining the node.")
 
-	node, err := dn.kubeClient.CoreV1().Nodes().Get(dn.name, metav1.GetOptions{})
-	if err != nil {
-		return err
-	}
+		node, err := dn.kubeClient.CoreV1().Nodes().Get(dn.name, metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
 
-	err = drain.Drain(dn.kubeClient, []*corev1.Node{node}, &drain.DrainOptions{
-		DeleteLocalData:    true,
-		Force:              true,
-		GracePeriodSeconds: 600,
-		IgnoreDaemonsets:   true,
-	})
-	if err != nil {
-		return err
+		err = drain.Drain(dn.kubeClient, []*corev1.Node{node}, &drain.DrainOptions{
+			DeleteLocalData:    true,
+			Force:              true,
+			GracePeriodSeconds: 600,
+			IgnoreDaemonsets:   true,
+		})
+		if err != nil {
+			return err
+		}
+		glog.V(2).Infof("Node successfully drained")
 	}
-	glog.V(2).Infof("Node successfully drained")
-
-	// reboot. this function shouldn't actually return.
 	return dn.reboot()
 }
 
@@ -80,6 +84,12 @@ func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) error {
 func (dn *Daemon) reconcilable(oldConfig, newConfig *mcfgv1.MachineConfig) (bool, error) {
 	glog.Info("Checking if configs are reconcilable")
 
+	// We skip the reconcilable check when the old config has no Kind and we are in
+	// runOnce mode, since there is a good chance no previous state is available to match against.
+	if oldConfig.Kind == "" && dn.onceFrom != "" {
+		glog.Infof("Missing kind in old config. Assuming reconcilable with new.")
+		return true, nil
+	}
 	oldIgn := oldConfig.Spec.Config
 	newIgn := newConfig.Spec.Config
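
A quick way to sanity-check the new plumbing is a test against the pieces this change introduces: ValidPath, getMachineConfigFromURL, and the FsClientMock.ReadAll hook. The sketch below is illustrative, not part of the diff; the test names and the minimal YAML literal are hypothetical, and it assumes resourceread.ReadMachineConfigV1OrDie can decode a machine config carrying only TypeMeta and ObjectMeta.

    package daemon

    import (
    	"fmt"
    	"net/http"
    	"net/http/httptest"
    	"testing"
    )

    // TestValidPath (hypothetical) documents which inputs take the local-file
    // branch of runOnce and which fall through to the URL or error branches.
    func TestValidPath(t *testing.T) {
    	for _, p := range []string{"/etc/mc.yaml", "../mc.yaml"} {
    		if !ValidPath(p) {
    			t.Errorf("expected %q to be accepted as a filesystem path", p)
    		}
    	}
    	// URLs must be rejected here so runOnce routes them to getMachineConfigFromURL.
    	if ValidPath("https://example.com/mc.yaml") {
    		t.Error("expected a URL to be rejected as a filesystem path")
    	}
    }

    // TestGetMachineConfigFromURL (hypothetical) serves a minimal machine config
    // over HTTP and reads it back through the mocked ReadAll added in this change.
    func TestGetMachineConfigFromURL(t *testing.T) {
    	mcYAML := `apiVersion: machineconfiguration.openshift.io/v1
    kind: MachineConfig
    metadata:
      name: test-config`

    	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    		fmt.Fprint(w, mcYAML)
    	}))
    	defer ts.Close()

    	// The mocked file system client supplies the body bytes; the HTTP
    	// response itself only drives the status-code and transport paths.
    	dn := &Daemon{
    		fileSystemClient: FsClientMock{
    			ReadAllReturns: []ReadFileReturn{{Bytes: []byte(mcYAML), Error: nil}},
    		},
    	}

    	mc, err := dn.getMachineConfigFromURL(ts.URL)
    	if err != nil {
    		t.Fatalf("unexpected error: %v", err)
    	}
    	if mc.Name != "test-config" {
    		t.Errorf("got name %q, want %q", mc.Name, "test-config")
    	}
    }

In the same spirit, the single-run flow would be driven either as machine-config-daemon start --once-from /path/to/machine-config.yaml (local file, no cluster expected) or with an http(s):// URL once a cluster is reachable.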