From dfc5864a0e43c928b38a9d356cedd9b01a86fafd Mon Sep 17 00:00:00 2001
From: Benjamin Bennett
Date: Fri, 29 Sep 2017 11:14:14 -0400
Subject: [PATCH 1/2] Made the router test take env variables to alter logging

Added new environment variables to help with debugging:
 - OPENSHIFT_LOG_LEVEL: Sets the router log level to the given value
   (defaults to 4 when unset)
 - OPENSHIFT_GET_ALL_DOCKER_LOGS: A boolean that enables dumping the
   logs from all containers when any container fails (rather than just
   the logs from the failure)
---
 test/end-to-end/router_test.go | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/test/end-to-end/router_test.go b/test/end-to-end/router_test.go
index b669613de49e..975ff3417ada 100644
--- a/test/end-to-end/router_test.go
+++ b/test/end-to-end/router_test.go
@@ -1348,10 +1348,15 @@ func createAndStartRouterContainerExtended(dockerCli *dockerClient.Client, maste
 	hostVols = append(hostVols, fmt.Sprintf("%[1]s:/usr/bin/openshift", binary))
 	}
 
+	logLevel := os.Getenv("OPENSHIFT_LOG_LEVEL")
+	if len(logLevel) == 0 {
+		logLevel = "4"
+	}
+
 	containerOpts := dockerClient.CreateContainerOptions{
 		Config: &dockerClient.Config{
 			Image:        getRouterImage(),
-			Cmd:          []string{"--master=" + masterIp, "--loglevel=4"},
+			Cmd:          []string{"--master=" + masterIp, "--loglevel=" + logLevel},
 			Env:          env,
 			ExposedPorts: exposedPorts,
 			VolumesFrom:  vols,
@@ -1414,8 +1419,10 @@ func validateServer(server *tr.TestHttpService, t *testing.T) {
 
 // cleanUp stops and removes the deployed router
 func cleanUp(t *testing.T, dockerCli *dockerClient.Client, routerId string) {
+	getAllLogs, _ := strconv.ParseBool(os.Getenv("OPENSHIFT_GET_ALL_DOCKER_LOGS"))
+
 	dockerCli.StopContainer(routerId, 5)
-	if t.Failed() {
+	if t.Failed() || getAllLogs {
 		dockerCli.Logs(dockerClient.LogsOptions{
 			Container:    routerId,
 			OutputStream: os.Stdout,

From dac5ce6d5136fa09af03c53b0cb93a0ef8b6bc13 Mon Sep 17 00:00:00 2001
From: Benjamin Bennett
Date: Thu, 19 Oct 2017 11:35:04 -0400
Subject: [PATCH 2/2] Change the router reload suppression so that it doesn't block updates

This changes the locking so that a reload no longer holds the router
object's lock for the duration of the reload. Updates that arrive while
the router is reloading can be processed immediately and batched up,
then included when the next reload occurs.

Before this, if a reload ran longer than the reload interval, only one
event would be processed before a new reload was triggered, which would
then lock out other event processing. As a result, the router made no
meaningful progress consuming events.

A new module to do the rate limiting has been added. The module has a
top half and a bottom half. The top half simply calls the bottom half
with a flag indicating the user has made a change. The flag tells the
bottom half to register the desire to reload (so we can do it under a
single lock acquisition).

The bottom half is in charge of determining whether it can reload
immediately or has to wait. If it must wait, it works out the earliest
time it can reload and schedules a callback to itself for that time. If
it can reload, it runs the reload code immediately. When the reload is
complete, it calls itself again to make sure no other pending reload
came in while the reload was running. Whenever the bottom half calls
itself, it does so without the flag indicating the user made a change.
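For reference, a rough usage sketch of the new limiter follows. The
constructor and RegisterChange() are taken from the patch below; the
interval, handler, and main wrapper are illustrative only and not part
of this change:

    package main

    import (
        "fmt"
        "time"

        "github.com/openshift/origin/pkg/router/template/limiter"
    )

    func main() {
        // The handler is the rate-limited work; in the router this is
        // commitAndReload. Here it just prints so the sketch is
        // self-contained.
        reload := func() error {
            fmt.Println("reloading")
            return nil
        }

        // At most one reload starts per 5-second window, and reloads
        // never overlap.
        rl := limiter.NewCoalescingSerializingRateLimiter(5*time.Second, reload)

        // Many changes registered in quick succession coalesce into a
        // small number of background reloads.
        for i := 0; i < 10; i++ {
            rl.RegisterChange()
        }

        // Give the background handler time to run before exiting.
        time.Sleep(6 * time.Second)
    }

The important property is that the RegisterChange() calls above return
immediately and collapse into at most a couple of handler runs instead
of blocking the caller for the duration of each reload.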
Fixes bug 1471899 -- https://bugzilla.redhat.com/show_bug.cgi?id=1471899
---
 pkg/router/template/fake.go                 |   9 +-
 pkg/router/template/limiter/limiter.go      | 144 ++++++++++++++++++
 pkg/router/template/limiter/limiter_test.go |  75 +++++++++
 pkg/router/template/router.go               |  25 +--
 .../router_without_haproxy_test.go          |   6 +-
 5 files changed, 234 insertions(+), 25 deletions(-)
 create mode 100644 pkg/router/template/limiter/limiter.go
 create mode 100644 pkg/router/template/limiter/limiter_test.go

diff --git a/pkg/router/template/fake.go b/pkg/router/template/fake.go
index a34f5f019b29..1d68532a581b 100644
--- a/pkg/router/template/fake.go
+++ b/pkg/router/template/fake.go
@@ -5,11 +5,10 @@ package templaterouter
 func NewFakeTemplateRouter() *templateRouter {
 	fakeCertManager, _ := newSimpleCertificateManager(newFakeCertificateManagerConfig(), &fakeCertWriter{})
 	return &templateRouter{
-		state:                        map[string]ServiceAliasConfig{},
-		serviceUnits:                 make(map[string]ServiceUnit),
-		certManager:                  fakeCertManager,
-		rateLimitedCommitFunction:    nil,
-		rateLimitedCommitStopChannel: make(chan struct{}),
+		state:                     map[string]ServiceAliasConfig{},
+		serviceUnits:              make(map[string]ServiceUnit),
+		certManager:               fakeCertManager,
+		rateLimitedCommitFunction: nil,
 	}
 }
 
diff --git a/pkg/router/template/limiter/limiter.go b/pkg/router/template/limiter/limiter.go
new file mode 100644
index 000000000000..93bc140a5a00
--- /dev/null
+++ b/pkg/router/template/limiter/limiter.go
@@ -0,0 +1,144 @@
+package limiter
+
+import (
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+)
+
+// HandlerFunc defines the signature of the function handled by a CoalescingSerializingRateLimiter.
+type HandlerFunc func() error
+
+// CoalescingSerializingRateLimiter guarantees that the given function is not called more
+// frequently than the given interval, and that only one call happens at a time. Calls are
+// not queued: making 5 calls to RegisterChange() does not guarantee that the handler will
+// be invoked 5 times; it merely guarantees that it will be invoked once, and no more often
+// than the rate. Calls to the handler happen in the background, and the handler is
+// expected to do its own locking if needed.
+type CoalescingSerializingRateLimiter struct {
+	// handlerFunc is the function to rate limit and serialize calls to.
+	handlerFunc HandlerFunc
+
+	// callInterval is the minimum time between the starts of handler calls.
+	callInterval time.Duration
+
+	// lastStart is the time the last run of the handler started.
+	lastStart time.Time
+
+	// changeReqTime is nil if no change has been registered since the last handler run
+	// completed; otherwise it is the time the last change was registered.
+	changeReqTime *time.Time
+
+	// handlerRunning indicates whether the handler is actively running.
+	handlerRunning bool
+
+	// lock protects the CoalescingSerializingRateLimiter structure from multiple threads
+	// manipulating it at once.
+	lock sync.Mutex
+
+	// callbackTimer is the timer used to call the worker back so it can decide whether
+	// there is work to do.
+	callbackTimer *time.Timer
+}
+
+func NewCoalescingSerializingRateLimiter(interval time.Duration, handlerFunc HandlerFunc) *CoalescingSerializingRateLimiter {
+	limiter := &CoalescingSerializingRateLimiter{
+		handlerFunc:    handlerFunc,
+		callInterval:   interval,
+		lastStart:      time.Time{},
+		changeReqTime:  nil,
+		handlerRunning: false,
+	}
+
+	return limiter
+}
+
+// RegisterChange indicates that the rate-limited function should be called. It may not run
+// the function immediately, but it will cause it to run within the configured interval.
+// RegisterChange always returns immediately; the function runs in the background. Not every
+// call to RegisterChange results in the function being called: if RegisterChange is called
+// repeatedly before the interval since the last run has elapsed, the function runs only
+// once, when the time allows it.
+func (csrl *CoalescingSerializingRateLimiter) RegisterChange() {
+	glog.V(8).Infof("RegisterChange called")
+
+	csrl.changeWorker(true)
+}
+
+func (csrl *CoalescingSerializingRateLimiter) changeWorker(userChanged bool) {
+	csrl.lock.Lock()
+	defer csrl.lock.Unlock()
+
+	glog.V(8).Infof("changeWorker called")
+
+	if userChanged && csrl.changeReqTime == nil {
+		// The caller just registered a change (and we aren't in the middle of a change)
+		now := time.Now()
+		csrl.changeReqTime = &now
+	}
+
+	if csrl.handlerRunning {
+		// There's a run in progress; when it is done it will re-call this function,
+		// at which point the pending work will happen
+		glog.V(8).Infof("The handler was already running (%v) started at %s, returning from the worker", csrl.handlerRunning, csrl.lastStart.String())
+		return
+	}
+
+	if csrl.changeReqTime == nil {
+		// There's no work queued, so we have nothing to do. We should only get here
+		// when the function is re-called after a reload
+		glog.V(8).Infof("No invoke requested time, so there's no queued work. Nothing to do.")
+		return
+	}
+
+	// There is no handler running; see whether we should run now or schedule a callback
+	now := time.Now()
+	sinceLastRun := now.Sub(csrl.lastStart)
+	untilNextCallback := csrl.callInterval - sinceLastRun
+	glog.V(8).Infof("Checking reload; now: %v, lastStart: %v, sinceLast %v, limit %v, remaining %v", now, csrl.lastStart, sinceLastRun, csrl.callInterval, untilNextCallback)
+
+	if untilNextCallback > 0 {
+		// We want to reload but can't yet because the interval since the last run has
+		// not elapsed
+		if csrl.callbackTimer == nil {
+			csrl.callbackTimer = time.AfterFunc(untilNextCallback, func() { csrl.changeWorker(false) })
+		} else {
+			// If the timer already exists it should have fired and be stopped by now:
+			// the first time the worker is called it knows the precise duration until
+			// a run would be valid and schedules the timer for that point
+			csrl.callbackTimer.Reset(untilNextCallback)
+		}
+
+		glog.V(8).Infof("Can't invoke the handler yet, need to delay %s, callback scheduled", untilNextCallback.String())
+
+		return
+	}
+
+	// Otherwise we can reload immediately, so let's do it
+	glog.V(8).Infof("Calling the handler function (for invoke time %v)", csrl.changeReqTime)
+	csrl.handlerRunning = true
+	csrl.changeReqTime = nil
+	csrl.lastStart = now
+
+	// Go run the handler so we don't block the caller
+	go csrl.runHandler()
+
+	return
+}
+
+func (csrl *CoalescingSerializingRateLimiter) runHandler() {
+	// Call the handler in its own function so we can clean up in case the handler panics
+	runHandler := func() error {
+		defer func() {
+			csrl.lock.Lock()
+			csrl.handlerRunning = false
+			csrl.lock.Unlock()
+		}()
+
+		return csrl.handlerFunc()
+	}
+	if err := runHandler(); err != nil {
+		utilruntime.HandleError(err)
+	}
+
+	// Re-call the worker in case work came in while the handler was running. We call it
+	// without the change flag because no new user change has been registered.
+	glog.V(8).Infof("Re-calling the worker after a reload in case work came in")
+	csrl.changeWorker(false)
+}
diff --git a/pkg/router/template/limiter/limiter_test.go b/pkg/router/template/limiter/limiter_test.go
new file mode 100644
index 000000000000..3bc9b15da25d
--- /dev/null
+++ b/pkg/router/template/limiter/limiter_test.go
@@ -0,0 +1,75 @@
+package limiter
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+)
+
+type handler struct {
+	count int
+	sync.Mutex
+}
+
+func (h *handler) handle() error {
+	h.Lock()
+	defer h.Unlock()
+	h.count += 1
+	return nil
+}
+
+func (h *handler) counter() int {
+	h.Lock()
+	defer h.Unlock()
+	return h.count
+}
+
+func TestCoalescingSerializingRateLimiter(t *testing.T) {
+
+	fmt.Println("start")
+
+	tests := []struct {
+		Name     string
+		Interval time.Duration
+		Times    int
+	}{
+		{
+			Name:     "3PO",
+			Interval: 3 * time.Second,
+			Times:    10,
+		},
+		{
+			Name:     "five-fer",
+			Interval: 5 * time.Second,
+			Times:    20,
+		},
+		{
+			Name:     "longjob",
+			Interval: 2 * time.Second,
+			Times:    20,
+		},
+	}
+
+	for _, tc := range tests {
+		h := &handler{}
+		rlf := NewCoalescingSerializingRateLimiter(tc.Interval, h.handle)
+
+		for i := 0; i < tc.Times; i++ {
+			fmt.Println("start")
+			rlf.RegisterChange()
+			fmt.Println("end")
+		}
+
+		select {
+		case <-time.After(tc.Interval + 2*time.Second):
+			fmt.Println("after")
+
+			counter := h.counter()
+			if tc.Interval > 0 && counter >= tc.Times/2 {
+				t.Errorf("For coalesced calls, expected the number of invocations to be less than half the number of calls. Expected: < %v Got: %v",
+					tc.Times/2, counter)
+			}
+		}
+	}
+}
diff --git a/pkg/router/template/router.go b/pkg/router/template/router.go
index 6256c4878af6..47b2b06fba7c 100644
--- a/pkg/router/template/router.go
+++ b/pkg/router/template/router.go
@@ -22,7 +22,7 @@ import (
 	cmdutil "github.com/openshift/origin/pkg/cmd/util"
 	routeapi "github.com/openshift/origin/pkg/route/apis/route"
 	"github.com/openshift/origin/pkg/router/controller"
-	"github.com/openshift/origin/pkg/util/ratelimiter"
+	"github.com/openshift/origin/pkg/router/template/limiter"
 )
 
 const (
@@ -88,9 +88,7 @@ type templateRouter struct {
 	allowWildcardRoutes bool
 	// rateLimitedCommitFunction is a rate limited commit (persist state + refresh the backend)
 	// function that coalesces and controls how often the router is reloaded.
-	rateLimitedCommitFunction *ratelimiter.RateLimitedFunction
-	// rateLimitedCommitStopChannel is the stop/terminate channel.
-	rateLimitedCommitStopChannel chan struct{}
+	rateLimitedCommitFunction *limiter.CoalescingSerializingRateLimiter
 	// lock is a mutex used to prevent concurrent router reloads.
 	lock sync.Mutex
 	// If true, haproxy should only bind ports when it has route and endpoint state
@@ -206,12 +204,10 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
 		metricReload:      metricsReload,
 		metricWriteConfig: metricWriteConfig,
 
-		rateLimitedCommitFunction:    nil,
-		rateLimitedCommitStopChannel: make(chan struct{}),
+		rateLimitedCommitFunction: nil,
 	}
 
-	numSeconds := int(cfg.reloadInterval.Seconds())
-	router.EnableRateLimiter(numSeconds, router.commitAndReload)
+	router.EnableRateLimiter(cfg.reloadInterval, router.commitAndReload)
 
 	if err := router.writeDefaultCert(); err != nil {
 		return nil, err
@@ -227,14 +223,9 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
 	return router, nil
 }
 
-func (r *templateRouter) EnableRateLimiter(interval int, handlerFunc ratelimiter.HandlerFunc) {
-	keyFunc := func(_ interface{}) (string, error) {
-		return "templaterouter", nil
-	}
-
-	r.rateLimitedCommitFunction = ratelimiter.NewRateLimitedFunction(keyFunc, interval, handlerFunc)
-	r.rateLimitedCommitFunction.RunUntil(r.rateLimitedCommitStopChannel)
-	glog.V(2).Infof("Template router will coalesce reloads within %v seconds of each other", interval)
+func (r *templateRouter) EnableRateLimiter(interval time.Duration, handlerFunc limiter.HandlerFunc) {
+	r.rateLimitedCommitFunction = limiter.NewCoalescingSerializingRateLimiter(interval, handlerFunc)
+	glog.V(2).Infof("Template router will coalesce reloads within %s of each other", interval.String())
 }
 
 // secretToPem composes a PEM file at the output directory from an input private key and crt file.
@@ -327,7 +318,7 @@ func (r *templateRouter) Commit() {
 	r.lock.Unlock()
 
 	if needsCommit {
-		r.rateLimitedCommitFunction.Invoke(r.rateLimitedCommitFunction)
+		r.rateLimitedCommitFunction.RegisterChange()
 	}
 }
 
diff --git a/test/integration/router_without_haproxy_test.go b/test/integration/router_without_haproxy_test.go
index 841c560cba19..9bc4822f063e 100644
--- a/test/integration/router_without_haproxy_test.go
+++ b/test/integration/router_without_haproxy_test.go
@@ -142,7 +142,7 @@ func stressRouter(t *testing.T, namespaceCount, routesPerNamespace, routerCount,
 	plugins := []*templateplugin.TemplatePlugin{}
 
 	// Don't coalesce reloads to validate reload suppression during sync.
-	reloadInterval := 0
+	reloadInterval := 0 * time.Second
 
 	// Track reload counts indexed by router name.
 	reloadedMap := make(map[string]int)
@@ -364,7 +364,7 @@ func (p *DelayPlugin) Commit() error {
 
 // launchRateLimitedRouter launches a rate-limited template router
 // that communicates with the api via the provided clients.
-func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, kc kclientset.Interface, name string, maxDelay int32, reloadInterval int, reloadedMap map[string]int) *templateplugin.TemplatePlugin {
+func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, kc kclientset.Interface, name string, maxDelay int32, reloadInterval time.Duration, reloadedMap map[string]int) *templateplugin.TemplatePlugin {
 	reloadedMap[name] = 0
 	rateLimitingFunc := func() error {
 		reloadedMap[name] += 1
@@ -385,7 +385,7 @@ func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.In
 	return templatePlugin
 }
 
-func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval int, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
+func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval time.Duration, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
 	r := templateplugin.NewFakeTemplateRouter()
 	r.EnableRateLimiter(reloadInterval, func() error {