diff --git a/pkg/eventing/v1/client.go b/pkg/eventing/v1/client.go index deb908fda3..6401346f49 100644 --- a/pkg/eventing/v1/client.go +++ b/pkg/eventing/v1/client.go @@ -16,6 +16,7 @@ package v1 import ( "context" + "fmt" "time" apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -241,25 +242,27 @@ func (c *knEventingClient) GetBroker(ctx context.Context, name string) (*eventin } // WatchBroker is used to create watcher object -func (c *knEventingClient) WatchBroker(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) { - return wait.NewWatcher(ctx, c.client.Brokers(c.namespace).Watch, c.client.RESTClient(), c.namespace, "brokers", name, timeout) +func (c *knEventingClient) WatchBroker(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return wait.NewWatcherWithVersion(ctx, c.client.Brokers(c.namespace).Watch, c.client.RESTClient(), c.namespace, "brokers", name, initialVersion, timeout) } // DeleteBroker is used to delete an instance of broker and wait for completion until given timeout // For `timeout == 0` delete is performed async without any wait func (c *knEventingClient) DeleteBroker(ctx context.Context, name string, timeout time.Duration) error { + broker, err := c.GetBroker(ctx, name) + if err != nil { + return err + } + if broker.GetDeletionTimestamp() != nil { + return fmt.Errorf("can't delete broker '%s' because it has been already marked for deletion", name) + } if timeout == 0 { return c.deleteBroker(ctx, name, apis_v1.DeletePropagationBackground) } waitC := make(chan error) - watcher, err := c.WatchBroker(ctx, name, timeout) - if err != nil { - return nil - } - defer watcher.Stop() go func() { - waitForEvent := wait.NewWaitForEvent("broker", func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) - err, _ := waitForEvent.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) + waitForEvent := wait.NewWaitForEvent("broker", c.WatchBroker, func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) + err, _ := waitForEvent.Wait(ctx, name, broker.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) waitC <- err }() err = c.deleteBroker(ctx, name, apis_v1.DeletePropagationForeground) diff --git a/pkg/eventing/v1/client_test.go b/pkg/eventing/v1/client_test.go index ca133726b6..280152f7f9 100644 --- a/pkg/eventing/v1/client_test.go +++ b/pkg/eventing/v1/client_test.go @@ -43,6 +43,11 @@ func setup() (fakeSvr fake.FakeEventingV1, client KnEventingClient) { return fakeE, cli } +func TestNamespace(t *testing.T) { + _, client := setup() + assert.Equal(t, testNamespace, client.Namespace()) +} + func TestDeleteTrigger(t *testing.T) { var name = "new-trigger" server, client := setup() @@ -245,13 +250,21 @@ func TestBrokerDelete(t *testing.T) { var name = "fooBroker" server, client := setup() + server.AddReactor("get", "brokers", + func(a client_testing.Action) (bool, runtime.Object, error) { + name := a.(client_testing.GetAction).GetName() + if name == "notFound" { + return true, nil, errors.NewNotFound(eventingv1.Resource("broker"), "notFound") + } + return false, nil, nil + }) server.AddReactor("delete", "brokers", func(a client_testing.Action) (bool, runtime.Object, error) { name := a.(client_testing.DeleteAction).GetName() if name == "errorBroker" { return true, nil, fmt.Errorf("error while deleting broker %s", name) } - return true, nil, nil + return false, nil, nil }) err := client.DeleteBroker(context.Background(), name, 0) @@ -259,21 +272,40 @@ func TestBrokerDelete(t *testing.T) { err = client.DeleteBroker(context.Background(), "errorBroker", 0) assert.ErrorContains(t, err, "errorBroker", 0) + + err = client.DeleteBroker(context.Background(), "notFound", 0) + assert.ErrorContains(t, err, "not found", 0) + assert.ErrorContains(t, err, "notFound", 0) } func TestBrokerDeleteWithWait(t *testing.T) { - var name = "fooBroker" + var brokerName = "fooBroker" + var deleted = "deletedBroker" server, client := setup() + server.AddReactor("get", "brokers", + func(a client_testing.Action) (bool, runtime.Object, error) { + name := a.(client_testing.DeleteAction).GetName() + if name == deleted { + deletedBroker := newBroker(deleted) + deletedBroker.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return true, deletedBroker, nil + } + return false, nil, nil + }) server.AddReactor("delete", "brokers", func(a client_testing.Action) (bool, runtime.Object, error) { name := a.(client_testing.DeleteAction).GetName() if name == "errorBroker" { return true, nil, fmt.Errorf("error while deleting broker %s", name) } + if name == deleted { + deletedBroker := newBroker(deleted) + deletedBroker.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return true, deletedBroker, nil + } return true, nil, nil }) - server.AddWatchReactor("brokers", func(a client_testing.Action) (bool, watch.Interface, error) { watchAction := a.(client_testing.WatchAction) @@ -286,11 +318,15 @@ func TestBrokerDeleteWithWait(t *testing.T) { return true, w, nil }) - err := client.DeleteBroker(context.Background(), name, time.Duration(10)*time.Second) + err := client.DeleteBroker(context.Background(), brokerName, time.Duration(10)*time.Second) assert.NilError(t, err) err = client.DeleteBroker(context.Background(), "errorBroker", time.Duration(10)*time.Second) assert.ErrorContains(t, err, "errorBroker", time.Duration(10)*time.Second) + + err = client.DeleteBroker(context.Background(), deleted, time.Duration(10)*time.Second) + assert.ErrorContains(t, err, "marked for deletion") + assert.ErrorContains(t, err, deleted, time.Duration(10)*time.Second) } func TestBrokerList(t *testing.T) { diff --git a/pkg/serving/v1/client.go b/pkg/serving/v1/client.go index a0310a6b50..caf303e78e 100644 --- a/pkg/serving/v1/client.go +++ b/pkg/serving/v1/client.go @@ -197,12 +197,12 @@ func (cl *knServingClient) GetService(ctx context.Context, name string) (*servin return service, nil } -func (cl *knServingClient) WatchService(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) { - return wait.NewWatcher(ctx, cl.client.Services(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "services", name, timeout) +func (cl *knServingClient) WatchServiceWithVersion(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return wait.NewWatcherWithVersion(ctx, cl.client.Services(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "services", name, initialVersion, timeout) } -func (cl *knServingClient) WatchRevision(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) { - return wait.NewWatcher(ctx, cl.client.Revisions(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "revision", name, timeout) +func (cl *knServingClient) WatchRevisionWithVersion(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return wait.NewWatcherWithVersion(ctx, cl.client.Revisions(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "revision", name, initialVersion, timeout) } // List services @@ -314,24 +314,28 @@ func (cl *knServingClient) ApplyService(ctx context.Context, modifiedService *se // Param `timeout` represents a duration to wait for a delete op to finish. // For `timeout == 0` delete is performed async without any wait. func (cl *knServingClient) DeleteService(ctx context.Context, serviceName string, timeout time.Duration) error { + service, err := cl.GetService(ctx, serviceName) + if err != nil { + return err + } + if service.GetDeletionTimestamp() != nil { + return fmt.Errorf("can't delete service '%s' because it has been already marked for deletion", serviceName) + } if timeout == 0 { return cl.deleteService(ctx, serviceName, v1.DeletePropagationBackground) } + waitC := make(chan error) - watcher, err := cl.WatchService(ctx, serviceName, timeout) - if err != nil { - return nil - } - defer watcher.Stop() go func() { - waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) - err, _ := waitForEvent.Wait(ctx, watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) + waitForEvent := wait.NewWaitForEvent("service", cl.WatchServiceWithVersion, func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) + err, _ := waitForEvent.Wait(ctx, serviceName, service.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) waitC <- err }() err = cl.deleteService(ctx, serviceName, v1.DeletePropagationForeground) if err != nil { return err } + return <-waitC } @@ -350,13 +354,16 @@ func (cl *knServingClient) deleteService(ctx context.Context, serviceName string // Wait for a service to become ready, but not longer than provided timeout func (cl *knServingClient) WaitForService(ctx context.Context, name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) { - watcher, err := cl.WatchService(ctx, name, timeout) + waitForReady := wait.NewWaitForReady("service", cl.WatchServiceWithVersion, serviceConditionExtractor) + + service, err := cl.GetService(ctx, name) if err != nil { - return err, timeout + if apierrors.IsNotFound(err) { + return waitForReady.Wait(ctx, name, "", wait.Options{Timeout: &timeout}, msgCallback) + } + return err, 0 } - defer watcher.Stop() - waitForReady := wait.NewWaitForReady("service", serviceConditionExtractor) - return waitForReady.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, msgCallback) + return waitForReady.Wait(ctx, name, service.ResourceVersion, wait.Options{Timeout: &timeout}, msgCallback) } // Get the configuration for a service @@ -469,14 +476,9 @@ func (cl *knServingClient) DeleteRevision(ctx context.Context, name string, time return cl.deleteRevision(ctx, name) } waitC := make(chan error) - watcher, err := cl.WatchRevision(ctx, name, timeout) - if err != nil { - return err - } - defer watcher.Stop() go func() { - waitForEvent := wait.NewWaitForEvent("revision", func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) - err, _ := waitForEvent.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) + waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevisionWithVersion, func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) + err, _ := waitForEvent.Wait(ctx, name, revision.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback()) waitC <- err }() err = cl.deleteRevision(ctx, name) diff --git a/pkg/serving/v1/client_mock.go b/pkg/serving/v1/client_mock.go index 6321a76762..22d266c3e2 100644 --- a/pkg/serving/v1/client_mock.go +++ b/pkg/serving/v1/client_mock.go @@ -165,6 +165,16 @@ func (c *MockKnServingClient) DeleteRevision(ctx context.Context, name string, t return mock.ErrorOrNil(call.Result[0]) } +// Wait for a revision to become ready, but not longer than provided timeout +func (sr *ServingRecorder) WaitForRevision(name interface{}, timeout interface{}, callback interface{}, err error, duration time.Duration) { + sr.r.Add("WaitForRevision", []interface{}{name, timeout, callback}, []interface{}{err, duration}) +} + +func (c *MockKnServingClient) WaitForRevision(ctx context.Context, name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) { + call := c.recorder.r.VerifyCall("WaitForRevision", name, timeout, msgCallback) + return mock.ErrorOrNil(call.Result[0]), call.Result[1].(time.Duration) +} + // Get a route by its unique name func (sr *ServingRecorder) GetRoute(name interface{}, route *servingv1.Route, err error) { sr.r.Add("GetRoute", []interface{}{name}, []interface{}{route, err}) diff --git a/pkg/serving/v1/client_mock_test.go b/pkg/serving/v1/client_mock_test.go index 2d36ecfc8f..6f64af6fd7 100644 --- a/pkg/serving/v1/client_mock_test.go +++ b/pkg/serving/v1/client_mock_test.go @@ -46,6 +46,7 @@ func TestMockKnClient(t *testing.T) { recorder.CreateRevision(&servingv1.Revision{}, nil) recorder.UpdateRevision(&servingv1.Revision{}, nil) recorder.DeleteRevision("hello", time.Duration(10)*time.Second, nil) + recorder.WaitForRevision("hello", time.Duration(10)*time.Second, wait.NoopMessageCallback(), nil, 10*time.Second) recorder.GetRoute("hello", nil, nil) recorder.ListRoutes(mock.Any(), nil, nil) recorder.GetConfiguration("hello", nil, nil) @@ -65,6 +66,7 @@ func TestMockKnClient(t *testing.T) { client.CreateRevision(ctx, &servingv1.Revision{}) client.UpdateRevision(ctx, &servingv1.Revision{}) client.DeleteRevision(ctx, "hello", time.Duration(10)*time.Second) + client.WaitForRevision(ctx, "hello", time.Duration(10)*time.Second, wait.NoopMessageCallback()) client.GetRoute(ctx, "hello") client.ListRoutes(ctx, WithName("blub")) client.GetConfiguration(ctx, "hello") diff --git a/pkg/serving/v1/client_test.go b/pkg/serving/v1/client_test.go index 7306b542bf..ef3969a013 100644 --- a/pkg/serving/v1/client_test.go +++ b/pkg/serving/v1/client_test.go @@ -20,6 +20,9 @@ import ( "testing" "time" + "knative.dev/pkg/apis" + duck "knative.dev/pkg/apis/duck/v1" + "gotest.tools/v3/assert" "gotest.tools/v3/assert/cmp" "k8s.io/apimachinery/pkg/fields" @@ -49,6 +52,11 @@ func setup() (serving servingv1fake.FakeServingV1, client KnServingClient) { return } +func TestNamespace(t *testing.T) { + _, client := setup() + assert.Equal(t, testNamespace, client.Namespace()) +} + func TestGetService(t *testing.T) { serving, client := setup() serviceName := "test-service" @@ -190,8 +198,23 @@ func TestDeleteService(t *testing.T) { const ( serviceName = "test-service" nonExistingServiceName = "no-service" + deletedServiceName = "deleted-service" ) + serving.AddReactor("get", "services", + func(a clienttesting.Action) (bool, runtime.Object, error) { + name := a.(clienttesting.GetAction).GetName() + if name == serviceName { + // Don't handle existing service, just continue to next + return false, nil, nil + } + if name == deletedServiceName { + deleted := newService(deletedServiceName) + deleted.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return true, deleted, nil + } + return true, nil, errors.NewNotFound(servingv1.Resource("service"), name) + }) serving.AddReactor("delete", "services", func(a clienttesting.Action) (bool, runtime.Object, error) { name := a.(clienttesting.DeleteAction).GetName() @@ -201,7 +224,7 @@ func TestDeleteService(t *testing.T) { if name == serviceName { return true, nil, nil } - return true, nil, errors.NewNotFound(servingv1.Resource("service"), name) + return false, nil, nil }) serving.AddWatchReactor("services", func(a clienttesting.Action) (bool, watch.Interface, error) { @@ -222,6 +245,55 @@ func TestDeleteService(t *testing.T) { t.Run("trying to delete non-existing service returns error", func(t *testing.T) { err := client.DeleteService(context.Background(), nonExistingServiceName, time.Duration(10)*time.Second) + println(err.Error()) + assert.ErrorContains(t, err, "not found") + assert.ErrorContains(t, err, nonExistingServiceName) + }) + + t.Run("trying to delete service DeletionTimestamp returns error", func(t *testing.T) { + err := client.DeleteService(context.Background(), deletedServiceName, time.Duration(10)*time.Second) + println(err.Error()) + assert.ErrorContains(t, err, "marked for deletion") + assert.ErrorContains(t, err, deletedServiceName) + }) +} + +func TestDeleteServiceNoWait(t *testing.T) { + serving, client := setup() + const ( + serviceName = "test-service" + nonExistingServiceName = "no-service" + ) + + serving.AddReactor("get", "services", + func(a clienttesting.Action) (bool, runtime.Object, error) { + name := a.(clienttesting.GetAction).GetName() + if name == serviceName { + // Don't handle existing service, just continue to next + return false, nil, nil + } + return true, nil, errors.NewNotFound(servingv1.Resource("service"), name) + }) + serving.AddReactor("delete", "services", + func(a clienttesting.Action) (bool, runtime.Object, error) { + name := a.(clienttesting.DeleteAction).GetName() + + assert.Assert(t, name != "") + assert.Equal(t, testNamespace, a.GetNamespace()) + if name == serviceName { + return true, nil, nil + } + return false, nil, nil + }) + + t.Run("delete existing service returns no error", func(t *testing.T) { + err := client.DeleteService(context.Background(), serviceName, 0) + assert.NilError(t, err) + }) + + t.Run("trying to delete non-existing service returns error", func(t *testing.T) { + err := client.DeleteService(context.Background(), nonExistingServiceName, 0) + println(err.Error()) assert.ErrorContains(t, err, "not found") assert.ErrorContains(t, err, nonExistingServiceName) }) @@ -271,6 +343,134 @@ func TestGetRevision(t *testing.T) { }) } +func TestDeleteRevision(t *testing.T) { + serving, client := setup() + const ( + revisionName = "test-revision" + nonExistingRevisionName = "no-revision" + deletedRevisionName = "deleted-revision" + ) + + serving.AddReactor("get", "revisions", + func(a clienttesting.Action) (bool, runtime.Object, error) { + name := a.(clienttesting.GetAction).GetName() + if name == revisionName { + // Don't handle existing service, just continue to next + return false, nil, nil + } + if name == deletedRevisionName { + deleted := newRevision(deletedRevisionName) + deleted.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return true, deleted, nil + } + return true, nil, errors.NewNotFound(servingv1.Resource("revision"), name) + }) + serving.AddReactor("delete", "revisions", + func(a clienttesting.Action) (bool, runtime.Object, error) { + name := a.(clienttesting.DeleteAction).GetName() + + assert.Assert(t, name != "") + assert.Equal(t, testNamespace, a.GetNamespace()) + if name == revisionName { + return true, nil, nil + } + return false, nil, nil + }) + serving.AddWatchReactor("revisions", + func(a clienttesting.Action) (bool, watch.Interface, error) { + watchAction := a.(clienttesting.WatchAction) + name, found := watchAction.GetWatchRestrictions().Fields.RequiresExactMatch("metadata.name") + if !found { + return true, nil, errors.NewNotFound(servingv1.Resource("revisions"), name) + } + w := wait.NewFakeWatch(getRevisionDeleteEvents(revisionName)) + w.Start() + return true, w, nil + }) + + t.Run("delete existing revision returns no error", func(t *testing.T) { + err := client.DeleteRevision(context.Background(), revisionName, 0) + assert.NilError(t, err) + }) + + t.Run("trying to delete non-existing revision returns error", func(t *testing.T) { + err := client.DeleteRevision(context.Background(), nonExistingRevisionName, 0) + assert.ErrorContains(t, err, "not found") + assert.ErrorContains(t, err, nonExistingRevisionName) + }) + + t.Run("trying to delete revision DeletionTimestamp returns error", func(t *testing.T) { + err := client.DeleteRevision(context.Background(), deletedRevisionName, time.Duration(10)*time.Second) + assert.ErrorContains(t, err, "marked for deletion") + assert.ErrorContains(t, err, deletedRevisionName) + }) +} + +func TestDeleteRevisionNoWait(t *testing.T) { + serving, client := setup() + const ( + revisionName = "test-revision" + nonExistingRevisionName = "no-revision" + ) + + serving.AddReactor("get", "revisions", + func(a clienttesting.Action) (bool, runtime.Object, error) { + name := a.(clienttesting.GetAction).GetName() + if name == revisionName { + // Don't handle existing service, just continue to next + return false, nil, nil + } + return true, nil, errors.NewNotFound(servingv1.Resource("revision"), name) + }) + serving.AddReactor("delete", "revisions", + func(a clienttesting.Action) (bool, runtime.Object, error) { + name := a.(clienttesting.DeleteAction).GetName() + + assert.Assert(t, name != "") + assert.Equal(t, testNamespace, a.GetNamespace()) + if name == revisionName { + return true, nil, nil + } + return false, nil, nil + }) + + t.Run("delete existing service returns no error", func(t *testing.T) { + err := client.DeleteRevision(context.Background(), revisionName, 0) + assert.NilError(t, err) + }) + + t.Run("trying to delete non-existing service returns error", func(t *testing.T) { + err := client.DeleteRevision(context.Background(), nonExistingRevisionName, 0) + assert.ErrorContains(t, err, "not found") + assert.ErrorContains(t, err, nonExistingRevisionName) + }) +} + +func getRevisionDeleteEvents(name string) []watch.Event { + return []watch.Event{ + {Type: watch.Added, Object: createTestRevisionWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")}, + {Type: watch.Modified, Object: createTestRevisionWithConditions(name, corev1.ConditionUnknown, corev1.ConditionTrue, "", "msg2")}, + {Type: watch.Deleted, Object: createTestRevisionWithConditions(name, corev1.ConditionTrue, corev1.ConditionTrue, "", "")}, + } +} + +func createTestRevisionWithConditions(name string, readyStatus corev1.ConditionStatus, otherReadyStatus corev1.ConditionStatus, reason string, message string, generations ...int64) runtime.Object { + revision := servingv1.Revision{ObjectMeta: metav1.ObjectMeta{Name: name}} + if len(generations) == 2 { + revision.Generation = generations[0] + revision.Status.ObservedGeneration = generations[1] + } else { + revision.Generation = 1 + revision.Status.ObservedGeneration = 1 + } + revision.Status.Conditions = duck.Conditions([]apis.Condition{ + {Type: "RoutesReady", Status: otherReadyStatus}, + {Type: apis.ConditionReady, Status: readyStatus, Reason: reason, Message: message}, + {Type: "ConfigurationsReady", Status: otherReadyStatus}, + }) + return &revision +} + func TestListRevisions(t *testing.T) { serving, client := setup() diff --git a/pkg/wait/poll_watcher.go b/pkg/wait/poll_watcher.go index 86189c2196..c88950b8ac 100644 --- a/pkg/wait/poll_watcher.go +++ b/pkg/wait/poll_watcher.go @@ -69,16 +69,16 @@ func newTickerPollInterval(d time.Duration) *tickerPollInterval { return &tickerPollInterval{time.NewTicker(d)} } -// NewWatcher makes a watch.Interface on the given resource in the client, +// NewWatcherWithVersion makes a watch.Interface on the given resource in the client, // falling back to polling if the server does not support Watch. -func NewWatcher(ctx context.Context, watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) { - native, err := nativeWatch(ctx, watchFunc, name, timeout) +func NewWatcherWithVersion(ctx context.Context, watchFunc watchF, c rest.Interface, ns string, resource string, name string, initialResourceVersion string, timeout time.Duration) (watch.Interface, error) { + native, err := nativeWatchWithVersion(ctx, watchFunc, name, initialResourceVersion, timeout) if err == nil { return native, nil } polling := &pollingWatcher{ c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{}, - newTickerPollInterval(pollInterval), nativePoll(ctx, c, ns, resource, name)} + newTickerPollInterval(time.Second), nativePoll(ctx, c, ns, resource, name)} polling.start() return polling, nil } @@ -161,9 +161,10 @@ func (w *pollingWatcher) Stop() { close(w.done) } -func nativeWatch(ctx context.Context, watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) { +func nativeWatchWithVersion(ctx context.Context, watchFunc watchF, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { opts := v1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), + ResourceVersion: initialVersion, + FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), } opts.Watch = true addWatchTimeout(&opts, timeout) diff --git a/pkg/wait/wait_for_ready.go b/pkg/wait/wait_for_ready.go index ba6a27c102..edee29f1ae 100644 --- a/pkg/wait/wait_for_ready.go +++ b/pkg/wait/wait_for_ready.go @@ -28,14 +28,16 @@ import ( // Callbacks and configuration used while waiting type waitForReadyConfig struct { + watchMaker WatchMaker conditionsExtractor ConditionsExtractor kind string } // Callbacks and configuration used while waiting for event type waitForEvent struct { - eventDone EventDone - kind string + watchMaker WatchMaker + eventDone EventDone + kind string } // EventDone is a marker to stop actual waiting on given event state @@ -48,7 +50,7 @@ type Wait interface { // Wait on resource the resource with this name // and write event messages for unknown event to the status writer. // Returns an error (if any) and the overall time it took to wait - Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) + Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) } type Options struct { @@ -61,7 +63,7 @@ type Options struct { } // Create watch which is used when waiting for Ready condition -type WatchMaker func(name string, timeout time.Duration) (watch.Interface, error) +type WatchMaker func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) // Extract conditions from a runtime object type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error) @@ -70,19 +72,21 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error) type MessageCallback func(durationSinceState time.Duration, message string) // NewWaitForReady waits until the condition is set to Ready == True -func NewWaitForReady(kind string, extractor ConditionsExtractor) Wait { +func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait { return &waitForReadyConfig{ kind: kind, + watchMaker: watchMaker, conditionsExtractor: extractor, } } // NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when // the EventDone function returns true) -func NewWaitForEvent(kind string, eventDone EventDone) Wait { +func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait { return &waitForEvent{ - kind: kind, - eventDone: eventDone, + kind: kind, + watchMaker: watchMaker, + eventDone: eventDone, } } @@ -109,14 +113,13 @@ func NoopMessageCallback() MessageCallback { // (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no // target state has been entered yet and `out` is used for printing out status messages // msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message. -func (w *waitForReadyConfig) Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) { - +func (w *waitForReadyConfig) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) { timeout := options.timeoutWithDefault() timeoutTimer := time.NewTimer(timeout) defer timeoutTimer.Stop() for { start := time.Now() - retry, timeoutReached, err := w.waitForReadyCondition(ctx, watcher, start, timeoutTimer, options.errorWindowWithDefault(), msgCallback) + retry, timeoutReached, err := w.waitForReadyCondition(ctx, name, initialVersion, start, timeoutTimer, options.errorWindowWithDefault(), options, msgCallback) if err != nil { return err, time.Since(start) @@ -140,10 +143,14 @@ func (w *waitForReadyConfig) Wait(ctx context.Context, watcher watch.Interface, // An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning // an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime // for the "Ready" condition, then the method continues to wait. -func (w *waitForReadyConfig) waitForReadyCondition( - ctx context.Context, watcher watch.Interface, start time.Time, timeoutTimer *time.Timer, errorWindow time.Duration, msgCallback MessageCallback, -) (retry bool, timeoutReached bool, err error) { +func (w *waitForReadyConfig) waitForReadyCondition(ctx context.Context, name string, initialVersion string, start time.Time, + timeoutTimer *time.Timer, errorWindow time.Duration, options Options, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) { + watcher, err := w.watchMaker(ctx, name, initialVersion, options.timeoutWithDefault()) + if err != nil { + return false, false, err + } + defer watcher.Stop() // channel used to transport the error that has been received errChan := make(chan error) @@ -170,8 +177,7 @@ func (w *waitForReadyConfig) waitForReadyCondition( return false, false, err case event, ok := <-watcher.ResultChan(): if !ok || event.Object == nil { - // retry only if the channel is still open - return ok, false, nil + return true, false, nil } // Check whether resource is in sync already (meta.generation == status.observedGeneration) @@ -240,7 +246,13 @@ func (w *waitForReadyConfig) waitForReadyCondition( } // Wait until the expected EventDone is satisfied -func (w *waitForEvent) Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) { +func (w *waitForEvent) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) { + watcher, err := w.watchMaker(ctx, name, initialVersion, options.timeoutWithDefault()) + if err != nil { + return err, 0 + } + defer watcher.Stop() + timeout := options.timeoutWithDefault() start := time.Now() // channel used to transport the error diff --git a/pkg/wait/wait_for_ready_test.go b/pkg/wait/wait_for_ready_test.go index 6fbcefdbff..b40b493f54 100644 --- a/pkg/wait/wait_for_ready_test.go +++ b/pkg/wait/wait_for_ready_test.go @@ -17,6 +17,7 @@ package wait import ( "context" "errors" + "fmt" "testing" "time" @@ -39,9 +40,13 @@ type waitForReadyTestCase struct { func TestWaitCancellation(t *testing.T) { fakeWatchApi := NewFakeWatch([]watch.Event{}) fakeWatchApi.Start() - wfe := NewWaitForEvent("foobar", func(e *watch.Event) bool { - return false - }) + wfe := NewWaitForEvent("foobar", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return fakeWatchApi, nil + }, + func(e *watch.Event) bool { + return false + }) timeout := time.Second * 5 @@ -51,7 +56,7 @@ func TestWaitCancellation(t *testing.T) { time.Sleep(time.Millisecond * 500) cancel() }() - err, _ := wfe.Wait(ctx, fakeWatchApi, "foobar", Options{Timeout: &timeout}, NoopMessageCallback()) + err, _ := wfe.Wait(ctx, "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback()) assert.Assert(t, errors.Is(err, context.Canceled)) ctx, cancel = context.WithCancel(context.Background()) @@ -62,10 +67,13 @@ func TestWaitCancellation(t *testing.T) { }() wfr := NewWaitForReady( "blub", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return fakeWatchApi, nil + }, func(obj runtime.Object) (apis.Conditions, error) { return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil }) - err, _ = wfr.Wait(ctx, fakeWatchApi, "foobar", Options{Timeout: &timeout}, NoopMessageCallback()) + err, _ = wfr.Wait(ctx, "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback()) assert.Assert(t, errors.Is(err, context.Canceled)) } @@ -73,15 +81,65 @@ func TestAddWaitForReady(t *testing.T) { for i, tc := range prepareTestCases("test-service") { fakeWatchApi := NewFakeWatch(tc.events) - waitForReady := NewWaitForReady( "blub", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return fakeWatchApi, nil + }, func(obj runtime.Object) (apis.Conditions, error) { return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil }) fakeWatchApi.Start() var msgs []string - err, _ := waitForReady.Wait(context.Background(), fakeWatchApi, "foobar", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) { + err, _ := waitForReady.Wait(context.Background(), "foobar", "", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) { + msgs = append(msgs, msg) + }) + close(fakeWatchApi.eventChan) + + if tc.errorText == "" && err != nil { + t.Errorf("%d: Error received %v", i, err) + continue + } + if tc.errorText != "" { + if err == nil { + t.Errorf("%d: No error but expected one", i) + } else { + assert.ErrorContains(t, err, tc.errorText) + } + } + + // check messages + assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i) + + if fakeWatchApi.StopCalled != 1 { + t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled) + } + + } +} + +func TestAddWaitForReadyWithChannelClose(t *testing.T) { + for i, tc := range prepareTestCases("test-service") { + fakeWatchApi := NewFakeWatch(tc.events) + counter := 0 + waitForReady := NewWaitForReady( + "blub", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + if counter == 0 { + close(fakeWatchApi.eventChan) + counter++ + return fakeWatchApi, nil + } + fakeWatchApi.eventChan = make(chan watch.Event) + fakeWatchApi.Start() + return fakeWatchApi, nil + }, + func(obj runtime.Object) (apis.Conditions, error) { + return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil + }) + var msgs []string + + err, _ := waitForReady.Wait(context.Background(), "foobar", "", Options{Timeout: &tc.timeout}, func(_ time.Duration, msg string) { msgs = append(msgs, msg) }) close(fakeWatchApi.eventChan) @@ -101,23 +159,80 @@ func TestAddWaitForReady(t *testing.T) { // check messages assert.Assert(t, cmp.DeepEqual(tc.messagesExpected, msgs), "%d: Messages expected to be equal", i) - if fakeWatchApi.StopCalled != 0 { - t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled) + if fakeWatchApi.StopCalled != 2 { + t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled) } } } +func TestWaitTimeout(t *testing.T) { + fakeWatchApi := NewFakeWatch([]watch.Event{}) + timeout := time.Second * 3 + wfe := NewWaitForEvent("foobar", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return fakeWatchApi, nil + }, + func(e *watch.Event) bool { + return false + }) + + err, _ := wfe.Wait(context.Background(), "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback()) + assert.ErrorContains(t, err, "not ready") + assert.Assert(t, fakeWatchApi.StopCalled == 1) + + fakeWatchApi = NewFakeWatch([]watch.Event{}) + wfr := NewWaitForReady( + "blub", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return fakeWatchApi, nil + }, + func(obj runtime.Object) (apis.Conditions, error) { + return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil + }) + err, _ = wfr.Wait(context.Background(), "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback()) + assert.ErrorContains(t, err, "not ready") + assert.Assert(t, fakeWatchApi.StopCalled == 1) +} + +func TestWaitWatchError(t *testing.T) { + timeout := time.Second * 3 + wfe := NewWaitForEvent("foobar", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return nil, fmt.Errorf("error creating watcher") + }, + func(e *watch.Event) bool { + return false + }) + + err, _ := wfe.Wait(context.Background(), "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback()) + assert.ErrorContains(t, err, "error creating watcher") + + wfr := NewWaitForReady( + "blub", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return nil, fmt.Errorf("error creating watcher") + }, + func(obj runtime.Object) (apis.Conditions, error) { + return apis.Conditions(obj.(*servingv1.Service).Status.Conditions), nil + }) + err, _ = wfr.Wait(context.Background(), "foobar", "", Options{Timeout: &timeout}, NoopMessageCallback()) + assert.ErrorContains(t, err, "error creating watcher") +} + func TestAddWaitForDelete(t *testing.T) { for i, tc := range prepareDeleteTestCases("test-service") { fakeWatchAPI := NewFakeWatch(tc.events) waitForEvent := NewWaitForEvent( "blub", + func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) { + return fakeWatchAPI, nil + }, func(evt *watch.Event) bool { return evt.Type == watch.Deleted }) fakeWatchAPI.Start() - err, _ := waitForEvent.Wait(context.Background(), fakeWatchAPI, "foobar", Options{Timeout: &tc.timeout}, NoopMessageCallback()) + err, _ := waitForEvent.Wait(context.Background(), "foobar", "", Options{Timeout: &tc.timeout}, NoopMessageCallback()) close(fakeWatchAPI.eventChan) if tc.errorText == "" && err != nil { @@ -132,8 +247,8 @@ func TestAddWaitForDelete(t *testing.T) { } } - if fakeWatchAPI.StopCalled != 0 { - t.Errorf("%d: Exactly zero 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled) + if fakeWatchAPI.StopCalled != 1 { + t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchAPI.StopCalled) } } } @@ -142,10 +257,10 @@ func TestAddWaitForDelete(t *testing.T) { func prepareTestCases(name string) []waitForReadyTestCase { return []waitForReadyTestCase{ errorTest(name), - tc(peNormal, name, time.Second, ""), - tc(peWrongGeneration, name, 1*time.Second, "timeout"), - tc(peTimeout, name, time.Second, "timeout"), - tc(peReadyFalseWithinErrorWindow, name, time.Second, ""), + tc(peNormal, name, 5*time.Second, ""), + tc(peWrongGeneration, name, 5*time.Second, "timeout"), + tc(peTimeout, name, 5*time.Second, "timeout"), + tc(peReadyFalseWithinErrorWindow, name, 5*time.Second, ""), } } @@ -164,7 +279,7 @@ func errorTest(name string) waitForReadyTestCase { return waitForReadyTestCase{ events: events, - timeout: 3 * time.Second, + timeout: 5 * time.Second, errorText: "FakeError", messagesExpected: []string{"msg1", "Test Error"}, }