Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/tracing"
"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -78,19 +78,19 @@ func main() {

// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
receiver, err := broker.New(logger, mgr.GetClient())
handler, err := filter.NewHandler(logger, mgr.GetClient())
if err != nil {
logger.Fatal("Error creating Receiver", zap.Error(err))
logger.Fatal("Error creating Handler", zap.Error(err))
}
err = mgr.Add(receiver)
err = mgr.Add(handler)
if err != nil {
logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("broker_receiver", receiver))
logger.Fatal("Unable to start the handler", zap.Error(err), zap.Any("broker_filter", handler))
}

// TODO watch logging config map.

// Watch the observability config map and dynamically update metrics exporter.
configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_receiver", logger.Sugar()))
configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_filter", logger.Sugar()))

// Set up signals so we handle the first shutdown signal gracefully.
stopCh := signals.SetupSignalHandler()
Expand Down
6 changes: 4 additions & 2 deletions pkg/broker/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (
func SendingContext(ctx context.Context, tctx cloudevents.HTTPTransportContext, targetURI *url.URL) context.Context {
sendingCTX := cloudevents.ContextWithTarget(ctx, targetURI.String())

h := extractPassThroughHeaders(tctx)
h := ExtractPassThroughHeaders(tctx)
for n, v := range h {
for _, iv := range v {
sendingCTX = cloudevents.ContextWithHeader(sendingCTX, n, iv)
Expand All @@ -60,7 +60,9 @@ func SendingContext(ctx context.Context, tctx cloudevents.HTTPTransportContext,
return sendingCTX
}

func extractPassThroughHeaders(tctx cloudevents.HTTPTransportContext) http.Header {
// ExtractPassThroughHeaders extracts the headers that are in the `forwardHeaders` set
// or has any of the prefixes in `forwardPrefixes`.
func ExtractPassThroughHeaders(tctx cloudevents.HTTPTransportContext) http.Header {
Comment thread
nachocano marked this conversation as resolved.
h := http.Header{}

for n, v := range tctx.Header {
Expand Down
45 changes: 21 additions & 24 deletions pkg/broker/receiver.go → pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package broker
package filter

import (
"context"
Expand All @@ -33,29 +33,26 @@ import (
"go.opencensus.io/tag"
"go.uber.org/zap"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/reconciler/trigger/path"
"knative.dev/pkg/tracing"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
writeTimeout = 1 * time.Minute
// TimeInFlightMetadataName is used to access the metadata stored on a
// CloudEvent to measure the time difference between when an event is
// received and when it is dispatched to the trigger function.
TimeInFlightMetadataName = "kn00timeinflight"
)

// Receiver parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
type Receiver struct {
// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
type Handler struct {
logger *zap.Logger
client client.Client
ceClient cloudevents.Client
}

// New creates a new Receiver and its associated MessageReceiver. The caller is responsible for
// Start()ing the returned MessageReceiver.
func New(logger *zap.Logger, client client.Client) (*Receiver, error) {
// NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for
// Start()ing the returned Handler.
func NewHandler(logger *zap.Logger, client client.Client) (*Handler, error) {
httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(tracing.HTTPSpanMiddleware))
if err != nil {
return nil, err
Expand All @@ -80,7 +77,7 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, error) {
return nil, err
}

r := &Receiver{
r := &Handler{
logger: logger,
client: client,
ceClient: ceClient,
Expand All @@ -95,7 +92,7 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, error) {
}

// Initialize the client. Mainly intended to load stuff in its cache.
func (r *Receiver) initClient() error {
func (r *Handler) initClient() error {
// We list triggers so that we do not drop messages. Otherwise, on receiving an event, it
// may not find the Trigger and would return an error.
opts := &client.ListOptions{}
Expand All @@ -106,14 +103,14 @@ func (r *Receiver) initClient() error {
return nil
}

// Start begins to receive messages for the receiver.
// Start begins to receive messages for the handler.
//
// Only HTTP POST requests to the root path (/) are accepted. If other paths or
// methods are needed, use the HandleRequest method directly with another HTTP
// server.
//
// This method will block until a message is received on the stop channel.
func (r *Receiver) Start(stopCh <-chan struct{}) error {
func (r *Handler) Start(stopCh <-chan struct{}) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -141,7 +138,7 @@ func (r *Receiver) Start(stopCh <-chan struct{}) error {
}
}

func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
func (r *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
tctx := cloudevents.HTTPTransportContextFrom(ctx)
if tctx.Method != http.MethodPost {
resp.Status = http.StatusMethodNotAllowed
Expand All @@ -157,7 +154,7 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp

// Remove the TTL attribute that is used by the Broker.
originalV3 := event.Context.AsV03()
ttl, ttlKey := GetTTL(event.Context)
ttl, ttlKey := broker.GetTTL(event.Context)
if ttl == nil {
// Only messages sent by the Broker should be here. If the attribute isn't here, then the
// event wasn't sent by the Broker, so we can drop it.
Expand All @@ -184,20 +181,20 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp
}

// Reattach the TTL (with the same value) to the response event before sending it to the Broker.
responseEvent.Context, err = SetTTL(responseEvent.Context, ttl)
responseEvent.Context, err = broker.SetTTL(responseEvent.Context, ttl)
if err != nil {
return err
}
resp.Event = responseEvent
resp.Context = &cloudevents.HTTPTransportResponseContext{
Header: extractPassThroughHeaders(tctx),
Header: broker.ExtractPassThroughHeaders(tctx),
}

return nil
}

// sendEvent sends an event to a subscriber if the trigger filter passes.
func (r *Receiver) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportContext, trigger path.NamespacedNameUID, event *cloudevents.Event) (*cloudevents.Event, error) {
func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportContext, trigger path.NamespacedNameUID, event *cloudevents.Event) (*cloudevents.Event, error) {
t, err := r.getTrigger(ctx, trigger)
if err != nil {
r.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", trigger))
Expand All @@ -217,7 +214,7 @@ func (r *Receiver) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransport
dispatchTimeMS := int64(now.Sub(startTS) / time.Millisecond)
stats.Record(ctx, MeasureTriggerDispatchTime.M(dispatchTimeMS))
stats.Record(ctx, MeasureTriggerEventsTotal.M(1))
if err := event.ExtensionAs(TimeInFlightMetadataName, &deliveryTime); err != nil {
if err := event.ExtensionAs(broker.TimeInFlightMetadataName, &deliveryTime); err != nil {
return
}
timeInFlightMS := int64(now.Sub(deliveryTime) / time.Millisecond)
Expand All @@ -244,7 +241,7 @@ func (r *Receiver) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransport
return nil, nil
}

sendingCTX := SendingContext(ctx, tctx, subscriberURI)
sendingCTX := broker.SendingContext(ctx, tctx, subscriberURI)
replyEvent, err := r.ceClient.Send(sendingCTX, *event)
if err == nil {
ctx, _ = tag.New(ctx, tag.Upsert(TagResult, "accept"))
Expand All @@ -254,7 +251,7 @@ func (r *Receiver) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransport
return replyEvent, err
}

func (r *Receiver) getTrigger(ctx context.Context, ref path.NamespacedNameUID) (*eventingv1alpha1.Trigger, error) {
func (r *Handler) getTrigger(ctx context.Context, ref path.NamespacedNameUID) (*eventingv1alpha1.Trigger, error) {
t := &eventingv1alpha1.Trigger{}
err := r.client.Get(ctx, ref.NamespacedName, t)
if err != nil {
Expand All @@ -270,7 +267,7 @@ func (r *Receiver) getTrigger(ctx context.Context, ref path.NamespacedNameUID) (
// Currently it supports exact matching on event context attributes.
// If no filter is present, shouldSendMessage returns false.
// If no filter strategy is present, shouldSendMessage returns true.
func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {
func (r *Handler) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {
if ts.Filter == nil {
r.logger.Error("No filter specified")
ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail"))
Expand Down Expand Up @@ -309,7 +306,7 @@ func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.T
return result
}

func (r *Receiver) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool {
func (r *Handler) filterEventByAttributes(ctx context.Context, attrs map[string]string, event *cloudevents.Event) bool {
// Set standard context attributes. The attributes available may not be
// exactly the same as the attributes defined in the current version of the
// CloudEvents spec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package broker
package filter

import (
"context"
Expand All @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/broker"
controllertesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/eventing/pkg/utils"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -290,7 +291,7 @@ func TestReceiver(t *testing.T) {
correctURI = append(correctURI, trig)
}

r, err := New(
r, err := NewHandler(
zap.NewNop(),
getClient(correctURI, tc.mocks))
if tc.expectNewToFail {
Expand Down Expand Up @@ -365,7 +366,7 @@ func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
h.requestReceived = true

for n, v := range h.headers {
if strings.Contains(strings.ToLower(n), strings.ToLower(v03TTLAttribute)) {
if strings.Contains(strings.ToLower(n), strings.ToLower(broker.V03TTLAttribute)) {
h.t.Errorf("Broker TTL should not be seen by the subscriber: %s", n)
}
if diff := cmp.Diff(v, req.Header[n]); diff != "" {
Expand Down Expand Up @@ -494,7 +495,7 @@ func makeEvent() *cloudevents.Event {
}

func addTTLToEvent(e cloudevents.Event) cloudevents.Event {
e.Context, _ = SetTTL(e.Context, 1)
e.Context, _ = broker.SetTTL(e.Context, 1)
return e
}

Expand Down
112 changes: 112 additions & 0 deletions pkg/broker/filter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2019 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package filter

import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
utils "knative.dev/eventing/pkg/broker"
)

var (
// MeasureTriggerEventsTotal is a counter which records the number of events received
// by a Trigger.
MeasureTriggerEventsTotal = stats.Int64(
"knative.dev/eventing/trigger/measures/events_total",
"Total number of events received by a Trigger",
stats.UnitNone,
)

// MeasureTriggerDispatchTime records the time spent dispatching an event for
// a Trigger, in milliseconds.
MeasureTriggerDispatchTime = stats.Int64(
"knative.dev/eventing/trigger/measures/dispatch_time",
"Time spent dispatching an event to a Trigger",
stats.UnitMilliseconds,
)

// MeasureTriggerFilterTime records the time spent filtering a message for a
// Trigger, in milliseconds.
MeasureTriggerFilterTime = stats.Int64(
"knative.dev/eventing/trigger/measures/filter_time",
"Time spent filtering a message for a Trigger",
stats.UnitMilliseconds,
)

// MeasureDeliveryTime records the time spent between arrival at ingress
// and delivery to the trigger subscriber.
MeasureDeliveryTime = stats.Int64(
"knative.dev/eventing/trigger/measures/delivery_time",
"Time between an event arriving at ingress and delivery to the trigger subscriber",
stats.UnitMilliseconds,
)

// Tag keys must conform to the restrictions described in
// go.opencensus.io/tag/validate.go. Currently those restrictions are:
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII

// TagResult is a tag key referring to the observed result of an operation.
TagResult = utils.MustNewTagKey("result")

// TagFilterResult is a tag key referring to the observed result of a filter
// operation.
TagFilterResult = utils.MustNewTagKey("filter_result")

// TagBroker is a tag key referring to the Broker name serviced by this
// filter process.
TagBroker = utils.MustNewTagKey("broker")

// TagTrigger is a tag key referring to the Trigger name serviced by this
// filter process.
TagTrigger = utils.MustNewTagKey("trigger")
)

func init() {
// Create views for exporting measurements. This returns an error if a
// previously registered view has the same name with a different value.
err := view.Register(
&view.View{
Name: "trigger_events_total",
Measure: MeasureTriggerEventsTotal,
Aggregation: view.Count(),
TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger},
},
&view.View{
Name: "trigger_dispatch_time",
Measure: MeasureTriggerDispatchTime,
Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100,
TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger},
},
&view.View{
Name: "trigger_filter_time",
Measure: MeasureTriggerFilterTime,
Aggregation: view.Distribution(utils.Buckets125(0.1, 10)...), // 0.1, 0.2, 0.5, 1, 2, 5, 10
TagKeys: []tag.Key{TagResult, TagFilterResult, TagBroker, TagTrigger},
},
&view.View{
Name: "broker_to_function_delivery_time",
Measure: MeasureDeliveryTime,
Aggregation: view.Distribution(utils.Buckets125(1, 100)...), // 1, 2, 5, 10, 20, 50, 100
TagKeys: []tag.Key{TagResult, TagBroker, TagTrigger},
},
)
if err != nil {
panic(err)
}
}
Loading