Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions pkg/eventing/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v1

import (
"context"
"fmt"
"time"

apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -241,25 +242,27 @@ func (c *knEventingClient) GetBroker(ctx context.Context, name string) (*eventin
}

// WatchBroker is used to create watcher object
func (c *knEventingClient) WatchBroker(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, c.client.Brokers(c.namespace).Watch, c.client.RESTClient(), c.namespace, "brokers", name, timeout)
func (c *knEventingClient) WatchBroker(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcherWithVersion(ctx, c.client.Brokers(c.namespace).Watch, c.client.RESTClient(), c.namespace, "brokers", name, initialVersion, timeout)
}

// DeleteBroker is used to delete an instance of broker and wait for completion until given timeout
// For `timeout == 0` delete is performed async without any wait
func (c *knEventingClient) DeleteBroker(ctx context.Context, name string, timeout time.Duration) error {
broker, err := c.GetBroker(ctx, name)
if err != nil {
return err
}
Comment on lines +253 to +255
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can comment lines 253-255 and all tests passing

if broker.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't delete broker '%s' because it has been already marked for deletion", name)
}
if timeout == 0 {
return c.deleteBroker(ctx, name, apis_v1.DeletePropagationBackground)
}
waitC := make(chan error)
watcher, err := c.WatchBroker(ctx, name, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("broker", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("broker", c.WatchBroker, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, name, broker.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = c.deleteBroker(ctx, name, apis_v1.DeletePropagationForeground)
Expand Down
14 changes: 13 additions & 1 deletion pkg/eventing/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,20 +245,32 @@ func TestBrokerDelete(t *testing.T) {
var name = "fooBroker"
server, client := setup()

server.AddReactor("get", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.GetAction).GetName()
if name == "notFound" {
return true, nil, errors.NewNotFound(eventingv1.Resource("broker"), "notFound")
}
return false, nil, nil
})
server.AddReactor("delete", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.DeleteAction).GetName()
if name == "errorBroker" {
return true, nil, fmt.Errorf("error while deleting broker %s", name)
}
return true, nil, nil
return false, nil, nil
})

err := client.DeleteBroker(context.Background(), name, 0)
assert.NilError(t, err)

err = client.DeleteBroker(context.Background(), "errorBroker", 0)
assert.ErrorContains(t, err, "errorBroker", 0)

err = client.DeleteBroker(context.Background(), "notFound", 0)
assert.ErrorContains(t, err, "not found", 0)
assert.ErrorContains(t, err, "notFound", 0)
}

func TestBrokerDeleteWithWait(t *testing.T) {
Expand Down
48 changes: 25 additions & 23 deletions pkg/serving/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ func (cl *knServingClient) GetService(ctx context.Context, name string) (*servin
return service, nil
}

func (cl *knServingClient) WatchService(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, cl.client.Services(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "services", name, timeout)
func (cl *knServingClient) WatchServiceWithVersion(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcherWithVersion(ctx, cl.client.Services(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "services", name, initialVersion, timeout)
}

func (cl *knServingClient) WatchRevision(ctx context.Context, name string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcher(ctx, cl.client.Revisions(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "revision", name, timeout)
func (cl *knServingClient) WatchRevisionWithVersion(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
return wait.NewWatcherWithVersion(ctx, cl.client.Revisions(cl.namespace).Watch, cl.client.RESTClient(), cl.namespace, "revision", name, initialVersion, timeout)
}

// List services
Expand Down Expand Up @@ -314,24 +314,28 @@ func (cl *knServingClient) ApplyService(ctx context.Context, modifiedService *se
// Param `timeout` represents a duration to wait for a delete op to finish.
// For `timeout == 0` delete is performed async without any wait.
func (cl *knServingClient) DeleteService(ctx context.Context, serviceName string, timeout time.Duration) error {
service, err := cl.GetService(ctx, serviceName)
if err != nil {
return err
}
Comment on lines +318 to +320
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as previous...

if service.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't delete service '%s' because it has been already marked for deletion", serviceName)
}
if timeout == 0 {
return cl.deleteService(ctx, serviceName, v1.DeletePropagationBackground)
}

waitC := make(chan error)
watcher, err := cl.WatchService(ctx, serviceName, timeout)
if err != nil {
return nil
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("service", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, serviceName, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("service", cl.WatchServiceWithVersion, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, serviceName, service.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = cl.deleteService(ctx, serviceName, v1.DeletePropagationForeground)
if err != nil {
return err
}

return <-waitC
}

Expand All @@ -350,13 +354,16 @@ func (cl *knServingClient) deleteService(ctx context.Context, serviceName string

// Wait for a service to become ready, but not longer than provided timeout
func (cl *knServingClient) WaitForService(ctx context.Context, name string, timeout time.Duration, msgCallback wait.MessageCallback) (error, time.Duration) {
watcher, err := cl.WatchService(ctx, name, timeout)
waitForReady := wait.NewWaitForReady("service", cl.WatchServiceWithVersion, serviceConditionExtractor)

service, err := cl.GetService(ctx, name)
if err != nil {
return err, timeout
if apierrors.IsNotFound(err) {
return waitForReady.Wait(ctx, name, "", wait.Options{Timeout: &timeout}, msgCallback)
}
return err, 0
}
defer watcher.Stop()
waitForReady := wait.NewWaitForReady("service", serviceConditionExtractor)
return waitForReady.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, msgCallback)
return waitForReady.Wait(ctx, name, service.ResourceVersion, wait.Options{Timeout: &timeout}, msgCallback)
}

// Get the configuration for a service
Expand Down Expand Up @@ -469,14 +476,9 @@ func (cl *knServingClient) DeleteRevision(ctx context.Context, name string, time
return cl.deleteRevision(ctx, name)
}
waitC := make(chan error)
watcher, err := cl.WatchRevision(ctx, name, timeout)
if err != nil {
return err
}
defer watcher.Stop()
go func() {
waitForEvent := wait.NewWaitForEvent("revision", func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, watcher, name, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitForEvent := wait.NewWaitForEvent("revision", cl.WatchRevisionWithVersion, func(evt *watch.Event) bool { return evt.Type == watch.Deleted })
err, _ := waitForEvent.Wait(ctx, name, revision.ResourceVersion, wait.Options{Timeout: &timeout}, wait.NoopMessageCallback())
waitC <- err
}()
err = cl.deleteRevision(ctx, name)
Expand Down
12 changes: 11 additions & 1 deletion pkg/serving/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ func TestDeleteService(t *testing.T) {
nonExistingServiceName = "no-service"
)

serving.AddReactor("get", "services",
func(a clienttesting.Action) (bool, runtime.Object, error) {
name := a.(clienttesting.GetAction).GetName()
if name == serviceName {
// Don't handle existing service, just continue to next
return false, nil, nil
}
return true, nil, errors.NewNotFound(servingv1.Resource("service"), name)
})
serving.AddReactor("delete", "services",
func(a clienttesting.Action) (bool, runtime.Object, error) {
name := a.(clienttesting.DeleteAction).GetName()
Expand All @@ -201,7 +210,7 @@ func TestDeleteService(t *testing.T) {
if name == serviceName {
return true, nil, nil
}
return true, nil, errors.NewNotFound(servingv1.Resource("service"), name)
return false, nil, nil
})
serving.AddWatchReactor("services",
func(a clienttesting.Action) (bool, watch.Interface, error) {
Expand All @@ -222,6 +231,7 @@ func TestDeleteService(t *testing.T) {

t.Run("trying to delete non-existing service returns error", func(t *testing.T) {
err := client.DeleteService(context.Background(), nonExistingServiceName, time.Duration(10)*time.Second)
println(err.Error())
assert.ErrorContains(t, err, "not found")
assert.ErrorContains(t, err, nonExistingServiceName)
})
Expand Down
13 changes: 7 additions & 6 deletions pkg/wait/poll_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,16 @@ func newTickerPollInterval(d time.Duration) *tickerPollInterval {
return &tickerPollInterval{time.NewTicker(d)}
}

// NewWatcher makes a watch.Interface on the given resource in the client,
// NewWatcherWithVersion makes a watch.Interface on the given resource in the client,
// falling back to polling if the server does not support Watch.
func NewWatcher(ctx context.Context, watchFunc watchF, c rest.Interface, ns string, resource string, name string, timeout time.Duration) (watch.Interface, error) {
native, err := nativeWatch(ctx, watchFunc, name, timeout)
func NewWatcherWithVersion(ctx context.Context, watchFunc watchF, c rest.Interface, ns string, resource string, name string, initialResourceVersion string, timeout time.Duration) (watch.Interface, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Golint comments: exported function NewWatcherWithVersion should have comment or be unexported. More info.

native, err := nativeWatchWithVersion(ctx, watchFunc, name, initialResourceVersion, timeout)
if err == nil {
return native, nil
}
polling := &pollingWatcher{
c, ns, resource, name, timeout, make(chan bool), make(chan watch.Event), &sync.WaitGroup{},
newTickerPollInterval(pollInterval), nativePoll(ctx, c, ns, resource, name)}
newTickerPollInterval(time.Second), nativePoll(ctx, c, ns, resource, name)}
polling.start()
return polling, nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can comment this line and all tests pass. So missing coverage.

}
Expand Down Expand Up @@ -161,9 +161,10 @@ func (w *pollingWatcher) Stop() {
close(w.done)
}

func nativeWatch(ctx context.Context, watchFunc watchF, name string, timeout time.Duration) (watch.Interface, error) {
func nativeWatchWithVersion(ctx context.Context, watchFunc watchF, name string, initialVersion string, timeout time.Duration) (watch.Interface, error) {
opts := v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
ResourceVersion: initialVersion,
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
}
opts.Watch = true
addWatchTimeout(&opts, timeout)
Expand Down
46 changes: 29 additions & 17 deletions pkg/wait/wait_for_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (

// Callbacks and configuration used while waiting
type waitForReadyConfig struct {
watchMaker WatchMaker
conditionsExtractor ConditionsExtractor
kind string
}

// Callbacks and configuration used while waiting for event
type waitForEvent struct {
eventDone EventDone
kind string
watchMaker WatchMaker
eventDone EventDone
kind string
}

// EventDone is a marker to stop actual waiting on given event state
Expand All @@ -48,7 +50,7 @@ type Wait interface {
// Wait on resource the resource with this name
// and write event messages for unknown event to the status writer.
// Returns an error (if any) and the overall time it took to wait
Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration)
Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration)
}

type Options struct {
Expand All @@ -61,7 +63,7 @@ type Options struct {
}

// Create watch which is used when waiting for Ready condition
type WatchMaker func(name string, timeout time.Duration) (watch.Interface, error)
type WatchMaker func(ctx context.Context, name string, initialVersion string, timeout time.Duration) (watch.Interface, error)

// Extract conditions from a runtime object
type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
Expand All @@ -70,19 +72,21 @@ type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
type MessageCallback func(durationSinceState time.Duration, message string)

// NewWaitForReady waits until the condition is set to Ready == True
func NewWaitForReady(kind string, extractor ConditionsExtractor) Wait {
func NewWaitForReady(kind string, watchMaker WatchMaker, extractor ConditionsExtractor) Wait {
return &waitForReadyConfig{
kind: kind,
watchMaker: watchMaker,
conditionsExtractor: extractor,
}
}

// NewWaitForEvent creates a Wait object which waits until a specific event (i.e. when
// the EventDone function returns true)
func NewWaitForEvent(kind string, eventDone EventDone) Wait {
func NewWaitForEvent(kind string, watchMaker WatchMaker, eventDone EventDone) Wait {
return &waitForEvent{
kind: kind,
eventDone: eventDone,
kind: kind,
watchMaker: watchMaker,
eventDone: eventDone,
}
}

Expand All @@ -109,14 +113,13 @@ func NoopMessageCallback() MessageCallback {
// (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no
// target state has been entered yet and `out` is used for printing out status messages
// msgCallback gets called for every event with an 'Ready' condition == UNKNOWN with the event's message.
func (w *waitForReadyConfig) Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {

func (w *waitForReadyConfig) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Golint arg-order: error should be the last type when returning multiple items.

timeout := options.timeoutWithDefault()
timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop()
for {
start := time.Now()
retry, timeoutReached, err := w.waitForReadyCondition(ctx, watcher, start, timeoutTimer, options.errorWindowWithDefault(), msgCallback)
retry, timeoutReached, err := w.waitForReadyCondition(ctx, name, initialVersion, start, timeoutTimer, options.errorWindowWithDefault(), options, msgCallback)

if err != nil {
return err, time.Since(start)
Expand All @@ -140,10 +143,14 @@ func (w *waitForReadyConfig) Wait(ctx context.Context, watcher watch.Interface,
// An errorWindow can be specified which takes into account of intermediate "false" ready conditions. So before returning
// an error, this methods waits for the errorWindow duration and if an "True" or "Unknown" event arrives in the meantime
// for the "Ready" condition, then the method continues to wait.
func (w *waitForReadyConfig) waitForReadyCondition(
ctx context.Context, watcher watch.Interface, start time.Time, timeoutTimer *time.Timer, errorWindow time.Duration, msgCallback MessageCallback,
) (retry bool, timeoutReached bool, err error) {
func (w *waitForReadyConfig) waitForReadyCondition(ctx context.Context, name string, initialVersion string, start time.Time,
timeoutTimer *time.Timer, errorWindow time.Duration, options Options, msgCallback MessageCallback) (retry bool, timeoutReached bool, err error) {

watcher, err := w.watchMaker(ctx, name, initialVersion, options.timeoutWithDefault())
if err != nil {
return false, false, err
}
defer watcher.Stop()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can comment lines 150-152 and all tests passing. Similar on the other changes. My guess is that your tests are not covering the cases for failed timeout or similar?

// channel used to transport the error that has been received
errChan := make(chan error)

Expand All @@ -170,8 +177,7 @@ func (w *waitForReadyConfig) waitForReadyCondition(
return false, false, err
case event, ok := <-watcher.ResultChan():
if !ok || event.Object == nil {
// retry only if the channel is still open
return ok, false, nil
return true, false, nil
}

// Check whether resource is in sync already (meta.generation == status.observedGeneration)
Expand Down Expand Up @@ -240,7 +246,13 @@ func (w *waitForReadyConfig) waitForReadyCondition(
}

// Wait until the expected EventDone is satisfied
func (w *waitForEvent) Wait(ctx context.Context, watcher watch.Interface, name string, options Options, msgCallback MessageCallback) (error, time.Duration) {
func (w *waitForEvent) Wait(ctx context.Context, name string, initialVersion string, options Options, msgCallback MessageCallback) (error, time.Duration) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Golint arg-order: error should be the last type when returning multiple items.

watcher, err := w.watchMaker(ctx, name, initialVersion, options.timeoutWithDefault())
if err != nil {
return err, 0
}
defer watcher.Stop()

timeout := options.timeoutWithDefault()
start := time.Now()
// channel used to transport the error
Expand Down
Loading