diff --git a/cmd/check.go b/cmd/check.go
index 1d985f82c..9699be941 100644
--- a/cmd/check.go
+++ b/cmd/check.go
@@ -16,6 +16,7 @@ var (
     nodeHealthNodes   []string
     nodeHealthVersion string
     k8sEndpoint       string
+    checkNodeReady    bool
 )
 
 var checkCmd = &cobra.Command{
@@ -81,7 +82,8 @@ var checkNodeHealthCmd = &cobra.Command{
         ctx = context.WithValue(ctx, "timeout", nodeHealthTimeout)
         ctx = context.WithValue(ctx, "version", nodeHealthVersion)
         ctx = context.WithValue(ctx, "k8s-endpoint", k8sEndpoint)
-        ctx = context.WithValue(ctx, "k8s-endpoint-provided", k8sEndpoint != "")
+        ctx = context.WithValue(ctx, "k8s-endpoint-provided", k8sEndpoint != "" || checkNodeReady)
+        ctx = context.WithValue(ctx, "check-node-ready", checkNodeReady)
         ctx = context.WithValue(ctx, "output", outputFunc)
 
         // Set up the check pipeline
@@ -109,4 +111,5 @@ func init() {
     checkNodeHealthCmd.Flags().StringVar(&nodeHealthVersion, "version", "", "Expected version to check against (optional)")
     checkNodeHealthCmd.Flags().StringVar(&k8sEndpoint, "k8s-endpoint", "", "Perform Kubernetes API health check (use --k8s-endpoint or --k8s-endpoint=https://endpoint:6443)")
     checkNodeHealthCmd.Flags().Lookup("k8s-endpoint").NoOptDefVal = "true"
+    checkNodeHealthCmd.Flags().BoolVar(&checkNodeReady, "ready", false, "Check Kubernetes node readiness status")
 }
diff --git a/pkg/kubernetes/kubernetes_client.go b/pkg/kubernetes/kubernetes_client.go
index cb243504e..135869567 100644
--- a/pkg/kubernetes/kubernetes_client.go
+++ b/pkg/kubernetes/kubernetes_client.go
@@ -22,7 +22,7 @@ import (
 // Interfaces
 // =============================================================================
 
-// KubernetesClient defines methods for low-level Kubernetes operations
+// KubernetesClient defines methods for Kubernetes resource operations
 type KubernetesClient interface {
     // Resource operations
     GetResource(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error)
@@ -31,10 +31,12 @@ type KubernetesClient interface {
     DeleteResource(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error
     PatchResource(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error)
     CheckHealth(ctx context.Context, endpoint string) error
+    // Node health operations
+    GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error)
 }
 
 // =============================================================================
-// Constructor
+// Types
 // =============================================================================
 
 // DynamicKubernetesClient implements KubernetesClient using dynamic client
@@ -43,6 +45,10 @@ type DynamicKubernetesClient struct {
     endpoint string
 }
 
+// =============================================================================
+// Constructor
+// =============================================================================
+
 // NewDynamicKubernetesClient creates a new DynamicKubernetesClient
 func NewDynamicKubernetesClient() *DynamicKubernetesClient {
     return &DynamicKubernetesClient{}
@@ -116,6 +122,62 @@ func (c *DynamicKubernetesClient) CheckHealth(ctx context.Context, endpoint stri
     return nil
 }
 
+// GetNodeReadyStatus returns a map of node names to their Ready condition status.
+// It checks the Ready condition for each specified node using the dynamic client.
+// If nodeNames is empty, all nodes are checked. Nodes not found are omitted from the result.
+// Returns a map of node names to Ready status (true if Ready, false if NotReady), or an error if listing fails.
+func (c *DynamicKubernetesClient) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) {
+    if err := c.ensureClient(); err != nil {
+        return nil, fmt.Errorf("failed to initialize Kubernetes client: %w", err)
+    }
+
+    nodeGVR := schema.GroupVersionResource{
+        Group:    "",
+        Version:  "v1",
+        Resource: "nodes",
+    }
+
+    var nodes *unstructured.UnstructuredList
+    var err error
+
+    // Get all nodes and filter by name if specific nodes are requested
+    nodes, err = c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{})
+    if err != nil {
+        return nil, fmt.Errorf("failed to list nodes: %w", err)
+    }
+
+    // Filter nodes if specific node names are requested
+    if len(nodeNames) > 0 {
+        var filteredNodes []unstructured.Unstructured
+        nodeNameSet := make(map[string]bool)
+        for _, name := range nodeNames {
+            nodeNameSet[name] = true
+        }
+
+        for _, node := range nodes.Items {
+            if nodeNameSet[node.GetName()] {
+                filteredNodes = append(filteredNodes, node)
+            }
+        }
+
+        // Replace the items with filtered ones
+        nodes.Items = filteredNodes
+    }
+
+    if err != nil {
+        return nil, fmt.Errorf("failed to list nodes: %w", err)
+    }
+
+    readyStatus := make(map[string]bool)
+    for _, node := range nodes.Items {
+        nodeName := node.GetName()
+        ready := c.isNodeReady(&node)
+        readyStatus[nodeName] = ready
+    }
+
+    return readyStatus, nil
+}
+
 // =============================================================================
 // Private Methods
 // =============================================================================
@@ -159,3 +221,33 @@ func (c *DynamicKubernetesClient) ensureClient() error {
     c.client = cli
     return nil
 }
+
+// isNodeReady checks if a node is in Ready state by examining its conditions.
+// Returns true if the node has a Ready condition with status "True".
+func (c *DynamicKubernetesClient) isNodeReady(node *unstructured.Unstructured) bool {
+    conditions, found, err := unstructured.NestedSlice(node.Object, "status", "conditions")
+    if err != nil || !found {
+        return false
+    }
+
+    for _, condition := range conditions {
+        conditionMap, ok := condition.(map[string]interface{})
+        if !ok {
+            continue
+        }
+
+        conditionType, found, err := unstructured.NestedString(conditionMap, "type")
+        if err != nil || !found || conditionType != "Ready" {
+            continue
+        }
+
+        conditionStatus, found, err := unstructured.NestedString(conditionMap, "status")
+        if err != nil || !found {
+            continue
+        }
+
+        return conditionStatus == "True"
+    }
+
+    return false
+}
diff --git a/pkg/kubernetes/kubernetes_manager.go b/pkg/kubernetes/kubernetes_manager.go
index ea9c3ccc7..f960c9997 100644
--- a/pkg/kubernetes/kubernetes_manager.go
+++ b/pkg/kubernetes/kubernetes_manager.go
@@ -46,7 +46,8 @@ type KubernetesManager interface {
     WaitForKustomizationsDeleted(message string, names ...string) error
     CheckGitRepositoryStatus() error
     GetKustomizationStatus(names []string) (map[string]bool, error)
-    WaitForKubernetesHealthy(ctx context.Context, endpoint string) error
+    WaitForKubernetesHealthy(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error
+    GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error)
 }
 
 // =============================================================================
@@ -548,10 +549,10 @@ func (k *BaseKubernetesManager) GetKustomizationStatus(names []string) (map[stri
     return status, nil
 }
 
-// WaitForKubernetesHealthy polls the Kubernetes API endpoint for health until it is reachable or the context deadline is exceeded.
-// If the client is not initialized, returns an error. Uses a default timeout of 5 minutes if the context has no deadline.
-// Polls every 10 seconds. Returns nil if the API becomes healthy, or an error if the timeout is reached or the context is canceled.
-func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string) error {
+// WaitForKubernetesHealthy waits for the Kubernetes API to be healthy and optionally checks node Ready state.
+// If nodeNames are provided, it will also verify that all specified nodes are in Ready state.
+// Returns an error if the API is unreachable or if any specified nodes are not Ready.
+func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
     if k.client == nil {
         return fmt.Errorf("kubernetes client not initialized")
     }
@@ -568,14 +569,125 @@ func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, en
         case <-ctx.Done():
             return fmt.Errorf("timeout waiting for Kubernetes API to be healthy")
         default:
-            if err := k.client.CheckHealth(ctx, endpoint); err == nil {
+            // Check API connectivity
+            if err := k.client.CheckHealth(ctx, endpoint); err != nil {
+                time.Sleep(pollInterval)
+                continue
+            }
+
+            // If node names are specified, check their Ready state
+            if len(nodeNames) > 0 {
+                if err := k.waitForNodesReady(ctx, nodeNames, outputFunc); err != nil {
+                    time.Sleep(pollInterval)
+                    continue
+                }
+            }
+
+            return nil
+        }
+    }
+
+    return fmt.Errorf("timeout waiting for Kubernetes API to be healthy")
+}
+
+// waitForNodesReady waits until all specified nodes exist and are in Ready state.
+// Returns an error if any nodes are missing or not Ready within the context deadline.
+func (k *BaseKubernetesManager) waitForNodesReady(ctx context.Context, nodeNames []string, outputFunc func(string)) error {
+    deadline, ok := ctx.Deadline()
+    if !ok {
+        deadline = time.Now().Add(5 * time.Minute)
+    }
+
+    pollInterval := 5 * time.Second
+    lastStatus := make(map[string]string)
+
+    for time.Now().Before(deadline) {
+        select {
+        case <-ctx.Done():
+            return fmt.Errorf("context cancelled while waiting for nodes to be ready")
+        default:
+            readyStatus, err := k.client.GetNodeReadyStatus(ctx, nodeNames)
+            if err != nil {
+                time.Sleep(pollInterval)
+                continue
+            }
+
+            var missingNodes []string
+            var notReadyNodes []string
+            var readyNodes []string
+
+            for _, nodeName := range nodeNames {
+                if ready, exists := readyStatus[nodeName]; !exists {
+                    missingNodes = append(missingNodes, nodeName)
+                } else if !ready {
+                    notReadyNodes = append(notReadyNodes, nodeName)
+                } else {
+                    readyNodes = append(readyNodes, nodeName)
+                }
+            }
+
+            // Report status changes
+            if outputFunc != nil {
+                for _, nodeName := range nodeNames {
+                    var currentStatus string
+                    if ready, exists := readyStatus[nodeName]; !exists {
+                        currentStatus = "NOT FOUND"
+                    } else if ready {
+                        currentStatus = "READY"
+                    } else {
+                        currentStatus = "NOT READY"
+                    }
+
+                    if lastStatus[nodeName] != currentStatus {
+                        outputFunc(fmt.Sprintf("Node %s: %s", nodeName, currentStatus))
+                        lastStatus[nodeName] = currentStatus
+                    }
+                }
+            }
+
+            if len(missingNodes) == 0 && len(notReadyNodes) == 0 {
                 return nil
             }
+
             time.Sleep(pollInterval)
         }
     }
 
-    return fmt.Errorf("timeout waiting for Kubernetes API to be healthy")
+    // Final check to get the current status for error reporting
+    readyStatus, err := k.client.GetNodeReadyStatus(ctx, nodeNames)
+    if err != nil {
+        return fmt.Errorf("timeout waiting for nodes to be ready: failed to get final status: %w", err)
+    }
+
+    var missingNodes []string
+    var notReadyNodes []string
+
+    for _, nodeName := range nodeNames {
+        if ready, exists := readyStatus[nodeName]; !exists {
+            missingNodes = append(missingNodes, nodeName)
+        } else if !ready {
+            notReadyNodes = append(notReadyNodes, nodeName)
+        }
+    }
+
+    if len(missingNodes) > 0 {
+        return fmt.Errorf("timeout waiting for nodes to appear: %s", strings.Join(missingNodes, ", "))
+    }
+
+    if len(notReadyNodes) > 0 {
+        return fmt.Errorf("timeout waiting for nodes to be ready: %s", strings.Join(notReadyNodes, ", "))
+    }
+
+    return fmt.Errorf("timeout waiting for nodes to be ready")
+}
+
+// GetNodeReadyStatus returns a map of node names to their Ready condition status.
+// Returns a map of node names to Ready status (true if Ready, false if NotReady), or an error if listing fails.
+func (k *BaseKubernetesManager) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) {
+    if k.client == nil {
+        return nil, fmt.Errorf("kubernetes client not initialized")
+    }
+    return k.client.GetNodeReadyStatus(ctx, nodeNames)
 }
 
 // applyWithRetry applies a resource using SSA with minimal logic
diff --git a/pkg/kubernetes/kubernetes_manager_test.go b/pkg/kubernetes/kubernetes_manager_test.go
index de6050bb5..af85198d1 100644
--- a/pkg/kubernetes/kubernetes_manager_test.go
+++ b/pkg/kubernetes/kubernetes_manager_test.go
@@ -1976,7 +1976,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
         ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
         defer cancel()
 
-        err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")
+        err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil)
         if err != nil {
             t.Errorf("Expected no error, got %v", err)
         }
@@ -1989,7 +1989,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
         ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
         defer cancel()
 
-        err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")
+        err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil)
         if err == nil {
             t.Error("Expected error, got nil")
         }
@@ -2009,7 +2009,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
         ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
         defer cancel()
 
-        err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")
+        err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil)
         if err == nil {
             t.Error("Expected error, got nil")
         }
@@ -2029,7 +2029,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
         ctx, cancel := context.WithCancel(context.Background())
         cancel() // Cancel immediately
 
-        err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")
+        err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil)
         if err == nil {
             t.Error("Expected error, got nil")
         }
diff --git a/pkg/kubernetes/mock_kubernetes_client.go b/pkg/kubernetes/mock_kubernetes_client.go
index ccf80fe3b..db749b8aa 100644
--- a/pkg/kubernetes/mock_kubernetes_client.go
+++ b/pkg/kubernetes/mock_kubernetes_client.go
@@ -19,12 +19,13 @@ import (
 
 // MockKubernetesClient is a mock implementation of KubernetesClient interface for testing
 type MockKubernetesClient struct {
-    GetResourceFunc    func(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error)
-    ListResourcesFunc  func(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error)
-    ApplyResourceFunc  func(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) (*unstructured.Unstructured, error)
-    DeleteResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error
-    PatchResourceFunc  func(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error)
-    CheckHealthFunc    func(ctx context.Context, endpoint string) error
+    GetResourceFunc        func(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error)
+    ListResourcesFunc      func(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error)
+    ApplyResourceFunc      func(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) (*unstructured.Unstructured, error)
+    DeleteResourceFunc     func(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error
+    PatchResourceFunc      func(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error)
+    CheckHealthFunc        func(ctx context.Context, endpoint string) error
+    GetNodeReadyStatusFunc func(ctx context.Context, nodeNames []string) (map[string]bool, error)
 }
 
 // =============================================================================
@@ -87,3 +88,11 @@ func (m *MockKubernetesClient) CheckHealth(ctx context.Context, endpoint string)
     }
     return nil
 }
+
+// GetNodeReadyStatus implements KubernetesClient interface
+func (m *MockKubernetesClient) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) {
+    if m.GetNodeReadyStatusFunc != nil {
+        return m.GetNodeReadyStatusFunc(ctx, nodeNames)
+    }
+    return make(map[string]bool), nil
+}
diff --git a/pkg/kubernetes/mock_kubernetes_manager.go b/pkg/kubernetes/mock_kubernetes_manager.go
index d4497fbd4..c4c3ab870 100644
--- a/pkg/kubernetes/mock_kubernetes_manager.go
+++ b/pkg/kubernetes/mock_kubernetes_manager.go
@@ -34,7 +34,8 @@ type MockKubernetesManager struct {
     ApplyOCIRepositoryFunc           func(repo *sourcev1.OCIRepository) error
     WaitForKustomizationsDeletedFunc func(message string, names ...string) error
     CheckGitRepositoryStatusFunc     func() error
-    WaitForKubernetesHealthyFunc     func(ctx context.Context, endpoint string) error
+    WaitForKubernetesHealthyFunc     func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error
+    GetNodeReadyStatusFunc           func(ctx context.Context, nodeNames []string) (map[string]bool, error)
 }
 
 // =============================================================================
@@ -171,9 +172,17 @@ func (m *MockKubernetesManager) CheckGitRepositoryStatus() error {
 }
 
 // WaitForKubernetesHealthy waits for the Kubernetes API endpoint to be healthy with polling and timeout
-func (m *MockKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string) error {
+func (m *MockKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
     if m.WaitForKubernetesHealthyFunc != nil {
-        return m.WaitForKubernetesHealthyFunc(ctx, endpoint)
+        return m.WaitForKubernetesHealthyFunc(ctx, endpoint, outputFunc, nodeNames...)
     }
     return nil
 }
+
+// GetNodeReadyStatus returns a map of node names to their Ready condition status.
+func (m *MockKubernetesManager) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) {
+    if m.GetNodeReadyStatusFunc != nil {
+        return m.GetNodeReadyStatusFunc(ctx, nodeNames)
+    }
+    return make(map[string]bool), nil
+}
diff --git a/pkg/kubernetes/mock_kubernetes_manager_test.go b/pkg/kubernetes/mock_kubernetes_manager_test.go
index 8c5e92139..967664b46 100644
--- a/pkg/kubernetes/mock_kubernetes_manager_test.go
+++ b/pkg/kubernetes/mock_kubernetes_manager_test.go
@@ -420,7 +420,7 @@ func TestMockKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
     t.Run("FuncSet", func(t *testing.T) {
         manager := setup(t)
         errVal := fmt.Errorf("kubernetes health check failed")
-        manager.WaitForKubernetesHealthyFunc = func(c context.Context, e string) error {
+        manager.WaitForKubernetesHealthyFunc = func(c context.Context, e string, outputFunc func(string), nodeNames ...string) error {
             if c != ctx {
                 t.Errorf("Expected context %v, got %v", ctx, c)
             }
@@ -429,7 +429,7 @@ func TestMockKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
             }
             return errVal
         }
-        err := manager.WaitForKubernetesHealthy(ctx, endpoint)
+        err := manager.WaitForKubernetesHealthy(ctx, endpoint, nil)
         if err != errVal {
             t.Errorf("Expected err, got %v", err)
         }
@@ -437,7 +437,7 @@ func TestMockKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
 
     t.Run("FuncNotSet", func(t *testing.T) {
         manager := setup(t)
-        err := manager.WaitForKubernetesHealthy(ctx, endpoint)
+        err := manager.WaitForKubernetesHealthy(ctx, endpoint, nil)
         if err != nil {
             t.Errorf("Expected nil, got %v", err)
         }
diff --git a/pkg/pipelines/check.go b/pkg/pipelines/check.go
index d5bc204f7..c7bb0c574 100644
--- a/pkg/pipelines/check.go
+++ b/pkg/pipelines/check.go
@@ -53,7 +53,6 @@ func (p *CheckPipeline) Initialize(injector di.Injector, ctx context.Context) er
     p.toolsManager = p.withToolsManager()
     p.clusterClient = p.withClusterClient()
     p.withKubernetesClient()
-
     p.kubernetesManager = p.withKubernetesManager()
 
     if p.toolsManager != nil {
@@ -149,10 +148,13 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error {
         return fmt.Errorf("No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform")
     }
 
-    if hasNodeCheck {
-        if p.clusterClient == nil {
-            return fmt.Errorf("No cluster client found")
-        }
+    // If we have nodes but no cluster client and no k8s endpoint, we can't perform any checks
+    if hasNodeCheck && p.clusterClient == nil && !hasK8sCheck {
+        return fmt.Errorf("No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform")
+    }
+
+    // Handle provider-specific node health checks (when --nodes is provided and cluster client is available)
+    if hasNodeCheck && p.clusterClient != nil {
         defer p.clusterClient.Close()
 
         nodeAddresses, ok := nodes.([]string)
@@ -176,12 +178,45 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error {
             }
         }
 
-    k8sEndpoint := ctx.Value("k8s-endpoint")
-    k8sEndpointProvided := ctx.Value("k8s-endpoint-provided")
+        // Perform provider-specific node health checks
+        var checkCtx context.Context
+        var cancel context.CancelFunc
+        if timeoutDuration > 0 {
+            checkCtx, cancel = context.WithTimeout(ctx, timeoutDuration)
+        } else {
+            checkCtx, cancel = context.WithCancel(ctx)
+        }
+        defer cancel()
 
-    var k8sEndpointStr string
-    var shouldCheckK8s bool
+        if err := p.clusterClient.WaitForNodesHealthy(checkCtx, nodeAddresses, expectedVersion); err != nil {
+            // If cluster client fails and we have k8s endpoint, continue with k8s checks
+            if hasK8sCheck {
+                fmt.Printf("Warning: Cluster client failed (%v), continuing with Kubernetes checks\n", err)
+            } else {
+                return fmt.Errorf("nodes failed health check: %w", err)
+            }
+        } else {
+            outputFunc := ctx.Value("output")
+            if outputFunc != nil {
+                if fn, ok := outputFunc.(func(string)); ok {
+                    message := fmt.Sprintf("All %d nodes are healthy", len(nodeAddresses))
+                    if expectedVersion != "" {
+                        message += fmt.Sprintf(" and running version %s", expectedVersion)
+                    }
+                    fn(message)
+                }
+            }
+        }
+    }
+
+    // Handle Kubernetes health checks (API + optional node Ready state)
+    if hasK8sCheck {
+        if p.kubernetesManager == nil {
+            return fmt.Errorf("No kubernetes manager found")
+        }
+        k8sEndpoint := ctx.Value("k8s-endpoint")
+        var k8sEndpointStr string
         if k8sEndpoint != nil {
             if e, ok := k8sEndpoint.(string); ok {
                 if e == "true" {
@@ -192,49 +227,74 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error {
             }
         }
 
-    if k8sEndpointProvided != nil {
-        if provided, ok := k8sEndpointProvided.(bool); ok {
-            shouldCheckK8s = provided
+        // Only include nodes in the K8s health check if --ready flag is explicitly specified
+        var nodeNames []string
+        checkNodeReady := ctx.Value("check-node-ready")
+        if checkNodeReady != nil {
+            if ready, ok := checkNodeReady.(bool); ok && ready {
+                if hasNodeCheck {
+                    // If specific nodes are provided, check those nodes
+                    if nodeAddresses, ok := nodes.([]string); ok {
+                        nodeNames = nodeAddresses
+                    }
+                } else {
+                    // If --ready is specified but no --nodes are provided, return an error
+                    return fmt.Errorf("--ready flag requires --nodes to be specified")
+                }
             }
         }
 
-    // Perform node health checks first
-    var checkCtx context.Context
-    var cancel context.CancelFunc
-    if timeoutDuration > 0 {
-        checkCtx, cancel = context.WithTimeout(ctx, timeoutDuration)
-    } else {
-        checkCtx, cancel = context.WithCancel(ctx)
+        // Show waiting message if we're going to check node readiness
+        if len(nodeNames) > 0 {
+            outputFunc := ctx.Value("output")
+            if outputFunc != nil {
+                if fn, ok := outputFunc.(func(string)); ok {
+                    fn(fmt.Sprintf("Waiting for %d nodes to be Ready...", len(nodeNames)))
+                }
+            }
         }
-    defer cancel()
 
-    if err := p.clusterClient.WaitForNodesHealthy(checkCtx, nodeAddresses, expectedVersion); err != nil {
-        return fmt.Errorf("nodes failed health check: %w", err)
+        // Get output function for progress feedback
+        var progressOutputFunc func(string)
+        output := ctx.Value("output")
+        if output != nil {
+            if fn, ok := output.(func(string)); ok {
+                progressOutputFunc = fn
+            }
+        }
+
+        if err := p.kubernetesManager.WaitForKubernetesHealthy(ctx, k8sEndpointStr, progressOutputFunc, nodeNames...); err != nil {
+            return fmt.Errorf("Kubernetes health check failed: %w", err)
         }
 
         outputFunc := ctx.Value("output")
         if outputFunc != nil {
             if fn, ok := outputFunc.(func(string)); ok {
-                message := fmt.Sprintf("All %d nodes are healthy", len(nodeAddresses))
-                if expectedVersion != "" {
-                    message += fmt.Sprintf(" and running version %s", expectedVersion)
-                }
-                fn(message)
-            }
-        }
-
-    if shouldCheckK8s {
-        if p.kubernetesManager == nil {
-            return fmt.Errorf("No kubernetes manager found")
-        }
-
-        if err := p.kubernetesManager.WaitForKubernetesHealthy(ctx, k8sEndpointStr); err != nil {
-            return fmt.Errorf("Kubernetes health check failed: %w", err)
-        }
+                if len(nodeNames) > 0 {
+                    // Check if all requested nodes were found and ready
+                    readyStatus, err := p.kubernetesManager.GetNodeReadyStatus(ctx, nodeNames)
+                    allFoundAndReady := err == nil && len(readyStatus) == len(nodeNames)
+                    for _, ready := range readyStatus {
+                        if !ready {
+                            allFoundAndReady = false
+                            break
+                        }
+                    }
 
-        outputFunc := ctx.Value("output")
-        if outputFunc != nil {
-            if fn, ok := outputFunc.(func(string)); ok {
+                    if allFoundAndReady {
+                        if k8sEndpointStr != "" {
+                            fn(fmt.Sprintf("Kubernetes API endpoint %s is healthy and all nodes are Ready", k8sEndpointStr))
+                        } else {
+                            fn("Kubernetes API endpoint (kubeconfig default) is healthy and all nodes are Ready")
+                        }
+                    } else {
+                        if k8sEndpointStr != "" {
+                            fn(fmt.Sprintf("Kubernetes API endpoint %s is healthy", k8sEndpointStr))
+                        } else {
+                            fn("Kubernetes API endpoint (kubeconfig default) is healthy")
+                        }
+                    }
+                } else {
                     if k8sEndpointStr != "" {
                         fn(fmt.Sprintf("Kubernetes API endpoint %s is healthy", k8sEndpointStr))
                     } else {
diff --git a/pkg/pipelines/check_test.go b/pkg/pipelines/check_test.go
index f8de8e9b4..8a735941c 100644
--- a/pkg/pipelines/check_test.go
+++ b/pkg/pipelines/check_test.go
@@ -106,13 +106,17 @@ func setupCheckMocks(t *testing.T, opts ...*SetupOptions) *CheckMocks {
             // If existing is not a MockKubernetesManager, create a new one
             mockKubernetesManager = kubernetes.NewMockKubernetesManager(baseMocks.Injector)
             mockKubernetesManager.InitializeFunc = func() error { return nil }
-            mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string) error { return nil }
+            mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+                return nil
+            }
             baseMocks.Injector.Register("kubernetesManager", mockKubernetesManager)
         }
     } else {
         mockKubernetesManager = kubernetes.NewMockKubernetesManager(baseMocks.Injector)
         mockKubernetesManager.InitializeFunc = func() error { return nil }
-        mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string) error { return nil }
+        mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            return nil
+        }
         baseMocks.Injector.Register("kubernetesManager", mockKubernetesManager)
@@ -700,8 +704,31 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) {
         }
     })
 
-    t.Run("ReturnsErrorWhenClusterClientIsNil", func(t *testing.T) {
-        // Given a check pipeline with nil cluster client
+    t.Run("SucceedsWhenClusterClientIsNilButK8sEndpointProvided", func(t *testing.T) {
+        // Given a check pipeline with nil cluster client but k8s endpoint provided
+        pipeline, mocks := setup(t)
+        pipeline.clusterClient = nil
+
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "nodes", []string{"node1"})
+        ctx = context.WithValue(ctx, "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "output", func(msg string) {})
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned (k8s check succeeds)
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+    })
+
+    t.Run("ReturnsErrorWhenClusterClientIsNilAndNoK8sEndpoint", func(t *testing.T) {
+        // Given a check pipeline with nil cluster client and no k8s endpoint
         pipeline, _ := setup(t)
         pipeline.clusterClient = nil
 
@@ -714,8 +741,8 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) {
         if err == nil {
             t.Fatal("Expected error, got nil")
         }
-        if err.Error() != "No cluster client found" {
-            t.Errorf("Expected cluster client error, got: %v", err)
+        if err.Error() != "No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform" {
+            t.Errorf("Expected health checks required error, got: %v", err)
         }
     })
 
@@ -757,7 +784,7 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) {
         // Given a check pipeline with only k8s endpoint specified
         pipeline, mocks := setup(t)
 
-        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string) error {
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
             return nil
         }
 
@@ -810,4 +837,346 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) {
             t.Errorf("Expected no error, got %v", err)
         }
     })
+
+    // =============================================================================
+    // --ready flag tests
+    // =============================================================================
+
+    t.Run("K8sEndpointOnly_NoReadyFlag", func(t *testing.T) {
+        // Given a check pipeline with only k8s endpoint specified (no --ready)
+        pipeline, mocks := setup(t)
+
+        var capturedNodeNames []string
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            capturedNodeNames = nodeNames
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "output", func(msg string) {})
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+
+        // And no node names should be passed (no readiness check)
+        if len(capturedNodeNames) != 0 {
+            t.Errorf("Expected no node names, got %v", capturedNodeNames)
+        }
+    })
+
+    t.Run("K8sEndpointWithReadyFlag_NoSpecificNodes", func(t *testing.T) {
+        // Given a check pipeline with k8s endpoint and --ready flag (no specific nodes)
+        pipeline, _ := setup(t)
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "check-node-ready", true)
+        ctx = context.WithValue(ctx, "output", func(msg string) {})
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then an error should be returned
+        if err == nil {
+            t.Fatal("Expected error, got nil")
+        }
+        if err.Error() != "--ready flag requires --nodes to be specified" {
+            t.Errorf("Expected error about --ready requiring --nodes, got: %v", err)
+        }
+    })
+
+    t.Run("K8sEndpointWithReadyFlag_SpecificNodes", func(t *testing.T) {
+        // Given a check pipeline with k8s endpoint, --ready flag, and specific nodes
+        pipeline, mocks := setup(t)
+
+        var capturedNodeNames []string
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            capturedNodeNames = nodeNames
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "check-node-ready", true)
+        ctx = context.WithValue(ctx, "nodes", []string{"specific-node1", "specific-node2"})
+        ctx = context.WithValue(ctx, "output", func(msg string) {})
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+
+        // And specific node names should be passed for readiness check
+        expectedNodeNames := []string{"specific-node1", "specific-node2"}
+        if len(capturedNodeNames) != len(expectedNodeNames) {
+            t.Errorf("Expected %d node names, got %d", len(expectedNodeNames), len(capturedNodeNames))
+        }
+        for i, name := range expectedNodeNames {
+            if capturedNodeNames[i] != name {
+                t.Errorf("Expected node name %s at index %d, got %s", name, i, capturedNodeNames[i])
+            }
+        }
+    })
+
+    t.Run("K8sEndpointWithNodes_NoReadyFlag", func(t *testing.T) {
+        // Given a check pipeline with k8s endpoint and specific nodes but no --ready flag
+        pipeline, mocks := setup(t)
+
+        var capturedNodeNames []string
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            capturedNodeNames = nodeNames
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "nodes", []string{"node1", "node2"})
+        ctx = context.WithValue(ctx, "output", func(msg string) {})
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+
+        // And no node names should be passed (no readiness check)
+        if len(capturedNodeNames) != 0 {
+            t.Errorf("Expected no node names, got %v", capturedNodeNames)
+        }
+    })
+
+    t.Run("ReadyFlagOnly_NoK8sEndpoint", func(t *testing.T) {
+        // Given a check pipeline with only --ready flag (no k8s endpoint)
+        pipeline, _ := setup(t)
+
+        ctx := context.WithValue(context.Background(), "check-node-ready", true)
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then an error should be returned
+        if err == nil {
+            t.Fatal("Expected error, got nil")
+        }
+        if err.Error() != "No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform" {
+            t.Errorf("Expected health checks required error, got: %v", err)
+        }
+    })
+
+    t.Run("ReadyFlagFalse", func(t *testing.T) {
+        // Given a check pipeline with k8s endpoint and --ready=false
+        pipeline, mocks := setup(t)
+
+        var capturedNodeNames []string
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            capturedNodeNames = nodeNames
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "check-node-ready", false)
+        ctx = context.WithValue(ctx, "output", func(msg string) {})
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+
+        // And no node names should be passed (ready=false)
+        if len(capturedNodeNames) != 0 {
+            t.Errorf("Expected no node names, got %v", capturedNodeNames)
+        }
+    })
+
+    t.Run("ReadyFlagNil", func(t *testing.T) {
+        // Given a check pipeline with k8s endpoint and no ready flag (nil)
+        pipeline, mocks := setup(t)
+
+        var capturedNodeNames []string
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            capturedNodeNames = nodeNames
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "output", func(msg string) {})
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+
+        // And no node names should be passed (ready=nil)
+        if len(capturedNodeNames) != 0 {
+            t.Errorf("Expected no node names, got %v", capturedNodeNames)
+        }
+    })
+
+    t.Run("ShowsWaitingMessageWhenReadyFlagUsed", func(t *testing.T) {
+        // Given a check pipeline with --ready flag and specific nodes
+        pipeline, mocks := setup(t)
+
+        var capturedMessages []string
+        outputFunc := func(msg string) {
+            capturedMessages = append(capturedMessages, msg)
+        }
+
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "check-node-ready", true)
+        ctx = context.WithValue(ctx, "nodes", []string{"node1", "node2"})
+        ctx = context.WithValue(ctx, "output", outputFunc)
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+
+        // And waiting message should be shown
+        expectedMessage := "Waiting for 2 nodes to be Ready..."
+        found := false
+        for _, msg := range capturedMessages {
+            if msg == expectedMessage {
+                found = true
+                break
+            }
+        }
+        if !found {
+            t.Errorf("Expected waiting message '%s', got messages: %v", expectedMessage, capturedMessages)
+        }
+    })
+
+    t.Run("PassesOutputFuncToWaitForKubernetesHealthy", func(t *testing.T) {
+        // Given a check pipeline with --ready flag
+        pipeline, mocks := setup(t)
+
+        var capturedOutputFunc func(string)
+        var capturedNodeNames []string
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            capturedOutputFunc = outputFunc
+            capturedNodeNames = nodeNames
+            return nil
+        }
+
+        outputFunc := func(msg string) {}
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "check-node-ready", true)
+        ctx = context.WithValue(ctx, "nodes", []string{"node1"})
+        ctx = context.WithValue(ctx, "output", outputFunc)
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+
+        // And output function should be passed to WaitForKubernetesHealthy
+        if capturedOutputFunc == nil {
+            t.Error("Expected output function to be passed to WaitForKubernetesHealthy")
+        }
+
+        // And node names should be passed
+        if len(capturedNodeNames) != 1 || capturedNodeNames[0] != "node1" {
+            t.Errorf("Expected node names ['node1'], got %v", capturedNodeNames)
+        }
+    })
+
+    t.Run("NoImmediateNotFoundMessages", func(t *testing.T) {
+        // Given a check pipeline with --ready flag
+        pipeline, mocks := setup(t)
+
+        var capturedMessages []string
+        outputFunc := func(msg string) {
+            capturedMessages = append(capturedMessages, msg)
+        }
+
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "check-node-ready", true)
+        ctx = context.WithValue(ctx, "nodes", []string{"node1"})
+        ctx = context.WithValue(ctx, "output", outputFunc)
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+
+        // And no immediate "NOT FOUND" messages should be shown
+        for _, msg := range capturedMessages {
+            if strings.Contains(msg, "NOT FOUND") {
+                t.Errorf("Expected no immediate NOT FOUND messages, but found: %s", msg)
+            }
+        }
+
+        // And the waiting message should be shown (along with Talos health check messages)
+        waitingMessageFound := false
+        for _, msg := range capturedMessages {
+            if msg == "Waiting for 1 nodes to be Ready..." {
+                waitingMessageFound = true
+                break
+            }
+        }
+        if !waitingMessageFound {
+            t.Errorf("Expected waiting message 'Waiting for 1 nodes to be Ready...', got messages: %v", capturedMessages)
+        }
+    })
+
+    t.Run("HandlesNilOutputFunc", func(t *testing.T) {
+        // Given a check pipeline with --ready flag but no output function
+        pipeline, mocks := setup(t)
+
+        mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
+            return nil
+        }
+
+        ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true)
+        ctx = context.WithValue(ctx, "k8s-endpoint", "")
+        ctx = context.WithValue(ctx, "check-node-ready", true)
+        ctx = context.WithValue(ctx, "nodes", []string{"node1"})
+        // No output function in context
+
+        // When executing node health check
+        err := pipeline.executeNodeHealthCheck(ctx)
+
+        // Then no error should be returned
+        if err != nil {
+            t.Errorf("Expected no error, got %v", err)
+        }
+    })
 }
diff --git a/pkg/pipelines/pipeline_test.go b/pkg/pipelines/pipeline_test.go
index 184a65d77..bd838167b 100644
--- a/pkg/pipelines/pipeline_test.go
+++ b/pkg/pipelines/pipeline_test.go
@@ -732,7 +732,7 @@ func TestWithPipeline(t *testing.T) {
 
         // Set up kubernetes manager
         mockKubernetesManager := kubernetes.NewMockKubernetesManager(injector)
         mockKubernetesManager.InitializeFunc = func() error { return nil }
-        mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string) error { return nil }
+        mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { return nil }
         injector.Register("kubernetesManager", mockKubernetesManager)
     }
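Note (not part of the patch above): a minimal sketch of how the new GetNodeReadyStatusFunc hook on MockKubernetesClient might be exercised in a test. It assumes it lives in the same package as the mocks (package kubernetes); the node names are placeholders for this illustration.

package kubernetes

import (
    "context"
    "testing"
)

// Sketch only: drives GetNodeReadyStatusFunc to simulate one Ready and one
// NotReady node, mirroring the map[node]bool contract the manager consumes.
func TestMockKubernetesClient_GetNodeReadyStatus_Sketch(t *testing.T) {
    mock := &MockKubernetesClient{}
    mock.GetNodeReadyStatusFunc = func(ctx context.Context, nodeNames []string) (map[string]bool, error) {
        // "node1"/"node2" are placeholder names for this illustration.
        return map[string]bool{"node1": true, "node2": false}, nil
    }

    status, err := mock.GetNodeReadyStatus(context.Background(), []string{"node1", "node2"})
    if err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    if !status["node1"] || status["node2"] {
        t.Errorf("unexpected ready status: %v", status)
    }
}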