Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func main() {

// TODO watch logging config map.

// TODO change the component name to trigger once Stackdriver metrics are approved.
// Watch the observability config map and dynamically update metrics exporter.
cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_filter", logger.Sugar()))

Expand Down
1 change: 1 addition & 0 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func main() {

// TODO watch logging config map.

// TODO change the component name to broker once Stackdriver metrics are approved.
// Watch the observability config map and dynamically update metrics exporter.
cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", logger.Sugar()))

Expand Down
51 changes: 24 additions & 27 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
}

reportArgs := &ReportArgs{
ns: t.Namespace,
trigger: t.Name,
broker: t.Spec.Broker,
eventType: triggerFilterAttribute(t.Spec.Filter, "type"),
eventSource: triggerFilterAttribute(t.Spec.Filter, "source"),
ns: t.Namespace,
trigger: t.Name,
broker: t.Spec.Broker,
filterType: triggerFilterAttribute(t.Spec.Filter, "type"),
filterSource: triggerFilterAttribute(t.Spec.Filter, "source"),
}

subscriberURIString := t.Status.SubscriberURI
Expand All @@ -226,10 +226,8 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
return nil, err
}

// Check if the event should be sent, and record filtering time.
start := time.Now()
// Check if the event should be sent.
filterResult := r.shouldSendEvent(ctx, &t.Spec, event)
r.reporter.ReportFilterTime(reportArgs, filterResult, time.Since(start))

if filterResult == failFilter {
r.logger.Debug("Event did not pass filter", zap.Any("triggerRef", trigger))
Expand All @@ -238,20 +236,24 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
return nil, nil
}

start = time.Now()
// Record the event processing time. This might be off if the receiver and the filter pods are running in
// different nodes with different clocks.
var arrivalTimeStr string
if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTimeStr); extErr == nil {
arrivalTime, err := time.Parse(time.RFC3339, arrivalTimeStr)
if err != nil {
r.reporter.ReportEventProcessingTime(reportArgs, err, time.Since(arrivalTime))
}
}

start := time.Now()
sendingCTX := broker.SendingContext(ctx, tctx, subscriberURI)
// TODO get HTTP status codes and use those.
// TODO use HTTP codes: https://github.com/cloudevents/sdk-go/pull/177
replyEvent, err := r.ceClient.Send(sendingCTX, *event)
// Record the dispatch time.
r.reporter.ReportDispatchTime(reportArgs, err, time.Since(start))
r.reporter.ReportEventDispatchTime(reportArgs, err, time.Since(start))
// Record the event count.
r.reporter.ReportEventCount(reportArgs, err)
// Record the event latency. This might be off if the receiver and the filter pods are running in
// different nodes with different clocks.
var arrivalTime time.Time
if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTime); extErr == nil {
r.reporter.ReportEventDeliveryTime(reportArgs, err, time.Since(arrivalTime))
}
return replyEvent, err
}

Expand Down Expand Up @@ -285,15 +287,10 @@ func (r *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1alpha1.Trig
attrs = map[string]string(*ts.Filter.Attributes)
}

result := r.filterEventByAttributes(ctx, attrs, event)
filterResult := failFilter
if result {
filterResult = passFilter
}
return filterResult
return r.filterEventByAttributes(ctx, attrs, event)
}

func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool {
func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) FilterResult {
// Set standard context attributes. The attributes available may not be
// exactly the same as the attributes defined in the current version of the
// CloudEvents spec.
Expand Down Expand Up @@ -322,15 +319,15 @@ func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]
// If the attribute does not exist in the event, return false.
if !ok {
logging.FromContext(ctx).Debug("Attribute not found", zap.String("attribute", k))
return false
return failFilter
}
// If the attribute is not set to any and is different than the one from the event, return false.
if v != eventingv1alpha1.TriggerAnyFilter && v != value {
logging.FromContext(ctx).Debug("Attribute had non-matching value", zap.String("attribute", k), zap.String("filter", v), zap.Any("received", value))
return false
return failFilter
}
}
return true
return passFilter
}

// triggerFilterAttribute returns the filter attribute value for a given `attributeName`. If it doesn't not exist,
Expand Down
8 changes: 2 additions & 6 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,11 @@ func (r *mockReporter) ReportEventCount(args *ReportArgs, err error) error {
return nil
}

func (r *mockReporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error {
func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error {
return nil
}

func (r *mockReporter) ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error {
return nil
}

func (r *mockReporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error {
func (r *mockReporter) ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error {
return nil
}

Expand Down
116 changes: 45 additions & 71 deletions pkg/broker/filter/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
utils "knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/metrics/metricskey"
. "knative.dev/eventing/pkg/metrics/metricskey"
"knative.dev/pkg/metrics"
"knative.dev/pkg/metrics/metricskey"
)

var (
Expand All @@ -40,93 +41,84 @@ var (
// dispatchTimeInMsecM records the time spent dispatching an event to
// a Trigger subscriber, in milliseconds.
dispatchTimeInMsecM = stats.Float64(
"dispatch_latencies",
"event_dispatch_latencies",
"The time spent dispatching an event to a Trigger subscriber",
stats.UnitMilliseconds,
)

// filterTimeInMsecM records the time spent filtering an event for a
// Trigger, in milliseconds.
filterTimeInMsecM = stats.Float64(
"filter_latencies",
"The time spent filtering an event for a Trigger",
stats.UnitMilliseconds,
)

// deliveryTimeInMsecM records the time spent between arrival at the Broker
// and delivery to the Trigger subscriber.
deliveryTimeInMsecM = stats.Float64(
"event_latencies",
"The time spent routing an event from a Broker to a Trigger subscriber",
// processingTimeInMsecM records the time spent between arrival at the Broker
// and the delivery to the Trigger subscriber.
processingTimeInMsecM = stats.Float64(
"event_processing_latencies",
"The time spent processing an event before it is dispatched to a Trigger subscriber",
stats.UnitMilliseconds,
)
)

type ReportArgs struct {
ns string
trigger string
broker string
eventType string
eventSource string
ns string
trigger string
broker string
filterType string
filterSource string
}

// StatsReporter defines the interface for sending filter metrics.
type StatsReporter interface {
ReportEventCount(args *ReportArgs, err error) error
ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error
ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error
ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error
ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error
ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error
}

var _ StatsReporter = (*reporter)(nil)

// reporter holds cached metric objects to report filter metrics.
type reporter struct {
namespaceTagKey tag.Key
triggerTagKey tag.Key
brokerTagKey tag.Key
triggerTypeKey tag.Key
triggerSourceKey tag.Key
resultKey tag.Key
filterResultKey tag.Key
namespaceTagKey tag.Key
triggerTagKey tag.Key
brokerTagKey tag.Key
triggerFilterTypeKey tag.Key
triggerFilterSourceKey tag.Key
resultKey tag.Key
filterResultKey tag.Key
}

// NewStatsReporter creates a reporter that collects and reports filter metrics.
func NewStatsReporter() (StatsReporter, error) {
var r = &reporter{}

// Create the tag keys that will be used to add tags to our measurements.
nsTag, err := tag.NewKey(metricskey.NamespaceName)
nsTag, err := tag.NewKey(metricskey.LabelNamespaceName)
if err != nil {
return nil, err
}
r.namespaceTagKey = nsTag
triggerTag, err := tag.NewKey(metricskey.TriggerName)
triggerTag, err := tag.NewKey(metricskey.LabelTriggerName)
if err != nil {
return nil, err
}
r.triggerTagKey = triggerTag
brokerTag, err := tag.NewKey(metricskey.BrokerName)
brokerTag, err := tag.NewKey(metricskey.LabelBrokerName)
if err != nil {
return nil, err
}
r.brokerTagKey = brokerTag
triggerTypeTag, err := tag.NewKey(metricskey.TriggerType)
triggerFilterTypeTag, err := tag.NewKey(metricskey.LabelFilterType)
if err != nil {
return nil, err
}
r.triggerTypeKey = triggerTypeTag
triggerSourceKey, err := tag.NewKey(metricskey.TriggerSource)
r.triggerFilterTypeKey = triggerFilterTypeTag
triggerFilterSourceKey, err := tag.NewKey(metricskey.LabelFilterSource)
if err != nil {
return nil, err
}
r.triggerSourceKey = triggerSourceKey
filterResultTag, err := tag.NewKey(metricskey.FilterResult)
r.triggerFilterSourceKey = triggerFilterSourceKey
filterResultTag, err := tag.NewKey(LabelFilterResult)
if err != nil {
return nil, err
}
r.filterResultKey = filterResultTag
resultTag, err := tag.NewKey(metricskey.Result)
resultTag, err := tag.NewKey(LabelResult)
if err != nil {
return nil, err
}
Expand All @@ -137,27 +129,20 @@ func NewStatsReporter() (StatsReporter, error) {
&view.View{
Description: eventCountM.Description(),
Measure: eventCountM,
// TODO count or sum aggregation?
Aggregation: view.Count(),
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey},
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey},
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey},
},
&view.View{
Description: filterTimeInMsecM.Description(),
Measure: filterTimeInMsecM,
Aggregation: view.Distribution(utils.Buckets125(0.1, 10)...), // 0.1, 0.2, 0.5, 1, 2, 5, 10
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.filterResultKey},
},
&view.View{
Description: deliveryTimeInMsecM.Description(),
Measure: deliveryTimeInMsecM,
Description: processingTimeInMsecM.Description(),
Measure: processingTimeInMsecM,
Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerTypeKey, r.triggerSourceKey, r.resultKey},
TagKeys: []tag.Key{r.namespaceTagKey, r.triggerTagKey, r.brokerTagKey, r.triggerFilterTypeKey, r.triggerFilterSourceKey, r.resultKey},
},
)
if err != nil {
Expand All @@ -177,8 +162,8 @@ func (r *reporter) ReportEventCount(args *ReportArgs, err error) error {
return nil
}

// ReportDispatchTime captures dispatch times.
func (r *reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error {
// ReportEventDispatchTime captures dispatch times.
func (r *reporter) ReportEventDispatchTime(args *ReportArgs, err error, d time.Duration) error {
ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err)))
if err != nil {
return err
Expand All @@ -188,44 +173,33 @@ func (r *reporter) ReportDispatchTime(args *ReportArgs, err error, d time.Durati
return nil
}

// ReportFilterTime captures filtering times.
func (r *reporter) ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error {
ctx, err := r.generateTag(args, tag.Insert(r.filterResultKey, string(filterResult)))
if err != nil {
return err
}
// convert time.Duration in nanoseconds to milliseconds.
metrics.Record(ctx, filterTimeInMsecM.M(float64(d/time.Millisecond)))
return nil
}

// ReportEventDeliveryTime captures event delivery times.
func (r *reporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error {
// ReportEventProcessingTime captures event processing times.
func (r *reporter) ReportEventProcessingTime(args *ReportArgs, err error, d time.Duration) error {
ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err)))
if err != nil {
return err
}

// convert time.Duration in nanoseconds to milliseconds.
metrics.Record(ctx, deliveryTimeInMsecM.M(float64(d/time.Millisecond)))
metrics.Record(ctx, processingTimeInMsecM.M(float64(d/time.Millisecond)))
return nil
}

func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context, error) {
// Note that eventType and eventSource can be empty strings, so they need a special treatment.
// Note that filterType and filterSource can be empty strings, so they need a special treatment.
return tag.New(
context.Background(),
tag.Insert(r.namespaceTagKey, args.ns),
tag.Insert(r.triggerTagKey, args.trigger),
tag.Insert(r.brokerTagKey, args.broker),
tag.Insert(r.triggerTypeKey, valueOrAny(args.eventType)),
tag.Insert(r.triggerSourceKey, valueOrAny(args.eventSource)),
tag.Insert(r.triggerFilterTypeKey, valueOrAny(args.filterType)),
tag.Insert(r.triggerFilterSourceKey, valueOrAny(args.filterSource)),
t)
}

func valueOrAny(v string) string {
if v != "" {
return v
}
return metricskey.Any
return AnyValue
}
Loading