diff --git a/pkg/controller/registry/grpc/source_test.go b/pkg/controller/registry/grpc/source_test.go index 1c06b0b19e..852a4be541 100644 --- a/pkg/controller/registry/grpc/source_test.go +++ b/pkg/controller/registry/grpc/source_test.go @@ -50,8 +50,8 @@ type FakeSourceSyncer struct { History map[registry.CatalogKey][]connectivity.State sync.Mutex - expectedEvents int - done chan struct{} + expectedReadies int + done chan struct{} } func (f *FakeSourceSyncer) sync(state SourceState) { @@ -60,18 +60,20 @@ func (f *FakeSourceSyncer) sync(state SourceState) { f.History[state.Key] = []connectivity.State{} } f.History[state.Key] = append(f.History[state.Key], state.State) - f.expectedEvents-- - if f.expectedEvents == 0 { + if state.State == connectivity.Ready { + f.expectedReadies-- + } + if f.expectedReadies == 0 { f.done <- struct{}{} } f.Unlock() } -func NewFakeSourceSyncer(expectedEvents int) *FakeSourceSyncer { +func NewFakeSourceSyncer(expectedReadies int) *FakeSourceSyncer { return &FakeSourceSyncer{ - History: map[registry.CatalogKey][]connectivity.State{}, - expectedEvents: expectedEvents, - done: make(chan struct{}), + History: map[registry.CatalogKey][]connectivity.State{}, + expectedReadies: expectedReadies, + done: make(chan struct{}), } } @@ -84,11 +86,9 @@ func TestConnectionEvents(t *testing.T) { test := func(tt testcase) func(t *testing.T) { return func(t *testing.T) { // start server for each catalog - totalEvents := 0 addresses := map[registry.CatalogKey]string{} - for catalog, events := range tt.expectedHistory { - totalEvents += len(events) + for catalog := range tt.expectedHistory { serve, address, stop := server(&fakes.FakeQuery{}) addresses[catalog] = address go serve() @@ -96,9 +96,9 @@ func TestConnectionEvents(t *testing.T) { } // start source manager - syncer := NewFakeSourceSyncer(totalEvents) + syncer := NewFakeSourceSyncer(len(tt.expectedHistory)) sources := NewSourceStore(logrus.New(), 1*time.Second, 5*time.Second, syncer.sync) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() sources.Start(ctx) @@ -115,7 +115,13 @@ func TestConnectionEvents(t *testing.T) { for catalog, events := range tt.expectedHistory { recordedEvents := syncer.History[catalog] for i := 0; i < len(recordedEvents); i++ { - require.Equal(t, (events[i]).String(), (recordedEvents[i]).String()) + found := false + for _, event := range events { + if event.String() == recordedEvents[i].String() { + found = true + } + } + require.True(t, found) } } }