Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 0 additions & 80 deletions ring/consul_client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package ring

import (
"encoding/json"
"flag"
"fmt"
"sync"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -122,84 +120,6 @@ func (p ProtoCodec) Encode(msg interface{}) ([]byte, error) {
return snappy.Encode(nil, bytes), nil
}

// JSONCodec is a Codec for JSON
type JSONCodec struct {
Factory func() interface{}
}

// Decode implements Codec
func (j JSONCodec) Decode(bytes []byte) (interface{}, error) {
out := j.Factory()
if err := json.Unmarshal(bytes, out); err != nil {
return nil, err
}
return out, nil
}

// Encode implemenrs Codec
func (j JSONCodec) Encode(msg interface{}) ([]byte, error) {
return json.Marshal(msg)
}

// DynamicCodec is a Codec that can read json and proto, and
// that can serialise to either (selectively).
// Once it fails to decode JSON, it will start decoding (and
// writing) protos.
type DynamicCodec struct {
mtx sync.Mutex
useProto bool
json Codec
proto Codec
}

// NewDynamicCodec makes a new DynamicCodec
func NewDynamicCodec(json, proto Codec) *DynamicCodec {
return &DynamicCodec{
useProto: false,
json: json,
proto: proto,
}
}

// UseProto allow you to change the Codec at runtime.
func (d *DynamicCodec) UseProto(useProto bool) {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.useProto != useProto {
log.Infof("Using to proto serialization: %v", useProto)
d.useProto = useProto
}
}

// Decode implements Codec
func (d *DynamicCodec) Decode(bytes []byte) (interface{}, error) {
d.mtx.Lock()
defer d.mtx.Unlock()

out, err := d.json.Decode(bytes)
if err == nil {
return out, nil
}

out, err = d.proto.Decode(bytes)
if err == nil && !d.useProto {
log.Infof("Error decoding json, switching to writing proto: %v", err)
d.useProto = true
}

return out, err
}

// Encode implemenrs Codec
func (d *DynamicCodec) Encode(msg interface{}) ([]byte, error) {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.useProto {
return d.proto.Encode(msg)
}
return d.json.Encode(msg)
}

// CAS atomically modifies a value in a callback.
// If value doesn't exist you'll get nil as an argument to your callback.
func (c *consulClient) CAS(key string, f CASCallback) error {
Expand Down
14 changes: 0 additions & 14 deletions ring/ingester_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type IngesterRegistration struct {
consul ConsulClient
numTokens int
skipUnregister bool
codec *DynamicCodec

id string
addr string
Expand Down Expand Up @@ -110,7 +109,6 @@ func RegisterIngester(cfg IngesterRegistrationConfig) (*IngesterRegistration, er
consul: ring.consul,
numTokens: cfg.NumTokens,
skipUnregister: cfg.skipUnregister,
codec: ring.codec,

id: hostname,
// hostname is the ip+port of this instance, written to consul so
Expand Down Expand Up @@ -204,18 +202,6 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
ringDesc = in.(*Desc)
}

// See if all ingesters can read protos; if so start writing them
allIngestersCanReadProtos := true
for _, ing := range ringDesc.Ingesters {
if !ing.ProtoRing {
allIngestersCanReadProtos = false
break
}
}
if allIngestersCanReadProtos {
r.codec.UseProto(true)
}

ingesterDesc, ok := ringDesc.Ingesters[r.id]
if !ok {
// consul must have restarted
Expand Down
7 changes: 1 addition & 6 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// Ring holds the information about the members of the consistent hash circle.
type Ring struct {
consul ConsulClient
codec *DynamicCodec
quit, done chan struct{}
heartbeatTimeout time.Duration

Expand All @@ -67,17 +66,13 @@ type Ring struct {

// New creates a new Ring
func New(cfg Config) (*Ring, error) {
codec := NewDynamicCodec(
JSONCodec{Factory: DescFactory},
ProtoCodec{Factory: ProtoDescFactory},
)
codec := ProtoCodec{Factory: ProtoDescFactory}
consul, err := NewConsulClient(cfg.ConsulConfig, codec)
if err != nil {
return nil, err
}
r := &Ring{
consul: consul,
codec: codec,
heartbeatTimeout: cfg.HeartbeatTimeout,
quit: make(chan struct{}),
done: make(chan struct{}),
Expand Down