diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 764cf708949..54679185f1c 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -27,6 +27,7 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra type HealthAndIngesterClient interface { IngesterClient grpc_health_v1.HealthClient + Close() error } type closableHealthAndIngesterClient struct { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 31c9f9b9b7b..32effa79466 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -80,7 +80,7 @@ type Config struct { LifecyclerConfig ring.LifecyclerConfig // Config for transferring chunks. - SearchPendingFor time.Duration + MaxTransferRetries int // Config for chunk flushing. FlushCheckPeriod time.Duration @@ -101,7 +101,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlags(f) - f.DurationVar(&cfg.SearchPendingFor, "ingester.search-pending-for", 30*time.Second, "Time to spend searching for a pending ingester when shutting down.") + f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.") f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.") f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.") @@ -112,6 +112,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.") // DEPRECATED, no-op + f.Duration("ingester.search-pending-for", 30*time.Second, "DEPRECATED. 
Time to spend searching for a pending ingester when shutting down.") f.Bool("ingester.reject-old-samples", false, "DEPRECATED. Reject old samples.") f.Duration("ingester.reject-old-samples.max-age", 0, "DEPRECATED. Maximum accepted sample age before rejecting.") f.Int("ingester.validation.max-length-label-name", 0, "DEPRECATED. Maximum length accepted for label names.") diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 81f91dc7d73..40b7b4927bc 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -40,6 +40,7 @@ func defaultIngesterTestConfig() Config { cfg.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0) cfg.LifecyclerConfig.Addr = "localhost" cfg.LifecyclerConfig.ID = "localhost" + cfg.LifecyclerConfig.FinalSleep = 0 return cfg } @@ -95,7 +96,6 @@ func TestIngesterTransfer(t *testing.T) { cfg1.LifecyclerConfig.Addr = "ingester1" cfg1.LifecyclerConfig.ClaimOnRollout = true cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second - cfg1.SearchPendingFor = 1 * time.Second ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil) require.NoError(t, err) @@ -177,7 +177,6 @@ func TestIngesterBadTransfer(t *testing.T) { cfg.LifecyclerConfig.Addr = "ingester1" cfg.LifecyclerConfig.ClaimOnRollout = true cfg.LifecyclerConfig.JoinAfter = 100 * time.Second - cfg.SearchPendingFor = 1 * time.Second ing, err := New(cfg, defaultClientTestConfig(), limits, nil) require.NoError(t, err) @@ -291,6 +290,10 @@ func (i ingesterClientAdapater) Close() error { return nil } +func (i ingesterClientAdapater) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { + return nil, nil +} + // TestIngesterFlush tries to test that the ingester flushes chunks before // removing itself from the ring. 
func TestIngesterFlush(t *testing.T) { diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 2f669f48a0d..fad58e3464c 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -19,10 +20,6 @@ import ( "github.com/weaveworks/common/user" ) -const ( - pendingSearchIterations = 10 -) - var ( sentChunks = prometheus.NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_sent_chunks", @@ -187,6 +184,26 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) { // TransferOut finds an ingester in PENDING state and transfers our chunks to it. // Called as part of the ingester shutdown process. func (i *Ingester) TransferOut(ctx context.Context) error { + backoff := util.NewBackoff(ctx, util.BackoffConfig{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 5 * time.Second, + MaxRetries: i.cfg.MaxTransferRetries, + }) + + for backoff.Ongoing() { + err := i.transferOut(ctx) + if err == nil { + return nil + } + + level.Error(util.Logger).Log("msg", "transfer failed", "err", err) + backoff.Wait() + } + + return backoff.Err() +} + +func (i *Ingester) transferOut(ctx context.Context) error { userStatesCopy := i.userStates.cp() if len(userStatesCopy) == 0 { level.Info(util.Logger).Log("msg", "nothing to transfer") @@ -203,12 +220,12 @@ func (i *Ingester) TransferOut(ctx context.Context) error { if err != nil { return err } - defer c.(io.Closer).Close() + defer c.Close() ctx = user.InjectOrgID(ctx, "-1") stream, err := c.TransferChunks(ctx) if err != nil { - return err + return errors.Wrap(err, "TransferChunks") } for userID, state := range userStatesCopy { @@ -223,7 +240,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error { chunks, err := toWireChunks(pair.series.chunkDescs) if err != nil { state.fpLocker.Unlock(pair.fp) - return err + return 
errors.Wrap(err, "toWireChunks") } err = stream.Send(&client.TimeSeriesChunk{ @@ -234,7 +251,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error { }) state.fpLocker.Unlock(pair.fp) if err != nil { - return err + return errors.Wrap(err, "Send") } sentChunks.Add(float64(len(chunks))) @@ -243,7 +260,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error { _, err = stream.CloseAndRecv() if err != nil { - return err + return errors.Wrap(err, "CloseAndRecv") } // Close & empty all the flush queues, to unblock waiting workers. @@ -258,33 +275,15 @@ func (i *Ingester) TransferOut(ctx context.Context) error { // findTargetIngester finds an ingester in PENDING state. func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc, error) { - findIngester := func() (*ring.IngesterDesc, error) { - ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey) - if err != nil { - return nil, err - } - - ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING) - if len(ingesters) <= 0 { - return nil, fmt.Errorf("no pending ingesters") - } - - return &ingesters[0], nil + ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey) + if err != nil { + return nil, err } - deadline := time.Now().Add(i.cfg.SearchPendingFor) - for { - ingester, err := findIngester() - if err != nil { - level.Debug(util.Logger).Log("msg", "Error looking for pending ingester", "err", err) - if time.Now().Before(deadline) { - time.Sleep(i.cfg.SearchPendingFor / pendingSearchIterations) - continue - } else { - level.Warn(util.Logger).Log("msg", "Could not find pending ingester before deadline", "err", err) - return nil, err - } - } - return ingester, nil + ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING) + if len(ingesters) <= 0 { + return nil, fmt.Errorf("no pending ingesters") } + + return &ingesters[0], nil } diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index ef31aac0733..5a4d7b3dd90 100644 --- 
a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -30,6 +30,11 @@ var ( Name: "cortex_ingester_ring_tokens_to_own", Help: "The number of tokens to own in the ring.", }) + shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_shutdown_duration_seconds", + Help: "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush).", + Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(8-1) = 1280, or ~21 mins. + }, []string{"op", "status"}) ) // LifecyclerConfig is the config to build a Lifecycler. @@ -45,6 +50,7 @@ type LifecyclerConfig struct { ClaimOnRollout bool `yaml:"claim_on_rollout,omitempty"` NormaliseTokens bool `yaml:"normalise_tokens,omitempty"` InfNames []string `yaml:"interface_names"` + FinalSleep time.Duration `yaml:"final_sleep"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address"` @@ -63,6 +69,7 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.MinReadyDuration, "ingester.min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. 
This is to work around race conditions with ingesters exiting and updating the ring.") f.BoolVar(&cfg.ClaimOnRollout, "ingester.claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.") f.BoolVar(&cfg.NormaliseTokens, "ingester.normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.") + f.DurationVar(&cfg.FinalSleep, "ingester.final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.") hostname, err := os.Hostname() if err != nil { @@ -454,16 +461,24 @@ func (i *Lifecycler) changeState(ctx context.Context, state IngesterState) error func (i *Lifecycler) processShutdown(ctx context.Context) { flushRequired := true if i.cfg.ClaimOnRollout { + transferStart := time.Now() if err := i.flushTransferer.TransferOut(ctx); err != nil { level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err) + shutdownDuration.WithLabelValues("transfer", "fail").Observe(time.Since(transferStart).Seconds()) } else { flushRequired = false + shutdownDuration.WithLabelValues("transfer", "success").Observe(time.Since(transferStart).Seconds()) } } if flushRequired { + flushStart := time.Now() i.flushTransferer.Flush() + shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds()) } + + // Sleep so the shutdownDuration metric can be collected. + time.Sleep(i.cfg.FinalSleep) } // unregister removes our entry from consul. 
diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go
index 718325a567a..f59fbcdb4ec 100644
--- a/pkg/ring/lifecycler_test.go
+++ b/pkg/ring/lifecycler_test.go
@@ -42,6 +42,7 @@ func TestRingNormaliseMigration(t *testing.T) {
 	lifecyclerConfig1.NumTokens = 1
 	lifecyclerConfig1.ClaimOnRollout = true
 	lifecyclerConfig1.ID = "ing1"
+	lifecyclerConfig1.FinalSleep = 0
 
 	ft := &flushTransferer{}
 	l1, err := NewLifecycler(lifecyclerConfig1, ft)
@@ -67,6 +68,7 @@ func TestRingNormaliseMigration(t *testing.T) {
 	lifecyclerConfig2.JoinAfter = 100 * time.Second
 	lifecyclerConfig2.NormaliseTokens = true
 	lifecyclerConfig2.ID = "ing2"
+	lifecyclerConfig2.FinalSleep = 0
 
 	l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{})
 	require.NoError(t, err)