From 24af86c93dbc27edcc2ad853a38c8e273e0b1a4a Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 22 Jul 2020 03:07:46 -0700 Subject: [PATCH 1/2] retry on webhook failures --- test/lib/creation.go | 229 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 195 insertions(+), 34 deletions(-) diff --git a/test/lib/creation.go b/test/lib/creation.go index 2321cfe2af8..345e1efe846 100644 --- a/test/lib/creation.go +++ b/test/lib/creation.go @@ -18,6 +18,7 @@ package lib import ( "fmt" + "strings" appsv1 "k8s.io/api/apps/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" @@ -25,6 +26,8 @@ import ( rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/retry" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/reconciler" @@ -50,12 +53,37 @@ var coreAPIVersion = corev1.SchemeGroupVersion.Version var rbacAPIGroup = rbacv1.SchemeGroupVersion.Group var rbacAPIVersion = rbacv1.SchemeGroupVersion.Version +// This is a workaround for https://github.com/knative/pkg/issues/1509 +// Because tests currently fail immediately on any creation failure, this +// is problematic. On the reconcilers it's not an issue because they recover, +// but tests need this retry. +func isWebhookError(err error) bool { + return strings.HasSuffix(err.Error(), "EOF") +} + +func (c *Client) RetryWebhookErrors(updater func(int) error) error { + attempts := 0 + return retry.OnError(retry.DefaultRetry, isWebhookError, func() error { + err := updater(attempts) + attempts++ + return err + }) +} + // CreateChannelOrFail will create a typed Channel Resource in Eventing or fail the test if there is an error. func (c *Client) CreateChannelOrFail(name string, channelTypeMeta *metav1.TypeMeta) { c.T.Logf("Creating channel %+v-%s", channelTypeMeta, name) namespace := c.Namespace metaResource := resources.NewMetaResource(name, namespace, channelTypeMeta) - gvr, err := duck.CreateGenericChannelObject(c.Dynamic, metaResource) + var gvr schema.GroupVersionResource + err := c.RetryWebhookErrors(func(attempts int) (err error) { + var e error + gvr, e = duck.CreateGenericChannelObject(c.Dynamic, metaResource) + if e != nil { + c.T.Errorf("Failed to create %q %q: %v", channelTypeMeta.Kind, name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create %q %q: %v", channelTypeMeta.Kind, name, err) } @@ -74,7 +102,13 @@ func (c *Client) CreateChannelsOrFail(names []string, channelTypeMeta *metav1.Ty func (c *Client) CreateChannelWithDefaultOrFail(channel *messagingv1beta1.Channel) { c.T.Logf("Creating default v1beta1 channel %+v", channel) channels := c.Eventing.MessagingV1beta1().Channels(c.Namespace) - _, err := channels.Create(channel) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := channels.Create(channel) + if e != nil { + c.T.Fatalf("Failed to create channel %q: %v", channel.Name, e) + } + return err + }) if err != nil { c.T.Fatalf("Failed to create channel %q: %v", channel.Name, err) } @@ -85,7 +119,13 @@ func (c *Client) CreateChannelWithDefaultOrFail(channel *messagingv1beta1.Channe func (c *Client) CreateChannelV1WithDefaultOrFail(channel *messagingv1.Channel) { c.T.Logf("Creating default v1 channel %+v", channel) channels := c.Eventing.MessagingV1().Channels(c.Namespace) - _, err := channels.Create(channel) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := channels.Create(channel) + if e != nil { + c.T.Errorf("Failed to create channel %q: %v", channel.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create channel %q: %v", channel.Name, err) } @@ -101,9 +141,16 @@ func (c *Client) CreateSubscriptionOrFail( namespace := c.Namespace subscription := resources.SubscriptionV1Beta1(name, channelName, channelTypeMeta, options...) subscriptions := c.Eventing.MessagingV1beta1().Subscriptions(namespace) - c.T.Logf("Creating v1beta1 subscription %s for channel %+v-%s", name, channelTypeMeta, channelName) - // update subscription with the new reference - subscription, err := subscriptions.Create(subscription) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + c.T.Logf("Creating v1beta1 subscription %s for channel %+v-%s", name, channelTypeMeta, channelName) + // update subscription with the new reference + var e error + subscription, e = subscriptions.Create(subscription) + if e != nil { + c.T.Errorf("Failed to create subscription %q: %v", name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create subscription %q: %v", name, err) } @@ -120,9 +167,16 @@ func (c *Client) CreateSubscriptionV1OrFail( namespace := c.Namespace subscription := resources.SubscriptionV1(name, channelName, channelTypeMeta, options...) subscriptions := c.Eventing.MessagingV1().Subscriptions(namespace) - c.T.Logf("Creating v1 subscription %s for channel %+v-%s", name, channelTypeMeta, channelName) - // update subscription with the new reference - subscription, err := subscriptions.Create(subscription) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + c.T.Logf("Creating v1 subscription %s for channel %+v-%s", name, channelTypeMeta, channelName) + // update subscription with the new reference + var e error + subscription, e = subscriptions.Create(subscription) + if e != nil { + c.T.Errorf("Failed to create subscription %q: %v", name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create subscription %q: %v", name, err) } @@ -197,9 +251,16 @@ func (c *Client) CreateBrokerV1Beta1OrFail(name string, options ...resources.Bro namespace := c.Namespace broker := resources.BrokerV1Beta1(name, options...) brokers := c.Eventing.EventingV1beta1().Brokers(namespace) - c.T.Logf("Creating v1beta1 broker %s", name) - // update broker with the new reference - broker, err := brokers.Create(broker) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + c.T.Logf("Creating v1beta1 broker %s", name) + // update broker with the new reference + var e error + broker, e = brokers.Create(broker) + if e != nil { + c.T.Errorf("Failed to create v1beta1 broker %q: %v", name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create v1beta1 broker %q: %v", name, err) } @@ -212,9 +273,17 @@ func (c *Client) CreateTriggerOrFailV1Beta1(name string, options ...resources.Tr namespace := c.Namespace trigger := resources.TriggerV1Beta1(name, options...) triggers := c.Eventing.EventingV1beta1().Triggers(namespace) - c.T.Logf("Creating v1beta1 trigger %s", name) - // update trigger with the new reference - trigger, err := triggers.Create(trigger) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + c.T.Logf("Creating v1beta1 trigger %s", name) + // update trigger with the new reference + var e error + trigger, e = triggers.Create(trigger) + if e != nil { + c.T.Errorf("Failed to create v1beta1 trigger %q: %v", name, e) + } + return e + }) + if err != nil { c.T.Fatalf("Failed to create v1beta1 trigger %q: %v", name, err) } @@ -227,9 +296,16 @@ func (c *Client) CreateBrokerV1OrFail(name string, options ...resources.BrokerV1 namespace := c.Namespace broker := resources.BrokerV1(name, options...) brokers := c.Eventing.EventingV1().Brokers(namespace) - c.T.Logf("Creating v1 broker %s", name) - // update broker with the new reference - broker, err := brokers.Create(broker) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + c.T.Logf("Creating v1 broker %s", name) + // update broker with the new reference + var e error + broker, e = brokers.Create(broker) + if e != nil { + c.T.Errorf("Failed to create v1 broker %q: %v", name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create v1 broker %q: %v", name, err) } @@ -242,9 +318,16 @@ func (c *Client) CreateTriggerV1OrFail(name string, options ...resources.Trigger namespace := c.Namespace trigger := resources.TriggerV1(name, options...) triggers := c.Eventing.EventingV1().Triggers(namespace) - c.T.Logf("Creating v1 trigger %s", name) - // update trigger with the new reference - trigger, err := triggers.Create(trigger) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + c.T.Logf("Creating v1 trigger %s", name) + // update trigger with the new reference + var e error + trigger, e = triggers.Create(trigger) + if e != nil { + c.T.Errorf("Failed to create v1 trigger %q: %v", name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create v1 trigger %q: %v", name, err) } @@ -257,7 +340,13 @@ func (c *Client) CreateTriggerV1OrFail(name string, options ...resources.Trigger func (c *Client) CreateFlowsSequenceOrFail(sequence *flowsv1beta1.Sequence) { c.T.Logf("Creating flows sequence %+v", sequence) sequences := c.Eventing.FlowsV1beta1().Sequences(c.Namespace) - _, err := sequences.Create(sequence) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := sequences.Create(sequence) + if e != nil { + c.T.Errorf("Failed to create flows sequence %q: %v", sequence.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create flows sequence %q: %v", sequence.Name, err) } @@ -269,7 +358,13 @@ func (c *Client) CreateFlowsSequenceOrFail(sequence *flowsv1beta1.Sequence) { func (c *Client) CreateFlowsSequenceV1OrFail(sequence *flowsv1.Sequence) { c.T.Logf("Creating flows sequence %+v", sequence) sequences := c.Eventing.FlowsV1().Sequences(c.Namespace) - _, err := sequences.Create(sequence) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := sequences.Create(sequence) + if e != nil { + c.T.Errorf("Failed to create flows sequence %q: %v", sequence.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create flows sequence %q: %v", sequence.Name, err) } @@ -281,7 +376,13 @@ func (c *Client) CreateFlowsSequenceV1OrFail(sequence *flowsv1.Sequence) { func (c *Client) CreateFlowsParallelOrFail(parallel *flowsv1beta1.Parallel) { c.T.Logf("Creating flows parallel %+v", parallel) parallels := c.Eventing.FlowsV1beta1().Parallels(c.Namespace) - _, err := parallels.Create(parallel) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := parallels.Create(parallel) + if e != nil { + c.T.Errorf("Failed to create flows parallel %q: %v", parallel.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create flows parallel %q: %v", parallel.Name, err) } @@ -293,7 +394,13 @@ func (c *Client) CreateFlowsParallelOrFail(parallel *flowsv1beta1.Parallel) { func (c *Client) CreateFlowsParallelV1OrFail(parallel *flowsv1.Parallel) { c.T.Logf("Creating flows parallel %+v", parallel) parallels := c.Eventing.FlowsV1().Parallels(c.Namespace) - _, err := parallels.Create(parallel) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := parallels.Create(parallel) + if e != nil { + c.T.Fatalf("Failed to create flows parallel %q: %v", parallel.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create flows parallel %q: %v", parallel.Name, err) } @@ -304,7 +411,13 @@ func (c *Client) CreateFlowsParallelV1OrFail(parallel *flowsv1.Parallel) { func (c *Client) CreateSinkBindingV1Alpha1OrFail(sb *sourcesv1alpha1.SinkBinding) { c.T.Logf("Creating sinkbinding %+v", sb) sbInterface := c.Eventing.SourcesV1alpha1().SinkBindings(c.Namespace) - _, err := sbInterface.Create(sb) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := sbInterface.Create(sb) + if e != nil { + c.T.Errorf("Failed to create sinkbinding %q: %v", sb.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create sinkbinding %q: %v", sb.Name, err) } @@ -315,7 +428,13 @@ func (c *Client) CreateSinkBindingV1Alpha1OrFail(sb *sourcesv1alpha1.SinkBinding func (c *Client) CreateSinkBindingV1Alpha2OrFail(sb *sourcesv1alpha2.SinkBinding) { c.T.Logf("Creating sinkbinding %+v", sb) sbInterface := c.Eventing.SourcesV1alpha2().SinkBindings(c.Namespace) - _, err := sbInterface.Create(sb) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := sbInterface.Create(sb) + if e != nil { + c.T.Fatalf("Failed to create sinkbinding %q: %v", sb.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create sinkbinding %q: %v", sb.Name, err) } @@ -326,7 +445,13 @@ func (c *Client) CreateSinkBindingV1Alpha2OrFail(sb *sourcesv1alpha2.SinkBinding func (c *Client) CreateSinkBindingV1Beta1OrFail(sb *sourcesv1beta1.SinkBinding) { c.T.Logf("Creating sinkbinding %+v", sb) sbInterface := c.Eventing.SourcesV1beta1().SinkBindings(c.Namespace) - _, err := sbInterface.Create(sb) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := sbInterface.Create(sb) + if e != nil { + c.T.Errorf("Failed to create sinkbinding %q: %v", sb.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create sinkbinding %q: %v", sb.Name, err) } @@ -337,7 +462,13 @@ func (c *Client) CreateSinkBindingV1Beta1OrFail(sb *sourcesv1beta1.SinkBinding) func (c *Client) CreateApiServerSourceV1Alpha2OrFail(apiServerSource *sourcesv1alpha2.ApiServerSource) { c.T.Logf("Creating apiserversource %+v", apiServerSource) apiServerInterface := c.Eventing.SourcesV1alpha2().ApiServerSources(c.Namespace) - _, err := apiServerInterface.Create(apiServerSource) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := apiServerInterface.Create(apiServerSource) + if e != nil { + c.T.Errorf("Failed to create apiserversource %q: %v", apiServerSource.Name, err) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create apiserversource %q: %v", apiServerSource.Name, err) } @@ -348,7 +479,13 @@ func (c *Client) CreateApiServerSourceV1Alpha2OrFail(apiServerSource *sourcesv1a func (c *Client) CreateApiServerSourceV1Beta1OrFail(apiServerSource *sourcesv1beta1.ApiServerSource) { c.T.Logf("Creating apiserversource %+v", apiServerSource) apiServerInterface := c.Eventing.SourcesV1beta1().ApiServerSources(c.Namespace) - _, err := apiServerInterface.Create(apiServerSource) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := apiServerInterface.Create(apiServerSource) + if e != nil { + c.T.Errorf("Failed to create apiserversource %q: %v", apiServerSource.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create apiserversource %q: %v", apiServerSource.Name, err) } @@ -359,7 +496,13 @@ func (c *Client) CreateApiServerSourceV1Beta1OrFail(apiServerSource *sourcesv1be func (c *Client) CreateContainerSourceV1Alpha2OrFail(containerSource *sourcesv1alpha2.ContainerSource) { c.T.Logf("Creating containersource %+v", containerSource) containerInterface := c.Eventing.SourcesV1alpha2().ContainerSources(c.Namespace) - _, err := containerInterface.Create(containerSource) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := containerInterface.Create(containerSource) + if e != nil { + c.T.Errorf("Failed to create containersource %q: %v", containerSource.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create containersource %q: %v", containerSource.Name, err) } @@ -370,7 +513,13 @@ func (c *Client) CreateContainerSourceV1Alpha2OrFail(containerSource *sourcesv1a func (c *Client) CreateContainerSourceV1Beta1OrFail(containerSource *sourcesv1beta1.ContainerSource) { c.T.Logf("Creating containersource %+v", containerSource) containerInterface := c.Eventing.SourcesV1beta1().ContainerSources(c.Namespace) - _, err := containerInterface.Create(containerSource) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := containerInterface.Create(containerSource) + if e != nil { + c.T.Errorf("Failed to create containersource %q: %v", containerSource.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create containersource %q: %v", containerSource.Name, err) } @@ -381,7 +530,13 @@ func (c *Client) CreateContainerSourceV1Beta1OrFail(containerSource *sourcesv1be func (c *Client) CreatePingSourceV1Alpha1OrFail(pingSource *sourcesv1alpha1.PingSource) { c.T.Logf("Creating pingsource %+v", pingSource) pingInterface := c.Eventing.SourcesV1alpha1().PingSources(c.Namespace) - _, err := pingInterface.Create(pingSource) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := pingInterface.Create(pingSource) + if e != nil { + c.T.Errorf("Failed to create pingsource %q: %v", pingSource.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create pingsource %q: %v", pingSource.Name, err) } @@ -392,7 +547,13 @@ func (c *Client) CreatePingSourceV1Alpha1OrFail(pingSource *sourcesv1alpha1.Ping func (c *Client) CreatePingSourceV1Alpha2OrFail(pingSource *sourcesv1alpha2.PingSource) { c.T.Logf("Creating pingsource %+v", pingSource) pingInterface := c.Eventing.SourcesV1alpha2().PingSources(c.Namespace) - _, err := pingInterface.Create(pingSource) + err := c.RetryWebhookErrors(func(attempts int) (err error) { + _, e := pingInterface.Create(pingSource) + if e != nil { + c.T.Errorf("Failed to create pingsource %q: %v", pingSource.Name, e) + } + return e + }) if err != nil { c.T.Fatalf("Failed to create pingsource %q: %v", pingSource.Name, err) } From cf239aae0a919e0d2e5cb33bd753fca76dd4e340 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 22 Jul 2020 05:51:01 -0700 Subject: [PATCH 2/2] retry on EOF and hangs --- test/lib/creation.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/lib/creation.go b/test/lib/creation.go index 345e1efe846..436f6553583 100644 --- a/test/lib/creation.go +++ b/test/lib/creation.go @@ -57,8 +57,10 @@ var rbacAPIVersion = rbacv1.SchemeGroupVersion.Version // Because tests currently fail immediately on any creation failure, this // is problematic. On the reconcilers it's not an issue because they recover, // but tests need this retry. +// +// https://github.com/knative/eventing/issues/3681 func isWebhookError(err error) bool { - return strings.HasSuffix(err.Error(), "EOF") + return strings.Contains(err.Error(), "eventing-webhook.knative-eventing") } func (c *Client) RetryWebhookErrors(updater func(int) error) error {