From 091b71cb9e5f40d07b1c0ba20957f3f6b580cf9a Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Jan 2017 10:24:58 +0000 Subject: [PATCH] Remove code to migrate from json to proto ring. --- ring/consul_client.go | 80 -------------------------------------- ring/ingester_lifecycle.go | 14 ------- ring/ring.go | 7 +--- 3 files changed, 1 insertion(+), 100 deletions(-) diff --git a/ring/consul_client.go b/ring/consul_client.go index 965cd5ab5f3..51aa2bad118 100644 --- a/ring/consul_client.go +++ b/ring/consul_client.go @@ -1,10 +1,8 @@ package ring import ( - "encoding/json" "flag" "fmt" - "sync" "time" "github.com/golang/protobuf/proto" @@ -122,84 +120,6 @@ func (p ProtoCodec) Encode(msg interface{}) ([]byte, error) { return snappy.Encode(nil, bytes), nil } -// JSONCodec is a Codec for JSON -type JSONCodec struct { - Factory func() interface{} -} - -// Decode implements Codec -func (j JSONCodec) Decode(bytes []byte) (interface{}, error) { - out := j.Factory() - if err := json.Unmarshal(bytes, out); err != nil { - return nil, err - } - return out, nil -} - -// Encode implemenrs Codec -func (j JSONCodec) Encode(msg interface{}) ([]byte, error) { - return json.Marshal(msg) -} - -// DynamicCodec is a Codec that can read json and proto, and -// that can serialise to either (selectively). -// Once it fails to decode JSON, it will start decoding (and -// writing) protos. -type DynamicCodec struct { - mtx sync.Mutex - useProto bool - json Codec - proto Codec -} - -// NewDynamicCodec makes a new DynamicCodec -func NewDynamicCodec(json, proto Codec) *DynamicCodec { - return &DynamicCodec{ - useProto: false, - json: json, - proto: proto, - } -} - -// UseProto allow you to change the Codec at runtime. -func (d *DynamicCodec) UseProto(useProto bool) { - d.mtx.Lock() - defer d.mtx.Unlock() - if d.useProto != useProto { - log.Infof("Using to proto serialization: %v", useProto) - d.useProto = useProto - } -} - -// Decode implements Codec -func (d *DynamicCodec) Decode(bytes []byte) (interface{}, error) { - d.mtx.Lock() - defer d.mtx.Unlock() - - out, err := d.json.Decode(bytes) - if err == nil { - return out, nil - } - - out, err = d.proto.Decode(bytes) - if err == nil && !d.useProto { - log.Infof("Error decoding json, switching to writing proto: %v", err) - d.useProto = true - } - - return out, err -} - -// Encode implemenrs Codec -func (d *DynamicCodec) Encode(msg interface{}) ([]byte, error) { - d.mtx.Lock() - defer d.mtx.Unlock() - if d.useProto { - return d.proto.Encode(msg) - } - return d.json.Encode(msg) -} - // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. func (c *consulClient) CAS(key string, f CASCallback) error { diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index bd8df5c359d..66141d1cd95 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -60,7 +60,6 @@ type IngesterRegistration struct { consul ConsulClient numTokens int skipUnregister bool - codec *DynamicCodec id string addr string @@ -110,7 +109,6 @@ func RegisterIngester(cfg IngesterRegistrationConfig) (*IngesterRegistration, er consul: ring.consul, numTokens: cfg.NumTokens, skipUnregister: cfg.skipUnregister, - codec: ring.codec, id: hostname, // hostname is the ip+port of this instance, written to consul so @@ -204,18 +202,6 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { ringDesc = in.(*Desc) } - // See if all ingesters can read protos; if so start writing them - allIngestersCanReadProtos := true - for _, ing := range ringDesc.Ingesters { - if !ing.ProtoRing { - allIngestersCanReadProtos = false - break - } - } - if allIngestersCanReadProtos { - r.codec.UseProto(true) - } - ingesterDesc, ok := ringDesc.Ingesters[r.id] if !ok { // consul must have restarted diff --git a/ring/ring.go b/ring/ring.go index e9217d03d72..20c11ff8d7b 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -53,7 +53,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Ring holds the information about the members of the consistent hash circle. type Ring struct { consul ConsulClient - codec *DynamicCodec quit, done chan struct{} heartbeatTimeout time.Duration @@ -67,17 +66,13 @@ type Ring struct { // New creates a new Ring func New(cfg Config) (*Ring, error) { - codec := NewDynamicCodec( - JSONCodec{Factory: DescFactory}, - ProtoCodec{Factory: ProtoDescFactory}, - ) + codec := ProtoCodec{Factory: ProtoDescFactory} consul, err := NewConsulClient(cfg.ConsulConfig, codec) if err != nil { return nil, err } r := &Ring{ consul: consul, - codec: codec, heartbeatTimeout: cfg.HeartbeatTimeout, quit: make(chan struct{}), done: make(chan struct{}),