Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/chunk/encoding/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ func (b *bigchunk) Len() int {
}

func (b *bigchunk) Size() int {
sum := 0
sum := 2 // For the number of sub chunks.
for _, c := range b.chunks {
sum += 2 // For the length of the sub chunk.
sum += len(c.Bytes())
}
return sum
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/wire/bytes.go → pkg/chunk/storage/bytes.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package wire
package storage

import (
"bytes"
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunk/storage/caching_index_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;

message Entry {
bytes Column = 1 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false];
bytes Value = 2 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false];
bytes Column = 1 [(gogoproto.customtype) = "Bytes", (gogoproto.nullable) = false];
bytes Value = 2 [(gogoproto.customtype) = "Bytes", (gogoproto.nullable) = false];
}

message ReadBatch {
Expand Down
35 changes: 17 additions & 18 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package distributor

import (
"bytes"
"context"
"flag"
"fmt"
"hash/fnv"
"net/http"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -185,37 +184,37 @@ func (d *Distributor) Stop() {
d.ingesterPool.Stop()
}

func (d *Distributor) tokenForLabels(userID string, labels []client.LabelPair) (uint32, error) {
func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter) (uint32, error) {
if d.cfg.ShardByAllLabels {
return shardByAllLabels(userID, labels)
}

metricName, err := extract.MetricNameFromLabelPairs(labels)
metricName, err := extract.MetricNameFromLabelAdapters(labels)
if err != nil {
return 0, err
}
return shardByMetricName(userID, metricName), nil
}

func shardByMetricName(userID string, metricName []byte) uint32 {
h := fnv.New32()
h.Write([]byte(userID))
h.Write(metricName)
return h.Sum32()
func shardByMetricName(userID string, metricName string) uint32 {
h := client.HashNew32()
h = client.HashAdd32(h, userID)
h = client.HashAdd32(h, metricName)
return h
}

func shardByAllLabels(userID string, labels []client.LabelPair) (uint32, error) {
h := fnv.New32()
h.Write([]byte(userID))
lastLabelName := []byte{}
func shardByAllLabels(userID string, labels []client.LabelAdapter) (uint32, error) {
h := client.HashNew32()
h = client.HashAdd32(h, userID)
var lastLabelName string
for _, label := range labels {
if bytes.Compare(lastLabelName, label.Name) >= 0 {
if strings.Compare(lastLabelName, label.Name) >= 0 {
return 0, fmt.Errorf("Labels not sorted")
}
h.Write(label.Name)
h.Write(label.Value)
h = client.HashAdd32(h, label.Name)
h = client.HashAdd32(h, label.Value)
}
return h.Sum32(), nil
return h, nil
}

// Push implements client.IngesterServer
Expand All @@ -242,7 +241,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
continue
}

metricName, _ := extract.MetricNameFromLabelPairs(ts.Labels)
metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels)
samples := make([]client.Sample, 0, len(ts.Samples))
for _, s := range ts.Samples {
if err := d.limits.ValidateSample(userID, metricName, s); err != nil {
Expand Down
22 changes: 11 additions & 11 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestDistributorPush(t *testing.T) {
}

func TestDistributorPushQuery(t *testing.T) {
nameMatcher := mustEqualMatcher("__name__", "foo")
nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo")
barMatcher := mustEqualMatcher("bar", "baz")

type testcase struct {
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestDistributorPushQuery(t *testing.T) {
}

func TestSlowQueries(t *testing.T) {
nameMatcher := mustEqualMatcher("__name__", "foo")
nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo")
nIngesters := 3
for _, shardByAllLabels := range []bool{true, false} {
for happy := 0; happy <= nIngesters; happy++ {
Expand Down Expand Up @@ -300,10 +300,10 @@ func makeWriteRequest(samples int) *client.WriteRequest {
for i := 0; i < samples; i++ {
ts := client.PreallocTimeseries{
TimeSeries: client.TimeSeries{
Labels: []client.LabelPair{
{Name: []byte("__name__"), Value: []byte("foo")},
{Name: []byte("bar"), Value: []byte("baz")},
{Name: []byte("sample"), Value: []byte(fmt.Sprintf("%d", i))},
Labels: []client.LabelAdapter{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "bar", Value: "baz"},
{Name: "sample", Value: fmt.Sprintf("%d", i)},
},
},
}
Expand All @@ -323,9 +323,9 @@ func expectedResponse(start, end int) model.Matrix {
for i := start; i < end; i++ {
result = append(result, &model.SampleStream{
Metric: model.Metric{
"__name__": "foo",
"bar": "baz",
"sample": model.LabelValue(fmt.Sprintf("%d", i)),
model.MetricNameLabel: "foo",
"bar": "baz",
"sample": model.LabelValue(fmt.Sprintf("%d", i)),
},
Values: []model.SamplePair{
{
Expand Down Expand Up @@ -529,11 +529,11 @@ func (i *mockIngester) AllUserStats(ctx context.Context, in *client.UserStatsReq
return &i.stats, nil
}

func match(labels []client.LabelPair, matchers []*labels.Matcher) bool {
func match(labels []client.LabelAdapter, matchers []*labels.Matcher) bool {
outer:
for _, matcher := range matchers {
for _, labels := range labels {
if matcher.Name == string(labels.Name) && matcher.Matches(string(labels.Value)) {
if matcher.Name == labels.Name && matcher.Matches(labels.Value) {
continue outer
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (d *Distributor) queryPrep(ctx context.Context, from, to model.Time, matche
// Get ingesters by metricName if one exists, otherwise get all ingesters
metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers)
if !d.cfg.ShardByAllLabels && ok && metricNameMatcher.Type == labels.MatchEqual {
replicationSet, err = d.ring.Get(shardByMetricName(userID, []byte(metricNameMatcher.Value)), ring.Read)
replicationSet, err = d.ring.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read)
} else {
replicationSet, err = d.ring.GetAll()
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/ingester/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/wire"
"github.com/stretchr/testify/require"
)

Expand All @@ -19,8 +18,8 @@ func TestMarshall(t *testing.T) {
for i := 0; i < 10; i++ {
req.Timeseries = append(req.Timeseries, PreallocTimeseries{
TimeSeries{
Labels: []LabelPair{
{wire.Bytes([]byte("foo")), wire.Bytes([]byte(strconv.Itoa(i)))},
Labels: []LabelAdapter{
{"foo", strconv.Itoa(i)},
},
Samples: []Sample{
{TimestampMs: int64(i), Value: float64(i)},
Expand Down
111 changes: 44 additions & 67 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package client

import (
"bytes"
stdjson "encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"time"
"unsafe"

Expand All @@ -16,22 +16,6 @@ import (

var json = jsoniter.ConfigCompatibleWithStandardLibrary

// FromWriteRequest converts a WriteRequest proto into an array of samples.
func FromWriteRequest(req *WriteRequest) []model.Sample {
// Just guess that there is one sample per timeseries
samples := make([]model.Sample, 0, len(req.Timeseries))
for _, ts := range req.Timeseries {
for _, s := range ts.Samples {
samples = append(samples, model.Sample{
Metric: FromLabelPairs(ts.Labels),
Value: model.SampleValue(s.Value),
Timestamp: model.Time(s.TimestampMs),
})
}
}
return samples
}

// ToWriteRequest converts an array of samples into a WriteRequest proto.
func ToWriteRequest(samples []model.Sample, source WriteRequest_SourceEnum) *WriteRequest {
req := &WriteRequest{
Expand All @@ -42,7 +26,7 @@ func ToWriteRequest(samples []model.Sample, source WriteRequest_SourceEnum) *Wri
for _, s := range samples {
ts := PreallocTimeseries{
TimeSeries: TimeSeries{
Labels: ToLabelPairs(s.Metric),
Labels: FromMetricsToLabelAdapters(s.Metric),
Samples: []Sample{
{
Value: float64(s.Value),
Expand Down Expand Up @@ -87,7 +71,7 @@ func ToQueryResponse(matrix model.Matrix) *QueryResponse {
resp := &QueryResponse{}
for _, ss := range matrix {
ts := TimeSeries{
Labels: ToLabelPairs(ss.Metric),
Labels: FromMetricsToLabelAdapters(ss.Metric),
Samples: make([]Sample, 0, len(ss.Values)),
}
for _, s := range ss.Values {
Expand All @@ -106,7 +90,7 @@ func FromQueryResponse(resp *QueryResponse) model.Matrix {
m := make(model.Matrix, 0, len(resp.Timeseries))
for _, ts := range resp.Timeseries {
var ss model.SampleStream
ss.Metric = FromLabelPairs(ts.Labels)
ss.Metric = FromLabelAdaptersToMetric(ts.Labels)
ss.Values = make([]model.SamplePair, 0, len(ts.Samples))
for _, s := range ts.Samples {
ss.Values = append(ss.Values, model.SamplePair{
Expand Down Expand Up @@ -153,7 +137,7 @@ func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (mo
func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse) []model.Metric {
metrics := []model.Metric{}
for _, m := range resp.Metric {
metrics = append(metrics, FromLabelPairs(m.Labels))
metrics = append(metrics, FromLabelAdaptersToMetric(m.Labels))
}
return metrics
}
Expand Down Expand Up @@ -208,70 +192,63 @@ func fromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
return result, nil
}

// ToLabelPairs builds a []LabelPair from a model.Metric
func ToLabelPairs(metric model.Metric) []LabelPair {
labelPairs := make([]LabelPair, 0, len(metric))
for k, v := range metric {
labelPairs = append(labelPairs, LabelPair{
Name: []byte(k),
Value: []byte(v),
})
}
sort.Sort(byLabel(labelPairs)) // The labels should be sorted upon initialisation.
return labelPairs
// FromLabelAdaptersToLabels casts []LabelAdapter to labels.Labels.
// It uses unsafe, but as LabelAdapter == labels.Label this should be safe.
// This allows us to use labels.Labels directly in protos.
func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&ls))
}

type byLabel []LabelPair

func (s byLabel) Len() int { return len(s) }
func (s byLabel) Less(i, j int) bool { return bytes.Compare(s[i].Name, s[j].Name) < 0 }
func (s byLabel) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// FromLabelPairs unpack a []LabelPair to a model.Metric
func FromLabelPairs(labelPairs []LabelPair) model.Metric {
metric := make(model.Metric, len(labelPairs))
for _, l := range labelPairs {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
return metric
// FromLabelsToLabelAdapaters casts labels.Labels to []LabelAdapter.
// It uses unsafe, but as LabelAdapter == labels.Label this should be safe.
// This allows us to use labels.Labels directly in protos.
func FromLabelsToLabelAdapaters(ls labels.Labels) []LabelAdapter {
return *(*[]LabelAdapter)(unsafe.Pointer(&ls))
}

// FromLabelPairsToLabels unpack a []LabelPair to a labels.Labels
func FromLabelPairsToLabels(labelPairs []LabelPair) labels.Labels {
ls := make(labels.Labels, 0, len(labelPairs))
for _, l := range labelPairs {
ls = append(ls, labels.Label{
Name: string(l.Name),
Value: string(l.Value),
})
// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric.
// Don't do this on any performance sensitive paths.
func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric {
result := make(model.Metric, len(ls))
for _, l := range ls {
result[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
return ls
return result
}

// FromLabelsToLabelPairs converts labels.Labels to []LabelPair
func FromLabelsToLabelPairs(s labels.Labels) []LabelPair {
labelPairs := make([]LabelPair, 0, len(s))
for _, v := range s {
labelPairs = append(labelPairs, LabelPair{
Name: []byte(v.Name),
Value: []byte(v.Value),
// FromMetricsToLabelAdapters converts model.Metric to []LabelAdapter.
// Don't do this on any performance sensitive paths.
// The result is sorted.
func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
result := make([]LabelAdapter, 0, len(metric))
for k, v := range metric {
result = append(result, LabelAdapter{
Name: string(k),
Value: string(v),
})
}
return labelPairs // note already sorted
sort.Sort(byLabel(result)) // The labels should be sorted upon initialisation.
return result
}

type byLabel []LabelAdapter

func (s byLabel) Len() int { return len(s) }
func (s byLabel) Less(i, j int) bool { return strings.Compare(s[i].Name, s[j].Name) < 0 }
func (s byLabel) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// FastFingerprint runs the same algorithm as Prometheus labelSetToFastFingerprint()
func FastFingerprint(labelPairs []LabelPair) model.Fingerprint {
if len(labelPairs) == 0 {
func FastFingerprint(ls []LabelAdapter) model.Fingerprint {
if len(ls) == 0 {
return model.Metric(nil).FastFingerprint()
}

var result uint64
for _, pair := range labelPairs {
for _, l := range ls {
sum := hashNew()
sum = hashAdd(sum, pair.Name)
sum = hashAdd(sum, l.Name)
sum = hashAddByte(sum, model.SeparatorByte)
sum = hashAdd(sum, pair.Value)
sum = hashAdd(sum, l.Value)
result ^= sum
}
return model.Fingerprint(result)
Expand Down
Loading