Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/machine-config-operator/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
ctrlctx.ClientBuilder.APIExtClientOrDie(componentName),
ctrlctx.ClientBuilder.ConfigClientOrDie(componentName),
ctrlctx.OpenShiftKubeAPIServerKubeNamespacedInformerFactory.Core().V1().ConfigMaps(),
ctrlctx.KubeInformerFactory.Core().V1().Nodes(),
)

ctrlctx.NamespacedInformerFactory.Start(ctrlctx.Stop)
Expand Down
7 changes: 7 additions & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Operator struct {
clusterCmLister corelisterv1.ConfigMapLister
proxyLister configlistersv1.ProxyLister
oseKubeAPILister corelisterv1.ConfigMapLister
nodeLister corelisterv1.NodeLister
dnsLister configlistersv1.DNSLister

crdListerSynced cache.InformerSynced
Expand All @@ -98,6 +99,7 @@ type Operator struct {
clusterRoleBindingInformerSynced cache.InformerSynced
proxyListerSynced cache.InformerSynced
oseKubeAPIListerSynced cache.InformerSynced
nodeListerSynced cache.InformerSynced
dnsListerSynced cache.InformerSynced

// queue only ever has one item, but it has nice error handling backoff/retry semantics
Expand Down Expand Up @@ -131,6 +133,7 @@ func New(
apiExtClient apiextclientset.Interface,
configClient configclientset.Interface,
oseKubeAPIInformer coreinformersv1.ConfigMapInformer,
nodeInformer coreinformersv1.NodeInformer,
) *Operator {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
Expand Down Expand Up @@ -163,6 +166,7 @@ func New(
mcpInformer.Informer(),
proxyInformer.Informer(),
oseKubeAPIInformer.Informer(),
nodeInformer.Informer(),
dnsInformer.Informer(),
} {
i.AddEventHandler(optr.eventHandler())
Expand All @@ -182,6 +186,8 @@ func New(
optr.proxyListerSynced = proxyInformer.Informer().HasSynced
optr.oseKubeAPILister = oseKubeAPIInformer.Lister()
optr.oseKubeAPIListerSynced = oseKubeAPIInformer.Informer().HasSynced
optr.nodeLister = nodeInformer.Lister()
optr.nodeListerSynced = nodeInformer.Informer().HasSynced

optr.serviceAccountInformerSynced = serviceAccountInfomer.Informer().HasSynced
optr.clusterRoleInformerSynced = clusterRoleInformer.Informer().HasSynced
Expand Down Expand Up @@ -235,6 +241,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
optr.networkListerSynced,
optr.proxyListerSynced,
optr.oseKubeAPIListerSynced,
optr.nodeListerSynced,
optr.mcpListerSynced,
optr.mcListerSynced,
optr.dnsListerSynced) {
Expand Down
78 changes: 78 additions & 0 deletions pkg/operator/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/util/errors"

"github.com/golang/glog"
configv1 "github.com/openshift/api/config/v1"
Expand Down Expand Up @@ -289,9 +293,83 @@ func (optr *Operator) syncUpgradeableStatus() error {
coStatus.Message = "One or more machine config pools are updating, please see `oc get mcp` for further details"
}

// don't overwrite status if we didn't run
if ran, err := optr.isKubeletSkewTooFar(); ran && err != nil {
if coStatus.Status != configv1.ConditionFalse {
coStatus.Status = configv1.ConditionFalse
Copy link
Copy Markdown

@nstielau nstielau Jun 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still love a coStatus.DocumentationLink type field here for these use-cases, but I know that's a bigger change ;)

coStatus.Reason = "KubeletSkewTooFar"
coStatus.Message = err.Error()
} else {
coStatus.Reason = coStatus.Reason + "::" + "KubeletSkewTooFar"
coStatus.Message = coStatus.Message + "\n" + err.Error()
}
}

return optr.updateStatus(co, coStatus)
}

// isKubeletSkewTooFar reports whether any node's kubelet minor version is two
// or more behind the kube-apiserver's, which would make a cluster upgrade
// unsafe.
//
// The first return value indicates whether the check actually ran; callers
// must ignore the error when it is false (e.g. the kube-apiserver
// ClusterOperator does not exist yet, or has no version published). When the
// check ran, the error is an aggregate describing every skewed or
// malformed-version node, or nil if none were found.
func (optr *Operator) isKubeletSkewTooFar() (bool, error) {
	nodes, err := optr.nodeLister.List(labels.Everything())
	if err != nil {
		return true, err
	}
	kubeAPIServerStatus, err := optr.configClient.ConfigV1().ClusterOperators().Get(context.TODO(), "kube-apiserver", metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		// No kube-apiserver operator to compare against: the check did not run.
		return false, err
	}
	if err != nil {
		return true, err
	}

	// The operator status versions look like
	// - name: kube-apiserver
	//   version: 1.21.0-rc.0
	kubeAPIServerVersion := ""
	for _, version := range kubeAPIServerStatus.Status.Versions {
		if version.Name != "kube-apiserver" {
			continue
		}
		kubeAPIServerVersion = version.Version
		break
	}
	if len(kubeAPIServerVersion) == 0 {
		return false, fmt.Errorf("kube-apiserver does not yet have a version")
	}
	kubeAPIServerMinorVersion, err := getMinorVersion(kubeAPIServerVersion)
	if err != nil {
		return true, err
	}

	skewedNodeErrors := []error{}
	for _, node := range nodes {
		// looks like kubeletVersion: v1.21.0-rc.0+6143dea
		kubeletVersion := node.Status.NodeInfo.KubeletVersion
		if len(kubeletVersion) == 0 {
			continue
		}
		nodeMinorVersion, err := getMinorVersion(kubeletVersion)
		if err != nil {
			skewedNodeErrors = append(skewedNodeErrors, fmt.Errorf("node/%v has malformed version %q: %v", node.Name, kubeletVersion, err))
			// Without a parsed minor version, the skew comparison below would
			// run against the zero value and append a spurious second error
			// for the same node.
			continue
		}
		if nodeMinorVersion+2 <= kubeAPIServerMinorVersion {
			skewedNodeErrors = append(skewedNodeErrors, fmt.Errorf("node/%v must be updated or removed before the cluster can upgrade, current version %v", node.Name, kubeletVersion))
		}
	}
	return true, errors.NewAggregate(skewedNodeErrors)
}

// getMinorVersion extracts the minor (second) component from a semver-like
// version string, tolerating an optional leading "v"
// (e.g. "v1.21.0-rc.0+6143dea" -> 21).
func getMinorVersion(version string) (int, error) {
	version = strings.TrimPrefix(version, "v")
	tokens := strings.Split(version, ".")
	// Guard against versions with no minor component (e.g. "1"); indexing
	// tokens[1] unconditionally would panic with index out of range.
	if len(tokens) < 2 {
		return 0, fmt.Errorf("version %q has no minor component", version)
	}
	minorVersion, err := strconv.ParseInt(tokens[1], 10, 32)
	if err != nil {
		return 0, err
	}
	return int(minorVersion), nil
}

func (optr *Operator) fetchClusterOperator() (*configv1.ClusterOperator, error) {
co, err := optr.configClient.ConfigV1().ClusterOperators().Get(context.TODO(), optr.name, metav1.GetOptions{})
if meta.IsNoMatchError(err) {
Expand Down
91 changes: 91 additions & 0 deletions pkg/operator/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"reflect"
"testing"

corelisterv1 "k8s.io/client-go/listers/core/v1"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -530,6 +534,10 @@ func TestOperatorSyncStatus(t *testing.T) {
}
optr.vStore = newVersionStore()
optr.mcpLister = &mockMCPLister{}

nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
optr.nodeLister = corelisterv1.NewNodeLister(nodeIndexer)

coName := fmt.Sprintf("test-%s", uuid.NewUUID())
co := &configv1.ClusterOperator{ObjectMeta: metav1.ObjectMeta{Name: coName}}
cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse})
Expand Down Expand Up @@ -577,6 +585,8 @@ func TestInClusterBringUpStayOnErr(t *testing.T) {
optr.vStore = newVersionStore()
optr.vStore.Set("operator", "test-version")
optr.mcpLister = &mockMCPLister{}
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
optr.nodeLister = corelisterv1.NewNodeLister(nodeIndexer)
co := &configv1.ClusterOperator{}
cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse})
cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse})
Expand All @@ -600,3 +610,84 @@ func TestInClusterBringUpStayOnErr(t *testing.T) {

assert.False(t, optr.inClusterBringup)
}

// TestKubeletSkewUpgradeable verifies that a node whose kubelet is two minor
// versions behind the kube-apiserver causes the operator to report
// Upgradeable=False with reason KubeletSkewTooFar.
func TestKubeletSkewUpgradeable(t *testing.T) {
	// kube-apiserver operator publishing version 1.20.
	kasOperator := &configv1.ClusterOperator{
		ObjectMeta: metav1.ObjectMeta{Name: "kube-apiserver"},
		Status: configv1.ClusterOperatorStatus{
			Versions: []configv1.OperandVersion{
				{Name: "kube-apiserver", Version: "1.20"},
			},
		},
	}

	optr := &Operator{eventRecorder: &record.FakeRecorder{}}
	optr.vStore = newVersionStore()
	optr.vStore.Set("operator", "test-version")
	optr.mcpLister = &mockMCPLister{}

	// Seed the node lister with a single node skewed two minor versions back.
	nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	nodeIndexer.Add(&corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "first-node"},
		Status: corev1.NodeStatus{
			NodeInfo: corev1.NodeSystemInfo{
				KubeletVersion: "v1.18",
			},
		},
	})
	optr.nodeLister = corelisterv1.NewNodeLister(nodeIndexer)

	co := &configv1.ClusterOperator{}
	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse})
	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse})
	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorDegraded, Status: configv1.ConditionFalse})
	fakeClient := fakeconfigclientset.NewSimpleClientset(co, kasOperator)
	optr.configClient = fakeClient
	optr.inClusterBringup = true

	// First sync fails, so bring-up must still be in progress.
	syncFn := func(config *renderConfig) error {
		return errors.New("mocked fn1")
	}
	err := optr.syncAll([]syncFunc{{name: "mock1", fn: syncFn}})
	assert.NotNil(t, err, "expected syncAll to fail")
	assert.True(t, optr.inClusterBringup)

	// Second sync succeeds, completing bring-up.
	syncFn = func(config *renderConfig) error {
		return nil
	}
	err = optr.syncAll([]syncFunc{{name: "mock1", fn: syncFn}})
	assert.Nil(t, err, "expected syncAll to pass")
	assert.False(t, optr.inClusterBringup)

	// The condition of interest is on the most recent update the fake client saw.
	var lastUpdate clientgotesting.UpdateAction
	for _, action := range fakeClient.Actions() {
		if action.GetVerb() != "update" {
			continue
		}
		lastUpdate = action.(clientgotesting.UpdateAction)
	}
	if lastUpdate == nil {
		t.Fatal("missing update")
	}

	operatorStatus := lastUpdate.GetObject().(*configv1.ClusterOperator)
	var upgradeable *configv1.ClusterOperatorStatusCondition
	for i := range operatorStatus.Status.Conditions {
		if operatorStatus.Status.Conditions[i].Type == configv1.OperatorUpgradeable {
			upgradeable = &operatorStatus.Status.Conditions[i]
			break
		}
	}
	if upgradeable == nil {
		t.Fatal("missing condition")
	}
	switch {
	case upgradeable.Status != configv1.ConditionFalse:
		t.Fatal(upgradeable)
	case upgradeable.Message != "node/first-node must be updated or removed before the cluster can upgrade, current version v1.18":
		t.Fatal(upgradeable)
	case upgradeable.Reason != "KubeletSkewTooFar":
		t.Fatal(upgradeable)
	}
}