Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
# Updates the status to reflect subscribable status.
- apiGroups:
- messaging.knative.dev
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Knative Authors
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -20,46 +20,45 @@ import (
"errors"
"time"

cloudevents "github.com/cloudevents/sdk-go/v1"
"go.uber.org/zap"

"knative.dev/eventing/pkg/channel/multichannelfanout"
"knative.dev/eventing/pkg/channel/swappable"
"knative.dev/eventing/pkg/kncloudevents"
)

type Dispatcher interface {
UpdateConfig(config *multichannelfanout.Config) error
type MessageDispatcher interface {
UpdateConfig(ctx context.Context, config *multichannelfanout.Config) error
}

type InMemoryDispatcher struct {
handler *swappable.Handler
ceClient cloudevents.Client
writeTimeout time.Duration
logger *zap.Logger
type InMemoryMessageDispatcher struct {
handler *swappable.MessageHandler
httpBindingsReceiver *kncloudevents.HttpMessageReceiver
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice nice nice

writeTimeout time.Duration
logger *zap.Logger
}

type InMemoryDispatcherArgs struct {
type InMemoryMessageDispatcherArgs struct {
Port int
ReadTimeout time.Duration
WriteTimeout time.Duration
Handler *swappable.Handler
Handler *swappable.MessageHandler
Logger *zap.Logger
}

func (d *InMemoryDispatcher) UpdateConfig(config *multichannelfanout.Config) error {
return d.handler.UpdateConfig(config)
func (d *InMemoryMessageDispatcher) UpdateConfig(ctx context.Context, config *multichannelfanout.Config) error {
return d.handler.UpdateConfig(ctx, config)
}

// Start starts the inmemory dispatcher's message processing.
// This is a blocking call.
func (d *InMemoryDispatcher) Start(ctx context.Context) error {
func (d *InMemoryMessageDispatcher) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

errCh := make(chan error, 1)
go func() {
errCh <- d.ceClient.StartReceiver(ctx, d.handler.ServeHTTP)
errCh <- d.httpBindingsReceiver.StartListen(ctx, d.handler)
}()

// Stop either if the receiver stops (sending to errCh) or if the context Done channel is closed.
Expand All @@ -70,28 +69,26 @@ func (d *InMemoryDispatcher) Start(ctx context.Context) error {
break
}

// Done channel has been closed, we need to gracefully shutdown d.ceClient. The cancel() method will start its
// Done channel has been closed, we need to gracefully shutdown d.bindingsReceiver. The cancel() method will start its
// shutdown, if it hasn't finished in a reasonable amount of time, just return an error.
cancel()
select {
case err := <-errCh:
return err
case <-time.After(d.writeTimeout):
return errors.New("timeout shutting down ceClient")
return errors.New("timeout shutting http bindings receiver")
}
}

func NewDispatcher(args *InMemoryDispatcherArgs) *InMemoryDispatcher {
// TODO set read and write timeouts and port?
ceClient, err := kncloudevents.NewDefaultClient()
if err != nil {
args.Logger.Fatal("failed to create cloudevents client", zap.Error(err))
}
func NewMessageDispatcher(args *InMemoryMessageDispatcherArgs) *InMemoryMessageDispatcher {
// TODO set read and write timeouts?
bindingsReceiver := kncloudevents.NewHttpMessageReceiver(args.Port)
Comment thread
nlopezgi marked this conversation as resolved.

dispatcher := &InMemoryDispatcher{
handler: args.Handler,
ceClient: ceClient,
logger: args.Logger,
dispatcher := &InMemoryMessageDispatcher{
handler: args.Handler,
httpBindingsReceiver: bindingsReceiver,
logger: args.Logger,
writeTimeout: args.WriteTimeout,
}

return dispatcher
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Knative Authors
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@ limitations under the License.
package inmemorychannel

import (
"context"
"testing"
"time"

Expand All @@ -25,23 +26,23 @@ import (
logtesting "knative.dev/pkg/logging/testing"
)

func TestNewDispatcher(t *testing.T) {
func TestNewMessageDispatcher(t *testing.T) {
logger := logtesting.TestLogger(t).Desugar()
sh, err := swappable.NewEmptyHandler(logger)
sh, err := swappable.NewEmptyMessageHandler(context.TODO(), logger)

if err != nil {
t.Fatalf("Failed to create handler")
}

args := &InMemoryDispatcherArgs{
args := &InMemoryMessageDispatcherArgs{
Port: 8080,
ReadTimeout: 1 * time.Minute,
WriteTimeout: 1 * time.Minute,
Handler: sh,
Logger: logger,
}

d := NewDispatcher(args)
d := NewMessageDispatcher(args)

if d == nil {
t.Fatalf("Failed to create with NewDispatcher")
Expand Down
8 changes: 4 additions & 4 deletions pkg/reconciler/inmemorychannel/dispatcher/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ func NewController(
logger.Fatalw("Error setting up trace publishing", zap.Error(err))
}

sh, err := swappable.NewEmptyHandler(logger.Desugar())
sh, err := swappable.NewEmptyMessageHandler(ctx, logger.Desugar())
if err != nil {
logger.Fatalw("Error creating swappable.Handler", zap.Error(err))
logger.Fatalw("Error creating swappable.MessageHandler", zap.Error(err))
}

args := &inmemorychannel.InMemoryDispatcherArgs{
args := &inmemorychannel.InMemoryMessageDispatcherArgs{
Port: port,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
Handler: sh,
Logger: logger.Desugar(),
}
inMemoryDispatcher := inmemorychannel.NewDispatcher(args)
inMemoryDispatcher := inmemorychannel.NewMessageDispatcher(args)

inmemorychannelInformer := inmemorychannelinformer.Get(ctx)
informer := inmemorychannelInformer.Informer()
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// Reconciler reconciles InMemory Channels.
type Reconciler struct {
configStore *channel.EventDispatcherConfigStore
dispatcher inmemorychannel.Dispatcher
dispatcher inmemorychannel.MessageDispatcher
inmemorychannelLister listers.InMemoryChannelLister
inmemorychannelInformer cache.SharedIndexInformer
}
Expand All @@ -61,7 +61,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1alpha1.InMemoryCh
}

config := r.newConfigFromInMemoryChannels(inmemoryChannels)
err = r.dispatcher.UpdateConfig(config)
err = r.dispatcher.UpdateConfig(ctx, config)
if err != nil {
logging.FromContext(ctx).Error("Error updating InMemory dispatcher config")
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestAllCases(t *testing.T) {

type fakeDispatcher struct{}

func (d *fakeDispatcher) UpdateConfig(config *multichannelfanout.Config) error {
func (d *fakeDispatcher) UpdateConfig(ctx context.Context, config *multichannelfanout.Config) error {
// TODO set error
return nil
}