diff --git a/cmd/machine-config-operator/start.go b/cmd/machine-config-operator/start.go
index e467791792..25bd0bc766 100644
--- a/cmd/machine-config-operator/start.go
+++ b/cmd/machine-config-operator/start.go
@@ -75,6 +75,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
 			ctrlctx.ClientBuilder.APIExtClientOrDie(componentName),
 			ctrlctx.ClientBuilder.ConfigClientOrDie(componentName),
 			ctrlctx.OpenShiftKubeAPIServerKubeNamespacedInformerFactory.Core().V1().ConfigMaps(),
+			ctrlctx.KubeInformerFactory.Core().V1().Nodes(),
 		)
 
 		ctrlctx.NamespacedInformerFactory.Start(ctrlctx.Stop)
diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index ca8c4c9335..a72846d939 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -81,6 +81,7 @@ type Operator struct {
 	clusterCmLister  corelisterv1.ConfigMapLister
 	proxyLister      configlistersv1.ProxyLister
 	oseKubeAPILister corelisterv1.ConfigMapLister
+	nodeLister       corelisterv1.NodeLister
 	dnsLister        configlistersv1.DNSLister
 
 	crdListerSynced                  cache.InformerSynced
@@ -98,6 +99,7 @@ type Operator struct {
 	clusterRoleBindingInformerSynced cache.InformerSynced
 	proxyListerSynced                cache.InformerSynced
 	oseKubeAPIListerSynced           cache.InformerSynced
+	nodeListerSynced                 cache.InformerSynced
 	dnsListerSynced                  cache.InformerSynced
 
 	// queue only ever has one item, but it has nice error handling backoff/retry semantics
@@ -131,6 +133,7 @@ func New(
 	apiExtClient apiextclientset.Interface,
 	configClient configclientset.Interface,
 	oseKubeAPIInformer coreinformersv1.ConfigMapInformer,
+	nodeInformer coreinformersv1.NodeInformer,
 ) *Operator {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
@@ -163,6 +166,7 @@ func New(
 		mcpInformer.Informer(),
 		proxyInformer.Informer(),
 		oseKubeAPIInformer.Informer(),
+		nodeInformer.Informer(),
 		dnsInformer.Informer(),
 	} {
 		i.AddEventHandler(optr.eventHandler())
@@ -182,6 +186,8 @@ func New(
 	optr.proxyListerSynced = proxyInformer.Informer().HasSynced
optr.oseKubeAPILister = oseKubeAPIInformer.Lister() optr.oseKubeAPIListerSynced = oseKubeAPIInformer.Informer().HasSynced + optr.nodeLister = nodeInformer.Lister() + optr.nodeListerSynced = nodeInformer.Informer().HasSynced optr.serviceAccountInformerSynced = serviceAccountInfomer.Informer().HasSynced optr.clusterRoleInformerSynced = clusterRoleInformer.Informer().HasSynced @@ -235,6 +241,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) { optr.networkListerSynced, optr.proxyListerSynced, optr.oseKubeAPIListerSynced, + optr.nodeListerSynced, optr.mcpListerSynced, optr.mcListerSynced, optr.dnsListerSynced) { diff --git a/pkg/operator/status.go b/pkg/operator/status.go index 88d373d6db..40c1ee9900 100644 --- a/pkg/operator/status.go +++ b/pkg/operator/status.go @@ -4,6 +4,10 @@ import ( "context" "encoding/json" "fmt" + "strconv" + "strings" + + "k8s.io/apimachinery/pkg/util/errors" "github.com/golang/glog" configv1 "github.com/openshift/api/config/v1" @@ -289,9 +293,83 @@ func (optr *Operator) syncUpgradeableStatus() error { coStatus.Message = "One or more machine config pools are updating, please see `oc get mcp` for further details" } + // don't overwrite status if we didn't run + if ran, err := optr.isKubeletSkewTooFar(); ran && err != nil { + if coStatus.Status != configv1.ConditionFalse { + coStatus.Status = configv1.ConditionFalse + coStatus.Reason = "KubeletSkewTooFar" + coStatus.Message = err.Error() + } else { + coStatus.Reason = coStatus.Reason + "::" + "KubeletSkewTooFar" + coStatus.Message = coStatus.Message + "\n" + err.Error() + } + } + return optr.updateStatus(co, coStatus) } +func (optr *Operator) isKubeletSkewTooFar() (bool, error) { + nodes, err := optr.nodeLister.List(labels.Everything()) + if err != nil { + return true, err + } + kubeAPIServerStatus, err := optr.configClient.ConfigV1().ClusterOperators().Get(context.TODO(), "kube-apiserver", metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return false, err + } + if err 
!= nil { + return true, err + } + + // looks like + // - name: kube-apiserver + // version: 1.21.0-rc.0 + kubeAPIServerVersion := "" + for _, version := range kubeAPIServerStatus.Status.Versions { + if version.Name != "kube-apiserver" { + continue + } + kubeAPIServerVersion = version.Version + break + } + if len(kubeAPIServerVersion) == 0 { + return false, fmt.Errorf("kube-apiserver does not yet have a version") + } + kubeAPIServerMinorVersion, err := getMinorVersion(kubeAPIServerVersion) + if err != nil { + return true, err + } + + skewedNodeErrors := []error{} + for _, node := range nodes { + // looks like kubeletVersion: v1.21.0-rc.0+6143dea + kubeletVersion := node.Status.NodeInfo.KubeletVersion + if len(kubeletVersion) == 0 { + continue + } + nodeMinorVersion, err := getMinorVersion(kubeletVersion) + if err != nil { + skewedNodeErrors = append(skewedNodeErrors, fmt.Errorf("node/%v has malformed version %q: %v", node.Name, kubeletVersion, err)) + } + if nodeMinorVersion+2 <= kubeAPIServerMinorVersion { + skewedNodeErrors = append(skewedNodeErrors, fmt.Errorf("node/%v must be updated or removed before the cluster can upgrade, current version %v", node.Name, kubeletVersion)) + } + } + return true, errors.NewAggregate(skewedNodeErrors) +} + +func getMinorVersion(version string) (int, error) { + if strings.HasPrefix(version, "v") { + version = version[1:] + } + tokens := strings.Split(version, ".") + minorVersion, err := strconv.ParseInt(tokens[1], 10, 32) + if err != nil { + return 0, err + } + return int(minorVersion), nil +} + func (optr *Operator) fetchClusterOperator() (*configv1.ClusterOperator, error) { co, err := optr.configClient.ConfigV1().ClusterOperators().Get(context.TODO(), optr.name, metav1.GetOptions{}) if meta.IsNoMatchError(err) { diff --git a/pkg/operator/status_test.go b/pkg/operator/status_test.go index 8d3608c892..13d60ffff4 100644 --- a/pkg/operator/status_test.go +++ b/pkg/operator/status_test.go @@ -7,6 +7,10 @@ import ( "reflect" "testing" 
+	corelisterv1 "k8s.io/client-go/listers/core/v1"
+	clientgotesting "k8s.io/client-go/testing"
+	"k8s.io/client-go/tools/cache"
+
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -530,6 +534,10 @@ func TestOperatorSyncStatus(t *testing.T) {
 	}
 	optr.vStore = newVersionStore()
 	optr.mcpLister = &mockMCPLister{}
+
+	nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+	optr.nodeLister = corelisterv1.NewNodeLister(nodeIndexer)
+
 	coName := fmt.Sprintf("test-%s", uuid.NewUUID())
 	co := &configv1.ClusterOperator{ObjectMeta: metav1.ObjectMeta{Name: coName}}
 	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse})
@@ -577,6 +585,8 @@ func TestInClusterBringUpStayOnErr(t *testing.T) {
 	optr.vStore = newVersionStore()
 	optr.vStore.Set("operator", "test-version")
 	optr.mcpLister = &mockMCPLister{}
+	nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+	optr.nodeLister = corelisterv1.NewNodeLister(nodeIndexer)
 	co := &configv1.ClusterOperator{}
 	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse})
 	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse})
@@ -600,3 +610,84 @@ func TestInClusterBringUpStayOnErr(t *testing.T) {
 
 	assert.False(t, optr.inClusterBringup)
 }
+
+func TestKubeletSkewUpgradeable(t *testing.T) {
+	kasOperator := &configv1.ClusterOperator{
+		ObjectMeta: metav1.ObjectMeta{Name: "kube-apiserver"},
+		Status: configv1.ClusterOperatorStatus{
+			Versions: []configv1.OperandVersion{
+				{Name: "kube-apiserver", Version: "1.20"},
+			},
+		},
+	}
+	optr := &Operator{
+		eventRecorder: &record.FakeRecorder{},
+	}
+	optr.vStore = newVersionStore()
+	optr.vStore.Set("operator", "test-version")
+	optr.mcpLister = &mockMCPLister{}
+	nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+	optr.nodeLister = corelisterv1.NewNodeLister(nodeIndexer)
+	nodeIndexer.Add(&corev1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: "first-node"},
+		Status: corev1.NodeStatus{
+			NodeInfo: corev1.NodeSystemInfo{
+				KubeletVersion: "v1.18",
+			},
+		},
+	})
+
+	co := &configv1.ClusterOperator{}
+	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse})
+	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse})
+	cov1helpers.SetStatusCondition(&co.Status.Conditions, configv1.ClusterOperatorStatusCondition{Type: configv1.OperatorDegraded, Status: configv1.ConditionFalse})
+	fakeClient := fakeconfigclientset.NewSimpleClientset(co, kasOperator)
+	optr.configClient = fakeClient
+	optr.inClusterBringup = true
+
+	fn1 := func(config *renderConfig) error {
+		return errors.New("mocked fn1")
+	}
+	err := optr.syncAll([]syncFunc{{name: "mock1", fn: fn1}})
+	assert.NotNil(t, err, "expected syncAll to fail")
+
+	assert.True(t, optr.inClusterBringup)
+
+	fn1 = func(config *renderConfig) error {
+		return nil
+	}
+	err = optr.syncAll([]syncFunc{{name: "mock1", fn: fn1}})
+	assert.Nil(t, err, "expected syncAll to pass")
+
+	assert.False(t, optr.inClusterBringup)
+
+	var lastUpdate clientgotesting.UpdateAction
+	for _, action := range fakeClient.Actions() {
+		if action.GetVerb() == "update" {
+			lastUpdate = action.(clientgotesting.UpdateAction)
+		}
+	}
+	if lastUpdate == nil {
+		t.Fatal("missing update")
+	}
+	operatorStatus := lastUpdate.GetObject().(*configv1.ClusterOperator)
+	var upgradeable *configv1.ClusterOperatorStatusCondition
+	for _, condition := range operatorStatus.Status.Conditions {
+		if condition.Type == configv1.OperatorUpgradeable {
+			upgradeable = &condition
+			break
+		}
+	}
+	if upgradeable == nil {
+		t.Fatal("missing condition")
+	}
+	if upgradeable.Status != configv1.ConditionFalse {
+		t.Fatal(upgradeable)
+	}
+	if upgradeable.Message != "node/first-node must be updated or removed before the cluster can upgrade, current version v1.18" {
+		t.Fatal(upgradeable)
+	}
+	if upgradeable.Reason != "KubeletSkewTooFar" {
+		t.Fatal(upgradeable)
+	}
+}