Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/apis/channels/v1alpha1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type SubscriptionSpec struct {
// Subscriber is the name of the subscriber service DNS name.
Subscriber string `json:"subscriber"`

// Target service DNS name for replies returned by the subscriber.
ReplyTo string `json:"replyTo,omitempty"`

// Arguments is a list of configuration arguments for the Subscription. The
// Arguments for a channel must contain values for each of the Parameters
// specified by the Bus' spec.parameters.Subscriptions field except the
Expand Down
7 changes: 5 additions & 2 deletions pkg/buses/gcppubsub/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,15 @@ func (b *PubSubBus) ReceiveEvents(sub *channelsv1alpha1.Subscription, parameters
glog.Infof("Start receiving events for subscription %q\n", subscriptionID)
err := subscription.Receive(cctx, func(ctx context.Context, pubsubMessage *pubsub.Message) {
subscriber := sub.Spec.Subscriber
namespace := sub.Namespace
message := &buses.Message{
Headers: pubsubMessage.Attributes,
Payload: pubsubMessage.Data,
}
err := b.messageDispatcher.DispatchMessage(subscriber, namespace, message)
defaults := buses.DispatchDefaults{
Namespace: sub.Namespace,
ReplyTo: sub.Spec.ReplyTo,
}
err := b.messageDispatcher.DispatchMessage(message, subscriber, defaults)
if err != nil {
glog.Warningf("Unable to dispatch event %q to %q", pubsubMessage.ID, subscriber)
pubsubMessage.Nack()
Expand Down
6 changes: 5 additions & 1 deletion pkg/buses/kafka/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ func (d *dispatcher) subscribe(subscription *channelsv1alpha1.Subscription, para
glog.Infof("Dispatching a message for subscription %s/%s: %s -> %s", subscription.Namespace,
subscription.Name, subscription.Spec.Channel, subscription.Spec.Subscriber)
message := fromKafkaMessage(msg)
err := d.messageDispatcher.DispatchMessage(subscription.Spec.Subscriber, subscription.Namespace, message)
defaults := buses.DispatchDefaults{
Namespace: subscription.Namespace,
ReplyTo: subscription.Spec.ReplyTo,
}
err := d.messageDispatcher.DispatchMessage(message, subscription.Spec.Subscriber, defaults)
if err != nil {
glog.Warningf("Got error trying to dispatch message: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/buses/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var forwardHeaders = []string{
}

var forwardPrefixes = []string{
// knative
"knative-",
// cloud events
"ce-",
// tracing
Expand Down
80 changes: 71 additions & 9 deletions pkg/buses/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package buses
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
)

const correlationIDHeaderName = "Knative-Correlation-Id"

// MessageDispatcher dispatches messages to a destination over HTTP.
type MessageDispatcher struct {
httpClient *http.Client
Expand All @@ -32,6 +35,12 @@ type MessageDispatcher struct {
supportedSchemes map[string]bool
}

// DispatchDefaults provides default parameter values used when dispatching a message.
type DispatchDefaults struct {
Namespace string
ReplyTo string
}

// NewMessageDispatcher creates a new message dispatcher that can dispatch
// messages to HTTP destinations.
func NewMessageDispatcher() *MessageDispatcher {
Expand All @@ -48,21 +57,48 @@ func NewMessageDispatcher() *MessageDispatcher {

// DispatchMessage dispatches a message to a destination over HTTP.
//
// The destination is a DNS name. For destinations with a single label, the
// default namespace is used to expand the destination into a fully qualified
// name within the cluster.
func (d *MessageDispatcher) DispatchMessage(destination string, defaultNamespace string, message *Message) error {
url := d.resolveURL(destination, defaultNamespace)
// The destination and replyTo are DNS names. For names with a single label,
// the default namespace is used to expand it into a fully qualified name
// within the cluster.
func (d *MessageDispatcher) DispatchMessage(message *Message, destination string, defaults DispatchDefaults) error {
destinationURL := d.resolveURL(destination, defaults.Namespace)
reply, err := d.executeRequest(destinationURL, message)
if err != nil {
return fmt.Errorf("Unable to complete request %v", err)
}
if defaults.ReplyTo != "" && reply != nil {
replyToURL := d.resolveURL(defaults.ReplyTo, defaults.Namespace)
_, err = d.executeRequest(replyToURL, reply)
if err != nil {
return fmt.Errorf("Failed to forward reply %v", err)
}
}
return nil
}

func (d *MessageDispatcher) executeRequest(url *url.URL, message *Message) (*Message, error) {
req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(message.Payload))
if err != nil {
return fmt.Errorf("Unable to create request %v", err)
return nil, fmt.Errorf("Unable to create request %v", err)
}
req.Header = d.toHTTPHeaders(message.Headers)
_, err = d.httpClient.Do(req)
res, err := d.httpClient.Do(req)
if err != nil {
return fmt.Errorf("Unable to complete request %v", err)
return nil, err
}
return nil
if res != nil {
headers := d.fromHTTPHeaders(res.Header)
// TODO: add configurable whitelisting of propagated headers/prefixes (configmap?)
if correlationID, ok := message.Headers[correlationIDHeaderName]; ok {
headers[correlationIDHeaderName] = correlationID
}
payload, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("Unable to read response %v", err)
}
return &Message{headers, payload}, nil
}
return nil, nil
}

// toHTTPHeaders converts message headers to HTTP headers.
Expand Down Expand Up @@ -90,6 +126,32 @@ func (d *MessageDispatcher) toHTTPHeaders(headers map[string]string) http.Header
return safe
}

// fromHTTPHeaders converts HTTP headers into a message header map.
//
// Only headers whitelisted as safe are copied. If an HTTP header exists
// multiple times, a single value will be retained.
func (d *MessageDispatcher) fromHTTPHeaders(headers http.Header) map[string]string {
safe := map[string]string{}

// TODO handle multi-value headers
for h, v := range headers {
// Headers are case-insensitive but test case are all lower-case
comparable := strings.ToLower(h)
if _, ok := d.forwardHeaders[comparable]; ok {
safe[h] = v[0]
continue
}
for _, p := range d.forwardPrefixes {
if strings.HasPrefix(comparable, p) {
safe[h] = v[0]
break
}
}
}

return safe
}

func (d *MessageDispatcher) resolveURL(destination string, defaultNamespace string) *url.URL {
if url, err := url.Parse(destination); err == nil && d.supportedSchemes[url.Scheme] {
// already a URL with a known scheme
Expand Down
5 changes: 3 additions & 2 deletions pkg/buses/stub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ func (b *StubBus) receiveMessage(channel *buses.ChannelReference, message *buses
// dispatchMessage dispatches messages for the bus to a channel's subscriber.
func (b *StubBus) dispatchMessage(subscription channelsv1alpha1.SubscriptionSpec, channel *buses.ChannelReference, message *buses.Message) {
subscriber := subscription.Subscriber
glog.Infof("Sending to %q for %q", subscriber, channel)
b.dispatcher.DispatchMessage(subscriber, channel.Namespace, message)
defaults := buses.DispatchDefaults{Namespace: channel.Namespace, ReplyTo: subscription.ReplyTo}
glog.Infof("Sending to %q for %q with defaults %+v", subscriber, channel, defaults)
b.dispatcher.DispatchMessage(message, subscriber, defaults)
}

func main() {
Expand Down