diff --git a/pkg/buses/logger.go b/pkg/buses/logger.go deleted file mode 100644 index ed439cc34ae..00000000000 --- a/pkg/buses/logger.go +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2018 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package buses - -import ( - "github.com/knative/pkg/logging" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -const ( - busLoggingComponent = "bus" - dispatcherLoggingComponent = "dispatcher" - receiverLoggingComponent = "receiver" -) - -// NewLoggingConfig creates a static logging configuration appropriate for a -// bus. All logging levels are set to Info. -func NewLoggingConfig() *logging.Config { - lc := &logging.Config{} - lc.LoggingConfig = `{ - "level": "info", - "development": false, - "outputPaths": ["stdout"], - "errorOutputPaths": ["stderr"], - "encoding": "json", - "encoderConfig": { - "timeKey": "ts", - "levelKey": "level", - "nameKey": "logger", - "callerKey": "caller", - "messageKey": "msg", - "stacktraceKey": "stacktrace", - "lineEnding": "", - "levelEncoder": "", - "timeEncoder": "iso8601", - "durationEncoder": "", - "callerEncoder": "" - } - }` - lc.LoggingLevel = make(map[string]zapcore.Level) - lc.LoggingLevel[busLoggingComponent] = zapcore.InfoLevel - return lc -} - -// NewBusLoggerFromConfig creates a new zap logger for the bus component based -// on the provided configuration -func NewBusLoggerFromConfig(config *logging.Config) *zap.SugaredLogger { - logger, _ := logging.NewLoggerFromConfig(config, busLoggingComponent) - return logger -} diff --git a/pkg/buses/logger_test.go b/pkg/buses/logger_test.go deleted file mode 100644 index 5c7960229be..00000000000 --- a/pkg/buses/logger_test.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package buses_test - -import ( - "testing" - - "go.uber.org/zap" - - "github.com/knative/eventing/pkg/buses" -) - -func TestNewLoggingConfig(t *testing.T) { - config := buses.NewLoggingConfig() - expected := zap.InfoLevel - actual := config.LoggingLevel["bus"] - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "Logging level", expected, actual) - } -} - -func TestNewBusLoggerFromConfig(t *testing.T) { - config := buses.NewLoggingConfig() - logger := buses.NewBusLoggerFromConfig(config) - expected := true - actual := logger.Desugar().Core().Enabled(zap.InfoLevel) - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "Logging level", expected, actual) - } -} diff --git a/pkg/buses/message.go b/pkg/buses/message.go deleted file mode 100644 index 23a45b690a7..00000000000 --- a/pkg/buses/message.go +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2018 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package buses - -import ( - "errors" -) - -var forwardHeaders = []string{ - "content-type", - // tracing - "x-request-id", -} - -var forwardPrefixes = []string{ - // knative - "knative-", - // cloud events - "ce-", - // tracing - "x-b3-", - "x-ot-", -} - -// Message represents an chunk of data within a bus. The message contains both -// a map of string headers and a binary payload. -// -// A message may represent a CloudEvent. -type Message struct { - - // Headers provide metadata about the message payload. All header keys - // should be lowercase. - Headers map[string]string - - // Payload is the raw binary content of the message. The payload format is - // often described by the 'content-type' header. - Payload []byte -} - -// ErrUnknownChannel is returned when a message is received by a bus for a -// channel that does not exist. -var ErrUnknownChannel = errors.New("unknown channel") - -func headerSet(headers []string) map[string]bool { - set := make(map[string]bool) - for _, header := range headers { - set[header] = true - } - return set -} diff --git a/pkg/buses/message_dispatcher.go b/pkg/buses/message_dispatcher.go deleted file mode 100644 index 39346b7eb2d..00000000000 --- a/pkg/buses/message_dispatcher.go +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright 2018 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package buses - -import ( - "bytes" - "errors" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "strings" - - "go.uber.org/zap" -) - -const correlationIDHeaderName = "Knative-Correlation-Id" - -// MessageDispatcher dispatches messages to a destination over HTTP. -type MessageDispatcher struct { - httpClient *http.Client - forwardHeaders map[string]bool - forwardPrefixes []string - supportedSchemes map[string]bool - - logger *zap.SugaredLogger -} - -// DispatchDefaults provides default parameter values used when dispatching a message. -type DispatchDefaults struct { - Namespace string -} - -// NewMessageDispatcher creates a new message dispatcher that can dispatch -// messages to HTTP destinations. -func NewMessageDispatcher(logger *zap.SugaredLogger) *MessageDispatcher { - return &MessageDispatcher{ - httpClient: &http.Client{}, - forwardHeaders: headerSet(forwardHeaders), - forwardPrefixes: forwardPrefixes, - supportedSchemes: map[string]bool{ - "http": true, - "https": true, - }, - - logger: logger, - } -} - -// DispatchMessage dispatches a message to a destination over HTTP. -// -// The destination and reply are DNS names. For names with a single label, -// the default namespace is used to expand it into a fully qualified name -// within the cluster. -func (d *MessageDispatcher) DispatchMessage(message *Message, destination, reply string, defaults DispatchDefaults) error { - var err error - // Default to replying with the original message. If there is a destination, then replace it - // with the response from the call to the destination instead. - response := message - if destination != "" { - destinationURL := d.resolveURL(destination, defaults.Namespace) - response, err = d.executeRequest(destinationURL, message) - if err != nil { - return fmt.Errorf("Unable to complete request %v", err) - } - } - - if reply != "" && response != nil { - replyURL := d.resolveURL(reply, defaults.Namespace) - _, err = d.executeRequest(replyURL, response) - if err != nil { - return fmt.Errorf("Failed to forward reply %v", err) - } - } - return nil -} - -func (d *MessageDispatcher) executeRequest(url *url.URL, message *Message) (*Message, error) { - d.logger.Infof("Dispatching message to %s", url.String()) - req, err := http.NewRequest(http.MethodPost, url.String(), bytes.NewReader(message.Payload)) - if err != nil { - return nil, fmt.Errorf("unable to create request %v", err) - } - req.Header = d.toHTTPHeaders(message.Headers) - res, err := d.httpClient.Do(req) - if err != nil { - return nil, err - } - if res == nil { - // I don't think this is actually reachable with http.Client.Do(), but just to be sure we - // check anyway. - return nil, errors.New("non-error nil result from http.Client.Do()") - } - defer res.Body.Close() - if isFailure(res.StatusCode) { - // reject non-successful responses - return nil, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", res.StatusCode) - } - headers := d.fromHTTPHeaders(res.Header) - // TODO: add configurable whitelisting of propagated headers/prefixes (configmap?) - if correlationID, ok := message.Headers[correlationIDHeaderName]; ok { - headers[correlationIDHeaderName] = correlationID - } - payload, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, fmt.Errorf("Unable to read response %v", err) - } - if len(payload) == 0 { - // The response body is empty, the event has 'finished'. - return nil, nil - } - return &Message{headers, payload}, nil -} - -// isFailure returns true if the status code is not a successful HTTP status. -func isFailure(statusCode int) bool { - return statusCode < http.StatusOK /* 200 */ || - statusCode >= http.StatusMultipleChoices /* 300 */ -} - -// toHTTPHeaders converts message headers to HTTP headers. -// -// Only headers whitelisted as safe are copied. -func (d *MessageDispatcher) toHTTPHeaders(headers map[string]string) http.Header { - safe := http.Header{} - - for name, value := range headers { - // Header names are case insensitive. Be sure to compare against a lower-cased version - // (all our oracles are lower-case as well). - name = strings.ToLower(name) - if _, ok := d.forwardHeaders[name]; ok { - safe.Add(name, value) - continue - } - for _, prefix := range d.forwardPrefixes { - if strings.HasPrefix(name, prefix) { - safe.Add(name, value) - break - } - } - } - - return safe -} - -// fromHTTPHeaders converts HTTP headers into a message header map. -// -// Only headers whitelisted as safe are copied. If an HTTP header exists -// multiple times, a single value will be retained. -func (d *MessageDispatcher) fromHTTPHeaders(headers http.Header) map[string]string { - safe := map[string]string{} - - // TODO handle multi-value headers - for h, v := range headers { - // Headers are case-insensitive but test case are all lower-case - comparable := strings.ToLower(h) - if _, ok := d.forwardHeaders[comparable]; ok { - safe[h] = v[0] - continue - } - for _, p := range d.forwardPrefixes { - if strings.HasPrefix(comparable, p) { - safe[h] = v[0] - break - } - } - } - - return safe -} - -func (d *MessageDispatcher) resolveURL(destination string, defaultNamespace string) *url.URL { - if url, err := url.Parse(destination); err == nil && d.supportedSchemes[url.Scheme] { - // already a URL with a known scheme - return url - } - if strings.Index(destination, ".") == -1 { - destination = fmt.Sprintf("%s.%s.svc.cluster.local", destination, defaultNamespace) - } - return &url.URL{ - Scheme: "http", - Host: destination, - Path: "/", - } -} diff --git a/pkg/buses/message_dispatcher_test.go b/pkg/buses/message_dispatcher_test.go deleted file mode 100644 index ab173169080..00000000000 --- a/pkg/buses/message_dispatcher_test.go +++ /dev/null @@ -1,390 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package buses - -import ( - "bytes" - "io/ioutil" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "testing" - - "github.com/google/go-cmp/cmp" - "go.uber.org/zap" -) - -var ( - // Headers that are added to the response, but we don't want to check in our assertions. - unimportantHeaders = map[string]struct{}{ - "accept-encoding": {}, - "content-length": {}, - "content-type": {}, - "user-agent": {}, - } -) - -func TestDispatchMessage(t *testing.T) { - testCases := map[string]struct { - sendToDestination bool - sendToReply bool - message *Message - fakeResponse *http.Response - expectedErr bool - expectedDestRequest *requestValidation - expectedReplyRequest *requestValidation - }{ - "destination - only": { - sendToDestination: true, - message: &Message{ - Headers: map[string]string{ - // do-not-forward should not get forwarded. - "do-not-forward": "header", - "x-request-id": "id123", - "knative-1": "knative-1-value", - "knative-2": "knative-2-value", - "ce-abc": "ce-abc-value", - }, - Payload: []byte("destination"), - }, - expectedDestRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "ce-abc": {"ce-abc-value"}, - }, - Body: "destination", - }, - }, - "destination - only -- error": { - sendToDestination: true, - message: &Message{ - Headers: map[string]string{ - // do-not-forward should not get forwarded. - "do-not-forward": "header", - "x-request-id": "id123", - "knative-1": "knative-1-value", - "knative-2": "knative-2-value", - "ce-abc": "ce-abc-value", - }, - Payload: []byte("destination"), - }, - expectedDestRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "ce-abc": {"ce-abc-value"}, - }, - Body: "destination", - }, - fakeResponse: &http.Response{ - StatusCode: http.StatusNotFound, - Body: ioutil.NopCloser(bytes.NewBufferString("destination-response")), - }, - expectedErr: true, - }, - "reply - only": { - sendToReply: true, - message: &Message{ - Headers: map[string]string{ - // do-not-forward should not get forwarded. - "do-not-forward": "header", - "x-request-id": "id123", - "knative-1": "knative-1-value", - "knative-2": "knative-2-value", - "ce-abc": "ce-abc-value", - }, - Payload: []byte("reply"), - }, - expectedReplyRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "ce-abc": {"ce-abc-value"}, - }, - Body: "reply", - }, - }, - "reply - only -- error": { - sendToReply: true, - message: &Message{ - Headers: map[string]string{ - // do-not-forward should not get forwarded. - "do-not-forward": "header", - "x-request-id": "id123", - "knative-1": "knative-1-value", - "knative-2": "knative-2-value", - "ce-abc": "ce-abc-value", - }, - Payload: []byte("reply"), - }, - expectedReplyRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "ce-abc": {"ce-abc-value"}, - }, - Body: "reply", - }, - fakeResponse: &http.Response{ - StatusCode: http.StatusNotFound, - Body: ioutil.NopCloser(bytes.NewBufferString("destination-response")), - }, - expectedErr: true, - }, - "destination and reply - dest returns bad status code": { - sendToDestination: true, - sendToReply: true, - message: &Message{ - Headers: map[string]string{ - // do-not-forward should not get forwarded. - "do-not-forward": "header", - "x-request-id": "id123", - "knative-1": "knative-1-value", - "knative-2": "knative-2-value", - "ce-abc": "ce-abc-value", - }, - Payload: []byte("destination"), - }, - expectedDestRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "ce-abc": {"ce-abc-value"}, - }, - Body: "destination", - }, - fakeResponse: &http.Response{ - StatusCode: http.StatusInternalServerError, - Body: ioutil.NopCloser(bytes.NewBufferString("destination-response")), - }, - expectedErr: true, - }, - "destination and reply - dest returns empty body": { - sendToDestination: true, - sendToReply: true, - message: &Message{ - Headers: map[string]string{ - // do-not-forward should not get forwarded. - "do-not-forward": "header", - "x-request-id": "id123", - "knative-1": "knative-1-value", - "knative-2": "knative-2-value", - "ce-abc": "ce-abc-value", - }, - Payload: []byte("destination"), - }, - expectedDestRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "ce-abc": {"ce-abc-value"}, - }, - Body: "destination", - }, - fakeResponse: &http.Response{ - StatusCode: http.StatusAccepted, - Header: map[string][]string{ - "do-not-passthrough": {"no"}, - "x-request-id": {"altered-id"}, - "knative-1": {"new-knative-1-value"}, - "ce-abc": {"new-ce-abc-value"}, - }, - Body: ioutil.NopCloser(bytes.NewBufferString("")), - }, - }, - "destination and reply": { - sendToDestination: true, - sendToReply: true, - message: &Message{ - Headers: map[string]string{ - // do-not-forward should not get forwarded. - "do-not-forward": "header", - "x-request-id": "id123", - "knative-1": "knative-1-value", - "knative-2": "knative-2-value", - "ce-abc": "ce-abc-value", - }, - Payload: []byte("destination"), - }, - expectedDestRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "ce-abc": {"ce-abc-value"}, - }, - Body: "destination", - }, - fakeResponse: &http.Response{ - StatusCode: http.StatusAccepted, - Header: map[string][]string{ - "do-not-passthrough": {"no"}, - "x-request-id": {"altered-id"}, - "knative-1": {"new-knative-1-value"}, - "ce-abc": {"new-ce-abc-value"}, - }, - Body: ioutil.NopCloser(bytes.NewBufferString("destination-response")), - }, - expectedReplyRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"altered-id"}, - "knative-1": {"new-knative-1-value"}, - "ce-abc": {"new-ce-abc-value"}, - }, - Body: "destination-response", - }, - }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - destHandler := &fakeHandler{ - t: t, - response: tc.fakeResponse, - requests: make([]requestValidation, 0), - } - destServer := httptest.NewServer(destHandler) - defer destServer.Close() - replyHandler := &fakeHandler{ - t: t, - response: tc.fakeResponse, - requests: make([]requestValidation, 0), - } - replyServer := httptest.NewServer(replyHandler) - defer replyServer.Close() - - md := NewMessageDispatcher(zap.NewNop().Sugar()) - err := md.DispatchMessage(tc.message, - getDomain(t, tc.sendToDestination, destServer.URL), - getDomain(t, tc.sendToReply, replyServer.URL), - DispatchDefaults{}) - if tc.expectedErr != (err != nil) { - t.Errorf("Unexpected error from DispatchRequest. Expected %v. Actual: %v", tc.expectedErr, err) - } - if tc.expectedDestRequest != nil { - rv := destHandler.popRequest(t) - assertEquality(t, destServer.URL, *tc.expectedDestRequest, rv) - } - if tc.expectedReplyRequest != nil { - rv := replyHandler.popRequest(t) - assertEquality(t, replyServer.URL, *tc.expectedReplyRequest, rv) - } - if len(destHandler.requests) != 0 { - t.Errorf("Unexpected destination requests: %+v", destHandler.requests) - } - if len(replyHandler.requests) != 0 { - t.Errorf("Unexpected reply requests: %+v", replyHandler.requests) - } - }) - } -} - -func getDomain(t *testing.T, shouldSend bool, serverURL string) string { - if shouldSend { - server, err := url.Parse(serverURL) - if err != nil { - t.Errorf("Bad serverURL: %q", serverURL) - } - return server.Host - } - return "" -} - -type requestValidation struct { - Host string - Headers http.Header - Body string -} - -type fakeHandler struct { - t *testing.T - response *http.Response - requests []requestValidation -} - -func (f *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - - // Make a copy of the request. - body, err := ioutil.ReadAll(r.Body) - if err != nil { - f.t.Error("Failed to read the request body") - } - f.requests = append(f.requests, requestValidation{ - Host: r.Host, - Headers: r.Header, - Body: string(body), - }) - - // Write the response. - if f.response != nil { - for h, vs := range f.response.Header { - for _, v := range vs { - w.Header().Add(h, v) - } - } - w.WriteHeader(f.response.StatusCode) - var buf bytes.Buffer - buf.ReadFrom(f.response.Body) - w.Write(buf.Bytes()) - } else { - w.WriteHeader(http.StatusOK) - w.Write([]byte("")) - } -} - -func (f *fakeHandler) popRequest(t *testing.T) requestValidation { - if len(f.requests) == 0 { - t.Error("Unable to pop request") - } - rv := f.requests[0] - f.requests = f.requests[1:] - return rv -} - -func assertEquality(t *testing.T, replacementURL string, expected, actual requestValidation) { - server, err := url.Parse(replacementURL) - if err != nil { - t.Errorf("Bad replacement URL: %q", replacementURL) - } - expected.Host = server.Host - canonicalizeHeaders(expected, actual) - if diff := cmp.Diff(expected, actual); diff != "" { - t.Errorf("Unexpected difference (-want, +got): %v", diff) - } -} - -func canonicalizeHeaders(rvs ...requestValidation) { - // HTTP header names are case-insensitive, so normalize them to lower case for comparison. - for _, rv := range rvs { - headers := rv.Headers - for n, v := range headers { - delete(headers, n) - ln := strings.ToLower(n) - if _, present := unimportantHeaders[ln]; !present { - headers[ln] = v - } - } - } -} diff --git a/pkg/buses/message_receiver.go b/pkg/buses/message_receiver.go deleted file mode 100644 index a4aa93f4435..00000000000 --- a/pkg/buses/message_receiver.go +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright 2018 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package buses - -import ( - "fmt" - "io/ioutil" - "net/http" - "strings" - - "go.uber.org/zap" -) - -// MessageReceiver starts a server to receive new messages for the bus. The new -// message is emitted via the receiver function. -const ( - MessageReceiverPort = 8080 -) - -type MessageReceiver struct { - receiverFunc func(ChannelReference, *Message) error - forwardHeaders map[string]bool - forwardPrefixes []string - - logger *zap.SugaredLogger -} - -// NewMessageReceiver creates a message receiver passing new messages to the -// receiverFunc. -func NewMessageReceiver(receiverFunc func(ChannelReference, *Message) error, logger *zap.SugaredLogger) *MessageReceiver { - receiver := &MessageReceiver{ - receiverFunc: receiverFunc, - forwardHeaders: headerSet(forwardHeaders), - forwardPrefixes: forwardPrefixes, - - logger: logger, - } - return receiver -} - -// Run starts receiving messages for the receiver. -// -// Only HTTP POST requests to the root path (/) are accepted. If other paths or -// methods are needed, use the HandleRequest method directly with another HTTP -// server. -// -// This method will block until a message is received on the stop channel. -func (r *MessageReceiver) Run(stopCh <-chan struct{}) { - svr := r.start() - defer r.stop(svr) - - <-stopCh -} - -func (r *MessageReceiver) start() *http.Server { - r.logger.Info("Starting web server") - srv := &http.Server{ - Addr: fmt.Sprintf(":%d", MessageReceiverPort), - Handler: http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - if req.URL.Path != "/" { - res.WriteHeader(http.StatusNotFound) - return - } - if req.Method != http.MethodPost { - res.WriteHeader(http.StatusMethodNotAllowed) - return - } - - r.HandleRequest(res, req) - }), - } - go func() { - if err := srv.ListenAndServe(); err != http.ErrServerClosed { - r.logger.Errorf("HttpServer: ListenAndServe() error: %v", err) - } - }() - return srv -} - -func (r *MessageReceiver) stop(srv *http.Server) { - r.logger.Info("Shutdown web server") - if err := srv.Shutdown(nil); err != nil { - r.logger.Fatal(err) - } -} - -// HandleRequest is an http Handler function. The request is converted to a -// Message and emitted to the receiver func. -// -// The response status codes: -// 202 - the message was sent to subscibers -// 404 - the request was for an unknown channel -// 500 - an error occurred processing the request -func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Request) { - host := req.Host - r.logger.Infof("Received request for %s", host) - channel := ParseChannel(host) - - message, err := r.fromRequest(req) - if err != nil { - res.WriteHeader(http.StatusInternalServerError) - return - } - - err = r.receiverFunc(channel, message) - if err != nil { - if err == ErrUnknownChannel { - res.WriteHeader(http.StatusNotFound) - } else { - res.WriteHeader(http.StatusInternalServerError) - } - return - } - - res.WriteHeader(http.StatusAccepted) -} - -func (r *MessageReceiver) fromRequest(req *http.Request) (*Message, error) { - body, err := ioutil.ReadAll(req.Body) - if err != nil { - return nil, err - } - headers := r.fromHTTPHeaders(req.Header) - message := &Message{ - Headers: headers, - Payload: body, - } - return message, nil -} - -// fromHTTPHeaders converts HTTP headers into a message header map. -// -// Only headers whitelisted as safe are copied. If an HTTP header exists -// multiple times, a single value will be retained. -func (r *MessageReceiver) fromHTTPHeaders(headers http.Header) map[string]string { - safe := map[string]string{} - - // TODO handle multi-value headers - for h, v := range headers { - // Headers are case-insensitive but test case are all lower-case - comparable := strings.ToLower(h) - if _, ok := r.forwardHeaders[comparable]; ok { - safe[h] = v[0] - continue - } - for _, p := range r.forwardPrefixes { - if strings.HasPrefix(comparable, p) { - safe[h] = v[0] - break - } - } - } - - return safe -} - -// ParseChannel converts the channel's hostname into a channel -// reference. -func ParseChannel(host string) ChannelReference { - chunks := strings.Split(host, ".") - return ChannelReference{ - Name: chunks[0], - Namespace: chunks[1], - } -} diff --git a/pkg/buses/references.go b/pkg/buses/references.go deleted file mode 100644 index 7455022fee5..00000000000 --- a/pkg/buses/references.go +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2018 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package buses - -import ( - "fmt" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" -) - -// ChannelReference references a Channel within the cluster by name and -// namespace. -type ChannelReference struct { - Namespace string - Name string -} - -// NewChannelReference creates a ChannelReference from a Channel -func NewChannelReference(channel *eventingv1alpha1.Channel) ChannelReference { - return NewChannelReferenceFromNames(channel.Name, channel.Namespace) -} - -// NewChannelReferenceFromSubscription creates a ChannelReference from a -// Subscription for a Channel. -func NewChannelReferenceFromSubscription(subscription *eventingv1alpha1.Subscription) ChannelReference { - return NewChannelReferenceFromNames(subscription.Spec.Channel.Name, subscription.Namespace) -} - -// NewChannelReferenceFromNames creates a ChannelReference for a name and -// namespace. -func NewChannelReferenceFromNames(name, namespace string) ChannelReference { - return ChannelReference{ - Namespace: namespace, - Name: name, - } -} - -func (r *ChannelReference) String() string { - return fmt.Sprintf("%s/%s", r.Namespace, r.Name) -} diff --git a/pkg/buses/references_test.go b/pkg/buses/references_test.go deleted file mode 100644 index ef75a5070fc..00000000000 --- a/pkg/buses/references_test.go +++ /dev/null @@ -1,95 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package buses_test - -import ( - "fmt" - "testing" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/buses" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -const ( - referencesTestNamespace = "test-namespace" - referencesTestChannelName = "test-channel" - referencesTestSubscriptionName = "test-subscription" -) - -func TestNewChannelReference(t *testing.T) { - channel := &eventingv1alpha1.Channel{ - ObjectMeta: metav1.ObjectMeta{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - }, - } - expected := buses.ChannelReference{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - } - actual := buses.NewChannelReference(channel) - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) - } -} - -func TestNewChannelReferenceFromSubscription(t *testing.T) { - subscription := &eventingv1alpha1.Subscription{ - ObjectMeta: metav1.ObjectMeta{ - Name: referencesTestSubscriptionName, - Namespace: referencesTestNamespace, - }, - Spec: eventingv1alpha1.SubscriptionSpec{ - Channel: corev1.ObjectReference{ - Name: referencesTestChannelName, - }, - }, - } - expected := buses.ChannelReference{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - } - actual := buses.NewChannelReferenceFromSubscription(subscription) - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) - } -} - -func TestNewChannelReferenceFromNames(t *testing.T) { - expected := buses.ChannelReference{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - } - actual := buses.NewChannelReferenceFromNames(referencesTestChannelName, referencesTestNamespace) - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) - } -} - -func TestChannelReference_String(t *testing.T) { - ref := buses.ChannelReference{ - Name: referencesTestChannelName, - Namespace: referencesTestNamespace, - } - expected := fmt.Sprintf("%s/%s", referencesTestNamespace, referencesTestChannelName) - actual := ref.String() - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "ChannelReference", expected, actual) - } -}