Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion test/upgrade/prober/wathola/event/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ func (f *finishedStore) RegisterFinished(finished *Finished) {
log.Infof("waiting additional %v to be sure all events came", d)
time.Sleep(d)
receivedEvents := f.steps.Count()
if receivedEvents != finished.EventsSent {

if receivedEvents != finished.EventsSent &&
// If sending was interrupted, tolerate one more received
// event as there's no way to check if the last event is delivered or not.
!(finished.SendingInterrupted && receivedEvents == finished.EventsSent+1) {
f.errors.throwUnexpected("expecting to have %v unique events received, "+
"but received %v unique events", finished.EventsSent, receivedEvents)
f.reportViolations(finished)
Expand Down
1 change: 1 addition & 0 deletions test/upgrade/prober/wathola/event/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Finished struct {
EventsSent int
TotalRequests int
UnavailablePeriods []UnavailablePeriod
SendingInterrupted bool
}

// Type returns a type of a event
Expand Down
12 changes: 10 additions & 2 deletions test/upgrade/prober/wathola/sender/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type sender struct {
totalRequests int
// unavailablePeriods is an array for non-zero retries for each event
unavailablePeriods []event.UnavailablePeriod
// sendingInterrupted indicates whether sending last event was interrupted by shutdown
sendingInterrupted bool
}

func (s *sender) SendContinually() {
Expand Down Expand Up @@ -90,6 +92,7 @@ func (s *sender) SendContinually() {
Period: time.Since(start),
LastErr: err.Error(),
})
s.sendingInterrupted = true
}
return
default:
Expand All @@ -100,7 +103,7 @@ func (s *sender) SendContinually() {
start = time.Now()
}
log.Warnf("Could not send step event %v, retrying (%d): %v",
s.eventsSent, retry, err)
currentStep.Number, retry, err)
retry++
lastErr = err
} else {
Expand Down Expand Up @@ -219,7 +222,12 @@ func (s *sender) sendFinished() {
if s.eventsSent == 0 {
return
}
finished := event.Finished{EventsSent: s.eventsSent, TotalRequests: s.totalRequests, UnavailablePeriods: s.unavailablePeriods}
finished := event.Finished{
EventsSent: s.eventsSent,
TotalRequests: s.totalRequests,
UnavailablePeriods: s.unavailablePeriods,
SendingInterrupted: s.sendingInterrupted,
}
endpoint := senderConfig.Address
ce := NewCloudEvent(finished, event.FinishedType)
ctx, span := PopulateSpanWithEvent(context.Background(), ce, Name)
Expand Down