diff --git a/contrib/completions/bash/openshift b/contrib/completions/bash/openshift index d791dfb0f4ef..4cb02ec3d6ee 100644 --- a/contrib/completions/bash/openshift +++ b/contrib/completions/bash/openshift @@ -24282,6 +24282,10 @@ _openshift_infra_router() local_nonpersistent_flags+=("--project-labels=") flags+=("--reload=") local_nonpersistent_flags+=("--reload=") + flags+=("--reload-event-wait=") + local_nonpersistent_flags+=("--reload-event-wait=") + flags+=("--reload-gap=") + local_nonpersistent_flags+=("--reload-gap=") flags+=("--request-timeout=") local_nonpersistent_flags+=("--request-timeout=") flags+=("--resync-interval=") diff --git a/contrib/completions/zsh/openshift b/contrib/completions/zsh/openshift index d5fc6ff07d6f..146675283083 100644 --- a/contrib/completions/zsh/openshift +++ b/contrib/completions/zsh/openshift @@ -24431,6 +24431,10 @@ _openshift_infra_router() local_nonpersistent_flags+=("--project-labels=") flags+=("--reload=") local_nonpersistent_flags+=("--reload=") + flags+=("--reload-event-wait=") + local_nonpersistent_flags+=("--reload-event-wait=") + flags+=("--reload-gap=") + local_nonpersistent_flags+=("--reload-gap=") flags+=("--request-timeout=") local_nonpersistent_flags+=("--request-timeout=") flags+=("--resync-interval=") diff --git a/pkg/cmd/infra/router/template.go b/pkg/cmd/infra/router/template.go index f602745a1d69..5a1878043513 100644 --- a/pkg/cmd/infra/router/template.go +++ b/pkg/cmd/infra/router/template.go @@ -34,9 +34,6 @@ import ( "github.com/openshift/origin/pkg/version" ) -// defaultReloadInterval is how often to do reloads in seconds. 
-const defaultReloadInterval = 5 - var routerLong = templates.LongDesc(` Start a router @@ -68,6 +65,8 @@ type TemplateRouter struct { TemplateFile string ReloadScript string ReloadInterval time.Duration + ReloadGap time.Duration + ReloadEventWait time.Duration DefaultCertificate string DefaultCertificatePath string DefaultCertificateDir string @@ -81,14 +80,15 @@ type TemplateRouter struct { MetricsType string } -// reloadInterval returns how often to run the router reloads. The interval -// value is based on an environment variable or the default. -func reloadInterval() time.Duration { - interval := util.Env("RELOAD_INTERVAL", fmt.Sprintf("%vs", defaultReloadInterval)) - value, err := time.ParseDuration(interval) +// getDurationEnv() retrieves the value of the named environment +// variable and checks that it is a valid duration. If it is not, or it +// is not set, it returns the given default. +func getDurationEnv(envName string, defaultDuration time.Duration) time.Duration { + valueString := util.Env(envName, defaultDuration.String()) + value, err := time.ParseDuration(valueString) if err != nil { - glog.Warningf("Invalid RELOAD_INTERVAL %q, using default value %v ...", interval, defaultReloadInterval) - value = time.Duration(defaultReloadInterval * time.Second) + glog.Warningf("Invalid %s %q, using default value %s ...", envName, valueString, defaultDuration.String()) + value = defaultDuration } return value } @@ -103,7 +103,9 @@ func (o *TemplateRouter) Bind(flag *pflag.FlagSet) { flag.StringVar(&o.DefaultDestinationCAPath, "default-destination-ca-path", util.Env("DEFAULT_DESTINATION_CA_PATH", "/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"), "A path to a PEM file containing the default CA bundle to use with re-encrypt routes. 
This CA should sign for certificates in the Kubernetes DNS space (service.namespace.svc).") flag.StringVar(&o.TemplateFile, "template", util.Env("TEMPLATE_FILE", ""), "The path to the template file to use") flag.StringVar(&o.ReloadScript, "reload", util.Env("RELOAD_SCRIPT", ""), "The path to the reload script to use") - flag.DurationVar(&o.ReloadInterval, "interval", reloadInterval(), "Controls how often router reloads are invoked. Mutiple router reload requests are coalesced for the duration of this interval since the last reload time.") + flag.DurationVar(&o.ReloadInterval, "interval", getDurationEnv("RELOAD_INTERVAL", 5*time.Second), "Controls how often router reloads are invoked. Mutiple router reload requests are coalesced for the duration of this interval since the time the last reload started.") + flag.DurationVar(&o.ReloadGap, "reload-gap", getDurationEnv("RELOAD_GAP", 100*time.Millisecond), "Controls how often router reloads are invoked. Mutiple router reload requests are coalesced for the duration of this interval since the time the last reload ended.") + flag.DurationVar(&o.ReloadEventWait, "reload-event-wait", getDurationEnv("RELOAD_EVENT_WAIT", 10*time.Millisecond), "Controls how often router reloads are invoked. Mutiple router reload events are gathered for the given duration before a reload is triggered.") flag.BoolVar(&o.ExtendedValidation, "extended-validation", util.Env("EXTENDED_VALIDATION", "true") == "true", "If set, then an additional extended validation step is performed on all routes admitted in by this router. 
Defaults to true and enables the extended validation checks.") flag.BoolVar(&o.BindPortsAfterSync, "bind-ports-after-sync", util.Env("ROUTER_BIND_PORTS_AFTER_SYNC", "") == "true", "Bind ports only after route state has been synchronized") flag.StringVar(&o.MaxConnections, "max-connections", util.Env("ROUTER_MAX_CONNECTIONS", ""), "Specifies the maximum number of concurrent connections.") @@ -205,6 +207,14 @@ func (o *TemplateRouterOptions) Complete() error { return fmt.Errorf("invalid reload interval: %v - must be a positive duration", nsecs) } + if o.ReloadGap < 0 { + return fmt.Errorf("invalid reload gap: %s - must not be negative", o.ReloadGap.String()) + } + + if o.ReloadEventWait < 0 { + return fmt.Errorf("invalid reload event wait: %s - must not be negative", o.ReloadEventWait.String()) + } + if len(routerCanonicalHostname) > 0 { if errs := validation.IsDNS1123Subdomain(routerCanonicalHostname); len(errs) != 0 { return fmt.Errorf("invalid canonical hostname: %s", routerCanonicalHostname) @@ -324,6 +334,8 @@ func (o *TemplateRouterOptions) Run() error { TemplatePath: o.TemplateFile, ReloadScriptPath: o.ReloadScript, ReloadInterval: o.ReloadInterval, + ReloadGap: o.ReloadGap, + ReloadEventWait: o.ReloadEventWait, DefaultCertificate: o.DefaultCertificate, DefaultCertificatePath: o.DefaultCertificatePath, DefaultCertificateDir: o.DefaultCertificateDir, diff --git a/pkg/router/template/fake.go b/pkg/router/template/fake.go index a34f5f019b29..ca34948fc8ae 100644 --- a/pkg/router/template/fake.go +++ b/pkg/router/template/fake.go @@ -1,25 +1,27 @@ package templaterouter +import "time" + // NewFakeTemplateRouter provides an empty template router with a simple certificate manager // backed by a fake cert writer for testing func NewFakeTemplateRouter() *templateRouter { fakeCertManager, _ := newSimpleCertificateManager(newFakeCertificateManagerConfig(), &fakeCertWriter{}) return &templateRouter{ - state: map[string]ServiceAliasConfig{}, - serviceUnits: 
make(map[string]ServiceUnit), - certManager: fakeCertManager, - rateLimitedCommitFunction: nil, - rateLimitedCommitStopChannel: make(chan struct{}), + state: map[string]ServiceAliasConfig{}, + serviceUnits: make(map[string]ServiceUnit), + certManager: fakeCertManager, } } // FakeReloadHandler implements the minimal changes needed to make the locking behavior work -// This MUST match the behavior with the stateChanged of commitAndReload +// This MUST match the behavior with the object updates of commitAndReload() in router.go func (r *templateRouter) FakeReloadHandler() { r.lock.Lock() defer r.lock.Unlock() r.stateChanged = false + r.lastReloadStart = time.Now() + r.lastReloadEnd = time.Now() return } diff --git a/pkg/router/template/plugin.go b/pkg/router/template/plugin.go index aa27fbdab8ee..f8f0fb0863cc 100644 --- a/pkg/router/template/plugin.go +++ b/pkg/router/template/plugin.go @@ -46,6 +46,8 @@ type TemplatePluginConfig struct { TemplatePath string ReloadScriptPath string ReloadInterval time.Duration + ReloadGap time.Duration + ReloadEventWait time.Duration ReloadCallbacks []func() DefaultCertificate string DefaultCertificatePath string @@ -123,6 +125,8 @@ func NewTemplatePlugin(cfg TemplatePluginConfig, lookupSvc ServiceLookup) (*Temp templates: templates, reloadScriptPath: cfg.ReloadScriptPath, reloadInterval: cfg.ReloadInterval, + reloadGap: cfg.ReloadGap, + reloadEventWait: cfg.ReloadEventWait, reloadCallbacks: cfg.ReloadCallbacks, defaultCertificate: cfg.DefaultCertificate, defaultCertificatePath: cfg.DefaultCertificatePath, diff --git a/pkg/router/template/router.go b/pkg/router/template/router.go index 64026c2aa1ef..520bb01a016e 100644 --- a/pkg/router/template/router.go +++ b/pkg/router/template/router.go @@ -17,12 +17,12 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" cmdutil "github.com/openshift/origin/pkg/cmd/util" 
routeapi "github.com/openshift/origin/pkg/route/apis/route" "github.com/openshift/origin/pkg/router/controller" - "github.com/openshift/origin/pkg/util/ratelimiter" ) const ( @@ -52,7 +52,6 @@ type templateRouter struct { dir string templates map[string]*template.Template reloadScriptPath string - reloadInterval time.Duration reloadCallbacks []func() state map[string]ServiceAliasConfig serviceUnits map[string]ServiceUnit @@ -86,13 +85,23 @@ type templateRouter struct { statsPort int // if the router should allow wildcard routes. allowWildcardRoutes bool - // rateLimitedCommitFunction is a rate limited commit (persist state + refresh the backend) - // function that coalesces and controls how often the router is reloaded. - rateLimitedCommitFunction *ratelimiter.RateLimitedFunction - // rateLimitedCommitStopChannel is the stop/terminate channel. - rateLimitedCommitStopChannel chan struct{} - // lock is a mutex used to prevent concurrent router reloads. + // lock is a mutex used to prevent concurrent router state updates lock sync.Mutex + // Track the start and end of router reloads + lastReloadStart time.Time + lastReloadEnd time.Time + // commitReqTime is nil if no commit is needed, otherwise it is the time the commit was requested + commitReqTime *time.Time + // commitRunning indicates whether the commitFunc is actively running + commitRunning bool + // commitTimer is the timer we use to make callbacks to the delayed commits + commitTimer *time.Timer + // reloadInterval is the minimum time between the starts of reloads + reloadInterval time.Duration + // reloadGap is the minimum gap between the end of a reload and the start of the next + reloadGap time.Duration + // reloadEventWait is the duration to wait after an event before triggering a reload in case other events come from the same change (should be short... 
milliseconds) + reloadEventWait time.Duration // If true, haproxy should only bind ports when it has route and endpoint state bindPortsAfterSync bool // whether the router state has been read from the api at least once @@ -103,6 +112,8 @@ type templateRouter struct { metricReload prometheus.Summary // metricWriteConfig tracks writing config metricWriteConfig prometheus.Summary + // commitFunc is the commit (persist state + refresh the backend) function. This is only to be used for our test hooks + commitFunc CommitFunc } // templateRouterCfg holds all configuration items required to initialize the template router @@ -111,6 +122,8 @@ type templateRouterCfg struct { templates map[string]*template.Template reloadScriptPath string reloadInterval time.Duration + reloadGap time.Duration + reloadEventWait time.Duration reloadCallbacks []func() defaultCertificate string defaultCertificatePath string @@ -186,7 +199,6 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) { dir: dir, templates: cfg.templates, reloadScriptPath: cfg.reloadScriptPath, - reloadInterval: cfg.reloadInterval, reloadCallbacks: cfg.reloadCallbacks, state: make(map[string]ServiceAliasConfig), serviceUnits: make(map[string]ServiceUnit), @@ -203,38 +215,38 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) { peerEndpoints: []Endpoint{}, bindPortsAfterSync: cfg.bindPortsAfterSync, + commitReqTime: nil, + commitRunning: false, + stateChanged: true, + + reloadInterval: cfg.reloadInterval, + reloadGap: cfg.reloadGap, + reloadEventWait: cfg.reloadEventWait, + metricReload: metricsReload, metricWriteConfig: metricWriteConfig, - - rateLimitedCommitFunction: nil, - rateLimitedCommitStopChannel: make(chan struct{}), } - numSeconds := int(cfg.reloadInterval.Seconds()) - router.EnableRateLimiter(numSeconds, router.commitAndReload) + router.SetCommitFunc(func() error { + return router.commitAndReload() + }) + + glog.V(2).Infof("Template router will coalesce reloads within 
%s seconds of the last restart start time, within %s seconds of the last restart end time, and wait %s seconds after the first event", router.reloadInterval.String(), router.reloadGap.String(), router.reloadEventWait.String()) if err := router.writeDefaultCert(); err != nil { return nil, err } + glog.V(4).Infof("Reading persisted state") if err := router.readState(); err != nil { return nil, err } - glog.V(4).Infof("Committing state") - // Bypass the rate limiter to ensure the first sync will be - // committed without delay. - router.commitAndReload() - return router, nil -} -func (r *templateRouter) EnableRateLimiter(interval int, handlerFunc ratelimiter.HandlerFunc) { - keyFunc := func(_ interface{}) (string, error) { - return "templaterouter", nil - } + // Do an immediate commit so that we can reply to health checks before the state syncs + glog.V(4).Infof("Committing without a sync so that the router replies to health checks") + router.realCommit(false) - r.rateLimitedCommitFunction = ratelimiter.NewRateLimitedFunction(keyFunc, interval, handlerFunc) - r.rateLimitedCommitFunction.RunUntil(r.rateLimitedCommitStopChannel) - glog.V(2).Infof("Template router will coalesce reloads within %v seconds of each other", interval) + return router, nil } // secretToPem composes a PEM file at the output directory from an input private key and crt file. @@ -311,31 +323,174 @@ func (r *templateRouter) readState() error { return json.Unmarshal(data, &r.state) } -// Commit applies the changes made to the router configuration - persists +// Define a type for the pluggable commit function +type CommitFunc func() error + +// This should ONLY be used for external testing hooks +func (r *templateRouter) SetCommitFunc(commitFunc CommitFunc) { + r.commitFunc = commitFunc +} + +// Commit calls the realCommit worker with a flag saying that it has +// synced. This should be called only when the router state is +// consistent. 
realCommit() below can be called on initial router +// load to load the haproxy without a full state sync. +func (r *templateRouter) Commit() { + r.realCommit(true) +} + +// realCommit applies the changes made to the router configuration - persists // the state and refresh the backend. This is all done in the background // so that we can rate limit + coalesce multiple changes. +// +// The hasSynced argument specifies whether the state is fully synchronized. +// // Note: If this is changed FakeCommit() in fake.go should also be updated -func (r *templateRouter) Commit() { +func (r *templateRouter) realCommit(hasSynced bool) { r.lock.Lock() - if !r.synced { - glog.V(4).Infof("Router state synchronized for the first time") - r.synced = true - r.stateChanged = true + if hasSynced { + // Only update our sync state if we've really synced. We want + // to let the router start early so that it can return health + // while the state synchronizes, but before the main ports are + // bound + if !r.synced { + glog.V(4).Infof("Router state synchronized for the first time, allowing rapid reload") + r.synced = true + r.stateChanged = true + + // Clean out the last reload variables so the reload from the first sync doesn't have to wait + r.lastReloadStart = time.Time{} + r.lastReloadEnd = time.Time{} + } + } + + if r.stateChanged { + // If the state changed, then we need to commit + glog.V(8).Infof("Commit called and the state has changed, could reload") + + if r.commitReqTime == nil { + // There is no scheduled commit worker, so set the time we started and + // invoke the worker code. It will decide if it can run now, or if it + // needs to schedule a callback. + // + // We need to track the earliest commit time so we can do burst suppression. 
+ now := time.Now() + r.commitReqTime = &now + r.lock.Unlock() + + glog.V(8).Infof("No scheduled reload, calling the worker (curtime: %v)", now) + + r.commitWorker() + return + } + + glog.V(8).Infof("There is already a scheduled reload (for %v), skipping the worker", r.commitReqTime) + } + + r.lock.Unlock() +} + +// timeUntilNextAction() works out when we can next reload based on the current time, the last action time, and the minimum allowed gap between the two. +// In order to be allowed to reload immediately, the last action + minimum gap must be <= the current time. +// If we can reload, then it returns a zero duration. +// If we can't reload, then it returns a duration to wait for before a reload would be allowed. +func timeUntilNextAction(now time.Time, lastActionTime time.Time, minimumActionGap time.Duration) (nextReload time.Duration) { + if sinceLastAction := now.Sub(lastActionTime); sinceLastAction < minimumActionGap { + return minimumActionGap - sinceLastAction } - needsCommit := r.stateChanged + return 0 * time.Second +} + +func (r *templateRouter) commitWorker() { + r.lock.Lock() + + glog.V(8).Infof("CommitWorker called") + + if r.commitRunning { + // We don't need to do anything else... there's a commit in progress, and when it is done it will re-call this function at which point the work will then happen + glog.V(8).Infof("There was already a commit running (%v) returning from the worker", r.commitRunning) + r.lock.Unlock() + return + } + + if r.commitReqTime == nil { + // There's no commit queued so we have nothing to do. We should only get here when + // the function is re-called after a reload + glog.V(8).Infof("No commit requested time, so there's no queued commit. 
Nothing to do.") + r.lock.Unlock() + return + } + + // There is no commit running, let's see if we should run yet, or schedule a callback + var untilNextCallback time.Duration + now := time.Now() + + getNextReload := func(potentialDuration time.Duration) time.Duration { + // Use the largest of the potential durations so we only come back when the checks will allow it to run + if potentialDuration > untilNextCallback { + return potentialDuration + } + + return untilNextCallback + } + + untilNextCallback = getNextReload(timeUntilNextAction(now, r.lastReloadStart, r.reloadInterval)) + untilNextCallback = getNextReload(timeUntilNextAction(now, r.lastReloadEnd, r.reloadGap)) + untilNextCallback = getNextReload(timeUntilNextAction(now, *r.commitReqTime, r.reloadEventWait)) + + if untilNextCallback > 0 { + // We want to reload... but can't yet because some window is not satisfied + if r.commitTimer == nil { + r.commitTimer = time.AfterFunc(untilNextCallback, r.commitWorker) + } else { + // While we are resetting the timer, it should have fired and be stopped. + // The first time the worker is called it will know the precise duration + // until when a run would be valid and has scheduled a timer for that point + r.commitTimer.Reset(untilNextCallback) + } + + glog.V(8).Infof("Can't reload the router yet, need to delay %s, callback scheduled", untilNextCallback.String()) + + r.lock.Unlock() + return + } + + // Otherwise we can reload immediately... let's do it! 
+ glog.V(8).Infof("Calling the router commit function (for req time %v)", r.commitReqTime) + r.commitRunning = true + r.commitReqTime = nil r.lock.Unlock() - if needsCommit { - r.rateLimitedCommitFunction.Invoke(r.rateLimitedCommitFunction) + if err := func() error { + defer func() { + r.lock.Lock() + r.commitRunning = false + r.lock.Unlock() + }() + + return r.commitFunc() + }(); err != nil { + utilruntime.HandleError(err) } + + // Re-call the commit in case there is work waiting that came in while we were working + // we want to call the top level commit in case the state has not changed + glog.V(8).Infof("Re-Calling the worker after a reload in case work came in") + r.commitWorker() } // commitAndReload refreshes the backend and persists the router state. +// This is the default implementation of the commit function, it can be replaced by the FakeReloadHandler() for testing purposes. +// If you change any state handling here make sure you keep that in sync. +// +// Note: Only one commitAndReload can be in progress at a time, but the Commit function takes care of +// ensuring that only one commit function is called. 
func (r *templateRouter) commitAndReload() error { - // only state changes must be done under the lock + if err := func() error { + // Only state reads and changes must be done under the lock, the reload itself must not be done under the lock r.lock.Lock() defer r.lock.Unlock() @@ -345,11 +500,11 @@ func (r *templateRouter) commitAndReload() error { } r.stateChanged = false + r.lastReloadStart = time.Now() glog.V(4).Infof("Writing the router config") - reloadStart := time.Now() err := r.writeConfig() - r.metricWriteConfig.Observe(float64(time.Now().Sub(reloadStart)) / float64(time.Second)) + r.metricWriteConfig.Observe(float64(time.Now().Sub(r.lastReloadStart)) / float64(time.Second)) return err }(); err != nil { return err @@ -363,11 +518,16 @@ func (r *templateRouter) commitAndReload() error { glog.V(4).Infof("Reloading the router") reloadStart := time.Now() err := r.reloadRouter() - r.metricReload.Observe(float64(time.Now().Sub(reloadStart)) / float64(time.Second)) + reloadEnd := time.Now() + r.metricReload.Observe(float64(reloadEnd.Sub(reloadStart)) / float64(time.Second)) if err != nil { return err } + r.lock.Lock() + r.lastReloadEnd = reloadEnd + r.lock.Unlock() + return nil } diff --git a/test/end-to-end/router_test.go b/test/end-to-end/router_test.go index b669613de49e..975ff3417ada 100644 --- a/test/end-to-end/router_test.go +++ b/test/end-to-end/router_test.go @@ -1348,10 +1348,15 @@ func createAndStartRouterContainerExtended(dockerCli *dockerClient.Client, maste hostVols = append(hostVols, fmt.Sprintf("%[1]s:/usr/bin/openshift", binary)) } + logLevel := os.Getenv("OPENSHIFT_LOG_LEVEL") + if len(logLevel) == 0 { + logLevel = "4" + } + containerOpts := dockerClient.CreateContainerOptions{ Config: &dockerClient.Config{ Image: getRouterImage(), - Cmd: []string{"--master=" + masterIp, "--loglevel=4"}, + Cmd: []string{"--master=" + masterIp, "--loglevel=" + logLevel}, Env: env, ExposedPorts: exposedPorts, VolumesFrom: vols, @@ -1414,8 +1419,10 @@ func 
validateServer(server *tr.TestHttpService, t *testing.T) { // cleanUp stops and removes the deployed router func cleanUp(t *testing.T, dockerCli *dockerClient.Client, routerId string) { + getAllLogs, _ := strconv.ParseBool(os.Getenv("OPENSHIFT_GET_ALL_DOCKER_LOGS")) + dockerCli.StopContainer(routerId, 5) - if t.Failed() { + if t.Failed() || getAllLogs { dockerCli.Logs(dockerClient.LogsOptions{ Container: routerId, OutputStream: os.Stdout, diff --git a/test/integration/router_without_haproxy_test.go b/test/integration/router_without_haproxy_test.go index f111690399aa..fa0c887246b9 100644 --- a/test/integration/router_without_haproxy_test.go +++ b/test/integration/router_without_haproxy_test.go @@ -390,7 +390,7 @@ func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.In func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval int, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) { r := templateplugin.NewFakeTemplateRouter() - r.EnableRateLimiter(reloadInterval, func() error { + r.SetCommitFunc(func() error { r.FakeReloadHandler() return rateLimitingFunc() })