Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ func main() {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

reporter, err := filter.NewStatsReporter()
if err != nil {
logger.Fatal("Error creating stats reporter", zap.Error(err))
}

// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
handler, err := filter.NewHandler(logger, mgr.GetClient())
handler, err := filter.NewHandler(logger, mgr.GetClient(), reporter)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,18 @@ func main() {
if err != nil {
logger.Fatal("Unable to create CE client", zap.Error(err))
}
reporter, err := ingress.NewStatsReporter()
if err != nil {
logger.Fatal("Unable to create StatsReporter", zap.Error(err))
}

h := &ingress.Handler{
Logger: logger,
CeClient: ceClient,
ChannelURI: channelURI,
BrokerName: env.Broker,
Namespace: env.Namespace,
Reporter: reporter,
}

// Run the event handler with the manager.
Expand Down
135 changes: 75 additions & 60 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,39 @@ import (
"sync/atomic"
"time"

"knative.dev/eventing/pkg/logging"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/logging"
"knative.dev/eventing/pkg/reconciler/trigger/path"
"knative.dev/pkg/tracing"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
writeTimeout = 1 * time.Minute

passFilter FilterResult = "pass"
failFilter FilterResult = "fail"
noFilter FilterResult = "no_filter"
)

// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
type Handler struct {
logger *zap.Logger
client client.Client
ceClient cloudevents.Client
reporter StatsReporter
}

// FilterResult has the result of the filtering operation.
type FilterResult string

// NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for
// Start()ing the returned Handler.
func NewHandler(logger *zap.Logger, client client.Client) (*Handler, error) {
func NewHandler(logger *zap.Logger, client client.Client, reporter StatsReporter) (*Handler, error) {
httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(tracing.HTTPSpanMiddleware))
if err != nil {
return nil, err
Expand Down Expand Up @@ -81,6 +86,7 @@ func NewHandler(logger *zap.Logger, client client.Client) (*Handler, error) {
logger: logger,
client: client,
ceClient: ceClient,
reporter: reporter,
}
err = r.initClient()
if err != nil {
Expand Down Expand Up @@ -201,52 +207,56 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
return nil, err
}

// Set up the metrics context
ctx, _ = tag.New(ctx,
tag.Insert(TagTrigger, trigger.String()),
tag.Insert(TagBroker, fmt.Sprintf("%s/%s", trigger.Namespace, t.Spec.Broker)),
)
// Record event count and filtering time
startTS := time.Now()
defer func() {
var deliveryTime time.Time
now := time.Now()
dispatchTimeMS := int64(now.Sub(startTS) / time.Millisecond)
stats.Record(ctx, MeasureTriggerDispatchTime.M(dispatchTimeMS))
stats.Record(ctx, MeasureTriggerEventsTotal.M(1))
if err := event.ExtensionAs(broker.TimeInFlightMetadataName, &deliveryTime); err != nil {
return
}
timeInFlightMS := int64(now.Sub(deliveryTime) / time.Millisecond)
stats.Record(ctx, MeasureDeliveryTime.M(timeInFlightMS))
}()
reportArgs := &ReportArgs{
ns: t.Namespace,
trigger: t.Name,
broker: t.Spec.Broker,
eventType: triggerFilterAttribute(t.Spec.Filter, "type"),
eventSource: triggerFilterAttribute(t.Spec.Filter, "source"),
}

subscriberURIString := t.Status.SubscriberURI
if subscriberURIString == "" {
ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "error"))
return nil, errors.New("unable to read subscriberURI")
err = errors.New("unable to read subscriberURI")
// Record the event count.
r.reporter.ReportEventCount(reportArgs, err)
return nil, err
}
// We could just send the request to this URI regardless, but let's just check to see if it well
// formed first, that way we can generate better error message if it isn't.
subscriberURI, err := url.Parse(subscriberURIString)
if err != nil {
r.logger.Error("Unable to parse subscriberURI", zap.Error(err), zap.String("subscriberURIString", subscriberURIString))
ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "error"))
// Record the event count.
r.reporter.ReportEventCount(reportArgs, err)
return nil, err
}

if !r.shouldSendMessage(ctx, &t.Spec, event) {
r.logger.Debug("Message did not pass filter", zap.Any("triggerRef", trigger))
ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "drop"))
// Check if the event should be sent, and record filtering time.
start := time.Now()
filterResult := r.shouldSendEvent(ctx, &t.Spec, event)
r.reporter.ReportFilterTime(reportArgs, filterResult, time.Since(start))

if filterResult == failFilter {
r.logger.Debug("Event did not pass filter", zap.Any("triggerRef", trigger))
// Record the event count.
r.reporter.ReportEventCount(reportArgs, errors.New("event did not pass filter"))
return nil, nil
}

start = time.Now()
sendingCTX := broker.SendingContext(ctx, tctx, subscriberURI)
// TODO get HTTP status codes and use those.
replyEvent, err := r.ceClient.Send(sendingCTX, *event)
if err == nil {
ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "accept"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "error"))
// Record the dispatch time.
r.reporter.ReportDispatchTime(reportArgs, err, time.Since(start))
// Record the event count.
r.reporter.ReportEventCount(reportArgs, err)
// Record the event latency. This might be off if the receiver and the filter pods are running in
// different nodes with different clocks.
var arrivalTime time.Time
if extErr := event.ExtensionAs(broker.EventArrivalTime, &arrivalTime); extErr == nil {
r.reporter.ReportEventDeliveryTime(reportArgs, err, time.Since(arrivalTime))
}
return replyEvent, err
}
Expand All @@ -263,28 +273,13 @@ func (r *Handler) getTrigger(ctx context.Context, ref path.NamespacedNameUID) (*
return t, nil
}

// shouldSendMessage determines whether message 'm' should be sent based on the triggerSpec 'ts'.
// Currently it supports exact matching on event context attributes.
// If no filter is present, shouldSendMessage returns false.
// If no filter strategy is present, shouldSendMessage returns true.
func (r *Handler) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {
if ts.Filter == nil {
r.logger.Error("No filter specified")
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail"))
return false
}

// Record event count and filtering time.
startTS := time.Now()
defer func() {
filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond)
stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS))
}()

// shouldSendEvent determines whether event 'event' should be sent based on the triggerSpec 'ts'.
// Currently it supports exact matching on event context attributes and extension attributes.
// If no filter is present, shouldSendEvent returns passFilter.
func (r *Handler) shouldSendEvent(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) FilterResult {
// No filter specified, default to passing everything.
if ts.Filter.DeprecatedSourceAndType == nil && ts.Filter.Attributes == nil {
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-pass"))
return true
if ts.Filter == nil || (ts.Filter.DeprecatedSourceAndType == nil && ts.Filter.Attributes == nil) {
return noFilter
}

attrs := map[string]string{}
Expand All @@ -298,12 +293,11 @@ func (r *Handler) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.Tr
}

result := r.filterEventByAttributes(ctx, attrs, event)
filterResult := failFilter
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if r.filterEventByAttributes(ctx, attrs, event) {
    return passFilter
}
return filterResult

Even better is to have filterEventByAttributes return the filterResult.

if result {
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))
filterResult = passFilter
}
return result
return filterResult
}

func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool {
Expand Down Expand Up @@ -345,3 +339,24 @@ func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]
}
return true
}

// triggerFilterAttribute returns the filter attribute value for a given `attributeName`. If it doesn't not exist,
// returns the any value filter.
func triggerFilterAttribute(filter *eventingv1alpha1.TriggerFilter, attributeName string) string {
attributeValue := eventingv1alpha1.TriggerAnyFilter
if filter != nil {
if filter.DeprecatedSourceAndType != nil {
if attributeName == "type" {
attributeValue = filter.DeprecatedSourceAndType.Type
} else if attributeName == "source" {
attributeValue = filter.DeprecatedSourceAndType.Source
}
} else if filter.Attributes != nil {
attrs := map[string]string(*filter.Attributes)
if v, ok := attrs[attributeName]; ok {
attributeValue = v
}
}
}
return attributeValue
}
23 changes: 22 additions & 1 deletion pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/url"
"strings"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
Expand Down Expand Up @@ -147,6 +148,7 @@ func TestReceiver(t *testing.T) {
triggers: []*eventingv1alpha1.Trigger{
makeTriggerWithoutFilter(),
},
expectedDispatch: true,
},
"No TTL": {
triggers: []*eventingv1alpha1.Trigger{
Expand Down Expand Up @@ -293,7 +295,8 @@ func TestReceiver(t *testing.T) {

r, err := NewHandler(
zap.NewNop(),
getClient(correctURI, tc.mocks))
getClient(correctURI, tc.mocks),
&mockReporter{})
if tc.expectNewToFail {
if err == nil {
t.Fatal("Expected New to fail, it didn't")
Expand Down Expand Up @@ -354,6 +357,24 @@ func TestReceiver(t *testing.T) {
}
}

type mockReporter struct{}

func (r *mockReporter) ReportEventCount(args *ReportArgs, err error) error {
return nil
}

func (r *mockReporter) ReportDispatchTime(args *ReportArgs, err error, d time.Duration) error {
return nil
}

func (r *mockReporter) ReportFilterTime(args *ReportArgs, filterResult FilterResult, d time.Duration) error {
return nil
}

func (r *mockReporter) ReportEventDeliveryTime(args *ReportArgs, err error, d time.Duration) error {
return nil
}

type fakeHandler struct {
failRequest bool
requestReceived bool
Expand Down
Loading