diff --git a/pkg/router/template/fake.go b/pkg/router/template/fake.go
index a34f5f019b29..1d68532a581b 100644
--- a/pkg/router/template/fake.go
+++ b/pkg/router/template/fake.go
@@ -5,11 +5,10 @@ package templaterouter
 func NewFakeTemplateRouter() *templateRouter {
 	fakeCertManager, _ := newSimpleCertificateManager(newFakeCertificateManagerConfig(), &fakeCertWriter{})
 	return &templateRouter{
-		state:                        map[string]ServiceAliasConfig{},
-		serviceUnits:                 make(map[string]ServiceUnit),
-		certManager:                  fakeCertManager,
-		rateLimitedCommitFunction:    nil,
-		rateLimitedCommitStopChannel: make(chan struct{}),
+		state:                     map[string]ServiceAliasConfig{},
+		serviceUnits:              make(map[string]ServiceUnit),
+		certManager:               fakeCertManager,
+		rateLimitedCommitFunction: nil,
 	}
 }
 
diff --git a/pkg/router/template/limiter/limiter.go b/pkg/router/template/limiter/limiter.go
new file mode 100644
index 000000000000..93bc140a5a00
--- /dev/null
+++ b/pkg/router/template/limiter/limiter.go
@@ -0,0 +1,144 @@
+package limiter
+
+import (
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+)
+
+// HandlerFunc defines function signature for a CoalescingSerializingRateLimiter.
+type HandlerFunc func() error
+
+// CoalescingSerializingRateLimiter guarantees that calls will not happen to the given function
+// more frequently than the given interval, and it guarantees that only one call will happen at a time.
+// The calls are not queued, i.e. if you make 5 calls to RegisterChange(), it does not guarantee that the
+// handler will be invoked 5 times, it merely guarantees it will be invoked once, and no more often than
+// the rate.
+// The calls to the handler will happen in the background and are expected to do their own locking if needed.
+type CoalescingSerializingRateLimiter struct {
+	// handlerFunc is the function to rate limit and serialize calls to.
+	handlerFunc HandlerFunc
+
+	// callInterval is the minimum time between the starts of handler calls.
+	callInterval time.Duration
+
+	// lastStart is the time the last run of the handler started.
+	lastStart time.Time
+
+	// changeReqTime is nil if no change has been registered since the last handler run completed, otherwise it is the
+	// time last change was registered.
+	changeReqTime *time.Time
+
+	// handlerRunning indicates whether the Handler is actively running.
+	handlerRunning bool
+
+	// lock protects the CoalescingSerializingRateLimiter structure from multiple threads manipulating it at once.
+	lock sync.Mutex
+
+	// callbackTimer is the timer we use to make callbacks to re-run the function to decide if we need to do work.
+	callbackTimer *time.Timer
+}
+
+func NewCoalescingSerializingRateLimiter(interval time.Duration, handlerFunc HandlerFunc) *CoalescingSerializingRateLimiter {
+	limiter := &CoalescingSerializingRateLimiter{
+		handlerFunc:    handlerFunc,
+		callInterval:   interval,
+		lastStart:      time.Time{},
+		changeReqTime:  nil,
+		handlerRunning: false,
+	}
+
+	return limiter
+}
+
+// RegisterChange() indicates that the rate limited function should be called. It may not immediately run it, but it will cause it to run within
+// the ReloadInterval. It will always immediately return, the function will be run in the background. Not every call to RegisterChange() will
+// result in the function getting called. If it is called repeatedly while it is still within the ReloadInterval since the last run, it will
+// only run once when the time allows it.
+func (csrl *CoalescingSerializingRateLimiter) RegisterChange() {
+	glog.V(8).Infof("RegisterChange called")
+
+	csrl.changeWorker(true)
+}
+
+func (csrl *CoalescingSerializingRateLimiter) changeWorker(userChanged bool) {
+	csrl.lock.Lock()
+	defer csrl.lock.Unlock()
+
+	glog.V(8).Infof("changeWorker called")
+
+	if userChanged && csrl.changeReqTime == nil {
+		// They just registered a change manually (and we aren't in the middle of a change)
+		now := time.Now()
+		csrl.changeReqTime = &now
+	}
+
+	if csrl.handlerRunning {
+		// We don't need to do anything else... there's a run in progress, and when it is done it will re-call this function at which point the work will then happen
+		glog.V(8).Infof("The handler was already running (%v) started at %s, returning from the worker", csrl.handlerRunning, csrl.lastStart.String())
+		return
+	}
+
+	if csrl.changeReqTime == nil {
+		// There's no work queued so we have nothing to do. We should only get here when
+		// the function is re-called after a reload
+		glog.V(8).Infof("No invoke requested time, so there's no queued work. Nothing to do.")
+		return
+	}
+
+	// There is no handler running, let's see if we should run yet, or schedule a callback
+	now := time.Now()
+	sinceLastRun := now.Sub(csrl.lastStart)
+	untilNextCallback := csrl.callInterval - sinceLastRun
+	glog.V(8).Infof("Checking reload; now: %v, lastStart: %v, sinceLast %v, limit %v, remaining %v", now, csrl.lastStart, sinceLastRun, csrl.callInterval, untilNextCallback)
+
+	if untilNextCallback > 0 {
+		// We want to reload... but can't yet because some window is not satisfied
+		if csrl.callbackTimer == nil {
+			csrl.callbackTimer = time.AfterFunc(untilNextCallback, func() { csrl.changeWorker(false) })
+		} else {
+			// While we are resetting the timer, it should have fired and be stopped.
+			// The first time the worker is called it will know the precise duration
+			// until when a run would be valid and has scheduled a timer for that point
+			csrl.callbackTimer.Reset(untilNextCallback)
+		}
+
+		glog.V(8).Infof("Can't invoke the handler yet, need to delay %s, callback scheduled", untilNextCallback.String())
+
+		return
+	}
+
+	// Otherwise we can reload immediately... let's do it!
+	glog.V(8).Infof("Calling the handler function (for invoke time %v)", csrl.changeReqTime)
+	csrl.handlerRunning = true
+	csrl.changeReqTime = nil
+	csrl.lastStart = now
+
+	// Go run the handler so we don't block the caller
+	go csrl.runHandler()
+
+	return
+}
+
+func (csrl *CoalescingSerializingRateLimiter) runHandler() {
+	// Call the handler, but do it in its own function so we can cleanup in case the handler panics
+	runHandler := func() error {
+		defer func() {
+			csrl.lock.Lock()
+			csrl.handlerRunning = false
+			csrl.lock.Unlock()
+		}()
+
+		return csrl.handlerFunc()
+	}
+	if err := runHandler(); err != nil {
+		utilruntime.HandleError(err)
+	}
+
+	// Re-call the commit in case there is work waiting that came in while we were working
+	// we want to call the top level commit in case the state has not changed
+	glog.V(8).Infof("Re-Calling the worker after a reload in case work came in")
+	csrl.changeWorker(false)
+}
diff --git a/pkg/router/template/limiter/limiter_test.go b/pkg/router/template/limiter/limiter_test.go
new file mode 100644
index 000000000000..3bc9b15da25d
--- /dev/null
+++ b/pkg/router/template/limiter/limiter_test.go
@@ -0,0 +1,75 @@
+package limiter
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+)
+
+type handler struct {
+	count int
+	sync.Mutex
+}
+
+func (h *handler) handle() error {
+	h.Lock()
+	defer h.Unlock()
+	h.count += 1
+	return nil
+}
+
+func (h *handler) counter() int {
+	h.Lock()
+	defer h.Unlock()
+	return h.count
+}
+
+func TestCoalescingSerializingRateLimiter(t *testing.T) {
+
+	fmt.Println("start")
+
+	tests := []struct {
+		Name     string
+		Interval time.Duration
+		Times    int
+	}{
+		{
+			Name:     "3PO",
+			Interval: 3 * time.Second,
+			Times:    10,
+		},
+		{
+			Name:     "five-fer",
+			Interval: 5 * time.Second,
+			Times:    20,
+		},
+		{
+			Name:     "longjob",
+			Interval: 2 * time.Second,
+			Times:    20,
+		},
+	}
+
+	for _, tc := range tests {
+		h := &handler{}
+		rlf := NewCoalescingSerializingRateLimiter(tc.Interval, h.handle)
+
+		for i := 0; i < tc.Times; i++ {
+			fmt.Println("start")
+			rlf.RegisterChange()
+			fmt.Println("end")
+		}
+
+		select {
+		case <-time.After(tc.Interval + 2*time.Second):
+			fmt.Println("after")
+
+			counter := h.counter()
+			if tc.Interval > 0 && counter >= tc.Times/2 {
+				t.Errorf("For coalesced calls, expected number of invocations to be less than half. Expected: < %v Got: %v",
+					tc.Times/2, counter)
+			}
+		}
+	}
+}
diff --git a/pkg/router/template/router.go b/pkg/router/template/router.go
index 6256c4878af6..47b2b06fba7c 100644
--- a/pkg/router/template/router.go
+++ b/pkg/router/template/router.go
@@ -22,7 +22,7 @@ import (
 	cmdutil "github.com/openshift/origin/pkg/cmd/util"
 	routeapi "github.com/openshift/origin/pkg/route/apis/route"
 	"github.com/openshift/origin/pkg/router/controller"
-	"github.com/openshift/origin/pkg/util/ratelimiter"
+	"github.com/openshift/origin/pkg/router/template/limiter"
 )
 
 const (
@@ -88,9 +88,7 @@ type templateRouter struct {
 	allowWildcardRoutes bool
 	// rateLimitedCommitFunction is a rate limited commit (persist state + refresh the backend)
 	// function that coalesces and controls how often the router is reloaded.
-	rateLimitedCommitFunction *ratelimiter.RateLimitedFunction
-	// rateLimitedCommitStopChannel is the stop/terminate channel.
-	rateLimitedCommitStopChannel chan struct{}
+	rateLimitedCommitFunction *limiter.CoalescingSerializingRateLimiter
 	// lock is a mutex used to prevent concurrent router reloads. 
lock sync.Mutex // If true, haproxy should only bind ports when it has route and endpoint state @@ -206,12 +204,10 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) { metricReload: metricsReload, metricWriteConfig: metricWriteConfig, - rateLimitedCommitFunction: nil, - rateLimitedCommitStopChannel: make(chan struct{}), + rateLimitedCommitFunction: nil, } - numSeconds := int(cfg.reloadInterval.Seconds()) - router.EnableRateLimiter(numSeconds, router.commitAndReload) + router.EnableRateLimiter(cfg.reloadInterval, router.commitAndReload) if err := router.writeDefaultCert(); err != nil { return nil, err @@ -227,14 +223,9 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) { return router, nil } -func (r *templateRouter) EnableRateLimiter(interval int, handlerFunc ratelimiter.HandlerFunc) { - keyFunc := func(_ interface{}) (string, error) { - return "templaterouter", nil - } - - r.rateLimitedCommitFunction = ratelimiter.NewRateLimitedFunction(keyFunc, interval, handlerFunc) - r.rateLimitedCommitFunction.RunUntil(r.rateLimitedCommitStopChannel) - glog.V(2).Infof("Template router will coalesce reloads within %v seconds of each other", interval) +func (r *templateRouter) EnableRateLimiter(interval time.Duration, handlerFunc limiter.HandlerFunc) { + r.rateLimitedCommitFunction = limiter.NewCoalescingSerializingRateLimiter(interval, handlerFunc) + glog.V(2).Infof("Template router will coalesce reloads within %s of each other", interval.String()) } // secretToPem composes a PEM file at the output directory from an input private key and crt file. 
@@ -327,7 +318,7 @@ func (r *templateRouter) Commit() {
 	r.lock.Unlock()
 
 	if needsCommit {
-		r.rateLimitedCommitFunction.Invoke(r.rateLimitedCommitFunction)
+		r.rateLimitedCommitFunction.RegisterChange()
 	}
 }
 
diff --git a/test/end-to-end/router_test.go b/test/end-to-end/router_test.go
index b669613de49e..975ff3417ada 100644
--- a/test/end-to-end/router_test.go
+++ b/test/end-to-end/router_test.go
@@ -1348,10 +1348,15 @@ func createAndStartRouterContainerExtended(dockerCli *dockerClient.Client, maste
 		hostVols = append(hostVols, fmt.Sprintf("%[1]s:/usr/bin/openshift", binary))
 	}
 
+	logLevel := os.Getenv("OPENSHIFT_LOG_LEVEL")
+	if len(logLevel) == 0 {
+		logLevel = "4"
+	}
+
 	containerOpts := dockerClient.CreateContainerOptions{
 		Config: &dockerClient.Config{
 			Image:        getRouterImage(),
-			Cmd:          []string{"--master=" + masterIp, "--loglevel=4"},
+			Cmd:          []string{"--master=" + masterIp, "--loglevel=" + logLevel},
 			Env:          env,
 			ExposedPorts: exposedPorts,
 			VolumesFrom:  vols,
@@ -1414,8 +1419,10 @@ func validateServer(server *tr.TestHttpService, t *testing.T) {
 
 // cleanUp stops and removes the deployed router
 func cleanUp(t *testing.T, dockerCli *dockerClient.Client, routerId string) {
+	getAllLogs, _ := strconv.ParseBool(os.Getenv("OPENSHIFT_GET_ALL_DOCKER_LOGS"))
+
 	dockerCli.StopContainer(routerId, 5)
-	if t.Failed() {
+	if t.Failed() || getAllLogs {
 		dockerCli.Logs(dockerClient.LogsOptions{
 			Container:    routerId,
 			OutputStream: os.Stdout,
diff --git a/test/integration/router_without_haproxy_test.go b/test/integration/router_without_haproxy_test.go
index 841c560cba19..9bc4822f063e 100644
--- a/test/integration/router_without_haproxy_test.go
+++ b/test/integration/router_without_haproxy_test.go
@@ -142,7 +142,7 @@ func stressRouter(t *testing.T, namespaceCount, routesPerNamespace, routerCount,
 	plugins := []*templateplugin.TemplatePlugin{}
 
 	// Don't coalesce reloads to validate reload suppression during sync. 
-	reloadInterval := 0
+	reloadInterval := 0 * time.Second
 
 	// Track reload counts indexed by router name.
 	reloadedMap := make(map[string]int)
@@ -364,7 +364,7 @@ func (p *DelayPlugin) Commit() error {
 
 // launchRateLimitedRouter launches a rate-limited template router
 // that communicates with the api via the provided clients.
-func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, kc kclientset.Interface, name string, maxDelay int32, reloadInterval int, reloadedMap map[string]int) *templateplugin.TemplatePlugin {
+func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, kc kclientset.Interface, name string, maxDelay int32, reloadInterval time.Duration, reloadedMap map[string]int) *templateplugin.TemplatePlugin {
 	reloadedMap[name] = 0
 	rateLimitingFunc := func() error {
 		reloadedMap[name] += 1
@@ -385,7 +385,7 @@ func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.In
 	return templatePlugin
 }
 
-func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval int, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
+func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval time.Duration, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
 	r := templateplugin.NewFakeTemplateRouter()
 	r.EnableRateLimiter(reloadInterval, func() error {