From 46c561863b75fb2ae0bb7d35a07fc310923d95e5 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Thu, 12 Jul 2018 12:24:02 -0700 Subject: [PATCH] Enable passing CloudEvents headers ('CE-*'), and clean up some `go vet` warnings --- pkg/buses/gcppubsub/bus.go | 86 ++++++++++++++++-------- pkg/buses/kafka/dispatcher/dispatcher.go | 2 +- pkg/buses/stub/main.go | 57 ++++++++++------ 3 files changed, 95 insertions(+), 50 deletions(-) diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go index 1f6e0eb057e..840e774bcb6 100644 --- a/pkg/buses/gcppubsub/bus.go +++ b/pkg/buses/gcppubsub/bus.go @@ -31,15 +31,20 @@ import ( "github.com/knative/eventing/pkg/buses" ) +// PubSubBus implements a Bus transport using GCP PubSub Topics and +// Subscriptions to persist events as messages. type PubSubBus struct { - name string - monitor *buses.Monitor - pubsubClient *pubsub.Client - client *http.Client - forwardHeaders []string - receivers map[string]context.CancelFunc + name string + monitor *buses.Monitor + pubsubClient *pubsub.Client + client *http.Client + forwardHeaders map[string]bool + forwardPrefixes []string + receivers map[string]context.CancelFunc } +// CreateTopic creates a GCP PubSub Topic based on the channel's name and the +// bus's project id. func (b *PubSubBus) CreateTopic(channel *channelsv1alpha1.Channel, parameters buses.ResolvedParameters) error { ctx := context.Background() @@ -62,6 +67,7 @@ func (b *PubSubBus) CreateTopic(channel *channelsv1alpha1.Channel, parameters bu return nil } +// DeleteTopic deletes the GCP PubSub topic associated with the specified chanel. func (b *PubSubBus) DeleteTopic(channel *channelsv1alpha1.Channel) error { ctx := context.Background() @@ -79,6 +85,10 @@ func (b *PubSubBus) DeleteTopic(channel *channelsv1alpha1.Channel) error { return topic.Delete(ctx) } +// CreateOrUpdateSubscription creates a Subscription in GCP PubSub connected to +// a Topic based on the Subscription's channel and the project stored in the +// Bus. The Subscription is a pull subscription used by this controller to +// deliver events. func (b *PubSubBus) CreateOrUpdateSubscription(sub *channelsv1alpha1.Subscription, parameters buses.ResolvedParameters) error { ctx := context.Background() @@ -109,6 +119,7 @@ func (b *PubSubBus) CreateOrUpdateSubscription(sub *channelsv1alpha1.Subscriptio return err } +// DeleteSubscription deletes a GCP PubSub subscription. func (b *PubSubBus) DeleteSubscription(sub *channelsv1alpha1.Subscription) error { ctx := context.Background() @@ -126,7 +137,7 @@ func (b *PubSubBus) DeleteSubscription(sub *channelsv1alpha1.Subscription) error return subscription.Delete(ctx) } -func (b *PubSubBus) SendEventToTopic(channel *channelsv1alpha1.Channel, data []byte, attributes map[string]string) error { +func (b *PubSubBus) sendEventToTopic(channel *channelsv1alpha1.Channel, data []byte, attributes map[string]string) error { ctx := context.Background() topicID := b.topicID(channel) @@ -148,6 +159,7 @@ func (b *PubSubBus) SendEventToTopic(channel *channelsv1alpha1.Channel, data []b return nil } +// ReceiveEvents extracts a single event from a message on a GCP Subscription. func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters buses.ResolvedParameters) error { ctx := context.Background() cctx, cancel := context.WithCancel(ctx) @@ -170,7 +182,7 @@ func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters go func() { glog.Infof("Start receiving events for subscription %q\n", subscriptionID) err := subscription.Receive(cctx, func(ctx context.Context, message *pubsub.Message) { - err := b.DispatchHTTPEvent(sub, message.Data, message.Attributes) + err := b.dispatchHTTPEvent(sub, message.Data, message.Attributes) if err != nil { glog.Warningf("Unable to dispatch event %q to %q", message.ID, sub.Spec.Subscriber) message.Nack() @@ -189,6 +201,7 @@ func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters return nil } +// StopReceiveEvents cancels a running ReceieveEvents polling operation. func (b *PubSubBus) StopReceiveEvents(subscription *channelsv1alpha1.Subscription) error { subscriptionID := b.subscriptionID(subscription) if cancel, ok := b.receivers[subscriptionID]; ok { @@ -207,6 +220,8 @@ func (b *PubSubBus) subscriptionID(subscription *channelsv1alpha1.Subscription) return fmt.Sprintf("subscription-%s-%s-%s", b.name, subscription.Namespace, subscription.Name) } +// ReceiveHTTPEvent converts an event received on the Channel's HTTP endpoint +// and enqueues it in GCP PubSub. func (b *PubSubBus) ReceiveHTTPEvent(res http.ResponseWriter, req *http.Request) { host := req.Host glog.Infof("Received request for %s\n", host) @@ -226,7 +241,7 @@ func (b *PubSubBus) ReceiveHTTPEvent(res http.ResponseWriter, req *http.Request) attributes := b.headersToAttributes(b.safeHeaders(req.Header)) - err = b.SendEventToTopic(channel, data, attributes) + err = b.sendEventToTopic(channel, data, attributes) if err != nil { glog.Warningf("Unable to send event to topic %q: %v", channel.Name, err) res.WriteHeader(http.StatusInternalServerError) @@ -236,7 +251,7 @@ func (b *PubSubBus) ReceiveHTTPEvent(res http.ResponseWriter, req *http.Request) res.WriteHeader(http.StatusAccepted) } -func (b *PubSubBus) DispatchHTTPEvent(subscription *channelsv1alpha1.Subscription, data []byte, attributes map[string]string) error { +func (b *PubSubBus) dispatchHTTPEvent(subscription *channelsv1alpha1.Subscription, data []byte, attributes map[string]string) error { subscriber := subscription.Spec.Subscriber url := url.URL{ Scheme: "http", @@ -267,9 +282,16 @@ func (b *PubSubBus) splitChannelName(host string) (string, string) { func (b *PubSubBus) safeHeaders(raw http.Header) http.Header { safe := http.Header{} - for _, header := range b.forwardHeaders { - if value := raw.Get(header); value != "" { - safe.Set(header, value) + for h, v := range raw { + if _, ok := b.forwardHeaders[h]; ok { + safe[h] = v + break + } + for _, p := range b.forwardPrefixes { + if strings.HasPrefix(h, p) { + safe[h] = v + break + } } } return safe @@ -278,7 +300,7 @@ func (b *PubSubBus) safeHeaders(raw http.Header) http.Header { func (b *PubSubBus) headersToAttributes(headers http.Header) map[string]string { attributes := make(map[string]string) for name, value := range headers { - // TODO hanle compound headers + // TODO: handle compound headers attributes[name] = value[0] } return attributes @@ -292,16 +314,21 @@ func (b *PubSubBus) attributesToHeaders(attributes map[string]string) http.Heade return headers } +// NewPubSubBus creates a new bus backed by GCP PubSub topics and subscriptions +// in the specified project. func NewPubSubBus(name string, projectID string, monitor *buses.Monitor) (*PubSubBus, error) { - forwardHeaders := []string{ - "content-type", - "x-request-id", - "x-b3-traceid", - "x-b3-spanid", - "x-b3-parentspanid", - "x-b3-sampled", - "x-b3-flags", - "x-ot-span-context", + forwardHeaders := map[string]bool{ + "content-type": true, + "x-request-id": true, + "x-b3-traceid": true, + "x-b3-spanid": true, + "x-b3-parentspanid": true, + "x-b3-sampled": true, + "x-b3-flags": true, + "x-ot-span-context": true, + } + forwardPrefixes := []string{ + "ce-", } ctx := context.Background() @@ -311,12 +338,13 @@ func NewPubSubBus(name string, projectID string, monitor *buses.Monitor) (*PubSu } bus := PubSubBus{ - name: name, - monitor: monitor, - pubsubClient: pubsubClient, - client: &http.Client{}, - forwardHeaders: forwardHeaders, - receivers: map[string]context.CancelFunc{}, + name: name, + monitor: monitor, + pubsubClient: pubsubClient, + client: &http.Client{}, + forwardHeaders: forwardHeaders, + forwardPrefixes: forwardPrefixes, + receivers: map[string]context.CancelFunc{}, } return &bus, nil diff --git a/pkg/buses/kafka/dispatcher/dispatcher.go b/pkg/buses/kafka/dispatcher/dispatcher.go index de0f0e2abee..6b9e97ea99e 100644 --- a/pkg/buses/kafka/dispatcher/dispatcher.go +++ b/pkg/buses/kafka/dispatcher/dispatcher.go @@ -225,7 +225,7 @@ func initialOffset(parameters buses.ResolvedParameters) (int64, error) { func kafka2HttpHeaders(message *sarama.ConsumerMessage) http.Header { result := make(http.Header) for _, h := range message.Headers { - result[string(h.Key)] = []string{string(h.Value)} + result.Set(string(h.Key), string(h.Value)) } return result } diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index bee145d38ef..4398237ac4e 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -41,11 +41,14 @@ var ( kubeconfig string ) +// StubBus is an in-memory forwarding implementation that forwards each +// channel to subscribers in-memory. type StubBus struct { - name string - monitor *buses.Monitor - client *http.Client - forwardHeaders []string + name string + monitor *buses.Monitor + client *http.Client + forwardHeaders map[string]bool + forwardPrefixes []string } func (b *StubBus) handleEvent(res http.ResponseWriter, req *http.Request) { @@ -113,31 +116,45 @@ func (b *StubBus) splitChannelName(host string) (string, string) { func (b *StubBus) safeHeaders(raw http.Header) http.Header { safe := http.Header{} - for _, header := range b.forwardHeaders { - if value := raw.Get(header); value != "" { - safe.Set(header, value) + for h, v := range raw { + if _, ok := b.forwardHeaders[h]; ok { + safe[h] = v + break + } + for _, p := range b.forwardPrefixes { + if strings.HasPrefix(h, p) { + safe[h] = v + break + } } } return safe } +// NewStubBus creates a StubBus which forwards requests using a header +// whitelist. func NewStubBus(name string, monitor *buses.Monitor) *StubBus { - forwardHeaders := []string{ - "content-type", - "x-request-id", - "x-b3-traceid", - "x-b3-spanid", - "x-b3-parentspanid", - "x-b3-sampled", - "x-b3-flags", - "x-ot-span-context", + forwardHeaders := map[string]bool{ + "content-type": true, + "x-request-id": true, + "x-b3-traceid": true, + "x-b3-spanid": true, + "x-b3-parentspanid": true, + "x-b3-sampled": true, + "x-b3-flags": true, + "x-ot-span-context": true, + } + // TODO: should we instead blacklist specific headers? + forwardPrefixes := []string{ + "ce-", } bus := StubBus{ - name: name, - monitor: monitor, - client: &http.Client{}, - forwardHeaders: forwardHeaders, + name: name, + monitor: monitor, + client: &http.Client{}, + forwardHeaders: forwardHeaders, + forwardPrefixes: forwardPrefixes, } return &bus