From f265e3dbbbc5280063470c7c21babfe3e3fd18f4 Mon Sep 17 00:00:00 2001 From: Ryan VanGundy Date: Sat, 26 Jul 2025 14:38:10 -0400 Subject: [PATCH 1/2] feat(check): Check node readiness state Checks the node's readiness state in addition to the status of the k8s api endpoint when --nodes and --k8s-endpoint are both included in the node-health command. --- pkg/kubernetes/kubernetes_client.go | 108 +++++++++++++++++- pkg/kubernetes/kubernetes_manager.go | 49 ++++++-- pkg/kubernetes/mock_kubernetes_client.go | 30 ++++- pkg/kubernetes/mock_kubernetes_manager.go | 6 +- .../mock_kubernetes_manager_test.go | 2 +- pkg/pipelines/check.go | 70 +++++++----- pkg/pipelines/check_test.go | 6 +- pkg/pipelines/pipeline_test.go | 2 +- 8 files changed, 218 insertions(+), 55 deletions(-) diff --git a/pkg/kubernetes/kubernetes_client.go b/pkg/kubernetes/kubernetes_client.go index cb243504e..ecb7623c7 100644 --- a/pkg/kubernetes/kubernetes_client.go +++ b/pkg/kubernetes/kubernetes_client.go @@ -7,6 +7,8 @@ package kubernetes import ( "context" "fmt" + "strings" + "os" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -22,7 +24,7 @@ import ( // Interfaces // ============================================================================= -// KubernetesClient defines methods for low-level Kubernetes operations +// KubernetesClient defines methods for Kubernetes resource operations type KubernetesClient interface { // Resource operations GetResource(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error) @@ -31,10 +33,13 @@ type KubernetesClient interface { DeleteResource(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error PatchResource(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error) CheckHealth(ctx context.Context, endpoint string) error + // Node health operations + GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) + GetAllNodes(ctx context.Context) ([]*unstructured.Unstructured, error) } // ============================================================================= -// Constructor +// Types // ============================================================================= // DynamicKubernetesClient implements KubernetesClient using dynamic client @@ -43,6 +48,10 @@ type DynamicKubernetesClient struct { endpoint string } +// ============================================================================= +// Constructor +// ============================================================================= + // NewDynamicKubernetesClient creates a new DynamicKubernetesClient func NewDynamicKubernetesClient() *DynamicKubernetesClient { return &DynamicKubernetesClient{} @@ -116,6 +125,71 @@ func (c *DynamicKubernetesClient) CheckHealth(ctx context.Context, endpoint stri return nil } +// GetNodeReadyStatus returns a map of node names to their Ready condition status. +// It checks the Ready condition for each specified node using the dynamic client. +// If nodeNames is empty, all nodes are checked. Nodes not found are omitted from the result. +// Returns a map of node names to Ready status (true if Ready, false if NotReady), or an error if listing fails. 
+func (c *DynamicKubernetesClient) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) { + if err := c.ensureClient(); err != nil { + return nil, fmt.Errorf("failed to initialize Kubernetes client: %w", err) + } + + nodeGVR := schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "nodes", + } + + var nodes *unstructured.UnstructuredList + var err error + + if len(nodeNames) == 0 { + nodes, err = c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{}) + } else { + fieldSelector := fmt.Sprintf("metadata.name in (%s)", strings.Join(nodeNames, ",")) + nodes, err = c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) + } + + if err != nil { + return nil, fmt.Errorf("failed to list nodes: %w", err) + } + + readyStatus := make(map[string]bool) + for _, node := range nodes.Items { + nodeName := node.GetName() + ready := c.isNodeReady(&node) + readyStatus[nodeName] = ready + } + + return readyStatus, nil +} + +// GetAllNodes retrieves all nodes from the cluster. +// Returns a slice of unstructured node objects. +func (c *DynamicKubernetesClient) GetAllNodes(ctx context.Context) ([]*unstructured.Unstructured, error) { + if err := c.ensureClient(); err != nil { + return nil, fmt.Errorf("failed to initialize Kubernetes client: %w", err) + } + + nodeGVR := schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "nodes", + } + + nodes, err := c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to list nodes: %w", err) + } + + result := make([]*unstructured.Unstructured, len(nodes.Items)) + for i := range nodes.Items { + result[i] = &nodes.Items[i] + } + + return result, nil +} + // ============================================================================= // Private Methods // ============================================================================= @@ -159,3 +233,33 @@ func (c *DynamicKubernetesClient) ensureClient() error { c.client = cli return nil } + +// isNodeReady checks if a node is in Ready state by examining its conditions. +// Returns true if the node has a Ready condition with status "True". 
+func (c *DynamicKubernetesClient) isNodeReady(node *unstructured.Unstructured) bool { + conditions, found, err := unstructured.NestedSlice(node.Object, "status", "conditions") + if err != nil || !found { + return false + } + + for _, condition := range conditions { + conditionMap, ok := condition.(map[string]interface{}) + if !ok { + continue + } + + conditionType, found, err := unstructured.NestedString(conditionMap, "type") + if err != nil || !found || conditionType != "Ready" { + continue + } + + conditionStatus, found, err := unstructured.NestedString(conditionMap, "status") + if err != nil || !found { + continue + } + + return conditionStatus == "True" + } + + return false +} diff --git a/pkg/kubernetes/kubernetes_manager.go b/pkg/kubernetes/kubernetes_manager.go index ea9c3ccc7..834a9a928 100644 --- a/pkg/kubernetes/kubernetes_manager.go +++ b/pkg/kubernetes/kubernetes_manager.go @@ -46,7 +46,7 @@ type KubernetesManager interface { WaitForKustomizationsDeleted(message string, names ...string) error CheckGitRepositoryStatus() error GetKustomizationStatus(names []string) (map[string]bool, error) - WaitForKubernetesHealthy(ctx context.Context, endpoint string) error + WaitForKubernetesHealthy(ctx context.Context, endpoint string, nodeNames ...string) error } // ============================================================================= @@ -548,10 +548,10 @@ func (k *BaseKubernetesManager) GetKustomizationStatus(names []string) (map[stri return status, nil } -// WaitForKubernetesHealthy polls the Kubernetes API endpoint for health until it is reachable or the context deadline is exceeded. -// If the client is not initialized, returns an error. Uses a default timeout of 5 minutes if the context has no deadline. -// Polls every 10 seconds. Returns nil if the API becomes healthy, or an error if the timeout is reached or the context is canceled. -func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string) error { +// WaitForKubernetesHealthy waits for the Kubernetes API to be healthy and optionally checks node Ready state. +// If nodeNames are provided, it will also verify that all specified nodes are in Ready state. +// Returns an error if the API is unreachable or if any specified nodes are not Ready. +func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, nodeNames ...string) error { if k.client == nil { return fmt.Errorf("kubernetes client not initialized") } @@ -568,16 +568,49 @@ func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, en case <-ctx.Done(): return fmt.Errorf("timeout waiting for Kubernetes API to be healthy") default: - if err := k.client.CheckHealth(ctx, endpoint); err == nil { - return nil + // Check API connectivity + if err := k.client.CheckHealth(ctx, endpoint); err != nil { + time.Sleep(pollInterval) + continue + } + + // If node names are specified, check their Ready state + if len(nodeNames) > 0 { + if err := k.waitForNodesReady(ctx, nodeNames); err != nil { + time.Sleep(pollInterval) + continue + } } - time.Sleep(pollInterval) + + return nil } } return fmt.Errorf("timeout waiting for Kubernetes API to be healthy") } +// waitForNodesReady waits for all specified nodes to be in Ready state. +// Returns an error if any nodes are not Ready within the context timeout. 
+func (k *BaseKubernetesManager) waitForNodesReady(ctx context.Context, nodeNames []string) error { + readyStatus, err := k.client.GetNodeReadyStatus(ctx, nodeNames) + if err != nil { + return fmt.Errorf("failed to get node ready status: %w", err) + } + + var notReadyNodes []string + for _, nodeName := range nodeNames { + if ready, exists := readyStatus[nodeName]; !exists || !ready { + notReadyNodes = append(notReadyNodes, nodeName) + } + } + + if len(notReadyNodes) > 0 { + return fmt.Errorf("nodes not ready: %s", strings.Join(notReadyNodes, ", ")) + } + + return nil +} + // applyWithRetry applies a resource using SSA with minimal logic func (k *BaseKubernetesManager) applyWithRetry(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) error { existing, err := k.client.GetResource(gvr, obj.GetNamespace(), obj.GetName()) diff --git a/pkg/kubernetes/mock_kubernetes_client.go b/pkg/kubernetes/mock_kubernetes_client.go index ccf80fe3b..70c926288 100644 --- a/pkg/kubernetes/mock_kubernetes_client.go +++ b/pkg/kubernetes/mock_kubernetes_client.go @@ -19,12 +19,14 @@ import ( // MockKubernetesClient is a mock implementation of KubernetesClient interface for testing type MockKubernetesClient struct { - GetResourceFunc func(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error) - ListResourcesFunc func(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error) - ApplyResourceFunc func(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) (*unstructured.Unstructured, error) - DeleteResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error - PatchResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error) - CheckHealthFunc func(ctx context.Context, endpoint string) error + GetResourceFunc func(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error) + ListResourcesFunc func(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error) + ApplyResourceFunc func(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) (*unstructured.Unstructured, error) + DeleteResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error + PatchResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error) + CheckHealthFunc func(ctx context.Context, endpoint string) error + GetNodeReadyStatusFunc func(ctx context.Context, nodeNames []string) (map[string]bool, error) + GetAllNodesFunc func(ctx context.Context) ([]*unstructured.Unstructured, error) } // ============================================================================= @@ -87,3 +89,19 @@ func (m *MockKubernetesClient) CheckHealth(ctx context.Context, endpoint string) } return nil } + +// GetNodeReadyStatus implements KubernetesClient interface +func (m *MockKubernetesClient) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) { + if m.GetNodeReadyStatusFunc != nil { + return m.GetNodeReadyStatusFunc(ctx, nodeNames) + } + return make(map[string]bool), nil +} + +// GetAllNodes implements KubernetesClient interface +func (m *MockKubernetesClient) GetAllNodes(ctx context.Context) 
([]*unstructured.Unstructured, error) { + if m.GetAllNodesFunc != nil { + return m.GetAllNodesFunc(ctx) + } + return []*unstructured.Unstructured{}, nil +} diff --git a/pkg/kubernetes/mock_kubernetes_manager.go b/pkg/kubernetes/mock_kubernetes_manager.go index d4497fbd4..ca4a684c2 100644 --- a/pkg/kubernetes/mock_kubernetes_manager.go +++ b/pkg/kubernetes/mock_kubernetes_manager.go @@ -34,7 +34,7 @@ type MockKubernetesManager struct { ApplyOCIRepositoryFunc func(repo *sourcev1.OCIRepository) error WaitForKustomizationsDeletedFunc func(message string, names ...string) error CheckGitRepositoryStatusFunc func() error - WaitForKubernetesHealthyFunc func(ctx context.Context, endpoint string) error + WaitForKubernetesHealthyFunc func(ctx context.Context, endpoint string, nodeNames ...string) error } // ============================================================================= @@ -171,9 +171,9 @@ func (m *MockKubernetesManager) CheckGitRepositoryStatus() error { } // WaitForKubernetesHealthy waits for the Kubernetes API endpoint to be healthy with polling and timeout -func (m *MockKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string) error { +func (m *MockKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, nodeNames ...string) error { if m.WaitForKubernetesHealthyFunc != nil { - return m.WaitForKubernetesHealthyFunc(ctx, endpoint) + return m.WaitForKubernetesHealthyFunc(ctx, endpoint, nodeNames...) } return nil } diff --git a/pkg/kubernetes/mock_kubernetes_manager_test.go b/pkg/kubernetes/mock_kubernetes_manager_test.go index 8c5e92139..6c38b75ac 100644 --- a/pkg/kubernetes/mock_kubernetes_manager_test.go +++ b/pkg/kubernetes/mock_kubernetes_manager_test.go @@ -420,7 +420,7 @@ func TestMockKubernetesManager_WaitForKubernetesHealthy(t *testing.T) { t.Run("FuncSet", func(t *testing.T) { manager := setup(t) errVal := fmt.Errorf("kubernetes health check failed") - manager.WaitForKubernetesHealthyFunc = func(c context.Context, e string) error { + manager.WaitForKubernetesHealthyFunc = func(c context.Context, e string, nodeNames ...string) error { if c != ctx { t.Errorf("Expected context %v, got %v", ctx, c) } diff --git a/pkg/pipelines/check.go b/pkg/pipelines/check.go index d5bc204f7..ad946273d 100644 --- a/pkg/pipelines/check.go +++ b/pkg/pipelines/check.go @@ -149,6 +149,7 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error { return fmt.Errorf("No health checks specified. 
Use --nodes and/or --k8s-endpoint flags to specify health checks to perform") } + // Handle provider-specific node health checks if hasNodeCheck { if p.clusterClient == nil { return fmt.Errorf("No cluster client found") @@ -176,29 +177,7 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error { } } - k8sEndpoint := ctx.Value("k8s-endpoint") - k8sEndpointProvided := ctx.Value("k8s-endpoint-provided") - - var k8sEndpointStr string - var shouldCheckK8s bool - - if k8sEndpoint != nil { - if e, ok := k8sEndpoint.(string); ok { - if e == "true" { - k8sEndpointStr = "" - } else { - k8sEndpointStr = e - } - } - } - - if k8sEndpointProvided != nil { - if provided, ok := k8sEndpointProvided.(bool); ok { - shouldCheckK8s = provided - } - } - - // Perform node health checks first + // Perform provider-specific node health checks var checkCtx context.Context var cancel context.CancelFunc if timeoutDuration > 0 { @@ -222,19 +201,48 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error { fn(message) } } + } + + // Handle Kubernetes health checks (API + optional node Ready state) + if hasK8sCheck { + if p.kubernetesManager == nil { + return fmt.Errorf("No kubernetes manager found") + } - if shouldCheckK8s { - if p.kubernetesManager == nil { - return fmt.Errorf("No kubernetes manager found") + k8sEndpoint := ctx.Value("k8s-endpoint") + var k8sEndpointStr string + if k8sEndpoint != nil { + if e, ok := k8sEndpoint.(string); ok { + if e == "true" { + k8sEndpointStr = "" + } else { + k8sEndpointStr = e + } } + } - if err := p.kubernetesManager.WaitForKubernetesHealthy(ctx, k8sEndpointStr); err != nil { - return fmt.Errorf("Kubernetes health check failed: %w", err) + // If nodes are specified, include them in the K8s health check for Ready state + var nodeNames []string + if hasNodeCheck { + if nodeAddresses, ok := nodes.([]string); ok { + nodeNames = nodeAddresses } + } + + if err := p.kubernetesManager.WaitForKubernetesHealthy(ctx, k8sEndpointStr, nodeNames...); err != nil { + return fmt.Errorf("Kubernetes health check failed: %w", err) + } - outputFunc := ctx.Value("output") - if outputFunc != nil { - if fn, ok := outputFunc.(func(string)); ok { + outputFunc := ctx.Value("output") + if outputFunc != nil { + if fn, ok := outputFunc.(func(string)); ok { + if len(nodeNames) > 0 { + if k8sEndpointStr != "" { + fn(fmt.Sprintf("Kubernetes API endpoint %s is healthy and all nodes are Ready", k8sEndpointStr)) + } else { + fn("Kubernetes API endpoint (kubeconfig default) is healthy and all nodes are Ready") + } + } else { if k8sEndpointStr != "" { fn(fmt.Sprintf("Kubernetes API endpoint %s is healthy", k8sEndpointStr)) } else { diff --git a/pkg/pipelines/check_test.go b/pkg/pipelines/check_test.go index f8de8e9b4..abaac18ab 100644 --- a/pkg/pipelines/check_test.go +++ b/pkg/pipelines/check_test.go @@ -106,13 +106,13 @@ func setupCheckMocks(t *testing.T, opts ...*SetupOptions) *CheckMocks { // If existing is not a MockKubernetesManager, create a new one mockKubernetesManager = kubernetes.NewMockKubernetesManager(baseMocks.Injector) mockKubernetesManager.InitializeFunc = func() error { return nil } - mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string) error { return nil } + mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, nodeNames ...string) error { return nil } baseMocks.Injector.Register("kubernetesManager", mockKubernetesManager) } } else { mockKubernetesManager = 
kubernetes.NewMockKubernetesManager(baseMocks.Injector) mockKubernetesManager.InitializeFunc = func() error { return nil } - mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string) error { return nil } + mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, nodeNames ...string) error { return nil } baseMocks.Injector.Register("kubernetesManager", mockKubernetesManager) } @@ -757,7 +757,7 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) { // Given a check pipeline with only k8s endpoint specified pipeline, mocks := setup(t) - mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string) error { + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, nodeNames ...string) error { return nil } diff --git a/pkg/pipelines/pipeline_test.go b/pkg/pipelines/pipeline_test.go index 184a65d77..34eb13b25 100644 --- a/pkg/pipelines/pipeline_test.go +++ b/pkg/pipelines/pipeline_test.go @@ -732,7 +732,7 @@ func TestWithPipeline(t *testing.T) { // Set up kubernetes manager mockKubernetesManager := kubernetes.NewMockKubernetesManager(injector) mockKubernetesManager.InitializeFunc = func() error { return nil } - mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string) error { return nil } + mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, nodeNames ...string) error { return nil } injector.Register("kubernetesManager", mockKubernetesManager) } From 811700213fcd2fb09bb5ea389ba2230ea5a457ee Mon Sep 17 00:00:00 2001 From: Ryan VanGundy Date: Sat, 26 Jul 2025 19:57:32 -0400 Subject: [PATCH 2/2] Expand, write tests --- cmd/check.go | 5 +- pkg/kubernetes/kubernetes_client.go | 56 +-- pkg/kubernetes/kubernetes_manager.go | 99 ++++- pkg/kubernetes/kubernetes_manager_test.go | 8 +- pkg/kubernetes/mock_kubernetes_client.go | 9 - pkg/kubernetes/mock_kubernetes_manager.go | 15 +- .../mock_kubernetes_manager_test.go | 6 +- pkg/pipelines/check.go | 100 +++-- pkg/pipelines/check_test.go | 383 +++++++++++++++++- pkg/pipelines/pipeline_test.go | 2 +- 10 files changed, 587 insertions(+), 96 deletions(-) diff --git a/cmd/check.go b/cmd/check.go index 1d985f82c..9699be941 100644 --- a/cmd/check.go +++ b/cmd/check.go @@ -16,6 +16,7 @@ var ( nodeHealthNodes []string nodeHealthVersion string k8sEndpoint string + checkNodeReady bool ) var checkCmd = &cobra.Command{ @@ -81,7 +82,8 @@ var checkNodeHealthCmd = &cobra.Command{ ctx = context.WithValue(ctx, "timeout", nodeHealthTimeout) ctx = context.WithValue(ctx, "version", nodeHealthVersion) ctx = context.WithValue(ctx, "k8s-endpoint", k8sEndpoint) - ctx = context.WithValue(ctx, "k8s-endpoint-provided", k8sEndpoint != "") + ctx = context.WithValue(ctx, "k8s-endpoint-provided", k8sEndpoint != "" || checkNodeReady) + ctx = context.WithValue(ctx, "check-node-ready", checkNodeReady) ctx = context.WithValue(ctx, "output", outputFunc) // Set up the check pipeline @@ -109,4 +111,5 @@ func init() { checkNodeHealthCmd.Flags().StringVar(&nodeHealthVersion, "version", "", "Expected version to check against (optional)") checkNodeHealthCmd.Flags().StringVar(&k8sEndpoint, "k8s-endpoint", "", "Perform Kubernetes API health check (use --k8s-endpoint or --k8s-endpoint=https://endpoint:6443)") checkNodeHealthCmd.Flags().Lookup("k8s-endpoint").NoOptDefVal = "true" + checkNodeHealthCmd.Flags().BoolVar(&checkNodeReady, "ready", false, "Check 
Kubernetes node readiness status") } diff --git a/pkg/kubernetes/kubernetes_client.go b/pkg/kubernetes/kubernetes_client.go index ecb7623c7..135869567 100644 --- a/pkg/kubernetes/kubernetes_client.go +++ b/pkg/kubernetes/kubernetes_client.go @@ -7,8 +7,6 @@ package kubernetes import ( "context" "fmt" - "strings" - "os" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,7 +33,6 @@ type KubernetesClient interface { CheckHealth(ctx context.Context, endpoint string) error // Node health operations GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) - GetAllNodes(ctx context.Context) ([]*unstructured.Unstructured, error) } // ============================================================================= @@ -143,51 +140,42 @@ func (c *DynamicKubernetesClient) GetNodeReadyStatus(ctx context.Context, nodeNa var nodes *unstructured.UnstructuredList var err error - if len(nodeNames) == 0 { - nodes, err = c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{}) - } else { - fieldSelector := fmt.Sprintf("metadata.name in (%s)", strings.Join(nodeNames, ",")) - nodes, err = c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) - } - + // Get all nodes and filter by name if specific nodes are requested + nodes, err = c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{}) if err != nil { return nil, fmt.Errorf("failed to list nodes: %w", err) } - readyStatus := make(map[string]bool) - for _, node := range nodes.Items { - nodeName := node.GetName() - ready := c.isNodeReady(&node) - readyStatus[nodeName] = ready - } - - return readyStatus, nil -} + // Filter nodes if specific node names are requested + if len(nodeNames) > 0 { + var filteredNodes []unstructured.Unstructured + nodeNameSet := make(map[string]bool) + for _, name := range nodeNames { + nodeNameSet[name] = true + } -// GetAllNodes retrieves all nodes from the cluster. -// Returns a slice of unstructured node objects. 
-func (c *DynamicKubernetesClient) GetAllNodes(ctx context.Context) ([]*unstructured.Unstructured, error) { - if err := c.ensureClient(); err != nil { - return nil, fmt.Errorf("failed to initialize Kubernetes client: %w", err) - } + for _, node := range nodes.Items { + if nodeNameSet[node.GetName()] { + filteredNodes = append(filteredNodes, node) + } + } - nodeGVR := schema.GroupVersionResource{ - Group: "", - Version: "v1", - Resource: "nodes", + // Replace the items with filtered ones + nodes.Items = filteredNodes } - nodes, err := c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{}) if err != nil { return nil, fmt.Errorf("failed to list nodes: %w", err) } - result := make([]*unstructured.Unstructured, len(nodes.Items)) - for i := range nodes.Items { - result[i] = &nodes.Items[i] + readyStatus := make(map[string]bool) + for _, node := range nodes.Items { + nodeName := node.GetName() + ready := c.isNodeReady(&node) + readyStatus[nodeName] = ready } - return result, nil + return readyStatus, nil } // ============================================================================= diff --git a/pkg/kubernetes/kubernetes_manager.go b/pkg/kubernetes/kubernetes_manager.go index 834a9a928..f960c9997 100644 --- a/pkg/kubernetes/kubernetes_manager.go +++ b/pkg/kubernetes/kubernetes_manager.go @@ -46,7 +46,8 @@ type KubernetesManager interface { WaitForKustomizationsDeleted(message string, names ...string) error CheckGitRepositoryStatus() error GetKustomizationStatus(names []string) (map[string]bool, error) - WaitForKubernetesHealthy(ctx context.Context, endpoint string, nodeNames ...string) error + WaitForKubernetesHealthy(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error + GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) } // ============================================================================= @@ -551,7 +552,7 @@ func (k *BaseKubernetesManager) GetKustomizationStatus(names []string) (map[stri // WaitForKubernetesHealthy waits for the Kubernetes API to be healthy and optionally checks node Ready state. // If nodeNames are provided, it will also verify that all specified nodes are in Ready state. // Returns an error if the API is unreachable or if any specified nodes are not Ready. -func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, nodeNames ...string) error { +func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { if k.client == nil { return fmt.Errorf("kubernetes client not initialized") } @@ -576,7 +577,7 @@ func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, en // If node names are specified, check their Ready state if len(nodeNames) > 0 { - if err := k.waitForNodesReady(ctx, nodeNames); err != nil { + if err := k.waitForNodesReady(ctx, nodeNames, outputFunc); err != nil { time.Sleep(pollInterval) continue } @@ -589,26 +590,104 @@ func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, en return fmt.Errorf("timeout waiting for Kubernetes API to be healthy") } -// waitForNodesReady waits for all specified nodes to be in Ready state. -// Returns an error if any nodes are not Ready within the context timeout. -func (k *BaseKubernetesManager) waitForNodesReady(ctx context.Context, nodeNames []string) error { +// waitForNodesReady waits until all specified nodes exist and are in Ready state. 
+// Returns an error if any nodes are missing or not Ready within the context deadline. +func (k *BaseKubernetesManager) waitForNodesReady(ctx context.Context, nodeNames []string, outputFunc func(string)) error { + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(5 * time.Minute) + } + + pollInterval := 5 * time.Second + lastStatus := make(map[string]string) + + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled while waiting for nodes to be ready") + default: + readyStatus, err := k.client.GetNodeReadyStatus(ctx, nodeNames) + if err != nil { + time.Sleep(pollInterval) + continue + } + + var missingNodes []string + var notReadyNodes []string + var readyNodes []string + + for _, nodeName := range nodeNames { + if ready, exists := readyStatus[nodeName]; !exists { + missingNodes = append(missingNodes, nodeName) + } else if !ready { + notReadyNodes = append(notReadyNodes, nodeName) + } else { + readyNodes = append(readyNodes, nodeName) + } + } + + // Report status changes + if outputFunc != nil { + for _, nodeName := range nodeNames { + var currentStatus string + if ready, exists := readyStatus[nodeName]; !exists { + currentStatus = "NOT FOUND" + } else if ready { + currentStatus = "READY" + } else { + currentStatus = "NOT READY" + } + + if lastStatus[nodeName] != currentStatus { + outputFunc(fmt.Sprintf("Node %s: %s", nodeName, currentStatus)) + lastStatus[nodeName] = currentStatus + } + } + } + + if len(missingNodes) == 0 && len(notReadyNodes) == 0 { + return nil + } + + time.Sleep(pollInterval) + } + } + + // Final check to get the current status for error reporting readyStatus, err := k.client.GetNodeReadyStatus(ctx, nodeNames) if err != nil { - return fmt.Errorf("failed to get node ready status: %w", err) + return fmt.Errorf("timeout waiting for nodes to be ready: failed to get final status: %w", err) } + var missingNodes []string var notReadyNodes []string + for _, nodeName := range nodeNames { - if ready, exists := readyStatus[nodeName]; !exists || !ready { + if ready, exists := readyStatus[nodeName]; !exists { + missingNodes = append(missingNodes, nodeName) + } else if !ready { notReadyNodes = append(notReadyNodes, nodeName) } } + if len(missingNodes) > 0 { + return fmt.Errorf("timeout waiting for nodes to appear: %s", strings.Join(missingNodes, ", ")) + } + if len(notReadyNodes) > 0 { - return fmt.Errorf("nodes not ready: %s", strings.Join(notReadyNodes, ", ")) + return fmt.Errorf("timeout waiting for nodes to be ready: %s", strings.Join(notReadyNodes, ", ")) } - return nil + return fmt.Errorf("timeout waiting for nodes to be ready") +} + +// GetNodeReadyStatus returns a map of node names to their Ready condition status. +// Returns a map of node names to Ready status (true if Ready, false if NotReady), or an error if listing fails. 
+func (k *BaseKubernetesManager) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) { + if k.client == nil { + return nil, fmt.Errorf("kubernetes client not initialized") + } + return k.client.GetNodeReadyStatus(ctx, nodeNames) } // applyWithRetry applies a resource using SSA with minimal logic diff --git a/pkg/kubernetes/kubernetes_manager_test.go b/pkg/kubernetes/kubernetes_manager_test.go index de6050bb5..af85198d1 100644 --- a/pkg/kubernetes/kubernetes_manager_test.go +++ b/pkg/kubernetes/kubernetes_manager_test.go @@ -1976,7 +1976,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443") + err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -1989,7 +1989,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443") + err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil) if err == nil { t.Error("Expected error, got nil") } @@ -2009,7 +2009,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443") + err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil) if err == nil { t.Error("Expected error, got nil") } @@ -2029,7 +2029,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // Cancel immediately - err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443") + err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil) if err == nil { t.Error("Expected error, got nil") } diff --git a/pkg/kubernetes/mock_kubernetes_client.go b/pkg/kubernetes/mock_kubernetes_client.go index 70c926288..db749b8aa 100644 --- a/pkg/kubernetes/mock_kubernetes_client.go +++ b/pkg/kubernetes/mock_kubernetes_client.go @@ -26,7 +26,6 @@ type MockKubernetesClient struct { PatchResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error) CheckHealthFunc func(ctx context.Context, endpoint string) error GetNodeReadyStatusFunc func(ctx context.Context, nodeNames []string) (map[string]bool, error) - GetAllNodesFunc func(ctx context.Context) ([]*unstructured.Unstructured, error) } // ============================================================================= @@ -97,11 +96,3 @@ func (m *MockKubernetesClient) GetNodeReadyStatus(ctx context.Context, nodeNames } return make(map[string]bool), nil } - -// GetAllNodes implements KubernetesClient interface -func (m *MockKubernetesClient) GetAllNodes(ctx context.Context) ([]*unstructured.Unstructured, error) { - if m.GetAllNodesFunc != nil { - return m.GetAllNodesFunc(ctx) - } - return []*unstructured.Unstructured{}, nil -} diff --git a/pkg/kubernetes/mock_kubernetes_manager.go b/pkg/kubernetes/mock_kubernetes_manager.go index ca4a684c2..c4c3ab870 100644 --- a/pkg/kubernetes/mock_kubernetes_manager.go +++ 
b/pkg/kubernetes/mock_kubernetes_manager.go @@ -34,7 +34,8 @@ type MockKubernetesManager struct { ApplyOCIRepositoryFunc func(repo *sourcev1.OCIRepository) error WaitForKustomizationsDeletedFunc func(message string, names ...string) error CheckGitRepositoryStatusFunc func() error - WaitForKubernetesHealthyFunc func(ctx context.Context, endpoint string, nodeNames ...string) error + WaitForKubernetesHealthyFunc func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error + GetNodeReadyStatusFunc func(ctx context.Context, nodeNames []string) (map[string]bool, error) } // ============================================================================= @@ -171,9 +172,17 @@ func (m *MockKubernetesManager) CheckGitRepositoryStatus() error { } // WaitForKubernetesHealthy waits for the Kubernetes API endpoint to be healthy with polling and timeout -func (m *MockKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, nodeNames ...string) error { +func (m *MockKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { if m.WaitForKubernetesHealthyFunc != nil { - return m.WaitForKubernetesHealthyFunc(ctx, endpoint, nodeNames...) + return m.WaitForKubernetesHealthyFunc(ctx, endpoint, outputFunc, nodeNames...) } return nil } + +// GetNodeReadyStatus returns a map of node names to their Ready condition status. +func (m *MockKubernetesManager) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) { + if m.GetNodeReadyStatusFunc != nil { + return m.GetNodeReadyStatusFunc(ctx, nodeNames) + } + return make(map[string]bool), nil +} diff --git a/pkg/kubernetes/mock_kubernetes_manager_test.go b/pkg/kubernetes/mock_kubernetes_manager_test.go index 6c38b75ac..967664b46 100644 --- a/pkg/kubernetes/mock_kubernetes_manager_test.go +++ b/pkg/kubernetes/mock_kubernetes_manager_test.go @@ -420,7 +420,7 @@ func TestMockKubernetesManager_WaitForKubernetesHealthy(t *testing.T) { t.Run("FuncSet", func(t *testing.T) { manager := setup(t) errVal := fmt.Errorf("kubernetes health check failed") - manager.WaitForKubernetesHealthyFunc = func(c context.Context, e string, nodeNames ...string) error { + manager.WaitForKubernetesHealthyFunc = func(c context.Context, e string, outputFunc func(string), nodeNames ...string) error { if c != ctx { t.Errorf("Expected context %v, got %v", ctx, c) } @@ -429,7 +429,7 @@ func TestMockKubernetesManager_WaitForKubernetesHealthy(t *testing.T) { } return errVal } - err := manager.WaitForKubernetesHealthy(ctx, endpoint) + err := manager.WaitForKubernetesHealthy(ctx, endpoint, nil) if err != errVal { t.Errorf("Expected err, got %v", err) } @@ -437,7 +437,7 @@ func TestMockKubernetesManager_WaitForKubernetesHealthy(t *testing.T) { t.Run("FuncNotSet", func(t *testing.T) { manager := setup(t) - err := manager.WaitForKubernetesHealthy(ctx, endpoint) + err := manager.WaitForKubernetesHealthy(ctx, endpoint, nil) if err != nil { t.Errorf("Expected nil, got %v", err) } diff --git a/pkg/pipelines/check.go b/pkg/pipelines/check.go index ad946273d..c7bb0c574 100644 --- a/pkg/pipelines/check.go +++ b/pkg/pipelines/check.go @@ -53,7 +53,6 @@ func (p *CheckPipeline) Initialize(injector di.Injector, ctx context.Context) er p.toolsManager = p.withToolsManager() p.clusterClient = p.withClusterClient() p.withKubernetesClient() - p.kubernetesManager = p.withKubernetesManager() if p.toolsManager != nil { @@ -149,11 +148,13 @@ func (p *CheckPipeline) 
executeNodeHealthCheck(ctx context.Context) error { return fmt.Errorf("No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform") } - // Handle provider-specific node health checks - if hasNodeCheck { - if p.clusterClient == nil { - return fmt.Errorf("No cluster client found") - } + // If we have nodes but no cluster client and no k8s endpoint, we can't perform any checks + if hasNodeCheck && p.clusterClient == nil && !hasK8sCheck { + return fmt.Errorf("No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform") + } + + // Handle provider-specific node health checks (when --nodes is provided and cluster client is available) + if hasNodeCheck && p.clusterClient != nil { defer p.clusterClient.Close() nodeAddresses, ok := nodes.([]string) @@ -188,17 +189,22 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error { defer cancel() if err := p.clusterClient.WaitForNodesHealthy(checkCtx, nodeAddresses, expectedVersion); err != nil { - return fmt.Errorf("nodes failed health check: %w", err) - } - - outputFunc := ctx.Value("output") - if outputFunc != nil { - if fn, ok := outputFunc.(func(string)); ok { - message := fmt.Sprintf("All %d nodes are healthy", len(nodeAddresses)) - if expectedVersion != "" { - message += fmt.Sprintf(" and running version %s", expectedVersion) + // If cluster client fails and we have k8s endpoint, continue with k8s checks + if hasK8sCheck { + fmt.Printf("Warning: Cluster client failed (%v), continuing with Kubernetes checks\n", err) + } else { + return fmt.Errorf("nodes failed health check: %w", err) + } + } else { + outputFunc := ctx.Value("output") + if outputFunc != nil { + if fn, ok := outputFunc.(func(string)); ok { + message := fmt.Sprintf("All %d nodes are healthy", len(nodeAddresses)) + if expectedVersion != "" { + message += fmt.Sprintf(" and running version %s", expectedVersion) + } + fn(message) } - fn(message) } } } @@ -221,15 +227,43 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error { } } - // If nodes are specified, include them in the K8s health check for Ready state + // Only include nodes in the K8s health check if --ready flag is explicitly specified var nodeNames []string - if hasNodeCheck { - if nodeAddresses, ok := nodes.([]string); ok { - nodeNames = nodeAddresses + checkNodeReady := ctx.Value("check-node-ready") + if checkNodeReady != nil { + if ready, ok := checkNodeReady.(bool); ok && ready { + if hasNodeCheck { + // If specific nodes are provided, check those nodes + if nodeAddresses, ok := nodes.([]string); ok { + nodeNames = nodeAddresses + } + } else { + // If --ready is specified but no --nodes are provided, return an error + return fmt.Errorf("--ready flag requires --nodes to be specified") + } + } + } + + // Show waiting message if we're going to check node readiness + if len(nodeNames) > 0 { + outputFunc := ctx.Value("output") + if outputFunc != nil { + if fn, ok := outputFunc.(func(string)); ok { + fn(fmt.Sprintf("Waiting for %d nodes to be Ready...", len(nodeNames))) + } } } - if err := p.kubernetesManager.WaitForKubernetesHealthy(ctx, k8sEndpointStr, nodeNames...); err != nil { + // Get output function for progress feedback + var progressOutputFunc func(string) + output := ctx.Value("output") + if output != nil { + if fn, ok := output.(func(string)); ok { + progressOutputFunc = fn + } + } + + if err := p.kubernetesManager.WaitForKubernetesHealthy(ctx, k8sEndpointStr, progressOutputFunc, 
nodeNames...); err != nil { return fmt.Errorf("Kubernetes health check failed: %w", err) } @@ -237,10 +271,28 @@ func (p *CheckPipeline) executeNodeHealthCheck(ctx context.Context) error { if outputFunc != nil { if fn, ok := outputFunc.(func(string)); ok { if len(nodeNames) > 0 { - if k8sEndpointStr != "" { - fn(fmt.Sprintf("Kubernetes API endpoint %s is healthy and all nodes are Ready", k8sEndpointStr)) + // Check if all requested nodes were found and ready + readyStatus, err := p.kubernetesManager.GetNodeReadyStatus(ctx, nodeNames) + allFoundAndReady := err == nil && len(readyStatus) == len(nodeNames) + for _, ready := range readyStatus { + if !ready { + allFoundAndReady = false + break + } + } + + if allFoundAndReady { + if k8sEndpointStr != "" { + fn(fmt.Sprintf("Kubernetes API endpoint %s is healthy and all nodes are Ready", k8sEndpointStr)) + } else { + fn("Kubernetes API endpoint (kubeconfig default) is healthy and all nodes are Ready") + } } else { - fn("Kubernetes API endpoint (kubeconfig default) is healthy and all nodes are Ready") + if k8sEndpointStr != "" { + fn(fmt.Sprintf("Kubernetes API endpoint %s is healthy", k8sEndpointStr)) + } else { + fn("Kubernetes API endpoint (kubeconfig default) is healthy") + } } } else { if k8sEndpointStr != "" { diff --git a/pkg/pipelines/check_test.go b/pkg/pipelines/check_test.go index abaac18ab..8a735941c 100644 --- a/pkg/pipelines/check_test.go +++ b/pkg/pipelines/check_test.go @@ -106,13 +106,17 @@ func setupCheckMocks(t *testing.T, opts ...*SetupOptions) *CheckMocks { // If existing is not a MockKubernetesManager, create a new one mockKubernetesManager = kubernetes.NewMockKubernetesManager(baseMocks.Injector) mockKubernetesManager.InitializeFunc = func() error { return nil } - mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, nodeNames ...string) error { return nil } + mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + return nil + } baseMocks.Injector.Register("kubernetesManager", mockKubernetesManager) } } else { mockKubernetesManager = kubernetes.NewMockKubernetesManager(baseMocks.Injector) mockKubernetesManager.InitializeFunc = func() error { return nil } - mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, nodeNames ...string) error { return nil } + mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + return nil + } baseMocks.Injector.Register("kubernetesManager", mockKubernetesManager) } @@ -700,8 +704,31 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) { } }) - t.Run("ReturnsErrorWhenClusterClientIsNil", func(t *testing.T) { - // Given a check pipeline with nil cluster client + t.Run("SucceedsWhenClusterClientIsNilButK8sEndpointProvided", func(t *testing.T) { + // Given a check pipeline with nil cluster client but k8s endpoint provided + pipeline, mocks := setup(t) + pipeline.clusterClient = nil + + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + return nil + } + + ctx := context.WithValue(context.Background(), "nodes", []string{"node1"}) + ctx = context.WithValue(ctx, "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "output", func(msg string) {}) + + // When 
executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned (k8s check succeeds) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + }) + + t.Run("ReturnsErrorWhenClusterClientIsNilAndNoK8sEndpoint", func(t *testing.T) { + // Given a check pipeline with nil cluster client and no k8s endpoint pipeline, _ := setup(t) pipeline.clusterClient = nil @@ -714,8 +741,8 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) { if err == nil { t.Fatal("Expected error, got nil") } - if err.Error() != "No cluster client found" { - t.Errorf("Expected cluster client error, got: %v", err) + if err.Error() != "No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform" { + t.Errorf("Expected health checks required error, got: %v", err) } }) @@ -757,7 +784,7 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) { // Given a check pipeline with only k8s endpoint specified pipeline, mocks := setup(t) - mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, nodeNames ...string) error { + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { return nil } @@ -810,4 +837,346 @@ func TestCheckPipeline_executeNodeHealthCheck(t *testing.T) { t.Errorf("Expected no error, got %v", err) } }) + + // ============================================================================= + // --ready flag tests + // ============================================================================= + + t.Run("K8sEndpointOnly_NoReadyFlag", func(t *testing.T) { + // Given a check pipeline with only k8s endpoint specified (no --ready) + pipeline, mocks := setup(t) + + var capturedNodeNames []string + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + capturedNodeNames = nodeNames + return nil + } + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "output", func(msg string) {}) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // And no node names should be passed (no readiness check) + if len(capturedNodeNames) != 0 { + t.Errorf("Expected no node names, got %v", capturedNodeNames) + } + }) + + t.Run("K8sEndpointWithReadyFlag_NoSpecificNodes", func(t *testing.T) { + // Given a check pipeline with k8s endpoint and --ready flag (no specific nodes) + pipeline, _ := setup(t) + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "check-node-ready", true) + ctx = context.WithValue(ctx, "output", func(msg string) {}) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then an error should be returned + if err == nil { + t.Fatal("Expected error, got nil") + } + if err.Error() != "--ready flag requires --nodes to be specified" { + t.Errorf("Expected error about --ready requiring --nodes, got: %v", err) + } + }) + + t.Run("K8sEndpointWithReadyFlag_SpecificNodes", func(t *testing.T) { + // Given a check pipeline with k8s endpoint, --ready flag, and specific nodes + pipeline, mocks 
:= setup(t) + + var capturedNodeNames []string + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + capturedNodeNames = nodeNames + return nil + } + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "check-node-ready", true) + ctx = context.WithValue(ctx, "nodes", []string{"specific-node1", "specific-node2"}) + ctx = context.WithValue(ctx, "output", func(msg string) {}) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // And specific node names should be passed for readiness check + expectedNodeNames := []string{"specific-node1", "specific-node2"} + if len(capturedNodeNames) != len(expectedNodeNames) { + t.Errorf("Expected %d node names, got %d", len(expectedNodeNames), len(capturedNodeNames)) + } + for i, name := range expectedNodeNames { + if capturedNodeNames[i] != name { + t.Errorf("Expected node name %s at index %d, got %s", name, i, capturedNodeNames[i]) + } + } + }) + + t.Run("K8sEndpointWithNodes_NoReadyFlag", func(t *testing.T) { + // Given a check pipeline with k8s endpoint and specific nodes but no --ready flag + pipeline, mocks := setup(t) + + var capturedNodeNames []string + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + capturedNodeNames = nodeNames + return nil + } + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "nodes", []string{"node1", "node2"}) + ctx = context.WithValue(ctx, "output", func(msg string) {}) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // And no node names should be passed (no readiness check) + if len(capturedNodeNames) != 0 { + t.Errorf("Expected no node names, got %v", capturedNodeNames) + } + }) + + t.Run("ReadyFlagOnly_NoK8sEndpoint", func(t *testing.T) { + // Given a check pipeline with only --ready flag (no k8s endpoint) + pipeline, _ := setup(t) + + ctx := context.WithValue(context.Background(), "check-node-ready", true) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then an error should be returned + if err == nil { + t.Fatal("Expected error, got nil") + } + if err.Error() != "No health checks specified. 
Use --nodes and/or --k8s-endpoint flags to specify health checks to perform" { + t.Errorf("Expected health checks required error, got: %v", err) + } + }) + + t.Run("ReadyFlagFalse", func(t *testing.T) { + // Given a check pipeline with k8s endpoint and --ready=false + pipeline, mocks := setup(t) + + var capturedNodeNames []string + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + capturedNodeNames = nodeNames + return nil + } + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "check-node-ready", false) + ctx = context.WithValue(ctx, "output", func(msg string) {}) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // And no node names should be passed (ready=false) + if len(capturedNodeNames) != 0 { + t.Errorf("Expected no node names, got %v", capturedNodeNames) + } + }) + + t.Run("ReadyFlagNil", func(t *testing.T) { + // Given a check pipeline with k8s endpoint and no ready flag (nil) + pipeline, mocks := setup(t) + + var capturedNodeNames []string + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + capturedNodeNames = nodeNames + return nil + } + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "output", func(msg string) {}) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // And no node names should be passed (ready=nil) + if len(capturedNodeNames) != 0 { + t.Errorf("Expected no node names, got %v", capturedNodeNames) + } + }) + + t.Run("ShowsWaitingMessageWhenReadyFlagUsed", func(t *testing.T) { + // Given a check pipeline with --ready flag and specific nodes + pipeline, mocks := setup(t) + + var capturedMessages []string + outputFunc := func(msg string) { + capturedMessages = append(capturedMessages, msg) + } + + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + return nil + } + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "check-node-ready", true) + ctx = context.WithValue(ctx, "nodes", []string{"node1", "node2"}) + ctx = context.WithValue(ctx, "output", outputFunc) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // And waiting message should be shown + expectedMessage := "Waiting for 2 nodes to be Ready..." 
+ found := false + for _, msg := range capturedMessages { + if msg == expectedMessage { + found = true + break + } + } + if !found { + t.Errorf("Expected waiting message '%s', got messages: %v", expectedMessage, capturedMessages) + } + }) + + t.Run("PassesOutputFuncToWaitForKubernetesHealthy", func(t *testing.T) { + // Given a check pipeline with --ready flag + pipeline, mocks := setup(t) + + var capturedOutputFunc func(string) + var capturedNodeNames []string + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + capturedOutputFunc = outputFunc + capturedNodeNames = nodeNames + return nil + } + + outputFunc := func(msg string) {} + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "check-node-ready", true) + ctx = context.WithValue(ctx, "nodes", []string{"node1"}) + ctx = context.WithValue(ctx, "output", outputFunc) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // And output function should be passed to WaitForKubernetesHealthy + if capturedOutputFunc == nil { + t.Error("Expected output function to be passed to WaitForKubernetesHealthy") + } + + // And node names should be passed + if len(capturedNodeNames) != 1 || capturedNodeNames[0] != "node1" { + t.Errorf("Expected node names ['node1'], got %v", capturedNodeNames) + } + }) + + t.Run("NoImmediateNotFoundMessages", func(t *testing.T) { + // Given a check pipeline with --ready flag + pipeline, mocks := setup(t) + + var capturedMessages []string + outputFunc := func(msg string) { + capturedMessages = append(capturedMessages, msg) + } + + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + return nil + } + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "check-node-ready", true) + ctx = context.WithValue(ctx, "nodes", []string{"node1"}) + ctx = context.WithValue(ctx, "output", outputFunc) + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + // And no immediate "NOT FOUND" messages should be shown + for _, msg := range capturedMessages { + if strings.Contains(msg, "NOT FOUND") { + t.Errorf("Expected no immediate NOT FOUND messages, but found: %s", msg) + } + } + + // And the waiting message should be shown (along with Talos health check messages) + waitingMessageFound := false + for _, msg := range capturedMessages { + if msg == "Waiting for 1 nodes to be Ready..." 
{ + waitingMessageFound = true + break + } + } + if !waitingMessageFound { + t.Errorf("Expected waiting message 'Waiting for 1 nodes to be Ready...', got messages: %v", capturedMessages) + } + }) + + t.Run("HandlesNilOutputFunc", func(t *testing.T) { + // Given a check pipeline with --ready flag but no output function + pipeline, mocks := setup(t) + + mocks.KubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { + return nil + } + + ctx := context.WithValue(context.Background(), "k8s-endpoint-provided", true) + ctx = context.WithValue(ctx, "k8s-endpoint", "") + ctx = context.WithValue(ctx, "check-node-ready", true) + ctx = context.WithValue(ctx, "nodes", []string{"node1"}) + // No output function in context + + // When executing node health check + err := pipeline.executeNodeHealthCheck(ctx) + + // Then no error should be returned + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + }) } diff --git a/pkg/pipelines/pipeline_test.go b/pkg/pipelines/pipeline_test.go index 34eb13b25..bd838167b 100644 --- a/pkg/pipelines/pipeline_test.go +++ b/pkg/pipelines/pipeline_test.go @@ -732,7 +732,7 @@ func TestWithPipeline(t *testing.T) { // Set up kubernetes manager mockKubernetesManager := kubernetes.NewMockKubernetesManager(injector) mockKubernetesManager.InitializeFunc = func() error { return nil } - mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, nodeNames ...string) error { return nil } + mockKubernetesManager.WaitForKubernetesHealthyFunc = func(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error { return nil } injector.Register("kubernetesManager", mockKubernetesManager) }
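
For reference, a minimal sketch of how a caller might drive the readiness-aware health check introduced by these two patches. This is an assumption-laden illustration rather than code from the repository: the package name, import path, endpoint, node names, and the way the manager instance is obtained are all placeholders; only the WaitForKubernetesHealthy signature itself comes from the diffs above.

package example

import (
	"context"
	"fmt"
	"time"

	// Hypothetical import path; substitute the project's real module path.
	"github.com/example/project/pkg/kubernetes"
)

// waitForCluster polls the Kubernetes API endpoint and the named nodes until the
// API is reachable and every listed node reports Ready, streaming per-node status
// transitions (e.g. "Node worker-1: READY") through the output callback.
func waitForCluster(manager kubernetes.KubernetesManager) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	output := func(msg string) { fmt.Println(msg) }

	// An empty endpoint would fall back to the kubeconfig default, the bare
	// --k8s-endpoint case (NoOptDefVal = "true") handled in cmd/check.go.
	return manager.WaitForKubernetesHealthy(ctx, "https://10.5.0.2:6443", output, "controlplane-1", "worker-1")
}

On the CLI side this corresponds to combining the existing --nodes and --k8s-endpoint flags with the new --ready flag, roughly as follows (binary name and addresses are placeholders, not taken from the patches):

	<cli> check node-health --nodes 10.5.0.2,10.5.0.3 --k8s-endpoint https://10.5.0.2:6443 --ready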