Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions pkg/buses/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (

// MessageDispatcher dispatches messages to a destination over HTTP.
type MessageDispatcher struct {
httpClient *http.Client
forwardHeaders map[string]bool
forwardPrefixes []string
httpClient *http.Client
forwardHeaders map[string]bool
forwardPrefixes []string
supportedSchemes map[string]bool
}

// NewMessageDispatcher creates a new message dispatcher that can dispatch
Expand All @@ -38,6 +39,10 @@ func NewMessageDispatcher() *MessageDispatcher {
httpClient: &http.Client{},
forwardHeaders: headerSet(forwardHeaders),
forwardPrefixes: forwardPrefixes,
supportedSchemes: map[string]bool{
"http": true,
"https": true,
},
}
}

Expand All @@ -47,12 +52,7 @@ func NewMessageDispatcher() *MessageDispatcher {
// default namespace is used to expand the destination into a fully qualified
// name within the cluster.
func (d *MessageDispatcher) DispatchMessage(destination string, defaultNamespace string, message *Message) error {
destination = d.resolveDestination(destination, defaultNamespace)
url := url.URL{
Scheme: "http",
Host: destination,
Path: "/",
}
url := d.resolveURL(destination, defaultNamespace)
req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(message.Payload))
if err != nil {
return fmt.Errorf("Unable to create request %v", err)
Expand Down Expand Up @@ -87,9 +87,17 @@ func (d *MessageDispatcher) toHTTPHeaders(headers map[string]string) http.Header
return safe
}

func (d *MessageDispatcher) resolveDestination(destination string, defaultNamespace string) string {
func (d *MessageDispatcher) resolveURL(destination string, defaultNamespace string) *url.URL {
if url, err := url.Parse(destination); err == nil && d.supportedSchemes[url.Scheme] {
// already a URL with a known scheme
return url
}
if strings.Index(destination, ".") == -1 {
destination = fmt.Sprintf("%s.%s.svc.cluster.local", destination, defaultNamespace)
}
return destination
return &url.URL{
Scheme: "http",
Host: destination,
Path: "/",
}
}