Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/fanoutsidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/knative/eventing/pkg/sidecar/configmap/filesystem"
"github.com/knative/eventing/pkg/sidecar/configmap/watcher"
"github.com/knative/eventing/pkg/sidecar/fanout"
"github.com/knative/eventing/pkg/sidecar/swappable"
"github.com/knative/eventing/pkg/system"
"go.uber.org/zap"
Expand All @@ -53,13 +54,15 @@ var (
writeTimeout = 1 * time.Minute

port int
blocking bool
configMapNoticer string
configMapNamespace string
configMapName string
)

func init() {
flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.")
flag.BoolVar(&blocking, "blocking", true, "Block on receive while dispatching.")
flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. Valid values are: %s", configMapNoticerValues()))
flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace, "The namespace of the ConfigMap that is watched for configuration.")
flag.StringVar(&configMapName, "config_map_name", defaultConfigMapName, "The name of the ConfigMap that is watched for configuration.")
Expand All @@ -81,7 +84,7 @@ func main() {
logger.Fatal("--sidecar_port flag must be set")
}

sh, err := swappable.NewEmptyHandler(logger)
sh, err := swappable.NewEmptyHandler(logger, fanout.StaticConfig{Blocking: blocking})
if err != nil {
logger.Fatal("Unable to create swappable.Handler", zap.Error(err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ spec:
image: github.com/knative/eventing/cmd/fanoutsidecar
args:
- --sidecar_port=8080
- --blocking=false
- --config_map_noticer=watcher
- --config_map_namespace=knative-eventing
- --config_map_name=in-memory-channel-dispatcher-config-map
18 changes: 15 additions & 3 deletions pkg/sidecar/fanout/fanout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ type Config struct {
Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"`
}

type StaticConfig struct {
Blocking bool
}

// http.Handler that takes a single request in and fans it out to N other servers.
type Handler struct {
config Config
config Config
staticConfig StaticConfig

receivedMessages chan *forwardMessage
receiver *provisioners.MessageReceiver
Expand All @@ -66,10 +71,11 @@ type forwardMessage struct {
}

// NewHandler creates a new fanout.Handler.
func NewHandler(logger *zap.Logger, config Config) *Handler {
func NewHandler(logger *zap.Logger, config Config, staticConfig StaticConfig) *Handler {
handler := &Handler{
logger: logger,
config: config,
staticConfig: staticConfig,
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),
receivedMessages: make(chan *forwardMessage, messageBufferSize),
timeout: defaultTimeout,
Expand All @@ -83,7 +89,13 @@ func NewHandler(logger *zap.Logger, config Config) *Handler {

func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
return f.dispatch(m)
if f.staticConfig.Blocking {
// hold receive open until dispatch finishes, report dispatch errors
return f.dispatch(m)
}
// ack receive immediately, dispatch is good faith attempt, ignore errors
go f.dispatch(m)
return nil
}
}

Expand Down
39 changes: 38 additions & 1 deletion pkg/sidecar/fanout/fanout_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
subscriber func(http.ResponseWriter, *http.Request)
channel func(http.ResponseWriter, *http.Request)
expectedStatus int
staticConfig StaticConfig
}{
"rejected by receiver": {
receiverFunc: func(provisioners.ChannelReference, *provisioners.Message) error {
return errors.New("rejected by test-receiver")
},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusInternalServerError,
},
"fanout times out": {
Expand All @@ -82,16 +84,33 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
time.Sleep(10 * time.Millisecond)
writer.WriteHeader(http.StatusAccepted)
},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusInternalServerError,
},
"fanout times out, non-blocking": {
timeout: time.Millisecond,
subs: []eventingduck.ChannelSubscriberSpec{
{
SubscriberURI: replaceSubscriber,
},
},
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
time.Sleep(10 * time.Millisecond)
writer.WriteHeader(http.StatusAccepted)
},
staticConfig: StaticConfig{Blocking: false},
expectedStatus: http.StatusAccepted,
},
"zero subs succeed": {
subs: []eventingduck.ChannelSubscriberSpec{},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusAccepted,
},
"empty sub succeeds": {
subs: []eventingduck.ChannelSubscriberSpec{
{},
},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusAccepted,
},
"reply fails": {
Expand All @@ -103,6 +122,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
channel: func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusNotFound)
},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusInternalServerError,
},
"subscriber fails": {
Expand All @@ -114,8 +134,21 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusNotFound)
},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusInternalServerError,
},
"subscriber fails, non-blocking": {
subs: []eventingduck.ChannelSubscriberSpec{
{
SubscriberURI: replaceSubscriber,
},
},
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusNotFound)
},
staticConfig: StaticConfig{Blocking: false},
expectedStatus: http.StatusAccepted,
},
"subscriber succeeds, result fails": {
subs: []eventingduck.ChannelSubscriberSpec{
{
Expand All @@ -127,6 +160,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
channel: func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusForbidden)
},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusInternalServerError,
},
"one sub succeeds": {
Expand All @@ -140,6 +174,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
channel: func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusAccepted)
},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusAccepted,
},
"one sub succeeds, one sub fails": {
Expand All @@ -155,6 +190,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
},
subscriber: callableSucceed,
channel: (&succeedOnce{}).handler,
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusInternalServerError,
},
"all subs succeed": {
Expand All @@ -176,6 +212,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
channel: func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusAccepted)
},
staticConfig: StaticConfig{Blocking: true},
expectedStatus: http.StatusAccepted,
},
}
Expand Down Expand Up @@ -205,7 +242,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
subs = append(subs, sub)
}

h := NewHandler(zap.NewNop(), Config{Subscriptions: subs})
h := NewHandler(zap.NewNop(), Config{Subscriptions: subs}, tc.staticConfig)
if tc.receiverFunc != nil {
h.receiver = provisioners.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar())
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,19 @@ func getChannelKey(r *http.Request) (string, error) {
// on, and then delegates handling of that request to the single fanout.Handler corresponding to
// that Channel.
type Handler struct {
logger *zap.Logger
handlers map[string]*fanout.Handler
config Config
logger *zap.Logger
handlers map[string]*fanout.Handler
config Config
staticConfig fanout.StaticConfig
}

// NewHandler creates a new Handler.
func NewHandler(logger *zap.Logger, conf Config) (*Handler, error) {
func NewHandler(logger *zap.Logger, conf Config, staticConf fanout.StaticConfig) (*Handler, error) {
handlers := make(map[string]*fanout.Handler, len(conf.ChannelConfigs))

for _, cc := range conf.ChannelConfigs {
key := makeChannelKeyFromConfig(cc)
handler := fanout.NewHandler(logger, cc.FanoutConfig)
handler := fanout.NewHandler(logger, cc.FanoutConfig, staticConf)
if _, present := handlers[key]; present {
logger.Error("Duplicate channel key", zap.String("channelKey", key))
return nil, fmt.Errorf("duplicate channel key: %v", key)
Expand All @@ -91,9 +92,10 @@ func NewHandler(logger *zap.Logger, conf Config) (*Handler, error) {
}

return &Handler{
logger: logger,
config: conf,
handlers: handlers,
logger: logger,
config: conf,
staticConfig: staticConf,
handlers: handlers,
}, nil
}

Expand All @@ -107,7 +109,7 @@ func (h *Handler) ConfigDiff(updated Config) string {
// CopyWithNewConfig creates a new copy of this Handler with all the fields identical, except the
// new Handler uses conf, rather than copying the existing Handler's config.
func (h *Handler) CopyWithNewConfig(conf Config) (*Handler, error) {
return NewHandler(h.logger, conf)
return NewHandler(h.logger, conf, h.staticConfig)
}

// ServeHTTP delegates the actual handling of the request to a fanout.Handler, based on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestNewHandler(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, err := NewHandler(zap.NewNop(), tc.config)
_, err := NewHandler(zap.NewNop(), tc.config, fanout.StaticConfig{})
if tc.createErr != "" {
if err == nil {
t.Errorf("Expected NewHandler error: '%v'. Actual nil", tc.createErr)
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestCopyWithNewConfig(t *testing.T) {
if cmp.Equal(orig, updated) {
t.Errorf("Orig and updated must be different")
}
h, err := NewHandler(zap.NewNop(), orig)
h, err := NewHandler(zap.NewNop(), orig, fanout.StaticConfig{})
if err != nil {
t.Errorf("Unable to create handler, %v", err)
}
Expand All @@ -153,6 +153,9 @@ func TestCopyWithNewConfig(t *testing.T) {
if !cmp.Equal(newH.config, updated) {
t.Errorf("Incorrect copied config. Expected '%v'. Actual '%v'", updated, newH.config)
}
if h.staticConfig != newH.staticConfig {
t.Errorf("Incorrect copied static config. Expected '%v'. Actual '%v'", h.staticConfig, newH.staticConfig)
}
}

func TestConfigDiff(t *testing.T) {
Expand Down Expand Up @@ -206,7 +209,7 @@ func TestConfigDiff(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
h, err := NewHandler(zap.NewNop(), tc.orig)
h, err := NewHandler(zap.NewNop(), tc.orig, fanout.StaticConfig{})
if err != nil {
t.Errorf("Unable to create handler: %v", err)
}
Expand All @@ -223,6 +226,7 @@ func TestServeHTTP(t *testing.T) {
testCases := map[string]struct {
name string
config Config
staticConfig fanout.StaticConfig
respStatusCode int
key string
expectedStatusCode int
Expand All @@ -237,7 +241,7 @@ func TestServeHTTP(t *testing.T) {
key: "no-dot",
expectedStatusCode: http.StatusInternalServerError,
},
"pass through failure": {
"pass through failure, blocking": {
config: Config{
ChannelConfigs: []ChannelConfig{
{
Expand All @@ -253,10 +257,33 @@ func TestServeHTTP(t *testing.T) {
},
},
},
staticConfig: fanout.StaticConfig{Blocking: true},
respStatusCode: http.StatusInternalServerError,
key: "first-channel.default",
expectedStatusCode: http.StatusInternalServerError,
},
"pass through failure, non-blocking": {
config: Config{
ChannelConfigs: []ChannelConfig{
{
Namespace: "default",
Name: "first-channel",
FanoutConfig: fanout.Config{
Subscriptions: []eventingduck.ChannelSubscriberSpec{
{
ReplyURI: replaceDomain,
},
},
},
},
},
},
// because the config is non-blocking, even though the dispatch response fails, the receiver response is accepted
staticConfig: fanout.StaticConfig{Blocking: false},
respStatusCode: http.StatusInternalServerError,
key: "first-channel.default",
expectedStatusCode: http.StatusAccepted,
},
"choose channel": {
config: Config{
ChannelConfigs: []ChannelConfig{
Expand Down Expand Up @@ -301,7 +328,7 @@ func TestServeHTTP(t *testing.T) {
// Rewrite the replaceDomains to call the server we just created.
replaceDomains(tc.config, server.URL[7:])

h, err := NewHandler(zap.NewNop(), tc.config)
h, err := NewHandler(zap.NewNop(), tc.config, tc.staticConfig)
if err != nil {
t.Errorf("Unexpected NewHandler error: '%v'", err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sidecar/swappable/swappable.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"
"sync/atomic"

"github.com/knative/eventing/pkg/sidecar/fanout"
"github.com/knative/eventing/pkg/sidecar/multichannelfanout"
"go.uber.org/zap"
)
Expand All @@ -54,8 +55,8 @@ func NewHandler(handler *multichannelfanout.Handler, logger *zap.Logger) *Handle
return h
}

func NewEmptyHandler(logger *zap.Logger) (*Handler, error) {
h, err := multichannelfanout.NewHandler(logger, multichannelfanout.Config{})
func NewEmptyHandler(logger *zap.Logger, staticConfig fanout.StaticConfig) (*Handler, error) {
h, err := multichannelfanout.NewHandler(logger, multichannelfanout.Config{}, staticConfig)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sidecar/swappable/swappable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestHandler(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
h, err := NewEmptyHandler(zap.NewNop())
h, err := NewEmptyHandler(zap.NewNop(), fanout.StaticConfig{})
if err != nil {
t.Errorf("Unexpected error creating handler: %v", err)
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestHandler_InvalidConfigChange(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
h, err := NewEmptyHandler(zap.NewNop())
h, err := NewEmptyHandler(zap.NewNop(), fanout.StaticConfig{})
if err != nil {
t.Errorf("Unexpected error creating handler: %v", err)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestHandler_InvalidConfigChange(t *testing.T) {
}

func TestHandler_NilConfigChange(t *testing.T) {
h, err := NewEmptyHandler(zap.NewNop())
h, err := NewEmptyHandler(zap.NewNop(), fanout.StaticConfig{})
if err != nil {
t.Errorf("Unexpected error creating handler: %v", err)
}
Expand Down