From 6e45039654f1897ce55690ade863be776797233a Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Fri, 11 Jan 2019 21:13:12 +0000
Subject: [PATCH 1/6] When searching for an ingester to send chunks to, ensure
 we can actually connect to it.

Signed-off-by: Tom Wilkie
---
 pkg/ingester/client/client.go  |  1 +
 pkg/ingester/lifecycle_test.go |  4 +++
 pkg/ingester/transfer.go       | 46 +++++++++++++++++++++++++++++---------
 3 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go
index 764cf708949..54679185f1c 100644
--- a/pkg/ingester/client/client.go
+++ b/pkg/ingester/client/client.go
@@ -27,6 +27,7 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra
 type HealthAndIngesterClient interface {
 	IngesterClient
 	grpc_health_v1.HealthClient
+	Close() error
 }
 
 type closableHealthAndIngesterClient struct {
diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go
index 81f91dc7d73..58705a0a516 100644
--- a/pkg/ingester/lifecycle_test.go
+++ b/pkg/ingester/lifecycle_test.go
@@ -291,6 +291,10 @@ func (i ingesterClientAdapater) Close() error {
 	return nil
 }
 
+func (i ingesterClientAdapater) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+	return nil, nil
+}
+
 // TestIngesterFlush tries to test that the ingester flushes chunks before
 // removing itself from the ring.
 func TestIngesterFlush(t *testing.T) {
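The transfer.go diff below rewrites findTargetIngester: instead of sleeping in a loop, it multiplexes a poll ticker, an overall deadline timer, and context cancellation through a single select, and health-checks the candidate before returning it. A minimal, self-contained sketch of that shape (pollUntil and poll are illustrative names, not part of the patch, and the sketch already includes the continue-on-error that PATCH 2/6 adds):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // pollUntil retries poll on every tick until it succeeds, the overall
    // deadline fires, or ctx is cancelled -- the same select shape as the patch.
    func pollUntil(ctx context.Context, every, deadline time.Duration, poll func(context.Context) (string, error)) (string, error) {
        dl := time.NewTimer(deadline)
        defer dl.Stop()

        ticker := time.NewTicker(every)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                // Bound each attempt so one slow poll cannot eat the whole deadline.
                attemptCtx, cancel := context.WithTimeout(ctx, every)
                res, err := poll(attemptCtx)
                cancel()
                if err != nil {
                    continue // try again on the next tick
                }
                return res, nil

            case <-dl.C:
                return "", fmt.Errorf("deadline exceeded")

            case <-ctx.Done():
                return "", ctx.Err()
            }
        }
    }

    func main() {
        attempts := 0
        res, err := pollUntil(context.Background(), 10*time.Millisecond, time.Second,
            func(context.Context) (string, error) {
                if attempts++; attempts < 3 {
                    return "", fmt.Errorf("no pending ingesters yet")
                }
                return "ingester-1", nil
            })
        fmt.Println(res, err) // ingester-1 <nil>
    }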
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index 2f669f48a0d..2387d3cc155 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -11,6 +11,8 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/health/grpc_health_v1"
 
 	"github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -258,7 +260,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
 // findTargetIngester finds an ingester in PENDING state.
 func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc, error) {
-	findIngester := func() (*ring.IngesterDesc, error) {
+	findIngester := func(ctx context.Context) (*ring.IngesterDesc, error) {
 		ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
 		if err != nil {
 			return nil, err
 		}
@@ -269,22 +271,40 @@ func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc,
 			return nil, fmt.Errorf("no pending ingesters")
 		}
 
-		return &ingesters[0], nil
+		ingester := &ingesters[0]
+		c, err := i.cfg.ingesterClientFactory(ingester.Addr, i.clientConfig)
+		if err != nil {
+			return nil, err
+		}
+		defer c.Close()
+
+		_, err = c.Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(true))
+		return ingester, nil
 	}
 
-	deadline := time.Now().Add(i.cfg.SearchPendingFor)
+	deadline := time.NewTimer(i.cfg.SearchPendingFor)
+	defer deadline.Stop()
+
+	ticker := time.NewTicker(i.cfg.SearchPendingFor / pendingSearchIterations)
+	defer ticker.Stop()
+
 	for {
-		ingester, err := findIngester()
-		if err != nil {
-			level.Debug(util.Logger).Log("msg", "Error looking for pending ingester", "err", err)
-			if time.Now().Before(deadline) {
-				time.Sleep(i.cfg.SearchPendingFor / pendingSearchIterations)
-				continue
-			} else {
-				level.Warn(util.Logger).Log("msg", "Could not find pending ingester before deadline", "err", err)
-				return nil, err
+		select {
+		case <-ticker.C:
+			ctx, cancel := context.WithTimeout(ctx, i.cfg.SearchPendingFor/pendingSearchIterations)
+			ingester, err := findIngester(ctx)
+			cancel()
+			if err != nil {
+				level.Debug(util.Logger).Log("msg", "Error looking for pending ingester", "err", err)
 			}
+			return ingester, nil
+
+		case <-deadline.C:
+			level.Warn(util.Logger).Log("msg", "Could not find pending ingester before deadline")
+			return nil, fmt.Errorf("could not find pending ingester before deadline")
+
+		case <-ctx.Done():
+			return nil, ctx.Err()
 		}
-		return ingester, nil
 	}
 }
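Note that the hunk above still discards the health-check result: err is assigned from c.Check but `return ingester, nil` follows, and the ticker branch returns even when findIngester fails. PATCH 2/6 below fixes both, and also wraps each transfer step's error with the step that produced it using github.com/pkg/errors. A self-contained sketch of that wrapping pattern (send is a stand-in for the real stream.Send, not part of the patch):

    package main

    import (
        "fmt"

        "github.com/pkg/errors"
    )

    // send is a stand-in for stream.Send; it always fails for the demo.
    func send() error { return fmt.Errorf("connection reset") }

    func transfer() error {
        // Wrap prefixes the failing step onto the message while keeping
        // the underlying error reachable via errors.Cause.
        if err := send(); err != nil {
            return errors.Wrap(err, "Send")
        }
        return nil
    }

    func main() {
        err := transfer()
        fmt.Println(err)               // Send: connection reset
        fmt.Println(errors.Cause(err)) // connection reset
    }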
From 80a9c2881f544bba2b39df074a9a5661cc165c82 Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Sat, 12 Jan 2019 11:35:57 +0000
Subject: [PATCH 2/6] Ensure we check the error when looking for an ingester.

Signed-off-by: Tom Wilkie
---
 pkg/ingester/transfer.go | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index 2387d3cc155..e23bc36b371 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"google.golang.org/grpc"
@@ -205,12 +206,12 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	defer c.(io.Closer).Close()
+	defer c.Close()
 
 	ctx = user.InjectOrgID(ctx, "-1")
 	stream, err := c.TransferChunks(ctx)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "TransferChunks")
 	}
 
 	for userID, state := range userStatesCopy {
@@ -225,7 +226,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
 			chunks, err := toWireChunks(pair.series.chunkDescs)
 			if err != nil {
 				state.fpLocker.Unlock(pair.fp)
-				return err
+				return errors.Wrap(err, "toWireChunks")
 			}
 
 			err = stream.Send(&client.TimeSeriesChunk{
@@ -236,7 +237,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
 			})
 			state.fpLocker.Unlock(pair.fp)
 			if err != nil {
-				return err
+				return errors.Wrap(err, "Send")
 			}
 
 			sentChunks.Add(float64(len(chunks)))
@@ -245,7 +246,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
 
 	_, err = stream.CloseAndRecv()
 	if err != nil {
-		return err
+		return errors.Wrap(err, "CloseAndRecv")
 	}
 
 	// Close & empty all the flush queues, to unblock waiting workers.
@@ -278,8 +279,8 @@ func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc,
 		}
 		defer c.Close()
 
-		_, err = c.Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(true))
-		return ingester, nil
+		_, err = c.Check(user.InjectOrgID(ctx, "0"), &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(true))
+		return ingester, err
 	}
 
 	deadline := time.NewTimer(i.cfg.SearchPendingFor)
 	defer deadline.Stop()
@@ -295,7 +296,8 @@ func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc,
 			ingester, err := findIngester(ctx)
 			cancel()
 			if err != nil {
-				level.Debug(util.Logger).Log("msg", "Error looking for pending ingester", "err", err)
+				level.Warn(util.Logger).Log("msg", "Error looking for pending ingester", "err", err)
+				continue
 			}
 			return ingester, nil
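PATCH 3/6 below registers a histogram labelled by operation and outcome, then times both shutdown paths into it. A self-contained sketch of that observe-with-labels pattern (the metric name and the observe helper here are illustrative, not from the patch):

    package main

    import (
        "errors"
        "time"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
    )

    var opDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "example_shutdown_duration_seconds",
        Help:    "Duration (in seconds) of each shutdown operation.",
        Buckets: prometheus.DefBuckets,
    }, []string{"op", "status"})

    // observe times fn and records exactly one observation per attempt,
    // labelled with the operation name and its outcome.
    func observe(op string, fn func() error) {
        start := time.Now()
        status := "success"
        if err := fn(); err != nil {
            status = "fail"
        }
        opDuration.WithLabelValues(op, status).Observe(time.Since(start).Seconds())
    }

    func main() {
        observe("transfer", func() error { return errors.New("no pending ingesters") })
        observe("flush", func() error { return nil })
    }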
From 4120f6ec2f6729456b995839b3d116e808b0f0c6 Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Sat, 12 Jan 2019 11:36:11 +0000
Subject: [PATCH 3/6] Add a metric for shutdown duration & success.

Signed-off-by: Tom Wilkie
---
 pkg/ring/lifecycler.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index ef31aac0733..dda46ce1de2 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -30,6 +30,11 @@ var (
 		Name: "cortex_ingester_ring_tokens_to_own",
 		Help: "The number of tokens to own in the ring.",
 	})
+	shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "cortex_shutdown_duration_seconds",
+		Help:    "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush).",
+		Buckets: prometheus.DefBuckets,
+	}, []string{"op", "status"})
 )
 
 // LifecyclerConfig is the config to build a Lifecycler.
@@ -454,16 +459,24 @@ func (i *Lifecycler) changeState(ctx context.Context, state IngesterState) error
 func (i *Lifecycler) processShutdown(ctx context.Context) {
 	flushRequired := true
 	if i.cfg.ClaimOnRollout {
+		transferStart := time.Now()
 		if err := i.flushTransferer.TransferOut(ctx); err != nil {
 			level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err)
+			shutdownDuration.WithLabelValues("transfer", "fail").Observe(time.Since(transferStart).Seconds())
 		} else {
 			flushRequired = false
+			shutdownDuration.WithLabelValues("transfer", "success").Observe(time.Since(transferStart).Seconds())
 		}
 	}
 
 	if flushRequired {
+		flushStart := time.Now()
 		i.flushTransferer.Flush()
+		shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds())
 	}
+
+	// Sleep for 2 scrape intervals, so the shutdownDuration metric can be collected.
+	time.Sleep(30 * time.Second)
 }
 
 // unregister removes our entry from consul.

From 7b5b3c283d5b27beacd1e9da85a3b18e40807cfa Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Sat, 12 Jan 2019 11:56:24 +0000
Subject: [PATCH 4/6] Make the final sleep configurable, and set it to 0 in
 tests, so they don't time out.

Signed-off-by: Tom Wilkie
---
 pkg/ingester/lifecycle_test.go | 1 +
 pkg/ring/lifecycler.go         | 6 ++++--
 pkg/ring/lifecycler_test.go    | 2 ++
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go
index 58705a0a516..10faa14ca62 100644
--- a/pkg/ingester/lifecycle_test.go
+++ b/pkg/ingester/lifecycle_test.go
@@ -40,6 +40,7 @@ func defaultIngesterTestConfig() Config {
 	cfg.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0)
 	cfg.LifecyclerConfig.Addr = "localhost"
 	cfg.LifecyclerConfig.ID = "localhost"
+	cfg.LifecyclerConfig.FinalSleep = 0
 	return cfg
 }
 
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index dda46ce1de2..e600014c01c 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -50,6 +50,7 @@ type LifecyclerConfig struct {
 	ClaimOnRollout  bool     `yaml:"claim_on_rollout,omitempty"`
 	NormaliseTokens bool     `yaml:"normalise_tokens,omitempty"`
 	InfNames        []string `yaml:"interface_names"`
+	FinalSleep      time.Duration `yaml:"final_sleep"`
 
 	// For testing, you can override the address and ID of this ingester
 	Addr string `yaml:"address"`
@@ -68,6 +69,7 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.MinReadyDuration, "ingester.min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.")
 	f.BoolVar(&cfg.ClaimOnRollout, "ingester.claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.")
 	f.BoolVar(&cfg.NormaliseTokens, "ingester.normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
+	f.DurationVar(&cfg.FinalSleep, "ingester.final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
 
 	hostname, err := os.Hostname()
 	if err != nil {
@@ -475,8 +477,8 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
 		shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds())
 	}
 
-	// Sleep for 2 scrape intervals, so the shutdownDuration metric can be collected.
-	time.Sleep(30 * time.Second)
+	// Sleep so the shutdownDuration metric can be collected.
+	time.Sleep(i.cfg.FinalSleep)
 }
 
 // unregister removes our entry from consul.
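The test diffs below work because the new field can be zeroed directly on the config struct. A minimal sketch of the duration-flag pattern the lifecycler.go hunk registers (the flag set name here is illustrative):

    package main

    import (
        "flag"
        "fmt"
        "time"
    )

    type Config struct {
        FinalSleep time.Duration
    }

    func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
        // Default mirrors the previously hard-coded 30s sleep.
        f.DurationVar(&cfg.FinalSleep, "ingester.final-sleep", 30*time.Second,
            "Duration to sleep for before exiting, to ensure metrics are scraped.")
    }

    func main() {
        var cfg Config
        fs := flag.NewFlagSet("example", flag.ContinueOnError)
        cfg.RegisterFlags(fs)
        _ = fs.Parse(nil)  // production: the 30s default applies
        cfg.FinalSleep = 0 // tests: zero the field directly, as the diffs do
        fmt.Println(cfg.FinalSleep)
    }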
diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go
index 718325a567a..f59fbcdb4ec 100644
--- a/pkg/ring/lifecycler_test.go
+++ b/pkg/ring/lifecycler_test.go
@@ -42,6 +42,7 @@ func TestRingNormaliseMigration(t *testing.T) {
 	lifecyclerConfig1.NumTokens = 1
 	lifecyclerConfig1.ClaimOnRollout = true
 	lifecyclerConfig1.ID = "ing1"
+	lifecyclerConfig1.FinalSleep = 0
 
 	ft := &flushTransferer{}
 	l1, err := NewLifecycler(lifecyclerConfig1, ft)
@@ -67,6 +68,7 @@ func TestRingNormaliseMigration(t *testing.T) {
 	lifecyclerConfig2.JoinAfter = 100 * time.Second
 	lifecyclerConfig2.NormaliseTokens = true
 	lifecyclerConfig2.ID = "ing2"
+	lifecyclerConfig2.FinalSleep = 0
 
 	l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{})
 	require.NoError(t, err)

From 7d27cf17e37493e05b030beff575b908502d78a8 Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Sat, 9 Feb 2019 11:40:10 +0000
Subject: [PATCH 5/6] Review feedback: don't try to connect to an ingester
 during the search; instead, retry the transfer if it fails.

Signed-off-by: Tom Wilkie
---
 pkg/ingester/ingester.go |  4 +++-
 pkg/ingester/transfer.go | 32 +++++++++++++++++++++-----------
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 31c9f9b9b7b..acd89716294 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -80,7 +80,8 @@ type Config struct {
 	LifecyclerConfig ring.LifecyclerConfig
 
 	// Config for transferring chunks.
-	SearchPendingFor time.Duration
+	SearchPendingFor   time.Duration
+	MaxTransferRetries int
 
 	// Config for chunk flushing.
 	FlushCheckPeriod time.Duration
@@ -102,6 +103,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.LifecyclerConfig.RegisterFlags(f)
 
 	f.DurationVar(&cfg.SearchPendingFor, "ingester.search-pending-for", 30*time.Second, "Time to spend searching for a pending ingester when shutting down.")
+	f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 5, "Number of times to try and transfer chunks before falling back to flushing.")
 	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
 	f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.")
 	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.")
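The transfer.go diff below moves the old body of TransferOut into transferOut and wraps it in a retry loop driven by cortex's util.Backoff. A self-contained sketch of the equivalent retry-with-backoff shape (retryWithBackoff is an illustrative stand-in that approximates util.Backoff with plain doubling delays):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // retryWithBackoff mirrors the loop in the patch: try, and on failure
    // wait with an exponentially growing delay until the budget is spent.
    func retryWithBackoff(ctx context.Context, min, max time.Duration, maxRetries int, try func(context.Context) error) error {
        delay := min
        var err error
        for i := 0; i < maxRetries; i++ {
            if err = try(ctx); err == nil {
                return nil
            }
            select {
            case <-time.After(delay):
            case <-ctx.Done():
                return ctx.Err()
            }
            if delay *= 2; delay > max {
                delay = max
            }
        }
        return fmt.Errorf("retries exhausted: %v", err)
    }

    func main() {
        attempts := 0
        err := retryWithBackoff(context.Background(), 100*time.Millisecond, time.Second, 5,
            func(context.Context) error {
                if attempts++; attempts < 3 {
                    return fmt.Errorf("transfer attempt %d failed", attempts)
                }
                return nil
            })
        fmt.Println(err) // <nil>: succeeded on the third attempt
    }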
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index e23bc36b371..8a1597cb999 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -12,8 +12,6 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/health/grpc_health_v1"
 
 	"github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -190,6 +188,26 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) {
 // TransferOut finds an ingester in PENDING state and transfers our chunks to it.
 // Called as part of the ingester shutdown process.
 func (i *Ingester) TransferOut(ctx context.Context) error {
+	backoff := util.NewBackoff(ctx, util.BackoffConfig{
+		MinBackoff: 100 * time.Millisecond,
+		MaxBackoff: 1 * time.Second,
+		MaxRetries: i.cfg.MaxTransferRetries,
+	})
+
+	for backoff.Ongoing() {
+		err := i.transferOut(ctx)
+		if err == nil {
+			return nil
+		}
+
+		level.Error(util.Logger).Log("msg", "transfer failed", "err", err)
+		backoff.Wait()
+	}
+
+	return backoff.Err()
+}
+
+func (i *Ingester) transferOut(ctx context.Context) error {
 	userStatesCopy := i.userStates.cp()
 	if len(userStatesCopy) == 0 {
 		level.Info(util.Logger).Log("msg", "nothing to transfer")
@@ -272,15 +290,7 @@ func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc,
 			return nil, fmt.Errorf("no pending ingesters")
 		}
 
-		ingester := &ingesters[0]
-		c, err := i.cfg.ingesterClientFactory(ingester.Addr, i.clientConfig)
-		if err != nil {
-			return nil, err
-		}
-		defer c.Close()
-
-		_, err = c.Check(user.InjectOrgID(ctx, "0"), &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(true))
-		return ingester, err
+		return &ingesters[0], nil
 	}
 
 	deadline := time.NewTimer(i.cfg.SearchPendingFor)

From 9c97fb0af389154b9c73a861cc4ec6aa19ad1625 Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Mon, 11 Mar 2019 14:19:24 +0000
Subject: [PATCH 6/6] Review feedback.

Signed-off-by: Tom Wilkie
---
 pkg/ingester/ingester.go       |  5 ++--
 pkg/ingester/lifecycle_test.go |  2 --
 pkg/ingester/transfer.go       | 51 ++++++----------------------------
 pkg/ring/lifecycler.go         |  2 +-
 4 files changed, 12 insertions(+), 48 deletions(-)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index acd89716294..32effa79466 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -80,7 +80,6 @@ type Config struct {
 	LifecyclerConfig ring.LifecyclerConfig
 
 	// Config for transferring chunks.
-	SearchPendingFor   time.Duration
 	MaxTransferRetries int
 
 	// Config for chunk flushing.
@@ -102,8 +101,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.LifecyclerConfig.RegisterFlags(f)
 
-	f.DurationVar(&cfg.SearchPendingFor, "ingester.search-pending-for", 30*time.Second, "Time to spend searching for a pending ingester when shutting down.")
-	f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 5, "Number of times to try and transfer chunks before falling back to flushing.")
+	f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.")
 	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
 	f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.")
 	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.")
@@ -114,6 +112,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
 
 	// DEPRECATED, no-op
+	f.Duration("ingester.search-pending-for", 30*time.Second, "DEPRECATED. Time to spend searching for a pending ingester when shutting down.")
 	f.Bool("ingester.reject-old-samples", false, "DEPRECATED. Reject old samples.")
 	f.Duration("ingester.reject-old-samples.max-age", 0, "DEPRECATED. Maximum accepted sample age before rejecting.")
 	f.Int("ingester.validation.max-length-label-name", 0, "DEPRECATED. Maximum length accepted for label names.")
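The ingester.go hunk above deprecates -ingester.search-pending-for as a no-op: the flag stays registered so existing command lines keep parsing, but its value is no longer wired into the config. A minimal sketch of that pattern:

    package main

    import (
        "flag"
        "fmt"
        "time"
    )

    func main() {
        fs := flag.NewFlagSet("example", flag.ContinueOnError)

        // The returned *time.Duration is deliberately dropped: the option
        // still parses, but its value no longer affects anything.
        fs.Duration("ingester.search-pending-for", 30*time.Second,
            "DEPRECATED. Time to spend searching for a pending ingester when shutting down.")

        err := fs.Parse([]string{"-ingester.search-pending-for=1m"})
        fmt.Println(err) // <nil>: old command lines keep working
    }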
diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go
index 10faa14ca62..40b7b4927bc 100644
--- a/pkg/ingester/lifecycle_test.go
+++ b/pkg/ingester/lifecycle_test.go
@@ -96,7 +96,6 @@ func TestIngesterTransfer(t *testing.T) {
 	cfg1.LifecyclerConfig.Addr = "ingester1"
 	cfg1.LifecyclerConfig.ClaimOnRollout = true
 	cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
-	cfg1.SearchPendingFor = 1 * time.Second
 	ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil)
 	require.NoError(t, err)
@@ -178,7 +177,6 @@ func TestIngesterBadTransfer(t *testing.T) {
 	cfg.LifecyclerConfig.Addr = "ingester1"
 	cfg.LifecyclerConfig.ClaimOnRollout = true
 	cfg.LifecyclerConfig.JoinAfter = 100 * time.Second
-	cfg.SearchPendingFor = 1 * time.Second
 	ing, err := New(cfg, defaultClientTestConfig(), limits, nil)
 	require.NoError(t, err)
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index 8a1597cb999..fad58e3464c 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -20,10 +20,6 @@ import (
 	"github.com/weaveworks/common/user"
 )
 
-const (
-	pendingSearchIterations = 10
-)
-
 var (
 	sentChunks = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "cortex_ingester_sent_chunks",
@@ -190,7 +186,7 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) {
 func (i *Ingester) TransferOut(ctx context.Context) error {
 	backoff := util.NewBackoff(ctx, util.BackoffConfig{
 		MinBackoff: 100 * time.Millisecond,
-		MaxBackoff: 1 * time.Second,
+		MaxBackoff: 5 * time.Second,
 		MaxRetries: i.cfg.MaxTransferRetries,
 	})
 
@@ -279,44 +275,15 @@ func (i *Ingester) transferOut(ctx context.Context) error {
 
 // findTargetIngester finds an ingester in PENDING state.
 func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc, error) {
-	findIngester := func(ctx context.Context) (*ring.IngesterDesc, error) {
-		ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
-		if err != nil {
-			return nil, err
-		}
-
-		ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
-		if len(ingesters) <= 0 {
-			return nil, fmt.Errorf("no pending ingesters")
-		}
-
-		return &ingesters[0], nil
+	ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
+	if err != nil {
+		return nil, err
 	}
-
-	deadline := time.NewTimer(i.cfg.SearchPendingFor)
-	defer deadline.Stop()
-
-	ticker := time.NewTicker(i.cfg.SearchPendingFor / pendingSearchIterations)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ticker.C:
-			ctx, cancel := context.WithTimeout(ctx, i.cfg.SearchPendingFor/pendingSearchIterations)
-			ingester, err := findIngester(ctx)
-			cancel()
-			if err != nil {
-				level.Warn(util.Logger).Log("msg", "Error looking for pending ingester", "err", err)
-				continue
-			}
-			return ingester, nil
-
-		case <-deadline.C:
-			level.Warn(util.Logger).Log("msg", "Could not find pending ingester before deadline")
-			return nil, fmt.Errorf("could not find pending ingester before deadline")
-
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		}
+
+	ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
+	if len(ingesters) <= 0 {
+		return nil, fmt.Errorf("no pending ingesters")
 	}
+
+	return &ingesters[0], nil
 }
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index e600014c01c..5a4d7b3dd90 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -33,7 +33,7 @@ var (
 	shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
 		Name: "cortex_shutdown_duration_seconds",
 		Help: "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush).",
-		Buckets: prometheus.DefBuckets,
+		Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(8-1) = 1280s, or ~21 mins.
 	}, []string{"op", "status"})
 )
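For reference, a quick way to check the bucket bounds the final lifecycler.go hunk produces (a throwaway snippet, not part of the series):

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        // ExponentialBuckets(start, factor, count) returns count upper
        // bounds, doubling from 10 here.
        fmt.Println(prometheus.ExponentialBuckets(10, 2, 8))
        // [10 20 40 80 160 320 640 1280]: the largest bucket boundary is
        // 10*2^(8-1) = 1280s, about 21 minutes.
    }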