diff --git a/pkg/apis/duck/v1alpha1/routable_types.go b/pkg/apis/duck/v1alpha1/routable_types.go new file mode 100644 index 00000000000..d48e194e530 --- /dev/null +++ b/pkg/apis/duck/v1alpha1/routable_types.go @@ -0,0 +1,42 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// Routable is the schema for the routable portion of the spec +// section of the resource. +type Routable struct { + // This is the router specification. + Router RoutableSpec `json:"router,omitempty"` +} + +// RoutableSpec defines a router +type RoutableSpec struct { + // Ref is a reference to the router service + Ref *corev1.ObjectReference `json:"ref,omitempty"` + + // +optional + RouterURI string `json:"routerURI,omitempty"` + + // Arguments defines the arguments to pass to the Router + // +optional + Arguments *runtime.RawExtension `json:"arguments,omitempty"` +} diff --git a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go index 120fb29dbe5..056eeb6f56a 100644 --- a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go @@ -121,6 +121,49 @@ func (in *ChannelableStatus) DeepCopy() *ChannelableStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Routable) DeepCopyInto(out *Routable) { + *out = *in + in.Router.DeepCopyInto(&out.Router) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Routable. +func (in *Routable) DeepCopy() *Routable { + if in == nil { + return nil + } + out := new(Routable) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RoutableSpec) DeepCopyInto(out *RoutableSpec) { + *out = *in + if in.Ref != nil { + in, out := &in.Ref, &out.Ref + *out = new(v1.ObjectReference) + **out = **in + } + if in.Arguments != nil { + in, out := &in.Arguments, &out.Arguments + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RoutableSpec. +func (in *RoutableSpec) DeepCopy() *RoutableSpec { + if in == nil { + return nil + } + out := new(RoutableSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Subscribable) DeepCopyInto(out *Subscribable) { *out = *in diff --git a/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go b/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go index b2c65d685f3..abb566c5ff5 100644 --- a/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go +++ b/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go @@ -57,6 +57,9 @@ var _ webhook.GenericCRD = (*InMemoryChannel)(nil) type InMemoryChannelSpec struct { // Channel conforms to Duck type Subscribable. Subscribable *eventingduck.Subscribable `json:"subscribable,omitempty"` + + // Channel conforms to Duck type Routable. + Routable *eventingduck.Routable `json:"routable,omitempty"` } // ChannelStatus represents the current state of a Channel. diff --git a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go index 2c9fc5b68a0..1d0dd59742d 100644 --- a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go @@ -149,6 +149,11 @@ func (in *InMemoryChannelSpec) DeepCopyInto(out *InMemoryChannelSpec) { *out = new(duckv1alpha1.Subscribable) (*in).DeepCopyInto(*out) } + if in.Routable != nil { + in, out := &in.Routable, &out.Routable + *out = new(duckv1alpha1.Routable) + (*in).DeepCopyInto(*out) + } return } diff --git a/pkg/duck/router.go b/pkg/duck/router.go new file mode 100644 index 00000000000..792ec6e5cc7 --- /dev/null +++ b/pkg/duck/router.go @@ -0,0 +1,73 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package duck + +import ( + "context" + "errors" + + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + "github.com/knative/eventing/pkg/logging" + "github.com/knative/eventing/pkg/reconciler/names" + "github.com/knative/pkg/apis/duck" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + "k8s.io/client-go/dynamic" +) + +// ResolveRouterURI resolves the Router object. +func ResolveRouterURI(ctx context.Context, dynamicClient dynamic.Interface, namespace string, s *eventingduck.RoutableSpec, track Track) (string, error) { + if s == nil || s.Ref == nil { + return "", nil + } + + obj, err := ObjectReference(ctx, dynamicClient, namespace, s.Ref) + if err != nil { + logging.FromContext(ctx).Warn("Failed to fetch Router target", + zap.Error(err), + zap.Any("RouterSpec.Ref", s.Ref)) + return "", err + } + + // if err = track(*s.Ref); err != nil { + // return "", fmt.Errorf("unable to track the reference: %v", err) + // } + + // K8s services are special cased. They can be called, even though they do not satisfy the + // Callable interface. + if s.Ref != nil && s.Ref.APIVersion == "v1" && s.Ref.Kind == "Service" { + // This Service must exist because ObjectReference did not return an error. + return DomainToURL(names.ServiceHostName(s.Ref.Name, namespace)), nil + } + + t := duckv1alpha1.AddressableType{} + if err = duck.FromUnstructured(obj, &t); err == nil { + if t.Status.Address != nil { + url := t.Status.Address.GetURL() + return url.String(), nil + } + } + + legacy := duckv1alpha1.LegacyTarget{} + if err = duck.FromUnstructured(obj, &legacy); err == nil { + if legacy.Status.DomainInternal != "" { + return DomainToURL(legacy.Status.DomainInternal), nil + } + } + + return "", errors.New("status does not contain address") +} diff --git a/pkg/provisioners/fanout/fanout_handler.go b/pkg/provisioners/fanout/fanout_handler.go index ee6cf87b619..766f68f4abc 100644 --- a/pkg/provisioners/fanout/fanout_handler.go +++ b/pkg/provisioners/fanout/fanout_handler.go @@ -23,6 +23,7 @@ package fanout import ( "errors" + "fmt" "net/http" "time" @@ -40,6 +41,10 @@ const ( // Config for a fanout.Handler. type Config struct { Subscriptions []eventingduck.SubscriberSpec `json:"subscriptions"` + + // Router specifies the service determining where to dispatch events. + Router *eventingduck.RoutableSpec `'json:"router"` + // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. // It is expected to be false when used as a sidecar. AsyncHandler bool `json:"asyncHandler,omitempty"` @@ -53,6 +58,9 @@ type Handler struct { receiver *provisioners.MessageReceiver dispatcher *provisioners.MessageDispatcher + // Subscribers index + subscribers map[string]int + // TODO: Plumb context through the receiver and dispatcher and use that to store the timeout, // rather than a member variable. timeout time.Duration @@ -75,8 +83,18 @@ func NewHandler(logger *zap.Logger, config Config) (*Handler, error) { config: config, dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()), receivedMessages: make(chan *forwardMessage, messageBufferSize), - timeout: defaultTimeout, + + timeout: defaultTimeout, } + if config.Router != nil && config.Subscriptions != nil { + handler.subscribers = make(map[string]int) + for i, sub := range config.Subscriptions { + // TODO: can we not deprecate ref? + key := fmt.Sprintf("%s/%s", sub.DeprecatedRef.Namespace, sub.DeprecatedRef.Name) + handler.subscribers[key] = i + } + } + // The receiver function needs to point back at the handler itself, so set it up after // initialization. receiver, err := provisioners.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar()) @@ -108,10 +126,35 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // requests return successfully, then return nil. Else, return an error. func (f *Handler) dispatch(msg *provisioners.Message) error { errorCh := make(chan error, len(f.config.Subscriptions)) - for _, sub := range f.config.Subscriptions { - go func(s eventingduck.SubscriberSpec) { - errorCh <- f.makeFanoutRequest(*msg, s) - }(sub) + + if f.config.Router != nil { + routes, err := f.dispatcher.ComputeRoutes(msg, f.config.Router.RouterURI, provisioners.DispatchDefaults{}) + if err != nil { + f.logger.Error("Compute routes had an error", zap.Error(err)) + return err + } + subsCount := len(f.config.Subscriptions) + + for _, name := range routes { + i, ok := f.subscribers[name] + if !ok { + errorCh <- fmt.Errorf("Invalid route %v", name) + } else { + go func(s eventingduck.SubscriberSpec) { + errorCh <- f.makeFanoutRequest(*msg, s) + }(f.config.Subscriptions[i]) + } + } + + for i := subsCount - len(routes); i >= 0; i-- { + errorCh <- nil + } + } else { + for _, sub := range f.config.Subscriptions { + go func(s eventingduck.SubscriberSpec) { + errorCh <- f.makeFanoutRequest(*msg, s) + }(sub) + } } for range f.config.Subscriptions { diff --git a/pkg/provisioners/message_dispatcher.go b/pkg/provisioners/message_dispatcher.go index 871824b853c..0af129c960f 100644 --- a/pkg/provisioners/message_dispatcher.go +++ b/pkg/provisioners/message_dispatcher.go @@ -18,6 +18,7 @@ package provisioners import ( "bytes" + "encoding/json" "errors" "fmt" "io/ioutil" @@ -223,3 +224,41 @@ func (d *MessageDispatcher) resolveURL(destination string, defaultNamespace stri Path: "/", } } + +// ComputeRoutes sends the message to the router. +func (d *MessageDispatcher) ComputeRoutes(message *Message, router string, defaults DispatchDefaults) ([]string, error) { + routerURL := d.resolveURL(router, defaults.Namespace) + req, err := http.NewRequest(http.MethodPost, routerURL.String(), bytes.NewReader(message.Payload)) + if err != nil { + return nil, fmt.Errorf("unable to create request %v", err) + } + req.Header = d.toHTTPHeaders(message.Headers) + + res, err := d.httpClient.Do(req) + if err != nil { + return nil, err + } + if res == nil { + // I don't think this is actually reachable with http.Client.Do(), but just to be sure we + // check anyway. + return nil, errors.New("non-error nil result from http.Client.Do()") + } + defer res.Body.Close() + if isFailure(res.StatusCode) { + // reject non-successful responses + return nil, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", res.StatusCode) + } + + payload, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("Unable to read response %v", err) + } + + var routes []string + err = json.Unmarshal(payload, &routes) + if err != nil { + return nil, err + } + + return routes, nil +} diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 71b2ea4bc58..7d1beb6acbd 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -153,6 +153,9 @@ func (r *Reconciler) newConfigFromInMemoryChannels(channels []*v1alpha1.InMemory AsyncHandler: true, Subscriptions: c.Spec.Subscribable.Subscribers, } + if c.Spec.Routable != nil && c.Spec.Routable.Router.RouterURI != "" { + channelConfig.FanoutConfig.Router = &c.Spec.Routable.Router + } } cc = append(cc, channelConfig) }