Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions pkg/eventing/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v1

import (
"context"
"fmt"
"time"

apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -241,25 +242,27 @@ func (c *knEventingClient) GetBroker(ctx context.Context, name string) (*eventin
}

// WatchBroker is used to create watcher object
func (c *knEventingClient) WatchBroker(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, c.client.Brokers(c.namespace).Watch, c.client.RESTClient(), c.namespace, "brokers", name, timeout)
func (c *knEventingClient) WatchBroker(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcherWithVersion(ctx, c.client.Brokers(c.namespace).Watch, c.client.RESTClient(), c.namespace, "brokers", name, initialVersion, timeout)
}

// DeleteBroker is used to delete an instance of broker and wait for completion until given timeout
// For `timeout == 0` delete is performed async without any wait
func (c *knEventingClient) DeleteBroker(ctx context.Context, name string, timeout time.Duration) error {
broker, err := c.GetBroker(ctx, name)
if err != nil {
return err
}
if broker.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't delete broker '%s' because it has been already marked for deletion", name)
}
if timeout == 0 {
return c.deleteBroker(ctx, name, apis_v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := c.WatchBroker(ctx, name, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("broker", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("broker", c.WatchBroker, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, name, broker.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = c.deleteBroker(ctx, name, apis_v1.DeletePropagationForeground)
Expand Down
44 changes: 40 additions & 4 deletions pkg/eventing/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func setup() (fakeSvr fake.FakeEventingV1, client KnEventingClient) {
return fakeE, cli
}

func TestNamespace(t *testing.T) {
_, client := setup()
assert.Equal(t, testNamespace, client.Namespace())
}

func TestDeleteTrigger(t *testing.T) {
var name = "new-trigger"
server, client := setup()
Expand Down Expand Up @@ -245,35 +250,62 @@ func TestBrokerDelete(t *testing.T) {
var name = "fooBroker"
server, client := setup()

server.AddReactor("get", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.GetAction).GetName()
if name == "notFound" {
return true, nil, errors.NewNotFound(eventingv1.Resource("broker"), "notFound")
}
return false, nil, nil
})
server.AddReactor("delete", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.DeleteAction).GetName()
if name == "errorBroker" {
return true, nil, fmt.Errorf("error while deleting broker %s", name)
}
return true, nil, nil
return false, nil, nil
})

err := client.DeleteBroker(context.Background(), name, 0)
assert.NilError(t, err)

err = client.DeleteBroker(context.Background(), "errorBroker", 0)
assert.ErrorContains(t, err, "errorBroker", 0)

err = client.DeleteBroker(context.Background(), "notFound", 0)
assert.ErrorContains(t, err, "not found", 0)
assert.ErrorContains(t, err, "notFound", 0)
}

func TestBrokerDeleteWithWait(t *testing.T) {
var name = "fooBroker"
var brokerName = "fooBroker"
var deleted = "deletedBroker"
server, client := setup()

server.AddReactor("get", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.DeleteAction).GetName()
if name == deleted {
deletedBroker := newBroker(deleted)
deletedBroker.DeletionTimestamp = &metav1.Time{Time: time.Now()}
return true, deletedBroker, nil
}
return false, nil, nil
})
server.AddReactor("delete", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.DeleteAction).GetName()
if name == "errorBroker" {
return true, nil, fmt.Errorf("error while deleting broker %s", name)
}
if name == deleted {
deletedBroker := newBroker(deleted)
deletedBroker.DeletionTimestamp = &metav1.Time{Time: time.Now()}
return true, deletedBroker, nil
}
return true, nil, nil
})

server.AddWatchReactor("brokers",
func(a client_testing.Action) (bool, watch.Interface, error) {
watchAction := a.(client_testing.WatchAction)
Expand All @@ -286,11 +318,15 @@ func TestBrokerDeleteWithWait(t *testing.T) {
return true, w, nil
})

err := client.DeleteBroker(context.Background(), name, time.Duration(10)*time.Second)
err := client.DeleteBroker(context.Background(), brokerName, time.Duration(10)*time.Second)
assert.NilError(t, err)

err = client.DeleteBroker(context.Background(), "errorBroker", time.Duration(10)*time.Second)
assert.ErrorContains(t, err, "errorBroker", time.Duration(10)*time.Second)

err = client.DeleteBroker(context.Background(), deleted, time.Duration(10)*time.Second)
assert.ErrorContains(t, err, "marked for deletion")
assert.ErrorContains(t, err, deleted, time.Duration(10)*time.Second)
}

func TestBrokerList(t *testing.T) {
Expand Down
48 changes: 25 additions & 23 deletions pkg/serving/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ func (cl *knServingClient) GetService(ctx context.Context, name string) (*servin
return service, nil
}

func (cl *knServingClient) WatchService(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, cl.client.Services(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "services", name, timeout)
func (cl *knServingClient) WatchServiceWithVersion(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcherWithVersion(ctx, cl.client.Services(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "services", name, initialVersion, timeout)
}

func (cl *knServingClient) WatchRevision(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, cl.client.Revisions(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "revision", name, timeout)
func (cl *knServingClient) WatchRevisionWithVersion(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcherWithVersion(ctx, cl.client.Revisions(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "revision", name, initialVersion, timeout)
}

// List services
Expand Down Expand Up @@ -314,24 +314,28 @@ func (cl *knServingClient) ApplyService(ctx context.Context, modifiedService *se
// Param `timeout` represents a duration to wait for a delete op to finish.
// For `timeout == 0` delete is performed async without any wait.
func (cl *knServingClient) DeleteService(ctx context.Context, serviceName string, timeout time.Duration) error {
service, err := cl.GetService(ctx, serviceName)
if err != nil {
return err
}
if service.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't delete service '%s' because it has been already marked for deletion", serviceName)
}
if timeout == 0 {
return cl.deleteService(ctx, serviceName, v1.DeletePropagationBackground)
}

waitC := make(chan error)
watcher, err := cl.WatchService(ctx, serviceName, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("service", cl.WatchServiceWithVersion, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, serviceName, service.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = cl.deleteService(ctx, serviceName, v1.DeletePropagationForeground)
if err != nil {
return err
}

return <-waitC
}

Expand All @@ -350,13 +354,16 @@ func (cl *knServingClient) deleteService(ctx context.Context, serviceName string

// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(ctx context.Context, name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
watcher, err := cl.WatchService(ctx, name, timeout)
waitForReady := wait.NewWaitForReady("service", cl.WatchServiceWithVersion, serviceConditionExtractor)

service, err := cl.GetService(ctx, name)
if err != nil {
return err, timeout
if apierrors.IsNotFound(err) {
return waitForReady.Wait(ctx, name, "", wait.Options{Timeout: &timeout}, msgCallback)
}
return err, 0
}
defer watcher.Stop()
waitForReady := wait.NewWaitForReady("service", serviceConditionExtractor)
return waitForReady.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, msgCallback)
return waitForReady.Wait(ctx, name, service.ResourceVersion, wait.Options{Timeout: &timeout}, msgCallback)
}

// Get the configuration for a service
Expand Down Expand Up @@ -469,14 +476,9 @@ func (cl *knServingClient) DeleteRevision(ctx context.Context, name string, time
return cl.deleteRevision(ctx, name)
}
waitC := make(chan error)
watcher, err := cl.WatchRevision(ctx, name, timeout)
if err != nil {
return err
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("revision", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevisionWithVersion, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, name, revision.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = cl.deleteRevision(ctx, name)
Expand Down
10 changes: 10 additions & 0 deletions pkg/serving/v1/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ func (c *MockKnServingClient) DeleteRevision(ctx context.Context, name string, t
return mock.ErrorOrNil(call.Result[0])
}

// Wait for a revision to become ready, but not longer than provided timeout
func (sr *ServingRecorder) WaitForRevision(name interface{}, timeout interface{}, callback interface{}, err error, duration time.Duration) {
sr.r.Add("WaitForRevision", []interface{}{name, timeout, callback}, []interface{}{err, duration})
}

func (c *MockKnServingClient) WaitForRevision(ctx context.Context, name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
call := c.recorder.r.VerifyCall("WaitForRevision", name, timeout, msgCallback)
return mock.ErrorOrNil(call.Result[0]), call.Result[1].(time.Duration)
}

// Get a route by its unique name
func (sr *ServingRecorder) GetRoute(name interface{}, route *servingv1.Route, err error) {
sr.r.Add("GetRoute", []interface{}{name}, []interface{}{route, err})
Expand Down
2 changes: 2 additions & 0 deletions pkg/serving/v1/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestMockKnClient(t *testing.T) {
recorder.CreateRevision(&servingv1.Revision{}, nil)
recorder.UpdateRevision(&servingv1.Revision{}, nil)
recorder.DeleteRevision("hello", time.Duration(10)*time.Second, nil)
recorder.WaitForRevision("hello", time.Duration(10)*time.Second, wait.NoopMessageCallback(), nil, 10*time.Second)
recorder.GetRoute("hello", nil, nil)
recorder.ListRoutes(mock.Any(), nil, nil)
recorder.GetConfiguration("hello", nil, nil)
Expand All @@ -65,6 +66,7 @@ func TestMockKnClient(t *testing.T) {
client.CreateRevision(ctx, &servingv1.Revision{})
client.UpdateRevision(ctx, &servingv1.Revision{})
client.DeleteRevision(ctx, "hello", time.Duration(10)*time.Second)
client.WaitForRevision(ctx, "hello", time.Duration(10)*time.Second, wait.NoopMessageCallback())
client.GetRoute(ctx, "hello")
client.ListRoutes(ctx, WithName("blub"))
client.GetConfiguration(ctx, "hello")
Expand Down
Loading