From 5d34fac7e571310fd13082f8ed28784a740d9d2d Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Thu, 19 Mar 2020 09:53:51 -0700 Subject: [PATCH 01/10] adding Zone information to Token and Ingester description structs. Added distinct zone check in ring Get ingesters function Signed-off-by: Ken Haines --- pkg/ring/model.go | 3 +- pkg/ring/ring.go | 7 ++- pkg/ring/ring.pb.go | 110 +++++++++++++++++++++++++++++++++----------- pkg/ring/ring.proto | 1 + 4 files changed, 92 insertions(+), 29 deletions(-) diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 3170deb9128..991b7dbbedf 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -377,6 +377,7 @@ func (d *Desc) RemoveTombstones(limit time.Time) { type TokenDesc struct { Token uint32 Ingester string + Zone string } // Returns sorted list of tokens with ingester names. @@ -388,7 +389,7 @@ func (d *Desc) getTokens() []TokenDesc { tokens := make([]TokenDesc, 0, numTokens) for key, ing := range d.Ingesters { for _, token := range ing.Tokens { - tokens = append(tokens, TokenDesc{Token: token, Ingester: key}) + tokens = append(tokens, TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()}) } } diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 06568c0f260..cd52968bc0d 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -182,6 +182,7 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet n = r.cfg.ReplicationFactor ingesters = buf[:0] distinctHosts = map[string]struct{}{} + distinctZones = map[string]struct{}{} start = r.search(key) iterations = 0 ) @@ -190,12 +191,16 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet // Wrap i around in the ring. i %= len(r.ringTokens) - // We want n *distinct* ingesters. + // We want n *distinct* ingesters && distinct zones. 
token := r.ringTokens[i] if _, ok := distinctHosts[token.Ingester]; ok { continue } + if _, ok := distinctZones[token.Zone]; ok { + continue + } distinctHosts[token.Ingester] = struct{}{} + distinctZones[token.Zone] = struct{}{} ingester := r.ringDesc.Ingesters[token.Ingester] // We do not want to Write to Ingesters that are not ACTIVE, but we do want diff --git a/pkg/ring/ring.pb.go b/pkg/ring/ring.pb.go index 62e3ad152af..077b41b379b 100644 --- a/pkg/ring/ring.pb.go +++ b/pkg/ring/ring.pb.go @@ -107,6 +107,7 @@ type IngesterDesc struct { Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` State IngesterState `protobuf:"varint,3,opt,name=state,proto3,enum=ring.IngesterState" json:"state,omitempty"` Tokens []uint32 `protobuf:"varint,6,rep,packed,name=tokens,proto3" json:"tokens,omitempty"` + Zone string `protobuf:"bytes,7,opt,name=zone,proto3" json:"zone,omitempty"` } func (m *IngesterDesc) Reset() { *m = IngesterDesc{} } @@ -169,6 +170,13 @@ func (m *IngesterDesc) GetTokens() []uint32 { return nil } +func (m *IngesterDesc) GetZone() string { + if m != nil { + return m.Zone + } + return "" +} + func init() { proto.RegisterEnum("ring.IngesterState", IngesterState_name, IngesterState_value) proto.RegisterType((*Desc)(nil), "ring.Desc") @@ -179,32 +187,32 @@ func init() { func init() { proto.RegisterFile("ring.proto", fileDescriptor_26381ed67e202a6e) } var fileDescriptor_26381ed67e202a6e = []byte{ - // 387 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0x3f, 0x6f, 0xd3, 0x40, - 0x18, 0xc6, 0xef, 0xb5, 0xcf, 0xc6, 0x79, 0x43, 0x2b, 0xeb, 0x90, 0x90, 0xa9, 0xd0, 0x61, 0x75, - 0x32, 0x48, 0xb8, 0x52, 0x60, 0x40, 0x48, 0x0c, 0x2d, 0x35, 0xc8, 0x56, 0x14, 0x2a, 0x53, 0x75, - 0x77, 0xda, 0xc3, 0x58, 0x25, 0x76, 0x65, 0x5f, 0x90, 0xba, 0xf1, 0x0d, 0xe0, 0x0b, 0xb0, 0xf3, - 0x51, 0x3a, 0x66, 0xcc, 0x84, 0x88, 0xb3, 0x30, 0xe6, 0x23, 0xa0, 0x3b, 0x27, 0x0a, 0xd9, 0x9e, - 
0xdf, 0x3d, 0x7f, 0xde, 0xe1, 0x10, 0xeb, 0xa2, 0xcc, 0xc3, 0x9b, 0xba, 0x92, 0x15, 0xa3, 0x4a, - 0x1f, 0x3c, 0xcf, 0x0b, 0xf9, 0x79, 0x3a, 0x0e, 0x2f, 0xab, 0xc9, 0x51, 0x5e, 0xe5, 0xd5, 0x91, - 0x36, 0xc7, 0xd3, 0x4f, 0x9a, 0x34, 0x68, 0xd5, 0x95, 0x0e, 0x7f, 0x02, 0xd2, 0x53, 0xd1, 0x5c, - 0xb2, 0x37, 0xd8, 0x2b, 0xca, 0x5c, 0x34, 0x52, 0xd4, 0x8d, 0x07, 0xbe, 0x19, 0xf4, 0x07, 0x8f, - 0x42, 0xbd, 0xae, 0xec, 0x30, 0xde, 0x78, 0x51, 0x29, 0xeb, 0xdb, 0x13, 0x7a, 0xf7, 0xfb, 0x09, - 0x49, 0xb7, 0x8d, 0x83, 0x33, 0xdc, 0xdf, 0x8d, 0x30, 0x17, 0xcd, 0x6b, 0x71, 0xeb, 0x81, 0x0f, - 0x41, 0x2f, 0x55, 0x92, 0x05, 0x68, 0x7d, 0xcd, 0xbe, 0x4c, 0x85, 0x67, 0xf8, 0x10, 0xf4, 0x07, - 0xac, 0x9b, 0xdf, 0xd4, 0xd4, 0x99, 0xb4, 0x0b, 0xbc, 0x36, 0x5e, 0x41, 0x42, 0x1d, 0xc3, 0x35, - 0x0f, 0xbf, 0x03, 0xde, 0xff, 0x3f, 0xc1, 0x18, 0xd2, 0xec, 0xea, 0xaa, 0x5e, 0xef, 0x6a, 0xcd, - 0x1e, 0x63, 0x4f, 0x16, 0x13, 0xd1, 0xc8, 0x6c, 0x72, 0xa3, 0xc7, 0xcd, 0x74, 0xfb, 0xc0, 0x9e, - 0xa2, 0xd5, 0xc8, 0x4c, 0x0a, 0xcf, 0xf4, 0x21, 0xd8, 0x1f, 0x3c, 0xd8, 0x3d, 0xfb, 0x51, 0x59, - 0x69, 0x97, 0x60, 0x0f, 0xd1, 0x96, 0xd5, 0xb5, 0x28, 0x1b, 0xcf, 0xf6, 0xcd, 0x60, 0x2f, 0x5d, - 0x53, 0x42, 0x1d, 0xea, 0x5a, 0x09, 0x75, 0x2c, 0xd7, 0x7e, 0x36, 0xc4, 0xbd, 0x9d, 0x2e, 0x43, - 0xb4, 0x8f, 0xdf, 0x9e, 0xc7, 0x17, 0x91, 0x4b, 0x58, 0x1f, 0xef, 0x0d, 0xa3, 0xe3, 0x8b, 0x78, - 0xf4, 0xde, 0x05, 0x05, 0x67, 0xd1, 0xe8, 0x54, 0x81, 0xa1, 0x20, 0xf9, 0x10, 0x8f, 0x14, 0x98, - 0xcc, 0x41, 0x3a, 0x8c, 0xde, 0x9d, 0xbb, 0xf4, 0xe4, 0xe5, 0x6c, 0xc1, 0xc9, 0x7c, 0xc1, 0xc9, - 0x6a, 0xc1, 0xe1, 0x5b, 0xcb, 0xe1, 0x57, 0xcb, 0xe1, 0xae, 0xe5, 0x30, 0x6b, 0x39, 0xfc, 0x69, - 0x39, 0xfc, 0x6d, 0x39, 0x59, 0xb5, 0x1c, 0x7e, 0x2c, 0x39, 0x99, 0x2d, 0x39, 0x99, 0x2f, 0x39, - 0x19, 0xdb, 0xfa, 0xf3, 0x5e, 0xfc, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x33, 0x18, 0xb8, 0xad, 0xff, - 0x01, 0x00, 0x00, + // 399 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 
0x91, 0x31, 0x6f, 0xd3, 0x40, + 0x1c, 0xc5, 0xef, 0x6f, 0x9f, 0x5d, 0xe7, 0x1f, 0x5a, 0x59, 0x87, 0x84, 0x4c, 0x85, 0x0e, 0xab, + 0x93, 0x41, 0xc2, 0x95, 0x02, 0x03, 0x42, 0x62, 0x68, 0xa9, 0x41, 0xb6, 0xa2, 0x50, 0x99, 0xaa, + 0xbb, 0xd3, 0x1e, 0xc6, 0x2a, 0xb1, 0x2b, 0xfb, 0x82, 0x54, 0x26, 0x3e, 0x02, 0x5f, 0x80, 0x89, + 0x85, 0x8f, 0xd2, 0x31, 0x63, 0x27, 0x44, 0x9c, 0x85, 0x31, 0x1f, 0x01, 0xdd, 0x39, 0x51, 0xc8, + 0xf6, 0x7e, 0xf7, 0xde, 0xff, 0xbd, 0xe1, 0x10, 0xeb, 0xa2, 0xcc, 0xc3, 0xeb, 0xba, 0x92, 0x15, + 0xa3, 0x4a, 0xef, 0x3f, 0xcb, 0x0b, 0xf9, 0x69, 0x3a, 0x0e, 0x2f, 0xaa, 0xc9, 0x61, 0x5e, 0xe5, + 0xd5, 0xa1, 0x36, 0xc7, 0xd3, 0x8f, 0x9a, 0x34, 0x68, 0xd5, 0x1d, 0x1d, 0xfc, 0x00, 0xa4, 0x27, + 0xa2, 0xb9, 0x60, 0xaf, 0xb1, 0x57, 0x94, 0xb9, 0x68, 0xa4, 0xa8, 0x1b, 0x0f, 0x7c, 0x33, 0xe8, + 0x0f, 0x1e, 0x86, 0xba, 0x5d, 0xd9, 0x61, 0xbc, 0xf6, 0xa2, 0x52, 0xd6, 0x37, 0xc7, 0xf4, 0xf6, + 0xf7, 0x63, 0x92, 0x6e, 0x2e, 0xf6, 0x4f, 0x71, 0x6f, 0x3b, 0xc2, 0x5c, 0x34, 0xaf, 0xc4, 0x8d, + 0x07, 0x3e, 0x04, 0xbd, 0x54, 0x49, 0x16, 0xa0, 0xf5, 0x25, 0xfb, 0x3c, 0x15, 0x9e, 0xe1, 0x43, + 0xd0, 0x1f, 0xb0, 0xae, 0x7e, 0x7d, 0xa6, 0x66, 0xd2, 0x2e, 0xf0, 0xca, 0x78, 0x09, 0x09, 0x75, + 0x0c, 0xd7, 0x3c, 0xf8, 0x09, 0x78, 0xef, 0xff, 0x04, 0x63, 0x48, 0xb3, 0xcb, 0xcb, 0x7a, 0xd5, + 0xab, 0x35, 0x7b, 0x84, 0x3d, 0x59, 0x4c, 0x44, 0x23, 0xb3, 0xc9, 0xb5, 0x2e, 0x37, 0xd3, 0xcd, + 0x03, 0x7b, 0x82, 0x56, 0x23, 0x33, 0x29, 0x3c, 0xd3, 0x87, 0x60, 0x6f, 0x70, 0x7f, 0x7b, 0xf6, + 0x83, 0xb2, 0xd2, 0x2e, 0xc1, 0x1e, 0xa0, 0x2d, 0xab, 0x2b, 0x51, 0x36, 0x9e, 0xed, 0x9b, 0xc1, + 0x6e, 0xba, 0x22, 0x35, 0xfa, 0xb5, 0x2a, 0x85, 0xb7, 0xd3, 0x8d, 0x2a, 0x9d, 0x50, 0x87, 0xba, + 0x56, 0x42, 0x1d, 0xcb, 0xb5, 0x9f, 0x0e, 0x71, 0x77, 0xab, 0x8f, 0x21, 0xda, 0x47, 0x6f, 0xce, + 0xe2, 0xf3, 0xc8, 0x25, 0xac, 0x8f, 0x3b, 0xc3, 0xe8, 0xe8, 0x3c, 0x1e, 0xbd, 0x73, 0x41, 0xc1, + 0x69, 0x34, 0x3a, 0x51, 0x60, 0x28, 0x48, 0xde, 0xc7, 0x23, 0x05, 0x26, 0x73, 0x90, 0x0e, 0xa3, + 0xb7, 
0x67, 0x2e, 0x3d, 0x7e, 0x31, 0x9b, 0x73, 0x72, 0x37, 0xe7, 0x64, 0x39, 0xe7, 0xf0, 0xad, + 0xe5, 0xf0, 0xab, 0xe5, 0x70, 0xdb, 0x72, 0x98, 0xb5, 0x1c, 0xfe, 0xb4, 0x1c, 0xfe, 0xb6, 0x9c, + 0x2c, 0x5b, 0x0e, 0xdf, 0x17, 0x9c, 0xcc, 0x16, 0x9c, 0xdc, 0x2d, 0x38, 0x19, 0xdb, 0xfa, 0x43, + 0x9f, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0xbf, 0xe3, 0xcc, 0xec, 0x13, 0x02, 0x00, 0x00, } func (x IngesterState) String() string { @@ -281,6 +289,9 @@ func (this *IngesterDesc) Equal(that interface{}) bool { return false } } + if this.Zone != that1.Zone { + return false + } return true } func (this *Desc) GoString() string { @@ -309,12 +320,13 @@ func (this *IngesterDesc) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 8) + s := make([]string, 0, 9) s = append(s, "&ring.IngesterDesc{") s = append(s, "Addr: "+fmt.Sprintf("%#v", this.Addr)+",\n") s = append(s, "Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n") s = append(s, "State: "+fmt.Sprintf("%#v", this.State)+",\n") s = append(s, "Tokens: "+fmt.Sprintf("%#v", this.Tokens)+",\n") + s = append(s, "Zone: "+fmt.Sprintf("%#v", this.Zone)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -393,6 +405,13 @@ func (m *IngesterDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Zone) > 0 { + i -= len(m.Zone) + copy(dAtA[i:], m.Zone) + i = encodeVarintRing(dAtA, i, uint64(len(m.Zone))) + i-- + dAtA[i] = 0x3a + } if len(m.Tokens) > 0 { dAtA3 := make([]byte, len(m.Tokens)*10) var j2 int @@ -483,6 +502,10 @@ func (m *IngesterDesc) Size() (n int) { } n += 1 + sovRing(uint64(l)) + l } + l = len(m.Zone) + if l > 0 { + n += 1 + l + sovRing(uint64(l)) + } return n } @@ -521,6 +544,7 @@ func (this *IngesterDesc) String() string { `Timestamp:` + fmt.Sprintf("%v", this.Timestamp) + `,`, `State:` + fmt.Sprintf("%v", this.State) + `,`, `Tokens:` + fmt.Sprintf("%v", this.Tokens) + `,`, + `Zone:` + fmt.Sprintf("%v", this.Zone) + `,`, `}`, }, "") return s @@ -890,6 +914,38 @@ 
func (m *IngesterDesc) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field Tokens", wireType) } + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Zone", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRing + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRing + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRing + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Zone = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRing(dAtA[iNdEx:]) diff --git a/pkg/ring/ring.proto b/pkg/ring/ring.proto index 9670b01c090..8290ee2cca3 100644 --- a/pkg/ring/ring.proto +++ b/pkg/ring/ring.proto @@ -19,6 +19,7 @@ message IngesterDesc { int64 timestamp = 2; // unix timestamp IngesterState state = 3; repeated uint32 tokens = 6; + string zone = 7; } enum IngesterState { From 43de699e94e4e31526a4e66b2340ae86ef63aa81 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Thu, 19 Mar 2020 10:36:25 -0700 Subject: [PATCH 02/10] updating AddIngester to include zone Signed-off-by: Ken Haines --- pkg/ring/model.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 991b7dbbedf..f13965b4e7e 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -37,7 +37,7 @@ func NewDesc() *Desc { // AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens, // any other tokens are removed. 
-func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState) { +func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state IngesterState) { if d.Ingesters == nil { d.Ingesters = map[string]IngesterDesc{} } @@ -47,6 +47,7 @@ func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState Timestamp: time.Now().Unix(), State: state, Tokens: tokens, + Zone: zone, } d.Ingesters[id] = ingester From 04939c3e6a373a0ec4ace5efe968989a878350c4 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Thu, 19 Mar 2020 11:15:30 -0700 Subject: [PATCH 03/10] updating lifecylcer config and tests to enable availability zone Signed-off-by: Ken Haines --- pkg/ring/http.go | 2 ++ pkg/ring/lifecycler.go | 14 +++++++++----- pkg/ring/ring_test.go | 11 ++++++----- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/pkg/ring/http.go b/pkg/ring/http.go index 12fea72d096..d731f743deb 100644 --- a/pkg/ring/http.go +++ b/pkg/ring/http.go @@ -30,6 +30,7 @@ const pageContent = ` Instance ID + Availabilty Zone State Address Last Heartbeat @@ -46,6 +47,7 @@ const pageContent = ` {{ end }} {{ .ID }} + {{ .Zone }} {{ .State }} {{ .Address }} {{ .Timestamp }} diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 2d805565c90..195fcef38ff 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -55,6 +55,7 @@ type LifecyclerConfig struct { InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` TokensFilePath string `yaml:"tokens_file_path"` + Zone string `yaml:"availability_zone"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address" doc:"hidden"` @@ -103,6 +104,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.StringVar(&cfg.Addr, prefix+"lifecycler.addr", "", "IP address to advertise in consul.") f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to 
server.grpc-listen-port).") f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register into consul.") + f.StringVar(&cfg.Zone, prefix+"availability-zone", hostname, "The availability zone of the host, this instance is running on. Default is the hostname value.") } // Lifecycler is responsible for managing the lifecycle of entries in the ring. @@ -120,6 +122,7 @@ type Lifecycler struct { Addr string RingName string RingKey string + Zone string // Whether to flush if transfer fails on shutdown. flushOnShutdown bool @@ -176,6 +179,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa RingName: ringName, RingKey: ringKey, flushOnShutdown: flushOnShutdown, + Zone: cfg.Zone, actorChan: make(chan func()), @@ -502,14 +506,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error { if len(tokensFromFile) >= i.cfg.NumTokens { i.setState(ACTIVE) } - ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState()) i.setTokens(tokensFromFile) return ringDesc, true, nil } // Either we are a new ingester, or consul must have restarted level.Info(util.Logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName) - ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState()) return ringDesc, true, nil } @@ -564,7 +568,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool { ringTokens = append(ringTokens, newTokens...) 
sort.Sort(ringTokens) - ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState()) i.setTokens(ringTokens) @@ -626,7 +630,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er sort.Sort(myTokens) i.setTokens(myTokens) - ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState()) return ringDesc, true, nil }) @@ -655,7 +659,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error { if !ok { // consul must have restarted level.Info(util.Logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName) - ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState()) + ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState()) } else { ingesterDesc.Timestamp = time.Now().Unix() ingesterDesc.State = i.GetState() diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index e657c72d9cd..ec137d9e72e 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "sort" + strconv "strconv" "testing" "time" @@ -36,7 +37,7 @@ func benchmarkBatch(b *testing.B, numIngester, numKeys int) { for i := 0; i < numIngester; i++ { tokens := GenerateTokens(numTokens, takenTokens) takenTokens = append(takenTokens, tokens...) 
- desc.AddIngester(fmt.Sprintf("%d", i), fmt.Sprintf("ingester%d", i), tokens, ACTIVE) + desc.AddIngester(fmt.Sprintf("%d", i), fmt.Sprintf("ingester%d", i), strconv.Itoa(i), tokens, ACTIVE) } cfg := Config{} @@ -97,7 +98,7 @@ func TestAddIngester(t *testing.T) { ing1Tokens := GenerateTokens(128, nil) - r.AddIngester(ingName, "addr", ing1Tokens, ACTIVE) + r.AddIngester(ingName, "addr", "1", ing1Tokens, ACTIVE) require.Equal(t, "addr", r.Ingesters[ingName].Addr) require.Equal(t, ing1Tokens, r.Ingesters[ingName].Tokens) @@ -115,7 +116,7 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) { newTokens := GenerateTokens(128, nil) - r.AddIngester(ing1Name, "addr", newTokens, ACTIVE) + r.AddIngester(ing1Name, "addr", "1", newTokens, ACTIVE) require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens) } @@ -129,7 +130,7 @@ func TestSubring(t *testing.T) { name := fmt.Sprintf("ing%v", i) ingTokens := GenerateTokens(128, prevTokens) - r.AddIngester(name, fmt.Sprintf("addr%v", i), ingTokens, ACTIVE) + r.AddIngester(name, fmt.Sprintf("addr%v", i), strconv.Itoa(i), ingTokens, ACTIVE) prevTokens = append(prevTokens, ingTokens...) } @@ -183,7 +184,7 @@ func TestStableSubring(t *testing.T) { name := fmt.Sprintf("ing%v", i) ingTokens := GenerateTokens(128, prevTokens) - r.AddIngester(name, fmt.Sprintf("addr%v", i), ingTokens, ACTIVE) + r.AddIngester(name, fmt.Sprintf("addr%v", i), strconv.Itoa(i), ingTokens, ACTIVE) prevTokens = append(prevTokens, ingTokens...) 
} From e35561da7d760f6e5622064a8343681d5783422a Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Fri, 20 Mar 2020 20:08:49 -0700 Subject: [PATCH 04/10] adding zone info to ring's HTTP method Signed-off-by: Ken Haines --- pkg/ring/http.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/ring/http.go b/pkg/ring/http.go index d731f743deb..1cfbae88564 100644 --- a/pkg/ring/http.go +++ b/pkg/ring/http.go @@ -140,16 +140,17 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { } ingesters = append(ingesters, struct { - ID, State, Address, Timestamp string - Tokens []uint32 - NumTokens int - Ownership float64 + ID, State, Address, Timestamp, Zone string + Tokens []uint32 + NumTokens int + Ownership float64 }{ ID: id, State: state, Address: ing.Addr, Timestamp: timestamp.String(), Tokens: ing.Tokens, + Zone: ing.Zone, NumTokens: len(ing.Tokens), Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100, }) From 10fbf0aa667f3834ba5c0b830102dea4147e50ac Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Mon, 23 Mar 2020 11:51:50 -0700 Subject: [PATCH 05/10] adding positive & negative zone aware replica set tests Signed-off-by: Ken Haines --- pkg/ring/ring_test.go | 117 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index ec137d9e72e..16086ad0e60 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -224,3 +224,120 @@ func TestStableSubring(t *testing.T) { require.Equal(t, subrings[i], subrings[next]) } } + +func TestZoneAwareIngesterAssignmentSucccess(t *testing.T) { + + // runs a series of Get calls on the ring to ensure Ingesters' zone values are taken into + // consideration when assigning a set for a given token. + + r := NewDesc() + + n := 16 // number of ingesters in ring + z := 3 // number of availability zones. + + testCount := 1000000 // number of key tests to run. 
+ + var prevTokens []uint32 + for i := 0; i < n; i++ { + name := fmt.Sprintf("ing%v", i) + ingTokens := GenerateTokens(128, prevTokens) + + r.AddIngester(name, fmt.Sprintf("addr%v", i), fmt.Sprintf("zone-%v", i%z), ingTokens, ACTIVE) + + prevTokens = append(prevTokens, ingTokens...) + } + + // Create a ring with the ingesters + ring := Ring{ + name: "main ring", + cfg: Config{ + HeartbeatTimeout: time.Hour, + ReplicationFactor: 3, + }, + ringDesc: r, + ringTokens: r.getTokens(), + } + // use the GenerateTokens to get an array of random uint32 values + testValues := make([]uint32, testCount) + testValues = GenerateTokens(testCount, testValues) + ing := r.GetIngesters() + ingesters := make([]IngesterDesc, 0, len(ing)) + for _, v := range ing { + ingesters = append(ingesters, v) + } + var set ReplicationSet + var e error + for i := 0; i < testCount; i++ { + set, e = ring.Get(testValues[i], Write, ingesters) + if e != nil { + t.Fail() + return + } + + // check that we have the expected number of ingesters for replication. + require.Equal(t, 3, len(set.Ingesters)) + + // ensure all ingesters are in a different zone. + zones := make(map[string]struct{}) + for i := 0; i < len(set.Ingesters); i++ { + if _, ok := zones[set.Ingesters[i].Zone]; ok { + t.Fail() + } + zones[set.Ingesters[i].Zone] = struct{}{} + } + } + +} + +func TestZoneAwareIngesterAssignmentFailure(t *testing.T) { + + // This test ensures that when there are not ingesters in enough distinct zones + // an error will occur when attempting to get a replication set for a token. + + r := NewDesc() + + n := 16 // number of ingesters in ring + z := 1 // number of availability zones. + + testCount := 10 // number of key tests to run. + + var prevTokens []uint32 + for i := 0; i < n; i++ { + name := fmt.Sprintf("ing%v", i) + ingTokens := GenerateTokens(128, prevTokens) + + r.AddIngester(name, fmt.Sprintf("addr%v", i), fmt.Sprintf("zone-%v", i%z), ingTokens, ACTIVE) + + prevTokens = append(prevTokens, ingTokens...) 
+ } + + // Create a ring with the ingesters + ring := Ring{ + name: "main ring", + cfg: Config{ + HeartbeatTimeout: time.Hour, + ReplicationFactor: 3, + }, + ringDesc: r, + ringTokens: r.getTokens(), + } + // use the GenerateTokens to get an array of random uint32 values + testValues := make([]uint32, testCount) + testValues = GenerateTokens(testCount, testValues) + ing := r.GetIngesters() + ingesters := make([]IngesterDesc, 0, len(ing)) + for _, v := range ing { + ingesters = append(ingesters, v) + } + + for i := 0; i < testCount; i++ { + // Since there is only 1 zone assigned, we are expecting an error here. + _, e := ring.Get(testValues[i], Write, ingesters) + if e != nil { + require.Equal(t, "at least 2 live replicas required, could only find 1", e.Error()) + continue + } + t.Fail() + } + +} From fb2f78bf7ab2a30e260aa7754f73ce92f903675a Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Mon, 23 Mar 2020 12:02:20 -0700 Subject: [PATCH 06/10] updating config ref doc Signed-off-by: Ken Haines --- docs/configuration/config-file-reference.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a5a212d0bbb..9c7d4c51d05 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -485,6 +485,11 @@ lifecycler: # CLI flag: -ingester.tokens-file-path [tokens_file_path: | default = ""] + # The availability zone of the host, this instance is running on. Default is + # the hostname value. + # CLI flag: -ingester.availability-zone + [availability_zone: | default = _hostname_] + # Number of times to try and transfer chunks before falling back to flushing. # Negative value or zero disables hand-over. 
# CLI flag: -ingester.max-transfer-retries From a978fd4a244c5896124926dc31c3daf24f481cc6 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Mon, 23 Mar 2020 12:13:42 -0700 Subject: [PATCH 07/10] updating changelog Signed-off-by: Ken Haines --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc61609713c..fcbcee2091a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * `-flusher.wal-dir` for the WAL directory to recover from. * `-flusher.concurrent-flushes` for number of concurrent flushes. * `-flusher.flush-op-timeout` is duration after which a flush should timeout. +* [FEATURE] Ingesters can now have an optional availability zone set, to ensure metric replication is distributed across zones. This is set via the `-ingester.availability-zone` flag or the `availability_zone` field in the config file. #2317 * [ENHANCEMENT] Better re-use of connections to DynamoDB and S3. #2268 * [ENHANCEMENT] Experimental TSDB: Add support for local `filesystem` backend. #2245 * [ENHANCEMENT] Experimental TSDB: Added memcached support for the TSDB index cache. 
#2290 From ccd0aa35b32282c2fe1e4cff45b59c52d0c46210 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Mon, 23 Mar 2020 13:51:04 -0700 Subject: [PATCH 08/10] correcting misspell caught in linting Signed-off-by: Ken Haines --- pkg/ring/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/http.go b/pkg/ring/http.go index 1cfbae88564..1b10c1a891b 100644 --- a/pkg/ring/http.go +++ b/pkg/ring/http.go @@ -30,7 +30,7 @@ const pageContent = ` Instance ID - Availabilty Zone + Availability Zone State Address Last Heartbeat From 60722a259502e3a874da3b91e0b28aa48a17e6ac Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Tue, 24 Mar 2020 09:46:06 -0700 Subject: [PATCH 09/10] updating lifecycler config for availbility zone so that the docs are generated in a consistent/verifiable way but still have a decent default value Signed-off-by: Ken Haines --- docs/configuration/config-file-reference.md | 4 ++-- pkg/ring/lifecycler.go | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 9c7d4c51d05..5f65d304dec 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -486,9 +486,9 @@ lifecycler: [tokens_file_path: | default = ""] # The availability zone of the host, this instance is running on. Default is - # the hostname value. + # the lifecycler ID. # CLI flag: -ingester.availability-zone - [availability_zone: | default = _hostname_] + [availability_zone: | default = ""] # Number of times to try and transfer chunks before falling back to flushing. # Negative value or zero disables hand-over. 
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 195fcef38ff..f3820dc45ca 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -104,7 +104,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.StringVar(&cfg.Addr, prefix+"lifecycler.addr", "", "IP address to advertise in consul.") f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).") f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register into consul.") - f.StringVar(&cfg.Zone, prefix+"availability-zone", hostname, "The availability zone of the host, this instance is running on. Default is the hostname value.") + f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone of the host, this instance is running on. Default is the lifecycler ID.") } // Lifecycler is responsible for managing the lifecycle of entries in the ring. @@ -163,6 +163,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa return nil, err } + zone := cfg.Zone + if zone == "" { + zone = cfg.ID + } + // We do allow a nil FlushTransferer, but to keep the ring logic easier we assume // it's always set, so we use a noop FlushTransferer if flushTransferer == nil { @@ -179,7 +184,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa RingName: ringName, RingKey: ringKey, flushOnShutdown: flushOnShutdown, - Zone: cfg.Zone, + Zone: zone, actorChan: make(chan func()), From 00fe153a6088f9cf6f2fe2dadd26ed21831134d9 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Sat, 28 Mar 2020 14:18:50 -0700 Subject: [PATCH 10/10] Adding some zone based replication docs Signed-off-by: Ken Haines --- docs/guides/running.md | 4 +++- docs/guides/zone-replication.md | 30 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 docs/guides/zone-replication.md diff --git a/docs/guides/running.md 
b/docs/guides/running.md index d29d8c9e5e8..817a6f2cd1e 100644 --- a/docs/guides/running.md +++ b/docs/guides/running.md @@ -51,7 +51,9 @@ Memcached is not essential but highly recommended. The standard replication factor is three, so that we can drop one replica and be unconcerned, as we still have two copies of the data left for redundancy. This is configurable: you can run with more -redundancy or less, depending on your risk appetite. +redundancy or less, depending on your risk appetite. By default +ingesters are not aware of availability zones. See [zone aware replication](zone-replication.md) +to change this. ### Schema diff --git a/docs/guides/zone-replication.md b/docs/guides/zone-replication.md new file mode 100644 index 00000000000..d02a858ed93 --- /dev/null +++ b/docs/guides/zone-replication.md @@ -0,0 +1,30 @@ +--- +title: "Zone Aware Replication" +linkTitle: "Zone Aware Replication" +weight: 5 +slug: zone-aware-replication +--- + +In a default configuration, time-series written to ingesters are replicated based on the container/pod name of the ingester instances. It is completely possible that all the replicas for a given time-series are held within the same availability zone, even if the Cortex infrastructure spans multiple zones within the region. Storing multiple replicas of a given time-series within the same zone poses a risk of data loss if there is an outage affecting various nodes within that zone, or a total zone outage. + +## Configuration + +Cortex can be configured to consider an availability zone value in its replication system. Doing so mitigates risks associated with losing multiple nodes within the same availability zone. 
The availability zone for an ingester can be defined on the command line of the ingester using the `ingester.availability-zone` flag or using the yaml configuration: + +```yaml +ingester: + lifecycler: + availability_zone: "zone-3" +``` + +## Zone Replication Considerations + +Enabling availability zone awareness helps mitigate the risk of data loss within a single zone, but some items need consideration by an operator if they are thinking of enabling this feature. + +### Minimum number of Zones + +For Cortex to function correctly, there must be at least the same number of availability zones as the replica count. So by default, a Cortex cluster should be spread over 3 zones, as the default replica count is 3. It is safe to have more zones than the replica count, but it cannot be less. Having fewer availability zones than the replica count causes a replica write to be missed, and in some cases, the write fails if the availability zone count is too low. + +### Cost + +Depending on the existing Cortex infrastructure being used, this may cause an increase in running costs, as most cloud providers charge for cross availability zone traffic. The most significant change would be for a Cortex cluster currently running in a single zone. \ No newline at end of file