From 9951f11b292f959d77bbf7abcebd8d13fe43035f Mon Sep 17 00:00:00 2001 From: markfisher Date: Tue, 31 Jul 2018 16:15:22 -0400 Subject: [PATCH 1/2] add support for Subsciption.replyTo property fixes #216 --- .../channels/v1alpha1/subscription_types.go | 3 + pkg/buses/gcppubsub/bus.go | 3 +- pkg/buses/kafka/dispatcher/dispatcher.go | 2 +- pkg/buses/message.go | 2 + pkg/buses/message_dispatcher.go | 67 ++++++++++++++++--- pkg/buses/stub/main.go | 5 +- 6 files changed, 68 insertions(+), 14 deletions(-) diff --git a/pkg/apis/channels/v1alpha1/subscription_types.go b/pkg/apis/channels/v1alpha1/subscription_types.go index a25dd48f1f8..2e0d6edebd6 100644 --- a/pkg/apis/channels/v1alpha1/subscription_types.go +++ b/pkg/apis/channels/v1alpha1/subscription_types.go @@ -46,6 +46,9 @@ type SubscriptionSpec struct { // Subscriber is the name of the subscriber service DNS name. Subscriber string `json:"subscriber"` + // Target service DNS name for replies returned by the subscriber. + ReplyTo string `json:"replyTo"` + // Arguments is a list of configuration arguments for the Subscription. The // Arguments for a channel must contain values for each of the Parameters // specified by the Bus' spec.parameters.Subscriptions field except the diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go index 5e9cf538f6e..aeae82a120a 100644 --- a/pkg/buses/gcppubsub/bus.go +++ b/pkg/buses/gcppubsub/bus.go @@ -167,12 +167,13 @@ func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters glog.Infof("Start receiving events for subscription %q\n", subscriptionID) err := subscription.Receive(cctx, func(ctx context.Context, pubsubMessage *pubsub.Message) { subscriber := sub.Spec.Subscriber + replyTo := sub.Spec.ReplyTo namespace := sub.Namespace message := &buses.Message{ Headers: pubsubMessage.Attributes, Payload: pubsubMessage.Data, } - err := b.messageDispatcher.DispatchMessage(subscriber, namespace, message) + err := b.messageDispatcher.DispatchMessage(subscriber, replyTo, namespace, message) if err != nil { glog.Warningf("Unable to dispatch event %q to %q", pubsubMessage.ID, subscriber) pubsubMessage.Nack() diff --git a/pkg/buses/kafka/dispatcher/dispatcher.go b/pkg/buses/kafka/dispatcher/dispatcher.go index d8611010025..d488bc83a0b 100644 --- a/pkg/buses/kafka/dispatcher/dispatcher.go +++ b/pkg/buses/kafka/dispatcher/dispatcher.go @@ -167,7 +167,7 @@ func (d *dispatcher) subscribe(subscription *channelsv1alpha1.Subscription, para glog.Infof("Dispatching a message for subscription %s/%s: %s -> %s", subscription.Namespace, subscription.Name, subscription.Spec.Channel, subscription.Spec.Subscriber) message := fromKafkaMessage(msg) - err := d.messageDispatcher.DispatchMessage(subscription.Spec.Subscriber, subscription.Namespace, message) + err := d.messageDispatcher.DispatchMessage(subscription.Spec.Subscriber, subscription.Spec.ReplyTo, subscription.Namespace, message) if err != nil { glog.Warningf("Got error trying to dispatch message: %v", err) } diff --git a/pkg/buses/message.go b/pkg/buses/message.go index 5103cce6360..162d7f6f726 100644 --- a/pkg/buses/message.go +++ b/pkg/buses/message.go @@ -27,6 +27,8 @@ var forwardHeaders = []string{ } var forwardPrefixes = []string{ + // knative + "x-knative-", // cloud events "ce-", // tracing diff --git a/pkg/buses/message_dispatcher.go b/pkg/buses/message_dispatcher.go index 61e848886e6..4e17527c476 100644 --- a/pkg/buses/message_dispatcher.go +++ b/pkg/buses/message_dispatcher.go @@ -19,11 +19,14 @@ package buses import ( "bytes" "fmt" + "io/ioutil" "net/http" "net/url" "strings" ) +const correlationIdHeaderName = "X-Knative-Correlation-Id" + // MessageDispatcher dispatches messages to a destination over HTTP. type MessageDispatcher struct { httpClient *http.Client @@ -48,21 +51,39 @@ func NewMessageDispatcher() *MessageDispatcher { // DispatchMessage dispatches a message to a destination over HTTP. // -// The destination is a DNS name. For destinations with a single label, the -// default namespace is used to expand the destination into a fully qualified -// name within the cluster. -func (d *MessageDispatcher) DispatchMessage(destination string, defaultNamespace string, message *Message) error { +// The destination and replyTo are DNS names. For names with a single label, +// the default namespace is used to expand it into a fully qualified name +// within the cluster. +func (d *MessageDispatcher) DispatchMessage(destination string, replyTo string, defaultNamespace string, message *Message) error { + res, err := d.executeRequest(destination, defaultNamespace, message) + if err != nil { + return fmt.Errorf("Unable to complete request %v", err) + } + if replyTo != "" && res != nil { + headers := d.fromHTTPHeaders(res.Header) + // TODO: add configurable whitelisting of propagated headers/prefixes (configmap?) + correlationID := message.Headers[correlationIdHeaderName] + if correlationID != "" { + headers[correlationIdHeaderName] = correlationID + } + payload, err := ioutil.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("Unable to read response %v", err) + } + replyMessage := Message{headers, payload} + d.executeRequest(replyTo, defaultNamespace, &replyMessage) + } + return nil +} + +func (d *MessageDispatcher) executeRequest(destination string, defaultNamespace string, message *Message) (*http.Response, error) { url := d.resolveURL(destination, defaultNamespace) req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(message.Payload)) if err != nil { - return fmt.Errorf("Unable to create request %v", err) + return nil, fmt.Errorf("Unable to create request %v", err) } req.Header = d.toHTTPHeaders(message.Headers) - _, err = d.httpClient.Do(req) - if err != nil { - return fmt.Errorf("Unable to complete request %v", err) - } - return nil + return d.httpClient.Do(req) } // toHTTPHeaders converts message headers to HTTP headers. @@ -90,6 +111,32 @@ func (d *MessageDispatcher) toHTTPHeaders(headers map[string]string) http.Header return safe } +// fromHTTPHeaders converts HTTP headers into a message header map. +// +// Only headers whitelisted as safe are copied. If an HTTP header exists +// multiple times, a single value will be retained. +func (d *MessageDispatcher) fromHTTPHeaders(headers http.Header) map[string]string { + safe := map[string]string{} + + // TODO handle multi-value headers + for h, v := range headers { + // Headers are case-insensitive but test case are all lower-case + comparable := strings.ToLower(h) + if _, ok := d.forwardHeaders[comparable]; ok { + safe[h] = v[0] + continue + } + for _, p := range d.forwardPrefixes { + if strings.HasPrefix(comparable, p) { + safe[h] = v[0] + break + } + } + } + + return safe +} + func (d *MessageDispatcher) resolveURL(destination string, defaultNamespace string) *url.URL { if url, err := url.Parse(destination); err == nil && d.supportedSchemes[url.Scheme] { // already a URL with a known scheme diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index 9a05874cd03..2cb6ed3cf76 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -88,8 +88,9 @@ func (b *StubBus) receiveMessage(channel *buses.ChannelReference, message *buses // dispatchMessage dispatches messages for the bus to a channel's subscriber. func (b *StubBus) dispatchMessage(subscription channelsv1alpha1.SubscriptionSpec, channel *buses.ChannelReference, message *buses.Message) { subscriber := subscription.Subscriber - glog.Infof("Sending to %q for %q", subscriber, channel) - b.dispatcher.DispatchMessage(subscriber, channel.Namespace, message) + replyTo := subscription.ReplyTo + glog.Infof("Sending to %q for %q and replying to %q", subscriber, channel, replyTo) + b.dispatcher.DispatchMessage(subscriber, replyTo, channel.Namespace, message) } func main() { From a23834d4424848914f2cbe453c940a9b27b03a4a Mon Sep 17 00:00:00 2001 From: markfisher Date: Mon, 6 Aug 2018 16:40:14 -0400 Subject: [PATCH 2/2] addressing review comments --- .../channels/v1alpha1/subscription_types.go | 2 +- pkg/buses/gcppubsub/bus.go | 8 +-- pkg/buses/kafka/dispatcher/dispatcher.go | 6 ++- pkg/buses/message.go | 2 +- pkg/buses/message_dispatcher.go | 49 ++++++++++++------- pkg/buses/stub/main.go | 6 +-- 6 files changed, 47 insertions(+), 26 deletions(-) diff --git a/pkg/apis/channels/v1alpha1/subscription_types.go b/pkg/apis/channels/v1alpha1/subscription_types.go index 2e0d6edebd6..65457232940 100644 --- a/pkg/apis/channels/v1alpha1/subscription_types.go +++ b/pkg/apis/channels/v1alpha1/subscription_types.go @@ -47,7 +47,7 @@ type SubscriptionSpec struct { Subscriber string `json:"subscriber"` // Target service DNS name for replies returned by the subscriber. - ReplyTo string `json:"replyTo"` + ReplyTo string `json:"replyTo,omitempty"` // Arguments is a list of configuration arguments for the Subscription. The // Arguments for a channel must contain values for each of the Parameters diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go index aeae82a120a..57ae3e50015 100644 --- a/pkg/buses/gcppubsub/bus.go +++ b/pkg/buses/gcppubsub/bus.go @@ -167,13 +167,15 @@ func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters glog.Infof("Start receiving events for subscription %q\n", subscriptionID) err := subscription.Receive(cctx, func(ctx context.Context, pubsubMessage *pubsub.Message) { subscriber := sub.Spec.Subscriber - replyTo := sub.Spec.ReplyTo - namespace := sub.Namespace message := &buses.Message{ Headers: pubsubMessage.Attributes, Payload: pubsubMessage.Data, } - err := b.messageDispatcher.DispatchMessage(subscriber, replyTo, namespace, message) + defaults := buses.DispatchDefaults{ + Namespace: sub.Namespace, + ReplyTo: sub.Spec.ReplyTo, + } + err := b.messageDispatcher.DispatchMessage(message, subscriber, defaults) if err != nil { glog.Warningf("Unable to dispatch event %q to %q", pubsubMessage.ID, subscriber) pubsubMessage.Nack() diff --git a/pkg/buses/kafka/dispatcher/dispatcher.go b/pkg/buses/kafka/dispatcher/dispatcher.go index d488bc83a0b..2bb2df93e95 100644 --- a/pkg/buses/kafka/dispatcher/dispatcher.go +++ b/pkg/buses/kafka/dispatcher/dispatcher.go @@ -167,7 +167,11 @@ func (d *dispatcher) subscribe(subscription *channelsv1alpha1.Subscription, para glog.Infof("Dispatching a message for subscription %s/%s: %s -> %s", subscription.Namespace, subscription.Name, subscription.Spec.Channel, subscription.Spec.Subscriber) message := fromKafkaMessage(msg) - err := d.messageDispatcher.DispatchMessage(subscription.Spec.Subscriber, subscription.Spec.ReplyTo, subscription.Namespace, message) + defaults := buses.DispatchDefaults{ + Namespace: subscription.Namespace, + ReplyTo: subscription.Spec.ReplyTo, + } + err := d.messageDispatcher.DispatchMessage(message, subscription.Spec.Subscriber, defaults) if err != nil { glog.Warningf("Got error trying to dispatch message: %v", err) } diff --git a/pkg/buses/message.go b/pkg/buses/message.go index 162d7f6f726..23a45b690a7 100644 --- a/pkg/buses/message.go +++ b/pkg/buses/message.go @@ -28,7 +28,7 @@ var forwardHeaders = []string{ var forwardPrefixes = []string{ // knative - "x-knative-", + "knative-", // cloud events "ce-", // tracing diff --git a/pkg/buses/message_dispatcher.go b/pkg/buses/message_dispatcher.go index 4e17527c476..0bb66064beb 100644 --- a/pkg/buses/message_dispatcher.go +++ b/pkg/buses/message_dispatcher.go @@ -25,7 +25,7 @@ import ( "strings" ) -const correlationIdHeaderName = "X-Knative-Correlation-Id" +const correlationIDHeaderName = "Knative-Correlation-Id" // MessageDispatcher dispatches messages to a destination over HTTP. type MessageDispatcher struct { @@ -35,6 +35,12 @@ type MessageDispatcher struct { supportedSchemes map[string]bool } +// DispatchDefaults provides default parameter values used when dispatching a message. +type DispatchDefaults struct { + Namespace string + ReplyTo string +} + // NewMessageDispatcher creates a new message dispatcher that can dispatch // messages to HTTP destinations. func NewMessageDispatcher() *MessageDispatcher { @@ -54,36 +60,45 @@ func NewMessageDispatcher() *MessageDispatcher { // The destination and replyTo are DNS names. For names with a single label, // the default namespace is used to expand it into a fully qualified name // within the cluster. -func (d *MessageDispatcher) DispatchMessage(destination string, replyTo string, defaultNamespace string, message *Message) error { - res, err := d.executeRequest(destination, defaultNamespace, message) +func (d *MessageDispatcher) DispatchMessage(message *Message, destination string, defaults DispatchDefaults) error { + destinationURL := d.resolveURL(destination, defaults.Namespace) + reply, err := d.executeRequest(destinationURL, message) if err != nil { return fmt.Errorf("Unable to complete request %v", err) } - if replyTo != "" && res != nil { - headers := d.fromHTTPHeaders(res.Header) - // TODO: add configurable whitelisting of propagated headers/prefixes (configmap?) - correlationID := message.Headers[correlationIdHeaderName] - if correlationID != "" { - headers[correlationIdHeaderName] = correlationID - } - payload, err := ioutil.ReadAll(res.Body) + if defaults.ReplyTo != "" && reply != nil { + replyToURL := d.resolveURL(defaults.ReplyTo, defaults.Namespace) + _, err = d.executeRequest(replyToURL, reply) if err != nil { - return fmt.Errorf("Unable to read response %v", err) + return fmt.Errorf("Failed to forward reply %v", err) } - replyMessage := Message{headers, payload} - d.executeRequest(replyTo, defaultNamespace, &replyMessage) } return nil } -func (d *MessageDispatcher) executeRequest(destination string, defaultNamespace string, message *Message) (*http.Response, error) { - url := d.resolveURL(destination, defaultNamespace) +func (d *MessageDispatcher) executeRequest(url *url.URL, message *Message) (*Message, error) { req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(message.Payload)) if err != nil { return nil, fmt.Errorf("Unable to create request %v", err) } req.Header = d.toHTTPHeaders(message.Headers) - return d.httpClient.Do(req) + res, err := d.httpClient.Do(req) + if err != nil { + return nil, err + } + if res != nil { + headers := d.fromHTTPHeaders(res.Header) + // TODO: add configurable whitelisting of propagated headers/prefixes (configmap?) + if correlationID, ok := message.Headers[correlationIDHeaderName]; ok { + headers[correlationIDHeaderName] = correlationID + } + payload, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("Unable to read response %v", err) + } + return &Message{headers, payload}, nil + } + return nil, nil } // toHTTPHeaders converts message headers to HTTP headers. diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index 2cb6ed3cf76..5c34757ec30 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -88,9 +88,9 @@ func (b *StubBus) receiveMessage(channel *buses.ChannelReference, message *buses // dispatchMessage dispatches messages for the bus to a channel's subscriber. func (b *StubBus) dispatchMessage(subscription channelsv1alpha1.SubscriptionSpec, channel *buses.ChannelReference, message *buses.Message) { subscriber := subscription.Subscriber - replyTo := subscription.ReplyTo - glog.Infof("Sending to %q for %q and replying to %q", subscriber, channel, replyTo) - b.dispatcher.DispatchMessage(subscriber, replyTo, channel.Namespace, message) + defaults := buses.DispatchDefaults{Namespace: channel.Namespace, ReplyTo: subscription.ReplyTo} + glog.Infof("Sending to %q for %q with defaults %+v", subscriber, channel, defaults) + b.dispatcher.DispatchMessage(message, subscriber, defaults) } func main() {