diff --git a/CHANGELOG.md b/CHANGELOG.md index ccd11afd005..c468ac2c893 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.1.0 / 2019-08-07 +* [CHANGE] --claim-on-rollout flag deprecated; feature is now always on #1566 * [CHANGE] HA Tracker flags were renamed to provide more clarity #1465 * `distributor.accept-ha-labels` is now `distributor.ha-tracker.enable` * `distributor.accept-ha-samples` is now `distributor.ha-tracker.enable-for-all-users` diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 8dadd3b7c9b..f55713146f0 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -41,6 +41,7 @@ func defaultIngesterTestConfig() Config { cfg.LifecyclerConfig.Addr = "localhost" cfg.LifecyclerConfig.ID = "localhost" cfg.LifecyclerConfig.FinalSleep = 0 + cfg.MaxTransferRetries = -1 return cfg } @@ -94,8 +95,8 @@ func TestIngesterTransfer(t *testing.T) { cfg1 := defaultIngesterTestConfig() cfg1.LifecyclerConfig.ID = "ingester1" cfg1.LifecyclerConfig.Addr = "ingester1" - cfg1.LifecyclerConfig.ClaimOnRollout = true cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second + cfg1.MaxTransferRetries = 10 ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil) require.NoError(t, err) @@ -184,7 +185,6 @@ func TestIngesterBadTransfer(t *testing.T) { cfg := defaultIngesterTestConfig() cfg.LifecyclerConfig.ID = "ingester1" cfg.LifecyclerConfig.Addr = "ingester1" - cfg.LifecyclerConfig.ClaimOnRollout = true cfg.LifecyclerConfig.JoinAfter = 100 * time.Second ing, err := New(cfg, defaultClientTestConfig(), limits, nil, nil) require.NoError(t, err) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 8d52eac1710..30ece4a53b9 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -184,6 +184,9 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) { // TransferOut finds an ingester in PENDING state and transfers our chunks to it. 
// Called as part of the ingester shutdown process. func (i *Ingester) TransferOut(ctx context.Context) error { + if i.cfg.MaxTransferRetries < 0 { + return fmt.Errorf("transfers disabled") + } backoff := util.NewBackoff(ctx, util.BackoffConfig{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 5 * time.Second, diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index da8009e5f77..43f49b4b09a 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -48,7 +48,7 @@ type LifecyclerConfig struct { HeartbeatPeriod time.Duration `yaml:"heartbeat_period,omitempty"` JoinAfter time.Duration `yaml:"join_after,omitempty"` MinReadyDuration time.Duration `yaml:"min_ready_duration,omitempty"` - ClaimOnRollout bool `yaml:"claim_on_rollout,omitempty"` + UnusedFlag bool `yaml:"claim_on_rollout,omitempty"` // DEPRECATED - left for backwards-compatibility NormaliseTokens bool `yaml:"normalise_tokens,omitempty"` InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` @@ -79,7 +79,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.") f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.") f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.") - f.BoolVar(&cfg.ClaimOnRollout, prefix+"claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.") + flagext.DeprecatedFlag(f, prefix+"claim-on-rollout", "DEPRECATED. This feature is no longer optional.") f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.") f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.") @@ -474,15 +474,13 @@ func (i *Lifecycler) changeState(ctx context.Context, state IngesterState) error func (i *Lifecycler) processShutdown(ctx context.Context) { flushRequired := true - if i.cfg.ClaimOnRollout { - transferStart := time.Now() - if err := i.flushTransferer.TransferOut(ctx); err != nil { - level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err) - shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds()) - } else { - flushRequired = false - shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds()) - } + transferStart := time.Now() + if err := i.flushTransferer.TransferOut(ctx); err != nil { + level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err) + shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds()) + } else { + flushRequired = false + shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds()) } if flushRequired { diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 9eb34674501..4d930b57a8e 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -32,7 +32,6 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig { lifecyclerConfig.Port = 1 lifecyclerConfig.RingConfig = ringConfig lifecyclerConfig.NumTokens = 1 - lifecyclerConfig.ClaimOnRollout = true lifecyclerConfig.ID = id lifecyclerConfig.FinalSleep = 0 return lifecyclerConfig