diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go
index d0ce910ca14..ef8c5251b70 100644
--- a/pkg/parquetconverter/converter.go
+++ b/pkg/parquetconverter/converter.go
@@ -438,7 +438,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
 		converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
 	}
 
-	_, err = convert.ConvertTSDBBlock(
+	numShards, err := convert.ConvertTSDBBlock(
 		ctx,
 		uBucket,
 		tsdbBlock.MinTime(),
@@ -459,9 +459,9 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
 	}
 	duration := time.Since(start)
 	c.metrics.convertBlockDuration.WithLabelValues(userID).Set(duration.Seconds())
-	level.Info(logger).Log("msg", "successfully converted block", "block", b.ULID.String(), "duration", duration)
+	level.Info(logger).Log("msg", "successfully converted block", "block", b.ULID.String(), "duration", duration, "shards", numShards)
 
-	if err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket); err != nil {
+	if err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket, numShards); err != nil {
 		level.Error(logger).Log("msg", "failed to write parquet converter marker", "block", b.ULID.String(), "err", err)
 		if c.checkConvertError(userID, err) {
 			return err
diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go
index 03a96a68cf2..bdcf46b3d36 100644
--- a/pkg/parquetconverter/converter_test.go
+++ b/pkg/parquetconverter/converter_test.go
@@ -110,6 +110,8 @@ func TestConverter(t *testing.T) {
 			require.NoError(t, err)
 			if m.Version == parquet.CurrentVersion {
 				blocksConverted = append(blocksConverted, bIds)
+				// Verify that the shards field is populated (should be > 0)
+				require.Greater(t, m.Shards, 0, "expected shards to be greater than 0 for block %s", bIds.String())
 			}
 		}
 		return len(blocksConverted)
@@ -455,6 +457,7 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) {
 	// Write a converter mark with version 1 to simulate an already converted block
 	markerV1 := parquet.ConverterMark{
 		Version: parquet.ParquetConverterMarkVersion1,
+		Shards:  2, // Simulate a block with 2 shards
 	}
 	markerBytes, err := json.Marshal(markerV1)
 	require.NoError(t, err)
@@ -462,10 +465,11 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) {
 	err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes))
 	require.NoError(t, err)
 
-	// Verify the marker exists with version 1
+	// Verify the marker exists with version 1 and has shards
 	marker, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
 	require.NoError(t, err)
 	require.Equal(t, parquet.ParquetConverterMarkVersion1, marker.Version)
+	require.Equal(t, 2, marker.Shards)
 
 	// Start the converter
 	err = services.StartAndAwaitRunning(context.Background(), c)
diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go
index 0e5b16739c7..214805dfb2e 100644
--- a/pkg/querier/parquet_queryable.go
+++ b/pkg/querier/parquet_queryable.go
@@ -216,41 +216,65 @@ func NewParquetQueryable(
 		}
 		userBkt := bucket.NewUserBucketClient(userID, bucketClient, limits)
 		bucketOpener := parquet_storage.NewParquetBucketOpener(userBkt)
-		shards := make([]parquet_storage.ParquetShard, len(blocks))
+
+		// Calculate total number of shards across all blocks
+		totalShards := 0
+		for _, block := range blocks {
+			numShards := 1 // Default to 1 shard for backward compatibility
+			if block.Parquet != nil && block.Parquet.Shards > 0 {
+				numShards = block.Parquet.Shards
+			}
+			totalShards += numShards
+		}
+
+		shards := make([]parquet_storage.ParquetShard, totalShards)
 		errGroup := &errgroup.Group{}
 
 		span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.OpenShards")
 		defer span.Finish()
 
-		for i, block := range blocks {
-			errGroup.Go(func() error {
-				cacheKey := fmt.Sprintf("%v-%v", userID, block.ID)
-				shard := cache.Get(cacheKey)
-				if shard == nil {
-					// we always only have 1 shard - shard 0
-					// Use context.Background() here as the file can be cached and live after the request ends.
-					shard, err = parquet_storage.NewParquetShardOpener(
-						context.WithoutCancel(ctx),
-						block.ID.String(),
-						bucketOpener,
-						bucketOpener,
-						0,
-						parquet_storage.WithFileOptions(
-							parquet.SkipMagicBytes(true),
-							parquet.ReadBufferSize(100*1024),
-							parquet.SkipBloomFilters(true),
-							parquet.OptimisticRead(true),
-						),
-					)
-					if err != nil {
-						return errors.Wrapf(err, "failed to open parquet shard. block: %v", block.ID.String())
-					}
-					cache.Set(cacheKey, shard)
-				}
-
-				shards[i] = shard
-				return nil
-			})
+		shardIdx := 0
+		for _, block := range blocks {
+			numShards := 1 // Default to 1 shard for backward compatibility
+			if block.Parquet != nil && block.Parquet.Shards > 0 {
+				numShards = block.Parquet.Shards
+			}
+
+			for shardID := 0; shardID < numShards; shardID++ {
+				idx := shardIdx
+				shardIdx++
+				blockID := block.ID
+				currentShardID := shardID
+
+				errGroup.Go(func() error {
+					cacheKey := fmt.Sprintf("%v-%v-%v", userID, blockID, currentShardID)
+					shard := cache.Get(cacheKey)
+					if shard == nil {
+						// Use context.Background() here as the file can be cached and live after the request ends.
+						var err error
+						shard, err = parquet_storage.NewParquetShardOpener(
+							context.WithoutCancel(ctx),
+							blockID.String(),
+							bucketOpener,
+							bucketOpener,
+							currentShardID,
+							parquet_storage.WithFileOptions(
+								parquet.SkipMagicBytes(true),
+								parquet.ReadBufferSize(100*1024),
+								parquet.SkipBloomFilters(true),
+								parquet.OptimisticRead(true),
+							),
+						)
+						if err != nil {
+							return errors.Wrapf(err, "failed to open parquet shard. block: %v, shard: %v", blockID.String(), currentShardID)
+						}
+						cache.Set(cacheKey, shard)
+					}
+
+					shards[idx] = shard
+					return nil
+				})
+			}
 		}
 
 		return shards, errGroup.Wait()
diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go
index bc10d7a31fd..f3d6897b9d9 100644
--- a/pkg/querier/parquet_queryable_test.go
+++ b/pkg/querier/parquet_queryable_test.go
@@ -569,7 +569,7 @@ func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient o
 		convert.WithName(blockID.String()),
 	}
 
-	_, err = convert.ConvertTSDBBlock(
+	numShards, err := convert.ConvertTSDBBlock(
 		ctx,
 		userBucketClient,
 		tsdbBlock.MinTime(),
@@ -583,7 +583,7 @@ func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient o
 	_ = tsdbBlock.Close()
 
 	// Write parquet converter marker
-	err = parquet.WriteConverterMark(ctx, blockID, userBucketClient)
+	err = parquet.WriteConverterMark(ctx, blockID, userBucketClient, numShards)
 	require.NoError(t, err)
 
 	return nil
diff --git a/pkg/storage/parquet/converter_marker.go b/pkg/storage/parquet/converter_marker.go
index 4aeb41ebdfa..55d85c45b3c 100644
--- a/pkg/storage/parquet/converter_marker.go
+++ b/pkg/storage/parquet/converter_marker.go
@@ -29,6 +29,9 @@ const (
 
 type ConverterMark struct {
 	Version int `json:"version"`
+	// Shards is the number of parquet shards created for this block.
+	// This field is optional for backward compatibility.
+	Shards int `json:"shards,omitempty"`
 }
 
 func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*ConverterMark, error) {
@@ -53,9 +56,10 @@ func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Instr
 	return &marker, err
 }
 
-func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Bucket) error {
+func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Bucket, shards int) error {
 	marker := ConverterMark{
 		Version: CurrentVersion,
+		Shards:  shards,
 	}
 	markerPath := path.Join(id.String(), ConverterMarkerFileName)
 	b, err := json.Marshal(marker)
@@ -68,6 +72,9 @@ func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Buck
 // ConverterMarkMeta is used in Bucket Index. It might not be the same as ConverterMark.
 type ConverterMarkMeta struct {
 	Version int `json:"version"`
+	// Shards is the number of parquet shards created for this block.
+	// This field is optional for backward compatibility.
+	Shards int `json:"shards,omitempty"`
 }
 
 func ValidConverterMarkVersion(version int) bool {
diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go
index ccc2ade9beb..505dca0be12 100644
--- a/pkg/storage/tsdb/bucketindex/updater.go
+++ b/pkg/storage/tsdb/bucketindex/updater.go
@@ -205,6 +205,7 @@ func (w *Updater) updateParquetBlockIndexEntry(ctx context.Context, id ulid.ULID
 
 	block.Parquet = &parquet.ConverterMarkMeta{
 		Version: marker.Version,
+		Shards:  marker.Shards,
 	}
 	return nil
 }
diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go
index 25988d6dc3a..dc38b2f77c7 100644
--- a/pkg/storage/tsdb/bucketindex/updater_test.go
+++ b/pkg/storage/tsdb/bucketindex/updater_test.go
@@ -316,8 +316,8 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
 	block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20)
 	block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30)
 	block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2)
-	// Add parquet marker to block 1.
-	block1ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block1)
+	// Add parquet marker to block 1 with 3 shards.
+	block1ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block1, 3)
 
 	w := NewUpdater(bkt, userID, nil, logger).EnableParquet()
 	returnedIdx, _, _, err := w.UpdateIndex(ctx, nil)
@@ -325,7 +325,7 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
 	assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
 		[]tsdb.BlockMeta{block1, block2},
 		[]*metadata.DeletionMark{block2Mark},
 		map[string]*parquet.ConverterMarkMeta{
-			block1.ULID.String(): {Version: block1ParquetMark.Version},
+			block1.ULID.String(): {Version: block1ParquetMark.Version, Shards: block1ParquetMark.Shards},
 		})
 
 	// Create new blocks, and update the index.
@@ -339,7 +339,7 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
 		[]tsdb.BlockMeta{block1, block2, block3, block4},
 		[]*metadata.DeletionMark{block2Mark, block4Mark},
 		map[string]*parquet.ConverterMarkMeta{
-			block1.ULID.String(): {Version: block1ParquetMark.Version},
+			block1.ULID.String(): {Version: block1ParquetMark.Version, Shards: block1ParquetMark.Shards},
 		})
 
 	// Hard delete a block and update the index.
@@ -350,18 +350,18 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
 	assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
 		[]tsdb.BlockMeta{block1, block3, block4},
 		[]*metadata.DeletionMark{block4Mark},
 		map[string]*parquet.ConverterMarkMeta{
-			block1.ULID.String(): {Version: block1ParquetMark.Version},
+			block1.ULID.String(): {Version: block1ParquetMark.Version, Shards: block1ParquetMark.Shards},
 		})
 
-	// Upload parquet marker to an old block and update index
-	block3ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block3)
+	// Upload a parquet marker with 5 shards to an old block and update the index.
+	block3ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block3, 5)
 	returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx)
 	require.NoError(t, err)
 	assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
 		[]tsdb.BlockMeta{block1, block3, block4},
 		[]*metadata.DeletionMark{block4Mark},
 		map[string]*parquet.ConverterMarkMeta{
-			block1.ULID.String(): {Version: block1ParquetMark.Version},
-			block3.ULID.String(): {Version: block3ParquetMark.Version},
+			block1.ULID.String(): {Version: block1ParquetMark.Version, Shards: block1ParquetMark.Shards},
+			block3.ULID.String(): {Version: block3ParquetMark.Version, Shards: block3ParquetMark.Shards},
 		})
 }
@@ -392,6 +392,22 @@ func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) {
 			expectParquet:     true,
 			expectParquetMeta: &parquet.ConverterMarkMeta{Version: 1},
 		},
+		{
+			name: "should successfully read parquet marker with shards",
+			setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
+				parquetMark := parquet.ConverterMarkMeta{
+					Version: 2,
+					Shards:  4,
+				}
+				data, err := json.Marshal(parquetMark)
+				require.NoError(t, err)
+				require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), parquet.ConverterMarkerFileName), bytes.NewReader(data)))
+				return bkt
+			},
+			expectedError:     nil,
+			expectParquet:     true,
+			expectParquetMeta: &parquet.ConverterMarkMeta{Version: 2, Shards: 4},
+		},
 		{
 			name: "should handle missing parquet marker",
 			setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go
index c8fdd59a323..510b74b781c 100644
--- a/pkg/storegateway/parquet_bucket_store.go
+++ b/pkg/storegateway/parquet_bucket_store.go
@@ -65,7 +65,8 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher
 	bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket)
 	noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx))
 	for _, blockID := range blockIDs {
-		block, err := p.newParquetBlock(ctx, blockID, bucketOpener, bucketOpener, p.chunksDecoder, noopQuota, noopQuota, noopQuota)
+		// TODO: support shard ID > 0 later.
+		block, err := p.newParquetBlock(ctx, blockID, 0, bucketOpener, bucketOpener, p.chunksDecoder, noopQuota, noopQuota, noopQuota)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go
index 6fa94a81092..b51bf758ae5 100644
--- a/pkg/storegateway/parquet_bucket_stores.go
+++ b/pkg/storegateway/parquet_bucket_stores.go
@@ -274,13 +274,13 @@ type parquetBlock struct {
 	concurrency int
 }
 
-func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) {
+func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, shardID int, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) {
 	userID, err := users.TenantID(ctx)
 	if err != nil {
 		return nil, err
 	}
 
-	cacheKey := fmt.Sprintf("%v-%v", userID, name)
+	cacheKey := fmt.Sprintf("%v-%v-%v", userID, name, shardID)
 	shard := p.parquetShardCache.Get(cacheKey)
 
 	if shard == nil {
diff --git a/pkg/util/testutil/block_mock.go b/pkg/util/testutil/block_mock.go
index e3a96002169..9aa16485cf1 100644
--- a/pkg/util/testutil/block_mock.go
+++ b/pkg/util/testutil/block_mock.go
@@ -89,9 +89,10 @@ func MockStorageNonCompactionMark(t testing.TB, bucket objstore.Bucket, userID s
 	return &mark
 }
 
-func MockStorageParquetConverterMark(t testing.TB, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *parquet.ConverterMark {
+func MockStorageParquetConverterMark(t testing.TB, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta, shards int) *parquet.ConverterMark {
 	mark := parquet.ConverterMark{
-		Version: 1,
+		Version: parquet.CurrentVersion,
+		Shards:  shards,
 	}
 
 	markContent, err := json.Marshal(mark)
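
Reviewer note (not part of the patch): the backward-compatibility rule this change relies on — a missing marker or a zero `Shards` value is treated as a single shard, shard 0 — is easy to get wrong when reading markers written before the field existed, since `shards,omitempty` means old payloads simply lack the key. The following is a minimal, self-contained Go sketch of that rule and of the `"%v-%v-%v"` per-shard cache-key scheme used in the querier and store-gateway; `numShardsFor`, the standalone copy of `ConverterMarkMeta`, and the literal user/block IDs are illustrative assumptions, not code from the patch.

package main

import (
	"encoding/json"
	"fmt"
)

// ConverterMarkMeta mirrors the bucket-index metadata extended by this patch.
// Shards decodes as zero for markers serialized before the field existed.
type ConverterMarkMeta struct {
	Version int `json:"version"`
	Shards  int `json:"shards,omitempty"`
}

// numShardsFor is a hypothetical helper applying the same rule as the new
// code in parquet_queryable.go: nil metadata or Shards == 0 means one shard.
func numShardsFor(meta *ConverterMarkMeta) int {
	if meta != nil && meta.Shards > 0 {
		return meta.Shards
	}
	return 1
}

func main() {
	// A legacy marker decodes with Shards == 0 because the key is absent.
	var legacy ConverterMarkMeta
	_ = json.Unmarshal([]byte(`{"version":1}`), &legacy)

	// A marker written by the updated WriteConverterMark with 3 shards.
	sharded := ConverterMarkMeta{Version: 2, Shards: 3}

	fmt.Println(numShardsFor(nil))      // 1 (block has no parquet metadata)
	fmt.Println(numShardsFor(&legacy))  // 1 (pre-Shards marker)
	fmt.Println(numShardsFor(&sharded)) // 3

	// Per-shard cache keys, matching the "%v-%v-%v" scheme in the patch
	// (user, block name, shard ID); IDs here are placeholders.
	for shardID := 0; shardID < numShardsFor(&sharded); shardID++ {
		fmt.Println(fmt.Sprintf("%v-%v-%v", "user-1", "block-ulid", shardID))
	}
}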