diff --git a/pkg/chunk/encoding/bigchunk.go b/pkg/chunk/encoding/bigchunk.go index 29746f87e56..0282c08e997 100644 --- a/pkg/chunk/encoding/bigchunk.go +++ b/pkg/chunk/encoding/bigchunk.go @@ -161,8 +161,9 @@ func (b *bigchunk) Len() int { } func (b *bigchunk) Size() int { - sum := 0 + sum := 2 // For the number of sub chunks. for _, c := range b.chunks { + sum += 2 // For the length of the sub chunk. sum += len(c.Bytes()) } return sum diff --git a/pkg/util/wire/bytes.go b/pkg/chunk/storage/bytes.go similarity index 97% rename from pkg/util/wire/bytes.go rename to pkg/chunk/storage/bytes.go index dfabadd8e06..c4804995ff8 100644 --- a/pkg/util/wire/bytes.go +++ b/pkg/chunk/storage/bytes.go @@ -1,4 +1,4 @@ -package wire +package storage import ( "bytes" diff --git a/pkg/chunk/storage/caching_index_client.proto b/pkg/chunk/storage/caching_index_client.proto index 1c22c94c8ab..2621db517bb 100644 --- a/pkg/chunk/storage/caching_index_client.proto +++ b/pkg/chunk/storage/caching_index_client.proto @@ -8,8 +8,8 @@ option (gogoproto.marshaler_all) = true; option (gogoproto.unmarshaler_all) = true; message Entry { - bytes Column = 1 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; - bytes Value = 2 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; + bytes Column = 1 [(gogoproto.customtype) = "Bytes", (gogoproto.nullable) = false]; + bytes Value = 2 [(gogoproto.customtype) = "Bytes", (gogoproto.nullable) = false]; } message ReadBatch { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index c1a07e984c5..753da6b9add 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1,13 +1,12 @@ package distributor import ( - "bytes" "context" "flag" "fmt" - "hash/fnv" "net/http" "sort" + "strings" "sync" "time" @@ -185,37 +184,37 @@ func (d *Distributor) Stop() { d.ingesterPool.Stop() } -func (d *Distributor) tokenForLabels(userID string, labels []client.LabelPair) (uint32, error) { +func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter) (uint32, error) { if d.cfg.ShardByAllLabels { return shardByAllLabels(userID, labels) } - metricName, err := extract.MetricNameFromLabelPairs(labels) + metricName, err := extract.MetricNameFromLabelAdapters(labels) if err != nil { return 0, err } return shardByMetricName(userID, metricName), nil } -func shardByMetricName(userID string, metricName []byte) uint32 { - h := fnv.New32() - h.Write([]byte(userID)) - h.Write(metricName) - return h.Sum32() +func shardByMetricName(userID string, metricName string) uint32 { + h := client.HashNew32() + h = client.HashAdd32(h, userID) + h = client.HashAdd32(h, metricName) + return h } -func shardByAllLabels(userID string, labels []client.LabelPair) (uint32, error) { - h := fnv.New32() - h.Write([]byte(userID)) - lastLabelName := []byte{} +func shardByAllLabels(userID string, labels []client.LabelAdapter) (uint32, error) { + h := client.HashNew32() + h = client.HashAdd32(h, userID) + var lastLabelName string for _, label := range labels { - if bytes.Compare(lastLabelName, label.Name) >= 0 { + if strings.Compare(lastLabelName, label.Name) >= 0 { return 0, fmt.Errorf("Labels not sorted") } - h.Write(label.Name) - h.Write(label.Value) + h = client.HashAdd32(h, label.Name) + h = client.HashAdd32(h, label.Value) } - return h.Sum32(), nil + return h, nil } // Push implements client.IngesterServer @@ -242,7 +241,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie continue } - metricName, _ := extract.MetricNameFromLabelPairs(ts.Labels) + metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels) samples := make([]client.Sample, 0, len(ts.Samples)) for _, s := range ts.Samples { if err := d.limits.ValidateSample(userID, metricName, s); err != nil { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 7eb33443ca2..29fbbb4c599 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -107,7 +107,7 @@ func TestDistributorPush(t *testing.T) { } func TestDistributorPushQuery(t *testing.T) { - nameMatcher := mustEqualMatcher("__name__", "foo") + nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo") barMatcher := mustEqualMatcher("bar", "baz") type testcase struct { @@ -219,7 +219,7 @@ func TestDistributorPushQuery(t *testing.T) { } func TestSlowQueries(t *testing.T) { - nameMatcher := mustEqualMatcher("__name__", "foo") + nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo") nIngesters := 3 for _, shardByAllLabels := range []bool{true, false} { for happy := 0; happy <= nIngesters; happy++ { @@ -300,10 +300,10 @@ func makeWriteRequest(samples int) *client.WriteRequest { for i := 0; i < samples; i++ { ts := client.PreallocTimeseries{ TimeSeries: client.TimeSeries{ - Labels: []client.LabelPair{ - {Name: []byte("__name__"), Value: []byte("foo")}, - {Name: []byte("bar"), Value: []byte("baz")}, - {Name: []byte("sample"), Value: []byte(fmt.Sprintf("%d", i))}, + Labels: []client.LabelAdapter{ + {Name: model.MetricNameLabel, Value: "foo"}, + {Name: "bar", Value: "baz"}, + {Name: "sample", Value: fmt.Sprintf("%d", i)}, }, }, } @@ -323,9 +323,9 @@ func expectedResponse(start, end int) model.Matrix { for i := start; i < end; i++ { result = append(result, &model.SampleStream{ Metric: model.Metric{ - "__name__": "foo", - "bar": "baz", - "sample": model.LabelValue(fmt.Sprintf("%d", i)), + model.MetricNameLabel: "foo", + "bar": "baz", + "sample": model.LabelValue(fmt.Sprintf("%d", i)), }, Values: []model.SamplePair{ { @@ -529,11 +529,11 @@ func (i *mockIngester) AllUserStats(ctx context.Context, in *client.UserStatsReq return &i.stats, nil } -func match(labels []client.LabelPair, matchers []*labels.Matcher) bool { +func match(labels []client.LabelAdapter, matchers []*labels.Matcher) bool { outer: for _, matcher := range matchers { for _, labels := range labels { - if matcher.Name == string(labels.Name) && matcher.Matches(string(labels.Value)) { + if matcher.Name == labels.Name && matcher.Matches(labels.Value) { continue outer } } diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 3402fc08ffb..9c1e2b5422e 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -68,7 +68,7 @@ func (d *Distributor) queryPrep(ctx context.Context, from, to model.Time, matche // Get ingesters by metricName if one exists, otherwise get all ingesters metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers) if !d.cfg.ShardByAllLabels && ok && metricNameMatcher.Type == labels.MatchEqual { - replicationSet, err = d.ring.Get(shardByMetricName(userID, []byte(metricNameMatcher.Value)), ring.Read) + replicationSet, err = d.ring.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read) } else { replicationSet, err = d.ring.GetAll() } diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 2ca72ce328d..d5ffe626144 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/stretchr/testify/require" ) @@ -19,8 +18,8 @@ func TestMarshall(t *testing.T) { for i := 0; i < 10; i++ { req.Timeseries = append(req.Timeseries, PreallocTimeseries{ TimeSeries{ - Labels: []LabelPair{ - {wire.Bytes([]byte("foo")), wire.Bytes([]byte(strconv.Itoa(i)))}, + Labels: []LabelAdapter{ + {"foo", strconv.Itoa(i)}, }, Samples: []Sample{ {TimestampMs: int64(i), Value: float64(i)}, diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index f177eefabf3..f95d1e27877 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -1,11 +1,11 @@ package client import ( - "bytes" stdjson "encoding/json" "fmt" "sort" "strconv" + "strings" "time" "unsafe" @@ -16,22 +16,6 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary -// FromWriteRequest converts a WriteRequest proto into an array of samples. -func FromWriteRequest(req *WriteRequest) []model.Sample { - // Just guess that there is one sample per timeseries - samples := make([]model.Sample, 0, len(req.Timeseries)) - for _, ts := range req.Timeseries { - for _, s := range ts.Samples { - samples = append(samples, model.Sample{ - Metric: FromLabelPairs(ts.Labels), - Value: model.SampleValue(s.Value), - Timestamp: model.Time(s.TimestampMs), - }) - } - } - return samples -} - // ToWriteRequest converts an array of samples into a WriteRequest proto. func ToWriteRequest(samples []model.Sample, source WriteRequest_SourceEnum) *WriteRequest { req := &WriteRequest{ @@ -42,7 +26,7 @@ func ToWriteRequest(samples []model.Sample, source WriteRequest_SourceEnum) *Wri for _, s := range samples { ts := PreallocTimeseries{ TimeSeries: TimeSeries{ - Labels: ToLabelPairs(s.Metric), + Labels: FromMetricsToLabelAdapters(s.Metric), Samples: []Sample{ { Value: float64(s.Value), @@ -87,7 +71,7 @@ func ToQueryResponse(matrix model.Matrix) *QueryResponse { resp := &QueryResponse{} for _, ss := range matrix { ts := TimeSeries{ - Labels: ToLabelPairs(ss.Metric), + Labels: FromMetricsToLabelAdapters(ss.Metric), Samples: make([]Sample, 0, len(ss.Values)), } for _, s := range ss.Values { @@ -106,7 +90,7 @@ func FromQueryResponse(resp *QueryResponse) model.Matrix { m := make(model.Matrix, 0, len(resp.Timeseries)) for _, ts := range resp.Timeseries { var ss model.SampleStream - ss.Metric = FromLabelPairs(ts.Labels) + ss.Metric = FromLabelAdaptersToMetric(ts.Labels) ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) for _, s := range ts.Samples { ss.Values = append(ss.Values, model.SamplePair{ @@ -153,7 +137,7 @@ func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (mo func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse) []model.Metric { metrics := []model.Metric{} for _, m := range resp.Metric { - metrics = append(metrics, FromLabelPairs(m.Labels)) + metrics = append(metrics, FromLabelAdaptersToMetric(m.Labels)) } return metrics } @@ -208,70 +192,63 @@ func fromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) { return result, nil } -// ToLabelPairs builds a []LabelPair from a model.Metric -func ToLabelPairs(metric model.Metric) []LabelPair { - labelPairs := make([]LabelPair, 0, len(metric)) - for k, v := range metric { - labelPairs = append(labelPairs, LabelPair{ - Name: []byte(k), - Value: []byte(v), - }) - } - sort.Sort(byLabel(labelPairs)) // The labels should be sorted upon initialisation. - return labelPairs +// FromLabelAdaptersToLabels casts []LabelAdapter to labels.Labels. +// It uses unsafe, but as LabelAdapter == labels.Label this should be safe. +// This allows us to use labels.Labels directly in protos. +func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels { + return *(*labels.Labels)(unsafe.Pointer(&ls)) } -type byLabel []LabelPair - -func (s byLabel) Len() int { return len(s) } -func (s byLabel) Less(i, j int) bool { return bytes.Compare(s[i].Name, s[j].Name) < 0 } -func (s byLabel) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -// FromLabelPairs unpack a []LabelPair to a model.Metric -func FromLabelPairs(labelPairs []LabelPair) model.Metric { - metric := make(model.Metric, len(labelPairs)) - for _, l := range labelPairs { - metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } - return metric +// FromLabelsToLabelAdapaters casts labels.Labels to []LabelAdapter. +// It uses unsafe, but as LabelAdapter == labels.Label this should be safe. +// This allows us to use labels.Labels directly in protos. +func FromLabelsToLabelAdapaters(ls labels.Labels) []LabelAdapter { + return *(*[]LabelAdapter)(unsafe.Pointer(&ls)) } -// FromLabelPairsToLabels unpack a []LabelPair to a labels.Labels -func FromLabelPairsToLabels(labelPairs []LabelPair) labels.Labels { - ls := make(labels.Labels, 0, len(labelPairs)) - for _, l := range labelPairs { - ls = append(ls, labels.Label{ - Name: string(l.Name), - Value: string(l.Value), - }) +// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric. +// Don't do this on any performance sensitive paths. +func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric { + result := make(model.Metric, len(ls)) + for _, l := range ls { + result[model.LabelName(l.Name)] = model.LabelValue(l.Value) } - return ls + return result } -// FromLabelsToLabelPairs converts labels.Labels to []LabelPair -func FromLabelsToLabelPairs(s labels.Labels) []LabelPair { - labelPairs := make([]LabelPair, 0, len(s)) - for _, v := range s { - labelPairs = append(labelPairs, LabelPair{ - Name: []byte(v.Name), - Value: []byte(v.Value), +// FromMetricsToLabelAdapters converts model.Metric to []LabelAdapter. +// Don't do this on any performance sensitive paths. +// The result is sorted. +func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter { + result := make([]LabelAdapter, 0, len(metric)) + for k, v := range metric { + result = append(result, LabelAdapter{ + Name: string(k), + Value: string(v), }) } - return labelPairs // note already sorted + sort.Sort(byLabel(result)) // The labels should be sorted upon initialisation. + return result } +type byLabel []LabelAdapter + +func (s byLabel) Len() int { return len(s) } +func (s byLabel) Less(i, j int) bool { return strings.Compare(s[i].Name, s[j].Name) < 0 } +func (s byLabel) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + // FastFingerprint runs the same algorithm as Prometheus labelSetToFastFingerprint() -func FastFingerprint(labelPairs []LabelPair) model.Fingerprint { - if len(labelPairs) == 0 { +func FastFingerprint(ls []LabelAdapter) model.Fingerprint { + if len(ls) == 0 { return model.Metric(nil).FastFingerprint() } var result uint64 - for _, pair := range labelPairs { + for _, l := range ls { sum := hashNew() - sum = hashAdd(sum, pair.Name) + sum = hashAdd(sum, l.Name) sum = hashAddByte(sum, model.SeparatorByte) - sum = hashAdd(sum, pair.Value) + sum = hashAdd(sum, l.Value) result ^= sum } return model.Fingerprint(result) diff --git a/pkg/ingester/client/compat_test.go b/pkg/ingester/client/compat_test.go index d89be185c79..c9f66818ad2 100644 --- a/pkg/ingester/client/compat_test.go +++ b/pkg/ingester/client/compat_test.go @@ -8,29 +8,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/weaveworks/common/test" ) -func TestWriteRequest(t *testing.T) { - want := []model.Sample{} - for i := 0; i < 10; i++ { - want = append(want, model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: model.LabelValue(fmt.Sprintf("testmetric_%d", i)), - model.JobLabel: "testjob", - }, - Timestamp: model.Time(i), - Value: model.SampleValue(float64(i)), - }) - } - - have := FromWriteRequest(ToWriteRequest(want, API)) - - if !reflect.DeepEqual(want, have) { - t.Fatalf(test.Diff(want, have)) - } -} - func TestQueryRequest(t *testing.T) { from, to := model.Time(int64(0)), model.Time(int64(10)) matchers := []*labels.Matcher{} diff --git a/pkg/ingester/client/cortex.proto b/pkg/ingester/client/cortex.proto index 02e17e35b8a..e1659919920 100644 --- a/pkg/ingester/client/cortex.proto +++ b/pkg/ingester/client/cortex.proto @@ -104,7 +104,7 @@ message MetricsForLabelMatchersResponse { message TimeSeriesChunk { string from_ingester_id = 1; string user_id = 2; - repeated LabelPair labels = 3 [(gogoproto.nullable) = false]; + repeated LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; repeated Chunk chunks = 4 [(gogoproto.nullable) = false]; } @@ -119,14 +119,14 @@ message TransferChunksResponse { } message TimeSeries { - repeated LabelPair labels = 1 [(gogoproto.nullable) = false]; + repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; // Sorted by time, oldest sample first. repeated Sample samples = 2 [(gogoproto.nullable) = false]; } message LabelPair { - bytes name = 1 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; - bytes value = 2 [(gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/util/wire.Bytes", (gogoproto.nullable) = false]; + bytes name = 1; + bytes value = 2; } message Sample { @@ -139,7 +139,7 @@ message LabelMatchers { } message Metric { - repeated LabelPair labels = 1 [(gogoproto.nullable) = false]; + repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; } enum MatchType { diff --git a/pkg/ingester/client/fnv.go b/pkg/ingester/client/fnv.go index 41453b10aaa..f151186f0e3 100644 --- a/pkg/ingester/client/fnv.go +++ b/pkg/ingester/client/fnv.go @@ -19,6 +19,8 @@ package client const ( offset64 = 14695981039346656037 prime64 = 1099511628211 + offset32 = 2166136261 + prime32 = 16777619 ) // hashNew initializies a new fnv64a hash value. @@ -27,7 +29,7 @@ func hashNew() uint64 { } // hashAdd adds a string to a fnv64a hash value, returning the updated hash. -func hashAdd(h uint64, s []byte) uint64 { +func hashAdd(h uint64, s string) uint64 { for i := 0; i < len(s); i++ { h ^= uint64(s[i]) h *= prime64 @@ -41,3 +43,17 @@ func hashAddByte(h uint64, b byte) uint64 { h *= prime64 return h } + +// HashNew32 initializies a new fnv64a hash value. +func HashNew32() uint32 { + return offset32 +} + +// HashAdd32 adds a string to a fnv64a hash value, returning the updated hash. +func HashAdd32(h uint32, s string) uint32 { + for i := 0; i < len(s); i++ { + h ^= uint32(s[i]) + h *= prime32 + } + return h +} diff --git a/pkg/ingester/client/timeseries.go b/pkg/ingester/client/timeseries.go index b5b46b9ab87..8c7cc40588c 100644 --- a/pkg/ingester/client/timeseries.go +++ b/pkg/ingester/client/timeseries.go @@ -1,6 +1,14 @@ package client -import "flag" +import ( + "flag" + "fmt" + "io" + "strings" + "unsafe" + + "github.com/prometheus/prometheus/pkg/labels" +) var ( expectedTimeseries = 100 @@ -37,7 +45,182 @@ type PreallocTimeseries struct { // Unmarshal implements proto.Message. func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { - p.Labels = make([]LabelPair, 0, expectedLabels) + p.Labels = make([]LabelAdapter, 0, expectedLabels) p.Samples = make([]Sample, 0, expectedSamplesPerSeries) return p.TimeSeries.Unmarshal(dAtA) } + +// LabelAdapter is a labels.Label that can be marshalled to/from protos. +type LabelAdapter labels.Label + +// Marshal implements proto.Marshaller. +func (bs *LabelAdapter) Marshal() ([]byte, error) { + buf := make([]byte, bs.Size()) + _, err := bs.MarshalTo(buf) + return buf, err +} + +// MarshalTo implements proto.Marshaller. +func (bs *LabelAdapter) MarshalTo(buf []byte) (n int, err error) { + var i int + ls := (*labels.Label)(bs) + + buf[i] = 0xa + i++ + i = encodeVarintCortex(buf, i, uint64(len(ls.Name))) + i += copy(buf[i:], ls.Name) + + buf[i] = 0x12 + i++ + i = encodeVarintCortex(buf, i, uint64(len(ls.Value))) + i += copy(buf[i:], ls.Value) + + return i, nil +} + +// Unmarshal a LabelAdapater, implements proto.Unmarshaller. +// NB this is a copy of the autogenerated code to unmarshal a LabelPair, +// with the byte copying replaced with a yoloString. +func (bs *LabelAdapter) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelPair: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + bs.Name = yoloString(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + bs.Value = yoloString(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCortex(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} + +func yoloString(buf []byte) string { + return *((*string)(unsafe.Pointer(&buf))) +} + +// Size implements proto.Sizer. +func (bs *LabelAdapter) Size() int { + ls := (*labels.Label)(bs) + var n int + l := len(ls.Name) + n += 1 + l + sovCortex(uint64(l)) + l = len(ls.Value) + n += 1 + l + sovCortex(uint64(l)) + return n +} + +// Equal implements proto.Equaler. +func (bs *LabelAdapter) Equal(other LabelAdapter) bool { + return bs.Name == other.Name && bs.Value == other.Value +} + +// Compare implements proto.Comparer. +func (bs *LabelAdapter) Compare(other LabelAdapter) int { + if c := strings.Compare(bs.Name, other.Name); c != 0 { + return c + } + return strings.Compare(bs.Value, other.Value) +} diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index e31a64ffc69..ddde3a0f717 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/common/model" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" "github.com/weaveworks/common/user" ) @@ -276,7 +277,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. sp.SetTag("organization", userID) util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric, "queue", flushQueueIndex) - err := i.flushChunks(ctx, fp, labelsToMetric(series.metric), chunks) + err := i.flushChunks(ctx, fp, client.FromLabelAdaptersToMetric(client.FromLabelsToLabelAdapaters(series.metric)), chunks) if err != nil { return err } diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 32771046379..6e072e70335 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -32,7 +32,7 @@ func New() *InvertedIndex { } // Add a fingerprint under the specified labels. -func (ii *InvertedIndex) Add(labels []client.LabelPair, fp model.Fingerprint) labels.Labels { +func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard := &ii.shards[util.HashFP(fp)%indexShards] return shard.add(labels, fp) } @@ -49,7 +49,6 @@ func (ii *InvertedIndex) Lookup(matchers []*labels.Matcher) []model.Fingerprint result = append(result, fps...) } - sort.Sort(fingerprints(result)) return result } @@ -105,25 +104,31 @@ type indexShard struct { pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(unlockIndex{})]byte } +func copyString(s string) string { + return string([]byte(s)) +} + // add metric to the index; return all the name/value pairs as strings from the index, sorted -func (shard *indexShard) add(metric []client.LabelPair, fp model.Fingerprint) labels.Labels { +func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard.mtx.Lock() defer shard.mtx.Unlock() internedLabels := make(labels.Labels, len(metric)) for i, pair := range metric { - values, ok := shard.idx[string(pair.Name)] + values, ok := shard.idx[pair.Name] if !ok { values = indexEntry{ - name: string(pair.Name), + name: copyString(pair.Name), fps: map[string]indexValueEntry{}, } shard.idx[values.name] = values } - fingerprints, ok := values.fps[string(pair.Value)] + fingerprints, ok := values.fps[pair.Value] if !ok { - fingerprints = indexValueEntry{value: string(pair.Value)} + fingerprints = indexValueEntry{ + value: copyString(pair.Value), + } } // Insert into the right position to keep fingerprints sorted j := sort.Search(len(fingerprints.fps), func(i int) bool { @@ -133,7 +138,7 @@ func (shard *indexShard) add(metric []client.LabelPair, fp model.Fingerprint) la copy(fingerprints.fps[j+1:], fingerprints.fps[j:]) fingerprints.fps[j] = fp values.fps[fingerprints.value] = fingerprints - internedLabels[i] = labels.Label{Name: string(values.name), Value: string(fingerprints.value)} + internedLabels[i] = labels.Label{Name: values.name, Value: fingerprints.value} } sort.Sort(internedLabels) return internedLabels @@ -162,7 +167,7 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint // accumulate the matching fingerprints (which are all distinct) // then sort to maintain the invariant for value, fps := range values.fps { - if matcher.Matches(string(value)) { + if matcher.Matches(value) { toIntersect = append(toIntersect, fps.fps...) } } @@ -213,7 +218,7 @@ func (shard *indexShard) delete(labels labels.Labels, fp model.Fingerprint) { defer shard.mtx.Unlock() for _, pair := range labels { - name, value := string(pair.Name), string(pair.Value) + name, value := pair.Name, pair.Value values, ok := shard.idx[name] if !ok { continue diff --git a/pkg/ingester/index/index_test.go b/pkg/ingester/index/index_test.go index 6c2fc30b74c..9b3351c55f3 100644 --- a/pkg/ingester/index/index_test.go +++ b/pkg/ingester/index/index_test.go @@ -23,7 +23,7 @@ func TestIndex(t *testing.T) { {model.Metric{"foo": "baz", "flip": "flop"}, 1}, {model.Metric{"foo": "baz", "flip": "flap"}, 0}, } { - index.Add(client.ToLabelPairs(entry.m), entry.fp) + index.Add(client.FromMetricsToLabelAdapters(entry.m), entry.fp) } for _, tc := range []struct { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index c4f911bd995..0556798ad6c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" cortex_chunk "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ingester/client" @@ -35,7 +36,7 @@ const ( duplicateSample = "multiple_values_for_timestamp" // Number of timeseries to return in each batch of a QueryStream. - queryStreamBatchSize = 10 + queryStreamBatchSize = 128 ) var ( @@ -345,7 +346,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client } ts := client.TimeSeries{ - Labels: client.FromLabelsToLabelPairs(series.metric), + Labels: client.FromLabelsToLabelAdapaters(series.metric), Samples: make([]client.Sample, 0, len(values)), } for _, s := range values { @@ -404,7 +405,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ numChunks += len(wireChunks) batch = append(batch, client.TimeSeriesChunk{ - Labels: client.FromLabelsToLabelPairs(series.metric), + Labels: client.FromLabelsToLabelAdapaters(series.metric), Chunks: wireChunks, }) @@ -485,11 +486,11 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr return nil, err } - metrics := map[model.Fingerprint]labelPairs{} + lss := map[model.Fingerprint]labels.Labels{} for _, matchers := range matchersSet { if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error { - if _, ok := metrics[fp]; !ok { - metrics[fp] = client.FromLabelsToLabelPairs(series.metric) + if _, ok := lss[fp]; !ok { + lss[fp] = series.metric } return nil }, nil, 0); err != nil { @@ -498,10 +499,10 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr } result := &client.MetricsForLabelMatchersResponse{ - Metric: make([]*client.Metric, 0, len(metrics)), + Metric: make([]*client.Metric, 0, len(lss)), } - for _, metric := range metrics { - result.Metric = append(result.Metric, &client.Metric{Labels: metric}) + for _, ls := range lss { + result.Metric = append(result.Metric, &client.Metric{Labels: client.FromLabelsToLabelAdapaters(ls)}) } return result, nil diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index a9c0012162e..0fd867c9b69 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -23,7 +23,6 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" ) @@ -156,6 +155,7 @@ func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries in _, err := ing.Push(ctx, client.ToWriteRequest(matrixToSamples(testData[userID]), client.API)) require.NoError(t, err) } + return userIDs, testData } @@ -242,7 +242,7 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) { defer ing.Shutdown() m := labelPairs{ - {Name: []byte(model.MetricNameLabel), Value: []byte("testmetric")}, + {Name: model.MetricNameLabel, Value: "testmetric"}, } ctx := user.InjectOrgID(context.Background(), userID) err := ing.append(ctx, m, 1, 0, client.API) @@ -273,9 +273,9 @@ func TestIngesterAppendBlankLabel(t *testing.T) { defer ing.Shutdown() lp := labelPairs{ - {Name: []byte(model.MetricNameLabel), Value: []byte("testmetric")}, - {Name: []byte("foo"), Value: []byte("")}, - {Name: []byte("bar"), Value: []byte("")}, + {Name: model.MetricNameLabel, Value: "testmetric"}, + {Name: "foo", Value: ""}, + {Name: "bar", Value: ""}, } ctx := user.InjectOrgID(context.Background(), userID) err := ing.append(ctx, lp, 1, 0, client.API) @@ -443,8 +443,8 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) { Timeseries: []client.PreallocTimeseries{ { TimeSeries: client.TimeSeries{ - Labels: []client.LabelPair{ - {Name: wire.Bytes("__name__"), Value: wire.Bytes(fmt.Sprintf("metric_%d", j))}, + Labels: []client.LabelAdapter{ + {Name: model.MetricNameLabel, Value: fmt.Sprintf("metric_%d", j)}, }, Samples: []client.Sample{ {TimestampMs: int64(j), Value: float64(j)}, @@ -479,7 +479,7 @@ func BenchmarkIngesterPush(b *testing.B) { labels["cpu"] = model.LabelValue(fmt.Sprintf("cpu%02d", j)) ts = append(ts, client.PreallocTimeseries{ TimeSeries: client.TimeSeries{ - Labels: client.ToLabelPairs(labels), + Labels: client.FromMetricsToLabelAdapters(labels), Samples: []client.Sample{ {TimestampMs: 0, Value: float64(j)}, }, diff --git a/pkg/ingester/label_pairs.go b/pkg/ingester/label_pairs.go index 412a5233491..896cbf0053e 100644 --- a/pkg/ingester/label_pairs.go +++ b/pkg/ingester/label_pairs.go @@ -13,34 +13,24 @@ import ( // A series is uniquely identified by its set of label name/value // pairs, which may arrive in any order over the wire -type labelPairs []client.LabelPair - -func labelsToMetric(s labels.Labels) model.Metric { - metric := make(model.Metric, len(s)) - for _, l := range s { - metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } - return metric -} - -var labelNameBytes = []byte(model.MetricNameLabel) +type labelPairs []client.LabelAdapter func (a labelPairs) String() string { var b strings.Builder - metricName, err := extract.MetricNameFromLabelPairs(a) + metricName, err := extract.MetricNameFromLabelAdapters(a) numLabels := len(a) - 1 if err != nil { numLabels = len(a) } - b.Write(metricName) + b.WriteString(metricName) b.WriteByte('{') count := 0 for _, pair := range a { - if !pair.Name.Equal(labelNameBytes) { - b.Write(pair.Name) + if pair.Name != model.MetricNameLabel { + b.WriteString(pair.Name) b.WriteString("=\"") - b.Write(pair.Value) + b.WriteString(pair.Value) b.WriteByte('"') count++ if count < numLabels { @@ -66,9 +56,9 @@ func (a *labelPairs) removeBlanks() { } } -func valueForName(s labels.Labels, name []byte) (string, bool) { - pos := sort.Search(len(s), func(i int) bool { return s[i].Name >= string(name) }) - if pos == len(s) || s[pos].Name != string(name) { +func valueForName(s labels.Labels, name string) (string, bool) { + pos := sort.Search(len(s), func(i int) bool { return s[i].Name >= name }) + if pos == len(s) || s[pos].Name != name { return "", false } return s[pos].Value, true @@ -92,7 +82,7 @@ func (a labelPairs) equal(b labels.Labels) bool { // Now check remaining values using binary search for ; i < len(a); i++ { v, found := valueForName(b, a[i].Name) - if !found || v != string(a[i].Value) { + if !found || v != a[i].Value { return false } } diff --git a/pkg/ingester/label_pairs_test.go b/pkg/ingester/label_pairs_test.go index c233775f97a..e79d0ee55e5 100644 --- a/pkg/ingester/label_pairs_test.go +++ b/pkg/ingester/label_pairs_test.go @@ -22,7 +22,7 @@ func TestLabelPairsEqual(t *testing.T) { { name: "labelPairs nonblank; labels blank", a: labelPairs{ - {Name: []byte("foo"), Value: []byte("a")}, + {Name: "foo", Value: "a"}, }, b: labels.Labels{}, equal: false, @@ -38,8 +38,8 @@ func TestLabelPairsEqual(t *testing.T) { { name: "same contents; labelPairs not sorted", a: labelPairs{ - {Name: []byte("foo"), Value: []byte("a")}, - {Name: []byte("bar"), Value: []byte("b")}, + {Name: "foo", Value: "a"}, + {Name: "bar", Value: "b"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, @@ -50,8 +50,8 @@ func TestLabelPairsEqual(t *testing.T) { { name: "same contents", a: labelPairs{ - {Name: []byte("bar"), Value: []byte("b")}, - {Name: []byte("foo"), Value: []byte("a")}, + {Name: "bar", Value: "b"}, + {Name: "foo", Value: "a"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, @@ -62,8 +62,8 @@ func TestLabelPairsEqual(t *testing.T) { { name: "same names, different value", a: labelPairs{ - {Name: []byte("bar"), Value: []byte("b")}, - {Name: []byte("foo"), Value: []byte("c")}, + {Name: "bar", Value: "b"}, + {Name: "foo", Value: "c"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, @@ -74,8 +74,8 @@ func TestLabelPairsEqual(t *testing.T) { { name: "labels has one extra value", a: labelPairs{ - {Name: []byte("bar"), Value: []byte("b")}, - {Name: []byte("foo"), Value: []byte("a")}, + {Name: "bar", Value: "b"}, + {Name: "foo", Value: "a"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, @@ -87,9 +87,9 @@ func TestLabelPairsEqual(t *testing.T) { { name: "labelPairs has one extra value", a: labelPairs{ - {Name: []byte("bar"), Value: []byte("b")}, - {Name: []byte("foo"), Value: []byte("a")}, - {Name: []byte("firble"), Value: []byte("c")}, + {Name: "bar", Value: "b"}, + {Name: "foo", Value: "a"}, + {Name: "firble", Value: "c"}, }, b: labels.Labels{ {Name: "bar", Value: "b"}, diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 40b7b4927bc..2ba851e23de 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -155,7 +155,7 @@ func TestIngesterTransfer(t *testing.T) { assert.Equal(t, &client.QueryResponse{ Timeseries: []client.TimeSeries{ { - Labels: client.ToLabelPairs(m), + Labels: client.FromMetricsToLabelAdapters(m), Samples: []client.Sample{ { Value: 456, diff --git a/pkg/ingester/locker.go b/pkg/ingester/locker.go index 2faa2cbe2f4..ce5962ba302 100644 --- a/pkg/ingester/locker.go +++ b/pkg/ingester/locker.go @@ -30,7 +30,7 @@ type paddedMutex struct { // the same mutex twice). type fingerprintLocker struct { fpMtxs []paddedMutex - numFpMtxs uint + numFpMtxs uint32 } // newFingerprintLocker returns a new fingerprintLocker ready for use. At least @@ -41,7 +41,7 @@ func newFingerprintLocker(preallocatedMutexes int) *fingerprintLocker { } return &fingerprintLocker{ make([]paddedMutex, preallocatedMutexes), - uint(preallocatedMutexes), + uint32(preallocatedMutexes), } } diff --git a/pkg/ingester/mapper_test.go b/pkg/ingester/mapper_test.go index bda48d7057c..bffe3cc473d 100644 --- a/pkg/ingester/mapper_test.go +++ b/pkg/ingester/mapper_test.go @@ -19,52 +19,37 @@ var ( fp2 = model.Fingerprint(maxMappedFP + 2) fp3 = model.Fingerprint(1) cm11 = labelPairs{ - {Name: []byte("foo"), Value: []byte("bar")}, - {Name: []byte("dings"), Value: []byte("bumms")}, + {Name: "foo", Value: "bar"}, + {Name: "dings", Value: "bumms"}, } cm12 = labelPairs{ - {Name: []byte("bar"), Value: []byte("foo")}, + {Name: "bar", Value: "foo"}, } cm13 = labelPairs{ - {Name: []byte("foo"), Value: []byte("bar")}, + {Name: "foo", Value: "bar"}, } cm21 = labelPairs{ - {Name: []byte("foo"), Value: []byte("bumms")}, - {Name: []byte("dings"), Value: []byte("bar")}, + {Name: "foo", Value: "bumms"}, + {Name: "dings", Value: "bar"}, } cm22 = labelPairs{ - {Name: []byte("dings"), Value: []byte("foo")}, - {Name: []byte("bar"), Value: []byte("bumms")}, + {Name: "dings", Value: "foo"}, + {Name: "bar", Value: "bumms"}, } cm31 = labelPairs{ - {Name: []byte("bumms"), Value: []byte("dings")}, + {Name: "bumms", Value: "dings"}, } cm32 = labelPairs{ - {Name: []byte("bumms"), Value: []byte("dings")}, - {Name: []byte("bar"), Value: []byte("foo")}, + {Name: "bumms", Value: "dings"}, + {Name: "bar", Value: "foo"}, } ) func (a labelPairs) copyValuesAndSort() labels.Labels { c := make(labels.Labels, len(a)) - // Since names and values may point into a much larger buffer, - // make a copy of all the names and values, in one block for efficiency - copyBytes := make([]byte, 0, len(a)*32) // guess at initial length - for _, pair := range a { - copyBytes = append(copyBytes, pair.Name...) - copyBytes = append(copyBytes, pair.Value...) - } - // Now we need to copy the byte slice into a string for the values to point into - copyString := string(copyBytes) - pos := 0 - stringSlice := func(val []byte) string { - start := pos - pos += len(val) - return copyString[start:pos] - } for i, pair := range a { - c[i].Name = stringSlice(pair.Name) - c[i].Value = stringSlice(pair.Value) + c[i].Name = pair.Name + c[i].Value = pair.Value } sort.Sort(c) return c diff --git a/pkg/ingester/query_test.go b/pkg/ingester/query_test.go new file mode 100644 index 00000000000..9debe783b19 --- /dev/null +++ b/pkg/ingester/query_test.go @@ -0,0 +1,110 @@ +package ingester + +import ( + "fmt" + "io" + "net" + "testing" + "time" + + "google.golang.org/grpc" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "golang.org/x/net/context" + + "github.com/cortexproject/cortex/pkg/chunk/encoding" + "github.com/cortexproject/cortex/pkg/ingester/client" +) + +func BenchmarkQueryStream(b *testing.B) { + cfg := defaultIngesterTestConfig() + clientCfg := defaultClientTestConfig() + limits := defaultLimitsTestConfig() + + const ( + numSeries = 1e6 // Put 1 million timeseries, each with 100 samples. + numSamples = 100 + numCPUs = 32 + ) + + encoding.DefaultEncoding = encoding.Bigchunk + limits.MaxSeriesPerMetric = numSeries + limits.MaxSeriesPerQuery = numSeries + cfg.FlushCheckPeriod = 15 * time.Minute + _, ing := newTestStore(b, cfg, clientCfg, limits) + // defer ing.Shutdown() + + ctx := user.InjectOrgID(context.Background(), "1") + instances := make([]string, numSeries/numCPUs) + for i := 0; i < numSeries/numCPUs; i++ { + instances[i] = fmt.Sprintf("node%04d", i) + } + cpus := make([]string, numCPUs) + for i := 0; i < numCPUs; i++ { + cpus[i] = fmt.Sprintf("cpu%02d", i) + } + + for i := 0; i < numSeries; i++ { + labels := labelPairs{ + {Name: model.MetricNameLabel, Value: "node_cpu"}, + {Name: "job", Value: "node_exporter"}, + {Name: "instance", Value: instances[i/numCPUs]}, + {Name: "cpu", Value: cpus[i%numCPUs]}, + } + + state, fp, series, err := ing.userStates.getOrCreateSeries(ctx, labels) + require.NoError(b, err) + + for j := 0; j < numSamples; j++ { + err = series.add(model.SamplePair{ + Value: model.SampleValue(float64(i)), + Timestamp: model.Time(int64(i)), + }) + require.NoError(b, err) + } + + state.fpLocker.Unlock(fp) + } + + server := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor)) + defer server.GracefulStop() + client.RegisterIngesterServer(server, ing) + + l, err := net.Listen("tcp", "localhost:0") + require.NoError(b, err) + go server.Serve(l) + + b.ResetTimer() + for iter := 0; iter < b.N; iter++ { + b.Run("QueryStream", func(b *testing.B) { + c, err := client.MakeIngesterClient(l.Addr().String(), clientCfg) + require.NoError(b, err) + defer c.Close() + + s, err := c.QueryStream(ctx, &client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: numSamples, + Matchers: []*client.LabelMatcher{{ + Type: client.EQUAL, + Name: model.MetricNameLabel, + Value: "node_cpu", + }}, + }) + require.NoError(b, err) + + count := 0 + for { + resp, err := s.Recv() + if err == io.EOF { + break + } + require.NoError(b, err) + count += len(resp.Timeseries) + } + require.Equal(b, count, int(numSeries)) + }) + } +} diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index fad58e3464c..8d52eac1710 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -146,7 +146,7 @@ func toWireChunks(descs []*desc) ([]client.Chunk, error) { Encoding: int32(d.C.Encoding()), } - buf := bytes.NewBuffer(make([]byte, 0, encoding.ChunkLen)) + buf := bytes.NewBuffer(make([]byte, 0, d.C.Size())) if err := d.C.Marshal(buf); err != nil { return nil, err } @@ -246,7 +246,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { err = stream.Send(&client.TimeSeriesChunk{ FromIngesterId: i.lifecycler.ID, UserId: userID, - Labels: client.FromLabelsToLabelPairs(pair.series.metric), + Labels: client.FromLabelsToLabelAdapaters(pair.series.metric), Chunks: chunks, }) state.fpLocker.Unlock(pair.fp) diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index dfc9ab54e1f..3b41f65bd8b 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -129,7 +129,7 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro return state, ok, nil } -func (us *userStates) getOrCreateSeries(ctx context.Context, labels labelPairs) (*userState, model.Fingerprint, *memorySeries, error) { +func (us *userStates) getOrCreateSeries(ctx context.Context, labels []client.LabelAdapter) (*userState, model.Fingerprint, *memorySeries, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, 0, nil, fmt.Errorf("no user id") @@ -198,7 +198,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user series limit (%d) exceeded", u.limits.MaxSeriesPerUser(u.userID)) } - metricName, err := extract.MetricNameFromLabelPairs(metric) + metricName, err := extract.MetricNameFromLabelAdapters(metric) if err != nil { u.fpLocker.Unlock(fp) return fp, nil, err @@ -279,7 +279,7 @@ func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels level.Debug(log).Log("series", len(fps)) - // fps is sorted, lock them in order to prevent deadlocks + // We only hold one FP lock at once here, so no opportunity to deadlock. i := 0 outer: for ; i < len(fps); i++ { diff --git a/pkg/querier/frontend/frontend.proto b/pkg/querier/frontend/frontend.proto index 1fafc42848d..b22281a99ee 100644 --- a/pkg/querier/frontend/frontend.proto +++ b/pkg/querier/frontend/frontend.proto @@ -48,7 +48,7 @@ message QueryRangeResponse { } message SampleStream { - repeated cortex.LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "metric"]; + repeated cortex.LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "metric", (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"]; repeated cortex.Sample samples = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "values"]; } diff --git a/pkg/querier/frontend/query_range.go b/pkg/querier/frontend/query_range.go index 62bb90ed9cc..1cb40bc9c41 100644 --- a/pkg/querier/frontend/query_range.go +++ b/pkg/querier/frontend/query_range.go @@ -12,7 +12,7 @@ import ( "time" "github.com/go-kit/kit/log/level" - "github.com/json-iterator/go" + jsoniter "github.com/json-iterator/go" opentracing "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" @@ -162,7 +162,7 @@ func (s *SampleStream) UnmarshalJSON(data []byte) error { if err := json.Unmarshal(data, &stream); err != nil { return err } - s.Labels = client.ToLabelPairs(stream.Metric) + s.Labels = client.FromMetricsToLabelAdapters(stream.Metric) s.Samples = stream.Values return nil } @@ -173,7 +173,7 @@ func (s *SampleStream) MarshalJSON() ([]byte, error) { Metric model.Metric `json:"metric"` Values []client.Sample `json:"values"` }{ - Metric: client.FromLabelPairs(s.Labels), + Metric: client.FromLabelAdaptersToMetric(s.Labels), Values: s.Samples, } return json.Marshal(stream) @@ -278,7 +278,7 @@ func matrixMerge(resps []*APIResponse) []SampleStream { output := map[string]*SampleStream{} for _, resp := range resps { for _, stream := range resp.Data.Result { - metric := client.FromLabelPairsToLabels(stream.Labels).String() + metric := client.FromLabelAdaptersToLabels(stream.Labels).String() existing, ok := output[metric] if !ok { existing = &SampleStream{ diff --git a/pkg/querier/frontend/query_range_test.go b/pkg/querier/frontend/query_range_test.go index 9dcd9238552..f4228c127a0 100644 --- a/pkg/querier/frontend/query_range_test.go +++ b/pkg/querier/frontend/query_range_test.go @@ -16,7 +16,6 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/wire" ) const ( @@ -38,8 +37,8 @@ var ( ResultType: model.ValMatrix.String(), Result: []SampleStream{ { - Labels: []client.LabelPair{ - {Name: wire.Bytes("foo"), Value: wire.Bytes("bar")}, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, Samples: []client.Sample{ {Value: 137, TimestampMs: 1536673680000}, @@ -205,7 +204,7 @@ func TestMergeAPIResponses(t *testing.T) { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{}, + Labels: []client.LabelAdapter{}, Samples: []client.Sample{ {Value: 0, TimestampMs: 0}, {Value: 1, TimestampMs: 1}, @@ -219,7 +218,7 @@ func TestMergeAPIResponses(t *testing.T) { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{}, + Labels: []client.LabelAdapter{}, Samples: []client.Sample{ {Value: 2, TimestampMs: 2}, {Value: 3, TimestampMs: 3}, @@ -235,7 +234,7 @@ func TestMergeAPIResponses(t *testing.T) { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{}, + Labels: []client.LabelAdapter{}, Samples: []client.Sample{ {Value: 0, TimestampMs: 0}, {Value: 1, TimestampMs: 1}, @@ -260,7 +259,7 @@ func TestMergeAPIResponses(t *testing.T) { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{{Name: []byte("a"), Value: []byte("b")}, {Name: []byte("c"), Value: []byte("d")}}, + Labels: []client.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, Samples: []client.Sample{ {Value: 0, TimestampMs: 0}, {Value: 1, TimestampMs: 1000}, diff --git a/pkg/querier/frontend/results_cache_test.go b/pkg/querier/frontend/results_cache_test.go index d8eaa8b7ca9..4de06de2d6e 100644 --- a/pkg/querier/frontend/results_cache_test.go +++ b/pkg/querier/frontend/results_cache_test.go @@ -5,14 +5,14 @@ import ( "strconv" "testing" - "github.com/cortexproject/cortex/pkg/chunk/cache" - client "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/chunk/cache" + client "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/flagext" ) var dummyResponse = &APIResponse{ @@ -21,8 +21,8 @@ var dummyResponse = &APIResponse{ ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{ - {Name: wire.Bytes("foo"), Value: wire.Bytes("bar")}, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, Samples: []client.Sample{ { @@ -50,8 +50,8 @@ func mkAPIResponse(start, end, step int64) *APIResponse { ResultType: matrix, Result: []SampleStream{ { - Labels: []client.LabelPair{ - {Name: wire.Bytes("foo"), Value: wire.Bytes("bar")}, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, Samples: samples, }, diff --git a/pkg/querier/ingester_streaming_queryable.go b/pkg/querier/ingester_streaming_queryable.go index e0e992640c9..45d09432536 100644 --- a/pkg/querier/ingester_streaming_queryable.go +++ b/pkg/querier/ingester_streaming_queryable.go @@ -54,7 +54,7 @@ func (i ingesterQueryable) Get(ctx context.Context, from, through model.Time, ma chunks := make([]chunk.Chunk, 0, len(results)) for _, result := range results { - metric := client.FromLabelPairs(result.Labels) + metric := client.FromLabelAdaptersToMetric(result.Labels) cs, err := chunkcompat.FromChunks(userID, metric, result.Chunks) if err != nil { return nil, promql.ErrStorage{Err: err} @@ -94,7 +94,7 @@ func (q *ingesterStreamingQuerier) Select(sp *storage.SelectParams, matchers ... return nil, nil, promql.ErrStorage{Err: err} } - ls := client.FromLabelPairsToLabels(result.Labels) + ls := client.FromLabelAdaptersToLabels(result.Labels) sort.Sort(ls) series := &chunkSeries{ labels: ls, diff --git a/pkg/querier/ingester_streaming_queryable_test.go b/pkg/querier/ingester_streaming_queryable_test.go index a97e11f6aa7..0ee83a9a635 100644 --- a/pkg/querier/ingester_streaming_queryable_test.go +++ b/pkg/querier/ingester_streaming_queryable_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/weaveworks/common/user" ) @@ -16,13 +15,13 @@ func TestIngesterStreaming(t *testing.T) { d := &mockDistributor{ r: []client.TimeSeriesChunk{ { - Labels: []client.LabelPair{ - {Name: wire.Bytes("bar"), Value: wire.Bytes("baz")}, + Labels: []client.LabelAdapter{ + {Name: "bar", Value: "baz"}, }, }, { - Labels: []client.LabelPair{ - {Name: wire.Bytes("foo"), Value: wire.Bytes("bar")}, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, }, }, diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 45e4e7d7da9..ee363f3c666 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -22,7 +22,6 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/weaveworks/common/user" ) @@ -83,7 +82,7 @@ var ( query: "foo", step: sampleRate * 4, labels: labels.Labels{ - labels.Label{Name: "__name__", Value: "foo"}, + labels.Label{Name: model.MetricNameLabel, Value: "foo"}, }, samples: func(from, through time.Time, step time.Duration) int { return int(through.Sub(from)/step) + 1 @@ -111,7 +110,7 @@ var ( query: "foo", step: sampleRate * 4 * 10, labels: labels.Labels{ - labels.Label{Name: "__name__", Value: "foo"}, + labels.Label{Name: model.MetricNameLabel, Value: "foo"}, }, samples: func(from, through time.Time, step time.Duration) int { return int(through.Sub(from)/step) + 1 @@ -236,7 +235,7 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc require.NoError(t, err) tsc := client.TimeSeriesChunk{ - Labels: []client.LabelPair{{Name: wire.Bytes(model.MetricNameLabel), Value: wire.Bytes("foo")}}, + Labels: []client.LabelAdapter{{Name: model.MetricNameLabel, Value: "foo"}}, Chunks: chunks, } matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through) diff --git a/pkg/querier/remote_read_test.go b/pkg/querier/remote_read_test.go index 65dd63d9b73..b96a90fa40f 100644 --- a/pkg/querier/remote_read_test.go +++ b/pkg/querier/remote_read_test.go @@ -10,7 +10,6 @@ import ( "testing" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/common/model" @@ -65,11 +64,8 @@ func TestRemoteReadHandler(t *testing.T) { { Timeseries: []client.TimeSeries{ { - Labels: []client.LabelPair{ - { - Name: wire.Bytes([]byte("foo")), - Value: wire.Bytes([]byte("bar")), - }, + Labels: []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, }, Samples: []client.Sample{ {Value: 0, TimestampMs: 0}, diff --git a/pkg/util/chunkcompat/compat.go b/pkg/util/chunkcompat/compat.go index 446f7a4544d..038a1ca2c40 100644 --- a/pkg/util/chunkcompat/compat.go +++ b/pkg/util/chunkcompat/compat.go @@ -33,7 +33,7 @@ func SeriesChunksToMatrix(from, through model.Time, serieses []client.TimeSeries result := model.Matrix{} for _, series := range serieses { - metric := client.FromLabelPairs(series.Labels) + metric := client.FromLabelAdaptersToMetric(series.Labels) chunks, err := FromChunks("", metric, series.Chunks) if err != nil { return nil, err diff --git a/pkg/util/extract/extract.go b/pkg/util/extract/extract.go index e170382f183..8de926d83e2 100644 --- a/pkg/util/extract/extract.go +++ b/pkg/util/extract/extract.go @@ -8,16 +8,14 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) -var labelNameBytes = []byte(model.MetricNameLabel) - -// MetricNameFromLabelPairs extracts the metric name from a list of LabelPairs. -func MetricNameFromLabelPairs(labels []client.LabelPair) ([]byte, error) { +// MetricNameFromLabelAdapters extracts the metric name from a list of LabelPairs. +func MetricNameFromLabelAdapters(labels []client.LabelAdapter) (string, error) { for _, label := range labels { - if label.Name.Equal(labelNameBytes) { + if label.Name == model.MetricNameLabel { return label.Value, nil } } - return nil, fmt.Errorf("No metric name label") + return "", fmt.Errorf("No metric name label") } // MetricNameFromMetric extract the metric name from a model.Metric diff --git a/pkg/util/hash_fp.go b/pkg/util/hash_fp.go index ba0a03801e2..209b8b45c06 100644 --- a/pkg/util/hash_fp.go +++ b/pkg/util/hash_fp.go @@ -9,6 +9,6 @@ import "github.com/prometheus/common/model" // function we use is prone to only change a few bits for similar metrics. We // really want to make use of every change in the fingerprint to vary mutex // selection.) -func HashFP(fp model.Fingerprint) uint { - return uint(fp ^ (fp >> 32) ^ (fp >> 16)) +func HashFP(fp model.Fingerprint) uint32 { + return uint32(fp ^ (fp >> 32) ^ (fp >> 16)) } diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 750ceb25b91..11044eae562 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -48,7 +48,7 @@ func init() { } // ValidateSample returns an err if the sample is invalid. -func (cfg *Overrides) ValidateSample(userID string, metricName []byte, s client.Sample) error { +func (cfg *Overrides) ValidateSample(userID string, metricName string, s client.Sample) error { if cfg.RejectOldSamples(userID) && model.Time(s.TimestampMs) < model.Now().Add(-cfg.RejectOldSamplesMaxAge(userID)) { DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc() return httpgrpc.Errorf(http.StatusBadRequest, errTooOld, metricName, model.Time(s.TimestampMs)) @@ -63,8 +63,8 @@ func (cfg *Overrides) ValidateSample(userID string, metricName []byte, s client. } // ValidateLabels returns an err if the labels are invalid. -func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error { - metricName, err := extract.MetricNameFromLabelPairs(ls) +func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelAdapter) error { + metricName, err := extract.MetricNameFromLabelAdapters(ls) if cfg.EnforceMetricName(userID) { if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, errMissingMetricName) @@ -78,7 +78,7 @@ func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error numLabelNames := len(ls) if numLabelNames > cfg.MaxLabelNamesPerSeries(userID) { DiscardedSamples.WithLabelValues(maxLabelNamesPerSeries, userID).Inc() - return httpgrpc.Errorf(http.StatusBadRequest, errTooManyLabels, client.FromLabelPairs(ls).String(), numLabelNames, cfg.MaxLabelNamesPerSeries(userID)) + return httpgrpc.Errorf(http.StatusBadRequest, errTooManyLabels, client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, cfg.MaxLabelNamesPerSeries(userID)) } maxLabelNameLength := cfg.MaxLabelNameLength(userID) @@ -102,7 +102,7 @@ func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error } if errTemplate != "" { DiscardedSamples.WithLabelValues(reason, userID).Inc() - return httpgrpc.Errorf(http.StatusBadRequest, errTemplate, cause, client.FromLabelPairs(ls).String()) + return httpgrpc.Errorf(http.StatusBadRequest, errTemplate, cause, client.FromLabelAdaptersToMetric(ls).String()) } } return nil diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index cd481c0a03c..685ea7df7d1 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -57,7 +57,7 @@ func TestValidateLabels(t *testing.T) { }, } { - err := overrides.ValidateLabels(userID, client.ToLabelPairs(c.metric)) + err := overrides.ValidateLabels(userID, client.FromMetricsToLabelAdapters(c.metric)) assert.Equal(t, c.err, err, "wrong error") } }