Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 66 additions & 22 deletions test/kube_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"strings"
"time"

"k8s.io/client-go/kubernetes"

"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
k8styped "k8s.io/client-go/kubernetes/typed/core/v1"
"knative.dev/pkg/test/logging"
)
Expand All @@ -51,14 +51,20 @@ func WaitForDeploymentState(ctx context.Context, client kubernetes.Interface, na
d := client.AppsV1().Deployments(namespace)
span := logging.GetEmitableSpan(ctx, fmt.Sprintf("WaitForDeploymentState/%s/%s", name, desc))
defer span.End()

return wait.PollImmediate(interval, timeout, func() (bool, error) {
d, err := d.Get(ctx, name, metav1.GetOptions{})
var lastState *appsv1.Deployment
waitErr := wait.PollImmediate(interval, timeout, func() (bool, error) {
var err error
lastState, err = d.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return true, err
}
return inState(d)
return inState(lastState)
})

if waitErr != nil {
return fmt.Errorf("deployment %q is not in desired state, got: %s: %w", name, spew.Sprint(lastState), waitErr)
}
return nil
}

// WaitForPodListState polls the status of the PodList
Expand All @@ -70,13 +76,20 @@ func WaitForPodListState(ctx context.Context, client kubernetes.Interface, inSta
span := logging.GetEmitableSpan(ctx, "WaitForPodListState/"+desc)
defer span.End()

return wait.PollImmediate(interval, podTimeout, func() (bool, error) {
p, err := p.List(ctx, metav1.ListOptions{})
var lastState *corev1.PodList
waitErr := wait.PollImmediate(interval, podTimeout, func() (bool, error) {
var err error
lastState, err = p.List(ctx, metav1.ListOptions{})
if err != nil {
return true, err
}
return inState(p)
return inState(lastState)
})

if waitErr != nil {
return fmt.Errorf("pod list is not in desired state, got: %s: %w", spew.Sprint(lastState), waitErr)
}
return nil
}

// WaitForPodState polls the status of the specified Pod
Expand All @@ -88,13 +101,20 @@ func WaitForPodState(ctx context.Context, client kubernetes.Interface, inState f
span := logging.GetEmitableSpan(ctx, "WaitForPodState/"+name)
defer span.End()

return wait.PollImmediate(interval, podTimeout, func() (bool, error) {
p, err := p.Get(ctx, name, metav1.GetOptions{})
var lastState *corev1.Pod
waitErr := wait.PollImmediate(interval, podTimeout, func() (bool, error) {
var err error
lastState, err = p.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return false, err
}
return inState(p)
return inState(lastState)
})

if waitErr != nil {
return fmt.Errorf("pod %q is not in desired state, got: %s: %w", name, spew.Sprint(lastState), waitErr)
}
return nil
}

// WaitForPodDeleted waits for the given pod to disappear from the given namespace.
Expand All @@ -117,14 +137,20 @@ func WaitForServiceEndpoints(ctx context.Context, client kubernetes.Interface, s
span := logging.GetEmitableSpan(ctx, "WaitForServiceHasAtLeastOneEndpoint/"+svcName)
defer span.End()

return wait.PollImmediate(interval, podTimeout, func() (bool, error) {
endpoint, err := endpointsService.Get(ctx, svcName, metav1.GetOptions{})
var endpoints *corev1.Endpoints
waitErr := wait.PollImmediate(interval, podTimeout, func() (bool, error) {
var err error
endpoints, err = endpointsService.Get(ctx, svcName, metav1.GetOptions{})
if err != nil {
return false, err
}

return countEndpointsNum(endpoint) == numOfEndpoints, nil
return countEndpointsNum(endpoints) == numOfEndpoints, nil
})
if waitErr != nil {
return fmt.Errorf("did not reach the desired number of endpoints, got: %d: %w", countEndpointsNum(endpoints), waitErr)
}
return nil
}

func countEndpointsNum(e *corev1.Endpoints) int {
Expand Down Expand Up @@ -155,10 +181,16 @@ func GetEndpointAddresses(ctx context.Context, client kubernetes.Interface, svcN

// WaitForChangedEndpoints waits until the endpoints for the given service differ from origEndpoints.
func WaitForChangedEndpoints(ctx context.Context, client kubernetes.Interface, svcName, svcNamespace string, origEndpoints []string) error {
return wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
newEndpoints, err := GetEndpointAddresses(ctx, client, svcName, svcNamespace)
var newEndpoints []string
waitErr := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
var err error
newEndpoints, err = GetEndpointAddresses(ctx, client, svcName, svcNamespace)
return !cmp.Equal(origEndpoints, newEndpoints), err
})
if waitErr != nil {
return fmt.Errorf("new endpoints are not different from the original ones, got %q: %w", newEndpoints, waitErr)
}
return nil
}

// GetConfigMap gets the configmaps for a given namespace
Expand All @@ -176,13 +208,19 @@ func DeploymentScaledToZeroFunc() func(d *appsv1.Deployment) (bool, error) {
// WaitForLogContent waits until logs for given Pod/Container include the given content.
// If the content is not present within timeout it returns error.
func WaitForLogContent(ctx context.Context, client *KubeClient, podName, containerName, namespace, content string) error {
return wait.PollImmediate(interval, logTimeout, func() (bool, error) {
logs, err := client.PodLogs(ctx, podName, containerName, namespace)
var logs []byte
waitErr := wait.PollImmediate(interval, logTimeout, func() (bool, error) {
var err error
logs, err = client.PodLogs(ctx, podName, containerName, namespace)
if err != nil {
return true, err
}
return strings.Contains(string(logs), content), nil
})
if waitErr != nil {
return fmt.Errorf("logs do not contain the desired content %q, got %q: %w", content, logs, waitErr)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("logs do not contain the desired content %q, got %q: %w", content, logs, waitErr)
return fmt.Errorf("logs do not contain the desired content %q, got %q: %w", content, string(logs), waitErr)

I believe this gets us a messy byte output otherwise.

Copy link
Copy Markdown
Contributor Author

@skonto skonto Apr 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked that before, so the rules say for byte slices with %q: %q a double-quoted string safely escaped with Go syntax. Even %s works (but no quotes). Check here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow interesting!

}
return nil
}

// WaitForAllPodsRunning waits for all the pods to be in running state
Expand All @@ -192,14 +230,20 @@ func WaitForAllPodsRunning(ctx context.Context, client kubernetes.Interface, nam

// WaitForPodRunning waits for the given pod to be in running state
func WaitForPodRunning(ctx context.Context, client kubernetes.Interface, name string, namespace string) error {
p := client.CoreV1().Pods(namespace)
return wait.PollImmediate(interval, podTimeout, func() (bool, error) {
p, err := p.Get(ctx, name, metav1.GetOptions{})
var p *corev1.Pod
pods := client.CoreV1().Pods(namespace)
waitErr := wait.PollImmediate(interval, podTimeout, func() (bool, error) {
var err error
p, err = pods.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return true, err
}
return podRunning(p), nil
})
if waitErr != nil {
return fmt.Errorf("pod %q did not reach the running state, got %+v: %w", name, p.Status.Phase, waitErr)
}
return nil
}

// podsRunning will check the status conditions of the pod list and return true all pods are Running
Expand Down