Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
nodeHealthNodes []string
nodeHealthVersion string
k8sEndpoint string
checkNodeReady bool
)

var checkCmd = &cobra.Command{
Expand Down Expand Up @@ -81,7 +82,8 @@ var checkNodeHealthCmd = &cobra.Command{
ctx = context.WithValue(ctx, "timeout", nodeHealthTimeout)
ctx = context.WithValue(ctx, "version", nodeHealthVersion)
ctx = context.WithValue(ctx, "k8s-endpoint", k8sEndpoint)
ctx = context.WithValue(ctx, "k8s-endpoint-provided", k8sEndpoint != "")
ctx = context.WithValue(ctx, "k8s-endpoint-provided", k8sEndpoint != "" || checkNodeReady)
ctx = context.WithValue(ctx, "check-node-ready", checkNodeReady)
ctx = context.WithValue(ctx, "output", outputFunc)

// Set up the check pipeline
Expand Down Expand Up @@ -109,4 +111,5 @@ func init() {
checkNodeHealthCmd.Flags().StringVar(&nodeHealthVersion, "version", "", "Expected version to check against (optional)")
checkNodeHealthCmd.Flags().StringVar(&k8sEndpoint, "k8s-endpoint", "", "Perform Kubernetes API health check (use --k8s-endpoint or --k8s-endpoint=https://endpoint:6443)")
checkNodeHealthCmd.Flags().Lookup("k8s-endpoint").NoOptDefVal = "true"
checkNodeHealthCmd.Flags().BoolVar(&checkNodeReady, "ready", false, "Check Kubernetes node readiness status")
}
96 changes: 94 additions & 2 deletions pkg/kubernetes/kubernetes_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// Interfaces
// =============================================================================

// KubernetesClient defines methods for low-level Kubernetes operations
// KubernetesClient defines methods for Kubernetes resource operations
type KubernetesClient interface {
// Resource operations
GetResource(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error)
Expand All @@ -31,10 +31,12 @@ type KubernetesClient interface {
DeleteResource(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error
PatchResource(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error)
CheckHealth(ctx context.Context, endpoint string) error
// Node health operations
GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error)
}

// =============================================================================
// Constructor
// Types
// =============================================================================

// DynamicKubernetesClient implements KubernetesClient using dynamic client
Expand All @@ -43,6 +45,10 @@ type DynamicKubernetesClient struct {
endpoint string
}

// =============================================================================
// Constructor
// =============================================================================

// NewDynamicKubernetesClient creates a new DynamicKubernetesClient
func NewDynamicKubernetesClient() *DynamicKubernetesClient {
return &DynamicKubernetesClient{}
Expand Down Expand Up @@ -116,6 +122,62 @@ func (c *DynamicKubernetesClient) CheckHealth(ctx context.Context, endpoint stri
return nil
}

// GetNodeReadyStatus returns a map of node names to their Ready condition status.
// It checks the Ready condition for each specified node using the dynamic client.
// If nodeNames is empty, all nodes are checked. Nodes not found are omitted from the result.
// Returns a map of node names to Ready status (true if Ready, false if NotReady), or an error if listing fails.
func (c *DynamicKubernetesClient) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) {
	if err := c.ensureClient(); err != nil {
		return nil, fmt.Errorf("failed to initialize Kubernetes client: %w", err)
	}

	nodeGVR := schema.GroupVersionResource{
		Group:    "",
		Version:  "v1",
		Resource: "nodes",
	}

	// List all nodes in a single call; name filtering happens client-side below.
	nodes, err := c.client.Resource(nodeGVR).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to list nodes: %w", err)
	}

	// Restrict the list to the requested nodes, if any were named.
	// Nodes that are requested but absent from the cluster simply do not
	// appear in the returned map.
	if len(nodeNames) > 0 {
		requested := make(map[string]bool, len(nodeNames))
		for _, name := range nodeNames {
			requested[name] = true
		}

		var filtered []unstructured.Unstructured
		for _, node := range nodes.Items {
			if requested[node.GetName()] {
				filtered = append(filtered, node)
			}
		}
		nodes.Items = filtered
	}

	readyStatus := make(map[string]bool, len(nodes.Items))
	for _, node := range nodes.Items {
		readyStatus[node.GetName()] = c.isNodeReady(&node)
	}

	return readyStatus, nil
}

// =============================================================================
// Private Methods
// =============================================================================
Expand Down Expand Up @@ -159,3 +221,33 @@ func (c *DynamicKubernetesClient) ensureClient() error {
c.client = cli
return nil
}

// isNodeReady reports whether the node carries a Ready condition whose status
// is "True". Malformed or missing conditions are treated as not ready.
func (c *DynamicKubernetesClient) isNodeReady(node *unstructured.Unstructured) bool {
	conds, found, err := unstructured.NestedSlice(node.Object, "status", "conditions")
	if err != nil || !found {
		// No readable conditions means we cannot prove readiness.
		return false
	}

	for _, raw := range conds {
		cond, isMap := raw.(map[string]interface{})
		if !isMap {
			continue
		}

		condType, ok, typeErr := unstructured.NestedString(cond, "type")
		if typeErr != nil || !ok || condType != "Ready" {
			continue
		}

		status, ok, statusErr := unstructured.NestedString(cond, "status")
		if statusErr != nil || !ok {
			continue
		}

		// First well-formed Ready condition decides the outcome.
		return status == "True"
	}

	return false
}
126 changes: 119 additions & 7 deletions pkg/kubernetes/kubernetes_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type KubernetesManager interface {
WaitForKustomizationsDeleted(message string, names ...string) error
CheckGitRepositoryStatus() error
GetKustomizationStatus(names []string) (map[string]bool, error)
WaitForKubernetesHealthy(ctx context.Context, endpoint string) error
WaitForKubernetesHealthy(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error
GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error)
}

// =============================================================================
Expand Down Expand Up @@ -548,10 +549,10 @@ func (k *BaseKubernetesManager) GetKustomizationStatus(names []string) (map[stri
return status, nil
}

// WaitForKubernetesHealthy polls the Kubernetes API endpoint for health until it is reachable or the context deadline is exceeded.
// If the client is not initialized, returns an error. Uses a default timeout of 5 minutes if the context has no deadline.
// Polls every 10 seconds. Returns nil if the API becomes healthy, or an error if the timeout is reached or the context is canceled.
func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string) error {
// WaitForKubernetesHealthy waits for the Kubernetes API to be healthy and optionally checks node Ready state.
// If nodeNames are provided, it will also verify that all specified nodes are in Ready state.
// Returns an error if the API is unreachable or if any specified nodes are not Ready.
func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string, outputFunc func(string), nodeNames ...string) error {
if k.client == nil {
return fmt.Errorf("kubernetes client not initialized")
}
Expand All @@ -568,14 +569,125 @@ func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, en
case <-ctx.Done():
return fmt.Errorf("timeout waiting for Kubernetes API to be healthy")
default:
if err := k.client.CheckHealth(ctx, endpoint); err == nil {
// Check API connectivity
if err := k.client.CheckHealth(ctx, endpoint); err != nil {
time.Sleep(pollInterval)
continue
}

// If node names are specified, check their Ready state
if len(nodeNames) > 0 {
if err := k.waitForNodesReady(ctx, nodeNames, outputFunc); err != nil {
time.Sleep(pollInterval)
continue
}
}

return nil
}
}

return fmt.Errorf("timeout waiting for Kubernetes API to be healthy")
}

// waitForNodesReady blocks until every node in nodeNames exists and reports a
// Ready condition of "True", or until the context deadline (default 5 minutes
// when the context has none) is exceeded. Transient errors from the client are
// retried. outputFunc, when non-nil, is invoked once per node status transition
// (READY / NOT READY / NOT FOUND). Returns nil once all nodes are Ready, or an
// error naming the missing or not-ready nodes on timeout.
func (k *BaseKubernetesManager) waitForNodesReady(ctx context.Context, nodeNames []string, outputFunc func(string)) error {
	deadline, ok := ctx.Deadline()
	if !ok {
		// Mirror the default budget used for API health polling.
		deadline = time.Now().Add(5 * time.Minute)
	}

	pollInterval := 5 * time.Second
	lastStatus := make(map[string]string)

	for time.Now().Before(deadline) {
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled while waiting for nodes to be ready")
		default:
			readyStatus, err := k.client.GetNodeReadyStatus(ctx, nodeNames)
			if err != nil {
				// Transient listing failures are retried until the deadline.
				time.Sleep(pollInterval)
				continue
			}

			missingNodes, notReadyNodes := classifyNodeReadiness(nodeNames, readyStatus)

			// Report only transitions so the output stays quiet while
			// the cluster converges.
			if outputFunc != nil {
				for _, nodeName := range nodeNames {
					currentStatus := nodeReadyLabel(readyStatus, nodeName)
					if lastStatus[nodeName] != currentStatus {
						outputFunc(fmt.Sprintf("Node %s: %s", nodeName, currentStatus))
						lastStatus[nodeName] = currentStatus
					}
				}
			}

			if len(missingNodes) == 0 && len(notReadyNodes) == 0 {
				return nil
			}

			time.Sleep(pollInterval)
		}
	}

	// Deadline reached: take one last snapshot so the error names the offenders.
	readyStatus, err := k.client.GetNodeReadyStatus(ctx, nodeNames)
	if err != nil {
		return fmt.Errorf("timeout waiting for nodes to be ready: failed to get final status: %w", err)
	}

	missingNodes, notReadyNodes := classifyNodeReadiness(nodeNames, readyStatus)

	if len(missingNodes) > 0 {
		return fmt.Errorf("timeout waiting for nodes to appear: %s", strings.Join(missingNodes, ", "))
	}

	if len(notReadyNodes) > 0 {
		return fmt.Errorf("timeout waiting for nodes to be ready: %s", strings.Join(notReadyNodes, ", "))
	}

	return fmt.Errorf("timeout waiting for nodes to be ready")
}

// classifyNodeReadiness partitions nodeNames into nodes absent from readyStatus
// and nodes present but not Ready. Nodes that are present and Ready fall into
// neither slice.
func classifyNodeReadiness(nodeNames []string, readyStatus map[string]bool) (missing, notReady []string) {
	for _, name := range nodeNames {
		ready, exists := readyStatus[name]
		switch {
		case !exists:
			missing = append(missing, name)
		case !ready:
			notReady = append(notReady, name)
		}
	}
	return missing, notReady
}

// nodeReadyLabel renders a node's readiness as a human-readable status label.
func nodeReadyLabel(readyStatus map[string]bool, name string) string {
	ready, exists := readyStatus[name]
	switch {
	case !exists:
		return "NOT FOUND"
	case ready:
		return "READY"
	default:
		return "NOT READY"
	}
}

// GetNodeReadyStatus returns a map of node names to their Ready condition status
// (true if Ready, false if NotReady) by delegating to the underlying client.
// Returns an error if the client has not been initialized or if listing fails.
func (k *BaseKubernetesManager) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) {
	client := k.client
	if client == nil {
		return nil, fmt.Errorf("kubernetes client not initialized")
	}
	return client.GetNodeReadyStatus(ctx, nodeNames)
}

// applyWithRetry applies a resource using SSA with minimal logic
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubernetes/kubernetes_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")
err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil)
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
Expand All @@ -1989,7 +1989,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")
err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil)
if err == nil {
t.Error("Expected error, got nil")
}
Expand All @@ -2009,7 +2009,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")
err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil)
if err == nil {
t.Error("Expected error, got nil")
}
Expand All @@ -2029,7 +2029,7 @@ func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately

err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")
err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443", nil)
if err == nil {
t.Error("Expected error, got nil")
}
Expand Down
21 changes: 15 additions & 6 deletions pkg/kubernetes/mock_kubernetes_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (

// MockKubernetesClient is a mock implementation of KubernetesClient interface for testing
type MockKubernetesClient struct {
GetResourceFunc func(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error)
ListResourcesFunc func(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error)
ApplyResourceFunc func(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) (*unstructured.Unstructured, error)
DeleteResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error
PatchResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error)
CheckHealthFunc func(ctx context.Context, endpoint string) error
GetResourceFunc func(gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error)
ListResourcesFunc func(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error)
ApplyResourceFunc func(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) (*unstructured.Unstructured, error)
DeleteResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error
PatchResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error)
CheckHealthFunc func(ctx context.Context, endpoint string) error
GetNodeReadyStatusFunc func(ctx context.Context, nodeNames []string) (map[string]bool, error)
}

// =============================================================================
Expand Down Expand Up @@ -87,3 +88,11 @@ func (m *MockKubernetesClient) CheckHealth(ctx context.Context, endpoint string)
}
return nil
}

// GetNodeReadyStatus implements the KubernetesClient interface. It delegates to
// GetNodeReadyStatusFunc when set; otherwise it returns an empty, non-nil map.
func (m *MockKubernetesClient) GetNodeReadyStatus(ctx context.Context, nodeNames []string) (map[string]bool, error) {
	if fn := m.GetNodeReadyStatusFunc; fn != nil {
		return fn(ctx, nodeNames)
	}
	return map[string]bool{}, nil
}
Loading
Loading