From 4279ba9ce94c4f382cd44341a6500935eeb24fe9 Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Tue, 14 Nov 2023 08:07:54 +0100 Subject: [PATCH 1/2] Fix reporting index of the failed step The warning had a wrong index: {"level":"info","ts":"2023-11-13T14:00:03.816770975Z","caller":"sender/services.go:207","msg":"Sending step event #16691 to \"http://sut-kn-channel.eventing-e2e0.svc.cluster.local\""} {"level":"warn","ts":"2023-11-13T14:00:03.821102525Z","caller":"sender/services.go:102","msg":"Could not send step event 16690, retrying (1): Post \"http://sut-kn-channel.eventing-e2e0.svc.cluster.local\": dial tcp 172.30.99.91:80: connect: connection refused"} --- test/upgrade/prober/wathola/sender/services.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/upgrade/prober/wathola/sender/services.go b/test/upgrade/prober/wathola/sender/services.go index 7827e12ad2a..f09ab0453bb 100644 --- a/test/upgrade/prober/wathola/sender/services.go +++ b/test/upgrade/prober/wathola/sender/services.go @@ -100,7 +100,7 @@ func (s *sender) SendContinually() { start = time.Now() } log.Warnf("Could not send step event %v, retrying (%d): %v", - s.eventsSent, retry, err) + currentStep.Number, retry, err) retry++ lastErr = err } else { From 9cfdc56f8a64074ae1c1d7d936fe2c1445b59bdf Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Tue, 14 Nov 2023 10:44:36 +0100 Subject: [PATCH 2/2] Upgrade tests account for last event being interrupted --- test/upgrade/prober/wathola/event/services.go | 6 +++++- test/upgrade/prober/wathola/event/types.go | 1 + test/upgrade/prober/wathola/sender/services.go | 10 +++++++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/test/upgrade/prober/wathola/event/services.go b/test/upgrade/prober/wathola/event/services.go index c5717810af6..f7f055b4ce9 100644 --- a/test/upgrade/prober/wathola/event/services.go +++ b/test/upgrade/prober/wathola/event/services.go @@ -108,7 +108,11 @@ func (f *finishedStore) RegisterFinished(finished *Finished) { log.Infof("waiting additional %v to be sure all events came", d) time.Sleep(d) receivedEvents := f.steps.Count() - if receivedEvents != finished.EventsSent { + + if receivedEvents != finished.EventsSent && + // If sending was interrupted, tolerate one more received + // event as there's no way to check if the last event is delivered or not. + !(finished.SendingInterrupted && receivedEvents == finished.EventsSent+1) { f.errors.throwUnexpected("expecting to have %v unique events received, "+ "but received %v unique events", finished.EventsSent, receivedEvents) f.reportViolations(finished) diff --git a/test/upgrade/prober/wathola/event/types.go b/test/upgrade/prober/wathola/event/types.go index 3cab3915991..d53284ba420 100644 --- a/test/upgrade/prober/wathola/event/types.go +++ b/test/upgrade/prober/wathola/event/types.go @@ -46,6 +46,7 @@ type Finished struct { EventsSent int TotalRequests int UnavailablePeriods []UnavailablePeriod + SendingInterrupted bool } // Type returns a type of a event diff --git a/test/upgrade/prober/wathola/sender/services.go b/test/upgrade/prober/wathola/sender/services.go index f09ab0453bb..38ace276c73 100644 --- a/test/upgrade/prober/wathola/sender/services.go +++ b/test/upgrade/prober/wathola/sender/services.go @@ -58,6 +58,8 @@ type sender struct { totalRequests int // unavailablePeriods is an array for non-zero retries for each event unavailablePeriods []event.UnavailablePeriod + // sendingInterrupted indicates whether sending last event was interrupted by shutdown + sendingInterrupted bool } func (s *sender) SendContinually() { @@ -90,6 +92,7 @@ func (s *sender) SendContinually() { Period: time.Since(start), LastErr: err.Error(), }) + s.sendingInterrupted = true } return default: @@ -219,7 +222,12 @@ func (s *sender) sendFinished() { if s.eventsSent == 0 { return } - finished := event.Finished{EventsSent: s.eventsSent, TotalRequests: s.totalRequests, UnavailablePeriods: s.unavailablePeriods} + finished := event.Finished{ + EventsSent: s.eventsSent, + TotalRequests: s.totalRequests, + UnavailablePeriods: s.unavailablePeriods, + SendingInterrupted: s.sendingInterrupted, + } endpoint := senderConfig.Address ce := NewCloudEvent(finished, event.FinishedType) ctx, span := PopulateSpanWithEvent(context.Background(), ce, Name)