Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions pkg/apis/duck/v1alpha1/routable_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)

// Routable is the schema for the routable portion of the spec
// section of the resource.
type Routable struct {
// This is the router specification.
Router RoutableSpec `json:"router,omitempty"`
}

// RoutableSpec defines a router
type RoutableSpec struct {
// Ref is a reference to the router service
Ref *corev1.ObjectReference `json:"ref,omitempty"`

// +optional
RouterURI string `json:"routerURI,omitempty"`

// Arguments defines the arguments to pass to the Router
// +optional
Arguments *runtime.RawExtension `json:"arguments,omitempty"`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the past we've had issues with "bag of JSON" or "bag of string-string" in terms of tooling usability and underspecifying the actual behavior. You might want to look at #930 for some of the discussion on filtering language and behavior.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually removed arguments in the latest proposal but not in the code.

}
43 changes: 43 additions & 0 deletions pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/apis/messaging/v1alpha1/in_memory_channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ var _ webhook.GenericCRD = (*InMemoryChannel)(nil)
type InMemoryChannelSpec struct {
// Channel conforms to Duck type Subscribable.
Subscribable *eventingduck.Subscribable `json:"subscribable,omitempty"`

// Channel conforms to Duck type Routable.
Routable *eventingduck.Routable `json:"routable,omitempty"`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if both subscribable and routable is set on the same object? Are they "inclusive and" or "xor"?

If they are "and", why not simply add Arguments to Subscription for this behavior? (Again, I think this may be too generic to be well-supported across implementations, but having two different types for interacting with Channel seems even more ambiguous.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current behavior is routable takes precedence.

}

// ChannelStatus represents the current state of a Channel.
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 73 additions & 0 deletions pkg/duck/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package duck

import (
"context"
"errors"

eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/reconciler/names"
"github.com/knative/pkg/apis/duck"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"go.uber.org/zap"
"k8s.io/client-go/dynamic"
)

// ResolveRouterURI resolves the Router object.
func ResolveRouterURI(ctx context.Context, dynamicClient dynamic.Interface, namespace string, s *eventingduck.RoutableSpec, track Track) (string, error) {
if s == nil || s.Ref == nil {
return "", nil
}

obj, err := ObjectReference(ctx, dynamicClient, namespace, s.Ref)
if err != nil {
logging.FromContext(ctx).Warn("Failed to fetch Router target",
zap.Error(err),
zap.Any("RouterSpec.Ref", s.Ref))
return "", err
}

// if err = track(*s.Ref); err != nil {
// return "", fmt.Errorf("unable to track the reference: %v", err)
// }

// K8s services are special cased. They can be called, even though they do not satisfy the
// Callable interface.
if s.Ref != nil && s.Ref.APIVersion == "v1" && s.Ref.Kind == "Service" {
// This Service must exist because ObjectReference did not return an error.
return DomainToURL(names.ServiceHostName(s.Ref.Name, namespace)), nil
}

t := duckv1alpha1.AddressableType{}
if err = duck.FromUnstructured(obj, &t); err == nil {
if t.Status.Address != nil {
url := t.Status.Address.GetURL()
return url.String(), nil
}
}

legacy := duckv1alpha1.LegacyTarget{}
if err = duck.FromUnstructured(obj, &legacy); err == nil {
if legacy.Status.DomainInternal != "" {
return DomainToURL(legacy.Status.DomainInternal), nil
}
}

return "", errors.New("status does not contain address")
}
53 changes: 48 additions & 5 deletions pkg/provisioners/fanout/fanout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package fanout

import (
"errors"
"fmt"
"net/http"
"time"

Expand All @@ -40,6 +41,10 @@ const (
// Config for a fanout.Handler.
type Config struct {
Subscriptions []eventingduck.SubscriberSpec `json:"subscriptions"`

// Router specifies the service determining where to dispatch events.
Router *eventingduck.RoutableSpec `'json:"router"`

// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// It is expected to be false when used as a sidecar.
AsyncHandler bool `json:"asyncHandler,omitempty"`
Expand All @@ -53,6 +58,9 @@ type Handler struct {
receiver *provisioners.MessageReceiver
dispatcher *provisioners.MessageDispatcher

// Subscribers index
subscribers map[string]int

// TODO: Plumb context through the receiver and dispatcher and use that to store the timeout,
// rather than a member variable.
timeout time.Duration
Expand All @@ -75,8 +83,18 @@ func NewHandler(logger *zap.Logger, config Config) (*Handler, error) {
config: config,
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),
receivedMessages: make(chan *forwardMessage, messageBufferSize),
timeout: defaultTimeout,

timeout: defaultTimeout,
}
if config.Router != nil && config.Subscriptions != nil {
handler.subscribers = make(map[string]int)
for i, sub := range config.Subscriptions {
// TODO: can we not deprecate ref?
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, I think "ref" can get confusing when objects are deleted and re-created quickly. Using a ID prevents this sort of confusion.

key := fmt.Sprintf("%s/%s", sub.DeprecatedRef.Namespace, sub.DeprecatedRef.Name)
handler.subscribers[key] = i
}
}

// The receiver function needs to point back at the handler itself, so set it up after
// initialization.
receiver, err := provisioners.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar())
Expand Down Expand Up @@ -108,10 +126,35 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// requests return successfully, then return nil. Else, return an error.
func (f *Handler) dispatch(msg *provisioners.Message) error {
errorCh := make(chan error, len(f.config.Subscriptions))
for _, sub := range f.config.Subscriptions {
go func(s eventingduck.SubscriberSpec) {
errorCh <- f.makeFanoutRequest(*msg, s)
}(sub)

if f.config.Router != nil {
routes, err := f.dispatcher.ComputeRoutes(msg, f.config.Router.RouterURI, provisioners.DispatchDefaults{})
if err != nil {
f.logger.Error("Compute routes had an error", zap.Error(err))
return err
}
subsCount := len(f.config.Subscriptions)

for _, name := range routes {
i, ok := f.subscribers[name]
if !ok {
errorCh <- fmt.Errorf("Invalid route %v", name)
} else {
go func(s eventingduck.SubscriberSpec) {
errorCh <- f.makeFanoutRequest(*msg, s)
}(f.config.Subscriptions[i])
}
}

for i := subsCount - len(routes); i >= 0; i-- {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this might be simpler to have routes be a set, and then iterate over subscribers and compare the name with the set. If no match, you can send back a nil. If there is a match, you can pop from the set and return the result of the request.

If you want to log any names that don't match Subscriptions as errors, you can then log them directly, and the accounting is a bit simpler to track, I think.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we want to have another method to handle the router case so that we don't have to iterate over all subscriptions. I mean this could also be done in dispatch, but that'd cleaner.

errorCh <- nil
}
} else {
for _, sub := range f.config.Subscriptions {
go func(s eventingduck.SubscriberSpec) {
errorCh <- f.makeFanoutRequest(*msg, s)
}(sub)
}
}

for range f.config.Subscriptions {
Expand Down
39 changes: 39 additions & 0 deletions pkg/provisioners/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package provisioners

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -223,3 +224,41 @@ func (d *MessageDispatcher) resolveURL(destination string, defaultNamespace stri
Path: "/",
}
}

// ComputeRoutes sends the message to the router.
func (d *MessageDispatcher) ComputeRoutes(message *Message, router string, defaults DispatchDefaults) ([]string, error) {
routerURL := d.resolveURL(router, defaults.Namespace)
req, err := http.NewRequest(http.MethodPost, routerURL.String(), bytes.NewReader(message.Payload))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A CoudEvent router will only need the attributes of the CloudEvent context. Sending the complete message to the router will not be necessary in all cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's necessary for content-based routing. Are you suggesting the router should advertise what attributes it needs (could initially vs coarse-grained, context vs content vs both)?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize that there is also content-based routing, but the Broker concept so far completely relies on CloudEvents. After all, this is exactly what CloudEvents are designed for. To make it short, yes. 😄
I know that in this stage we should not optimize too much. I only brought it up, because routing based on the context attributes is a major use case.

if err != nil {
return nil, fmt.Errorf("unable to create request %v", err)
}
req.Header = d.toHTTPHeaders(message.Headers)

res, err := d.httpClient.Do(req)
if err != nil {
return nil, err
}
if res == nil {
// I don't think this is actually reachable with http.Client.Do(), but just to be sure we
// check anyway.
return nil, errors.New("non-error nil result from http.Client.Do()")
}
defer res.Body.Close()
if isFailure(res.StatusCode) {
// reject non-successful responses
return nil, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", res.StatusCode)
}

payload, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("Unable to read response %v", err)
}

var routes []string
err = json.Unmarshal(payload, &routes)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm uncomfortable with the implied contract here:

CloudEvent -> [ subscription_names ]

In particular, it feels like there's some spooky action-at-a-distance in that the subscription names are based on shared knowledge of apiserver state (which may be laggy), rather than information in the request itself.

I'd much prefer a protocol where the choices were included in the request payload, so that the function could be implemented without needing reference to a third system.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no apiserver state involved here. The subscription names are user-specified and the router does not manipulate any apiserver objects.

However there is the issue that the router can become out-of-sync with the channel subscribers, eg when manually adding Subscription. One solution is to provide two channel modes, pubsub vs routing. When a channel is in routing mode, only Router objects can manipulate the channel.

if err != nil {
return nil, err
}

return routes, nil
}
3 changes: 3 additions & 0 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ func (r *Reconciler) newConfigFromInMemoryChannels(channels []*v1alpha1.InMemory
AsyncHandler: true,
Subscriptions: c.Spec.Subscribable.Subscribers,
}
if c.Spec.Routable != nil && c.Spec.Routable.Router.RouterURI != "" {
channelConfig.FanoutConfig.Router = &c.Spec.Routable.Router
}
}
cc = append(cc, channelConfig)
}
Expand Down