diff --git a/Gopkg.lock b/Gopkg.lock index 3f91aa7dfda..5093df2ffe6 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -71,6 +71,14 @@ revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98" version = "v1.1.0" +[[projects]] + digest = "1:1b91ae0dc69a41d4c2ed23ea5cffb721ea63f5037ca4b81e6d6771fbb8f45129" + name = "github.com/fsnotify/fsnotify" + packages = ["."] + pruneopts = "NUT" + revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" + version = "v1.4.7" + [[projects]] digest = "1:81466b4218bf6adddac2572a30ac733a9255919bc2f470b4827a317bd4ee1756" name = "github.com/ghodss/yaml" @@ -268,7 +276,7 @@ revision = "5c1d8c8469d1ed34b2aecf4c2305b3a57fff2ee3" [[projects]] - digest = "1:3032bf41e1ec7fe0093c6db659b3cf202ef528c421c26376a7fe209467a9fd74" + digest = "1:a3f465e8fba2ec1a371c52063ce81f17fbf7356a8cadbc160b2f94796ff5e785" name = "github.com/knative/pkg" packages = [ "apis", @@ -1052,6 +1060,7 @@ "github.com/Shopify/sarama", "github.com/bsm/sarama-cluster", "github.com/davecgh/go-spew/spew", + "github.com/fsnotify/fsnotify", "github.com/golang/glog", "github.com/google/go-cmp/cmp", "github.com/google/go-cmp/cmp/cmpopts", @@ -1077,6 +1086,7 @@ "github.com/knative/test-infra", "github.com/prometheus/client_golang/prometheus/promhttp", "go.opencensus.io/trace", + "go.uber.org/atomic", "go.uber.org/zap", "go.uber.org/zap/zapcore", "golang.org/x/net/context", @@ -1106,6 +1116,7 @@ "k8s.io/apimachinery/pkg/util/sets/types", "k8s.io/apimachinery/pkg/util/validation", "k8s.io/apimachinery/pkg/util/wait", + "k8s.io/apimachinery/pkg/util/yaml", "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/discovery", "k8s.io/client-go/discovery/fake", diff --git a/cmd/fanoutsidecar/kodata/LICENSE b/cmd/fanoutsidecar/kodata/LICENSE new file mode 120000 index 00000000000..5853aaea53b --- /dev/null +++ b/cmd/fanoutsidecar/kodata/LICENSE @@ -0,0 +1 @@ +../../../LICENSE \ No newline at end of file diff --git a/cmd/fanoutsidecar/kodata/VENDOR-LICENSE b/cmd/fanoutsidecar/kodata/VENDOR-LICENSE new file mode 120000 index 00000000000..91c1d5bb8f3 --- /dev/null +++ b/cmd/fanoutsidecar/kodata/VENDOR-LICENSE @@ -0,0 +1 @@ +../../../VENDOR-LICENSE \ No newline at end of file diff --git a/cmd/fanoutsidecar/main.go b/cmd/fanoutsidecar/main.go new file mode 100644 index 00000000000..1ece5804172 --- /dev/null +++ b/cmd/fanoutsidecar/main.go @@ -0,0 +1,167 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// A sidecar that implements filtering of Cloud Events sent out via HTTP. Implemented as an HTTP +// proxy that the main containers need to write through. + +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "github.com/knative/eventing/pkg/sidecar/configmap/filesystem" + "github.com/knative/eventing/pkg/sidecar/configmap/watcher" + "github.com/knative/eventing/pkg/sidecar/swappable" + "github.com/knative/eventing/pkg/system" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "k8s.io/client-go/kubernetes" + "log" + "net/http" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/runtime/signals" + "strings" + "time" +) + +const ( + defaultConfigMapName = "in-memory-bus-config" + + // The following are the only valid values of the config_map_noticer flag. + cmnfVolume = "volume" + cmnfWatcher = "watcher" +) + +var ( + readTimeout = 1 * time.Minute + writeTimeout = 1 * time.Minute + + port int + configMapNoticer string + configMapNamespace string + configMapName string +) + +func init() { + flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.") + flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. Valid values are: %s", configMapNoticerFlags())) + flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace, "The namespace of the ConfigMap that is watched for configuration.") + flag.StringVar(&configMapName, "config_map_name", defaultConfigMapName, "The name of the ConfigMap that is watched for configuration.") +} + +func configMapNoticerFlags() string { + return strings.Join([]string{cmnfVolume, cmnfWatcher}, ", ") +} + +func main() { + flag.Parse() + + logger, err := zap.NewProduction() + if err != nil { + log.Fatalf("Unable to create logger: %v", err) + } + + if port < 0 { + logger.Fatal("--sidecar_port flag must be set") + } + + sh, err := swappable.NewEmptyHandler(logger) + if err != nil { + logger.Fatal("Unable to create swappable.Handler", zap.Error(err)) + } + + mgr, err := setupConfigMapNoticer(logger, sh.UpdateConfig) + if err != nil { + logger.Fatal("Unable to create configMap noticer.", zap.Error(err)) + } + + s := &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: sh, + ErrorLog: zap.NewStdLog(logger), + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + } + + // Start both the manager (which notices ConfigMap changes) and the HTTP server. + var g errgroup.Group + g.Go(func() error { + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + // Start blocks forever, so run it in a goroutine. + return mgr.Start(stopCh) + }) + logger.Info("Fanout sidecar Listening...", zap.String("Address", s.Addr)) + g.Go(s.ListenAndServe) + err = g.Wait() + if err != nil { + logger.Error("Either the HTTP server or the ConfigMap noticer failed.", zap.Error(err)) + } + + ctx, cancel := context.WithTimeout(context.Background(), writeTimeout) + defer cancel() + s.Shutdown(ctx) +} + +func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateConfig) (manager.Manager, error) { + mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) + if err != nil { + return nil, err + logger.Error("Error starting manager.", zap.Error(err)) + } + + switch configMapNoticer { + case cmnfVolume: + err = setupConfigMapVolume(logger, mgr, configUpdated) + case cmnfWatcher: + err = setupConfigMapWatcher(logger, mgr, configUpdated) + default: + err = errors.New("need to provide the --config_map_noticer flag") + } + if err != nil { + return nil, err + } + + return mgr, nil +} + +func setupConfigMapVolume(logger *zap.Logger, mgr manager.Manager, configUpdated swappable.UpdateConfig) error { + cmn, err := filesystem.NewConfigMapWatcher(logger, filesystem.ConfigDir, configUpdated) + if err != nil { + logger.Error("Unable to create filesystem.ConifgMapWatcher", zap.Error(err)) + return err + } + mgr.Add(cmn) + return nil +} + +func setupConfigMapWatcher(logger *zap.Logger, mgr manager.Manager, configUpdated swappable.UpdateConfig) error { + kc, err := kubernetes.NewForConfig(mgr.GetConfig()) + if err != nil { + return err + } + + cmw, err := watcher.NewWatcher(logger, kc, configMapNamespace, configMapName, configUpdated) + if err != nil { + return err + } + + mgr.Add(cmw) + return nil +} diff --git a/pkg/buses/message_receiver.go b/pkg/buses/message_receiver.go index 7639a7aec51..a8cf46089ca 100644 --- a/pkg/buses/message_receiver.go +++ b/pkg/buses/message_receiver.go @@ -103,7 +103,7 @@ func (r *MessageReceiver) stop(srv *http.Server) { func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Request) { host := req.Host r.logger.Infof("Received request for %s", host) - channel := r.parseChannel(host) + channel := ParseChannel(host) message, err := r.fromRequest(req) if err != nil { @@ -165,7 +165,7 @@ func (r *MessageReceiver) fromHTTPHeaders(headers http.Header) map[string]string // parseChannel converts the channel's hostname into a channel // reference. -func (r *MessageReceiver) parseChannel(host string) ChannelReference { +func ParseChannel(host string) ChannelReference { chunks := strings.Split(host, ".") return ChannelReference{ Name: chunks[0], diff --git a/pkg/sidecar/configmap/filesystem/filesystem_watcher.go b/pkg/sidecar/configmap/filesystem/filesystem_watcher.go new file mode 100644 index 00000000000..228cbd691b9 --- /dev/null +++ b/pkg/sidecar/configmap/filesystem/filesystem_watcher.go @@ -0,0 +1,124 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filesystem + +import ( + "errors" + "github.com/fsnotify/fsnotify" + sidecarconfigmap "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + "github.com/knative/eventing/pkg/sidecar/swappable" + "github.com/knative/pkg/configmap" + "go.uber.org/zap" +) + +const ( + // The mount path of the configMap volume. + ConfigDir = "/etc/config/fanout_sidecar" +) + +// Monitors an attached ConfigMap volume for updated configuration and calls `configUpdated` when +// the value changes. +type configMapWatcher struct { + logger *zap.Logger + // The directory to read the configMap from. + dir string + // Stop the watcher by closing this channel. + watcherStopCh chan<- bool + + // The function to call when the configuration is updated. + configUpdated swappable.UpdateConfig +} + +// NewConfigMapWatcher creates a new filesystem.configMapWatcher. The caller is responsible for +// calling Start(<-chan), likely via a controller-runtime Manager. +func NewConfigMapWatcher(logger *zap.Logger, dir string, updateConfig swappable.UpdateConfig) (*configMapWatcher, error) { + conf, err := readConfigMap(logger, dir) + if err != nil { + logger.Error("Unable to read configMap", zap.Error(err)) + return nil, err + } + + logger.Info("Read initial configMap", zap.Any("conf", conf)) + + err = updateConfig(conf) + if err != nil { + logger.Error("Unable to use the initial configMap: %v", zap.Error(err)) + return nil, err + } + + cmw := &configMapWatcher{ + logger: logger, + dir: dir, + configUpdated: updateConfig, + } + return cmw, nil +} + +// readConfigMap attempts to read the configMap from the attached volume. +func readConfigMap(logger *zap.Logger, dir string) (*multichannelfanout.Config, error) { + cm, err := configmap.Load(dir) + if err != nil { + return nil, err + } + return sidecarconfigmap.NewFanoutConfig(logger, cm) +} + +// updateConfig reads the configMap data and calls `configUpdated` with the updated value. +func (cmw *configMapWatcher) updateConfig() { + conf, err := readConfigMap(cmw.logger, cmw.dir) + if err != nil { + cmw.logger.Error("Unable to read the configMap", zap.Error(err)) + return + } + err = cmw.configUpdated(conf) + if err != nil { + cmw.logger.Error("Unable to update config", zap.Error(err)) + return + } +} + +func (cmw *configMapWatcher) Start(stopCh <-chan struct{}) error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + + err = watcher.Add(cmw.dir) + if err != nil { + return err + } + + for { + select { + case _, ok := <-watcher.Events: + if !ok { + // Channel closed. + return errors.New("watcher.Events channel closed") + } + cmw.updateConfig() + case err, ok := <-watcher.Errors: + if !ok { + // Channel closed. + return errors.New("watcher.Errors channel closed") + } + cmw.logger.Error("watcher.Errors", zap.Error(err)) + case <-stopCh: + return watcher.Close() + } + } +} diff --git a/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go b/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go new file mode 100644 index 00000000000..5b3894b1bc6 --- /dev/null +++ b/pkg/sidecar/configmap/filesystem/filesystem_watcher_test.go @@ -0,0 +1,358 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filesystem + +import ( + "errors" + "fmt" + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + "gopkg.in/yaml.v2" + "io/ioutil" + "os" + "strings" + "sync" + "testing" + "time" +) + +func TestReadConfigMap(t *testing.T) { + testCases := []struct { + name string + createDir bool + config string + expected *multichannelfanout.Config + expectedErr bool + }{ + { + name: "dir does not exist", + createDir: false, + }, + { + name: "no data", + createDir: true, + expectedErr: true, + }, + { + name: "invalid YAML", + createDir: true, + config: ` + key: + - value + - different indent level + `, + expectedErr: true, + }, + { + name: "valid YAML -- invalid JSON", + config: "{ nil: Key }", + createDir: true, + expectedErr: true, + }, + { + name: "unknown field", + config: "{ channelConfigs: [ { not: a-defined-field } ] }", + createDir: true, + expectedErr: true, + }, + { + name: "valid", + createDir: true, + config: ` + channelConfigs: + - namespace: default + name: c1 + fanoutConfig: + subscriptions: + - callableDomain: event-changer.default.svc.cluster.local + sinkableDomain: message-dumper-bar.default.svc.cluster.local + - callableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableDomain: message-dumper-bar.default.svc.cluster.local + - namespace: default + name: c2 + fanoutConfig: + subscriptions: + - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - namespace: other + name: c3 + fanoutConfig: + subscriptions: + - sinkableDomain: message-dumper-foo.default.svc.cluster.local + `, + expected: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "event-changer.default.svc.cluster.local", + SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + }, + { + CallableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + { + SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + }, + }, + }, + }, + { + Namespace: "default", + Name: "c2", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + }, + }, + }, + { + Namespace: "other", + Name: "c3", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + }, + }, + }, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var dir string + if tc.createDir { + dir = createTempDir(t) + defer os.RemoveAll(dir) + } else { + dir = "/tmp/doesNotExist" + } + writeConfigString(t, dir, tc.config) + c, e := readConfigMap(zap.NewNop(), dir) + if tc.expectedErr { + if e == nil { + t.Errorf("Expected an error, actual nil") + } + return + } + if !cmp.Equal(c, tc.expected) { + t.Errorf("Unexpected config. Expected '%v'. Actual '%v'.", tc.expected, c) + } + }) + } +} + +func TestWatch(t *testing.T) { + testCases := map[string]struct { + initialConfigErr error + initialConfig *multichannelfanout.Config + updateConfigErr error + updateConfig *multichannelfanout.Config + }{ + "error applying initial config": { + initialConfig: &multichannelfanout.Config{}, + initialConfigErr: errors.New("test-induced error"), + }, + "read initial config": { + initialConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "foo.bar", + }, + }, + }, + }, + }, + }, + }, + "error apply updated config": { + initialConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "foo.bar", + }, + }, + }, + }, + }, + }, + updateConfigErr: errors.New("test-induced error"), + }, + "update config": { + initialConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "foo.bar", + }, + }, + }, + }, + }, + }, + updateConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "new-channel", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "baz.qux", + }, + }, + }, + }, + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + dir := createTempDir(t) + defer os.RemoveAll(dir) + writeConfig(t, dir, tc.initialConfig) + + cuc := &configUpdatedChecker{ + updateConfigErr: tc.initialConfigErr, + } + cmw, err := NewConfigMapWatcher(zap.NewNop(), dir, cuc.updateConfig) + if err != nil { + if tc.initialConfigErr != err { + t.Errorf("Unexpected error making ConfigMapWatcher. Expected: '%v'. Actual '%v'", tc.initialConfigErr, err) + } + return + } + ac := cuc.getConfig() + if !cmp.Equal(tc.initialConfig, ac) { + t.Errorf("Unexpected initial config. Expected '%v'. Actual '%v'", tc.initialConfig, ac) + } + + stopCh := make(chan struct{}) + go cmw.Start(stopCh) + defer func() { + close(stopCh) + }() + // Sadly, the test is flaky unless we sleep here, waiting for the file system + // watcher to truly start. + time.Sleep(100 * time.Millisecond) + + if tc.updateConfigErr != nil { + cuc.updateConfigErr = tc.updateConfigErr + } + + expected := tc.initialConfig + if tc.updateConfig != nil { + expected = tc.updateConfig + } + + cuc.updateCalled = make(chan struct{}, 1) + writeConfig(t, dir, expected) + // The watcher is running in another goroutine, give it some time to notice the + // change. + select { + case <-cuc.updateCalled: + break + case <-time.After(5 * time.Second): + t.Errorf("Time out waiting for watcher to notice change.") + } + + ac = cuc.getConfig() + if !cmp.Equal(ac, expected) { + t.Errorf("Unexpected update config. Expected '%v'. Actual '%v'", expected, ac) + } + }) + } +} + +type configUpdatedChecker struct { + configLock sync.Mutex + config *multichannelfanout.Config + updateCalled chan struct{} + updateConfigErr error +} + +func (cuc *configUpdatedChecker) updateConfig(config *multichannelfanout.Config) error { + cuc.configLock.Lock() + defer cuc.configLock.Unlock() + cuc.config = config + if cuc.updateCalled != nil { + cuc.updateCalled <- struct{}{} + } + return cuc.updateConfigErr +} + +func (cuc *configUpdatedChecker) getConfig() *multichannelfanout.Config { + cuc.configLock.Lock() + defer cuc.configLock.Unlock() + return cuc.config +} + +func createTempDir(t *testing.T) string { + dir, err := ioutil.TempDir("", "configMapHandlerTest") + if err != nil { + t.Errorf("Unable to make temp directory: %v", err) + } + return dir +} + +func writeConfig(t *testing.T, dir string, config *multichannelfanout.Config) { + if config != nil { + yb, err := yaml.Marshal(config) + if err != nil { + t.Errorf("Unable to marshal the config") + } + writeConfigString(t, dir, string(yb)) + } +} + +func writeConfigString(t *testing.T, dir, config string) { + if config != "" { + // Golang editors tend to replace leading spaces with tabs. YAML is left whitespace + // sensitive, so let's replace the tabs with spaces. + leftSpaceConfig := strings.Replace(config, "\t", " ", -1) + err := ioutil.WriteFile(fmt.Sprintf("%s/%s", dir, configmap.MultiChannelFanoutConfigKey), []byte(leftSpaceConfig), 0700) + if err != nil { + t.Errorf("Problem writing the config file: %v", err) + } + } +} diff --git a/pkg/sidecar/configmap/parse.go b/pkg/sidecar/configmap/parse.go new file mode 100644 index 00000000000..8661dd2bf8e --- /dev/null +++ b/pkg/sidecar/configmap/parse.go @@ -0,0 +1,39 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "fmt" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + "go.uber.org/zap" +) + +const ( + // MultiChannelFanoutConfigKey is the key in the ConfigMap that contains all the configuration + // data. + MultiChannelFanoutConfigKey = "multiChannelFanoutConfig" +) + +// ConfigMapData attempts to parse the config map's data into a multichannelfanout.Config. +func NewFanoutConfig(logger *zap.Logger, data map[string]string) (*multichannelfanout.Config, error) { + str, present := data[MultiChannelFanoutConfigKey] + if !present { + logger.Error("Expected key not found", zap.String("key", MultiChannelFanoutConfigKey)) + return nil, fmt.Errorf("expected key not found: %v", MultiChannelFanoutConfigKey) + } + return multichannelfanout.Parse(logger, str) +} diff --git a/pkg/sidecar/configmap/parse_test.go b/pkg/sidecar/configmap/parse_test.go new file mode 100644 index 00000000000..a621ad510a8 --- /dev/null +++ b/pkg/sidecar/configmap/parse_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package configmap + +import ( + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + "strings" + "testing" +) + +func TestNewFanoutConfig(t *testing.T) { + testCases := []struct { + name string + config string + expected *multichannelfanout.Config + expectedErr bool + }{ + { + name: "no data", + expectedErr: true, + }, + { + name: "invalid YAML", + config: ` + key: + - value + - different indent level + `, + expectedErr: true, + }, + { + name: "valid YAML -- invalid JSON", + config: "{ nil: Key }", + expectedErr: true, + }, + { + name: "unknown field", + config: "{ channelConfigs: [ { not: a-defined-field } ] }", + expectedErr: true, + }, + { + name: "valid", + config: ` + channelConfigs: + - namespace: default + name: c1 + fanoutConfig: + subscriptions: + - callableDomain: event-changer.default.svc.cluster.local + sinkableDomain: message-dumper-bar.default.svc.cluster.local + - callableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableDomain: message-dumper-bar.default.svc.cluster.local + - namespace: default + name: c2 + fanoutConfig: + subscriptions: + - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - namespace: other + name: c3 + fanoutConfig: + subscriptions: + - sinkableDomain: message-dumper-foo.default.svc.cluster.local + `, + expected: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "event-changer.default.svc.cluster.local", + SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + }, + { + CallableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + { + SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + }, + }, + }, + }, + { + Namespace: "default", + Name: "c2", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + }, + }, + }, + { + Namespace: "other", + Name: "c3", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + }, + }, + }, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + data := formatData(tc.config) + c, e := NewFanoutConfig(zap.NewNop(), data) + if tc.expectedErr { + if e == nil { + t.Errorf("Expected an error, actual nil") + } + return + } + if !cmp.Equal(c, tc.expected) { + t.Errorf("Unexpected config. Expected '%v'. Actual '%v'.", tc.expected, c) + } + }) + } +} + +func formatData(config string) map[string]string { + data := make(map[string]string) + if config != "" { + // Golang editors tend to replace leading spaces with tabs. YAML is left whitespace + // sensitive and disallows tabs, so let's replace the tabs with four spaces. + leftSpaceConfig := strings.Replace(config, "\t", " ", -1) + data[MultiChannelFanoutConfigKey] = leftSpaceConfig + } + return data +} diff --git a/pkg/sidecar/configmap/watcher/watcher.go b/pkg/sidecar/configmap/watcher/watcher.go new file mode 100644 index 00000000000..01dc5d7af9a --- /dev/null +++ b/pkg/sidecar/configmap/watcher/watcher.go @@ -0,0 +1,49 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + sidecarconfigmap "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/swappable" + "github.com/knative/pkg/configmap" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// NewWatcher creates a new InformedWatcher that watches the specified ConfigMap and on any change +// that results in a valid multichannelfanout.Config calls configUpdated. +func NewWatcher(logger *zap.Logger, kc kubernetes.Interface, cmNamespace, cmName string, configUpdated swappable.UpdateConfig) (manager.Runnable, error) { + iw := configmap.NewInformedWatcher(kc, cmNamespace) + iw.Watch(cmName, func(cm *corev1.ConfigMap) { + config, err := sidecarconfigmap.NewFanoutConfig(logger, cm.Data) + if err != nil { + logger.Error("Could not parse ConfigMap", zap.Error(err), + zap.Any("configMap.Data", cm.Data)) + return + } + + err = configUpdated(config) + if err != nil { + logger.Error("Unable to update config", zap.Error(err)) + return + } + }) + + return iw, nil +} diff --git a/pkg/sidecar/configmap/watcher/watcher_test.go b/pkg/sidecar/configmap/watcher/watcher_test.go new file mode 100644 index 00000000000..e6296e5f201 --- /dev/null +++ b/pkg/sidecar/configmap/watcher/watcher_test.go @@ -0,0 +1,124 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "errors" + "github.com/google/go-cmp/cmp" + sidecarconfigmap "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/configmap" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "testing" +) + +const ( + namespace = "test-namespace" + name = "test-name" +) + +func TestReconcile(t *testing.T) { + testCases := map[string]struct { + config map[string]string + updateConfigErr error + expectedConfig *multichannelfanout.Config + }{ + "missing key": { + config: map[string]string{}, + expectedConfig: nil, + }, + "cannot parse cm": { + config: map[string]string{ + sidecarconfigmap.MultiChannelFanoutConfigKey: "invalid config", + }, + expectedConfig: nil, + }, + "configUpdated fails": { + config: map[string]string{ + sidecarconfigmap.MultiChannelFanoutConfigKey: "", + }, + updateConfigErr: errors.New("test-error"), + expectedConfig: &multichannelfanout.Config{}, + }, + "success": { + config: map[string]string{ + sidecarconfigmap.MultiChannelFanoutConfigKey: ` + channelConfigs: + - name: foo + namespace: bar + fanoutConfig: + subscriptions: + - callableDomain: callable + sinkableDomain: sinkable`, + }, + expectedConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Name: "foo", + Namespace: "bar", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "callable", + SinkableDomain: "sinkable", + }, + }, + }, + }, + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + cuc := &configUpdatedChecker{ + updateConfigErr: tc.updateConfigErr, + } + + r, err := NewWatcher(zap.NewNop(), nil, namespace, name, cuc.updateConfig) + if err != nil { + t.Errorf("Error creating watcher: %v", err) + } + iw := r.(*configmap.InformedWatcher) + iw.OnChange(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Data: tc.config, + }) + + if diff := cmp.Diff(tc.expectedConfig, cuc.config); diff != "" { + t.Errorf("Unexpected config (-want +got): %v", diff) + } + }) + } +} + +type configUpdatedChecker struct { + config *multichannelfanout.Config + updateConfigErr error +} + +func (cuc *configUpdatedChecker) updateConfig(config *multichannelfanout.Config) error { + cuc.config = config + return cuc.updateConfigErr +} diff --git a/pkg/sidecar/fanout/fanout_handler.go b/pkg/sidecar/fanout/fanout_handler.go new file mode 100644 index 00000000000..87e636c8d8e --- /dev/null +++ b/pkg/sidecar/fanout/fanout_handler.go @@ -0,0 +1,123 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package fanout provides an http.Handler that takes in one request and fans it out to N other +// requests, based on a list of Subscriptions. Logically, it represents all the Subscriptions to a +// single Knative Channel. +// It will normally be used in conjunction with multichannelfanout.Handler, which contains multiple +// fanout.Handlers, each corresponding to a single Knative Channel. +package fanout + +import ( + "errors" + "github.com/knative/eventing/pkg/buses" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + "net/http" + "time" +) + +const ( + defaultTimeout = 1 * time.Minute + + messageBufferSize = 500 +) + +// Configuration for a fanout.Handler. +type Config struct { + Subscriptions []duckv1alpha1.ChannelSubscriberSpec `json:"subscriptions"` +} + +// http.Handler that takes a single request in and fans it out to N other servers. +type Handler struct { + config Config + + receivedMessages chan *forwardMessage + receiver *buses.MessageReceiver + dispatcher *buses.MessageDispatcher + + // TODO: Plumb context through the receiver and dispatcher and use that to store the timeout, + // rather than a member variable. + timeout time.Duration + + logger *zap.Logger +} + +var _ http.Handler = &Handler{} + +// forwardMessage is passed between the Receiver and the Dispatcher. +type forwardMessage struct { + msg *buses.Message + done chan<- error +} + +// NewHandler creates a new fanout.Handler. +func NewHandler(logger *zap.Logger, config Config) *Handler { + handler := &Handler{ + logger: logger, + config: config, + dispatcher: buses.NewMessageDispatcher(logger.Sugar()), + receivedMessages: make(chan *forwardMessage, messageBufferSize), + timeout: defaultTimeout, + } + // The receiver function needs to point back at the handler itself, so set it up after + // initialization. + handler.receiver = buses.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar()) + + return handler +} + +func createReceiverFunction(f *Handler) func(buses.ChannelReference, *buses.Message) error { + return func(_ buses.ChannelReference, m *buses.Message) error { + return f.dispatch(m) + } +} + +func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + f.receiver.HandleRequest(w, r) +} + +// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out +// requests return successfully, then return nil. Else, return an error. +func (f *Handler) dispatch(msg *buses.Message) error { + errorCh := make(chan error, len(f.config.Subscriptions)) + for _, sub := range f.config.Subscriptions { + go func(s duckv1alpha1.ChannelSubscriberSpec) { + errorCh <- f.makeFanoutRequest(*msg, s) + }(sub) + } + + for range f.config.Subscriptions { + select { + case err := <-errorCh: + if err != nil { + f.logger.Error("Fanout had an error", zap.Error(err)) + return err + } + case <-time.After(f.timeout): + f.logger.Error("Fanout timed out") + return errors.New("fanout timed out") + } + } + // All Subscriptions returned err = nil. + return nil +} + +// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and +// the `sink` portions of the subscription. +func (f *Handler) makeFanoutRequest(m buses.Message, sub duckv1alpha1.ChannelSubscriberSpec) error { + return f.dispatcher.DispatchMessage(&m, sub.CallableDomain, sub.SinkableDomain, buses.DispatchDefaults{}) +} diff --git a/pkg/sidecar/fanout/fanout_handler_test.go b/pkg/sidecar/fanout/fanout_handler_test.go new file mode 100644 index 00000000000..9bdf020e544 --- /dev/null +++ b/pkg/sidecar/fanout/fanout_handler_test.go @@ -0,0 +1,254 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fanout + +import ( + "errors" + "github.com/knative/eventing/pkg/buses" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/atomic" + "go.uber.org/zap" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// Domains used in subscriptions, which will be replaced by the real domains of the started HTTP +// servers. +const ( + replaceCallable = "replaceCallable" + replaceSinkable = "replaceSinkable" +) + +var ( + cloudEventReq = httptest.NewRequest("POST", "http://channelname.channelnamespace/", body(cloudEvent)) + cloudEvent = `{ + "cloudEventsVersion" : "0.1", + "eventType" : "com.example.someevent", + "eventTypeVersion" : "1.0", + "source" : "/mycontext", + "eventID" : "A234-1234-1234", + "eventTime" : "2018-04-05T17:31:00Z", + "extensions" : { + "comExampleExtension" : "value" + }, + "contentType" : "text/xml", + "data" : "" +}` +) + +func TestFanoutHandler_ServeHTTP(t *testing.T) { + testCases := map[string]struct { + receiverFunc func(buses.ChannelReference, *buses.Message) error + timeout time.Duration + subs []duckv1alpha1.ChannelSubscriberSpec + callable func(http.ResponseWriter, *http.Request) + sinkable func(http.ResponseWriter, *http.Request) + expectedStatus int + }{ + "rejected by receiver": { + receiverFunc: func(buses.ChannelReference, *buses.Message) error { + return errors.New("rejected by test-receiver") + }, + expectedStatus: http.StatusInternalServerError, + }, + "fanout times out": { + timeout: time.Millisecond, + subs: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceCallable, + }, + }, + callable: func(writer http.ResponseWriter, _ *http.Request) { + time.Sleep(10 * time.Millisecond) + writer.WriteHeader(http.StatusAccepted) + }, + expectedStatus: http.StatusInternalServerError, + }, + "zero subs succeed": { + subs: []duckv1alpha1.ChannelSubscriberSpec{}, + expectedStatus: http.StatusAccepted, + }, + "empty sub succeeds": { + subs: []duckv1alpha1.ChannelSubscriberSpec{ + {}, + }, + expectedStatus: http.StatusAccepted, + }, + "sinkable fails": { + subs: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: replaceSinkable, + }, + }, + sinkable: func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(http.StatusNotFound) + }, + expectedStatus: http.StatusInternalServerError, + }, + "callable fails": { + subs: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceCallable, + }, + }, + callable: func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(http.StatusNotFound) + }, + expectedStatus: http.StatusInternalServerError, + }, + "callable succeeds, sinkable fails": { + subs: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceCallable, + SinkableDomain: replaceSinkable, + }, + }, + callable: callableSucceed, + sinkable: func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(http.StatusForbidden) + }, + expectedStatus: http.StatusInternalServerError, + }, + "one sub succeeds": { + subs: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceCallable, + SinkableDomain: replaceSinkable, + }, + }, + callable: callableSucceed, + sinkable: func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(http.StatusAccepted) + }, + expectedStatus: http.StatusAccepted, + }, + "one sub succeeds, one sub fails": { + subs: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceCallable, + SinkableDomain: replaceSinkable, + }, + { + CallableDomain: replaceCallable, + SinkableDomain: replaceSinkable, + }, + }, + callable: callableSucceed, + sinkable: (&succeedOnce{}).handler, + expectedStatus: http.StatusInternalServerError, + }, + "all subs succeed": { + subs: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceCallable, + SinkableDomain: replaceSinkable, + }, + { + CallableDomain: replaceCallable, + SinkableDomain: replaceSinkable, + }, + { + CallableDomain: replaceCallable, + SinkableDomain: replaceSinkable, + }, + }, + callable: callableSucceed, + sinkable: func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(http.StatusAccepted) + }, + expectedStatus: http.StatusAccepted, + }, + } + for n, tc := range testCases { + if n != "fanout times out" { + //continue + } + t.Run(n, func(t *testing.T) { + callableServer := httptest.NewServer(&fakeHandler{ + handler: tc.callable, + }) + defer callableServer.Close() + sinkableServer := httptest.NewServer(&fakeHandler{ + handler: tc.sinkable, + }) + defer sinkableServer.Close() + + // Rewrite the subs to use the servers we just started. + subs := make([]duckv1alpha1.ChannelSubscriberSpec, 0) + for _, sub := range tc.subs { + if sub.CallableDomain == replaceCallable { + sub.CallableDomain = callableServer.URL[7:] // strip the leading 'http://' + } + if sub.SinkableDomain == replaceSinkable { + sub.SinkableDomain = sinkableServer.URL[7:] // strip the leading 'http://' + } + subs = append(subs, sub) + } + + h := NewHandler(zap.NewNop(), Config{Subscriptions: subs}) + if tc.receiverFunc != nil { + h.receiver = buses.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar()) + } + if tc.timeout != 0 { + h.timeout = tc.timeout + } else { + // Reasonable timeout for the tests. + h.timeout = 100 * time.Millisecond + } + + w := httptest.NewRecorder() + h.ServeHTTP(w, cloudEventReq) + if w.Code != tc.expectedStatus { + t.Errorf("Unexpected status code. Expected %v, Actual %v", tc.expectedStatus, w.Code) + } + }) + } +} + +type fakeHandler struct { + handler func(http.ResponseWriter, *http.Request) +} + +func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + r.Body.Close() + h.handler(w, r) +} + +type succeedOnce struct { + called atomic.Bool +} + +func (s *succeedOnce) handler(w http.ResponseWriter, _ *http.Request) { + if s.called.CAS(false, true) { + w.WriteHeader(http.StatusAccepted) + } else { + w.WriteHeader(http.StatusForbidden) + } +} + +func body(body string) io.ReadCloser { + return ioutil.NopCloser(strings.NewReader(body)) +} +func callableSucceed(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(http.StatusOK) + writer.Write([]byte(cloudEvent)) +} diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go new file mode 100644 index 00000000000..3882bb6a2ae --- /dev/null +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler.go @@ -0,0 +1,120 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package multichannelfanout provides an http.Handler that takes in one request to a Knative +// Channel and fans it out to N other requests. Logically, it represents multiple Knative Channels. +// It is made up of a map, map[channel]fanout.Handler and each incoming request is inspected to +// determine which Channel it is on. This Handler delegates the HTTP handling to the fanout.Handler +// corresponding to the incoming request's Channel. +// It is often used in conjunction with a swappable.Handler. The swappable.Handler delegates all its +// requests to the multichannelfanout.Handler. When a new configuration is available, a new +// multichannelfanout.Handler is created and swapped in for all subsequent requests. The old +// multichannelfanout.Handler is discarded. +package multichannelfanout + +import ( + "fmt" + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/sidecar/fanout" + "go.uber.org/zap" + "net/http" +) + +// The configuration of this handler. +type Config struct { + // The configuration of each channel in this handler. + ChannelConfigs []ChannelConfig `json:"channelConfigs"` +} + +type ChannelConfig struct { + Namespace string `json:"namespace"` + Name string `json:"name"` + FanoutConfig fanout.Config `json:"fanoutConfig"` +} + +// MakeChannelKey creates the key used for this Channel in the Handler's handlers map. +func makeChannelKey(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + +// makeChannelKeyFromConfig creates the channel key for a given channelConfig. It is a helper around +// MakeChannelKey. +func makeChannelKeyFromConfig(config ChannelConfig) string { + return makeChannelKey(config.Namespace, config.Name) +} + +// getChannelKey extracts the channel key from the given HTTP request. +func getChannelKey(r *http.Request) string { + cr := buses.ParseChannel(r.Host) + return makeChannelKey(cr.Namespace, cr.Name) +} + +// Handler is an http.Handler that introspects the incoming request to determine what Channel it is +// on, and then delegates handling of that request to the single fanout.Handler corresponding to +// that Channel. +type Handler struct { + logger *zap.Logger + handlers map[string]*fanout.Handler + config Config +} + +// NewHandler creates a new Handler. +func NewHandler(logger *zap.Logger, conf Config) (*Handler, error) { + handlers := make(map[string]*fanout.Handler, len(conf.ChannelConfigs)) + + for _, cc := range conf.ChannelConfigs { + key := makeChannelKeyFromConfig(cc) + handler := fanout.NewHandler(logger, cc.FanoutConfig) + if _, present := handlers[key]; present { + logger.Error("Duplicate channel key", zap.String("channelKey", key)) + return nil, fmt.Errorf("duplicate channel key: %v", key) + } + handlers[key] = handler + } + + return &Handler{ + logger: logger, + config: conf, + handlers: handlers, + }, nil +} + +// ConfigDiffs diffs the new config with the existing config. If there are no differences, then the +// empty string is returned. If there are differences, then a non-empty string is returned +// describing the differences. +func (h *Handler) ConfigDiff(updated Config) string { + return cmp.Diff(h.config, updated) +} + +// CopyWithNewConfig creates a new copy of this Handler with all the fields identical, except the +// new Handler uses conf, rather than copying the existing Handler's config. +func (h *Handler) CopyWithNewConfig(conf Config) (*Handler, error) { + return NewHandler(h.logger, conf) +} + +// ServeHTTP delegates the actual handling of the request to a fanout.Handler, based on the +// request's channel key. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + channelKey := getChannelKey(r) + fh, ok := h.handlers[channelKey] + if !ok { + h.logger.Error("Unable to find a handler for request", zap.String("channelKey", channelKey)) + w.WriteHeader(http.StatusInternalServerError) + return + } + fh.ServeHTTP(w, r) +} diff --git a/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go new file mode 100644 index 00000000000..3bc5e6f0faf --- /dev/null +++ b/pkg/sidecar/multichannelfanout/multi_channel_fanout_handler_test.go @@ -0,0 +1,339 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichannelfanout + +import ( + "fmt" + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/sidecar/fanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +const ( + // The httptest.Server's host name will replace this value in all ChannelConfigs. + replaceDomain = "replaceDomain" +) + +func TestMakeChannelKey(t *testing.T) { + testCases := []struct { + namespace string + name string + key string + }{ + { + namespace: "default", + name: "channel", + key: "default/channel", + }, + { + namespace: "foo", + name: "bar", + key: "foo/bar", + }, + } + for _, tc := range testCases { + name := fmt.Sprintf("%s, %s -> %s", tc.namespace, tc.name, tc.key) + t.Run(name, func(t *testing.T) { + if key := makeChannelKey(tc.namespace, tc.name); key != tc.key { + t.Errorf("Unexpected ChannelKey. Expected '%v'. Actual '%v'", tc.key, key) + } + }) + } +} + +func TestNewHandler(t *testing.T) { + testCases := []struct { + name string + config Config + createErr string + }{ + { + name: "duplicate channel key", + config: Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "duplicate", + }, + { + Namespace: "default", + Name: "duplicate", + }, + }, + }, + createErr: "duplicate channel key: default/duplicate", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, err := NewHandler(zap.NewNop(), tc.config) + if tc.createErr != "" { + if err == nil { + t.Errorf("Expected NewHandler error: '%v'. Actual nil", tc.createErr) + } else if err.Error() != tc.createErr { + t.Errorf("Unexpected NewHandler error. Expected '%v'. Actual '%v'", tc.createErr, err) + } + return + } else if err != nil { + t.Errorf("Unexpected NewHandler error. Expected nil. Actual '%v'", err) + } + }) + } +} + +func TestCopyWithNewConfig(t *testing.T) { + orig := Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "callabledomain", + }, + }, + }, + }, + }, + } + updated := Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "somethingdifferent", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "sinkabledomain", + }, + }, + }, + }, + }, + } + if cmp.Equal(orig, updated) { + t.Errorf("Orig and updated must be different") + } + h, err := NewHandler(zap.NewNop(), orig) + if err != nil { + t.Errorf("Unable to create handler, %v", err) + } + if !cmp.Equal(h.config, orig) { + t.Errorf("Incorrect config. Expected '%v'. Actual '%v'", orig, h.config) + } + newH, err := h.CopyWithNewConfig(updated) + if err != nil { + t.Errorf("Unable to copy handler: %v", err) + } + if h.logger != newH.logger { + t.Errorf("Did not copy logger") + } + if !cmp.Equal(newH.config, updated) { + t.Errorf("Incorrect copied config. Expected '%v'. Actual '%v'", updated, newH.config) + } +} + +func TestConfigDiff(t *testing.T) { + config := Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "callabledomain", + }, + }, + }, + }, + }, + } + testCases := []struct { + name string + orig Config + updated Config + expectedDiff bool + }{ + { + name: "same", + orig: config, + updated: config, + expectedDiff: false, + }, + { + name: "different", + orig: config, + updated: Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "different", + }, + }, + }, + }, + }, + }, + expectedDiff: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + h, err := NewHandler(zap.NewNop(), tc.orig) + if err != nil { + t.Errorf("Unable to create handler: %v", err) + } + diff := h.ConfigDiff(tc.updated) + + if hasDiff := diff != ""; hasDiff != tc.expectedDiff { + t.Errorf("Unexpected diff result. Expected %v. Actual %v", tc.expectedDiff, hasDiff) + } + }) + } +} + +func TestServeHTTP(t *testing.T) { + testCases := map[string]struct { + name string + config Config + respStatusCode int + key string + expectedStatusCode int + }{ + "non-existent channel": { + config: Config{}, + key: "default.does-not-exist", + expectedStatusCode: http.StatusInternalServerError, + }, + "pass through failure": { + config: Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "first-channel", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: replaceDomain, + }, + }, + }, + }, + }, + }, + respStatusCode: http.StatusInternalServerError, + key: "first-channel.default", + expectedStatusCode: http.StatusInternalServerError, + }, + "choose channel": { + config: Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "first-channel", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "first-to-domain", + }, + }, + }, + }, + { + Namespace: "default", + Name: "second-channel", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceDomain, + }, + }, + }, + }, + }, + }, + respStatusCode: http.StatusOK, + key: "second-channel.default", + expectedStatusCode: http.StatusAccepted, + }, + } + requestWithChannelKey := func(key string) *http.Request { + r := httptest.NewRequest("POST", fmt.Sprintf("http://%s/", key), strings.NewReader("{}")) + return r + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + server := httptest.NewServer(&fakeHandler{statusCode: tc.respStatusCode}) + defer server.Close() + + // Rewrite the replaceDomains to call the server we just created. + replaceDomains(tc.config, server.URL[7:]) + + h, err := NewHandler(zap.NewNop(), tc.config) + if err != nil { + t.Errorf("Unexpected NewHandler error: '%v'", err) + } + + r := requestWithChannelKey(tc.key) + w := httptest.NewRecorder() + h.ServeHTTP(w, r) + resp := w.Result() + if resp.StatusCode != tc.expectedStatusCode { + t.Errorf("Unexpected status code. Expected %v, actual %v", tc.expectedStatusCode, resp.StatusCode) + } + if w.Body.String() != "" { + t.Errorf("Expected empty response body. Actual: %v", w.Body) + } + }) + } +} + +func replaceDomains(config Config, replacement string) { + for i, cc := range config.ChannelConfigs { + for j, sub := range cc.FanoutConfig.Subscriptions { + if sub.CallableDomain == replaceDomain { + sub.CallableDomain = replacement + } + if sub.SinkableDomain == replaceDomain { + sub.SinkableDomain = replacement + } + cc.FanoutConfig.Subscriptions[j] = sub + } + config.ChannelConfigs[i] = cc + } +} + +type fakeHandler struct { + statusCode int +} + +func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + w.WriteHeader(h.statusCode) +} diff --git a/pkg/sidecar/multichannelfanout/parse.go b/pkg/sidecar/multichannelfanout/parse.go new file mode 100644 index 00000000000..35687b442d8 --- /dev/null +++ b/pkg/sidecar/multichannelfanout/parse.go @@ -0,0 +1,44 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichannelfanout + +import ( + "bytes" + "encoding/json" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/yaml" +) + +// Parse attempts to parse the YAML string into a multichannelfanout.Config. +func Parse(logger *zap.Logger, str string) (*Config, error) { + jb, err := yaml.ToJSON([]byte(str)) + if err != nil { + logger.Error("Unable to convert str to JSON", zap.Error(err), zap.String("str", str)) + return nil, err + } + var conf Config + err = unmarshalJsonDisallowUnknownFields(jb, &conf) + return &conf, err +} + +// unmarshalJsonDisallowUnknownFields unmarshalls JSON, but unlike json.Unmarshal, will fail if +// given an unknown field (rather than json.Unmarshall's ignoring the unknown field). +func unmarshalJsonDisallowUnknownFields(jb []byte, v interface{}) error { + d := json.NewDecoder(bytes.NewReader(jb)) + d.DisallowUnknownFields() + return d.Decode(v) +} diff --git a/pkg/sidecar/multichannelfanout/parse_test.go b/pkg/sidecar/multichannelfanout/parse_test.go new file mode 100644 index 00000000000..568ac7d3391 --- /dev/null +++ b/pkg/sidecar/multichannelfanout/parse_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multichannelfanout + +import ( + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/sidecar/fanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + "strings" + "testing" +) + +func TestConfigMapData(t *testing.T) { + testCases := []struct { + name string + config string + expected *Config + expectedErr bool + }{ + { + name: "no data", + expected: &Config {}, + }, + { + name: "invalid YAML", + config: ` + key: + - value + - different indent level + `, + expectedErr: true, + }, + { + name: "valid YAML -- invalid JSON", + config: "{ nil: Key }", + expectedErr: true, + }, + { + name: "unknown field", + config: "{ channelConfigs: [ { not: a-defined-field } ] }", + expectedErr: true, + }, + { + name: "valid", + config: ` + channelConfigs: + - namespace: default + name: c1 + fanoutConfig: + subscriptions: + - callableDomain: event-changer.default.svc.cluster.local + sinkableDomain: message-dumper-bar.default.svc.cluster.local + - callableDomain: message-dumper-foo.default.svc.cluster.local + - sinkableDomain: message-dumper-bar.default.svc.cluster.local + - namespace: default + name: c2 + fanoutConfig: + subscriptions: + - sinkableDomain: message-dumper-foo.default.svc.cluster.local + - namespace: other + name: c3 + fanoutConfig: + subscriptions: + - sinkableDomain: message-dumper-foo.default.svc.cluster.local + `, + expected: &Config{ + ChannelConfigs: []ChannelConfig{ + { + Namespace: "default", + Name: "c1", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: "event-changer.default.svc.cluster.local", + SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + }, + { + CallableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + { + SinkableDomain: "message-dumper-bar.default.svc.cluster.local", + }, + }, + }, + }, + { + Namespace: "default", + Name: "c2", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + }, + }, + }, + { + Namespace: "other", + Name: "c3", + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: "message-dumper-foo.default.svc.cluster.local", + }, + }, + }, + }, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.name != "valid" { return } + c, e := Parse(zap.NewNop(), formatData(tc.config)) + if tc.expectedErr { + if e == nil { + t.Errorf("Expected an error, actual nil") + } + return + } + if !cmp.Equal(c, tc.expected) { + t.Errorf("Unexpected config. Expected '%v'. Actual '%v'.", tc.expected, c) + } + }) + } +} + +func formatData(config string) string { + // Golang editors tend to replace leading spaces with tabs. YAML is left whitespace + // sensitive and disallows tabs, so let's replace the tabs with four spaces. + return strings.Replace(config, "\t", " ", -1) +} diff --git a/pkg/sidecar/swappable/swappable.go b/pkg/sidecar/swappable/swappable.go new file mode 100644 index 00000000000..97ba588680c --- /dev/null +++ b/pkg/sidecar/swappable/swappable.go @@ -0,0 +1,105 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package swappable provides an http.Handler that delegates all HTTP requests to an underlying +// multichannelfanout.Handler. When a new configuration is available, a new +// multichannelfanout.Handler is created and swapped in. All subsequent requests go to the new +// handler. +// It is often used in conjunction with something that notices changes to ConfigMaps, such as +// configmap.watcher or configmap.filesystem. +package swappable + +import ( + "errors" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + "go.uber.org/zap" + "net/http" + "sync" + "sync/atomic" +) + +// http.Handler that atomically swaps between underlying handlers. +type Handler struct { + // The current multichannelfanout.Handler to delegate HTTP requests to. Never use this directly, + // instead use {get,set}MultiChannelFanoutHandler, which enforces the type we expect. + fanout atomic.Value + updateLock sync.Mutex + logger *zap.Logger +} + +type UpdateConfig func(config *multichannelfanout.Config) error + +var _ UpdateConfig = (&Handler{}).UpdateConfig + +// NewHandler creates a new swappable.Handler. +func NewHandler(handler *multichannelfanout.Handler, logger *zap.Logger) *Handler { + h := &Handler{ + logger: logger.With(zap.String("httpHandler", "swappable")), + } + h.setMultiChannelFanoutHandler(handler) + return h +} + +func NewEmptyHandler(logger *zap.Logger) (*Handler, error) { + h, err := multichannelfanout.NewHandler(logger, multichannelfanout.Config{}) + if err != nil { + return nil, err + } + return NewHandler(h, logger), nil +} + +// getMultiChannelFanoutHandler gets the current multichannelfanout.Handler to delegate all HTTP +// requests to. +func (h *Handler) getMultiChannelFanoutHandler() *multichannelfanout.Handler { + return h.fanout.Load().(*multichannelfanout.Handler) +} + +// setMultiChannelFanoutHandler sets a new multichannelfanout.Handler to delegate all subsequent +// HTTP requests to. +func (h *Handler) setMultiChannelFanoutHandler(nh *multichannelfanout.Handler) { + h.fanout.Store(nh) +} + +// UpdateConfig copies the current inner multichannelfanout.Handler with the new configuration. If +// the new configuration is valid, then the new inner handler is swapped in and will start serving +// HTTP traffic. +func (h *Handler) UpdateConfig(config *multichannelfanout.Config) error { + if config == nil { + return errors.New("nil config") + } + + h.updateLock.Lock() + defer h.updateLock.Unlock() + + ih := h.getMultiChannelFanoutHandler() + if diff := ih.ConfigDiff(*config); diff != "" { + h.logger.Info("Updating config (-old +new)", zap.String("diff", diff)) + newIh, err := ih.CopyWithNewConfig(*config) + if err != nil { + h.logger.Info("Unable to update config", zap.Error(err), zap.Any("config", config)) + return err + } + h.setMultiChannelFanoutHandler(newIh) + } + return nil +} + +// ServeHTTP delegates all HTTP requests to the current multichannelfanout.Handler. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Hand work off to the current multi channel fanout handler. + h.logger.Debug("ServeHTTP request received") + h.getMultiChannelFanoutHandler().ServeHTTP(w, r) +} diff --git a/pkg/sidecar/swappable/swappable_test.go b/pkg/sidecar/swappable/swappable_test.go new file mode 100644 index 00000000000..2c663bf800b --- /dev/null +++ b/pkg/sidecar/swappable/swappable_test.go @@ -0,0 +1,217 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package swappable + +import ( + "fmt" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "go.uber.org/zap" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +const ( + namespace = "default" + name = "channel1" + replaceDomain = "replaceDomain" +) + +func TestHandler(t *testing.T) { + testCases := map[string]struct { + configs []multichannelfanout.Config + }{ + "config change": { + configs: []multichannelfanout.Config{ + { + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: namespace, + Name: name, + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceDomain, + }, + }, + }, + }, + }, + }, + { + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: namespace, + Name: name, + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + SinkableDomain: replaceDomain, + }, + }, + }, + }, + }, + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + h, err := NewEmptyHandler(zap.NewNop()) + if err != nil { + t.Errorf("Unexpected error creating handler: %v", err) + } + for _, c := range tc.configs { + updateConfigAndTest(t, h, c) + } + }) + } +} + +func TestHandler_InvalidConfigChange(t *testing.T) { + testCases := map[string]struct { + initialConfig multichannelfanout.Config + badUpdateConfig multichannelfanout.Config + }{ + "invalid config change": { + initialConfig: multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: namespace, + Name: name, + FanoutConfig: fanout.Config{ + Subscriptions: []duckv1alpha1.ChannelSubscriberSpec{ + { + CallableDomain: replaceDomain, + }, + }, + }, + }, + }, + }, + badUpdateConfig: multichannelfanout.Config{ + // Duplicate (namespace, name). + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: namespace, + Name: name, + }, + { + Namespace: namespace, + Name: name, + }, + }, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + h, err := NewEmptyHandler(zap.NewNop()) + if err != nil { + t.Errorf("Unexpected error creating handler: %v", err) + } + + server := httptest.NewServer(&successHandler{}) + defer server.Close() + + rc := replaceDomains(tc.initialConfig, server.URL[7:]) + err = h.UpdateConfig(&rc) + if err != nil { + t.Errorf("Unexpected error updating to initial config: %v", tc.initialConfig) + } + assertRequestAccepted(t, h) + + // Try to update to the new config, it will fail. But we should still be able to hit the + // original server. + ruc := replaceDomains(tc.badUpdateConfig, server.URL[7:]) + err = h.UpdateConfig(&ruc) + if err == nil { + t.Errorf("Expected an error when updating to a bad config.") + } + assertRequestAccepted(t, h) + }) + } +} + +func TestHandler_NilConfigChange(t *testing.T) { + h, err := NewEmptyHandler(zap.NewNop()) + if err != nil { + t.Errorf("Unexpected error creating handler: %v", err) + } + + err = h.UpdateConfig(nil) + if err == nil { + t.Errorf("Expected an error when updating to a nil config.") + } +} + +func updateConfigAndTest(t *testing.T, h *Handler, config multichannelfanout.Config) { + server := httptest.NewServer(&successHandler{}) + defer server.Close() + + rc := replaceDomains(config, server.URL[7:]) + orig := h.fanout.Load() + err := h.UpdateConfig(&rc) + if err != nil { + t.Errorf("Unexpected error updating config: %+v", err) + } + if orig == h.fanout.Load() { + t.Errorf("Expected the inner multiChannelFanoutHandler to change, it didn't: %v", orig) + } + + assertRequestAccepted(t, h) +} + +func assertRequestAccepted(t *testing.T, h *Handler) { + w := httptest.NewRecorder() + h.ServeHTTP(w, makeRequest(namespace, name)) + if w.Code != http.StatusAccepted { + t.Errorf("Unexpected response code. Expected 202. Actual %v", w.Code) + } +} + +type successHandler struct{} + +func (*successHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + r.Body.Close() +} + +func makeRequest(namespace, name string) *http.Request { + r := httptest.NewRequest("POST", fmt.Sprintf("http://%s.%s/", name, namespace), strings.NewReader("")) + return r +} + +func replaceDomains(c multichannelfanout.Config, replacement string) multichannelfanout.Config { + for i, cc := range c.ChannelConfigs { + for j, sub := range cc.FanoutConfig.Subscriptions { + if sub.SinkableDomain == replaceDomain { + sub.SinkableDomain = replacement + } + if sub.CallableDomain == replaceDomain { + sub.CallableDomain = replacement + } + cc.FanoutConfig.Subscriptions[j] = sub + } + c.ChannelConfigs[i] = cc + } + return c +} diff --git a/third_party/VENDOR-LICENSE b/third_party/VENDOR-LICENSE index d28593bbdba..45ce934d7ba 100644 --- a/third_party/VENDOR-LICENSE +++ b/third_party/VENDOR-LICENSE @@ -254,6 +254,40 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +=========================================================== +Import: github.com/knative/eventing/vendor/github.com/fsnotify/fsnotify + +Copyright (c) 2012 The Go Authors. All rights reserved. +Copyright (c) 2012 fsnotify Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + + =========================================================== Import: github.com/knative/eventing/vendor/github.com/ghodss/yaml diff --git a/vendor/github.com/fsnotify/fsnotify/AUTHORS b/vendor/github.com/fsnotify/fsnotify/AUTHORS new file mode 100644 index 00000000000..5ab5d41c547 --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/AUTHORS @@ -0,0 +1,52 @@ +# Names should be added to this file as +# Name or Organization +# The email address is not required for organizations. + +# You can update this list using the following command: +# +# $ git shortlog -se | awk '{print $2 " " $3 " " $4}' + +# Please keep the list sorted. + +Aaron L +Adrien Bustany +Amit Krishnan +Anmol Sethi +Bjørn Erik Pedersen +Bruno Bigras +Caleb Spare +Case Nelson +Chris Howey +Christoffer Buchholz +Daniel Wagner-Hall +Dave Cheney +Evan Phoenix +Francisco Souza +Hari haran +John C Barstow +Kelvin Fo +Ken-ichirou MATSUZAWA +Matt Layher +Nathan Youngman +Nickolai Zeldovich +Patrick +Paul Hammond +Pawel Knap +Pieter Droogendijk +Pursuit92 +Riku Voipio +Rob Figueiredo +Rodrigo Chiossi +Slawek Ligus +Soge Zhang +Tiffany Jernigan +Tilak Sharma +Tom Payne +Travis Cline +Tudor Golubenco +Vahe Khachikyan +Yukang +bronze1man +debrando +henrikedwards +铁哥 diff --git a/vendor/github.com/fsnotify/fsnotify/LICENSE b/vendor/github.com/fsnotify/fsnotify/LICENSE new file mode 100644 index 00000000000..f21e5408009 --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/LICENSE @@ -0,0 +1,28 @@ +Copyright (c) 2012 The Go Authors. All rights reserved. +Copyright (c) 2012 fsnotify Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/fsnotify/fsnotify/fen.go b/vendor/github.com/fsnotify/fsnotify/fen.go new file mode 100644 index 00000000000..ced39cb881e --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/fen.go @@ -0,0 +1,37 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build solaris + +package fsnotify + +import ( + "errors" +) + +// Watcher watches a set of files, delivering events to a channel. +type Watcher struct { + Events chan Event + Errors chan error +} + +// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. +func NewWatcher() (*Watcher, error) { + return nil, errors.New("FEN based watcher not yet supported for fsnotify\n") +} + +// Close removes all watches and closes the events channel. +func (w *Watcher) Close() error { + return nil +} + +// Add starts watching the named file or directory (non-recursively). +func (w *Watcher) Add(name string) error { + return nil +} + +// Remove stops watching the the named file or directory (non-recursively). +func (w *Watcher) Remove(name string) error { + return nil +} diff --git a/vendor/github.com/fsnotify/fsnotify/fsnotify.go b/vendor/github.com/fsnotify/fsnotify/fsnotify.go new file mode 100644 index 00000000000..190bf0de575 --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/fsnotify.go @@ -0,0 +1,66 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !plan9 + +// Package fsnotify provides a platform-independent interface for file system notifications. +package fsnotify + +import ( + "bytes" + "errors" + "fmt" +) + +// Event represents a single file system notification. +type Event struct { + Name string // Relative path to the file or directory. + Op Op // File operation that triggered the event. +} + +// Op describes a set of file operations. +type Op uint32 + +// These are the generalized file operations that can trigger a notification. +const ( + Create Op = 1 << iota + Write + Remove + Rename + Chmod +) + +func (op Op) String() string { + // Use a buffer for efficient string concatenation + var buffer bytes.Buffer + + if op&Create == Create { + buffer.WriteString("|CREATE") + } + if op&Remove == Remove { + buffer.WriteString("|REMOVE") + } + if op&Write == Write { + buffer.WriteString("|WRITE") + } + if op&Rename == Rename { + buffer.WriteString("|RENAME") + } + if op&Chmod == Chmod { + buffer.WriteString("|CHMOD") + } + if buffer.Len() == 0 { + return "" + } + return buffer.String()[1:] // Strip leading pipe +} + +// String returns a string representation of the event in the form +// "file: REMOVE|WRITE|..." +func (e Event) String() string { + return fmt.Sprintf("%q: %s", e.Name, e.Op.String()) +} + +// Common errors that can be reported by a watcher +var ErrEventOverflow = errors.New("fsnotify queue overflow") diff --git a/vendor/github.com/fsnotify/fsnotify/inotify.go b/vendor/github.com/fsnotify/fsnotify/inotify.go new file mode 100644 index 00000000000..d9fd1b88a05 --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/inotify.go @@ -0,0 +1,337 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build linux + +package fsnotify + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "unsafe" + + "golang.org/x/sys/unix" +) + +// Watcher watches a set of files, delivering events to a channel. +type Watcher struct { + Events chan Event + Errors chan error + mu sync.Mutex // Map access + fd int + poller *fdPoller + watches map[string]*watch // Map of inotify watches (key: path) + paths map[int]string // Map of watched paths (key: watch descriptor) + done chan struct{} // Channel for sending a "quit message" to the reader goroutine + doneResp chan struct{} // Channel to respond to Close +} + +// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. +func NewWatcher() (*Watcher, error) { + // Create inotify fd + fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC) + if fd == -1 { + return nil, errno + } + // Create epoll + poller, err := newFdPoller(fd) + if err != nil { + unix.Close(fd) + return nil, err + } + w := &Watcher{ + fd: fd, + poller: poller, + watches: make(map[string]*watch), + paths: make(map[int]string), + Events: make(chan Event), + Errors: make(chan error), + done: make(chan struct{}), + doneResp: make(chan struct{}), + } + + go w.readEvents() + return w, nil +} + +func (w *Watcher) isClosed() bool { + select { + case <-w.done: + return true + default: + return false + } +} + +// Close removes all watches and closes the events channel. +func (w *Watcher) Close() error { + if w.isClosed() { + return nil + } + + // Send 'close' signal to goroutine, and set the Watcher to closed. + close(w.done) + + // Wake up goroutine + w.poller.wake() + + // Wait for goroutine to close + <-w.doneResp + + return nil +} + +// Add starts watching the named file or directory (non-recursively). +func (w *Watcher) Add(name string) error { + name = filepath.Clean(name) + if w.isClosed() { + return errors.New("inotify instance already closed") + } + + const agnosticEvents = unix.IN_MOVED_TO | unix.IN_MOVED_FROM | + unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY | + unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF + + var flags uint32 = agnosticEvents + + w.mu.Lock() + defer w.mu.Unlock() + watchEntry := w.watches[name] + if watchEntry != nil { + flags |= watchEntry.flags | unix.IN_MASK_ADD + } + wd, errno := unix.InotifyAddWatch(w.fd, name, flags) + if wd == -1 { + return errno + } + + if watchEntry == nil { + w.watches[name] = &watch{wd: uint32(wd), flags: flags} + w.paths[wd] = name + } else { + watchEntry.wd = uint32(wd) + watchEntry.flags = flags + } + + return nil +} + +// Remove stops watching the named file or directory (non-recursively). +func (w *Watcher) Remove(name string) error { + name = filepath.Clean(name) + + // Fetch the watch. + w.mu.Lock() + defer w.mu.Unlock() + watch, ok := w.watches[name] + + // Remove it from inotify. + if !ok { + return fmt.Errorf("can't remove non-existent inotify watch for: %s", name) + } + + // We successfully removed the watch if InotifyRmWatch doesn't return an + // error, we need to clean up our internal state to ensure it matches + // inotify's kernel state. + delete(w.paths, int(watch.wd)) + delete(w.watches, name) + + // inotify_rm_watch will return EINVAL if the file has been deleted; + // the inotify will already have been removed. + // watches and pathes are deleted in ignoreLinux() implicitly and asynchronously + // by calling inotify_rm_watch() below. e.g. readEvents() goroutine receives IN_IGNORE + // so that EINVAL means that the wd is being rm_watch()ed or its file removed + // by another thread and we have not received IN_IGNORE event. + success, errno := unix.InotifyRmWatch(w.fd, watch.wd) + if success == -1 { + // TODO: Perhaps it's not helpful to return an error here in every case. + // the only two possible errors are: + // EBADF, which happens when w.fd is not a valid file descriptor of any kind. + // EINVAL, which is when fd is not an inotify descriptor or wd is not a valid watch descriptor. + // Watch descriptors are invalidated when they are removed explicitly or implicitly; + // explicitly by inotify_rm_watch, implicitly when the file they are watching is deleted. + return errno + } + + return nil +} + +type watch struct { + wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall) + flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags) +} + +// readEvents reads from the inotify file descriptor, converts the +// received events into Event objects and sends them via the Events channel +func (w *Watcher) readEvents() { + var ( + buf [unix.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events + n int // Number of bytes read with read() + errno error // Syscall errno + ok bool // For poller.wait + ) + + defer close(w.doneResp) + defer close(w.Errors) + defer close(w.Events) + defer unix.Close(w.fd) + defer w.poller.close() + + for { + // See if we have been closed. + if w.isClosed() { + return + } + + ok, errno = w.poller.wait() + if errno != nil { + select { + case w.Errors <- errno: + case <-w.done: + return + } + continue + } + + if !ok { + continue + } + + n, errno = unix.Read(w.fd, buf[:]) + // If a signal interrupted execution, see if we've been asked to close, and try again. + // http://man7.org/linux/man-pages/man7/signal.7.html : + // "Before Linux 3.8, reads from an inotify(7) file descriptor were not restartable" + if errno == unix.EINTR { + continue + } + + // unix.Read might have been woken up by Close. If so, we're done. + if w.isClosed() { + return + } + + if n < unix.SizeofInotifyEvent { + var err error + if n == 0 { + // If EOF is received. This should really never happen. + err = io.EOF + } else if n < 0 { + // If an error occurred while reading. + err = errno + } else { + // Read was too short. + err = errors.New("notify: short read in readEvents()") + } + select { + case w.Errors <- err: + case <-w.done: + return + } + continue + } + + var offset uint32 + // We don't know how many events we just read into the buffer + // While the offset points to at least one whole event... + for offset <= uint32(n-unix.SizeofInotifyEvent) { + // Point "raw" to the event in the buffer + raw := (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset])) + + mask := uint32(raw.Mask) + nameLen := uint32(raw.Len) + + if mask&unix.IN_Q_OVERFLOW != 0 { + select { + case w.Errors <- ErrEventOverflow: + case <-w.done: + return + } + } + + // If the event happened to the watched directory or the watched file, the kernel + // doesn't append the filename to the event, but we would like to always fill the + // the "Name" field with a valid filename. We retrieve the path of the watch from + // the "paths" map. + w.mu.Lock() + name, ok := w.paths[int(raw.Wd)] + // IN_DELETE_SELF occurs when the file/directory being watched is removed. + // This is a sign to clean up the maps, otherwise we are no longer in sync + // with the inotify kernel state which has already deleted the watch + // automatically. + if ok && mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF { + delete(w.paths, int(raw.Wd)) + delete(w.watches, name) + } + w.mu.Unlock() + + if nameLen > 0 { + // Point "bytes" at the first byte of the filename + bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&buf[offset+unix.SizeofInotifyEvent])) + // The filename is padded with NULL bytes. TrimRight() gets rid of those. + name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000") + } + + event := newEvent(name, mask) + + // Send the events that are not ignored on the events channel + if !event.ignoreLinux(mask) { + select { + case w.Events <- event: + case <-w.done: + return + } + } + + // Move to the next event in the buffer + offset += unix.SizeofInotifyEvent + nameLen + } + } +} + +// Certain types of events can be "ignored" and not sent over the Events +// channel. Such as events marked ignore by the kernel, or MODIFY events +// against files that do not exist. +func (e *Event) ignoreLinux(mask uint32) bool { + // Ignore anything the inotify API says to ignore + if mask&unix.IN_IGNORED == unix.IN_IGNORED { + return true + } + + // If the event is not a DELETE or RENAME, the file must exist. + // Otherwise the event is ignored. + // *Note*: this was put in place because it was seen that a MODIFY + // event was sent after the DELETE. This ignores that MODIFY and + // assumes a DELETE will come or has come if the file doesn't exist. + if !(e.Op&Remove == Remove || e.Op&Rename == Rename) { + _, statErr := os.Lstat(e.Name) + return os.IsNotExist(statErr) + } + return false +} + +// newEvent returns an platform-independent Event based on an inotify mask. +func newEvent(name string, mask uint32) Event { + e := Event{Name: name} + if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO { + e.Op |= Create + } + if mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF || mask&unix.IN_DELETE == unix.IN_DELETE { + e.Op |= Remove + } + if mask&unix.IN_MODIFY == unix.IN_MODIFY { + e.Op |= Write + } + if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM { + e.Op |= Rename + } + if mask&unix.IN_ATTRIB == unix.IN_ATTRIB { + e.Op |= Chmod + } + return e +} diff --git a/vendor/github.com/fsnotify/fsnotify/inotify_poller.go b/vendor/github.com/fsnotify/fsnotify/inotify_poller.go new file mode 100644 index 00000000000..cc7db4b22ef --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/inotify_poller.go @@ -0,0 +1,187 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build linux + +package fsnotify + +import ( + "errors" + + "golang.org/x/sys/unix" +) + +type fdPoller struct { + fd int // File descriptor (as returned by the inotify_init() syscall) + epfd int // Epoll file descriptor + pipe [2]int // Pipe for waking up +} + +func emptyPoller(fd int) *fdPoller { + poller := new(fdPoller) + poller.fd = fd + poller.epfd = -1 + poller.pipe[0] = -1 + poller.pipe[1] = -1 + return poller +} + +// Create a new inotify poller. +// This creates an inotify handler, and an epoll handler. +func newFdPoller(fd int) (*fdPoller, error) { + var errno error + poller := emptyPoller(fd) + defer func() { + if errno != nil { + poller.close() + } + }() + poller.fd = fd + + // Create epoll fd + poller.epfd, errno = unix.EpollCreate1(0) + if poller.epfd == -1 { + return nil, errno + } + // Create pipe; pipe[0] is the read end, pipe[1] the write end. + errno = unix.Pipe2(poller.pipe[:], unix.O_NONBLOCK) + if errno != nil { + return nil, errno + } + + // Register inotify fd with epoll + event := unix.EpollEvent{ + Fd: int32(poller.fd), + Events: unix.EPOLLIN, + } + errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.fd, &event) + if errno != nil { + return nil, errno + } + + // Register pipe fd with epoll + event = unix.EpollEvent{ + Fd: int32(poller.pipe[0]), + Events: unix.EPOLLIN, + } + errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.pipe[0], &event) + if errno != nil { + return nil, errno + } + + return poller, nil +} + +// Wait using epoll. +// Returns true if something is ready to be read, +// false if there is not. +func (poller *fdPoller) wait() (bool, error) { + // 3 possible events per fd, and 2 fds, makes a maximum of 6 events. + // I don't know whether epoll_wait returns the number of events returned, + // or the total number of events ready. + // I decided to catch both by making the buffer one larger than the maximum. + events := make([]unix.EpollEvent, 7) + for { + n, errno := unix.EpollWait(poller.epfd, events, -1) + if n == -1 { + if errno == unix.EINTR { + continue + } + return false, errno + } + if n == 0 { + // If there are no events, try again. + continue + } + if n > 6 { + // This should never happen. More events were returned than should be possible. + return false, errors.New("epoll_wait returned more events than I know what to do with") + } + ready := events[:n] + epollhup := false + epollerr := false + epollin := false + for _, event := range ready { + if event.Fd == int32(poller.fd) { + if event.Events&unix.EPOLLHUP != 0 { + // This should not happen, but if it does, treat it as a wakeup. + epollhup = true + } + if event.Events&unix.EPOLLERR != 0 { + // If an error is waiting on the file descriptor, we should pretend + // something is ready to read, and let unix.Read pick up the error. + epollerr = true + } + if event.Events&unix.EPOLLIN != 0 { + // There is data to read. + epollin = true + } + } + if event.Fd == int32(poller.pipe[0]) { + if event.Events&unix.EPOLLHUP != 0 { + // Write pipe descriptor was closed, by us. This means we're closing down the + // watcher, and we should wake up. + } + if event.Events&unix.EPOLLERR != 0 { + // If an error is waiting on the pipe file descriptor. + // This is an absolute mystery, and should never ever happen. + return false, errors.New("Error on the pipe descriptor.") + } + if event.Events&unix.EPOLLIN != 0 { + // This is a regular wakeup, so we have to clear the buffer. + err := poller.clearWake() + if err != nil { + return false, err + } + } + } + } + + if epollhup || epollerr || epollin { + return true, nil + } + return false, nil + } +} + +// Close the write end of the poller. +func (poller *fdPoller) wake() error { + buf := make([]byte, 1) + n, errno := unix.Write(poller.pipe[1], buf) + if n == -1 { + if errno == unix.EAGAIN { + // Buffer is full, poller will wake. + return nil + } + return errno + } + return nil +} + +func (poller *fdPoller) clearWake() error { + // You have to be woken up a LOT in order to get to 100! + buf := make([]byte, 100) + n, errno := unix.Read(poller.pipe[0], buf) + if n == -1 { + if errno == unix.EAGAIN { + // Buffer is empty, someone else cleared our wake. + return nil + } + return errno + } + return nil +} + +// Close all poller file descriptors, but not the one passed to it. +func (poller *fdPoller) close() { + if poller.pipe[1] != -1 { + unix.Close(poller.pipe[1]) + } + if poller.pipe[0] != -1 { + unix.Close(poller.pipe[0]) + } + if poller.epfd != -1 { + unix.Close(poller.epfd) + } +} diff --git a/vendor/github.com/fsnotify/fsnotify/kqueue.go b/vendor/github.com/fsnotify/fsnotify/kqueue.go new file mode 100644 index 00000000000..86e76a3d676 --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/kqueue.go @@ -0,0 +1,521 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build freebsd openbsd netbsd dragonfly darwin + +package fsnotify + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + "time" + + "golang.org/x/sys/unix" +) + +// Watcher watches a set of files, delivering events to a channel. +type Watcher struct { + Events chan Event + Errors chan error + done chan struct{} // Channel for sending a "quit message" to the reader goroutine + + kq int // File descriptor (as returned by the kqueue() syscall). + + mu sync.Mutex // Protects access to watcher data + watches map[string]int // Map of watched file descriptors (key: path). + externalWatches map[string]bool // Map of watches added by user of the library. + dirFlags map[string]uint32 // Map of watched directories to fflags used in kqueue. + paths map[int]pathInfo // Map file descriptors to path names for processing kqueue events. + fileExists map[string]bool // Keep track of if we know this file exists (to stop duplicate create events). + isClosed bool // Set to true when Close() is first called +} + +type pathInfo struct { + name string + isDir bool +} + +// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. +func NewWatcher() (*Watcher, error) { + kq, err := kqueue() + if err != nil { + return nil, err + } + + w := &Watcher{ + kq: kq, + watches: make(map[string]int), + dirFlags: make(map[string]uint32), + paths: make(map[int]pathInfo), + fileExists: make(map[string]bool), + externalWatches: make(map[string]bool), + Events: make(chan Event), + Errors: make(chan error), + done: make(chan struct{}), + } + + go w.readEvents() + return w, nil +} + +// Close removes all watches and closes the events channel. +func (w *Watcher) Close() error { + w.mu.Lock() + if w.isClosed { + w.mu.Unlock() + return nil + } + w.isClosed = true + + // copy paths to remove while locked + var pathsToRemove = make([]string, 0, len(w.watches)) + for name := range w.watches { + pathsToRemove = append(pathsToRemove, name) + } + w.mu.Unlock() + // unlock before calling Remove, which also locks + + for _, name := range pathsToRemove { + w.Remove(name) + } + + // send a "quit" message to the reader goroutine + close(w.done) + + return nil +} + +// Add starts watching the named file or directory (non-recursively). +func (w *Watcher) Add(name string) error { + w.mu.Lock() + w.externalWatches[name] = true + w.mu.Unlock() + _, err := w.addWatch(name, noteAllEvents) + return err +} + +// Remove stops watching the the named file or directory (non-recursively). +func (w *Watcher) Remove(name string) error { + name = filepath.Clean(name) + w.mu.Lock() + watchfd, ok := w.watches[name] + w.mu.Unlock() + if !ok { + return fmt.Errorf("can't remove non-existent kevent watch for: %s", name) + } + + const registerRemove = unix.EV_DELETE + if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil { + return err + } + + unix.Close(watchfd) + + w.mu.Lock() + isDir := w.paths[watchfd].isDir + delete(w.watches, name) + delete(w.paths, watchfd) + delete(w.dirFlags, name) + w.mu.Unlock() + + // Find all watched paths that are in this directory that are not external. + if isDir { + var pathsToRemove []string + w.mu.Lock() + for _, path := range w.paths { + wdir, _ := filepath.Split(path.name) + if filepath.Clean(wdir) == name { + if !w.externalWatches[path.name] { + pathsToRemove = append(pathsToRemove, path.name) + } + } + } + w.mu.Unlock() + for _, name := range pathsToRemove { + // Since these are internal, not much sense in propagating error + // to the user, as that will just confuse them with an error about + // a path they did not explicitly watch themselves. + w.Remove(name) + } + } + + return nil +} + +// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) +const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME + +// keventWaitTime to block on each read from kevent +var keventWaitTime = durationToTimespec(100 * time.Millisecond) + +// addWatch adds name to the watched file set. +// The flags are interpreted as described in kevent(2). +// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks. +func (w *Watcher) addWatch(name string, flags uint32) (string, error) { + var isDir bool + // Make ./name and name equivalent + name = filepath.Clean(name) + + w.mu.Lock() + if w.isClosed { + w.mu.Unlock() + return "", errors.New("kevent instance already closed") + } + watchfd, alreadyWatching := w.watches[name] + // We already have a watch, but we can still override flags. + if alreadyWatching { + isDir = w.paths[watchfd].isDir + } + w.mu.Unlock() + + if !alreadyWatching { + fi, err := os.Lstat(name) + if err != nil { + return "", err + } + + // Don't watch sockets. + if fi.Mode()&os.ModeSocket == os.ModeSocket { + return "", nil + } + + // Don't watch named pipes. + if fi.Mode()&os.ModeNamedPipe == os.ModeNamedPipe { + return "", nil + } + + // Follow Symlinks + // Unfortunately, Linux can add bogus symlinks to watch list without + // issue, and Windows can't do symlinks period (AFAIK). To maintain + // consistency, we will act like everything is fine. There will simply + // be no file events for broken symlinks. + // Hence the returns of nil on errors. + if fi.Mode()&os.ModeSymlink == os.ModeSymlink { + name, err = filepath.EvalSymlinks(name) + if err != nil { + return "", nil + } + + w.mu.Lock() + _, alreadyWatching = w.watches[name] + w.mu.Unlock() + + if alreadyWatching { + return name, nil + } + + fi, err = os.Lstat(name) + if err != nil { + return "", nil + } + } + + watchfd, err = unix.Open(name, openMode, 0700) + if watchfd == -1 { + return "", err + } + + isDir = fi.IsDir() + } + + const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE + if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil { + unix.Close(watchfd) + return "", err + } + + if !alreadyWatching { + w.mu.Lock() + w.watches[name] = watchfd + w.paths[watchfd] = pathInfo{name: name, isDir: isDir} + w.mu.Unlock() + } + + if isDir { + // Watch the directory if it has not been watched before, + // or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles) + w.mu.Lock() + + watchDir := (flags&unix.NOTE_WRITE) == unix.NOTE_WRITE && + (!alreadyWatching || (w.dirFlags[name]&unix.NOTE_WRITE) != unix.NOTE_WRITE) + // Store flags so this watch can be updated later + w.dirFlags[name] = flags + w.mu.Unlock() + + if watchDir { + if err := w.watchDirectoryFiles(name); err != nil { + return "", err + } + } + } + return name, nil +} + +// readEvents reads from kqueue and converts the received kevents into +// Event values that it sends down the Events channel. +func (w *Watcher) readEvents() { + eventBuffer := make([]unix.Kevent_t, 10) + +loop: + for { + // See if there is a message on the "done" channel + select { + case <-w.done: + break loop + default: + } + + // Get new events + kevents, err := read(w.kq, eventBuffer, &keventWaitTime) + // EINTR is okay, the syscall was interrupted before timeout expired. + if err != nil && err != unix.EINTR { + select { + case w.Errors <- err: + case <-w.done: + break loop + } + continue + } + + // Flush the events we received to the Events channel + for len(kevents) > 0 { + kevent := &kevents[0] + watchfd := int(kevent.Ident) + mask := uint32(kevent.Fflags) + w.mu.Lock() + path := w.paths[watchfd] + w.mu.Unlock() + event := newEvent(path.name, mask) + + if path.isDir && !(event.Op&Remove == Remove) { + // Double check to make sure the directory exists. This can happen when + // we do a rm -fr on a recursively watched folders and we receive a + // modification event first but the folder has been deleted and later + // receive the delete event + if _, err := os.Lstat(event.Name); os.IsNotExist(err) { + // mark is as delete event + event.Op |= Remove + } + } + + if event.Op&Rename == Rename || event.Op&Remove == Remove { + w.Remove(event.Name) + w.mu.Lock() + delete(w.fileExists, event.Name) + w.mu.Unlock() + } + + if path.isDir && event.Op&Write == Write && !(event.Op&Remove == Remove) { + w.sendDirectoryChangeEvents(event.Name) + } else { + // Send the event on the Events channel. + select { + case w.Events <- event: + case <-w.done: + break loop + } + } + + if event.Op&Remove == Remove { + // Look for a file that may have overwritten this. + // For example, mv f1 f2 will delete f2, then create f2. + if path.isDir { + fileDir := filepath.Clean(event.Name) + w.mu.Lock() + _, found := w.watches[fileDir] + w.mu.Unlock() + if found { + // make sure the directory exists before we watch for changes. When we + // do a recursive watch and perform rm -fr, the parent directory might + // have gone missing, ignore the missing directory and let the + // upcoming delete event remove the watch from the parent directory. + if _, err := os.Lstat(fileDir); err == nil { + w.sendDirectoryChangeEvents(fileDir) + } + } + } else { + filePath := filepath.Clean(event.Name) + if fileInfo, err := os.Lstat(filePath); err == nil { + w.sendFileCreatedEventIfNew(filePath, fileInfo) + } + } + } + + // Move to next event + kevents = kevents[1:] + } + } + + // cleanup + err := unix.Close(w.kq) + if err != nil { + // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors. + select { + case w.Errors <- err: + default: + } + } + close(w.Events) + close(w.Errors) +} + +// newEvent returns an platform-independent Event based on kqueue Fflags. +func newEvent(name string, mask uint32) Event { + e := Event{Name: name} + if mask&unix.NOTE_DELETE == unix.NOTE_DELETE { + e.Op |= Remove + } + if mask&unix.NOTE_WRITE == unix.NOTE_WRITE { + e.Op |= Write + } + if mask&unix.NOTE_RENAME == unix.NOTE_RENAME { + e.Op |= Rename + } + if mask&unix.NOTE_ATTRIB == unix.NOTE_ATTRIB { + e.Op |= Chmod + } + return e +} + +func newCreateEvent(name string) Event { + return Event{Name: name, Op: Create} +} + +// watchDirectoryFiles to mimic inotify when adding a watch on a directory +func (w *Watcher) watchDirectoryFiles(dirPath string) error { + // Get all files + files, err := ioutil.ReadDir(dirPath) + if err != nil { + return err + } + + for _, fileInfo := range files { + filePath := filepath.Join(dirPath, fileInfo.Name()) + filePath, err = w.internalWatch(filePath, fileInfo) + if err != nil { + return err + } + + w.mu.Lock() + w.fileExists[filePath] = true + w.mu.Unlock() + } + + return nil +} + +// sendDirectoryEvents searches the directory for newly created files +// and sends them over the event channel. This functionality is to have +// the BSD version of fsnotify match Linux inotify which provides a +// create event for files created in a watched directory. +func (w *Watcher) sendDirectoryChangeEvents(dirPath string) { + // Get all files + files, err := ioutil.ReadDir(dirPath) + if err != nil { + select { + case w.Errors <- err: + case <-w.done: + return + } + } + + // Search for new files + for _, fileInfo := range files { + filePath := filepath.Join(dirPath, fileInfo.Name()) + err := w.sendFileCreatedEventIfNew(filePath, fileInfo) + + if err != nil { + return + } + } +} + +// sendFileCreatedEvent sends a create event if the file isn't already being tracked. +func (w *Watcher) sendFileCreatedEventIfNew(filePath string, fileInfo os.FileInfo) (err error) { + w.mu.Lock() + _, doesExist := w.fileExists[filePath] + w.mu.Unlock() + if !doesExist { + // Send create event + select { + case w.Events <- newCreateEvent(filePath): + case <-w.done: + return + } + } + + // like watchDirectoryFiles (but without doing another ReadDir) + filePath, err = w.internalWatch(filePath, fileInfo) + if err != nil { + return err + } + + w.mu.Lock() + w.fileExists[filePath] = true + w.mu.Unlock() + + return nil +} + +func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, error) { + if fileInfo.IsDir() { + // mimic Linux providing delete events for subdirectories + // but preserve the flags used if currently watching subdirectory + w.mu.Lock() + flags := w.dirFlags[name] + w.mu.Unlock() + + flags |= unix.NOTE_DELETE | unix.NOTE_RENAME + return w.addWatch(name, flags) + } + + // watch file to mimic Linux inotify + return w.addWatch(name, noteAllEvents) +} + +// kqueue creates a new kernel event queue and returns a descriptor. +func kqueue() (kq int, err error) { + kq, err = unix.Kqueue() + if kq == -1 { + return kq, err + } + return kq, nil +} + +// register events with the queue +func register(kq int, fds []int, flags int, fflags uint32) error { + changes := make([]unix.Kevent_t, len(fds)) + + for i, fd := range fds { + // SetKevent converts int to the platform-specific types: + unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags) + changes[i].Fflags = fflags + } + + // register the events + success, err := unix.Kevent(kq, changes, nil, nil) + if success == -1 { + return err + } + return nil +} + +// read retrieves pending events, or waits until an event occurs. +// A timeout of nil blocks indefinitely, while 0 polls the queue. +func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) { + n, err := unix.Kevent(kq, nil, events, timeout) + if err != nil { + return nil, err + } + return events[0:n], nil +} + +// durationToTimespec prepares a timeout value +func durationToTimespec(d time.Duration) unix.Timespec { + return unix.NsecToTimespec(d.Nanoseconds()) +} diff --git a/vendor/github.com/fsnotify/fsnotify/open_mode_bsd.go b/vendor/github.com/fsnotify/fsnotify/open_mode_bsd.go new file mode 100644 index 00000000000..7d8de14513e --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/open_mode_bsd.go @@ -0,0 +1,11 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build freebsd openbsd netbsd dragonfly + +package fsnotify + +import "golang.org/x/sys/unix" + +const openMode = unix.O_NONBLOCK | unix.O_RDONLY diff --git a/vendor/github.com/fsnotify/fsnotify/open_mode_darwin.go b/vendor/github.com/fsnotify/fsnotify/open_mode_darwin.go new file mode 100644 index 00000000000..9139e17161b --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/open_mode_darwin.go @@ -0,0 +1,12 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin + +package fsnotify + +import "golang.org/x/sys/unix" + +// note: this constant is not defined on BSD +const openMode = unix.O_EVTONLY diff --git a/vendor/github.com/fsnotify/fsnotify/windows.go b/vendor/github.com/fsnotify/fsnotify/windows.go new file mode 100644 index 00000000000..09436f31d82 --- /dev/null +++ b/vendor/github.com/fsnotify/fsnotify/windows.go @@ -0,0 +1,561 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build windows + +package fsnotify + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "runtime" + "sync" + "syscall" + "unsafe" +) + +// Watcher watches a set of files, delivering events to a channel. +type Watcher struct { + Events chan Event + Errors chan error + isClosed bool // Set to true when Close() is first called + mu sync.Mutex // Map access + port syscall.Handle // Handle to completion port + watches watchMap // Map of watches (key: i-number) + input chan *input // Inputs to the reader are sent on this channel + quit chan chan<- error +} + +// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. +func NewWatcher() (*Watcher, error) { + port, e := syscall.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 0) + if e != nil { + return nil, os.NewSyscallError("CreateIoCompletionPort", e) + } + w := &Watcher{ + port: port, + watches: make(watchMap), + input: make(chan *input, 1), + Events: make(chan Event, 50), + Errors: make(chan error), + quit: make(chan chan<- error, 1), + } + go w.readEvents() + return w, nil +} + +// Close removes all watches and closes the events channel. +func (w *Watcher) Close() error { + if w.isClosed { + return nil + } + w.isClosed = true + + // Send "quit" message to the reader goroutine + ch := make(chan error) + w.quit <- ch + if err := w.wakeupReader(); err != nil { + return err + } + return <-ch +} + +// Add starts watching the named file or directory (non-recursively). +func (w *Watcher) Add(name string) error { + if w.isClosed { + return errors.New("watcher already closed") + } + in := &input{ + op: opAddWatch, + path: filepath.Clean(name), + flags: sysFSALLEVENTS, + reply: make(chan error), + } + w.input <- in + if err := w.wakeupReader(); err != nil { + return err + } + return <-in.reply +} + +// Remove stops watching the the named file or directory (non-recursively). +func (w *Watcher) Remove(name string) error { + in := &input{ + op: opRemoveWatch, + path: filepath.Clean(name), + reply: make(chan error), + } + w.input <- in + if err := w.wakeupReader(); err != nil { + return err + } + return <-in.reply +} + +const ( + // Options for AddWatch + sysFSONESHOT = 0x80000000 + sysFSONLYDIR = 0x1000000 + + // Events + sysFSACCESS = 0x1 + sysFSALLEVENTS = 0xfff + sysFSATTRIB = 0x4 + sysFSCLOSE = 0x18 + sysFSCREATE = 0x100 + sysFSDELETE = 0x200 + sysFSDELETESELF = 0x400 + sysFSMODIFY = 0x2 + sysFSMOVE = 0xc0 + sysFSMOVEDFROM = 0x40 + sysFSMOVEDTO = 0x80 + sysFSMOVESELF = 0x800 + + // Special events + sysFSIGNORED = 0x8000 + sysFSQOVERFLOW = 0x4000 +) + +func newEvent(name string, mask uint32) Event { + e := Event{Name: name} + if mask&sysFSCREATE == sysFSCREATE || mask&sysFSMOVEDTO == sysFSMOVEDTO { + e.Op |= Create + } + if mask&sysFSDELETE == sysFSDELETE || mask&sysFSDELETESELF == sysFSDELETESELF { + e.Op |= Remove + } + if mask&sysFSMODIFY == sysFSMODIFY { + e.Op |= Write + } + if mask&sysFSMOVE == sysFSMOVE || mask&sysFSMOVESELF == sysFSMOVESELF || mask&sysFSMOVEDFROM == sysFSMOVEDFROM { + e.Op |= Rename + } + if mask&sysFSATTRIB == sysFSATTRIB { + e.Op |= Chmod + } + return e +} + +const ( + opAddWatch = iota + opRemoveWatch +) + +const ( + provisional uint64 = 1 << (32 + iota) +) + +type input struct { + op int + path string + flags uint32 + reply chan error +} + +type inode struct { + handle syscall.Handle + volume uint32 + index uint64 +} + +type watch struct { + ov syscall.Overlapped + ino *inode // i-number + path string // Directory path + mask uint64 // Directory itself is being watched with these notify flags + names map[string]uint64 // Map of names being watched and their notify flags + rename string // Remembers the old name while renaming a file + buf [4096]byte +} + +type indexMap map[uint64]*watch +type watchMap map[uint32]indexMap + +func (w *Watcher) wakeupReader() error { + e := syscall.PostQueuedCompletionStatus(w.port, 0, 0, nil) + if e != nil { + return os.NewSyscallError("PostQueuedCompletionStatus", e) + } + return nil +} + +func getDir(pathname string) (dir string, err error) { + attr, e := syscall.GetFileAttributes(syscall.StringToUTF16Ptr(pathname)) + if e != nil { + return "", os.NewSyscallError("GetFileAttributes", e) + } + if attr&syscall.FILE_ATTRIBUTE_DIRECTORY != 0 { + dir = pathname + } else { + dir, _ = filepath.Split(pathname) + dir = filepath.Clean(dir) + } + return +} + +func getIno(path string) (ino *inode, err error) { + h, e := syscall.CreateFile(syscall.StringToUTF16Ptr(path), + syscall.FILE_LIST_DIRECTORY, + syscall.FILE_SHARE_READ|syscall.FILE_SHARE_WRITE|syscall.FILE_SHARE_DELETE, + nil, syscall.OPEN_EXISTING, + syscall.FILE_FLAG_BACKUP_SEMANTICS|syscall.FILE_FLAG_OVERLAPPED, 0) + if e != nil { + return nil, os.NewSyscallError("CreateFile", e) + } + var fi syscall.ByHandleFileInformation + if e = syscall.GetFileInformationByHandle(h, &fi); e != nil { + syscall.CloseHandle(h) + return nil, os.NewSyscallError("GetFileInformationByHandle", e) + } + ino = &inode{ + handle: h, + volume: fi.VolumeSerialNumber, + index: uint64(fi.FileIndexHigh)<<32 | uint64(fi.FileIndexLow), + } + return ino, nil +} + +// Must run within the I/O thread. +func (m watchMap) get(ino *inode) *watch { + if i := m[ino.volume]; i != nil { + return i[ino.index] + } + return nil +} + +// Must run within the I/O thread. +func (m watchMap) set(ino *inode, watch *watch) { + i := m[ino.volume] + if i == nil { + i = make(indexMap) + m[ino.volume] = i + } + i[ino.index] = watch +} + +// Must run within the I/O thread. +func (w *Watcher) addWatch(pathname string, flags uint64) error { + dir, err := getDir(pathname) + if err != nil { + return err + } + if flags&sysFSONLYDIR != 0 && pathname != dir { + return nil + } + ino, err := getIno(dir) + if err != nil { + return err + } + w.mu.Lock() + watchEntry := w.watches.get(ino) + w.mu.Unlock() + if watchEntry == nil { + if _, e := syscall.CreateIoCompletionPort(ino.handle, w.port, 0, 0); e != nil { + syscall.CloseHandle(ino.handle) + return os.NewSyscallError("CreateIoCompletionPort", e) + } + watchEntry = &watch{ + ino: ino, + path: dir, + names: make(map[string]uint64), + } + w.mu.Lock() + w.watches.set(ino, watchEntry) + w.mu.Unlock() + flags |= provisional + } else { + syscall.CloseHandle(ino.handle) + } + if pathname == dir { + watchEntry.mask |= flags + } else { + watchEntry.names[filepath.Base(pathname)] |= flags + } + if err = w.startRead(watchEntry); err != nil { + return err + } + if pathname == dir { + watchEntry.mask &= ^provisional + } else { + watchEntry.names[filepath.Base(pathname)] &= ^provisional + } + return nil +} + +// Must run within the I/O thread. +func (w *Watcher) remWatch(pathname string) error { + dir, err := getDir(pathname) + if err != nil { + return err + } + ino, err := getIno(dir) + if err != nil { + return err + } + w.mu.Lock() + watch := w.watches.get(ino) + w.mu.Unlock() + if watch == nil { + return fmt.Errorf("can't remove non-existent watch for: %s", pathname) + } + if pathname == dir { + w.sendEvent(watch.path, watch.mask&sysFSIGNORED) + watch.mask = 0 + } else { + name := filepath.Base(pathname) + w.sendEvent(filepath.Join(watch.path, name), watch.names[name]&sysFSIGNORED) + delete(watch.names, name) + } + return w.startRead(watch) +} + +// Must run within the I/O thread. +func (w *Watcher) deleteWatch(watch *watch) { + for name, mask := range watch.names { + if mask&provisional == 0 { + w.sendEvent(filepath.Join(watch.path, name), mask&sysFSIGNORED) + } + delete(watch.names, name) + } + if watch.mask != 0 { + if watch.mask&provisional == 0 { + w.sendEvent(watch.path, watch.mask&sysFSIGNORED) + } + watch.mask = 0 + } +} + +// Must run within the I/O thread. +func (w *Watcher) startRead(watch *watch) error { + if e := syscall.CancelIo(watch.ino.handle); e != nil { + w.Errors <- os.NewSyscallError("CancelIo", e) + w.deleteWatch(watch) + } + mask := toWindowsFlags(watch.mask) + for _, m := range watch.names { + mask |= toWindowsFlags(m) + } + if mask == 0 { + if e := syscall.CloseHandle(watch.ino.handle); e != nil { + w.Errors <- os.NewSyscallError("CloseHandle", e) + } + w.mu.Lock() + delete(w.watches[watch.ino.volume], watch.ino.index) + w.mu.Unlock() + return nil + } + e := syscall.ReadDirectoryChanges(watch.ino.handle, &watch.buf[0], + uint32(unsafe.Sizeof(watch.buf)), false, mask, nil, &watch.ov, 0) + if e != nil { + err := os.NewSyscallError("ReadDirectoryChanges", e) + if e == syscall.ERROR_ACCESS_DENIED && watch.mask&provisional == 0 { + // Watched directory was probably removed + if w.sendEvent(watch.path, watch.mask&sysFSDELETESELF) { + if watch.mask&sysFSONESHOT != 0 { + watch.mask = 0 + } + } + err = nil + } + w.deleteWatch(watch) + w.startRead(watch) + return err + } + return nil +} + +// readEvents reads from the I/O completion port, converts the +// received events into Event objects and sends them via the Events channel. +// Entry point to the I/O thread. +func (w *Watcher) readEvents() { + var ( + n, key uint32 + ov *syscall.Overlapped + ) + runtime.LockOSThread() + + for { + e := syscall.GetQueuedCompletionStatus(w.port, &n, &key, &ov, syscall.INFINITE) + watch := (*watch)(unsafe.Pointer(ov)) + + if watch == nil { + select { + case ch := <-w.quit: + w.mu.Lock() + var indexes []indexMap + for _, index := range w.watches { + indexes = append(indexes, index) + } + w.mu.Unlock() + for _, index := range indexes { + for _, watch := range index { + w.deleteWatch(watch) + w.startRead(watch) + } + } + var err error + if e := syscall.CloseHandle(w.port); e != nil { + err = os.NewSyscallError("CloseHandle", e) + } + close(w.Events) + close(w.Errors) + ch <- err + return + case in := <-w.input: + switch in.op { + case opAddWatch: + in.reply <- w.addWatch(in.path, uint64(in.flags)) + case opRemoveWatch: + in.reply <- w.remWatch(in.path) + } + default: + } + continue + } + + switch e { + case syscall.ERROR_MORE_DATA: + if watch == nil { + w.Errors <- errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer") + } else { + // The i/o succeeded but the buffer is full. + // In theory we should be building up a full packet. + // In practice we can get away with just carrying on. + n = uint32(unsafe.Sizeof(watch.buf)) + } + case syscall.ERROR_ACCESS_DENIED: + // Watched directory was probably removed + w.sendEvent(watch.path, watch.mask&sysFSDELETESELF) + w.deleteWatch(watch) + w.startRead(watch) + continue + case syscall.ERROR_OPERATION_ABORTED: + // CancelIo was called on this handle + continue + default: + w.Errors <- os.NewSyscallError("GetQueuedCompletionPort", e) + continue + case nil: + } + + var offset uint32 + for { + if n == 0 { + w.Events <- newEvent("", sysFSQOVERFLOW) + w.Errors <- errors.New("short read in readEvents()") + break + } + + // Point "raw" to the event in the buffer + raw := (*syscall.FileNotifyInformation)(unsafe.Pointer(&watch.buf[offset])) + buf := (*[syscall.MAX_PATH]uint16)(unsafe.Pointer(&raw.FileName)) + name := syscall.UTF16ToString(buf[:raw.FileNameLength/2]) + fullname := filepath.Join(watch.path, name) + + var mask uint64 + switch raw.Action { + case syscall.FILE_ACTION_REMOVED: + mask = sysFSDELETESELF + case syscall.FILE_ACTION_MODIFIED: + mask = sysFSMODIFY + case syscall.FILE_ACTION_RENAMED_OLD_NAME: + watch.rename = name + case syscall.FILE_ACTION_RENAMED_NEW_NAME: + if watch.names[watch.rename] != 0 { + watch.names[name] |= watch.names[watch.rename] + delete(watch.names, watch.rename) + mask = sysFSMOVESELF + } + } + + sendNameEvent := func() { + if w.sendEvent(fullname, watch.names[name]&mask) { + if watch.names[name]&sysFSONESHOT != 0 { + delete(watch.names, name) + } + } + } + if raw.Action != syscall.FILE_ACTION_RENAMED_NEW_NAME { + sendNameEvent() + } + if raw.Action == syscall.FILE_ACTION_REMOVED { + w.sendEvent(fullname, watch.names[name]&sysFSIGNORED) + delete(watch.names, name) + } + if w.sendEvent(fullname, watch.mask&toFSnotifyFlags(raw.Action)) { + if watch.mask&sysFSONESHOT != 0 { + watch.mask = 0 + } + } + if raw.Action == syscall.FILE_ACTION_RENAMED_NEW_NAME { + fullname = filepath.Join(watch.path, watch.rename) + sendNameEvent() + } + + // Move to the next event in the buffer + if raw.NextEntryOffset == 0 { + break + } + offset += raw.NextEntryOffset + + // Error! + if offset >= n { + w.Errors <- errors.New("Windows system assumed buffer larger than it is, events have likely been missed.") + break + } + } + + if err := w.startRead(watch); err != nil { + w.Errors <- err + } + } +} + +func (w *Watcher) sendEvent(name string, mask uint64) bool { + if mask == 0 { + return false + } + event := newEvent(name, uint32(mask)) + select { + case ch := <-w.quit: + w.quit <- ch + case w.Events <- event: + } + return true +} + +func toWindowsFlags(mask uint64) uint32 { + var m uint32 + if mask&sysFSACCESS != 0 { + m |= syscall.FILE_NOTIFY_CHANGE_LAST_ACCESS + } + if mask&sysFSMODIFY != 0 { + m |= syscall.FILE_NOTIFY_CHANGE_LAST_WRITE + } + if mask&sysFSATTRIB != 0 { + m |= syscall.FILE_NOTIFY_CHANGE_ATTRIBUTES + } + if mask&(sysFSMOVE|sysFSCREATE|sysFSDELETE) != 0 { + m |= syscall.FILE_NOTIFY_CHANGE_FILE_NAME | syscall.FILE_NOTIFY_CHANGE_DIR_NAME + } + return m +} + +func toFSnotifyFlags(action uint32) uint64 { + switch action { + case syscall.FILE_ACTION_ADDED: + return sysFSCREATE + case syscall.FILE_ACTION_REMOVED: + return sysFSDELETE + case syscall.FILE_ACTION_MODIFIED: + return sysFSMODIFY + case syscall.FILE_ACTION_RENAMED_OLD_NAME: + return sysFSMOVEDFROM + case syscall.FILE_ACTION_RENAMED_NEW_NAME: + return sysFSMOVEDTO + } + return 0 +}