diff --git a/test/upgrade/prober/wathola/event/services.go b/test/upgrade/prober/wathola/event/services.go index c5717810af6..f7f055b4ce9 100644 --- a/test/upgrade/prober/wathola/event/services.go +++ b/test/upgrade/prober/wathola/event/services.go @@ -108,7 +108,11 @@ func (f *finishedStore) RegisterFinished(finished *Finished) { log.Infof("waiting additional %v to be sure all events came", d) time.Sleep(d) receivedEvents := f.steps.Count() - if receivedEvents != finished.EventsSent { + + if receivedEvents != finished.EventsSent && + // If sending was interrupted, tolerate one more received + // event as there's no way to check if the last event is delivered or not. + !(finished.SendingInterrupted && receivedEvents == finished.EventsSent+1) { f.errors.throwUnexpected("expecting to have %v unique events received, "+ "but received %v unique events", finished.EventsSent, receivedEvents) f.reportViolations(finished) diff --git a/test/upgrade/prober/wathola/event/types.go b/test/upgrade/prober/wathola/event/types.go index 3cab3915991..d53284ba420 100644 --- a/test/upgrade/prober/wathola/event/types.go +++ b/test/upgrade/prober/wathola/event/types.go @@ -46,6 +46,7 @@ type Finished struct { EventsSent int TotalRequests int UnavailablePeriods []UnavailablePeriod + SendingInterrupted bool } // Type returns a type of a event diff --git a/test/upgrade/prober/wathola/sender/services.go b/test/upgrade/prober/wathola/sender/services.go index 7827e12ad2a..38ace276c73 100644 --- a/test/upgrade/prober/wathola/sender/services.go +++ b/test/upgrade/prober/wathola/sender/services.go @@ -58,6 +58,8 @@ type sender struct { totalRequests int // unavailablePeriods is an array for non-zero retries for each event unavailablePeriods []event.UnavailablePeriod + // sendingInterrupted indicates whether sending last event was interrupted by shutdown + sendingInterrupted bool } func (s *sender) SendContinually() { @@ -90,6 +92,7 @@ func (s *sender) SendContinually() { Period: time.Since(start), LastErr: err.Error(), }) + s.sendingInterrupted = true } return default: @@ -100,7 +103,7 @@ func (s *sender) SendContinually() { start = time.Now() } log.Warnf("Could not send step event %v, retrying (%d): %v", - s.eventsSent, retry, err) + currentStep.Number, retry, err) retry++ lastErr = err } else { @@ -219,7 +222,12 @@ func (s *sender) sendFinished() { if s.eventsSent == 0 { return } - finished := event.Finished{EventsSent: s.eventsSent, TotalRequests: s.totalRequests, UnavailablePeriods: s.unavailablePeriods} + finished := event.Finished{ + EventsSent: s.eventsSent, + TotalRequests: s.totalRequests, + UnavailablePeriods: s.unavailablePeriods, + SendingInterrupted: s.sendingInterrupted, + } endpoint := senderConfig.Address ce := NewCloudEvent(finished, event.FinishedType) ctx, span := PopulateSpanWithEvent(context.Background(), ce, Name)