From eeaa90e1f5f6213cc3b919df31968c9d8b8eae5b Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 28 Mar 2019 10:20:57 +0000 Subject: [PATCH 1/5] Benchmark for QueryStream. Signed-off-by: Tom Wilkie --- pkg/ingester/query_test.go | 111 +++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 pkg/ingester/query_test.go diff --git a/pkg/ingester/query_test.go b/pkg/ingester/query_test.go new file mode 100644 index 00000000000..edb384cee38 --- /dev/null +++ b/pkg/ingester/query_test.go @@ -0,0 +1,111 @@ +package ingester + +import ( + "fmt" + "io" + "net" + "testing" + "time" + + "google.golang.org/grpc" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "golang.org/x/net/context" + + "github.com/cortexproject/cortex/pkg/chunk/encoding" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/wire" +) + +func BenchmarkQueryStream(b *testing.B) { + cfg := defaultIngesterTestConfig() + clientCfg := defaultClientTestConfig() + limits := defaultLimitsTestConfig() + + const ( + numSeries = 1e6 // Put 1 million timeseries, each with 100 samples. + numSamples = 100 + numCPUs = 32 + ) + + encoding.DefaultEncoding = encoding.Bigchunk + limits.MaxSeriesPerMetric = numSeries + limits.MaxSeriesPerQuery = numSeries + cfg.FlushCheckPeriod = 15 * time.Minute + _, ing := newTestStore(b, cfg, clientCfg, limits) + // defer ing.Shutdown() + + ctx := user.InjectOrgID(context.Background(), "1") + instances := make([]string, numSeries/numCPUs) + for i := 0; i < numSeries/numCPUs; i++ { + instances[i] = fmt.Sprintf("node%04d", i) + } + cpus := make([]string, numCPUs) + for i := 0; i < numCPUs; i++ { + cpus[i] = fmt.Sprintf("cpu%02d", i) + } + + for i := 0; i < numSeries; i++ { + labels := labelPairs{ + {Name: wire.Bytes("__name__"), Value: wire.Bytes("node_cpu")}, + {Name: wire.Bytes("job"), Value: wire.Bytes("node_exporter")}, + {Name: wire.Bytes("instance"), Value: wire.Bytes(instances[i/numCPUs])}, + {Name: wire.Bytes("cpu"), Value: wire.Bytes(cpus[i%numCPUs])}, + } + + state, fp, series, err := ing.userStates.getOrCreateSeries(ctx, labels) + require.NoError(b, err) + + for j := 0; j < numSamples; j++ { + err = series.add(model.SamplePair{ + Value: model.SampleValue(float64(i)), + Timestamp: model.Time(int64(i)), + }) + require.NoError(b, err) + } + + state.fpLocker.Unlock(fp) + } + + server := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor)) + defer server.GracefulStop() + client.RegisterIngesterServer(server, ing) + + l, err := net.Listen("tcp", "localhost:0") + require.NoError(b, err) + go server.Serve(l) + + b.ResetTimer() + for iter := 0; iter < b.N; iter++ { + b.Run("QueryStream", func(b *testing.B) { + c, err := client.MakeIngesterClient(l.Addr().String(), clientCfg) + require.NoError(b, err) + defer c.Close() + + s, err := c.QueryStream(ctx, &client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: numSamples, + Matchers: []*client.LabelMatcher{{ + Type: client.EQUAL, + Name: model.MetricNameLabel, + Value: "node_cpu", + }}, + }) + require.NoError(b, err) + + count := 0 + for { + resp, err := s.Recv() + if err == io.EOF { + break + } + require.NoError(b, err) + count += len(resp.Timeseries) + } + require.Equal(b, count, int(numSeries)) + }) + } +} From 89fda869a37642e4ec3ef73cb11857641cfe9332 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 28 Mar 2019 11:03:42 +0000 Subject: [PATCH 2/5] Remove the sort from the inverted index Lookup, its not actually needed. Signed-off-by: Tom Wilkie --- pkg/ingester/index/index.go | 1 - pkg/ingester/user_state.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 32771046379..ef660dae15b 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -49,7 +49,6 @@ func (ii *InvertedIndex) Lookup(matchers []*labels.Matcher) []model.Fingerprint result = append(result, fps...) } - sort.Sort(fingerprints(result)) return result } diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index dfc9ab54e1f..37a3dd67ef6 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -279,7 +279,7 @@ func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels level.Debug(log).Log("series", len(fps)) - // fps is sorted, lock them in order to prevent deadlocks + // We only hold one FP lock at once here, so no opportunity to deadlock. i := 0 outer: for ; i < len(fps); i++ { From f416b18207e33963f6565ba35a13031ca2a066b8 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 28 Mar 2019 11:15:23 +0000 Subject: [PATCH 3/5] Size encoded chunk buffer correctly. Signed-off-by: Tom Wilkie --- pkg/chunk/encoding/bigchunk.go | 3 ++- pkg/ingester/transfer.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/chunk/encoding/bigchunk.go b/pkg/chunk/encoding/bigchunk.go index 29746f87e56..0282c08e997 100644 --- a/pkg/chunk/encoding/bigchunk.go +++ b/pkg/chunk/encoding/bigchunk.go @@ -161,8 +161,9 @@ func (b *bigchunk) Len() int { } func (b *bigchunk) Size() int { - sum := 0 + sum := 2 // For the number of sub chunks. for _, c := range b.chunks { + sum += 2 // For the length of the sub chunk. sum += len(c.Bytes()) } return sum diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index fad58e3464c..2332272b8e3 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -146,7 +146,7 @@ func toWireChunks(descs []*desc) ([]client.Chunk, error) { Encoding: int32(d.C.Encoding()), } - buf := bytes.NewBuffer(make([]byte, 0, encoding.ChunkLen)) + buf := bytes.NewBuffer(make([]byte, 0, d.C.Size())) if err := d.C.Marshal(buf); err != nil { return nil, err } From 84ab021621423d06d04dd2a60c2609957b5e5dcf Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 28 Mar 2019 11:19:10 +0000 Subject: [PATCH 4/5] Up QueryStream batch size to 128. Signed-off-by: Tom Wilkie --- pkg/ingester/ingester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index c4f911bd995..f2b8f78b7d7 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -35,7 +35,7 @@ const ( duplicateSample = "multiple_values_for_timestamp" // Number of timeseries to return in each batch of a QueryStream. - queryStreamBatchSize = 10 + queryStreamBatchSize = 128 ) var ( From 922a3b989a9e7da45f2ee40a7101068e7e4be71a Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 28 Mar 2019 10:22:04 +0000 Subject: [PATCH 5/5] Big change: Remove the label copying in QueryStream. This change removes the use of LabelPair and wire.Bytes. We directly marshal and unmarshal from the bytes on the wire into labels.Labels, via LabelsAdapater. I've migrated everywhere I could find to these new types. We can almost completely remove wire.Bytes, execpt for its use in the caching index proto. So I moved the type there. I'd draw attention to the use of yoloString and the use of unsafe for casting between []LabelAdapater and labels.Labels. They should be safe as we take a copy of the label string in the inverted index. Also: - changed all occurances of `__name__` to model.MetricNameLabel. - removed a bunch of unused functions for converting between old types. - make the FromXXXToYYY naming consistent in compat.go. - fork stdlib fnv32 hashing like we have with fnv64. Signed-off-by: Tom Wilkie --- pkg/{util/wire => chunk/storage}/bytes.go | 2 +- pkg/chunk/storage/caching_index_client.proto | 4 +- pkg/distributor/distributor.go | 35 ++-- pkg/distributor/distributor_test.go | 22 +-- pkg/distributor/query.go | 2 +- pkg/ingester/client/client_test.go | 5 +- pkg/ingester/client/compat.go | 111 +++++------ pkg/ingester/client/compat_test.go | 21 -- pkg/ingester/client/cortex.proto | 10 +- pkg/ingester/client/fnv.go | 18 +- pkg/ingester/client/timeseries.go | 187 +++++++++++++++++- pkg/ingester/flush.go | 3 +- pkg/ingester/index/index.go | 24 ++- pkg/ingester/index/index_test.go | 2 +- pkg/ingester/ingester.go | 17 +- pkg/ingester/ingester_test.go | 16 +- pkg/ingester/label_pairs.go | 30 +-- pkg/ingester/label_pairs_test.go | 24 +-- pkg/ingester/lifecycle_test.go | 2 +- pkg/ingester/locker.go | 4 +- pkg/ingester/mapper_test.go | 41 ++-- pkg/ingester/query_test.go | 9 +- pkg/ingester/transfer.go | 2 +- pkg/ingester/user_state.go | 4 +- pkg/querier/frontend/frontend.proto | 2 +- pkg/querier/frontend/query_range.go | 8 +- pkg/querier/frontend/query_range_test.go | 13 +- pkg/querier/frontend/results_cache_test.go | 16 +- pkg/querier/ingester_streaming_queryable.go | 4 +- .../ingester_streaming_queryable_test.go | 9 +- pkg/querier/querier_test.go | 7 +- pkg/querier/remote_read_test.go | 8 +- pkg/util/chunkcompat/compat.go | 2 +- pkg/util/extract/extract.go | 10 +- pkg/util/hash_fp.go | 4 +- pkg/util/validation/validate.go | 10 +- pkg/util/validation/validate_test.go | 2 +- 37 files changed, 408 insertions(+), 282 deletions(-) rename pkg/{util/wire => chunk/storage}/bytes.go (97%) diff --git a/pkg/util/wire/bytes.go b/pkg/chunk/storage/bytes.go similarity index 97% rename from pkg/util/wire/bytes.go rename to pkg/chunk/storage/bytes.go index dfabadd8e06..c4804995ff8 100644 --- a/pkg/util/wire/bytes.go +++ b/pkg/chunk/storage/bytes.go @@ -1,4 +1,4 @@ -package wire +package storage import ( "bytes" diff --git a/pkg/chunk/storage/caching_index_client.proto b/pkg/chunk/storage/caching_index_client.proto index 1c22c94c8ab..2621db517bb 100644 --- a/pkg/chunk/storage/caching_index_client.proto +++ b/pkg/chunk/storage/caching_index_client.proto @@ -8,8 +8,8 @@ option (gogoproto.marshaler_all) = true; option (gogoproto.unmarshaler_all) = true; message Entry { - bytes Column = 1 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; - bytes Value = 2 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; + bytes Column = 1 [(gogoproto.customtype) = "Bytes", (gogoproto.nullable) = false]; + bytes Value = 2 [(gogoproto.customtype) = "Bytes", (gogoproto.nullable) = false]; } message ReadBatch { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index c1a07e984c5..753da6b9add 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1,13 +1,12 @@ package distributor import ( - "bytes" "context" "flag" "fmt" - "hash/fnv" "net/http" "sort" + "strings" "sync" "time" @@ -185,37 +184,37 @@ func (d *Distributor) Stop() { d.ingesterPool.Stop() } -func (d *Distributor) tokenForLabels(userID string, labels []client.LabelPair) (uint32, error) { +func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter) (uint32, error) { if d.cfg.ShardByAllLabels { return shardByAllLabels(userID, labels) } - metricName, err := extract.MetricNameFromLabelPairs(labels) + metricName, err := extract.MetricNameFromLabelAdapters(labels) if err != nil { return 0, err } return shardByMetricName(userID, metricName), nil } -func shardByMetricName(userID string, metricName []byte) uint32 { - h := fnv.New32() - h.Write([]byte(userID)) - h.Write(metricName) - return h.Sum32() +func shardByMetricName(userID string, metricName string) uint32 { + h := client.HashNew32() + h = client.HashAdd32(h, userID) + h = client.HashAdd32(h, metricName) + return h } -func shardByAllLabels(userID string, labels []client.LabelPair) (uint32, error) { - h := fnv.New32() - h.Write([]byte(userID)) - lastLabelName := []byte{} +func shardByAllLabels(userID string, labels []client.LabelAdapter) (uint32, error) { + h := client.HashNew32() + h = client.HashAdd32(h, userID) + var lastLabelName string for _, label := range labels { - if bytes.Compare(lastLabelName, label.Name) >= 0 { + if strings.Compare(lastLabelName, label.Name) >= 0 { return 0, fmt.Errorf("Labels not sorted") } - h.Write(label.Name) - h.Write(label.Value) + h = client.HashAdd32(h, label.Name) + h = client.HashAdd32(h, label.Value) } - return h.Sum32(), nil + return h, nil } // Push implements client.IngesterServer @@ -242,7 +241,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie continue } - metricName, _ := extract.MetricNameFromLabelPairs(ts.Labels) + metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels) samples := make([]client.Sample, 0, len(ts.Samples)) for _, s := range ts.Samples { if err := d.limits.ValidateSample(userID, metricName, s); err != nil { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 7eb33443ca2..29fbbb4c599 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -107,7 +107,7 @@ func TestDistributorPush(t *testing.T) { } func TestDistributorPushQuery(t *testing.T) { - nameMatcher := mustEqualMatcher("__name__", "foo") + nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo") barMatcher := mustEqualMatcher("bar", "baz") type testcase struct { @@ -219,7 +219,7 @@ func TestDistributorPushQuery(t *testing.T) { } func TestSlowQueries(t *testing.T) { - nameMatcher := mustEqualMatcher("__name__", "foo") + nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo") nIngesters := 3 for _, shardByAllLabels := range []bool{true, false} { for happy := 0; happy <= nIngesters; happy++ { @@ -300,10 +300,10 @@ func makeWriteRequest(samples int) *client.WriteRequest { for i := 0; i < samples; i++ { ts := client.PreallocTimeseries{ TimeSeries: client.TimeSeries{ - Labels: []client.LabelPair{ - {Name: []byte("__name__"), Value: []byte("foo")}, - {Name: []byte("bar"), Value: []byte("baz")}, - {Name: []byte("sample"), Value: []byte(fmt.Sprintf("%d", i))}, + Labels: []client.LabelAdapter{ + {Name: model.MetricNameLabel, Value: "foo"}, + {Name: "bar", Value: "baz"}, + {Name: "sample", Value: fmt.Sprintf("%d", i)}, }, }, } @@ -323,9 +323,9 @@ func expectedResponse(start, end int) model.Matrix { for i := start; i < end; i++ { result = append(result, &model.SampleStream{ Metric: model.Metric{ - "__name__": "foo", - "bar": "baz", - "sample": model.LabelValue(fmt.Sprintf("%d", i)), + model.MetricNameLabel: "foo", + "bar": "baz", + "sample": model.LabelValue(fmt.Sprintf("%d", i)), }, Values: []model.SamplePair{ { @@ -529,11 +529,11 @@ func (i *mockIngester) AllUserStats(ctx context.Context, in *client.UserStatsReq return &i.stats, nil } -func match(labels []client.LabelPair, matchers []*labels.Matcher) bool { +func match(labels []client.LabelAdapter, matchers []*labels.Matcher) bool { outer: for _, matcher := range matchers { for _, labels := range labels { - if matcher.Name == string(labels.Name) && matcher.Matches(string(labels.Value)) { + if matcher.Name == labels.Name && matcher.Matches(labels.Value) { continue outer } } diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 3402fc08ffb..9c1e2b5422e 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -68,7 +68,7 @@ func (d *Distributor) queryPrep(ctx context.Context, from, to model.Time, matche // Get ingesters by metricName if one exists, otherwise get all ingesters metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers) if !d.cfg.ShardByAllLabels && ok && metricNameMatcher.Type == labels.MatchEqual { - replicationSet, err = d.ring.Get(shardByMetricName(userID, []byte(metricNameMatcher.Value)), ring.Read) + replicationSet, err = d.ring.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read) } else { replicationSet, err = d.ring.GetAll() } diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 2ca72ce328d..d5ffe626144 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/stretchr/testify/require" ) @@ -19,8 +18,8 @@ func TestMarshall(t *testing.T) { for i := 0; i < 10; i++ { req.Timeseries = append(req.Timeseries, PreallocTimeseries{ TimeSeries{ - Labels: []LabelPair{ - {wire.Bytes([]byte("foo")), wire.Bytes([]byte(strconv.Itoa(i)))}, + Labels: []LabelAdapter{ + {"foo", strconv.Itoa(i)}, }, Samples: []Sample{ {TimestampMs: int64(i), Value: float64(i)}, diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index f177eefabf3..f95d1e27877 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -1,11 +1,11 @@ package client import ( - "bytes" stdjson "encoding/json" "fmt" "sort" "strconv" + "strings" "time" "unsafe" @@ -16,22 +16,6 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary -// FromWriteRequest converts a WriteRequest proto into an array of samples. -func FromWriteRequest(req *WriteRequest) []model.Sample { - // Just guess that there is one sample per timeseries - samples := make([]model.Sample, 0, len(req.Timeseries)) - for _, ts := range req.Timeseries { - for _, s := range ts.Samples { - samples = append(samples, model.Sample{ - Metric: FromLabelPairs(ts.Labels), - Value: model.SampleValue(s.Value), - Timestamp: model.Time(s.TimestampMs), - }) - } - } - return samples -} - // ToWriteRequest converts an array of samples into a WriteRequest proto. func ToWriteRequest(samples []model.Sample, source WriteRequest_SourceEnum) *WriteRequest { req := &WriteRequest{ @@ -42,7 +26,7 @@ func ToWriteRequest(samples []model.Sample, source WriteRequest_SourceEnum) *Wri for _, s := range samples { ts := PreallocTimeseries{ TimeSeries: TimeSeries{ - Labels: ToLabelPairs(s.Metric), + Labels: FromMetricsToLabelAdapters(s.Metric), Samples: []Sample{ { Value: float64(s.Value), @@ -87,7 +71,7 @@ func ToQueryResponse(matrix model.Matrix) *QueryResponse { resp := &QueryResponse{} for _, ss := range matrix { ts := TimeSeries{ - Labels: ToLabelPairs(ss.Metric), + Labels: FromMetricsToLabelAdapters(ss.Metric), Samples: make([]Sample, 0, len(ss.Values)), } for _, s := range ss.Values { @@ -106,7 +90,7 @@ func FromQueryResponse(resp *QueryResponse) model.Matrix { m := make(model.Matrix, 0, len(resp.Timeseries)) for _, ts := range resp.Timeseries { var ss model.SampleStream - ss.Metric = FromLabelPairs(ts.Labels) + ss.Metric = FromLabelAdaptersToMetric(ts.Labels) ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) for _, s := range ts.Samples { ss.Values = append(ss.Values, model.SamplePair{ @@ -153,7 +137,7 @@ func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (mo func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse) []model.Metric { metrics := []model.Metric{} for _, m := range resp.Metric { - metrics = append(metrics, FromLabelPairs(m.Labels)) + metrics = append(metrics, FromLabelAdaptersToMetric(m.Labels)) } return metrics } @@ -208,70 +192,63 @@ func fromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) { return result, nil } -// ToLabelPairs builds a []LabelPair from a model.Metric -func ToLabelPairs(metric model.Metric) []LabelPair { - labelPairs := make([]LabelPair, 0, len(metric)) - for k, v := range metric { - labelPairs = append(labelPairs, LabelPair{ - Name: []byte(k), - Value: []byte(v), - }) - } - sort.Sort(byLabel(labelPairs)) // The labels should be sorted upon initialisation. - return labelPairs +// FromLabelAdaptersToLabels casts []LabelAdapter to labels.Labels. +// It uses unsafe, but as LabelAdapter == labels.Label this should be safe. +// This allows us to use labels.Labels directly in protos. +func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels { + return *(*labels.Labels)(unsafe.Pointer(&ls)) } -type byLabel []LabelPair - -func (s byLabel) Len() int { return len(s) } -func (s byLabel) Less(i, j int) bool { return bytes.Compare(s[i].Name, s[j].Name) < 0 } -func (s byLabel) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -// FromLabelPairs unpack a []LabelPair to a model.Metric -func FromLabelPairs(labelPairs []LabelPair) model.Metric { - metric := make(model.Metric, len(labelPairs)) - for _, l := range labelPairs { - metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } - return metric +// FromLabelsToLabelAdapaters casts labels.Labels to []LabelAdapter. +// It uses unsafe, but as LabelAdapter == labels.Label this should be safe. +// This allows us to use labels.Labels directly in protos. +func FromLabelsToLabelAdapaters(ls labels.Labels) []LabelAdapter { + return *(*[]LabelAdapter)(unsafe.Pointer(&ls)) } -// FromLabelPairsToLabels unpack a []LabelPair to a labels.Labels -func FromLabelPairsToLabels(labelPairs []LabelPair) labels.Labels { - ls := make(labels.Labels, 0, len(labelPairs)) - for _, l := range labelPairs { - ls = append(ls, labels.Label{ - Name: string(l.Name), - Value: string(l.Value), - }) +// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric. +// Don't do this on any performance sensitive paths. +func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric { + result := make(model.Metric, len(ls)) + for _, l := range ls { + result[model.LabelName(l.Name)] = model.LabelValue(l.Value) } - return ls + return result } -// FromLabelsToLabelPairs converts labels.Labels to []LabelPair -func FromLabelsToLabelPairs(s labels.Labels) []LabelPair { - labelPairs := make([]LabelPair, 0, len(s)) - for _, v := range s { - labelPairs = append(labelPairs, LabelPair{ - Name: []byte(v.Name), - Value: []byte(v.Value), +// FromMetricsToLabelAdapters converts model.Metric to []LabelAdapter. +// Don't do this on any performance sensitive paths. +// The result is sorted. +func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter { + result := make([]LabelAdapter, 0, len(metric)) + for k, v := range metric { + result = append(result, LabelAdapter{ + Name: string(k), + Value: string(v), }) } - return labelPairs // note already sorted + sort.Sort(byLabel(result)) // The labels should be sorted upon initialisation. + return result } +type byLabel []LabelAdapter + +func (s byLabel) Len() int { return len(s) } +func (s byLabel) Less(i, j int) bool { return strings.Compare(s[i].Name, s[j].Name) < 0 } +func (s byLabel) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + // FastFingerprint runs the same algorithm as Prometheus labelSetToFastFingerprint() -func FastFingerprint(labelPairs []LabelPair) model.Fingerprint { - if len(labelPairs) == 0 { +func FastFingerprint(ls []LabelAdapter) model.Fingerprint { + if len(ls) == 0 { return model.Metric(nil).FastFingerprint() } var result uint64 - for _, pair := range labelPairs { + for _, l := range ls { sum := hashNew() - sum = hashAdd(sum, pair.Name) + sum = hashAdd(sum, l.Name) sum = hashAddByte(sum, model.SeparatorByte) - sum = hashAdd(sum, pair.Value) + sum = hashAdd(sum, l.Value) result ^= sum } return model.Fingerprint(result) diff --git a/pkg/ingester/client/compat_test.go b/pkg/ingester/client/compat_test.go index d89be185c79..c9f66818ad2 100644 --- a/pkg/ingester/client/compat_test.go +++ b/pkg/ingester/client/compat_test.go @@ -8,29 +8,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/weaveworks/common/test" ) -func TestWriteRequest(t *testing.T) { - want := []model.Sample{} - for i := 0; i < 10; i++ { - want = append(want, model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: model.LabelValue(fmt.Sprintf("testmetric_%d", i)), - model.JobLabel: "testjob", - }, - Timestamp: model.Time(i), - Value: model.SampleValue(float64(i)), - }) - } - - have := FromWriteRequest(ToWriteRequest(want, API)) - - if !reflect.DeepEqual(want, have) { - t.Fatalf(test.Diff(want, have)) - } -} - func TestQueryRequest(t *testing.T) { from, to := model.Time(int64(0)), model.Time(int64(10)) matchers := []*labels.Matcher{} diff --git a/pkg/ingester/client/cortex.proto b/pkg/ingester/client/cortex.proto index 02e17e35b8a..e1659919920 100644 --- a/pkg/ingester/client/cortex.proto +++ b/pkg/ingester/client/cortex.proto @@ -104,7 +104,7 @@ message MetricsForLabelMatchersResponse { message TimeSeriesChunk { string from_ingester_id = 1; string user_id = 2; - repeated LabelPair labels = 3 [(gogoproto.nullable) = false]; + repeated LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; repeated Chunk chunks = 4 [(gogoproto.nullable) = false]; } @@ -119,14 +119,14 @@ message TransferChunksResponse { } message TimeSeries { - repeated LabelPair labels = 1 [(gogoproto.nullable) = false]; + repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; // Sorted by time, oldest sample first. repeated Sample samples = 2 [(gogoproto.nullable) = false]; } message LabelPair { - bytes name = 1 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; - bytes value = 2 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; + bytes name = 1; + bytes value = 2; } message Sample { @@ -139,7 +139,7 @@ message LabelMatchers { } message Metric { - repeated LabelPair labels = 1 [(gogoproto.nullable) = false]; + repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; } enum MatchType { diff --git a/pkg/ingester/client/fnv.go b/pkg/ingester/client/fnv.go index 41453b10aaa..f151186f0e3 100644 --- a/pkg/ingester/client/fnv.go +++ b/pkg/ingester/client/fnv.go @@ -19,6 +19,8 @@ package client const ( offset64 = 14695981039346656037 prime64 = 1099511628211 + offset32 = 2166136261 + prime32 = 16777619 ) // hashNew initializies a new fnv64a hash value. @@ -27,7 +29,7 @@ func hashNew() uint64 { } // hashAdd adds a string to a fnv64a hash value, returning the updated hash. -func hashAdd(h uint64, s []byte) uint64 { +func hashAdd(h uint64, s string) uint64 { for i := 0; i < len(s); i++ { h ^= uint64(s[i]) h *= prime64 @@ -41,3 +43,17 @@ func hashAddByte(h uint64, b byte) uint64 { h *= prime64 return h } + +// HashNew32 initializies a new fnv64a hash value. +func HashNew32() uint32 { + return offset32 +} + +// HashAdd32 adds a string to a fnv64a hash value, returning the updated hash. +func HashAdd32(h uint32, s string) uint32 { + for i := 0; i < len(s); i++ { + h ^= uint32(s[i]) + h *= prime32 + } + return h +} diff --git a/pkg/ingester/client/timeseries.go b/pkg/ingester/client/timeseries.go index b5b46b9ab87..8c7cc40588c 100644 --- a/pkg/ingester/client/timeseries.go +++ b/pkg/ingester/client/timeseries.go @@ -1,6 +1,14 @@ package client -import "flag" +import ( + "flag" + "fmt" + "io" + "strings" + "unsafe" + + "github.com/prometheus/prometheus/pkg/labels" +) var ( expectedTimeseries = 100 @@ -37,7 +45,182 @@ type PreallocTimeseries struct { // Unmarshal implements proto.Message. func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { - p.Labels = make([]LabelPair, 0, expectedLabels) + p.Labels = make([]LabelAdapter, 0, expectedLabels) p.Samples = make([]Sample, 0, expectedSamplesPerSeries) return p.TimeSeries.Unmarshal(dAtA) } + +// LabelAdapter is a labels.Label that can be marshalled to/from protos. +type LabelAdapter labels.Label + +// Marshal implements proto.Marshaller. +func (bs *LabelAdapter) Marshal() ([]byte, error) { + buf := make([]byte, bs.Size()) + _, err := bs.MarshalTo(buf) + return buf, err +} + +// MarshalTo implements proto.Marshaller. +func (bs *LabelAdapter) MarshalTo(buf []byte) (n int, err error) { + var i int + ls := (*labels.Label)(bs) + + buf[i] = 0xa + i++ + i = encodeVarintCortex(buf, i, uint64(len(ls.Name))) + i += copy(buf[i:], ls.Name) + + buf[i] = 0x12 + i++ + i = encodeVarintCortex(buf, i, uint64(len(ls.Value))) + i += copy(buf[i:], ls.Value) + + return i, nil +} + +// Unmarshal a LabelAdapater, implements proto.Unmarshaller. +// NB this is a copy of the autogenerated code to unmarshal a LabelPair, +// with the byte copying replaced with a yoloString. +func (bs *LabelAdapter) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelPair: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + bs.Name = yoloString(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + bs.Value = yoloString(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCortex(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} + +func yoloString(buf []byte) string { + return *((*string)(unsafe.Pointer(&buf))) +} + +// Size implements proto.Sizer. +func (bs *LabelAdapter) Size() int { + ls := (*labels.Label)(bs) + var n int + l := len(ls.Name) + n += 1 + l + sovCortex(uint64(l)) + l = len(ls.Value) + n += 1 + l + sovCortex(uint64(l)) + return n +} + +// Equal implements proto.Equaler. +func (bs *LabelAdapter) Equal(other LabelAdapter) bool { + return bs.Name == other.Name && bs.Value == other.Value +} + +// Compare implements proto.Comparer. +func (bs *LabelAdapter) Compare(other LabelAdapter) int { + if c := strings.Compare(bs.Name, other.Name); c != 0 { + return c + } + return strings.Compare(bs.Value, other.Value) +} diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index e31a64ffc69..ddde3a0f717 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/common/model" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" "github.com/weaveworks/common/user" ) @@ -276,7 +277,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. sp.SetTag("organization", userID) util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric, "queue", flushQueueIndex) - err := i.flushChunks(ctx, fp, labelsToMetric(series.metric), chunks) + err := i.flushChunks(ctx, fp, client.FromLabelAdaptersToMetric(client.FromLabelsToLabelAdapaters(series.metric)), chunks) if err != nil { return err } diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index ef660dae15b..6e072e70335 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -32,7 +32,7 @@ func New() *InvertedIndex { } // Add a fingerprint under the specified labels. -func (ii *InvertedIndex) Add(labels []client.LabelPair, fp model.Fingerprint) labels.Labels { +func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard := &ii.shards[util.HashFP(fp)%indexShards] return shard.add(labels, fp) } @@ -104,25 +104,31 @@ type indexShard struct { pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(unlockIndex{})]byte } +func copyString(s string) string { + return string([]byte(s)) +} + // add metric to the index; return all the name/value pairs as strings from the index, sorted -func (shard *indexShard) add(metric []client.LabelPair, fp model.Fingerprint) labels.Labels { +func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard.mtx.Lock() defer shard.mtx.Unlock() internedLabels := make(labels.Labels, len(metric)) for i, pair := range metric { - values, ok := shard.idx[string(pair.Name)] + values, ok := shard.idx[pair.Name] if !ok { values = indexEntry{ - name: string(pair.Name), + name: copyString(pair.Name), fps: map[string]indexValueEntry{}, } shard.idx[values.name] = values } - fingerprints, ok := values.fps[string(pair.Value)] + fingerprints, ok := values.fps[pair.Value] if !ok { - fingerprints = indexValueEntry{value: string(pair.Value)} + fingerprints = indexValueEntry{ + value: copyString(pair.Value), + } } // Insert into the right position to keep fingerprints sorted j := sort.Search(len(fingerprints.fps), func(i int) bool { @@ -132,7 +138,7 @@ func (shard *indexShard) add(metric []client.LabelPair, fp model.Fingerprint) la copy(fingerprints.fps[j+1:], fingerprints.fps[j:]) fingerprints.fps[j] = fp values.fps[fingerprints.value] = fingerprints - internedLabels[i] = labels.Label{Name: string(values.name), Value: string(fingerprints.value)} + internedLabels[i] = labels.Label{Name: values.name, Value: fingerprints.value} } sort.Sort(internedLabels) return internedLabels @@ -161,7 +167,7 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint // accumulate the matching fingerprints (which are all distinct) // then sort to maintain the invariant for value, fps := range values.fps { - if matcher.Matches(string(value)) { + if matcher.Matches(value) { toIntersect = append(toIntersect, fps.fps...) } } @@ -212,7 +218,7 @@ func (shard *indexShard) delete(labels labels.Labels, fp model.Fingerprint) { defer shard.mtx.Unlock() for _, pair := range labels { - name, value := string(pair.Name), string(pair.Value) + name, value := pair.Name, pair.Value values, ok := shard.idx[name] if !ok { continue diff --git a/pkg/ingester/index/index_test.go b/pkg/ingester/index/index_test.go index 6c2fc30b74c..9b3351c55f3 100644 --- a/pkg/ingester/index/index_test.go +++ b/pkg/ingester/index/index_test.go @@ -23,7 +23,7 @@ func TestIndex(t *testing.T) { {model.Metric{"foo": "baz", "flip": "flop"}, 1}, {model.Metric{"foo": "baz", "flip": "flap"}, 0}, } { - index.Add(client.ToLabelPairs(entry.m), entry.fp) + index.Add(client.FromMetricsToLabelAdapters(entry.m), entry.fp) } for _, tc := range []struct { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f2b8f78b7d7..0556798ad6c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" cortex_chunk "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ingester/client" @@ -345,7 +346,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client } ts := client.TimeSeries{ - Labels: client.FromLabelsToLabelPairs(series.metric), + Labels: client.FromLabelsToLabelAdapaters(series.metric), Samples: make([]client.Sample, 0, len(values)), } for _, s := range values { @@ -404,7 +405,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ numChunks += len(wireChunks) batch = append(batch, client.TimeSeriesChunk{ - Labels: client.FromLabelsToLabelPairs(series.metric), + Labels: client.FromLabelsToLabelAdapaters(series.metric), Chunks: wireChunks, }) @@ -485,11 +486,11 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr return nil, err } - metrics := map[model.Fingerprint]labelPairs{} + lss := map[model.Fingerprint]labels.Labels{} for _, matchers := range matchersSet { if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error { - if _, ok := metrics[fp]; !ok { - metrics[fp] = client.FromLabelsToLabelPairs(series.metric) + if _, ok := lss[fp]; !ok { + lss[fp] = series.metric } return nil }, nil, 0); err != nil { @@ -498,10 +499,10 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr } result := &client.MetricsForLabelMatchersResponse{ - Metric: make([]*client.Metric, 0, len(metrics)), + Metric: make([]*client.Metric, 0, len(lss)), } - for _, metric := range metrics { - result.Metric = append(result.Metric, &client.Metric{Labels: metric}) + for _, ls := range lss { + result.Metric = append(result.Metric, &client.Metric{Labels: client.FromLabelsToLabelAdapaters(ls)}) } return result, nil diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index a9c0012162e..0fd867c9b69 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -23,7 +23,6 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" ) @@ -156,6 +155,7 @@ func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries in _, err := ing.Push(ctx, client.ToWriteRequest(matrixToSamples(testData[userID]), client.API)) require.NoError(t, err) } + return userIDs, testData } @@ -242,7 +242,7 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) { defer ing.Shutdown() m := labelPairs{ - {Name: []byte(model.MetricNameLabel), Value: []byte("testmetric")}, + {Name: model.MetricNameLabel, Value: "testmetric"}, } ctx := user.InjectOrgID(context.Background(), userID) err := ing.append(ctx, m, 1, 0, client.API) @@ -273,9 +273,9 @@ func TestIngesterAppendBlankLabel(t *testing.T) { defer ing.Shutdown() lp := labelPairs{ - {Name: []byte(model.MetricNameLabel), Value: []byte("testmetric")}, - {Name: []byte("foo"), Value: []byte("")}, - {Name: []byte("bar"), Value: []byte("")}, + {Name: model.MetricNameLabel, Value: "testmetric"}, + {Name: "foo", Value: ""}, + {Name: "bar", Value: ""}, } ctx := user.InjectOrgID(context.Background(), userID) err := ing.append(ctx, lp, 1, 0, client.API) @@ -443,8 +443,8 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) { Timeseries: []client.PreallocTimeseries{ { TimeSeries: client.TimeSeries{ - Labels: []client.LabelPair{ - {Name: wire.Bytes("__name__"), Value: wire.Bytes(fmt.Sprintf("metric_%d", j))}, + Labels: []client.LabelAdapter{ + {Name: model.MetricNameLabel, Value: fmt.Sprintf("metric_%d", j)}, }, Samples: []client.Sample{ {TimestampMs: int64(j), Value: float64(j)}, @@ -479,7 +479,7 @@ func BenchmarkIngesterPush(b *testing.B) { labels["cpu"] = model.LabelValue(fmt.Sprintf("cpu%02d", j)) ts = append(ts, client.PreallocTimeseries{ TimeSeries: client.TimeSeries{ - Labels: client.ToLabelPairs(labels), + Labels: client.FromMetricsToLabelAdapters(labels), Samples: []client.Sample{ {TimestampMs: 0, Value: float64(j)}, }, diff --git a/pkg/ingester/label_pairs.go b/pkg/ingester/label_pairs.go index 412a5233491..896cbf0053e 100644 --- a/pkg/ingester/label_pairs.go +++ b/pkg/ingester/label_pairs.go @@ -13,34 +13,24 @@ import ( // A series is uniquely identified by its set of label name/value // pairs, which may arrive in any order over the wire -type labelPairs []client.LabelPair - -func labelsToMetric(s labels.Labels) model.Metric { - metric := make(model.Metric, len(s)) - for _, l := range s { - metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } - return metric -} - -var labelNameBytes = []byte(model.MetricNameLabel) +type labelPairs []client.LabelAdapter func (a labelPairs) String() string { var b strings.Builder - metricName, err := extract.MetricNameFromLabelPairs(a) + metricName, err := extract.MetricNameFromLabelAdapters(a) numLabels := len(a) - 1 if err != nil { numLabels = len(a) } - b.Write(metricName) + b.WriteString(metricName) b.WriteByte('{') count := 0 for _, pair := range a { - if !pair.Name.Equal(labelNameBytes) { - b.Write(pair.Name) + if pair.Name != model.MetricNameLabel { + b.WriteString(pair.Name) b.WriteString("=\"") - b.Write(pair.Value) + b.WriteString(pair.Value) b.WriteByte('"') count++ if count < numLabels { @@ -66,9 +56,9 @@ func (a *labelPairs) removeBlanks() { } } -func valueForName(s labels.Labels, name []byte) (string, bool) { - pos := sort.Search(len(s), func(i int) bool { return s[i].Name >= string(name) }) - if pos == len(s) || s[pos].Name != string(name) { +func valueForName(s labels.Labels, name string) (string, bool) { + pos := sort.Search(len(s), func(i int) bool { return s[i].Name >= name }) + if pos == len(s) || s[pos].Name != name { return "", false } return s[pos].Value, true @@ -92,7 +82,7 @@ func (a labelPairs) equal(b labels.Labels) bool { // Now check remaining values using binary search for ; i < len(a); i++ { v, found := valueForName(b, a[i].Name) - if !found || v != string(a[i].Value) { + if !found || v != a[i].Value { return false } } diff --git a/pkg/ingester/label_pairs_test.go b/pkg/ingester/label_pairs_test.go index c233775f97a..e79d0ee55e5 100644 --- a/pkg/ingester/label_pairs_test.go +++ b/pkg/ingester/label_pairs_test.go @@ -22,7 +22,7 @@ func TestLabelPairsEqual(t *testing.T) { { name: "labelPairs nonblank; labels blank", a: labelPairs{ - {Name: []byte("foo"), Value: []byte("a")}, + {Name: "foo", Value: "a"}, }, b: labels.Labels{}, equal: false, @@ -38,8 +38,8 @@ func TestLabelPairsEqual(t *testing.T) { { name: "same contents; labelPairs not sorted", a: labelPairs{ - {Name: []byte("foo"), Value: []byte("a")}, - {Name: []byte("bar"), Value: []byte("b")}, + {Name: "foo", Value: "a"}, + {Name: "bar", Value: "b"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, @@ -50,8 +50,8 @@ func TestLabelPairsEqual(t *testing.T) { { name: "same contents", a: labelPairs{ - {Name: []byte("bar"), Value: []byte("b")}, - {Name: []byte("foo"), Value: []byte("a")}, + {Name: "bar", Value: "b"}, + {Name: "foo", Value: "a"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, @@ -62,8 +62,8 @@ func TestLabelPairsEqual(t *testing.T) { { name: "same names, different value", a: labelPairs{ - {Name: []byte("bar"), Value: []byte("b")}, - {Name: []byte("foo"), Value: []byte("c")}, + {Name: "bar", Value: "b"}, + {Name: "foo", Value: "c"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, @@ -74,8 +74,8 @@ func TestLabelPairsEqual(t *testing.T) { { name: "labels has one extra value", a: labelPairs{ - {Name: []byte("bar"), Value: []byte("b")}, - {Name: []byte("foo"), Value: []byte("a")}, + {Name: "bar", Value: "b"}, + {Name: "foo", Value: "a"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, @@ -87,9 +87,9 @@ func TestLabelPairsEqual(t *testing.T) { { name: "labelPairs has one extra value", a: labelPairs{ - {Name: []byte("bar"), Value: []byte("b")}, - {Name: []byte("foo"), Value: []byte("a")}, - {Name: []byte("firble"), Value: []byte("c")}, + {Name: "bar", Value: "b"}, + {Name: "foo", Value: "a"}, + {Name: "firble", Value: "c"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 40b7b4927bc..2ba851e23de 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -155,7 +155,7 @@ func TestIngesterTransfer(t *testing.T) { assert.Equal(t, &client.QueryResponse{ Timeseries: []client.TimeSeries{ { - Labels: client.ToLabelPairs(m), + Labels: client.FromMetricsToLabelAdapters(m), Samples: []client.Sample{ { Value: 456, diff --git a/pkg/ingester/locker.go b/pkg/ingester/locker.go index 2faa2cbe2f4..ce5962ba302 100644 --- a/pkg/ingester/locker.go +++ b/pkg/ingester/locker.go @@ -30,7 +30,7 @@ type paddedMutex struct { // the same mutex twice). type fingerprintLocker struct { fpMtxs []paddedMutex - numFpMtxs uint + numFpMtxs uint32 } // newFingerprintLocker returns a new fingerprintLocker ready for use. At least @@ -41,7 +41,7 @@ func newFingerprintLocker(preallocatedMutexes int) *fingerprintLocker { } return &fingerprintLocker{ make([]paddedMutex, preallocatedMutexes), - uint(preallocatedMutexes), + uint32(preallocatedMutexes), } } diff --git a/pkg/ingester/mapper_test.go b/pkg/ingester/mapper_test.go index bda48d7057c..bffe3cc473d 100644 --- a/pkg/ingester/mapper_test.go +++ b/pkg/ingester/mapper_test.go @@ -19,52 +19,37 @@ var ( fp2 = model.Fingerprint(maxMappedFP + 2) fp3 = model.Fingerprint(1) cm11 = labelPairs{ - {Name: []byte("foo"), Value: []byte("bar")}, - {Name: []byte("dings"), Value: []byte("bumms")}, + {Name: "foo", Value: "bar"}, + {Name: "dings", Value: "bumms"}, } cm12 = labelPairs{ - {Name: []byte("bar"), Value: []byte("foo")}, + {Name: "bar", Value: "foo"}, } cm13 = labelPairs{ - {Name: []byte("foo"), Value: []byte("bar")}, + {Name: "foo", Value: "bar"}, } cm21 = labelPairs{ - {Name: []byte("foo"), Value: []byte("bumms")}, - {Name: []byte("dings"), Value: []byte("bar")}, + {Name: "foo", Value: "bumms"}, + {Name: "dings", Value: "bar"}, } cm22 = labelPairs{ - {Name: []byte("dings"), Value: []byte("foo")}, - {Name: []byte("bar"), Value: []byte("bumms")}, + {Name: "dings", Value: "foo"}, + {Name: "bar", Value: "bumms"}, } cm31 = labelPairs{ - {Name: []byte("bumms"), Value: []byte("dings")}, + {Name: "bumms", Value: "dings"}, } cm32 = labelPairs{ - {Name: []byte("bumms"), Value: []byte("dings")}, - {Name: []byte("bar"), Value: []byte("foo")}, + {Name: "bumms", Value: "dings"}, + {Name: "bar", Value: "foo"}, } ) func (a labelPairs) copyValuesAndSort() labels.Labels { c := make(labels.Labels, len(a)) - // Since names and values may point into a much larger buffer, - // make a copy of all the names and values, in one block for efficiency - copyBytes := make([]byte, 0, len(a)*32) // guess at initial length - for _, pair := range a { - copyBytes = append(copyBytes, pair.Name...) - copyBytes = append(copyBytes, pair.Value...) - } - // Now we need to copy the byte slice into a string for the values to point into - copyString := string(copyBytes) - pos := 0 - stringSlice := func(val []byte) string { - start := pos - pos += len(val) - return copyString[start:pos] - } for i, pair := range a { - c[i].Name = stringSlice(pair.Name) - c[i].Value = stringSlice(pair.Value) + c[i].Name = pair.Name + c[i].Value = pair.Value } sort.Sort(c) return c diff --git a/pkg/ingester/query_test.go b/pkg/ingester/query_test.go index edb384cee38..9debe783b19 100644 --- a/pkg/ingester/query_test.go +++ b/pkg/ingester/query_test.go @@ -17,7 +17,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/wire" ) func BenchmarkQueryStream(b *testing.B) { @@ -50,10 +49,10 @@ func BenchmarkQueryStream(b *testing.B) { for i := 0; i < numSeries; i++ { labels := labelPairs{ - {Name: wire.Bytes("__name__"), Value: wire.Bytes("node_cpu")}, - {Name: wire.Bytes("job"), Value: wire.Bytes("node_exporter")}, - {Name: wire.Bytes("instance"), Value: wire.Bytes(instances[i/numCPUs])}, - {Name: wire.Bytes("cpu"), Value: wire.Bytes(cpus[i%numCPUs])}, + {Name: model.MetricNameLabel, Value: "node_cpu"}, + {Name: "job", Value: "node_exporter"}, + {Name: "instance", Value: instances[i/numCPUs]}, + {Name: "cpu", Value: cpus[i%numCPUs]}, } state, fp, series, err := ing.userStates.getOrCreateSeries(ctx, labels) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 2332272b8e3..8d52eac1710 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -246,7 +246,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { err = stream.Send(&client.TimeSeriesChunk{ FromIngesterId: i.lifecycler.ID, UserId: userID, - Labels: client.FromLabelsToLabelPairs(pair.series.metric), + Labels: client.FromLabelsToLabelAdapaters(pair.series.metric), Chunks: chunks, }) state.fpLocker.Unlock(pair.fp) diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 37a3dd67ef6..3b41f65bd8b 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -129,7 +129,7 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro return state, ok, nil } -func (us *userStates) getOrCreateSeries(ctx context.Context, labels labelPairs) (*userState, model.Fingerprint, *memorySeries, error) { +func (us *userStates) getOrCreateSeries(ctx context.Context, labels []client.LabelAdapter) (*userState, model.Fingerprint, *memorySeries, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, 0, nil, fmt.Errorf("no user id") @@ -198,7 +198,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user series limit (%d) exceeded", u.limits.MaxSeriesPerUser(u.userID)) } - metricName, err := extract.MetricNameFromLabelPairs(metric) + metricName, err := extract.MetricNameFromLabelAdapters(metric) if err != nil { u.fpLocker.Unlock(fp) return fp, nil, err diff --git a/pkg/querier/frontend/frontend.proto b/pkg/querier/frontend/frontend.proto index 1fafc42848d..b22281a99ee 100644 --- a/pkg/querier/frontend/frontend.proto +++ b/pkg/querier/frontend/frontend.proto @@ -48,7 +48,7 @@ message QueryRangeResponse { } message SampleStream { - repeated cortex.LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "metric"]; + repeated cortex.LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "metric", (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"]; repeated cortex.Sample samples = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "values"]; } diff --git a/pkg/querier/frontend/query_range.go b/pkg/querier/frontend/query_range.go index 62bb90ed9cc..1cb40bc9c41 100644 --- a/pkg/querier/frontend/query_range.go +++ b/pkg/querier/frontend/query_range.go @@ -12,7 +12,7 @@ import ( "time" "github.com/go-kit/kit/log/level" - "github.com/json-iterator/go" + jsoniter "github.com/json-iterator/go" opentracing "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" @@ -162,7 +162,7 @@ func (s *SampleStream) UnmarshalJSON(data []byte) error { if err := json.Unmarshal(data, &stream); err != nil { return err } - s.Labels = client.ToLabelPairs(stream.Metric) + s.Labels = client.FromMetricsToLabelAdapters(stream.Metric) s.Samples = stream.Values return nil } @@ -173,7 +173,7 @@ func (s *SampleStream) MarshalJSON() ([]byte, error) { Metric model.Metric `json:"metric"` Values []client.Sample `json:"values"` }{ - Metric: client.FromLabelPairs(s.Labels), + Metric: client.FromLabelAdaptersToMetric(s.Labels), Values: s.Samples, } return json.Marshal(stream) @@ -278,7 +278,7 @@ func matrixMerge(resps []*APIResponse) []SampleStream { output := map[string]*SampleStream{} for _, resp := range resps { for _, stream := range resp.Data.Result { - metric := client.FromLabelPairsToLabels(stream.Labels).String() + metric := client.FromLabelAdaptersToLabels(stream.Labels).String() existing, ok := output[metric] if !ok { existing = &SampleStream{ diff --git a/pkg/querier/frontend/query_range_test.go b/pkg/querier/frontend/query_range_test.go index 9dcd9238552..f4228c127a0 100644 --- a/pkg/querier/frontend/query_range_test.go +++ b/pkg/querier/frontend/query_range_test.go @@ -16,7 +16,6 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/wire" ) const ( @@ -38,8 +37,8 @@ var ( ResultType: model.ValMatrix.String(), Result: []SampleStream{ { - Labels: []client.LabelPair{ - {Name: wire.Bytes("foo"), Value: wire.Bytes("bar")}, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, Samples: []client.Sample{ {Value: 137, TimestampMs: 1536673680000}, @@ -205,7 +204,7 @@ func TestMergeAPIResponses(t *testing.T) { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{}, + Labels: []client.LabelAdapter{}, Samples: []client.Sample{ {Value: 0, TimestampMs: 0}, {Value: 1, TimestampMs: 1}, @@ -219,7 +218,7 @@ func TestMergeAPIResponses(t *testing.T) { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{}, + Labels: []client.LabelAdapter{}, Samples: []client.Sample{ {Value: 2, TimestampMs: 2}, {Value: 3, TimestampMs: 3}, @@ -235,7 +234,7 @@ func TestMergeAPIResponses(t *testing.T) { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{}, + Labels: []client.LabelAdapter{}, Samples: []client.Sample{ {Value: 0, TimestampMs: 0}, {Value: 1, TimestampMs: 1}, @@ -260,7 +259,7 @@ func TestMergeAPIResponses(t *testing.T) { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{{Name: []byte("a"), Value: []byte("b")}, {Name: []byte("c"), Value: []byte("d")}}, + Labels: []client.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, Samples: []client.Sample{ {Value: 0, TimestampMs: 0}, {Value: 1, TimestampMs: 1000}, diff --git a/pkg/querier/frontend/results_cache_test.go b/pkg/querier/frontend/results_cache_test.go index d8eaa8b7ca9..4de06de2d6e 100644 --- a/pkg/querier/frontend/results_cache_test.go +++ b/pkg/querier/frontend/results_cache_test.go @@ -5,14 +5,14 @@ import ( "strconv" "testing" - "github.com/cortexproject/cortex/pkg/chunk/cache" - client "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/chunk/cache" + client "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/flagext" ) var dummyResponse = &APIResponse{ @@ -21,8 +21,8 @@ var dummyResponse = &APIResponse{ ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{ - {Name: wire.Bytes("foo"), Value: wire.Bytes("bar")}, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, Samples: []client.Sample{ { @@ -50,8 +50,8 @@ func mkAPIResponse(start, end, step int64) *APIResponse { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{ - {Name: wire.Bytes("foo"), Value: wire.Bytes("bar")}, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, Samples: samples, }, diff --git a/pkg/querier/ingester_streaming_queryable.go b/pkg/querier/ingester_streaming_queryable.go index e0e992640c9..45d09432536 100644 --- a/pkg/querier/ingester_streaming_queryable.go +++ b/pkg/querier/ingester_streaming_queryable.go @@ -54,7 +54,7 @@ func (i ingesterQueryable) Get(ctx context.Context, from, through model.Time, ma chunks := make([]chunk.Chunk, 0, len(results)) for _, result := range results { - metric := client.FromLabelPairs(result.Labels) + metric := client.FromLabelAdaptersToMetric(result.Labels) cs, err := chunkcompat.FromChunks(userID, metric, result.Chunks) if err != nil { return nil, promql.ErrStorage{Err: err} @@ -94,7 +94,7 @@ func (q *ingesterStreamingQuerier) Select(sp *storage.SelectParams, matchers ... return nil, nil, promql.ErrStorage{Err: err} } - ls := client.FromLabelPairsToLabels(result.Labels) + ls := client.FromLabelAdaptersToLabels(result.Labels) sort.Sort(ls) series := &chunkSeries{ labels: ls, diff --git a/pkg/querier/ingester_streaming_queryable_test.go b/pkg/querier/ingester_streaming_queryable_test.go index a97e11f6aa7..0ee83a9a635 100644 --- a/pkg/querier/ingester_streaming_queryable_test.go +++ b/pkg/querier/ingester_streaming_queryable_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/weaveworks/common/user" ) @@ -16,13 +15,13 @@ func TestIngesterStreaming(t *testing.T) { d := &mockDistributor{ r: []client.TimeSeriesChunk{ { - Labels: []client.LabelPair{ - {Name: wire.Bytes("bar"), Value: wire.Bytes("baz")}, + Labels: []client.LabelAdapter{ + {Name: "bar", Value: "baz"}, }, }, { - Labels: []client.LabelPair{ - {Name: wire.Bytes("foo"), Value: wire.Bytes("bar")}, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, }, }, diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 45e4e7d7da9..ee363f3c666 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -22,7 +22,6 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/weaveworks/common/user" ) @@ -83,7 +82,7 @@ var ( query: "foo", step: sampleRate * 4, labels: labels.Labels{ - labels.Label{Name: "__name__", Value: "foo"}, + labels.Label{Name: model.MetricNameLabel, Value: "foo"}, }, samples: func(from, through time.Time, step time.Duration) int { return int(through.Sub(from)/step) + 1 @@ -111,7 +110,7 @@ var ( query: "foo", step: sampleRate * 4 * 10, labels: labels.Labels{ - labels.Label{Name: "__name__", Value: "foo"}, + labels.Label{Name: model.MetricNameLabel, Value: "foo"}, }, samples: func(from, through time.Time, step time.Duration) int { return int(through.Sub(from)/step) + 1 @@ -236,7 +235,7 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc require.NoError(t, err) tsc := client.TimeSeriesChunk{ - Labels: []client.LabelPair{{Name: wire.Bytes(model.MetricNameLabel), Value: wire.Bytes("foo")}}, + Labels: []client.LabelAdapter{{Name: model.MetricNameLabel, Value: "foo"}}, Chunks: chunks, } matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through) diff --git a/pkg/querier/remote_read_test.go b/pkg/querier/remote_read_test.go index 65dd63d9b73..b96a90fa40f 100644 --- a/pkg/querier/remote_read_test.go +++ b/pkg/querier/remote_read_test.go @@ -10,7 +10,6 @@ import ( "testing" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/common/model" @@ -65,11 +64,8 @@ func TestRemoteReadHandler(t *testing.T) { { Timeseries: []client.TimeSeries{ { - Labels: []client.LabelPair{ - { - Name: wire.Bytes([]byte("foo")), - Value: wire.Bytes([]byte("bar")), - }, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, Samples: []client.Sample{ {Value: 0, TimestampMs: 0}, diff --git a/pkg/util/chunkcompat/compat.go b/pkg/util/chunkcompat/compat.go index 446f7a4544d..038a1ca2c40 100644 --- a/pkg/util/chunkcompat/compat.go +++ b/pkg/util/chunkcompat/compat.go @@ -33,7 +33,7 @@ func SeriesChunksToMatrix(from, through model.Time, serieses []client.TimeSeries result := model.Matrix{} for _, series := range serieses { - metric := client.FromLabelPairs(series.Labels) + metric := client.FromLabelAdaptersToMetric(series.Labels) chunks, err := FromChunks("", metric, series.Chunks) if err != nil { return nil, err diff --git a/pkg/util/extract/extract.go b/pkg/util/extract/extract.go index e170382f183..8de926d83e2 100644 --- a/pkg/util/extract/extract.go +++ b/pkg/util/extract/extract.go @@ -8,16 +8,14 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) -var labelNameBytes = []byte(model.MetricNameLabel) - -// MetricNameFromLabelPairs extracts the metric name from a list of LabelPairs. -func MetricNameFromLabelPairs(labels []client.LabelPair) ([]byte, error) { +// MetricNameFromLabelAdapters extracts the metric name from a list of LabelPairs. +func MetricNameFromLabelAdapters(labels []client.LabelAdapter) (string, error) { for _, label := range labels { - if label.Name.Equal(labelNameBytes) { + if label.Name == model.MetricNameLabel { return label.Value, nil } } - return nil, fmt.Errorf("No metric name label") + return "", fmt.Errorf("No metric name label") } // MetricNameFromMetric extract the metric name from a model.Metric diff --git a/pkg/util/hash_fp.go b/pkg/util/hash_fp.go index ba0a03801e2..209b8b45c06 100644 --- a/pkg/util/hash_fp.go +++ b/pkg/util/hash_fp.go @@ -9,6 +9,6 @@ import "github.com/prometheus/common/model" // function we use is prone to only change a few bits for similar metrics. We // really want to make use of every change in the fingerprint to vary mutex // selection.) -func HashFP(fp model.Fingerprint) uint { - return uint(fp ^ (fp >> 32) ^ (fp >> 16)) +func HashFP(fp model.Fingerprint) uint32 { + return uint32(fp ^ (fp >> 32) ^ (fp >> 16)) } diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 750ceb25b91..11044eae562 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -48,7 +48,7 @@ func init() { } // ValidateSample returns an err if the sample is invalid. -func (cfg *Overrides) ValidateSample(userID string, metricName []byte, s client.Sample) error { +func (cfg *Overrides) ValidateSample(userID string, metricName string, s client.Sample) error { if cfg.RejectOldSamples(userID) && model.Time(s.TimestampMs) < model.Now().Add(-cfg.RejectOldSamplesMaxAge(userID)) { DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc() return httpgrpc.Errorf(http.StatusBadRequest, errTooOld, metricName, model.Time(s.TimestampMs)) @@ -63,8 +63,8 @@ func (cfg *Overrides) ValidateSample(userID string, metricName []byte, s client. } // ValidateLabels returns an err if the labels are invalid. -func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error { - metricName, err := extract.MetricNameFromLabelPairs(ls) +func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelAdapter) error { + metricName, err := extract.MetricNameFromLabelAdapters(ls) if cfg.EnforceMetricName(userID) { if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, errMissingMetricName) @@ -78,7 +78,7 @@ func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error numLabelNames := len(ls) if numLabelNames > cfg.MaxLabelNamesPerSeries(userID) { DiscardedSamples.WithLabelValues(maxLabelNamesPerSeries, userID).Inc() - return httpgrpc.Errorf(http.StatusBadRequest, errTooManyLabels, client.FromLabelPairs(ls).String(), numLabelNames, cfg.MaxLabelNamesPerSeries(userID)) + return httpgrpc.Errorf(http.StatusBadRequest, errTooManyLabels, client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, cfg.MaxLabelNamesPerSeries(userID)) } maxLabelNameLength := cfg.MaxLabelNameLength(userID) @@ -102,7 +102,7 @@ func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error } if errTemplate != "" { DiscardedSamples.WithLabelValues(reason, userID).Inc() - return httpgrpc.Errorf(http.StatusBadRequest, errTemplate, cause, client.FromLabelPairs(ls).String()) + return httpgrpc.Errorf(http.StatusBadRequest, errTemplate, cause, client.FromLabelAdaptersToMetric(ls).String()) } } return nil diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index cd481c0a03c..685ea7df7d1 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -57,7 +57,7 @@ func TestValidateLabels(t *testing.T) { }, } { - err := overrides.ValidateLabels(userID, client.ToLabelPairs(c.metric)) + err := overrides.ValidateLabels(userID, client.FromMetricsToLabelAdapters(c.metric)) assert.Equal(t, c.err, err, "wrong error") } }