diff --git a/pkg/apis/channels/v1alpha1/subscription_types.go b/pkg/apis/channels/v1alpha1/subscription_types.go index a25dd48f1f8..65457232940 100644 --- a/pkg/apis/channels/v1alpha1/subscription_types.go +++ b/pkg/apis/channels/v1alpha1/subscription_types.go @@ -46,6 +46,9 @@ type SubscriptionSpec struct { // Subscriber is the name of the subscriber service DNS name. Subscriber string `json:"subscriber"` + // Target service DNS name for replies returned by the subscriber. + ReplyTo string `json:"replyTo,omitempty"` + // Arguments is a list of configuration arguments for the Subscription. The // Arguments for a channel must contain values for each of the Parameters // specified by the Bus' spec.parameters.Subscriptions field except the diff --git a/pkg/buses/gcppubsub/bus.go b/pkg/buses/gcppubsub/bus.go index 5e9cf538f6e..57ae3e50015 100644 --- a/pkg/buses/gcppubsub/bus.go +++ b/pkg/buses/gcppubsub/bus.go @@ -167,12 +167,15 @@ func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters glog.Infof("Start receiving events for subscription %q\n", subscriptionID) err := subscription.Receive(cctx, func(ctx context.Context, pubsubMessage *pubsub.Message) { subscriber := sub.Spec.Subscriber - namespace := sub.Namespace message := &buses.Message{ Headers: pubsubMessage.Attributes, Payload: pubsubMessage.Data, } - err := b.messageDispatcher.DispatchMessage(subscriber, namespace, message) + defaults := buses.DispatchDefaults{ + Namespace: sub.Namespace, + ReplyTo: sub.Spec.ReplyTo, + } + err := b.messageDispatcher.DispatchMessage(message, subscriber, defaults) if err != nil { glog.Warningf("Unable to dispatch event %q to %q", pubsubMessage.ID, subscriber) pubsubMessage.Nack() diff --git a/pkg/buses/kafka/dispatcher/dispatcher.go b/pkg/buses/kafka/dispatcher/dispatcher.go index d8611010025..2bb2df93e95 100644 --- a/pkg/buses/kafka/dispatcher/dispatcher.go +++ b/pkg/buses/kafka/dispatcher/dispatcher.go @@ -167,7 +167,11 @@ func (d *dispatcher) subscribe(subscription *channelsv1alpha1.Subscription, para glog.Infof("Dispatching a message for subscription %s/%s: %s -> %s", subscription.Namespace, subscription.Name, subscription.Spec.Channel, subscription.Spec.Subscriber) message := fromKafkaMessage(msg) - err := d.messageDispatcher.DispatchMessage(subscription.Spec.Subscriber, subscription.Namespace, message) + defaults := buses.DispatchDefaults{ + Namespace: subscription.Namespace, + ReplyTo: subscription.Spec.ReplyTo, + } + err := d.messageDispatcher.DispatchMessage(message, subscription.Spec.Subscriber, defaults) if err != nil { glog.Warningf("Got error trying to dispatch message: %v", err) } diff --git a/pkg/buses/message.go b/pkg/buses/message.go index 5103cce6360..23a45b690a7 100644 --- a/pkg/buses/message.go +++ b/pkg/buses/message.go @@ -27,6 +27,8 @@ var forwardHeaders = []string{ } var forwardPrefixes = []string{ + // knative + "knative-", // cloud events "ce-", // tracing diff --git a/pkg/buses/message_dispatcher.go b/pkg/buses/message_dispatcher.go index 61e848886e6..0bb66064beb 100644 --- a/pkg/buses/message_dispatcher.go +++ b/pkg/buses/message_dispatcher.go @@ -19,11 +19,14 @@ package buses import ( "bytes" "fmt" + "io/ioutil" "net/http" "net/url" "strings" ) +const correlationIDHeaderName = "Knative-Correlation-Id" + // MessageDispatcher dispatches messages to a destination over HTTP. type MessageDispatcher struct { httpClient *http.Client @@ -32,6 +35,12 @@ type MessageDispatcher struct { supportedSchemes map[string]bool } +// DispatchDefaults provides default parameter values used when dispatching a message. +type DispatchDefaults struct { + Namespace string + ReplyTo string +} + // NewMessageDispatcher creates a new message dispatcher that can dispatch // messages to HTTP destinations. func NewMessageDispatcher() *MessageDispatcher { @@ -48,21 +57,48 @@ func NewMessageDispatcher() *MessageDispatcher { // DispatchMessage dispatches a message to a destination over HTTP. // -// The destination is a DNS name. For destinations with a single label, the -// default namespace is used to expand the destination into a fully qualified -// name within the cluster. -func (d *MessageDispatcher) DispatchMessage(destination string, defaultNamespace string, message *Message) error { - url := d.resolveURL(destination, defaultNamespace) +// The destination and replyTo are DNS names. For names with a single label, +// the default namespace is used to expand it into a fully qualified name +// within the cluster. +func (d *MessageDispatcher) DispatchMessage(message *Message, destination string, defaults DispatchDefaults) error { + destinationURL := d.resolveURL(destination, defaults.Namespace) + reply, err := d.executeRequest(destinationURL, message) + if err != nil { + return fmt.Errorf("Unable to complete request %v", err) + } + if defaults.ReplyTo != "" && reply != nil { + replyToURL := d.resolveURL(defaults.ReplyTo, defaults.Namespace) + _, err = d.executeRequest(replyToURL, reply) + if err != nil { + return fmt.Errorf("Failed to forward reply %v", err) + } + } + return nil +} + +func (d *MessageDispatcher) executeRequest(url *url.URL, message *Message) (*Message, error) { req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(message.Payload)) if err != nil { - return fmt.Errorf("Unable to create request %v", err) + return nil, fmt.Errorf("Unable to create request %v", err) } req.Header = d.toHTTPHeaders(message.Headers) - _, err = d.httpClient.Do(req) + res, err := d.httpClient.Do(req) if err != nil { - return fmt.Errorf("Unable to complete request %v", err) + return nil, err } - return nil + if res != nil { + headers := d.fromHTTPHeaders(res.Header) + // TODO: add configurable whitelisting of propagated headers/prefixes (configmap?) + if correlationID, ok := message.Headers[correlationIDHeaderName]; ok { + headers[correlationIDHeaderName] = correlationID + } + payload, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("Unable to read response %v", err) + } + return &Message{headers, payload}, nil + } + return nil, nil } // toHTTPHeaders converts message headers to HTTP headers. @@ -90,6 +126,32 @@ func (d *MessageDispatcher) toHTTPHeaders(headers map[string]string) http.Header return safe } +// fromHTTPHeaders converts HTTP headers into a message header map. +// +// Only headers whitelisted as safe are copied. If an HTTP header exists +// multiple times, a single value will be retained. +func (d *MessageDispatcher) fromHTTPHeaders(headers http.Header) map[string]string { + safe := map[string]string{} + + // TODO handle multi-value headers + for h, v := range headers { + // Headers are case-insensitive but test case are all lower-case + comparable := strings.ToLower(h) + if _, ok := d.forwardHeaders[comparable]; ok { + safe[h] = v[0] + continue + } + for _, p := range d.forwardPrefixes { + if strings.HasPrefix(comparable, p) { + safe[h] = v[0] + break + } + } + } + + return safe +} + func (d *MessageDispatcher) resolveURL(destination string, defaultNamespace string) *url.URL { if url, err := url.Parse(destination); err == nil && d.supportedSchemes[url.Scheme] { // already a URL with a known scheme diff --git a/pkg/buses/stub/main.go b/pkg/buses/stub/main.go index 9a05874cd03..5c34757ec30 100644 --- a/pkg/buses/stub/main.go +++ b/pkg/buses/stub/main.go @@ -88,8 +88,9 @@ func (b *StubBus) receiveMessage(channel *buses.ChannelReference, message *buses // dispatchMessage dispatches messages for the bus to a channel's subscriber. func (b *StubBus) dispatchMessage(subscription channelsv1alpha1.SubscriptionSpec, channel *buses.ChannelReference, message *buses.Message) { subscriber := subscription.Subscriber - glog.Infof("Sending to %q for %q", subscriber, channel) - b.dispatcher.DispatchMessage(subscriber, channel.Namespace, message) + defaults := buses.DispatchDefaults{Namespace: channel.Namespace, ReplyTo: subscription.ReplyTo} + glog.Infof("Sending to %q for %q with defaults %+v", subscriber, channel, defaults) + b.dispatcher.DispatchMessage(message, subscriber, defaults) } func main() {