diff --git a/CHANGELOG.md b/CHANGELOG.md index bc61609713c..fcbcee2091a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * `-flusher.wal-dir` for the WAL directory to recover from. * `-flusher.concurrent-flushes` for number of concurrent flushes. * `-flusher.flush-op-timeout` is duration after which a flush should timeout. +* [FEATURE] Ingesters can now have an optional availability zone set, to ensure metric replication is distributed across zones. This is set via the `-ingester.availability-zone` flag or the `availability_zone` field in the config file. #2317 * [ENHANCEMENT] Better re-use of connections to DynamoDB and S3. #2268 * [ENHANCEMENT] Experimental TSDB: Add support for local `filesystem` backend. #2245 * [ENHANCEMENT] Experimental TSDB: Added memcached support for the TSDB index cache. #2290 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a5a212d0bbb..5f65d304dec 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -485,6 +485,11 @@ lifecycler: # CLI flag: -ingester.tokens-file-path [tokens_file_path: | default = ""] + # The availability zone of the host, this instance is running on. Default is + # the lifecycler ID. + # CLI flag: -ingester.availability-zone + [availability_zone: | default = ""] + # Number of times to try and transfer chunks before falling back to flushing. # Negative value or zero disables hand-over. # CLI flag: -ingester.max-transfer-retries diff --git a/docs/guides/running.md b/docs/guides/running.md index d29d8c9e5e8..817a6f2cd1e 100644 --- a/docs/guides/running.md +++ b/docs/guides/running.md @@ -51,7 +51,9 @@ Memcached is not essential but highly recommended. The standard replication factor is three, so that we can drop one replica and be unconcerned, as we still have two copies of the data left for redundancy. 
This is configurable: you can run with more -redundancy or less, depending on your risk appetite. +redundancy or less, depending on your risk appetite. By default +ingesters are not aware of availability zones. See [zone aware replication](zone-replication.md) +to change this. ### Schema diff --git a/docs/guides/zone-replication.md b/docs/guides/zone-replication.md new file mode 100644 index 00000000000..d02a858ed93 --- /dev/null +++ b/docs/guides/zone-replication.md @@ -0,0 +1,30 @@ +--- +title: "Zone Aware Replication" +linkTitle: "Zone Aware Replication" +weight: 5 +slug: zone-aware-replication +--- + +In a default configuration, time-series written to ingesters are replicated based on the container/pod name of the ingester instances. It is completely possible that all the replicas for a given time-series are held within the same availability zone, even if the cortex infrastructure spans multiple zones within the region. Storing all replicas for a given time-series within a single availability zone poses a risk for data loss if there is an outage affecting various nodes within that zone or a total zone outage. + +## Configuration + +Cortex can be configured to consider an availability zone value in its replication system. Doing so mitigates risks associated with losing multiple nodes within the same availability zone. The availability zone for an ingester can be defined on the command line of the ingester using the `-ingester.availability-zone` flag or using the yaml configuration: + +```yaml +ingester: + lifecycler: + availability_zone: "zone-3" +``` + +## Zone Replication Considerations + +Enabling availability zone awareness helps mitigate risks regarding data loss within a single zone; however, some items need consideration by an operator if they are thinking of enabling this feature. + +### Minimum number of Zones + +For cortex to function correctly, there must be at least the same number of availability zones as there is replica count. 
So by default, a cortex cluster should be spread over 3 zones as the default replica count is 3. It is safe to have more zones than the replica count, but it cannot be less. Having fewer availability zones than replica count causes a replica write to be missed, and in some cases, the write fails if the availability zone count is too low. + +### Cost + +Depending on the existing cortex infrastructure being used, this may cause an increase in running costs as most cloud providers charge for cross availability zone traffic. The most significant change would be for a cortex cluster currently running in a singular zone. \ No newline at end of file diff --git a/pkg/ring/http.go b/pkg/ring/http.go index 12fea72d096..1b10c1a891b 100644 --- a/pkg/ring/http.go +++ b/pkg/ring/http.go @@ -30,6 +30,7 @@ const pageContent = ` Instance ID + Availability Zone State Address Last Heartbeat @@ -46,6 +47,7 @@ const pageContent = ` {{ end }} {{ .ID }} + {{ .Zone }} {{ .State }} {{ .Address }} {{ .Timestamp }} @@ -138,16 +140,17 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { } ingesters = append(ingesters, struct { - ID, State, Address, Timestamp string - Tokens []uint32 - NumTokens int - Ownership float64 + ID, State, Address, Timestamp, Zone string + Tokens []uint32 + NumTokens int + Ownership float64 }{ ID: id, State: state, Address: ing.Addr, Timestamp: timestamp.String(), Tokens: ing.Tokens, + Zone: ing.Zone, NumTokens: len(ing.Tokens), Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100, }) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 2d805565c90..f3820dc45ca 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -55,6 +55,7 @@ type LifecyclerConfig struct { InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` TokensFilePath string `yaml:"tokens_file_path"` + Zone string `yaml:"availability_zone"` // For testing, you can override the address and ID of this ingester Addr 
string `yaml:"address" doc:"hidden"` @@ -103,6 +104,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.StringVar(&cfg.Addr, prefix+"lifecycler.addr", "", "IP address to advertise in consul.") f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).") f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register into consul.") + f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone of the host, this instance is running on. Default is the lifecycler ID.") } // Lifecycler is responsible for managing the lifecycle of entries in the ring. @@ -120,6 +122,7 @@ type Lifecycler struct { Addr string RingName string RingKey string + Zone string // Whether to flush if transfer fails on shutdown. flushOnShutdown bool @@ -160,6 +163,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa return nil, err } + zone := cfg.Zone + if zone == "" { + zone = cfg.ID + } + // We do allow a nil FlushTransferer, but to keep the ring logic easier we assume // it's always set, so we use a noop FlushTransferer if flushTransferer == nil { @@ -176,6 +184,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa RingName: ringName, RingKey: ringKey, flushOnShutdown: flushOnShutdown, + Zone: zone, actorChan: make(chan func()), @@ -502,14 +511,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error { if len(tokensFromFile) >= i.cfg.NumTokens { i.setState(ACTIVE) } - ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState()) i.setTokens(tokensFromFile) return ringDesc, true, nil } // Either we are a new ingester, or consul must have restarted level.Info(util.Logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName) - ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState()) + 
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState()) return ringDesc, true, nil } @@ -564,7 +573,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool { ringTokens = append(ringTokens, newTokens...) sort.Sort(ringTokens) - ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState()) i.setTokens(ringTokens) @@ -626,7 +635,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er sort.Sort(myTokens) i.setTokens(myTokens) - ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState()) return ringDesc, true, nil }) @@ -655,7 +664,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error { if !ok { // consul must have restarted level.Info(util.Logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName) - ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState()) } else { ingesterDesc.Timestamp = time.Now().Unix() ingesterDesc.State = i.GetState() diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 3170deb9128..f13965b4e7e 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -37,7 +37,7 @@ func NewDesc() *Desc { // AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens, // any other tokens are removed. 
-func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState) { +func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState) { if d.Ingesters == nil { d.Ingesters = map[string]IngesterDesc{} } @@ -47,6 +47,7 @@ func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState Timestamp: time.Now().Unix(), State: state, Tokens: tokens, + Zone: zone, } d.Ingesters[id] = ingester @@ -377,6 +378,7 @@ func (d *Desc) RemoveTombstones(limit time.Time) { type TokenDesc struct { Token uint32 Ingester string + Zone string } // Returns sorted list of tokens with ingester names. @@ -388,7 +390,7 @@ func (d *Desc) getTokens() []TokenDesc { tokens := make([]TokenDesc, 0, numTokens) for key, ing := range d.Ingesters { for _, token := range ing.Tokens { - tokens = append(tokens, TokenDesc{Token: token, Ingester: key}) + tokens = append(tokens, TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()}) } } diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 06568c0f260..cd52968bc0d 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -182,6 +182,7 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet n = r.cfg.ReplicationFactor ingesters = buf[:0] distinctHosts = map[string]struct{}{} + distinctZones = map[string]struct{}{} start = r.search(key) iterations = 0 ) @@ -190,12 +191,16 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet // Wrap i around in the ring. i %= len(r.ringTokens) - // We want n *distinct* ingesters. + // We want n *distinct* ingesters && distinct zones. 
token := r.ringTokens[i] if _, ok := distinctHosts[token.Ingester]; ok { continue } + if _, ok := distinctZones[token.Zone]; ok { + continue + } distinctHosts[token.Ingester] = struct{}{} + distinctZones[token.Zone] = struct{}{} ingester := r.ringDesc.Ingesters[token.Ingester] // We do not want to Write to Ingesters that are not ACTIVE, but we do want diff --git a/pkg/ring/ring.pb.go b/pkg/ring/ring.pb.go index 62e3ad152af..077b41b379b 100644 --- a/pkg/ring/ring.pb.go +++ b/pkg/ring/ring.pb.go @@ -107,6 +107,7 @@ type IngesterDesc struct { Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` State IngesterState `protobuf:"varint,3,opt,name=state,proto3,enum=ring.IngesterState" json:"state,omitempty"` Tokens []uint32 `protobuf:"varint,6,rep,packed,name=tokens,proto3" json:"tokens,omitempty"` + Zone string `protobuf:"bytes,7,opt,name=zone,proto3" json:"zone,omitempty"` } func (m *IngesterDesc) Reset() { *m = IngesterDesc{} } @@ -169,6 +170,13 @@ func (m *IngesterDesc) GetTokens() []uint32 { return nil } +func (m *IngesterDesc) GetZone() string { + if m != nil { + return m.Zone + } + return "" +} + func init() { proto.RegisterEnum("ring.IngesterState", IngesterState_name, IngesterState_value) proto.RegisterType((*Desc)(nil), "ring.Desc") @@ -179,32 +187,32 @@ func init() { func init() { proto.RegisterFile("ring.proto", fileDescriptor_26381ed67e202a6e) } var fileDescriptor_26381ed67e202a6e = []byte{ - // 387 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0x3f, 0x6f, 0xd3, 0x40, - 0x18, 0xc6, 0xef, 0xb5, 0xcf, 0xc6, 0x79, 0x43, 0x2b, 0xeb, 0x90, 0x90, 0xa9, 0xd0, 0x61, 0x75, - 0x32, 0x48, 0xb8, 0x52, 0x60, 0x40, 0x48, 0x0c, 0x2d, 0x35, 0xc8, 0x56, 0x14, 0x2a, 0x53, 0x75, - 0x77, 0xda, 0xc3, 0x58, 0x25, 0x76, 0x65, 0x5f, 0x90, 0xba, 0xf1, 0x0d, 0xe0, 0x0b, 0xb0, 0xf3, - 0x51, 0x3a, 0x66, 0xcc, 0x84, 0x88, 0xb3, 0x30, 0xe6, 0x23, 0xa0, 0x3b, 0x27, 0x0a, 0xd9, 0x9e, - 
0xdf, 0x3d, 0x7f, 0xde, 0xe1, 0x10, 0xeb, 0xa2, 0xcc, 0xc3, 0x9b, 0xba, 0x92, 0x15, 0xa3, 0x4a, - 0x1f, 0x3c, 0xcf, 0x0b, 0xf9, 0x79, 0x3a, 0x0e, 0x2f, 0xab, 0xc9, 0x51, 0x5e, 0xe5, 0xd5, 0x91, - 0x36, 0xc7, 0xd3, 0x4f, 0x9a, 0x34, 0x68, 0xd5, 0x95, 0x0e, 0x7f, 0x02, 0xd2, 0x53, 0xd1, 0x5c, - 0xb2, 0x37, 0xd8, 0x2b, 0xca, 0x5c, 0x34, 0x52, 0xd4, 0x8d, 0x07, 0xbe, 0x19, 0xf4, 0x07, 0x8f, - 0x42, 0xbd, 0xae, 0xec, 0x30, 0xde, 0x78, 0x51, 0x29, 0xeb, 0xdb, 0x13, 0x7a, 0xf7, 0xfb, 0x09, - 0x49, 0xb7, 0x8d, 0x83, 0x33, 0xdc, 0xdf, 0x8d, 0x30, 0x17, 0xcd, 0x6b, 0x71, 0xeb, 0x81, 0x0f, - 0x41, 0x2f, 0x55, 0x92, 0x05, 0x68, 0x7d, 0xcd, 0xbe, 0x4c, 0x85, 0x67, 0xf8, 0x10, 0xf4, 0x07, - 0xac, 0x9b, 0xdf, 0xd4, 0xd4, 0x99, 0xb4, 0x0b, 0xbc, 0x36, 0x5e, 0x41, 0x42, 0x1d, 0xc3, 0x35, - 0x0f, 0xbf, 0x03, 0xde, 0xff, 0x3f, 0xc1, 0x18, 0xd2, 0xec, 0xea, 0xaa, 0x5e, 0xef, 0x6a, 0xcd, - 0x1e, 0x63, 0x4f, 0x16, 0x13, 0xd1, 0xc8, 0x6c, 0x72, 0xa3, 0xc7, 0xcd, 0x74, 0xfb, 0xc0, 0x9e, - 0xa2, 0xd5, 0xc8, 0x4c, 0x0a, 0xcf, 0xf4, 0x21, 0xd8, 0x1f, 0x3c, 0xd8, 0x3d, 0xfb, 0x51, 0x59, - 0x69, 0x97, 0x60, 0x0f, 0xd1, 0x96, 0xd5, 0xb5, 0x28, 0x1b, 0xcf, 0xf6, 0xcd, 0x60, 0x2f, 0x5d, - 0x53, 0x42, 0x1d, 0xea, 0x5a, 0x09, 0x75, 0x2c, 0xd7, 0x7e, 0x36, 0xc4, 0xbd, 0x9d, 0x2e, 0x43, - 0xb4, 0x8f, 0xdf, 0x9e, 0xc7, 0x17, 0x91, 0x4b, 0x58, 0x1f, 0xef, 0x0d, 0xa3, 0xe3, 0x8b, 0x78, - 0xf4, 0xde, 0x05, 0x05, 0x67, 0xd1, 0xe8, 0x54, 0x81, 0xa1, 0x20, 0xf9, 0x10, 0x8f, 0x14, 0x98, - 0xcc, 0x41, 0x3a, 0x8c, 0xde, 0x9d, 0xbb, 0xf4, 0xe4, 0xe5, 0x6c, 0xc1, 0xc9, 0x7c, 0xc1, 0xc9, - 0x6a, 0xc1, 0xe1, 0x5b, 0xcb, 0xe1, 0x57, 0xcb, 0xe1, 0xae, 0xe5, 0x30, 0x6b, 0x39, 0xfc, 0x69, - 0x39, 0xfc, 0x6d, 0x39, 0x59, 0xb5, 0x1c, 0x7e, 0x2c, 0x39, 0x99, 0x2d, 0x39, 0x99, 0x2f, 0x39, - 0x19, 0xdb, 0xfa, 0xf3, 0x5e, 0xfc, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x33, 0x18, 0xb8, 0xad, 0xff, - 0x01, 0x00, 0x00, + // 399 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 
0x91, 0x31, 0x6f, 0xd3, 0x40, + 0x1c, 0xc5, 0xef, 0x6f, 0x9f, 0x5d, 0xe7, 0x1f, 0x5a, 0x59, 0x87, 0x84, 0x4c, 0x85, 0x0e, 0xab, + 0x93, 0x41, 0xc2, 0x95, 0x02, 0x03, 0x42, 0x62, 0x68, 0xa9, 0x41, 0xb6, 0xa2, 0x50, 0x99, 0xaa, + 0xbb, 0xd3, 0x1e, 0xc6, 0x2a, 0xb1, 0x2b, 0xfb, 0x82, 0x54, 0x26, 0x3e, 0x02, 0x5f, 0x80, 0x89, + 0x85, 0x8f, 0xd2, 0x31, 0x63, 0x27, 0x44, 0x9c, 0x85, 0x31, 0x1f, 0x01, 0xdd, 0x39, 0x51, 0xc8, + 0xf6, 0x7e, 0xf7, 0xde, 0xff, 0xbd, 0xe1, 0x10, 0xeb, 0xa2, 0xcc, 0xc3, 0xeb, 0xba, 0x92, 0x15, + 0xa3, 0x4a, 0xef, 0x3f, 0xcb, 0x0b, 0xf9, 0x69, 0x3a, 0x0e, 0x2f, 0xaa, 0xc9, 0x61, 0x5e, 0xe5, + 0xd5, 0xa1, 0x36, 0xc7, 0xd3, 0x8f, 0x9a, 0x34, 0x68, 0xd5, 0x1d, 0x1d, 0xfc, 0x00, 0xa4, 0x27, + 0xa2, 0xb9, 0x60, 0xaf, 0xb1, 0x57, 0x94, 0xb9, 0x68, 0xa4, 0xa8, 0x1b, 0x0f, 0x7c, 0x33, 0xe8, + 0x0f, 0x1e, 0x86, 0xba, 0x5d, 0xd9, 0x61, 0xbc, 0xf6, 0xa2, 0x52, 0xd6, 0x37, 0xc7, 0xf4, 0xf6, + 0xf7, 0x63, 0x92, 0x6e, 0x2e, 0xf6, 0x4f, 0x71, 0x6f, 0x3b, 0xc2, 0x5c, 0x34, 0xaf, 0xc4, 0x8d, + 0x07, 0x3e, 0x04, 0xbd, 0x54, 0x49, 0x16, 0xa0, 0xf5, 0x25, 0xfb, 0x3c, 0x15, 0x9e, 0xe1, 0x43, + 0xd0, 0x1f, 0xb0, 0xae, 0x7e, 0x7d, 0xa6, 0x66, 0xd2, 0x2e, 0xf0, 0xca, 0x78, 0x09, 0x09, 0x75, + 0x0c, 0xd7, 0x3c, 0xf8, 0x09, 0x78, 0xef, 0xff, 0x04, 0x63, 0x48, 0xb3, 0xcb, 0xcb, 0x7a, 0xd5, + 0xab, 0x35, 0x7b, 0x84, 0x3d, 0x59, 0x4c, 0x44, 0x23, 0xb3, 0xc9, 0xb5, 0x2e, 0x37, 0xd3, 0xcd, + 0x03, 0x7b, 0x82, 0x56, 0x23, 0x33, 0x29, 0x3c, 0xd3, 0x87, 0x60, 0x6f, 0x70, 0x7f, 0x7b, 0xf6, + 0x83, 0xb2, 0xd2, 0x2e, 0xc1, 0x1e, 0xa0, 0x2d, 0xab, 0x2b, 0x51, 0x36, 0x9e, 0xed, 0x9b, 0xc1, + 0x6e, 0xba, 0x22, 0x35, 0xfa, 0xb5, 0x2a, 0x85, 0xb7, 0xd3, 0x8d, 0x2a, 0x9d, 0x50, 0x87, 0xba, + 0x56, 0x42, 0x1d, 0xcb, 0xb5, 0x9f, 0x0e, 0x71, 0x77, 0xab, 0x8f, 0x21, 0xda, 0x47, 0x6f, 0xce, + 0xe2, 0xf3, 0xc8, 0x25, 0xac, 0x8f, 0x3b, 0xc3, 0xe8, 0xe8, 0x3c, 0x1e, 0xbd, 0x73, 0x41, 0xc1, + 0x69, 0x34, 0x3a, 0x51, 0x60, 0x28, 0x48, 0xde, 0xc7, 0x23, 0x05, 0x26, 0x73, 0x90, 0x0e, 0xa3, + 0xb7, 
0x67, 0x2e, 0x3d, 0x7e, 0x31, 0x9b, 0x73, 0x72, 0x37, 0xe7, 0x64, 0x39, 0xe7, 0xf0, 0xad, + 0xe5, 0xf0, 0xab, 0xe5, 0x70, 0xdb, 0x72, 0x98, 0xb5, 0x1c, 0xfe, 0xb4, 0x1c, 0xfe, 0xb6, 0x9c, + 0x2c, 0x5b, 0x0e, 0xdf, 0x17, 0x9c, 0xcc, 0x16, 0x9c, 0xdc, 0x2d, 0x38, 0x19, 0xdb, 0xfa, 0x43, + 0x9f, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0xbf, 0xe3, 0xcc, 0xec, 0x13, 0x02, 0x00, 0x00, } func (x IngesterState) String() string { @@ -281,6 +289,9 @@ func (this *IngesterDesc) Equal(that interface{}) bool { return false } } + if this.Zone != that1.Zone { + return false + } return true } func (this *Desc) GoString() string { @@ -309,12 +320,13 @@ func (this *IngesterDesc) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 8) + s := make([]string, 0, 9) s = append(s, "&ring.IngesterDesc{") s = append(s, "Addr: "+fmt.Sprintf("%#v", this.Addr)+",\n") s = append(s, "Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n") s = append(s, "State: "+fmt.Sprintf("%#v", this.State)+",\n") s = append(s, "Tokens: "+fmt.Sprintf("%#v", this.Tokens)+",\n") + s = append(s, "Zone: "+fmt.Sprintf("%#v", this.Zone)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -393,6 +405,13 @@ func (m *IngesterDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Zone) > 0 { + i -= len(m.Zone) + copy(dAtA[i:], m.Zone) + i = encodeVarintRing(dAtA, i, uint64(len(m.Zone))) + i-- + dAtA[i] = 0x3a + } if len(m.Tokens) > 0 { dAtA3 := make([]byte, len(m.Tokens)*10) var j2 int @@ -483,6 +502,10 @@ func (m *IngesterDesc) Size() (n int) { } n += 1 + sovRing(uint64(l)) + l } + l = len(m.Zone) + if l > 0 { + n += 1 + l + sovRing(uint64(l)) + } return n } @@ -521,6 +544,7 @@ func (this *IngesterDesc) String() string { `Timestamp:` + fmt.Sprintf("%v", this.Timestamp) + `,`, `State:` + fmt.Sprintf("%v", this.State) + `,`, `Tokens:` + fmt.Sprintf("%v", this.Tokens) + `,`, + `Zone:` + fmt.Sprintf("%v", this.Zone) + `,`, `}`, }, "") return s @@ -890,6 +914,38 @@ 
func (m *IngesterDesc) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field Tokens", wireType) } + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Zone", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRing + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRing + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRing + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Zone = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRing(dAtA[iNdEx:]) diff --git a/pkg/ring/ring.proto b/pkg/ring/ring.proto index 9670b01c090..8290ee2cca3 100644 --- a/pkg/ring/ring.proto +++ b/pkg/ring/ring.proto @@ -19,6 +19,7 @@ message IngesterDesc { int64 timestamp = 2; // unix timestamp IngesterState state = 3; repeated uint32 tokens = 6; + string zone = 7; } enum IngesterState { diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index e657c72d9cd..16086ad0e60 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "sort" + strconv "strconv" "testing" "time" @@ -36,7 +37,7 @@ func benchmarkBatch(b *testing.B, numIngester, numKeys int) { for i := 0; i < numIngester; i++ { tokens := GenerateTokens(numTokens, takenTokens) takenTokens = append(takenTokens, tokens...) 
- desc.AddIngester(fmt.Sprintf("%d", i), fmt.Sprintf("ingester%d", i), tokens, ACTIVE) + desc.AddIngester(fmt.Sprintf("%d", i), fmt.Sprintf("ingester%d", i), strconv.Itoa(i), tokens, ACTIVE) } cfg := Config{} @@ -97,7 +98,7 @@ func TestAddIngester(t *testing.T) { ing1Tokens := GenerateTokens(128, nil) - r.AddIngester(ingName, "addr", ing1Tokens, ACTIVE) + r.AddIngester(ingName, "addr", "1", ing1Tokens, ACTIVE) require.Equal(t, "addr", r.Ingesters[ingName].Addr) require.Equal(t, ing1Tokens, r.Ingesters[ingName].Tokens) @@ -115,7 +116,7 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) { newTokens := GenerateTokens(128, nil) - r.AddIngester(ing1Name, "addr", newTokens, ACTIVE) + r.AddIngester(ing1Name, "addr", "1", newTokens, ACTIVE) require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens) } @@ -129,7 +130,7 @@ func TestSubring(t *testing.T) { name := fmt.Sprintf("ing%v", i) ingTokens := GenerateTokens(128, prevTokens) - r.AddIngester(name, fmt.Sprintf("addr%v", i), ingTokens, ACTIVE) + r.AddIngester(name, fmt.Sprintf("addr%v", i), strconv.Itoa(i), ingTokens, ACTIVE) prevTokens = append(prevTokens, ingTokens...) } @@ -183,7 +184,7 @@ func TestStableSubring(t *testing.T) { name := fmt.Sprintf("ing%v", i) ingTokens := GenerateTokens(128, prevTokens) - r.AddIngester(name, fmt.Sprintf("addr%v", i), ingTokens, ACTIVE) + r.AddIngester(name, fmt.Sprintf("addr%v", i), strconv.Itoa(i), ingTokens, ACTIVE) prevTokens = append(prevTokens, ingTokens...) } @@ -223,3 +224,120 @@ func TestStableSubring(t *testing.T) { require.Equal(t, subrings[i], subrings[next]) } } + +func TestZoneAwareIngesterAssignmentSucccess(t *testing.T) { + + // runs a series of Get calls on the ring to ensure Ingesters' zone values are taken into + // consideration when assigning a set for a given token. + + r := NewDesc() + + n := 16 // number of ingesters in ring + z := 3 // number of availability zones. + + testCount := 1000000 // number of key tests to run. 
+ + var prevTokens []uint32 + for i := 0; i < n; i++ { + name := fmt.Sprintf("ing%v", i) + ingTokens := GenerateTokens(128, prevTokens) + + r.AddIngester(name, fmt.Sprintf("addr%v", i), fmt.Sprintf("zone-%v", i%z), ingTokens, ACTIVE) + + prevTokens = append(prevTokens, ingTokens...) + } + + // Create a ring with the ingesters + ring := Ring{ + name: "main ring", + cfg: Config{ + HeartbeatTimeout: time.Hour, + ReplicationFactor: 3, + }, + ringDesc: r, + ringTokens: r.getTokens(), + } + // use the GenerateTokens to get an array of random uint32 values + testValues := make([]uint32, testCount) + testValues = GenerateTokens(testCount, testValues) + ing := r.GetIngesters() + ingesters := make([]IngesterDesc, 0, len(ing)) + for _, v := range ing { + ingesters = append(ingesters, v) + } + var set ReplicationSet + var e error + for i := 0; i < testCount; i++ { + set, e = ring.Get(testValues[i], Write, ingesters) + if e != nil { + t.Fail() + return + } + + // check that we have the expected number of ingesters for replication. + require.Equal(t, 3, len(set.Ingesters)) + + // ensure all ingesters are in a different zone. + zones := make(map[string]struct{}) + for i := 0; i < len(set.Ingesters); i++ { + if _, ok := zones[set.Ingesters[i].Zone]; ok { + t.Fail() + } + zones[set.Ingesters[i].Zone] = struct{}{} + } + } + +} + +func TestZoneAwareIngesterAssignmentFailure(t *testing.T) { + + // This test ensures that when there are not ingesters in enough distinct zones + // an error will occur when attempting to get a replication set for a token. + + r := NewDesc() + + n := 16 // number of ingesters in ring + z := 1 // number of availability zones. + + testCount := 10 // number of key tests to run. + + var prevTokens []uint32 + for i := 0; i < n; i++ { + name := fmt.Sprintf("ing%v", i) + ingTokens := GenerateTokens(128, prevTokens) + + r.AddIngester(name, fmt.Sprintf("addr%v", i), fmt.Sprintf("zone-%v", i%z), ingTokens, ACTIVE) + + prevTokens = append(prevTokens, ingTokens...) 
+ } + + // Create a ring with the ingesters + ring := Ring{ + name: "main ring", + cfg: Config{ + HeartbeatTimeout: time.Hour, + ReplicationFactor: 3, + }, + ringDesc: r, + ringTokens: r.getTokens(), + } + // use the GenerateTokens to get an array of random uint32 values + testValues := make([]uint32, testCount) + testValues = GenerateTokens(testCount, testValues) + ing := r.GetIngesters() + ingesters := make([]IngesterDesc, 0, len(ing)) + for _, v := range ing { + ingesters = append(ingesters, v) + } + + for i := 0; i < testCount; i++ { + // Since there is only 1 zone assigned, we are expecting an error here. + _, e := ring.Get(testValues[i], Write, ingesters) + if e != nil { + require.Equal(t, "at least 2 live replicas required, could only find 1", e.Error()) + continue + } + t.Fail() + } + +}