Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ required = [

[[constraint]]
name = "github.com/cloudevents/sdk-go"
version = "=0.7.0"
branch = "master"

# needed because pkg upgraded
[[override]]
Expand Down
2 changes: 1 addition & 1 deletion cmd/sendevent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func main() {
os.Exit(1)
}

if resp, err := c.Send(context.Background(), event); err != nil {
if _, resp, err := c.Send(context.Background(), event); err != nil {
fmt.Printf("Failed to send event to %s: %s\n", target, err)
os.Exit(1)
} else if resp != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/adapter/apiserver/ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (a *ref) Add(obj interface{}) error {
return err
}

if _, err := a.ce.Send(context.Background(), *event); err != nil {
if _, _, err := a.ce.Send(context.Background(), *event); err != nil {
a.logger.Info("event delivery failed", zap.Error(err))
return err
}
Expand All @@ -82,7 +82,7 @@ func (a *ref) Update(obj interface{}) error {
return err
}

if _, err := a.ce.Send(context.Background(), *event); err != nil {
if _, _, err := a.ce.Send(context.Background(), *event); err != nil {
a.logger.Info("event delivery failed", zap.Error(err))
return err
}
Expand All @@ -97,7 +97,7 @@ func (a *ref) Delete(obj interface{}) error {
return err
}

if _, err := a.ce.Send(context.Background(), *event); err != nil {
if _, _, err := a.ce.Send(context.Background(), *event); err != nil {
a.logger.Info("event delivery failed", zap.Error(err))
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/adapter/apiserver/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (a *resource) Add(obj interface{}) error {
return err
}

if _, err := a.ce.Send(context.Background(), *event); err != nil {
if _, _, err := a.ce.Send(context.Background(), *event); err != nil {
a.logger.Info("event delivery failed", zap.Error(err))
return err
}
Expand All @@ -56,7 +56,7 @@ func (a *resource) Update(obj interface{}) error {
return err
}

if _, err := a.ce.Send(context.Background(), *event); err != nil {
if _, _, err := a.ce.Send(context.Background(), *event); err != nil {
a.logger.Info("event delivery failed", zap.Error(err))
return err
}
Expand All @@ -71,7 +71,7 @@ func (a *resource) Delete(obj interface{}) error {
return err
}

if _, err := a.ce.Send(context.Background(), *event); err != nil {
if _, _, err := a.ce.Send(context.Background(), *event); err != nil {
a.logger.Info("event delivery failed", zap.Error(err))
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/adapter/cronjobevents/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (a *Adapter) cronTick() {
event.SetSource(sourcesv1alpha1.CronJobEventSource(a.Namespace, a.Name))
event.SetData(message(a.Data))

if _, err := a.client.Send(context.TODO(), event); err != nil {
if _, _, err := a.client.Send(context.TODO(), event); err != nil {
logger.Error("failed to send cloudevent", err)
}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
if subscriberURIString == "" {
err = errors.New("unable to read subscriberURI")
// Record the event count.
r.reporter.ReportEventCount(reportArgs, err)
r.reporter.ReportEventCount(reportArgs, http.StatusNotFound)
return nil, err
}
// We could just send the request to this URI regardless, but let's just check to see if it well
Expand All @@ -222,7 +222,7 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
if err != nil {
r.logger.Error("Unable to parse subscriberURI", zap.Error(err), zap.String("subscriberURIString", subscriberURIString))
// Record the event count.
r.reporter.ReportEventCount(reportArgs, err)
r.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError)
return nil, err
}

Expand All @@ -232,7 +232,7 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
if filterResult == failFilter {
r.logger.Debug("Event did not pass filter", zap.Any("triggerRef", trigger))
// Record the event count.
r.reporter.ReportEventCount(reportArgs, errors.New("event did not pass filter"))
r.reporter.ReportEventCount(reportArgs, http.StatusExpectationFailed)
return nil, nil
}

Expand All @@ -242,18 +242,18 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTimeStr); extErr == nil {
arrivalTime, err := time.Parse(time.RFC3339, arrivalTimeStr)
if err != nil {
r.reporter.ReportEventProcessingTime(reportArgs, err, time.Since(arrivalTime))
r.reporter.ReportEventProcessingTime(reportArgs, time.Since(arrivalTime))
}
}

start := time.Now()
sendingCTX := broker.SendingContext(ctx, tctx, subscriberURI)
// TODO use HTTP codes: https://github.com/cloudevents/sdk-go/pull/177
replyEvent, err := r.ceClient.Send(sendingCTX, *event)
rctx, replyEvent, err := r.ceClient.Send(sendingCTX, *event)
rtctx := cloudevents.HTTPTransportContextFrom(rctx)
Comment thread
nachocano marked this conversation as resolved.
// Record the dispatch time.
r.reporter.ReportEventDispatchTime(reportArgs, err, time.Since(start))
r.reporter.ReportEventDispatchTime(reportArgs, rtctx.StatusCode, time.Since(start))
// Record the event count.
r.reporter.ReportEventCount(reportArgs, err)
r.reporter.ReportEventCount(reportArgs, rtctx.StatusCode)
return replyEvent, err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"

Expand Down Expand Up @@ -350,15 +350,15 @@ func TestReceiver(t *testing.T) {

type mockReporter struct{}

func (r *mockReporter) ReportEventCount(args *ReportArgs, err error) error {
func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error {
return nil
}

func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error {
func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
return nil
}

func (r *mockReporter) ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error {
func (r *mockReporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) error {
return nil
}

Expand Down Expand Up @@ -392,7 +392,7 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
}

c := &cehttp.CodecV03{}
m, err := c.Encode(*h.returnedEvent)
m, err := c.Encode(context.Background(), *h.returnedEvent)
if err != nil {
h.t.Fatalf("Could not encode message: %v", err)
}
Expand Down
64 changes: 42 additions & 22 deletions pkg/broker/filter/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package filter

import (
"context"
"strconv"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
utils "knative.dev/eventing/pkg/broker"
. "knative.dev/eventing/pkg/metrics/metricskey"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/metrics"
"knative.dev/pkg/metrics/metricskey"
)
Expand Down Expand Up @@ -65,9 +66,9 @@ type ReportArgs struct {

// StatsReporter defines the interface for sending filter metrics.
type StatsReporter interface {
ReportEventCount(args *ReportArgs, err error) error
ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error
ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error
ReportEventCount(args *ReportArgs, responseCode int) error
ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error
ReportEventProcessingTime(args *ReportArgs, d time.Duration) error
}

var _ StatsReporter = (*reporter)(nil)
Expand All @@ -79,7 +80,8 @@ type reporter struct {
brokerTagKey tag.Key
triggerFilterTypeKey tag.Key
triggerFilterSourceKey tag.Key
resultKey tag.Key
responseCodeKey tag.Key
responseCodeClassKey tag.Key
filterResultKey tag.Key
}

Expand Down Expand Up @@ -118,31 +120,36 @@ func NewStatsReporter() (StatsReporter, error) {
return nil, err
}
r.filterResultKey = filterResultTag
resultTag, err := tag.NewKey(LabelResult)
responseCodeTag, err := tag.NewKey(LabelResponseCode)
if err != nil {
return nil, err
}
r.resultKey = resultTag
r.responseCodeKey = responseCodeTag
responseCodeClassTag, err := tag.NewKey(LabelResponseCodeClass)
if err != nil {
return nil, err
}
r.responseCodeClassKey = responseCodeClassTag

// Create view to see our measurements.
err = view.Register(
&view.View{
Description: eventCountM.Description(),
Measure: eventCountM,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey},
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.responseCodeKey, r.responseCodeClassKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey},
Aggregation: view.Distribution(metrics.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.responseCodeKey, r.responseCodeClassKey},
},
&view.View{
Description: processingTimeInMsecM.Description(),
Measure: processingTimeInMsecM,
Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey},
Aggregation: view.Distribution(metrics.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey},
},
)
if err != nil {
Expand All @@ -153,8 +160,10 @@ func NewStatsReporter() (StatsReporter, error) {
}

// ReportEventCount captures the event count.
func (r *reporter) ReportEventCount(args *ReportArgs, err error) error {
ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err)))
func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error {
ctx, err := r.generateTag(args,
tag.Insert(r.responseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(r.responseCodeClassKey, utils.ResponseCodeClass(responseCode)))
if err != nil {
return err
}
Expand All @@ -163,8 +172,10 @@ func (r *reporter) ReportEventCount(args *ReportArgs, err error) error {
}

// ReportEventDispatchTime captures dispatch times.
func (r *reporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error {
ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err)))
func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
ctx, err := r.generateTag(args,
tag.Insert(r.responseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(r.responseCodeClassKey, utils.ResponseCodeClass(responseCode)))
if err != nil {
return err
}
Expand All @@ -174,8 +185,8 @@ func (r *reporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.D
}

// ReportEventProcessingTime captures event processing times.
func (r *reporter) ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error {
ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err)))
func (r *reporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) error {
ctx, err := r.generateTag(args)
if err != nil {
return err
}
Expand All @@ -185,16 +196,25 @@ func (r *reporter) ReportEventProcessingTime(args *ReportArgs, err error, d time
return nil
}

func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context, error) {
func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.Context, error) {
// Note that filterType and filterSource can be empty strings, so they need a special treatment.
return tag.New(
ctx, err := tag.New(
context.Background(),
tag.Insert(r.namespaceTagKey, args.ns),
tag.Insert(r.triggerTagKey, args.trigger),
tag.Insert(r.brokerTagKey, args.broker),
tag.Insert(r.triggerFilterTypeKey, valueOrAny(args.filterType)),
Comment thread
nachocano marked this conversation as resolved.
tag.Insert(r.triggerFilterSourceKey, valueOrAny(args.filterSource)),
t)
tag.Insert(r.triggerFilterSourceKey, valueOrAny(args.filterSource)))
if err != nil {
return nil, err
}
for _, t := range tags {
ctx, err = tag.New(ctx, t)
if err != nil {
return nil, err
}
}
return ctx, err
}

func valueOrAny(v string) string {
Expand Down
Loading