diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index db4fb1a504a..47fe3d17f50 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -382,7 +382,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ // that would involve locking all the series & sorting, so until we have // a better solution in the ingesters I'd rather take the hit in the queriers. err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { - numSeries++ chunks := make([]*desc, 0, len(series.chunkDescs)) for _, chunk := range series.chunkDescs { if !(chunk.FirstTime.After(through) || chunk.LastTime.Before(from)) { @@ -390,6 +389,11 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ } } + if len(chunks) == 0 { + return nil + } + + numSeries++ wireChunks, err := toWireChunks(chunks) if err != nil { return err diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 0fd867c9b69..6e5f107c256 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -123,11 +123,15 @@ func matrixToSamples(m model.Matrix) []model.Sample { } func runTestQuery(ctx context.Context, t *testing.T, ing *Ingester, ty labels.MatchType, n, v string) (model.Matrix, *client.QueryRequest, error) { + return runTestQueryTimes(ctx, t, ing, ty, n, v, model.Earliest, model.Latest) +} + +func runTestQueryTimes(ctx context.Context, t *testing.T, ing *Ingester, ty labels.MatchType, n, v string, start, end model.Time) (model.Matrix, *client.QueryRequest, error) { matcher, err := labels.NewMatcher(ty, n, v) if err != nil { return nil, nil, err } - req, err := client.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) + req, err := client.ToQueryRequest(start, end, []*labels.Matcher{matcher}) if err != nil { return nil, nil, err } @@ -187,6 +191,31 @@ func TestIngesterAppend(t *testing.T) { store.checkData(t, userIDs, testData) } +func TestIngesterSendsOnlySeriesWithData(t *testing.T) { + _, ing := newDefaultTestStore(t) + + userIDs, _ := pushTestSamples(t, ing, 10, 1000) + + // Read samples back via ingester queries. + for _, userID := range userIDs { + ctx := user.InjectOrgID(context.Background(), userID) + _, req, err := runTestQueryTimes(ctx, t, ing, labels.MatchRegexp, model.JobLabel, ".+", model.Latest.Add(-15*time.Second), model.Latest) + require.NoError(t, err) + + s := stream{ + ctx: ctx, + } + err = ing.QueryStream(req, &s) + require.NoError(t, err) + + // Nothing should be selected. + require.Equal(t, 0, len(s.responses)) + } + + // Read samples back via chunk store. + ing.Shutdown() +} + func TestIngesterIdleFlush(t *testing.T) { // Create test ingester with short flush cycle cfg := defaultIngesterTestConfig() diff --git a/pkg/querier/ingester_streaming_queryable.go b/pkg/querier/ingester_streaming_queryable.go index 45d09432536..96a518f6077 100644 --- a/pkg/querier/ingester_streaming_queryable.go +++ b/pkg/querier/ingester_streaming_queryable.go @@ -54,6 +54,11 @@ func (i ingesterQueryable) Get(ctx context.Context, from, through model.Time, ma chunks := make([]chunk.Chunk, 0, len(results)) for _, result := range results { + // Sometimes the ingester can send series that have no data. + if len(result.Chunks) == 0 { + continue + } + metric := client.FromLabelAdaptersToMetric(result.Labels) cs, err := chunkcompat.FromChunks(userID, metric, result.Chunks) if err != nil { @@ -89,6 +94,11 @@ func (q *ingesterStreamingQuerier) Select(sp *storage.SelectParams, matchers ... serieses := make([]storage.Series, 0, len(results)) for _, result := range results { + // Sometimes the ingester can send series that have no data. + if len(result.Chunks) == 0 { + continue + } + chunks, err := chunkcompat.FromChunks(userID, nil, result.Chunks) if err != nil { return nil, nil, promql.ErrStorage{Err: err} diff --git a/pkg/querier/ingester_streaming_queryable_test.go b/pkg/querier/ingester_streaming_queryable_test.go index 0ee83a9a635..9deab45f781 100644 --- a/pkg/querier/ingester_streaming_queryable_test.go +++ b/pkg/querier/ingester_streaming_queryable_test.go @@ -4,25 +4,41 @@ import ( "context" "testing" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/require" + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/weaveworks/common/user" ) func TestIngesterStreaming(t *testing.T) { + // We need to make sure that there is atleast one chunk present, + // else no series will be selected. + promChunk, err := encoding.NewForEncoding(encoding.Bigchunk) + require.NoError(t, err) + + clientChunks, err := chunkcompat.ToChunks([]chunk.Chunk{ + chunk.NewChunk("", 0, nil, promChunk, model.Earliest, model.Earliest), + }) + require.NoError(t, err) + d := &mockDistributor{ r: []client.TimeSeriesChunk{ { Labels: []client.LabelAdapter{ {Name: "bar", Value: "baz"}, }, + Chunks: clientChunks, }, { Labels: []client.LabelAdapter{ {Name: "foo", Value: "bar"}, }, + Chunks: clientChunks, }, }, }