diff --git a/assets/core/kubelet.yaml b/assets/core/kubelet.yaml index 47f4102c1f..60298cd0d3 100644 --- a/assets/core/kubelet.yaml +++ b/assets/core/kubelet.yaml @@ -18,14 +18,14 @@ featureGates: APIPriorityAndFairness: true DownwardAPIHugePages: true PodSecurity: true - RotateKubeletServerCertificate: false # TODO + RotateKubeletServerCertificate: false kubeAPIBurst: 100 kubeAPIQPS: 50 maxPods: 250 nodeStatusReportFrequency: 5m -rotateCertificates: false # TODO +rotateCertificates: false serializeImagePulls: false -serverTLSBootstrap: false # TODO +serverTLSBootstrap: false tlsCertFile: "{{ .tlsCertFile }}" tlsCipherSuites: [{{ .tlsCipherSuites }}] tlsMinVersion: "{{ .tlsMinVersion }}" diff --git a/cmd/microshift/main.go b/cmd/microshift/main.go index e303b16c8e..bf275fb316 100644 --- a/cmd/microshift/main.go +++ b/cmd/microshift/main.go @@ -41,5 +41,6 @@ func newCommand() *cobra.Command { cmd.AddCommand(cmds.NewBackupCommand()) cmd.AddCommand(cmds.NewRestoreCommand()) cmd.AddCommand(cmds.NewHealthcheckCommand()) + cmd.AddCommand(cmds.NewAddNodeCommand()) return cmd } diff --git a/etcd/cmd/microshift-etcd/run.go b/etcd/cmd/microshift-etcd/run.go index c0a1204a19..018b129a76 100644 --- a/etcd/cmd/microshift-etcd/run.go +++ b/etcd/cmd/microshift-etcd/run.go @@ -9,6 +9,7 @@ import ( "os" "os/signal" "path/filepath" + "strings" "syscall" "time" @@ -75,20 +76,17 @@ func (s *EtcdService) configure(cfg *config.Config) { // based on https://github.com/openshift/cluster-etcd-operator/blob/master/bindata/bootkube/bootstrap-manifests/etcd-member-pod.yaml#L19 s.etcdCfg = etcd.NewConfig() s.etcdCfg.ClusterState = "new" - //s.etcdCfg.ForceNewCluster = true //TODO s.etcdCfg.Logger = "zap" s.etcdCfg.Dir = dataDir s.etcdCfg.QuotaBackendBytes = cfg.Etcd.QuotaBackendBytes - url2380 := setURL([]string{"localhost"}, "2380") - url2379 := setURL([]string{"localhost"}, "2379") - s.etcdCfg.AdvertisePeerUrls = url2380 - s.etcdCfg.ListenPeerUrls = url2380 - s.etcdCfg.AdvertiseClientUrls = url2379 - s.etcdCfg.ListenClientUrls = url2379 - s.etcdCfg.ListenMetricsUrls = setURL([]string{"localhost"}, "2381") + s.etcdCfg.AdvertisePeerUrls = setURL([]string{cfg.Node.NodeIP}, "2380") + s.etcdCfg.ListenPeerUrls = setURL([]string{"0.0.0.0"}, "2380") + s.etcdCfg.AdvertiseClientUrls = setURL([]string{cfg.Node.NodeIP}, "2379") + s.etcdCfg.ListenClientUrls = setURL([]string{"0.0.0.0"}, "2379") + s.etcdCfg.ListenMetricsUrls = setURL([]string{cfg.Node.NodeIP}, "2381") s.etcdCfg.Name = cfg.Node.HostnameOverride - s.etcdCfg.InitialCluster = fmt.Sprintf("%s=https://%s:2380", cfg.Node.HostnameOverride, "localhost") + s.etcdCfg.InitialCluster = fmt.Sprintf("%s=https://%s", cfg.Node.HostnameOverride, net.JoinHostPort(cfg.Node.NodeIP, "2380")) s.etcdCfg.TlsMinVersion = getTLSMinVersion(cfg.ApiServer.TLS.MinVersion) if cfg.ApiServer.TLS.MinVersion != string(configv1.VersionTLS13) { @@ -103,6 +101,8 @@ func (s *EtcdService) configure(cfg *config.Config) { s.etcdCfg.PeerTLSInfo.TrustedCAFile = etcdSignerCertPath s.etcdCfg.ExperimentalMaxLearners = MaxLearners + + updateConfigFromFile(s.etcdCfg, getConfigFilePath()) } func (s *EtcdService) Run() error { @@ -217,3 +217,35 @@ func checkFragmentationPercentage(ondisk, inuse int64) float64 { fragmentedPercentage := (diff / float64(ondisk)) * 100 return math.Round(fragmentedPercentage*100) / 100 } + +func getConfigFilePath() string { + return filepath.Join(config.DataDir, "etcd", "config") +} + +func updateConfigFromFile(etcdCfg *etcd.Config, configPath string) { + data, err := os.ReadFile(configPath) + if err != nil { + klog.Errorf("failed to read config file: %v", err) + return + } + lines := strings.Split(string(data), "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + eqIdx := strings.Index(line, "=") + if eqIdx == -1 { + continue + } + parts := []string{line[:eqIdx], line[eqIdx+1:]} + key := strings.TrimSpace(parts[0]) + val := strings.TrimSpace(parts[1]) + switch key { + case "ETCD_INITIAL_CLUSTER": + etcdCfg.InitialCluster = val + case "ETCD_INITIAL_CLUSTER_STATE": + etcdCfg.ClusterState = val + } + } +} diff --git a/pkg/admin/prerun/version.go b/pkg/admin/prerun/version.go index 4644948564..1682e0cd56 100644 --- a/pkg/admin/prerun/version.go +++ b/pkg/admin/prerun/version.go @@ -109,24 +109,9 @@ func getVersions() (versions, error) { // error is something else than "file does not exist", like permissions return versions{}, fmt.Errorf("failed to get version of existing MicroShift data: %w", err) } - - // Ignoring .nodename to not get false positives from mere existence of the path - dataExists, err := util.PathExistsAndIsNotEmpty(config.DataDir, ".nodename") - if err != nil { - return versions{}, err - } - - if !dataExists { - // Data directory does not exist so it's first run of MicroShift - klog.InfoS("Version file does not exist yet - assuming first run of MicroShift") - vs.data = nil // repeated for clarity - return vs, nil - } - - // Data exists but without version file, let's assume 4.13 and compare versions - klog.InfoS("MicroShift data directory exists, but doesn't contain version file" + - " - assuming 4.13.0 and proceeding with version compatibility checks") - vs.data = &versionMetadata{Major: 4, Minor: 13, Patch: 0} + // Data directory or version does not exist + klog.InfoS("Version file does not exist yet - assuming first run of MicroShift") + vs.data = nil // repeated for clarity return vs, nil } diff --git a/pkg/cmd/addnode.go b/pkg/cmd/addnode.go new file mode 100644 index 0000000000..864876e892 --- /dev/null +++ b/pkg/cmd/addnode.go @@ -0,0 +1,587 @@ +package cmd + +import ( + "context" + "crypto/x509" + "fmt" + "math/big" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + librarycrypto "github.com/openshift/library-go/pkg/crypto" + "github.com/openshift/microshift/pkg/components" + "github.com/openshift/microshift/pkg/config" + "github.com/openshift/microshift/pkg/util" + "github.com/openshift/microshift/pkg/util/cryptomaterial" + "github.com/openshift/microshift/pkg/version" + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/authentication/user" + + "go.etcd.io/etcd/client/pkg/v3/transport" + clientv3 "go.etcd.io/etcd/client/v3" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" +) + +const ( + // Default timeout for operations + joinDefaultTimeout = 10 * time.Minute +) + +type AddNodeOptions struct { + KubeconfigPath string + Timeout time.Duration + Learner bool +} + +func NewAddNodeCommand() *cobra.Command { + opts := &AddNodeOptions{ + KubeconfigPath: "", + Timeout: joinDefaultTimeout, + } + + cmd := &cobra.Command{ + Use: "add-node", + Short: "Adds a new node to an existing MicroShift cluster", + Long: `This command joins a node to an existing MicroShift cluster by: +1. Loading the MicroShift configuration for current node. +2. Fetch Certificate Authorities from the cluster using provided kubeconfig. +4. Configuring etcd cluster to add the new member. +5. Configuring kubelet to bootstrap into the cluster. +6. Restarting the MicroShift systemd unit. +7. Verifying the node is ready in the cluster.`, + RunE: func(cmd *cobra.Command, args []string) error { + return runAddNode(cmd.Context(), opts) + }, + } + + cmd.Flags().StringVar(&opts.KubeconfigPath, "kubeconfig", opts.KubeconfigPath, + "Path to kubeconfig file for connecting to the cluster") + cmd.Flags().DurationVar(&opts.Timeout, "timeout", opts.Timeout, + "Timeout for cluster join operations") + cmd.Flags().BoolVar(&opts.Learner, "learner", true, + "Join the cluster as a learner node") + + if version.Get().BuildVariant != version.BuildVariantCommunity { + cmd.Hidden = true + } + + return cmd +} + +func runAddNode(ctx context.Context, opts *AddNodeOptions) error { + ctx, cancel := context.WithTimeout(ctx, opts.Timeout) + defer cancel() + + klog.Info("Starting cluster join process...") + if opts.Learner { + klog.Info("Will add etcd node as learner") + } + + cfg, err := config.ActiveConfig() + if err != nil { + return fmt.Errorf("failed to load MicroShift configuration: %w", err) + } + klog.Info("MicroShift configuration loaded successfully") + + client, err := createKubernetesClient(opts.KubeconfigPath) + if err != nil { + return fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + nodeName := cfg.CanonicalNodeName() + if isNodeAlreadyInCluster(ctx, client, nodeName) { + klog.Infof("Node %s is already part of the cluster. Skipping join process.", nodeName) + return nil + } + + if err := cleanupMicroShiftData(cfg); err != nil { + return fmt.Errorf("failed to cleanup MicroShift data directories: %w", err) + } + klog.Info("MicroShift data directories cleaned up successfully") + + for _, resource := range components.CertificateAuthorityResources { + if err := fetchCertificateAuthority(ctx, client, resource.Name, resource.Dir); err != nil { + return fmt.Errorf("failed to fetch certificate authority %s: %w", resource.Name, err) + } + } + for _, resource := range components.ServiceAccountKeyResources { + if err := fetchServiceAccountKey(ctx, client, resource.Name, resource.Dir); err != nil { + return fmt.Errorf("failed to fetch service account key %s: %w", resource.Name, err) + } + } + klog.Info("Certificate authorities fetched and written successfully") + + if err := generateEtcdCertificates(cfg); err != nil { + return fmt.Errorf("failed to generate etcd certificates: %w", err) + } + klog.Info("Etcd certificates generated successfully") + + clusterMembers, err := getClusterNodes(ctx, client) + if err != nil { + return fmt.Errorf("failed to get cluster information: %w", err) + } + + if err := configureEtcdForCluster(ctx, cfg, clusterMembers, opts.Learner); err != nil { + return fmt.Errorf("failed to configure etcd for cluster: %w", err) + } + + if err := configureBootstrapKubeconfig(cfg, opts.KubeconfigPath); err != nil { + return fmt.Errorf("failed to configure bootstrap kubeconfig: %w", err) + } + + if err := restartMicroShift(); err != nil { + return fmt.Errorf("failed to restart MicroShift service: %w", err) + } + klog.Info("MicroShift service restarted") + + if err := waitForNodeReady(ctx, client, cfg.CanonicalNodeName()); err != nil { + return fmt.Errorf("node failed to become ready: %w", err) + } + + klog.Info("Node successfully joined the cluster") + return nil +} + +func createKubernetesClient(kubeconfigPath string) (*kubernetes.Clientset, error) { + exists, err := util.PathExists(kubeconfigPath) + if err != nil { + return nil, fmt.Errorf("failed to check if kubeconfig file exists: %w", err) + } + if !exists { + return nil, fmt.Errorf("kubeconfig file %s does not exist", kubeconfigPath) + } + + restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) + if err != nil { + return nil, err + } + + return kubernetes.NewForConfig(restConfig) +} + +func fetchCertificateAuthority(ctx context.Context, client kubernetes.Interface, name, dir string) error { + secret, err := client.CoreV1().Secrets("kube-system").Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get %s secret: %w", name, err) + } + + caCert, exists := secret.Data["ca.crt"] + if !exists { + return fmt.Errorf("ca.crt not found in secret") + } + + caKey, exists := secret.Data["ca.key"] + if !exists { + return fmt.Errorf("ca.key not found in secret") + } + + serial, exists := secret.Data["serial.txt"] + if !exists { + return fmt.Errorf("serial.txt not found in secret") + } + + caBundle, caBundleExists := secret.Data["ca-bundle.crt"] + + if err := os.MkdirAll(dir, 0750); err != nil { + return fmt.Errorf("failed to create destination directory: %w", err) + } + + if err := os.WriteFile(cryptomaterial.CACertPath(dir), caCert, 0600); err != nil { + return fmt.Errorf("failed to write ca.crt: %w", err) + } + + if err := os.WriteFile(cryptomaterial.CAKeyPath(dir), caKey, 0600); err != nil { + return fmt.Errorf("failed to write ca.key: %w", err) + } + + if err := os.WriteFile(cryptomaterial.CASerialsPath(dir), serial, 0600); err != nil { + return fmt.Errorf("failed to write serial.txt: %w", err) + } + + if caBundleExists { + if err := os.WriteFile(cryptomaterial.CABundlePath(dir), caBundle, 0600); err != nil { + return fmt.Errorf("failed to write ca-bundle.crt: %w", err) + } + } + + return nil +} + +func fetchServiceAccountKey(ctx context.Context, client kubernetes.Interface, name, dir string) error { + secret, err := client.CoreV1().Secrets("kube-system").Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get %s secret: %w", name, err) + } + + serviceAccountKey, exists := secret.Data["service-account.key"] + if !exists { + return fmt.Errorf("service-account.key not found in secret") + } + + serviceAccountPubKey, exists := secret.Data["service-account.pub"] + if !exists { + return fmt.Errorf("service-account.pub not found in secret") + } + + if err := os.MkdirAll(dir, 0750); err != nil { + return fmt.Errorf("failed to create destination directory: %w", err) + } + + if err := os.WriteFile(filepath.Join(dir, "service-account.key"), serviceAccountKey, 0600); err != nil { + return fmt.Errorf("failed to write service-account.key: %w", err) + } + + if err := os.WriteFile(filepath.Join(dir, "service-account.pub"), serviceAccountPubKey, 0400); err != nil { + return fmt.Errorf("failed to write service-account.pub: %w", err) + } + + return nil +} + +func generateEtcdCertificates(cfg *config.Config) error { + certsDir := cryptomaterial.CertsDirectory(config.DataDir) + etcdSignerDir := cryptomaterial.EtcdSignerDir(certsDir) + + caCertPath := cryptomaterial.CACertPath(etcdSignerDir) + caKeyPath := cryptomaterial.CAKeyPath(etcdSignerDir) + + caCert, err := os.ReadFile(caCertPath) + if err != nil { + return fmt.Errorf("failed to read CA certificate: %w", err) + } + caKey, err := os.ReadFile(caKeyPath) + if err != nil { + return fmt.Errorf("failed to read CA key: %w", err) + } + + // Create CA config from the provided cert and key + caTLSConfig, err := librarycrypto.GetTLSCertificateConfigFromBytes(caCert, caKey) + if err != nil { + return fmt.Errorf("failed to load CA certificate config: %w", err) + } + + // Create CA object for signing + caConfig := &librarycrypto.CA{ + Config: caTLSConfig, + SerialGenerator: &librarycrypto.RandomSerialGenerator{}, + } + + // Create directories for etcd certificates + servingCertDir := cryptomaterial.EtcdServingCertDir(certsDir) + if err := os.MkdirAll(servingCertDir, 0750); err != nil { + return fmt.Errorf("failed to create serving cert directory: %w", err) + } + + peerCertDir := cryptomaterial.EtcdPeerCertDir(certsDir) + if err := os.MkdirAll(peerCertDir, 0750); err != nil { + return fmt.Errorf("failed to create peer cert directory: %w", err) + } + + clientCertDir := cryptomaterial.EtcdAPIServerClientCertDir(certsDir) + if err := os.MkdirAll(clientCertDir, 0750); err != nil { + return fmt.Errorf("failed to create client cert directory: %w", err) + } + + // Prepare hostnames and IPs for etcd certificates + hostnames := []string{"localhost", cfg.Node.HostnameOverride} + ips := []net.IP{net.ParseIP("127.0.0.1")} + if cfg.Node.NodeIP != "" { + if ip := net.ParseIP(cfg.Node.NodeIP); ip != nil { + ips = append(ips, ip) + } + } + + // Generate serving certificate + servingTLS, err := caConfig.MakeServerCertForDuration( + sets.New[string](hostnames...), + cryptomaterial.ShortLivedCertificateValidity, + func(certTemplate *x509.Certificate) error { + certTemplate.Subject.CommonName = "system:etcd-server:etcd-client" + certTemplate.Subject.Organization = []string{"system:etcd-servers"} + certTemplate.ExtKeyUsage = []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth} + certTemplate.IPAddresses = ips + certTemplate.SerialNumber = big.NewInt(4) + serialNumberPath := filepath.Join(servingCertDir, "serial.txt") + if err := os.WriteFile(serialNumberPath, []byte(certTemplate.SerialNumber.String()), 0600); err != nil { + return fmt.Errorf("failed to write serial number to disk: %w", err) + } + return nil + }, + ) + if err != nil { + return fmt.Errorf("failed to generate serving certificate: %w", err) + } + + servingCertPath := cryptomaterial.PeerCertPath(servingCertDir) + servingKeyPath := cryptomaterial.PeerKeyPath(servingCertDir) + if err := servingTLS.WriteCertConfigFile(servingCertPath, servingKeyPath); err != nil { + return fmt.Errorf("failed to write serving certificate: %w", err) + } + + peerTLS, err := caConfig.MakeServerCertForDuration( + sets.New[string](hostnames...), + cryptomaterial.ShortLivedCertificateValidity, + func(certTemplate *x509.Certificate) error { + certTemplate.Subject.CommonName = "system:etcd-peer:etcd-client" + certTemplate.Subject.Organization = []string{"system:etcd-peers"} + certTemplate.ExtKeyUsage = []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth} + certTemplate.IPAddresses = ips + certTemplate.SerialNumber = big.NewInt(4) + serialNumberPath := filepath.Join(peerCertDir, "serial.txt") + if err := os.WriteFile(serialNumberPath, []byte(certTemplate.SerialNumber.String()), 0600); err != nil { + return fmt.Errorf("failed to write serial number to disk: %w", err) + } + return nil + }, + ) + if err != nil { + return fmt.Errorf("failed to generate peer certificate: %w", err) + } + + peerCertPath := cryptomaterial.PeerCertPath(peerCertDir) + peerKeyPath := cryptomaterial.PeerKeyPath(peerCertDir) + if err := peerTLS.WriteCertConfigFile(peerCertPath, peerKeyPath); err != nil { + return fmt.Errorf("failed to write peer certificate: %w", err) + } + + // Generate client certificate + clientUserInfo := &user.DefaultInfo{Name: "etcd", Groups: []string{"etcd"}} + clientTLS, err := caConfig.MakeClientCertificateForDuration( + clientUserInfo, + cryptomaterial.ShortLivedCertificateValidity, + ) + if err != nil { + return fmt.Errorf("failed to generate client certificate: %w", err) + } + + clientCertPath := cryptomaterial.ClientCertPath(clientCertDir) + clientKeyPath := cryptomaterial.ClientKeyPath(clientCertDir) + if err := clientTLS.WriteCertConfigFile(clientCertPath, clientKeyPath); err != nil { + return fmt.Errorf("failed to write client certificate: %w", err) + } + + klog.Info("All etcd certificates generated successfully with proper signatures and SAN entries") + return nil +} + +func getClusterNodes(ctx context.Context, client kubernetes.Interface) ([]string, error) { + nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to list nodes: %w", err) + } + + var members []string + for _, node := range nodes.Items { + if !isNodeReady(&node) { + continue + } + nodeIP := "" + for _, addr := range node.Status.Addresses { + if addr.Type == corev1.NodeInternalIP { + nodeIP = addr.Address + break + } + } + if nodeIP != "" { + members = append(members, fmt.Sprintf("%s=https://%s:2380", node.Name, nodeIP)) + } + } + + return members, nil +} + +func isNodeReady(node *corev1.Node) bool { + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady { + return condition.Status == corev1.ConditionTrue + } + } + return false +} + +func configureEtcdForCluster(ctx context.Context, cfg *config.Config, clusterMembers []string, isLearner bool) error { + dataDir := filepath.Dir(cfg.EtcdConfigPath()) + if err := os.MkdirAll(dataDir, 0750); err != nil { + return fmt.Errorf("failed to create etcd data directory: %w", err) + } + + nodeIP := cfg.Node.NodeIP + if nodeIP == "" { + nodeIP = "127.0.0.1" // fallback + } + currentNodeMember := fmt.Sprintf("%s=https://%s:2380", cfg.CanonicalNodeName(), nodeIP) + cfgInitialCluster := append(clusterMembers, currentNodeMember) + + clusterConfig := fmt.Sprintf("ETCD_INITIAL_CLUSTER=%s\nETCD_INITIAL_CLUSTER_STATE=existing", strings.Join(cfgInitialCluster, ",")) + + if err := os.WriteFile(cfg.EtcdConfigPath(), []byte(clusterConfig), 0600); err != nil { + return fmt.Errorf("failed to write etcd cluster configuration: %w", err) + } + + klog.Infof("Etcd configuration written to %s", cfg.EtcdConfigPath()) + + certsDir := cryptomaterial.CertsDirectory(config.DataDir) + etcdPeerClientCertDir := cryptomaterial.EtcdPeerCertDir(certsDir) + + tlsInfo := transport.TLSInfo{ + CertFile: cryptomaterial.PeerCertPath(etcdPeerClientCertDir), + KeyFile: cryptomaterial.PeerKeyPath(etcdPeerClientCertDir), + TrustedCAFile: cryptomaterial.CACertPath(cryptomaterial.EtcdSignerDir(certsDir)), + } + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + return fmt.Errorf("failed to create etcd client TLS config: %v", err) + } + + var endpoints []string + for _, member := range clusterMembers { + parts := strings.SplitN(member, "=", 2) + if len(parts) == 2 { + endpoint := strings.Replace(parts[1], ":2380", ":2379", 1) + endpoints = append(endpoints, endpoint) + } + } + + client, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + TLS: tlsConfig, + Context: ctx, + }) + if err != nil { + return fmt.Errorf("failed to create etcd client: %v", err) + } + + memberResponse, err := client.MemberList(ctx) + if err != nil { + return fmt.Errorf("failed to list etcd members: %v", err) + } + + var filteredEndpoints []string + initialCluster := fmt.Sprintf("%s=https://%s:2380", cfg.Node.HostnameOverride, cfg.Node.NodeIP) + for _, member := range memberResponse.Members { + if member.Name == cfg.Node.HostnameOverride { + klog.Infof("etcd member %s already exists", cfg.Node.HostnameOverride) + continue + } + if !member.IsLearner { + filteredEndpoints = append(filteredEndpoints, member.ClientURLs[0]) + } + initialCluster = fmt.Sprintf("%s,%s=%s", initialCluster, member.Name, member.PeerURLs[0]) + } + + klog.Infof("initial cluster: %v. Member endpoints: %v", initialCluster, filteredEndpoints) + + client, err = clientv3.New(clientv3.Config{ + Endpoints: filteredEndpoints, + DialTimeout: 5 * time.Second, + TLS: tlsConfig, + Context: ctx, + }) + if err != nil { + return fmt.Errorf("failed to create etcd client with filtered endpoints: %v", err) + } + + addFunction := client.MemberAdd + if isLearner { + addFunction = client.MemberAddAsLearner + } + response, err := addFunction(ctx, []string{fmt.Sprintf("https://%s", net.JoinHostPort(cfg.Node.NodeIP, "2380"))}) + if err != nil { + return fmt.Errorf("failed to add etcd node: %v", err) + } + klog.Infof("Successfully added etcd node: %v", response) + return nil +} + +func configureBootstrapKubeconfig(cfg *config.Config, kubeconfigPath string) error { + bootstrapKubeConfigPath := cfg.BootstrapKubeConfigPath() + if err := os.MkdirAll(filepath.Dir(bootstrapKubeConfigPath), 0750); err != nil { + return fmt.Errorf("failed to create kubelet directory: %w", err) + } + + if err := copyFile(kubeconfigPath, bootstrapKubeConfigPath); err != nil { + return fmt.Errorf("failed to copy kubeconfig for kubelet: %w", err) + } + return nil +} + +func copyFile(src, dst string) error { + data, err := os.ReadFile(src) + if err != nil { + return err + } + return os.WriteFile(dst, data, 0600) +} + +func restartMicroShift() error { + cmd := exec.Command("systemctl", "restart", "microshift") + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to restart microshift service: %w", err) + } + return nil +} + +func isNodeAlreadyInCluster(ctx context.Context, client kubernetes.Interface, nodeName string) bool { + _, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return false + } + return true +} + +func waitForNodeReady(ctx context.Context, client kubernetes.Interface, nodeName string) error { + klog.Infof("Waiting for node %s to become ready...", nodeName) + + timeout := time.After(5 * time.Minute) + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timeout: + return fmt.Errorf("timeout waiting for node to become ready") + case <-ticker.C: + node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + klog.Warningf("Failed to get node %s: %v", nodeName, err) + continue + } + + if isNodeReady(node) { + klog.Infof("Node %s is ready!", nodeName) + return nil + } + + klog.Infof("Node %s is not ready yet, waiting...", nodeName) + } + } +} + +func cleanupMicroShiftData(cfg *config.Config) error { + directoriesToClean := []string{ + filepath.Dir(cfg.EtcdConfigPath()), + cryptomaterial.CertsDirectory(config.DataDir), + filepath.Join(config.DataDir, "resources"), + filepath.Join(config.DataDir, "bootstrap"), + } + + for _, dir := range directoriesToClean { + if err := os.RemoveAll(dir); err != nil { + return fmt.Errorf("failed to remove directory %s: %w", dir, err) + } + } + return nil +} diff --git a/pkg/cmd/init.go b/pkg/cmd/init.go index 798dfdf2e6..2bb339f19b 100644 --- a/pkg/cmd/init.go +++ b/pkg/cmd/init.go @@ -324,7 +324,7 @@ func certSetup(cfg *config.Config) (*certchains.CertificateChains, error) { Validity: cryptomaterial.LongLivedCertificateValidity, }, UserInfo: &user.DefaultInfo{Name: "system:etcd-peer:etcd-client", Groups: []string{"system:etcd-peers"}}, - Hostnames: []string{"localhost", cfg.Node.HostnameOverride}, + Hostnames: []string{"localhost", cfg.Node.HostnameOverride, cfg.Node.NodeIP}, }, &certchains.PeerCertificateSigningRequestInfo{ CSRMeta: certchains.CSRMeta{ @@ -332,7 +332,7 @@ func certSetup(cfg *config.Config) (*certchains.CertificateChains, error) { Validity: cryptomaterial.LongLivedCertificateValidity, }, UserInfo: &user.DefaultInfo{Name: "system:etcd-server:etcd-client", Groups: []string{"system:etcd-servers"}}, - Hostnames: []string{"localhost", cfg.Node.HostnameOverride}, + Hostnames: []string{"localhost", cfg.Node.HostnameOverride, cfg.Node.NodeIP}, }, ), ).WithCABundle( diff --git a/pkg/components/certificateauthority.go b/pkg/components/certificateauthority.go new file mode 100644 index 0000000000..b9a87e6383 --- /dev/null +++ b/pkg/components/certificateauthority.go @@ -0,0 +1,208 @@ +package components + +import ( + "context" + "fmt" + "os" + "path/filepath" + + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "github.com/openshift/microshift/pkg/config" + "github.com/openshift/microshift/pkg/util/cryptomaterial" +) + +const ( + // Resource namespace + caResourceNamespace = "kube-system" +) + +var ( + CertificateAuthorityResources = []struct { + Name string + Dir string + }{ + {Name: "kube-control-plane-signer", Dir: cryptomaterial.KubeControlPlaneSignerCertDir(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "kube-apiserver-to-kubelet-signer", Dir: cryptomaterial.KubeAPIServerToKubeletSignerCertDir(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "admin-kubeconfig-signer", Dir: cryptomaterial.AdminKubeconfigSignerDir(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "kubelet-signer", Dir: cryptomaterial.KubeletCSRSignerSignerCertDir(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "kube-csr-signer", Dir: cryptomaterial.CSRSignerCertDir(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "aggregator-signer", Dir: cryptomaterial.AggregatorSignerDir(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "service-ca", Dir: cryptomaterial.ServiceCADir(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "ingress-ca", Dir: cryptomaterial.IngressCADir(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "kube-apiserver-external-signer", Dir: cryptomaterial.KubeAPIServerExternalSigner(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "kube-apiserver-localhost-signer", Dir: cryptomaterial.KubeAPIServerLocalhostSigner(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "kube-apiserver-service-network-signer", Dir: cryptomaterial.KubeAPIServerServiceNetworkSigner(cryptomaterial.CertsDirectory(config.DataDir))}, + {Name: "etcd-signer", Dir: cryptomaterial.EtcdSignerDir(cryptomaterial.CertsDirectory(config.DataDir))}, + } + ServiceAccountKeyResources = []struct { + Name string + Dir string + }{ + {Name: "service-account-key", Dir: filepath.Join(config.DataDir, "/resources/kube-apiserver/secrets/service-account-key")}, + } +) + +func startCertificateAuthorityController(ctx context.Context, kubeconfigPath string) error { + client, err := getKubernetesClient(kubeconfigPath) + if err != nil { + return fmt.Errorf("failed to get Kubernetes client: %w", err) + } + + resourceNames := make([]string, len(CertificateAuthorityResources)+len(ServiceAccountKeyResources)) + for i, resource := range CertificateAuthorityResources { + resourceNames[i] = resource.Name + if err := exposeCertificateAuthority(ctx, client, resource.Dir, resource.Name); err != nil { + return fmt.Errorf("failed to expose certificate authority %s: %w", resource.Name, err) + } + } + + for i, resource := range ServiceAccountKeyResources { + resourceNames[i] = resource.Name + if err := exposeServiceAccountKey(ctx, client, resource.Dir, resource.Name); err != nil { + return fmt.Errorf("failed to expose service account key %s: %w", resource.Name, err) + } + } + + err = createClusterRole(ctx, client, resourceNames) + if err != nil { + return fmt.Errorf("failed to create etcd CA admin Role: %w", err) + } + return nil +} + +func getKubernetesClient(kubeconfigPath string) (*kubernetes.Clientset, error) { + restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) + if err != nil { + return nil, err + } + return kubernetes.NewForConfig(restConfig) +} + +func createClusterRole(ctx context.Context, client kubernetes.Interface, resourceNames []string) error { + role := &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: "microshift-ca-admin", + Namespace: caResourceNamespace, + }, + Rules: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"secrets"}, + ResourceNames: resourceNames, + Verbs: []string{"*"}, + }, + }, + } + + _, err := client.RbacV1().Roles(caResourceNamespace).Create(ctx, role, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create etcd CA admin Role: %w", err) + } + + roleBinding := &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "microshift-ca-admin-binding", + Namespace: caResourceNamespace, + }, + Subjects: []rbacv1.Subject{ + { + Kind: "Group", + Name: "system:masters", + APIGroup: rbacv1.GroupName, + }, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: rbacv1.GroupName, + Kind: "Role", + Name: "microshift-ca-admin", + }, + } + + _, err = client.RbacV1().RoleBindings(caResourceNamespace).Create(ctx, roleBinding, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create etcd CA admin RoleBinding: %w", err) + } + + return nil +} + +func exposeCertificateAuthority(ctx context.Context, client kubernetes.Interface, dir, name string) error { + caCertPath := cryptomaterial.CACertPath(dir) + caKeyPath := cryptomaterial.CAKeyPath(dir) + serialPath := filepath.Join(dir, "serial.txt") + + caCert, err := os.ReadFile(caCertPath) + if err != nil { + return fmt.Errorf("failed to read CA certificate from %s: %w", caCertPath, err) + } + caKey, err := os.ReadFile(caKeyPath) + if err != nil { + return fmt.Errorf("failed to read CA key from %s: %w", caKeyPath, err) + } + serial, err := os.ReadFile(serialPath) + if err != nil { + return fmt.Errorf("failed to read CA serial: %w", err) + } + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: caResourceNamespace, + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "ca.crt": caCert, + "ca.key": caKey, + "serial.txt": serial, + }, + } + + caBundlePath := cryptomaterial.CABundlePath(dir) + if _, err := os.Stat(caBundlePath); err == nil { + caBundle, err := os.ReadFile(caBundlePath) + if err != nil { + return fmt.Errorf("failed to read CA bundle from %s: %w", caBundlePath, err) + } + secret.Data["ca-bundle.crt"] = caBundle + } + + _, err = client.CoreV1().Secrets(caResourceNamespace).Create(ctx, secret, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create CA secret %s: %w", name, err) + } + return nil +} + +func exposeServiceAccountKey(ctx context.Context, client kubernetes.Interface, dir, name string) error { + serviceAccountKeyPath := filepath.Join(dir, "service-account.key") + serviceAccountPubKeyPath := filepath.Join(dir, "service-account.pub") + serviceAccountKey, err := os.ReadFile(serviceAccountKeyPath) + if err != nil { + return fmt.Errorf("failed to read service account key from %s: %w", serviceAccountKeyPath, err) + } + serviceAccountPubKey, err := os.ReadFile(serviceAccountPubKeyPath) + if err != nil { + return fmt.Errorf("failed to read service account public key from %s: %w", serviceAccountPubKeyPath, err) + } + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: caResourceNamespace, + }, + } + secret.Data = map[string][]byte{ + "service-account.key": serviceAccountKey, + "service-account.pub": serviceAccountPubKey, + } + _, err = client.CoreV1().Secrets(caResourceNamespace).Create(ctx, secret, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create service account key secret %s: %w", name, err) + } + return nil +} diff --git a/pkg/components/components.go b/pkg/components/components.go index ed1ca8cb09..b94da5d034 100755 --- a/pkg/components/components.go +++ b/pkg/components/components.go @@ -44,5 +44,12 @@ func StartComponents(cfg *config.Config, ctx context.Context) error { return err } + if cfg.MultiNode.Enabled { + if err := startCertificateAuthorityController(ctx, kubeAdminConfig); err != nil { + klog.Warningf("Failed to start certificate authority controller: %v", err) + return err + } + } + return nil } diff --git a/pkg/config/etcd.go b/pkg/config/etcd.go index 52dbb49cf4..427ed9b5ff 100644 --- a/pkg/config/etcd.go +++ b/pkg/config/etcd.go @@ -1,6 +1,9 @@ package config -import "time" +import ( + "path/filepath" + "time" +) const ( // Etcd performance degrades significantly if the memory available @@ -27,3 +30,7 @@ type EtcdConfig struct { // defrags, except for a single on startup). DefragCheckFreq time.Duration `json:"-"` } + +func (cfg *Config) EtcdConfigPath() string { + return filepath.Join(DataDir, "etcd", "config") +} diff --git a/pkg/config/kubeconfig.go b/pkg/config/kubeconfig.go index bd0ec842b2..22504c47ab 100644 --- a/pkg/config/kubeconfig.go +++ b/pkg/config/kubeconfig.go @@ -1,6 +1,10 @@ package config -import "path/filepath" +import ( + "path/filepath" + + "github.com/openshift/microshift/pkg/util" +) // KubeConfigID identifies the different kubeconfigs managed in the DataDir type KubeConfigID string @@ -27,3 +31,15 @@ func (cfg *Config) KubeConfigAdminPath(id string) string { func (cfg *Config) KubeConfigRootAdminPath() string { return filepath.Join(DataDir, "resources", string(KubeAdmin)) } + +func (cfg *Config) BootstrapKubeConfigPath() string { + return filepath.Join(DataDir, "bootstrap", "kubeconfig") +} + +func (cfg *Config) BootstrapKubeConfigExists() bool { + exists, err := util.PathExists(cfg.BootstrapKubeConfigPath()) + if err != nil { + return false + } + return exists +} diff --git a/pkg/config/multinode.go b/pkg/config/multinode.go index ea5d8255c3..6db5e73322 100644 --- a/pkg/config/multinode.go +++ b/pkg/config/multinode.go @@ -14,12 +14,5 @@ func ConfigMultiNode(c *Config, enabled bool) *Config { } c.MultiNode.Enabled = enabled c.MultiNode.Controlplane = c.Node.NodeIP - - // Use controlplane node IP as APIServer backend (instead of next available - // IP from service network) - c.ApiServer.AdvertiseAddress = c.Node.NodeIP - // Don't configure the advertise address on the device - c.ApiServer.SkipInterface = true - return c } diff --git a/pkg/controllers/etcd.go b/pkg/controllers/etcd.go index 06cd7f6be8..e5631c6173 100644 --- a/pkg/controllers/etcd.go +++ b/pkg/controllers/etcd.go @@ -169,22 +169,59 @@ func stopMicroshiftEtcdScopeIfExists() error { } func checkIfEtcdIsReady(ctx context.Context) error { - client, err := getEtcdClient(ctx) - if err != nil { - return fmt.Errorf("failed to obtain etcd client: %v", err) - } - defer func() { _ = client.Close() }() + var client *clientv3.Client + defer func() { + if client != nil { + _ = client.Close() + } + }() + + for attempt := 0; attempt < HealthCheckRetries; attempt++ { + if client == nil { + var err error + client, err = getEtcdClient(ctx) + if err != nil { + klog.Infof("failed to obtain etcd client: %v", err) + if attempt < HealthCheckRetries-1 { + time.Sleep(HealthCheckWait) + continue + } + return fmt.Errorf("failed to obtain etcd client after %d attempts: %v", HealthCheckRetries, err) + } + } + + s, err := client.Status(ctx, "localhost:2379") + if err != nil { + _ = client.Close() + client = nil + klog.Infof("failed to get etcd status: %v", err) + if attempt < HealthCheckRetries-1 { + time.Sleep(HealthCheckWait) + continue + } + return fmt.Errorf("failed to get etcd status after %d attempts: %v", HealthCheckRetries, err) + } + + // If my own instance is a learner I dont need to check readiness because apiserver is going + // to connect to other voting members in the cluster. + if s.IsLearner { + return nil + } - for i := 0; i < HealthCheckRetries; i++ { - time.Sleep(HealthCheckWait) if _, err = client.Get(ctx, "health"); err == nil { return nil } else { + _ = client.Close() + client = nil klog.Infof("etcd not ready yet: %v", err) if err == context.Canceled { return err } } + + if attempt < HealthCheckRetries-1 { + time.Sleep(HealthCheckWait) + } } return fmt.Errorf("etcd still not healthy after checking %d times", HealthCheckRetries) } diff --git a/pkg/controllers/kube-apiserver.go b/pkg/controllers/kube-apiserver.go index 8b58193b8d..41b4ac3976 100644 --- a/pkg/controllers/kube-apiserver.go +++ b/pkg/controllers/kube-apiserver.go @@ -28,6 +28,8 @@ import ( "strings" "time" + "go.etcd.io/etcd/client/pkg/v3/transport" + clientv3 "go.etcd.io/etcd/client/v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -75,7 +77,7 @@ func init() { type KubeAPIServer struct { kasConfigBytes []byte verbosity int - configureErr error // todo: report configuration errors immediately + configuration *config.Config masterURL string servingCAPath string @@ -83,9 +85,8 @@ type KubeAPIServer struct { } func NewKubeAPIServer(cfg *config.Config) *KubeAPIServer { - s := &KubeAPIServer{} - if err := s.configure(cfg); err != nil { - s.configureErr = err + s := &KubeAPIServer{ + configuration: cfg, } return s } @@ -93,7 +94,7 @@ func NewKubeAPIServer(cfg *config.Config) *KubeAPIServer { func (s *KubeAPIServer) Name() string { return "kube-apiserver" } func (s *KubeAPIServer) Dependencies() []string { return []string{"etcd", "network-configuration"} } -func (s *KubeAPIServer) configure(cfg *config.Config) error { +func (s *KubeAPIServer) configure(ctx context.Context, cfg *config.Config) error { s.verbosity = cfg.GetVerbosity() certsDir := cryptomaterial.CertsDirectory(config.DataDir) @@ -163,20 +164,23 @@ func (s *KubeAPIServer) configure(cfg *config.Config) error { } } + etcdServers, err := discoverEtcdServers(ctx, s.configuration.BootstrapKubeConfigPath()) + if err != nil { + return fmt.Errorf("failed to discover etcd servers: %w", err) + } + overrides := &kubecontrolplanev1.KubeAPIServerConfig{ APIServerArguments: map[string]kubecontrolplanev1.Arguments{ - "advertise-address": {s.advertiseAddress}, - "audit-policy-file": {filepath.Join(config.DataDir, "/resources/kube-apiserver-audit-policies/default.yaml")}, - "audit-log-maxage": {strconv.Itoa(cfg.ApiServer.AuditLog.MaxFileAge)}, - "audit-log-maxbackup": {strconv.Itoa(cfg.ApiServer.AuditLog.MaxFiles)}, - "audit-log-maxsize": {strconv.Itoa(cfg.ApiServer.AuditLog.MaxFileSize)}, - "client-ca-file": {clientCABundlePath}, - "etcd-cafile": {cryptomaterial.CACertPath(cryptomaterial.EtcdSignerDir(certsDir))}, - "etcd-certfile": {cryptomaterial.ClientCertPath(etcdClientCertDir)}, - "etcd-keyfile": {cryptomaterial.ClientKeyPath(etcdClientCertDir)}, - "etcd-servers": { - "https://localhost:2379", - }, + "advertise-address": {s.advertiseAddress}, + "audit-policy-file": {filepath.Join(config.DataDir, "/resources/kube-apiserver-audit-policies/default.yaml")}, + "audit-log-maxage": {strconv.Itoa(cfg.ApiServer.AuditLog.MaxFileAge)}, + "audit-log-maxbackup": {strconv.Itoa(cfg.ApiServer.AuditLog.MaxFiles)}, + "audit-log-maxsize": {strconv.Itoa(cfg.ApiServer.AuditLog.MaxFileSize)}, + "client-ca-file": {clientCABundlePath}, + "etcd-cafile": {cryptomaterial.CACertPath(cryptomaterial.EtcdSignerDir(certsDir))}, + "etcd-certfile": {cryptomaterial.ClientCertPath(etcdClientCertDir)}, + "etcd-keyfile": {cryptomaterial.ClientKeyPath(etcdClientCertDir)}, + "etcd-servers": etcdServers, "kubelet-certificate-authority": {cryptomaterial.CABundlePath(kubeCSRSignerDir)}, "kubelet-client-certificate": {cryptomaterial.ClientCertPath(kubeletClientDir)}, "kubelet-client-key": {cryptomaterial.ClientKeyPath(kubeletClientDir)}, @@ -309,8 +313,8 @@ func (s *KubeAPIServer) configureAuditPolicy(cfg *config.Config) error { } func (s *KubeAPIServer) Run(ctx context.Context, ready chan<- struct{}, stopped chan<- struct{}) error { - if s.configureErr != nil { - return fmt.Errorf("configuration failed: %w", s.configureErr) + if err := s.configure(ctx, s.configuration); err != nil { + return fmt.Errorf("configuration failed: %w", err) } defer close(stopped) @@ -407,3 +411,91 @@ func (s *KubeAPIServer) Run(ctx context.Context, ready chan<- struct{}, stopped panic(perr) } } + +func discoverEtcdServers(ctx context.Context, kubeconfigPath string) ([]string, error) { + certsDir := cryptomaterial.CertsDirectory(config.DataDir) + etcdPeerCertDir := cryptomaterial.EtcdPeerCertDir(certsDir) + + tlsInfo := transport.TLSInfo{ + CertFile: cryptomaterial.PeerCertPath(etcdPeerCertDir), + KeyFile: cryptomaterial.PeerKeyPath(etcdPeerCertDir), + TrustedCAFile: cryptomaterial.CACertPath(cryptomaterial.EtcdSignerDir(certsDir)), + } + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + return nil, fmt.Errorf("failed to create etcd client TLS config: %v", err) + } + + client, err := clientv3.New(clientv3.Config{ + DialTimeout: 5 * time.Second, + Endpoints: []string{"https://localhost:2379"}, + TLS: tlsConfig, + Context: ctx, + }) + if err != nil { + return nil, fmt.Errorf("failed to create etcd client: %w", err) + } + defer func() { _ = client.Close() }() + + st, err := client.Status(ctx, "localhost:2379") + if err != nil { + return nil, fmt.Errorf("failed to get etcd status: %w", err) + } + + // If I am not a learner it means I am a voting member, so connecting to my own etcd instance + // is fine because everything is synced. + if !st.IsLearner { + return []string{"https://localhost:2379"}, nil + } + + // If I am a learner I need to connect to a member, retrieve the list of voting + // members and connect to all of them. + kubeconfig, err := clientcmd.LoadFromFile(kubeconfigPath) + if err != nil { + return nil, fmt.Errorf("failed to load bootstrap kubeconfig: %w", err) + } + + if kubeconfig == nil || kubeconfig.Clusters == nil || len(kubeconfig.Clusters) == 0 { + return nil, fmt.Errorf("invalid bootstrap kubeconfig: no clusters found") + } + + if len(kubeconfig.Clusters) > 1 { + return nil, fmt.Errorf("invalid bootstrap kubeconfig: multiple clusters found") + } + + var etcdHost string + for _, cluster := range kubeconfig.Clusters { + etcdHost = cluster.Server + break + } + + if etcdHost == "" { + return nil, fmt.Errorf("failed to extract etcd hostname from bootstrap kubeconfig") + } + + etcdHost = strings.TrimPrefix(etcdHost, "https://") + etcdHost, _, _ = net.SplitHostPort(etcdHost) + etcdHost = fmt.Sprintf("https://%s", net.JoinHostPort(etcdHost, "2379")) + client, err = clientv3.New(clientv3.Config{ + DialTimeout: 5 * time.Second, + Endpoints: []string{etcdHost}, + TLS: tlsConfig, + Context: ctx, + }) + if err != nil { + return nil, fmt.Errorf("failed to create etcd client: %w", err) + } + + resp, err := client.MemberList(ctx) + if err != nil { + return nil, fmt.Errorf("failed to retrieve etcd member list: %w", err) + } + + var members []string + for _, member := range resp.Members { + if !member.IsLearner { + members = append(members, member.ClientURLs...) + } + } + return members, nil +} diff --git a/pkg/controllers/kube-controller-manager.go b/pkg/controllers/kube-controller-manager.go index 6fb7c5b6e5..409f87e694 100644 --- a/pkg/controllers/kube-controller-manager.go +++ b/pkg/controllers/kube-controller-manager.go @@ -105,6 +105,10 @@ func configure(ctx context.Context, cfg *config.Config) (args []string, applyFn }, } + if cfg.MultiNode.Enabled { + overrides.ExtendedArguments["leader-elect"] = []string{"true"} + } + args, err = mergeAndConvertToArgs(overrides) applyFn = func() error { return assets.ApplyNamespaces(ctx, []string{ diff --git a/pkg/controllers/kube-scheduler.go b/pkg/controllers/kube-scheduler.go index 9d62171f4a..ef0de89c3d 100644 --- a/pkg/controllers/kube-scheduler.go +++ b/pkg/controllers/kube-scheduler.go @@ -64,12 +64,16 @@ func (s *KubeScheduler) configure(cfg *config.Config) { } func (s *KubeScheduler) writeConfig(cfg *config.Config) error { + leaderElect := "false" + if cfg.MultiNode.Enabled { + leaderElect = "true" + } data := []byte(`apiVersion: kubescheduler.config.k8s.io/v1 kind: KubeSchedulerConfiguration clientConnection: kubeconfig: ` + cfg.KubeConfigPath(config.KubeScheduler) + ` leaderElection: - leaderElect: false`) + leaderElect: ` + leaderElect) path := filepath.Join(config.DataDir, "resources", "kube-scheduler", "config", "config.yaml") if err := os.MkdirAll(filepath.Dir(path), os.FileMode(0700)); err != nil { diff --git a/pkg/node/kubelet.go b/pkg/node/kubelet.go index d5d67b2d38..de6f19d862 100644 --- a/pkg/node/kubelet.go +++ b/pkg/node/kubelet.go @@ -77,6 +77,9 @@ func (s *KubeletServer) configure(cfg *config.Config) { } kubeletFlags := kubeletoptions.NewKubeletFlags() kubeletFlags.BootstrapKubeconfig = cfg.KubeConfigPath(config.Kubelet) + if cfg.BootstrapKubeConfigExists() { + kubeletFlags.BootstrapKubeconfig = cfg.BootstrapKubeConfigPath() + } kubeletFlags.KubeConfig = cfg.KubeConfigPath(config.Kubelet) kubeletFlags.RuntimeCgroups = "/system.slice/crio.service" kubeletFlags.HostnameOverride = cfg.Node.HostnameOverride