From 8bb74df5733787f24cbec87b94981d90d603f712 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 8 Aug 2019 16:23:14 +0000 Subject: [PATCH 1/3] Remove claim-on-rollout option Leave the flag and yaml names for now to avoid crashing. Signed-off-by: Bryan Boreham --- pkg/ingester/lifecycle_test.go | 2 -- pkg/ring/lifecycler.go | 20 +++++++++----------- pkg/ring/lifecycler_test.go | 1 - 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 8dadd3b7c9b..83d8f8a39e3 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -94,7 +94,6 @@ func TestIngesterTransfer(t *testing.T) { cfg1 := defaultIngesterTestConfig() cfg1.LifecyclerConfig.ID = "ingester1" cfg1.LifecyclerConfig.Addr = "ingester1" - cfg1.LifecyclerConfig.ClaimOnRollout = true cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil) require.NoError(t, err) @@ -184,7 +183,6 @@ func TestIngesterBadTransfer(t *testing.T) { cfg := defaultIngesterTestConfig() cfg.LifecyclerConfig.ID = "ingester1" cfg.LifecyclerConfig.Addr = "ingester1" - cfg.LifecyclerConfig.ClaimOnRollout = true cfg.LifecyclerConfig.JoinAfter = 100 * time.Second ing, err := New(cfg, defaultClientTestConfig(), limits, nil, nil) require.NoError(t, err) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index da8009e5f77..43f49b4b09a 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -48,7 +48,7 @@ type LifecyclerConfig struct { HeartbeatPeriod time.Duration `yaml:"heartbeat_period,omitempty"` JoinAfter time.Duration `yaml:"join_after,omitempty"` MinReadyDuration time.Duration `yaml:"min_ready_duration,omitempty"` - ClaimOnRollout bool `yaml:"claim_on_rollout,omitempty"` + UnusedFlag bool `yaml:"claim_on_rollout,omitempty"` // DEPRECATED - left for backwards-compatibility NormaliseTokens bool `yaml:"normalise_tokens,omitempty"` InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` @@ -79,7 +79,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.") f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.") f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.") - f.BoolVar(&cfg.ClaimOnRollout, prefix+"claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.") + flagext.DeprecatedFlag(f, prefix+"claim-on-rollout", "DEPRECATED. This feature is no longer optional.") f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.") f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.") @@ -474,15 +474,13 @@ func (i *Lifecycler) changeState(ctx context.Context, state IngesterState) error func (i *Lifecycler) processShutdown(ctx context.Context) { flushRequired := true - if i.cfg.ClaimOnRollout { - transferStart := time.Now() - if err := i.flushTransferer.TransferOut(ctx); err != nil { - level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err) - shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds()) - } else { - flushRequired = false - shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds()) - } + transferStart := time.Now() + if err := i.flushTransferer.TransferOut(ctx); err != nil { + level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err) + shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds()) + } else { + flushRequired = false + shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds()) } if flushRequired { diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 9eb34674501..4d930b57a8e 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -32,7 +32,6 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig { lifecyclerConfig.Port = 1 lifecyclerConfig.RingConfig = ringConfig lifecyclerConfig.NumTokens = 1 - lifecyclerConfig.ClaimOnRollout = true lifecyclerConfig.ID = id lifecyclerConfig.FinalSleep = 0 return lifecyclerConfig From 6973461e44b5014541b3e492a78181cc8886d52c Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 16 Aug 2019 15:47:36 +0000 Subject: [PATCH 2/3] Disable chunk transfers when unit-testing other parts of ingester Now we default to always attempt a transfer, we need to disable this in testing otherwise it slows every test down by 5-10 seconds. Signed-off-by: Bryan Boreham --- pkg/ingester/lifecycle_test.go | 2 ++ pkg/ingester/transfer.go | 3 +++ 2 files changed, 5 insertions(+) diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 83d8f8a39e3..f55713146f0 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -41,6 +41,7 @@ func defaultIngesterTestConfig() Config { cfg.LifecyclerConfig.Addr = "localhost" cfg.LifecyclerConfig.ID = "localhost" cfg.LifecyclerConfig.FinalSleep = 0 + cfg.MaxTransferRetries = -1 return cfg } @@ -95,6 +96,7 @@ func TestIngesterTransfer(t *testing.T) { cfg1.LifecyclerConfig.ID = "ingester1" cfg1.LifecyclerConfig.Addr = "ingester1" cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second + cfg1.MaxTransferRetries = 10 ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil) require.NoError(t, err) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 8d52eac1710..30ece4a53b9 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -184,6 +184,9 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) { // TransferOut finds an ingester in PENDING state and transfers our chunks to it. // Called as part of the ingester shutdown process. func (i *Ingester) TransferOut(ctx context.Context) error { + if i.cfg.MaxTransferRetries < 0 { + return fmt.Errorf("transfers disabled") + } backoff := util.NewBackoff(ctx, util.BackoffConfig{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 5 * time.Second, From ea76f05cbabb07aec81e5d9ffc0fffa70cc9e691 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 24 Aug 2019 08:45:20 +0000 Subject: [PATCH 3/3] Changelog entry for --claim-on-rollout flag change Signed-off-by: Bryan Boreham --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ccd11afd005..c468ac2c893 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.1.0 / 2019-08-07 +* [CHANGE] --claim-on-rollout flag deprecated; feature is now always on #1566 * [CHANGE] HA Tracker flags were renamed to provide more clarity #1465 * `distributor.accept-ha-labels` is now `distributor.ha-tracker.enable` * `distributor.accept-ha-samples` is now `distributor.ha-tracker.enable-for-all-users`