Skip to content
Merged
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ images.rhel7: $(imc7)

# This was copied from https://github.com/openshift/cluster-image-registry-operator
test-e2e:
GOCACHE=off go test -timeout 55m -v$${WHAT:+ -run="$$WHAT"} ./test/e2e/
GOCACHE=off go test -timeout 75m -v$${WHAT:+ -run="$$WHAT"} ./test/e2e/
5 changes: 4 additions & 1 deletion cmd/machine-config-daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@ func runStartCmd(cmd *cobra.Command, args []string) {
glog.Info("Starting MachineConfigDaemon")
defer glog.Info("Shutting down MachineConfigDaemon")

if err := dn.Run(stopCh, exitCh); err != nil {
signaled := make(chan struct{})
dn.InstallSignalHandler(signaled)

if err := dn.Run(stopCh, signaled, exitCh); err != nil {
glog.Fatalf("Failed to run: %v", err)
}
}
9 changes: 8 additions & 1 deletion pkg/controller/node/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,14 @@ func (ctrl *Controller) updateNode(old, cur interface{}) {
if pool == nil {
return
}
glog.V(4).Infof("Node %s updated", curNode.Name)

// Specifically log when a node has completed an update so the MCC logs are a useful central aggregate of state changes
if oldNode.Annotations[daemonconsts.CurrentMachineConfigAnnotationKey] != oldNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] &&
curNode.Annotations[daemonconsts.CurrentMachineConfigAnnotationKey] == curNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] {
glog.Infof("Pool %s: node %s has completed update to %s", pool.Name, curNode.Name, curNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey])
} else {
glog.V(4).Infof("Node %s updated", curNode.Name)
}
ctrl.enqueueMachineConfigPool(pool)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/node/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func calculateStatus(pool *mcfgv1.MachineConfigPool, nodes []*corev1.Node) mcfgv
return status
}

// getUpdatedMachines filters the provided nodes to ones whose current == desired
// and the "done" flag is set.
func getUpdatedMachines(currentConfig string, nodes []*corev1.Node) []*corev1.Node {
var updated []*corev1.Node
for _, node := range nodes {
Expand Down Expand Up @@ -138,6 +140,8 @@ func getUpdatedMachines(currentConfig string, nodes []*corev1.Node) []*corev1.No
return updated
}

// getReadyMachines filters the provided nodes to ones which are updated
// and marked ready.
func getReadyMachines(currentConfig string, nodes []*corev1.Node) []*corev1.Node {
updated := getUpdatedMachines(currentConfig, nodes)
var ready []*corev1.Node
Expand Down
36 changes: 31 additions & 5 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

imgref "github.com/containers/image/docker/reference"
Expand Down Expand Up @@ -88,7 +90,7 @@ type Daemon struct {
kubeletHealthzEnabled bool
kubeletHealthzEndpoint string

installedSigterm bool
updateActive bool

nodeWriter NodeWriter

Expand Down Expand Up @@ -133,7 +135,7 @@ const (
pathStateJSON = "/etc/machine-config-daemon/state.json"
// currentConfigPath is where we store the current config on disk to validate
// against annotations changes
currentConfigPath = "/var/machine-config-daemon/currentconfig"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait I'm confused, at this point aren't we chrooted into the host?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I changed this for consistency with the other /etc paths, and since /etc/machine-config-daemon is mounted in the daemonset. I'll wait for the tests to report and revert this if you don't want it (which I'd understand).

currentConfigPath = "/etc/machine-config-daemon/currentconfig"

kubeletHealthzEndpoint = "http://localhost:10248/healthz"
kubeletHealthzPollingInterval = time.Duration(30 * time.Second)
Expand Down Expand Up @@ -266,7 +268,7 @@ func NewClusterDrivenDaemon(
eventBroadcaster.StartRecordingToSink(&clientsetcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
dn.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "machineconfigdaemon", Host: nodeName})

glog.Infof("Managing node: %s", nodeName)
dn.logSystem("Starting to manage node: %s", nodeName)

go dn.runLoginMonitor(dn.stopCh, dn.exitCh)

Expand Down Expand Up @@ -469,10 +471,32 @@ func (dn *Daemon) runOnceFrom() error {
return errors.New("unsupported onceFrom type provided")
}

// InstallSignalHandler installs the handler for the signals the daemon
// should act on. When a SIGTERM arrives while an update is in progress
// (dn.updateActive), the signal is logged and ignored so the process is
// not killed mid-update; otherwise the signaled channel is closed exactly
// once to tell Run to shut down.
// https://github.com/openshift/machine-config-operator/issues/407
func (dn *Daemon) InstallSignalHandler(signaled chan struct{}) {
	termChan := make(chan os.Signal, 2048)
	signal.Notify(termChan, syscall.SIGTERM)

	go func() {
		// Guard against closing the channel twice: a second SIGTERM
		// received while not updating would otherwise panic with
		// "close of closed channel".
		closed := false
		for sig := range termChan {
			switch sig {
			case syscall.SIGTERM:
				if dn.updateActive {
					glog.Info("Got SIGTERM, but actively updating")
				} else if !closed {
					close(signaled)
					closed = true
				}
			}
		}
	}()
}

// Run finishes informer setup and then blocks, and the informer will be
// responsible for triggering callbacks to handle updates. Successful
// updates shouldn't return, and should just reboot the node.
func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
func (dn *Daemon) Run(stopCh, signaled <-chan struct{}, exitCh <-chan error) error {
if dn.kubeletHealthzEnabled {
glog.Info("Enabling Kubelet Healthz Monitor")
go dn.runKubeletHealthzMonitor(stopCh, dn.exitCh)
Expand All @@ -496,6 +520,8 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
select {
case <-stopCh:
return nil
case <-signaled:
return nil
case err := <-exitCh:
// This channel gets errors from auxiliary goroutines like loginmonitor and kubehealth
glog.Warningf("Got an error from auxiliary tools: %v", err)
Expand Down Expand Up @@ -976,7 +1002,7 @@ func (dn *Daemon) completeUpdate(node *corev1.Node, desiredConfigName string) er
return err
}

dn.logSystem("machine-config-daemon: completed update for config %s", desiredConfigName)
dn.logSystem("completed update for config %s", desiredConfigName)

return nil
}
Expand Down
44 changes: 14 additions & 30 deletions pkg/daemon/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ import (
"io"
"os"
"os/exec"
"os/signal"
"os/user"
"path/filepath"
"reflect"
"strconv"
"syscall"
"time"

ignv2_2types "github.com/coreos/ignition/config/v2_2/types"
Expand Down Expand Up @@ -118,7 +116,7 @@ func (dn *Daemon) updateOSAndReboot(newConfig *mcfgv1.MachineConfig) (retErr err

// Skip draining of the node when we're not cluster driven
if dn.onceFrom == "" {
glog.Info("Update prepared; draining the node")
dn.logSystem("Update prepared; beginning drain")

dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Drain", "Draining node to update config.")

Expand Down Expand Up @@ -147,35 +145,13 @@ func (dn *Daemon) updateOSAndReboot(newConfig *mcfgv1.MachineConfig) (retErr err
}
return errors.Wrap(err, "failed to drain node")
}
glog.Info("Node successfully drained")
dn.logSystem("drain complete")
}

// reboot. this function shouldn't actually return.
return dn.reboot(fmt.Sprintf("Node will reboot into config %v", newConfig.GetName()), defaultRebootTimeout, exec.Command(defaultRebootCommand))
}

// isUpdating returns true if the MCD is actively applying an update
func (dn *Daemon) catchIgnoreSIGTERM() {
if dn.installedSigterm {
return
}

termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGTERM)

dn.installedSigterm = true

// Catch SIGTERM - if we're actively updating, we should avoid
// having the process be killed.
// https://github.com/openshift/machine-config-operator/issues/407
go func() {
for {
<-termChan
glog.Info("Got SIGTERM, but actively updating")
}
}()
}

var errUnreconcilable = errors.New("unreconcilable")

// update the node to the provided node configuration.
Expand Down Expand Up @@ -218,6 +194,7 @@ func (dn *Daemon) update(oldConfig, newConfig *mcfgv1.MachineConfig) (retErr err
dn.logSystem(wrappedErr.Error())
return errors.Wrapf(errUnreconcilable, "%v", wrappedErr)
}
dn.logSystem("Starting update from %s to %s", oldConfigName, newConfigName)

// update files on disk that need updating
if err := dn.updateFiles(oldConfig, newConfig); err != nil {
Expand Down Expand Up @@ -742,6 +719,7 @@ func (dn *Daemon) logSystem(format string, a ...interface{}) {

go func() {
defer stdin.Close()
io.WriteString(stdin, "machine-config-daemon: ")
io.WriteString(stdin, message)
}()
err = logger.Run()
Expand All @@ -751,10 +729,16 @@ func (dn *Daemon) logSystem(format string, a ...interface{}) {
}
}

// catchIgnoreSIGTERM flags that an update is in progress so the SIGTERM
// handler installed by InstallSignalHandler ignores termination requests
// instead of shutting the daemon down. Setting the flag is idempotent,
// so no guard is needed.
func (dn *Daemon) catchIgnoreSIGTERM() {
	dn.updateActive = true
}

func (dn *Daemon) cancelSIGTERM() {
if dn.installedSigterm {
signal.Reset(syscall.SIGTERM)
dn.installedSigterm = false
if dn.updateActive {
dn.updateActive = false
}
}

Expand All @@ -766,7 +750,7 @@ func (dn *Daemon) reboot(rationale string, timeout time.Duration, rebootCmd *exe
if dn.recorder != nil {
dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Reboot", rationale)
}
dn.logSystem("machine-config-daemon initiating reboot: %s", rationale)
dn.logSystem("initiating reboot: %s", rationale)

// Now that everything is done, avoid delaying shutdown.
dn.cancelSIGTERM()
Expand Down
32 changes: 28 additions & 4 deletions test/e2e/mcd_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e_test

import (
"encoding/json"
"fmt"
"os/exec"
"strings"
Expand All @@ -11,10 +12,14 @@ import (
mcv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
"github.com/openshift/machine-config-operator/pkg/daemon/constants"
"github.com/openshift/machine-config-operator/test/e2e/framework"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
)
Expand Down Expand Up @@ -92,8 +97,10 @@ func createMCToAddFile(name, filename, data, fs string) *mcv1.MachineConfig {

func TestMCDeployed(t *testing.T) {
cs := framework.NewClientSet("")
bumpPoolMaxUnavailableTo(t, cs, 3)

for i := 0; i < 10; i++ {
// TODO: bring this back to 10
for i := 0; i < 5; i++ {
mcadd := createMCToAddFile("add-a-file", fmt.Sprintf("/etc/mytestconf%d", i), "test", "root")

// create the dummy MC now
Expand Down Expand Up @@ -121,7 +128,7 @@ func TestMCDeployed(t *testing.T) {
}

visited := make(map[string]bool)
if err := wait.Poll(2*time.Second, 10*time.Minute, func() (bool, error) {
if err := wait.Poll(2*time.Second, 30*time.Minute, func() (bool, error) {
nodes, err := getNodesByRole(cs, "worker")
if err != nil {
return false, nil
Expand All @@ -145,8 +152,24 @@ func TestMCDeployed(t *testing.T) {
}
}

// bumpPoolMaxUnavailableTo patches the "worker" MachineConfigPool so that up
// to max nodes may be unavailable simultaneously, letting e2e rollouts update
// several nodes in parallel. It fails the test immediately on any error.
func bumpPoolMaxUnavailableTo(t *testing.T, cs *framework.ClientSet, max int) {
	pool, err := cs.MachineConfigPools().Get("worker", metav1.GetOptions{})
	require.NoError(t, err)
	// Marshal the pool before and after the spec change, then build a
	// three-way merge patch between the two states.
	// Note: avoid naming locals "new"/"old" style that shadows builtins.
	oldData, err := json.Marshal(pool)
	require.NoError(t, err)
	maxUnavailable := intstr.FromInt(max)
	pool.Spec.MaxUnavailable = &maxUnavailable
	newData, err := json.Marshal(pool)
	require.NoError(t, err)
	patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(oldData, newData, oldData)
	require.NoError(t, err)
	_, err = cs.MachineConfigPools().Patch("worker", types.MergePatchType, patch)
	require.NoError(t, err)
}

func TestUpdateSSH(t *testing.T) {
cs := framework.NewClientSet("")
bumpPoolMaxUnavailableTo(t, cs, 3)

// create a dummy MC with an sshKey for user Core
mcName := fmt.Sprintf("sshkeys-worker-%s", uuid.NewUUID())
Expand Down Expand Up @@ -310,6 +333,7 @@ func TestPoolDegradedOnFailToRender(t *testing.T) {

func TestReconcileAfterBadMC(t *testing.T) {
cs := framework.NewClientSet("")
bumpPoolMaxUnavailableTo(t, cs, 3)

// create a bad MC w/o a filesystem field which is going to fail reconciling
mcadd := createMCToAddFile("add-a-file", "/etc/mytestconfs", "test", "")
Expand Down Expand Up @@ -372,7 +396,7 @@ func TestReconcileAfterBadMC(t *testing.T) {
if err != nil {
return false, err
}
if mcp.Status.UnavailableMachineCount == 1 {
if mcp.Status.UnavailableMachineCount >= 1 {
return true, nil
}
return false, nil
Expand Down Expand Up @@ -400,7 +424,7 @@ func TestReconcileAfterBadMC(t *testing.T) {
}

visited := make(map[string]bool)
if err := wait.Poll(2*time.Second, 10*time.Minute, func() (bool, error) {
if err := wait.Poll(2*time.Second, 30*time.Minute, func() (bool, error) {
nodes, err := getNodesByRole(cs, "worker")
if err != nil {
return false, err
Expand Down