Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 0 additions & 77 deletions test/common/cloudevents_vegeta_targeter.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package main
package aggregator

import (
"context"
"fmt"
"log"
"net"
"sort"
Expand All @@ -31,7 +32,8 @@ import (
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"

pb "knative.dev/eventing/test/test_images/performance/event_state"
"knative.dev/eventing/test/common/performance/common"
pb "knative.dev/eventing/test/common/performance/event_state"
"knative.dev/pkg/test/mako"
)

Expand All @@ -43,7 +45,9 @@ type eventsRecord struct {
*pb.EventsRecord
}

type aggregatorExecutor struct {
var fatalf = log.Fatalf

type Aggregator struct {
// thread-safe events recording maps
sentEvents *eventsRecord
acceptedEvents *eventsRecord
Expand All @@ -57,14 +61,26 @@ type aggregatorExecutor struct {
listener net.Listener
server *grpc.Server

makoTags []string
makoTags []string
expectRecords uint

benchmarkKey string
benchmarkName string
}

func newAggregatorExecutor(lis net.Listener, makoTags []string) testExecutor {
executor := &aggregatorExecutor{
listener: lis,
func NewAggregator(benchmarkKey, benchmarkName, listenAddr string, expectRecords uint, makoTags []string) (common.Executor, error) {
l, err := net.Listen("tcp", listenAddr)
if err != nil {
return nil, fmt.Errorf("failed to create listener: %v", err)
}

executor := &Aggregator{
listener: l,
notifyEventsReceived: make(chan struct{}),
makoTags: makoTags,
expectRecords: expectRecords,
benchmarkKey: benchmarkKey,
benchmarkName: benchmarkName,
}

// --- Create GRPC server
Expand Down Expand Up @@ -92,17 +108,15 @@ func newAggregatorExecutor(lis net.Listener, makoTags []string) testExecutor {
Events: make(map[string]*timestamp.Timestamp),
}}

return executor
return executor, nil
}

func (ex *aggregatorExecutor) Run(ctx context.Context) {
// --- Configure mako

printf("Configuring Mako")
func (ag *Aggregator) Run(ctx context.Context) {
log.Printf("Configuring Mako")

// Use the benchmark key created
// TODO support to check benchmark key for dev or prod
client, err := mako.SetupWithBenchmarkConfig(ctx, &benchmarkKey, &benchmarkName, ex.makoTags...)
client, err := mako.SetupWithBenchmarkConfig(ctx, &ag.benchmarkKey, &ag.benchmarkName, ag.makoTags...)
if err != nil {
fatalf("Failed to setup mako: %v", err)
}
Expand All @@ -114,56 +128,55 @@ func (ex *aggregatorExecutor) Run(ctx context.Context) {
// Wrap fatalf in a helper or our sidecar will live forever.
fatalf = func(f string, args ...interface{}) {
client.ShutDownFunc(context.Background())
fatalf(f, args...)
log.Fatalf(f, args...)
}

// --- Run GRPC events receiver

printf("Starting events recorder server")
log.Printf("Starting events recorder server")

go func() {
if err := ex.server.Serve(ex.listener); err != nil {
if err := ag.server.Serve(ag.listener); err != nil {
fatalf("Failed to serve: %v", err)
}
}()
go func() {
<-ctx.Done()
printf("Terminating events recorder server")
ex.server.GracefulStop()
log.Printf("Terminating events recorder server")
ag.server.GracefulStop()
}()

printf("Expecting %d events records", expectRecords)
ex.waitForEvents()
printf("Received all expected events records")
// --- Wait for all records
log.Printf("Expecting %d events records", ag.expectRecords)
ag.waitForEvents()
log.Printf("Received all expected events records")

ex.server.GracefulStop()
ag.server.GracefulStop()

// --- Publish latencies
log.Printf("%-15s: %d", "Sent count", len(ag.sentEvents.Events))
log.Printf("%-15s: %d", "Accepted count", len(ag.acceptedEvents.Events))
log.Printf("%-15s: %d", "Failed count", len(ag.failedEvents.Events))
log.Printf("%-15s: %d", "Received count", len(ag.receivedEvents.Events))

printf("%-15s: %d", "Sent count", len(ex.sentEvents.Events))
printf("%-15s: %d", "Accepted count", len(ex.acceptedEvents.Events))
printf("%-15s: %d", "Failed count", len(ex.failedEvents.Events))
printf("%-15s: %d", "Received count", len(ex.receivedEvents.Events))

printf("Publishing latencies")
log.Printf("Publishing latencies")

// count errors
var publishErrorCount int
var deliverErrorCount int

for sentID := range ex.sentEvents.Events {
timestampSentProto := ex.sentEvents.Events[sentID]
for sentID := range ag.sentEvents.Events {
timestampSentProto := ag.sentEvents.Events[sentID]
timestampSent, _ := ptypes.Timestamp(timestampSentProto)

timestampAcceptedProto, accepted := ex.acceptedEvents.Events[sentID]
timestampAcceptedProto, accepted := ag.acceptedEvents.Events[sentID]
timestampAccepted, _ := ptypes.Timestamp(timestampAcceptedProto)

timestampReceivedProto, received := ex.receivedEvents.Events[sentID]
timestampReceivedProto, received := ag.receivedEvents.Events[sentID]
timestampReceived, _ := ptypes.Timestamp(timestampReceivedProto)

if !accepted {
errMsg := "Failed on broker"
if _, failed := ex.failedEvents.Events[sentID]; !failed {
if _, failed := ag.failedEvents.Events[sentID]; !failed {
errMsg = "Event not accepted but missing from failed map"
}

Expand Down Expand Up @@ -203,21 +216,21 @@ func (ex *aggregatorExecutor) Run(ctx context.Context) {

// --- Publish throughput

printf("Publishing throughputs")
log.Printf("Publishing throughputs")

sentTimestamps := eventsToTimestampsArray(&ex.sentEvents.Events)
sentTimestamps := eventsToTimestampsArray(&ag.sentEvents.Events)
err = publishThpt(sentTimestamps, client.Quickstore, "st")
if err != nil {
log.Printf("ERROR AddSamplePoint: %v", err)
}

receivedTimestamps := eventsToTimestampsArray(&ex.receivedEvents.Events)
receivedTimestamps := eventsToTimestampsArray(&ag.receivedEvents.Events)
err = publishThpt(receivedTimestamps, client.Quickstore, "dt")
if err != nil {
log.Printf("ERROR AddSamplePoint: %v", err)
}

failureTimestamps := eventsToTimestampsArray(&ex.failedEvents.Events)
failureTimestamps := eventsToTimestampsArray(&ag.failedEvents.Events)
if len(failureTimestamps) > 2 {
err = publishThpt(failureTimestamps, client.Quickstore, "ft")
if err != nil {
Expand All @@ -227,18 +240,18 @@ func (ex *aggregatorExecutor) Run(ctx context.Context) {

// --- Publish error counts as aggregate metrics

printf("Publishing aggregates")
log.Printf("Publishing aggregates")

client.Quickstore.AddRunAggregate("pe", float64(publishErrorCount))
client.Quickstore.AddRunAggregate("de", float64(deliverErrorCount))

printf("Store to mako")
log.Printf("Store to mako")

if out, err := client.Quickstore.Store(); err != nil {
fatalf("Failed to store data: %v\noutput: %v", err, out)
}

printf("Aggregation completed")
log.Printf("Aggregation completed")
}

func eventsToTimestampsArray(events *map[string]*timestamp.Timestamp) []time.Time {
Expand Down Expand Up @@ -267,16 +280,16 @@ func publishThpt(timestamps []time.Time, q *quickstore.Quickstore, metricName st
}

// waitForEvents blocks until the expected number of events records has been received.
func (ex *aggregatorExecutor) waitForEvents() {
for receivedRecords := uint(0); receivedRecords < expectRecords; receivedRecords++ {
<-ex.notifyEventsReceived
func (ag *Aggregator) waitForEvents() {
for receivedRecords := uint(0); receivedRecords < ag.expectRecords; receivedRecords++ {
<-ag.notifyEventsReceived
}
}

// RecordSentEvents implements event_state.EventsRecorder
func (ex *aggregatorExecutor) RecordEvents(_ context.Context, in *pb.EventsRecordList) (*pb.RecordReply, error) {
func (ag *Aggregator) RecordEvents(_ context.Context, in *pb.EventsRecordList) (*pb.RecordReply, error) {
defer func() {
ex.notifyEventsReceived <- struct{}{}
ag.notifyEventsReceived <- struct{}{}
}()

for _, recIn := range in.Items {
Expand All @@ -286,19 +299,19 @@ func (ex *aggregatorExecutor) RecordEvents(_ context.Context, in *pb.EventsRecor

switch recType {
case pb.EventsRecord_SENT:
rec = ex.sentEvents
rec = ag.sentEvents
case pb.EventsRecord_ACCEPTED:
rec = ex.acceptedEvents
rec = ag.acceptedEvents
case pb.EventsRecord_FAILED:
rec = ex.failedEvents
rec = ag.failedEvents
case pb.EventsRecord_RECEIVED:
rec = ex.receivedEvents
rec = ag.receivedEvents
default:
printf("Ignoring events record of type %s", recType)
log.Printf("Ignoring events record of type %s", recType)
continue
}

printf("-> Recording %d %s events", uint64(len(recIn.Events)), recType)
log.Printf("-> Recording %d %s events", uint64(len(recIn.Events)), recType)

func() {
rec.Lock()
Expand Down
29 changes: 29 additions & 0 deletions test/common/performance/common/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import "time"

const (
WarmupEventType = "warmup.perf-test"
MeasureEventType = "measure.perf-test"
GCEventType = "gc.perf-test"
EndEventType = "end.perf-test"
CEReceiverPort = "8080"
WaitForFlush = 1 * time.Second
WaitForReceiverGC = 1 * time.Second
)
Loading