Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
nodeHealthTimeout time.Duration
nodeHealthNodes []string
nodeHealthVersion string
k8sEndpoint string
)

var checkCmd = &cobra.Command{
Expand Down Expand Up @@ -59,9 +60,9 @@ var checkNodeHealthCmd = &cobra.Command{
// Get shared dependency injector from context
injector := cmd.Context().Value(injectorKey).(di.Injector)

// Require nodes to be specified
if len(nodeHealthNodes) == 0 {
return fmt.Errorf("No nodes specified. Use --nodes flag to specify nodes to check")
// Require at least one health check type to be specified
if len(nodeHealthNodes) == 0 && k8sEndpoint == "" {
return fmt.Errorf("No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform")
}

// If timeout is not set via flag, use default
Expand All @@ -79,6 +80,8 @@ var checkNodeHealthCmd = &cobra.Command{
ctx = context.WithValue(ctx, "nodes", nodeHealthNodes)
ctx = context.WithValue(ctx, "timeout", nodeHealthTimeout)
ctx = context.WithValue(ctx, "version", nodeHealthVersion)
ctx = context.WithValue(ctx, "k8s-endpoint", k8sEndpoint)
ctx = context.WithValue(ctx, "k8s-endpoint-provided", k8sEndpoint != "")
ctx = context.WithValue(ctx, "output", outputFunc)

// Set up the check pipeline
Expand All @@ -102,7 +105,8 @@ func init() {

// Add flags for node health check
checkNodeHealthCmd.Flags().DurationVar(&nodeHealthTimeout, "timeout", 0, "Maximum time to wait for nodes to be ready (default 5m)")
checkNodeHealthCmd.Flags().StringSliceVar(&nodeHealthNodes, "nodes", []string{}, "Nodes to check (required)")
checkNodeHealthCmd.Flags().StringSliceVar(&nodeHealthNodes, "nodes", []string{}, "Nodes to check (optional)")
checkNodeHealthCmd.Flags().StringVar(&nodeHealthVersion, "version", "", "Expected version to check against (optional)")
_ = checkNodeHealthCmd.MarkFlagRequired("nodes")
checkNodeHealthCmd.Flags().StringVar(&k8sEndpoint, "k8s-endpoint", "", "Perform Kubernetes API health check (use --k8s-endpoint or --k8s-endpoint=https://endpoint:6443)")
checkNodeHealthCmd.Flags().Lookup("k8s-endpoint").NoOptDefVal = "true"
}
12 changes: 6 additions & 6 deletions cmd/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,10 @@ func TestCheckNodeHealthCmd(t *testing.T) {
t.Error("Expected error, got nil")
}

// And error should contain nodes message
expectedError := "No nodes specified. Use --nodes flag to specify nodes to check"
// And error should contain health checks message
expectedError := "No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform"
if err.Error() != expectedError {
t.Errorf("Expected error about nodes, got: %v", err)
t.Errorf("Expected error about health checks, got: %v", err)
}
})

Expand All @@ -268,10 +268,10 @@ func TestCheckNodeHealthCmd(t *testing.T) {
t.Error("Expected error, got nil")
}

// And error should contain nodes message
expectedError := "No nodes specified. Use --nodes flag to specify nodes to check"
// And error should contain health checks message
expectedError := "No health checks specified. Use --nodes and/or --k8s-endpoint flags to specify health checks to perform"
if err.Error() != expectedError {
t.Errorf("Expected error about nodes, got: %v", err)
t.Errorf("Expected error about health checks, got: %v", err)
}
})
}
10 changes: 0 additions & 10 deletions pkg/cluster/cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@ import (
"github.com/windsorcli/cli/pkg/constants"
)

// =============================================================================
// Types
// =============================================================================

// HealthStatus represents the result of a health check.
// It pairs a boolean verdict with a free-form detail string.
type HealthStatus struct {
// Healthy presumably reports whether the check passed — confirm against the code that populates it.
Healthy bool
// Details presumably carries a human-readable explanation of the result — confirm against the code that populates it.
Details string
}

// ClusterClient defines the interface for cluster operations
type ClusterClient interface {
// WaitForNodesHealthy waits for nodes to be healthy and optionally match a specific version
Expand Down
7 changes: 0 additions & 7 deletions pkg/cluster/cluster_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ type SetupOptions struct {
// Add setup options as needed
}

// setupMocks builds the Mocks fixture shared by the tests in this file.
// It registers itself as a test helper so failures are attributed to the caller.
// The variadic opts are accepted but not consulted by the current body.
func setupMocks(t *testing.T, opts ...*SetupOptions) *Mocks {
	t.Helper()

	fixture := new(Mocks)
	return fixture
}

// =============================================================================
// Test Constructor
// =============================================================================
Expand Down
66 changes: 51 additions & 15 deletions pkg/kubernetes/kubernetes_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kubernetes

import (
"context"
"fmt"
"os"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -29,6 +30,7 @@ type KubernetesClient interface {
ApplyResource(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) (*unstructured.Unstructured, error)
DeleteResource(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error
PatchResource(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error)
CheckHealth(ctx context.Context, endpoint string) error
}

// =============================================================================
Expand All @@ -37,7 +39,8 @@ type KubernetesClient interface {

// DynamicKubernetesClient implements KubernetesClient using dynamic client
type DynamicKubernetesClient struct {
client dynamic.Interface
client dynamic.Interface
endpoint string
}

// NewDynamicKubernetesClient creates a new DynamicKubernetesClient
Expand Down Expand Up @@ -89,33 +92,66 @@ func (c *DynamicKubernetesClient) PatchResource(gvr schema.GroupVersionResource,
return c.client.Resource(gvr).Namespace(namespace).Patch(context.Background(), name, pt, data, opts)
}

// CheckHealth probes Kubernetes API connectivity by listing at most one node through the dynamic client.
// The supplied endpoint is stored on the receiver before the client is lazily initialized.
// NOTE(review): ensureClient returns early when a client already exists, so the endpoint looks
// like it only takes effect on first initialization — confirm this is the intended behavior.
// Returns a wrapped error if the client cannot be initialized or the list request fails.
func (c *DynamicKubernetesClient) CheckHealth(ctx context.Context, endpoint string) error {
	c.endpoint = endpoint
	if initErr := c.ensureClient(); initErr != nil {
		return fmt.Errorf("failed to initialize Kubernetes client: %w", initErr)
	}

	// Nodes belong to the core API group, which is addressed with an empty group name.
	nodes := c.client.Resource(schema.GroupVersionResource{Version: "v1", Resource: "nodes"})

	// A single-item page is enough: only reachability matters, not the payload.
	if _, listErr := nodes.List(ctx, metav1.ListOptions{Limit: 1}); listErr != nil {
		return fmt.Errorf("failed to connect to Kubernetes API: %w", listErr)
	}
	return nil
}

// =============================================================================
// Private Methods
// =============================================================================

// ensureClient lazily initializes the dynamic client if not already set.
// This is a Windsor CLI exception: see comment above.
// ensureClient initializes the dynamic Kubernetes client if unset. Uses endpoint, in-cluster, or kubeconfig as available.
// Returns error if client setup fails at any stage.
func (c *DynamicKubernetesClient) ensureClient() error {
if c.client != nil {
return nil
}
// Try in-cluster config first
config, err := rest.InClusterConfig()
if err != nil {
// Fallback to kubeconfig from KUBECONFIG or default location
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig == "" {
home, err := os.UserHomeDir()

var config *rest.Config
var err error

if c.endpoint != "" {
config = &rest.Config{
Host: c.endpoint,
}
} else {
config, err = rest.InClusterConfig()
if err != nil {
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig == "" {
home, err := os.UserHomeDir()
if err != nil {
return err
}
kubeconfig = home + "/.kube/config"
}
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return err
}
kubeconfig = home + "/.kube/config"
}
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return err
}
}

cli, err := dynamic.NewForConfig(config)
if err != nil {
return err
Expand Down
37 changes: 34 additions & 3 deletions pkg/kubernetes/kubernetes_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
package kubernetes

import (
"context"
"fmt"
"maps"
"os"
"strings"
"time"
Expand Down Expand Up @@ -44,6 +46,7 @@ type KubernetesManager interface {
WaitForKustomizationsDeleted(message string, names ...string) error
CheckGitRepositoryStatus() error
GetKustomizationStatus(names []string) (map[string]bool, error)
WaitForKubernetesHealthy(ctx context.Context, endpoint string) error
}

// =============================================================================
Expand Down Expand Up @@ -545,6 +548,36 @@ func (k *BaseKubernetesManager) GetKustomizationStatus(names []string) (map[stri
return status, nil
}

// WaitForKubernetesHealthy polls the Kubernetes API health check until it succeeds or the
// context is done.
//
// If the client is not initialized it returns an error immediately. When ctx carries no
// deadline, a default timeout of 5 minutes is applied; the derived context is also passed to
// each health check so a single hung request cannot outlive the overall budget.
//
// The check is retried every 10 seconds. The wait between attempts is interruptible, so
// cancellation or deadline expiry is honored promptly instead of after a full poll interval.
// Returns nil once the API is healthy, or a timeout error if the context ends first.
func (k *BaseKubernetesManager) WaitForKubernetesHealthy(ctx context.Context, endpoint string) error {
	if k.client == nil {
		return fmt.Errorf("kubernetes client not initialized")
	}

	// Only impose the default budget when the caller has not chosen one.
	if _, hasDeadline := ctx.Deadline(); !hasDeadline {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, 5*time.Minute)
		defer cancel()
	}

	const pollInterval = 10 * time.Second

	// Checking ctx.Err() first means an already-cancelled context never triggers a health check.
	for ctx.Err() == nil {
		if err := k.client.CheckHealth(ctx, endpoint); err == nil {
			return nil
		}

		// Wait for the next attempt, but wake immediately if the context ends.
		wait := time.NewTimer(pollInterval)
		select {
		case <-ctx.Done():
			wait.Stop()
		case <-wait.C:
		}
	}

	return fmt.Errorf("timeout waiting for Kubernetes API to be healthy")
}

// applyWithRetry applies a resource using SSA with minimal logic
func (k *BaseKubernetesManager) applyWithRetry(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) error {
existing, err := k.client.GetResource(gvr, obj.GetNamespace(), obj.GetName())
Expand All @@ -554,9 +587,7 @@ func (k *BaseKubernetesManager) applyWithRetry(gvr schema.GroupVersionResource,
return fmt.Errorf("failed to convert existing object to unstructured: %w", err)
}

for k, v := range obj.Object {
applyConfig[k] = v
}
maps.Copy(applyConfig, obj.Object)

mergedObj := &unstructured.Unstructured{Object: applyConfig}
mergedObj.SetResourceVersion(existing.GetResourceVersion())
Expand Down
87 changes: 87 additions & 0 deletions pkg/kubernetes/kubernetes_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"context"

kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
"github.com/windsorcli/cli/pkg/di"
Expand Down Expand Up @@ -1951,3 +1953,88 @@ func TestBaseKubernetesManager_GetKustomizationStatus(t *testing.T) {
}
})
}

// TestBaseKubernetesManager_WaitForKubernetesHealthy exercises WaitForKubernetesHealthy across
// the success, missing-client, deadline-exceeded and cancelled-context paths.
// Subtests that go on to inspect the returned error use t.Fatal on a nil error, so a missing
// error is reported as a test failure rather than a nil-pointer panic on err.Error().
func TestBaseKubernetesManager_WaitForKubernetesHealthy(t *testing.T) {
	// setup returns an initialized manager backed by the shared mocks.
	setup := func(t *testing.T) *BaseKubernetesManager {
		t.Helper()
		mocks := setupMocks(t)
		manager := NewKubernetesManager(mocks.Injector)
		if err := manager.Initialize(); err != nil {
			t.Fatalf("Initialize failed: %v", err)
		}
		return manager
	}

	t.Run("Success", func(t *testing.T) {
		// Given a client whose health check succeeds
		manager := setup(t)
		client := NewMockKubernetesClient()
		client.CheckHealthFunc = func(ctx context.Context, endpoint string) error {
			return nil
		}
		manager.client = client

		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		defer cancel()

		// When waiting for the API to be healthy
		err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")

		// Then no error should be returned
		if err != nil {
			t.Errorf("Expected no error, got %v", err)
		}
	})

	t.Run("ClientNotInitialized", func(t *testing.T) {
		// Given a manager without a client
		manager := setup(t)
		manager.client = nil

		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		defer cancel()

		// When waiting for the API to be healthy
		err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")

		// Then a client-not-initialized error should be returned
		if err == nil {
			t.Fatal("Expected error, got nil")
		}
		if !strings.Contains(err.Error(), "kubernetes client not initialized") {
			t.Errorf("Expected client not initialized error, got: %v", err)
		}
	})

	t.Run("Timeout", func(t *testing.T) {
		// Given a client whose health check always fails
		manager := setup(t)
		client := NewMockKubernetesClient()
		client.CheckHealthFunc = func(ctx context.Context, endpoint string) error {
			return fmt.Errorf("health check failed")
		}
		manager.client = client

		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer cancel()

		// When waiting for the API to be healthy
		err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")

		// Then a timeout error should be returned
		if err == nil {
			t.Fatal("Expected error, got nil")
		}
		if !strings.Contains(err.Error(), "timeout waiting for Kubernetes API to be healthy") {
			t.Errorf("Expected timeout error, got: %v", err)
		}
	})

	t.Run("ContextCancelled", func(t *testing.T) {
		// Given a failing client and a context that is already cancelled
		manager := setup(t)
		client := NewMockKubernetesClient()
		client.CheckHealthFunc = func(ctx context.Context, endpoint string) error {
			return fmt.Errorf("health check failed")
		}
		manager.client = client

		ctx, cancel := context.WithCancel(context.Background())
		cancel() // Cancel immediately

		// When waiting for the API to be healthy
		err := manager.WaitForKubernetesHealthy(ctx, "https://test-endpoint:6443")

		// Then a timeout error should be returned
		if err == nil {
			t.Fatal("Expected error, got nil")
		}
		if !strings.Contains(err.Error(), "timeout waiting for Kubernetes API to be healthy") {
			t.Errorf("Expected timeout error, got: %v", err)
		}
	})
}
11 changes: 11 additions & 0 deletions pkg/kubernetes/mock_kubernetes_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package kubernetes

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -22,6 +24,7 @@ type MockKubernetesClient struct {
ApplyResourceFunc func(gvr schema.GroupVersionResource, obj *unstructured.Unstructured, opts metav1.ApplyOptions) (*unstructured.Unstructured, error)
DeleteResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, opts metav1.DeleteOptions) error
PatchResourceFunc func(gvr schema.GroupVersionResource, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*unstructured.Unstructured, error)
CheckHealthFunc func(ctx context.Context, endpoint string) error
}

// =============================================================================
Expand Down Expand Up @@ -76,3 +79,11 @@ func (m *MockKubernetesClient) PatchResource(gvr schema.GroupVersionResource, na
}
return nil, nil
}

// CheckHealth implements the KubernetesClient interface for the mock.
// It forwards to CheckHealthFunc when one is set and otherwise reports success with a nil error.
func (m *MockKubernetesClient) CheckHealth(ctx context.Context, endpoint string) error {
	if m.CheckHealthFunc == nil {
		return nil
	}
	return m.CheckHealthFunc(ctx, endpoint)
}
Loading
Loading