Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/buses/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func (b *bus) dispatchMessage(subscription *channelsv1alpha1.Subscription, messa
subscriber := subscription.Spec.Subscriber
defaults := DispatchDefaults{
Namespace: subscription.Namespace,
ReplyTo: subscription.Spec.ReplyTo,
}
return b.dispatcher.DispatchMessage(message, subscriber, defaults)
return b.dispatcher.DispatchMessage(message, subscriber, subscription.Spec.ReplyTo, defaults)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we're in a bit of a wonky world right now since subscription should probably be using the one from eventing since that uses the new "approved to move forward" model. There it's Result?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 yeah I wonder if we need to make a copy of bus and have it use the new world subscriptions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we end up using the bus abstraction in the new model, then we probably should fork it. This PR is slightly modifying the DispatchMessage() interface, not really modifying anything in the bus abstraction, so I don't think it warrants copying yet.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree with @adamharwayne that we should change the existing bus code as little as possible. We want the old objects to continue working while we build the new ones.

}
70 changes: 45 additions & 25 deletions pkg/buses/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package buses

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -42,7 +43,6 @@ type MessageDispatcher struct {
// DispatchDefaults provides default parameter values used when dispatching a message.
type DispatchDefaults struct {
Namespace string
ReplyTo string
}

// NewMessageDispatcher creates a new message dispatcher that can dispatch
Expand All @@ -66,14 +66,21 @@ func NewMessageDispatcher(logger *zap.SugaredLogger) *MessageDispatcher {
// The destination and replyTo are DNS names. For names with a single label,
// the default namespace is used to expand it into a fully qualified name
// within the cluster.
func (d *MessageDispatcher) DispatchMessage(message *Message, destination string, defaults DispatchDefaults) error {
destinationURL := d.resolveURL(destination, defaults.Namespace)
reply, err := d.executeRequest(destinationURL, message)
if err != nil {
return fmt.Errorf("Unable to complete request %v", err)
func (d *MessageDispatcher) DispatchMessage(message *Message, destination, replyTo string, defaults DispatchDefaults) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked with Adam offline about the signature change that moved defaults.ReplyTo into the positional parameter replyTo. He argued that replyTo is not actually a default since it's specific to each Subscription and is never overridden.

var err error
// Default to replying with the original message. If there is a destination, then replace it
// with the response from the call to the destination instead.
reply := message
if destination != "" {
destinationURL := d.resolveURL(destination, defaults.Namespace)
reply, err = d.executeRequest(destinationURL, message)
if err != nil {
return fmt.Errorf("Unable to complete request %v", err)
}
}
if defaults.ReplyTo != "" && reply != nil {
replyToURL := d.resolveURL(defaults.ReplyTo, defaults.Namespace)

if replyTo != "" && reply != nil {
replyToURL := d.resolveURL(replyTo, defaults.Namespace)
_, err = d.executeRequest(replyToURL, reply)
if err != nil {
return fmt.Errorf("Failed to forward reply %v", err)
Expand All @@ -86,30 +93,43 @@ func (d *MessageDispatcher) executeRequest(url *url.URL, message *Message) (*Mes
d.logger.Infof("Dispatching message to %s", url.String())
req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(message.Payload))
if err != nil {
return nil, fmt.Errorf("Unable to create request %v", err)
return nil, fmt.Errorf("unable to create request %v", err)
}
req.Header = d.toHTTPHeaders(message.Headers)
res, err := d.httpClient.Do(req)
if err != nil {
return nil, err
}
if res != nil {
if res.StatusCode < 200 || res.StatusCode >= 300 {
// reject non-successful (2xx) responses
return nil, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", res.StatusCode)
}
headers := d.fromHTTPHeaders(res.Header)
// TODO: add configurable whitelisting of propagated headers/prefixes (configmap?)
if correlationID, ok := message.Headers[correlationIDHeaderName]; ok {
headers[correlationIDHeaderName] = correlationID
}
payload, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("Unable to read response %v", err)
}
return &Message{headers, payload}, nil
if res == nil {
// I don't think this is actually reachable with http.Client.Do(), but just to be sure we
// check anyway.
return nil, errors.New("non-error nil result from http.Client.Do()")
}
return nil, nil
defer res.Body.Close()
if isFailure(res.StatusCode) {
// reject non-successful responses
return nil, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", res.StatusCode)
}
headers := d.fromHTTPHeaders(res.Header)
// TODO: add configurable whitelisting of propagated headers/prefixes (configmap?)
if correlationID, ok := message.Headers[correlationIDHeaderName]; ok {
headers[correlationIDHeaderName] = correlationID
}
payload, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("Unable to read response %v", err)
}
if len(payload) == 0 {
// The response body is empty, the event has 'finished'.
return nil, nil
}
return &Message{headers, payload}, nil
}

// isFailure returns true if the status code is not a successful HTTP status.
func isFailure(statusCode int) bool {
return statusCode < http.StatusOK /* 200 */ ||
statusCode >= http.StatusMultipleChoices /* 300 */
}

// toHTTPHeaders converts message headers to HTTP headers.
Expand Down
Loading