From d748b720ab1fc0f15434ba03832bb4115be93563 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Wed, 9 Jan 2019 19:27:05 -0500 Subject: [PATCH 1/2] first take --- cmd/fanoutsidecar/main.go | 19 +++++++++++++++---- .../in-memory-channel/in-memory-channel.yaml | 1 + pkg/sidecar/fanout/fanout_handler.go | 9 ++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/cmd/fanoutsidecar/main.go b/cmd/fanoutsidecar/main.go index 629babe71e2..9acf06564f8 100644 --- a/cmd/fanoutsidecar/main.go +++ b/cmd/fanoutsidecar/main.go @@ -30,6 +30,7 @@ import ( "github.com/knative/eventing/pkg/sidecar/configmap/filesystem" "github.com/knative/eventing/pkg/sidecar/configmap/watcher" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" "github.com/knative/eventing/pkg/sidecar/swappable" "github.com/knative/eventing/pkg/system" "go.uber.org/zap" @@ -53,6 +54,7 @@ var ( writeTimeout = 1 * time.Minute port int + blocking bool configMapNoticer string configMapNamespace string configMapName string @@ -60,6 +62,7 @@ var ( func init() { flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.") + flag.BoolVar(&blocking, "blocking", true, "Block on receive while dispatching.") flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. Valid values are: %s", configMapNoticerValues())) flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace, "The namespace of the ConfigMap that is watched for configuration.") flag.StringVar(&configMapName, "config_map_name", defaultConfigMapName, "The name of the ConfigMap that is watched for configuration.") @@ -86,7 +89,7 @@ func main() { logger.Fatal("Unable to create swappable.Handler", zap.Error(err)) } - mgr, err := setupConfigMapNoticer(logger, sh.UpdateConfig) + mgr, err := setupConfigMapNoticer(logger, sh.UpdateConfig, blocking) if err != nil { logger.Fatal("Unable to create configMap noticer.", zap.Error(err)) } @@ -119,18 +122,26 @@ func main() { s.Shutdown(ctx) } -func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateConfig) (manager.Manager, error) { +func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateConfig, blocking bool) (manager.Manager, error) { mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) if err != nil { return nil, err logger.Error("Error starting manager.", zap.Error(err)) } + wrappedConfigUpdated := func(config *multichannelfanout.Config) error { + // update fanout config for blocking + for _, c := range config.ChannelConfigs { + c.FanoutConfig.Blocking = blocking + } + return configUpdated(config) + } + switch configMapNoticer { case cmnfVolume: - err = setupConfigMapVolume(logger, mgr, configUpdated) + err = setupConfigMapVolume(logger, mgr, wrappedConfigUpdated) case cmnfWatcher: - err = setupConfigMapWatcher(logger, mgr, configUpdated) + err = setupConfigMapWatcher(logger, mgr, wrappedConfigUpdated) default: err = fmt.Errorf("need to provide the --config_map_noticer flag (valid values are %s)", configMapNoticerValues()) } diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml index 51dc7b1a957..a51d2d4ca35 100644 --- a/config/provisioners/in-memory-channel/in-memory-channel.yaml +++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml @@ -197,6 +197,7 @@ spec: image: github.com/knative/eventing/cmd/fanoutsidecar args: - --sidecar_port=8080 + - --blocking=false - --config_map_noticer=watcher - --config_map_namespace=knative-eventing - --config_map_name=in-memory-channel-dispatcher-config-map diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go index dd60be7fed0..750de14af75 100644 --- a/pkg/sidecar/fanout/fanout_handler.go +++ b/pkg/sidecar/fanout/fanout_handler.go @@ -40,6 +40,7 @@ const ( // Configuration for a fanout.Handler. type Config struct { Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"` + Blocking bool `json:"-"` } // http.Handler that takes a single request in and fans it out to N other servers. @@ -83,7 +84,13 @@ func NewHandler(logger *zap.Logger, config Config) *Handler { func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error { return func(_ provisioners.ChannelReference, m *provisioners.Message) error { - return f.dispatch(m) + if f.config.Blocking { + // hold receive open until dispatch finishes, reporting dispatch errors + return f.dispatch(m) + } + // ack receive immediately, ignoring dispatch errors + go f.dispatch(m) + return nil } } From 697a83cec535a8a5f78907012118deb758babeb1 Mon Sep 17 00:00:00 2001 From: Scott Andrews Date: Wed, 9 Jan 2019 19:33:07 -0500 Subject: [PATCH 2/2] Tease out static configuration from dynamic configuration --- cmd/fanoutsidecar/main.go | 20 +++------- pkg/sidecar/fanout/fanout_handler.go | 17 +++++--- pkg/sidecar/fanout/fanout_handler_test.go | 39 ++++++++++++++++++- .../multi_channel_fanout_handler.go | 20 +++++----- .../multi_channel_fanout_handler_test.go | 37 +++++++++++++++--- pkg/sidecar/swappable/swappable.go | 5 ++- pkg/sidecar/swappable/swappable_test.go | 6 +-- 7 files changed, 104 insertions(+), 40 deletions(-) diff --git a/cmd/fanoutsidecar/main.go b/cmd/fanoutsidecar/main.go index 9acf06564f8..48de029d2ce 100644 --- a/cmd/fanoutsidecar/main.go +++ b/cmd/fanoutsidecar/main.go @@ -30,7 +30,7 @@ import ( "github.com/knative/eventing/pkg/sidecar/configmap/filesystem" "github.com/knative/eventing/pkg/sidecar/configmap/watcher" - "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/swappable" "github.com/knative/eventing/pkg/system" "go.uber.org/zap" @@ -84,12 +84,12 @@ func main() { logger.Fatal("--sidecar_port flag must be set") } - sh, err := swappable.NewEmptyHandler(logger) + sh, err := swappable.NewEmptyHandler(logger, fanout.StaticConfig{Blocking: blocking}) if err != nil { logger.Fatal("Unable to create swappable.Handler", zap.Error(err)) } - mgr, err := setupConfigMapNoticer(logger, sh.UpdateConfig, blocking) + mgr, err := setupConfigMapNoticer(logger, sh.UpdateConfig) if err != nil { logger.Fatal("Unable to create configMap noticer.", zap.Error(err)) } @@ -122,26 +122,18 @@ func main() { s.Shutdown(ctx) } -func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateConfig, blocking bool) (manager.Manager, error) { +func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateConfig) (manager.Manager, error) { mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) if err != nil { return nil, err logger.Error("Error starting manager.", zap.Error(err)) } - wrappedConfigUpdated := func(config *multichannelfanout.Config) error { - // update fanout config for blocking - for _, c := range config.ChannelConfigs { - c.FanoutConfig.Blocking = blocking - } - return configUpdated(config) - } - switch configMapNoticer { case cmnfVolume: - err = setupConfigMapVolume(logger, mgr, wrappedConfigUpdated) + err = setupConfigMapVolume(logger, mgr, configUpdated) case cmnfWatcher: - err = setupConfigMapWatcher(logger, mgr, wrappedConfigUpdated) + err = setupConfigMapWatcher(logger, mgr, configUpdated) default: err = fmt.Errorf("need to provide the --config_map_noticer flag (valid values are %s)", configMapNoticerValues()) } diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go index 750de14af75..b03d947affe 100644 --- a/pkg/sidecar/fanout/fanout_handler.go +++ b/pkg/sidecar/fanout/fanout_handler.go @@ -40,12 +40,16 @@ const ( // Configuration for a fanout.Handler. type Config struct { Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"` - Blocking bool `json:"-"` +} + +type StaticConfig struct { + Blocking bool } // http.Handler that takes a single request in and fans it out to N other servers. type Handler struct { - config Config + config Config + staticConfig StaticConfig receivedMessages chan *forwardMessage receiver *provisioners.MessageReceiver @@ -67,10 +71,11 @@ type forwardMessage struct { } // NewHandler creates a new fanout.Handler. -func NewHandler(logger *zap.Logger, config Config) *Handler { +func NewHandler(logger *zap.Logger, config Config, staticConfig StaticConfig) *Handler { handler := &Handler{ logger: logger, config: config, + staticConfig: staticConfig, dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()), receivedMessages: make(chan *forwardMessage, messageBufferSize), timeout: defaultTimeout, @@ -84,11 +89,11 @@ func NewHandler(logger *zap.Logger, config Config) *Handler { func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error { return func(_ provisioners.ChannelReference, m *provisioners.Message) error { - if f.config.Blocking { - // hold receive open until dispatch finishes, reporting dispatch errors + if f.staticConfig.Blocking { + // hold receive open until dispatch finishes, report dispatch errors return f.dispatch(m) } - // ack receive immediately, ignoring dispatch errors + // ack receive immediately, dispatch is good faith attempt, ignore errors go f.dispatch(m) return nil } diff --git a/pkg/sidecar/fanout/fanout_handler_test.go b/pkg/sidecar/fanout/fanout_handler_test.go index 6702984154d..b2583b9bc7f 100644 --- a/pkg/sidecar/fanout/fanout_handler_test.go +++ b/pkg/sidecar/fanout/fanout_handler_test.go @@ -64,11 +64,13 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { subscriber func(http.ResponseWriter, *http.Request) channel func(http.ResponseWriter, *http.Request) expectedStatus int + staticConfig StaticConfig }{ "rejected by receiver": { receiverFunc: func(provisioners.ChannelReference, *provisioners.Message) error { return errors.New("rejected by test-receiver") }, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusInternalServerError, }, "fanout times out": { @@ -82,16 +84,33 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { time.Sleep(10 * time.Millisecond) writer.WriteHeader(http.StatusAccepted) }, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusInternalServerError, }, + "fanout times out, non-blocking": { + timeout: time.Millisecond, + subs: []eventingduck.ChannelSubscriberSpec{ + { + SubscriberURI: replaceSubscriber, + }, + }, + subscriber: func(writer http.ResponseWriter, _ *http.Request) { + time.Sleep(10 * time.Millisecond) + writer.WriteHeader(http.StatusAccepted) + }, + staticConfig: StaticConfig{Blocking: false}, + expectedStatus: http.StatusAccepted, + }, "zero subs succeed": { subs: []eventingduck.ChannelSubscriberSpec{}, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusAccepted, }, "empty sub succeeds": { subs: []eventingduck.ChannelSubscriberSpec{ {}, }, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusAccepted, }, "reply fails": { @@ -103,6 +122,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { channel: func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusNotFound) }, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusInternalServerError, }, "subscriber fails": { @@ -114,8 +134,21 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { subscriber: func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusNotFound) }, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusInternalServerError, }, + "subscriber fails, non-blocking": { + subs: []eventingduck.ChannelSubscriberSpec{ + { + SubscriberURI: replaceSubscriber, + }, + }, + subscriber: func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(http.StatusNotFound) + }, + staticConfig: StaticConfig{Blocking: false}, + expectedStatus: http.StatusAccepted, + }, "subscriber succeeds, result fails": { subs: []eventingduck.ChannelSubscriberSpec{ { @@ -127,6 +160,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { channel: func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusForbidden) }, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusInternalServerError, }, "one sub succeeds": { @@ -140,6 +174,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { channel: func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusAccepted) }, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusAccepted, }, "one sub succeeds, one sub fails": { @@ -155,6 +190,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { }, subscriber: callableSucceed, channel: (&succeedOnce{}).handler, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusInternalServerError, }, "all subs succeed": { @@ -176,6 +212,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { channel: func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusAccepted) }, + staticConfig: StaticConfig{Blocking: true}, expectedStatus: http.StatusAccepted, }, } @@ -205,7 +242,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) { subs = append(subs, sub) } - h := NewHandler(zap.NewNop(), Config{Subscriptions: subs}) + h := NewHandler(zap.NewNop(), Config{Subscriptions: subs}, tc.staticConfig) if tc.receiverFunc != nil { h.receiver = provisioners.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar()) } diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go index 6169e8028dc..6c73a59a5d4 100644 --- a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go @@ -71,18 +71,19 @@ func getChannelKey(r *http.Request) (string, error) { // on, and then delegates handling of that request to the single fanout.Handler corresponding to // that Channel. type Handler struct { - logger *zap.Logger - handlers map[string]*fanout.Handler - config Config + logger *zap.Logger + handlers map[string]*fanout.Handler + config Config + staticConfig fanout.StaticConfig } // NewHandler creates a new Handler. -func NewHandler(logger *zap.Logger, conf Config) (*Handler, error) { +func NewHandler(logger *zap.Logger, conf Config, staticConf fanout.StaticConfig) (*Handler, error) { handlers := make(map[string]*fanout.Handler, len(conf.ChannelConfigs)) for _, cc := range conf.ChannelConfigs { key := makeChannelKeyFromConfig(cc) - handler := fanout.NewHandler(logger, cc.FanoutConfig) + handler := fanout.NewHandler(logger, cc.FanoutConfig, staticConf) if _, present := handlers[key]; present { logger.Error("Duplicate channel key", zap.String("channelKey", key)) return nil, fmt.Errorf("duplicate channel key: %v", key) @@ -91,9 +92,10 @@ func NewHandler(logger *zap.Logger, conf Config) (*Handler, error) { } return &Handler{ - logger: logger, - config: conf, - handlers: handlers, + logger: logger, + config: conf, + staticConfig: staticConf, + handlers: handlers, }, nil } @@ -107,7 +109,7 @@ func (h *Handler) ConfigDiff(updated Config) string { // CopyWithNewConfig creates a new copy of this Handler with all the fields identical, except the // new Handler uses conf, rather than copying the existing Handler's config. func (h *Handler) CopyWithNewConfig(conf Config) (*Handler, error) { - return NewHandler(h.logger, conf) + return NewHandler(h.logger, conf, h.staticConfig) } // ServeHTTP delegates the actual handling of the request to a fanout.Handler, based on the diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go index 0c807df50d4..7d5cf7d3d4f 100644 --- a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go @@ -87,7 +87,7 @@ func TestNewHandler(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - _, err := NewHandler(zap.NewNop(), tc.config) + _, err := NewHandler(zap.NewNop(), tc.config, fanout.StaticConfig{}) if tc.createErr != "" { if err == nil { t.Errorf("Expected NewHandler error: '%v'. Actual nil", tc.createErr) @@ -136,7 +136,7 @@ func TestCopyWithNewConfig(t *testing.T) { if cmp.Equal(orig, updated) { t.Errorf("Orig and updated must be different") } - h, err := NewHandler(zap.NewNop(), orig) + h, err := NewHandler(zap.NewNop(), orig, fanout.StaticConfig{}) if err != nil { t.Errorf("Unable to create handler, %v", err) } @@ -153,6 +153,9 @@ func TestCopyWithNewConfig(t *testing.T) { if !cmp.Equal(newH.config, updated) { t.Errorf("Incorrect copied config. Expected '%v'. Actual '%v'", updated, newH.config) } + if h.staticConfig != newH.staticConfig { + t.Errorf("Incorrect copied static config. Expected '%v'. Actual '%v'", h.staticConfig, newH.staticConfig) + } } func TestConfigDiff(t *testing.T) { @@ -206,7 +209,7 @@ func TestConfigDiff(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - h, err := NewHandler(zap.NewNop(), tc.orig) + h, err := NewHandler(zap.NewNop(), tc.orig, fanout.StaticConfig{}) if err != nil { t.Errorf("Unable to create handler: %v", err) } @@ -223,6 +226,7 @@ func TestServeHTTP(t *testing.T) { testCases := map[string]struct { name string config Config + staticConfig fanout.StaticConfig respStatusCode int key string expectedStatusCode int @@ -237,7 +241,7 @@ func TestServeHTTP(t *testing.T) { key: "no-dot", expectedStatusCode: http.StatusInternalServerError, }, - "pass through failure": { + "pass through failure, blocking": { config: Config{ ChannelConfigs: []ChannelConfig{ { @@ -253,10 +257,33 @@ func TestServeHTTP(t *testing.T) { }, }, }, + staticConfig: fanout.StaticConfig{Blocking: true}, respStatusCode: http.StatusInternalServerError, key: "first-channel.default", expectedStatusCode: http.StatusInternalServerError, }, + "pass through failure, non-blocking": { + config: Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "first-channel", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + { + ReplyURI: replaceDomain, + }, + }, + }, + }, + }, + }, + // because the config is non-blocking, even though the dispatch response fails, the receiver response is accepted + staticConfig: fanout.StaticConfig{Blocking: false}, + respStatusCode: http.StatusInternalServerError, + key: "first-channel.default", + expectedStatusCode: http.StatusAccepted, + }, "choose channel": { config: Config{ ChannelConfigs: []ChannelConfig{ @@ -301,7 +328,7 @@ func TestServeHTTP(t *testing.T) { // Rewrite the replaceDomains to call the server we just created. replaceDomains(tc.config, server.URL[7:]) - h, err := NewHandler(zap.NewNop(), tc.config) + h, err := NewHandler(zap.NewNop(), tc.config, tc.staticConfig) if err != nil { t.Errorf("Unexpected NewHandler error: '%v'", err) } diff --git a/pkg/sidecar/swappable/swappable.go b/pkg/sidecar/swappable/swappable.go index ec591092d92..a85dbfd0212 100644 --- a/pkg/sidecar/swappable/swappable.go +++ b/pkg/sidecar/swappable/swappable.go @@ -28,6 +28,7 @@ import ( "sync" "sync/atomic" + "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" "go.uber.org/zap" ) @@ -54,8 +55,8 @@ func NewHandler(handler *multichannelfanout.Handler, logger *zap.Logger) *Handle return h } -func NewEmptyHandler(logger *zap.Logger) (*Handler, error) { - h, err := multichannelfanout.NewHandler(logger, multichannelfanout.Config{}) +func NewEmptyHandler(logger *zap.Logger, staticConfig fanout.StaticConfig) (*Handler, error) { + h, err := multichannelfanout.NewHandler(logger, multichannelfanout.Config{}, staticConfig) if err != nil { return nil, err } diff --git a/pkg/sidecar/swappable/swappable_test.go b/pkg/sidecar/swappable/swappable_test.go index 3993ed17fd9..39f3375fccf 100644 --- a/pkg/sidecar/swappable/swappable_test.go +++ b/pkg/sidecar/swappable/swappable_test.go @@ -76,7 +76,7 @@ func TestHandler(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - h, err := NewEmptyHandler(zap.NewNop()) + h, err := NewEmptyHandler(zap.NewNop(), fanout.StaticConfig{}) if err != nil { t.Errorf("Unexpected error creating handler: %v", err) } @@ -125,7 +125,7 @@ func TestHandler_InvalidConfigChange(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - h, err := NewEmptyHandler(zap.NewNop()) + h, err := NewEmptyHandler(zap.NewNop(), fanout.StaticConfig{}) if err != nil { t.Errorf("Unexpected error creating handler: %v", err) } @@ -153,7 +153,7 @@ func TestHandler_InvalidConfigChange(t *testing.T) { } func TestHandler_NilConfigChange(t *testing.T) { - h, err := NewEmptyHandler(zap.NewNop()) + h, err := NewEmptyHandler(zap.NewNop(), fanout.StaticConfig{}) if err != nil { t.Errorf("Unexpected error creating handler: %v", err) }