Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b745cad
Unit tests for passthrough headers.
Harwayne Sep 26, 2018
0f949d5
Unit tests.
Harwayne Sep 26, 2018
fd2ad47
Initial fanout sidecar.
Harwayne Sep 26, 2018
b850e29
Switch to ChannelSubscriberSpec.
Harwayne Sep 26, 2018
9e0c7f6
Fanout tests work.
Harwayne Sep 27, 2018
b03f91c
MultiChannelFanoutHandler tests work.
Harwayne Sep 27, 2018
6ed11df
Remove ClientFactory.
Harwayne Sep 27, 2018
7de128d
All tests working.
Harwayne Sep 27, 2018
b7bdd18
Remove erroneous messages from storage.
Harwayne Sep 28, 2018
5f69090
Run hack/update-deps.sh.
Harwayne Sep 28, 2018
b4ec4a0
swappable.Handler is now the 'top' http.Handler. ConfigMaps can be up…
Harwayne Sep 28, 2018
8338ceb
Delete the old config_map_handler.
Harwayne Sep 28, 2018
ced872b
Respond to PR comments.
Harwayne Oct 1, 2018
2ba92be
Merge branch 'master' into fanout-with-dispatcher
Harwayne Oct 1, 2018
5be5ea5
Merge branch 'master' into fanout-with-dispatcher
Harwayne Oct 1, 2018
0d1b9ed
Remove the no longer used httpDoer interface.
Harwayne Oct 1, 2018
4ac2f9e
Merge branch 'master' into fanout-with-dispatcher
Harwayne Oct 5, 2018
46a9b13
hack/update-deps.sh
Harwayne Oct 5, 2018
d046c88
Use channels to send information between the fanout's receiver and di…
Harwayne Oct 9, 2018
6ecefb3
Symlink VENDOR-LICENSE and LICENSE
Harwayne Oct 9, 2018
e82c2bb
Respond to PR comments.
Harwayne Oct 9, 2018
bd9facc
Use InformerWatcher rather the homegrown ConfigMap watcher.
Harwayne Oct 9, 2018
1bb5b44
Change filesystem watcher unit tests away from using the HTTP server …
Harwayne Oct 10, 2018
895dcd3
fanout.Handler synchronously calls dispatch(), rather than asynchrono…
Harwayne Oct 10, 2018
b7c2bf1
Respond to PR comments.
Harwayne Oct 10, 2018
60247cf
Shutdown with a timeout.
Harwayne Oct 10, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/fanoutsidecar/kodata/LICENSE
1 change: 1 addition & 0 deletions cmd/fanoutsidecar/kodata/VENDOR-LICENSE
167 changes: 167 additions & 0 deletions cmd/fanoutsidecar/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// A sidecar that implements filtering of Cloud Events sent out via HTTP. Implemented as an HTTP
// proxy that the main containers need to write through.

package main

import (
"context"
"errors"
"flag"
"fmt"
"github.com/knative/eventing/pkg/sidecar/configmap/filesystem"
"github.com/knative/eventing/pkg/sidecar/configmap/watcher"
"github.com/knative/eventing/pkg/sidecar/swappable"
"github.com/knative/eventing/pkg/system"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"
"log"
"net/http"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
"strings"
"time"
)

const (
defaultConfigMapName = "in-memory-bus-config"

// The following are the only valid values of the config_map_noticer flag.
cmnfVolume = "volume"
cmnfWatcher = "watcher"
)

var (
readTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute

port int
configMapNoticer string
configMapNamespace string
configMapName string
)

func init() {
flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.")
flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. Valid values are: %s", configMapNoticerFlags()))
flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace, "The namespace of the ConfigMap that is watched for configuration.")
flag.StringVar(&configMapName, "config_map_name", defaultConfigMapName, "The name of the ConfigMap that is watched for configuration.")
}

func configMapNoticerFlags() string {
return strings.Join([]string{cmnfVolume, cmnfWatcher}, ", ")
}

func main() {
flag.Parse()

logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("Unable to create logger: %v", err)
}

if port < 0 {
logger.Fatal("--sidecar_port flag must be set")
}

sh, err := swappable.NewEmptyHandler(logger)
if err != nil {
logger.Fatal("Unable to create swappable.Handler", zap.Error(err))
}

mgr, err := setupConfigMapNoticer(logger, sh.UpdateConfig)
if err != nil {
logger.Fatal("Unable to create configMap noticer.", zap.Error(err))
}

s := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: sh,
ErrorLog: zap.NewStdLog(logger),
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
}

// Start both the manager (which notices ConfigMap changes) and the HTTP server.
var g errgroup.Group
g.Go(func() error {
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
// Start blocks forever, so run it in a goroutine.
return mgr.Start(stopCh)
})
logger.Info("Fanout sidecar Listening...", zap.String("Address", s.Addr))
g.Go(s.ListenAndServe)
err = g.Wait()
if err != nil {
logger.Error("Either the HTTP server or the ConfigMap noticer failed.", zap.Error(err))
}

ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
defer cancel()
s.Shutdown(ctx)
}

func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateConfig) (manager.Manager, error) {
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
return nil, err
logger.Error("Error starting manager.", zap.Error(err))
}

switch configMapNoticer {
case cmnfVolume:
err = setupConfigMapVolume(logger, mgr, configUpdated)
case cmnfWatcher:
err = setupConfigMapWatcher(logger, mgr, configUpdated)
default:
err = errors.New("need to provide the --config_map_noticer flag")
}
if err != nil {
return nil, err
}

return mgr, nil
}

func setupConfigMapVolume(logger *zap.Logger, mgr manager.Manager, configUpdated swappable.UpdateConfig) error {
cmn, err := filesystem.NewConfigMapWatcher(logger, filesystem.ConfigDir, configUpdated)
if err != nil {
logger.Error("Unable to create filesystem.ConifgMapWatcher", zap.Error(err))
return err
}
mgr.Add(cmn)
return nil
}

Comment thread
adamharwayne marked this conversation as resolved.
func setupConfigMapWatcher(logger *zap.Logger, mgr manager.Manager, configUpdated swappable.UpdateConfig) error {
kc, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return err
}

cmw, err := watcher.NewWatcher(logger, kc, configMapNamespace, configMapName, configUpdated)
if err != nil {
return err
}

mgr.Add(cmw)
return nil
}
4 changes: 2 additions & 2 deletions pkg/buses/message_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (r *MessageReceiver) stop(srv *http.Server) {
func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Request) {
host := req.Host
r.logger.Infof("Received request for %s", host)
channel := r.parseChannel(host)
channel := ParseChannel(host)

message, err := r.fromRequest(req)
if err != nil {
Expand Down Expand Up @@ -165,7 +165,7 @@ func (r *MessageReceiver) fromHTTPHeaders(headers http.Header) map[string]string

// parseChannel converts the channel's hostname into a channel
// reference.
func (r *MessageReceiver) parseChannel(host string) ChannelReference {
func ParseChannel(host string) ChannelReference {
chunks := strings.Split(host, ".")
return ChannelReference{
Name: chunks[0],
Expand Down
124 changes: 124 additions & 0 deletions pkg/sidecar/configmap/filesystem/filesystem_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filesystem

import (
"errors"
"github.com/fsnotify/fsnotify"
sidecarconfigmap "github.com/knative/eventing/pkg/sidecar/configmap"
"github.com/knative/eventing/pkg/sidecar/multichannelfanout"
"github.com/knative/eventing/pkg/sidecar/swappable"
"github.com/knative/pkg/configmap"
"go.uber.org/zap"
)

const (
// The mount path of the configMap volume.
ConfigDir = "/etc/config/fanout_sidecar"
)

// Monitors an attached ConfigMap volume for updated configuration and calls `configUpdated` when
// the value changes.
type configMapWatcher struct {
logger *zap.Logger
// The directory to read the configMap from.
dir string
// Stop the watcher by closing this channel.
watcherStopCh chan<- bool

// The function to call when the configuration is updated.
configUpdated swappable.UpdateConfig
}

// NewConfigMapWatcher creates a new filesystem.configMapWatcher. The caller is responsible for
// calling Start(<-chan), likely via a controller-runtime Manager.
func NewConfigMapWatcher(logger *zap.Logger, dir string, updateConfig swappable.UpdateConfig) (*configMapWatcher, error) {
conf, err := readConfigMap(logger, dir)
if err != nil {
logger.Error("Unable to read configMap", zap.Error(err))
return nil, err
}

logger.Info("Read initial configMap", zap.Any("conf", conf))

err = updateConfig(conf)
if err != nil {
logger.Error("Unable to use the initial configMap: %v", zap.Error(err))
return nil, err
}

cmw := &configMapWatcher{
logger: logger,
dir: dir,
configUpdated: updateConfig,
}
return cmw, nil
}

// readConfigMap attempts to read the configMap from the attached volume.
func readConfigMap(logger *zap.Logger, dir string) (*multichannelfanout.Config, error) {
cm, err := configmap.Load(dir)
if err != nil {
return nil, err
}
return sidecarconfigmap.NewFanoutConfig(logger, cm)
}

// updateConfig reads the configMap data and calls `configUpdated` with the updated value.
func (cmw *configMapWatcher) updateConfig() {
conf, err := readConfigMap(cmw.logger, cmw.dir)
if err != nil {
cmw.logger.Error("Unable to read the configMap", zap.Error(err))
return
}
err = cmw.configUpdated(conf)
if err != nil {
cmw.logger.Error("Unable to update config", zap.Error(err))
return
}
}

func (cmw *configMapWatcher) Start(stopCh <-chan struct{}) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}

err = watcher.Add(cmw.dir)
if err != nil {
return err
}

for {
select {
case _, ok := <-watcher.Events:
if !ok {
// Channel closed.
return errors.New("watcher.Events channel closed")
}
cmw.updateConfig()
case err, ok := <-watcher.Errors:
if !ok {
// Channel closed.
return errors.New("watcher.Errors channel closed")
}
cmw.logger.Error("watcher.Errors", zap.Error(err))
case <-stopCh:
return watcher.Close()
}
}
}
Loading