6 changes: 3 additions & 3 deletions pkg/parquetconverter/converter.go
@@ -438,7 +438,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
}

_, err = convert.ConvertTSDBBlock(
numShards, err := convert.ConvertTSDBBlock(
ctx,
uBucket,
tsdbBlock.MinTime(),
@@ -459,9 +459,9 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
}
duration := time.Since(start)
c.metrics.convertBlockDuration.WithLabelValues(userID).Set(duration.Seconds())
level.Info(logger).Log("msg", "successfully converted block", "block", b.ULID.String(), "duration", duration)
level.Info(logger).Log("msg", "successfully converted block", "block", b.ULID.String(), "duration", duration, "shards", numShards)

if err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket); err != nil {
if err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket, numShards); err != nil {
level.Error(logger).Log("msg", "failed to write parquet converter marker", "block", b.ULID.String(), "err", err)
if c.checkConvertError(userID, err) {
return err
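
With this change the converter records the shard count returned by `convert.ConvertTSDBBlock` in the converter marker it uploads next to the block. The sketch below uses a simplified local copy of the marker struct (not the real `pkg/storage/parquet.ConverterMark`) just to show the JSON payload this produces; the version value is illustrative, as the real one comes from `parquet.CurrentVersion`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// converterMark is a local stand-in for parquet.ConverterMark, copied here
// only to show the JSON shape of the marker once a shard count is included.
type converterMark struct {
	Version int `json:"version"`
	Shards  int `json:"shards,omitempty"`
}

func main() {
	// Suppose ConvertTSDBBlock reported 3 shards for this block.
	numShards := 3

	mark := converterMark{
		Version: 1, // illustrative; the real value is parquet.CurrentVersion
		Shards:  numShards,
	}

	b, err := json.Marshal(mark)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"version":1,"shards":3}
}
```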
6 changes: 5 additions & 1 deletion pkg/parquetconverter/converter_test.go
@@ -110,6 +110,8 @@ func TestConverter(t *testing.T) {
require.NoError(t, err)
if m.Version == parquet.CurrentVersion {
blocksConverted = append(blocksConverted, bIds)
// Verify that shards field is populated (should be > 0)
require.Greater(t, m.Shards, 0, "expected shards to be greater than 0 for block %s", bIds.String())
}
}
return len(blocksConverted)
@@ -455,17 +457,19 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) {
// Write a converter mark with version 1 to simulate an already converted block
markerV1 := parquet.ConverterMark{
Version: parquet.ParquetConverterMarkVersion1,
Shards: 2, // Simulate a block with 2 shards
}
markerBytes, err := json.Marshal(markerV1)
require.NoError(t, err)
markerPath := path.Join(blockID.String(), parquet.ConverterMarkerFileName)
err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes))
require.NoError(t, err)

// Verify the marker exists with version 1
// Verify the marker exists with version 1 and has shards
marker, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
require.NoError(t, err)
require.Equal(t, parquet.ParquetConverterMarkVersion1, marker.Version)
require.Equal(t, 2, marker.Shards)

// Start the converter
err = services.StartAndAwaitRunning(context.Background(), c)
80 changes: 52 additions & 28 deletions pkg/querier/parquet_queryable.go
@@ -216,41 +216,65 @@ func NewParquetQueryable(
}
userBkt := bucket.NewUserBucketClient(userID, bucketClient, limits)
bucketOpener := parquet_storage.NewParquetBucketOpener(userBkt)
shards := make([]parquet_storage.ParquetShard, len(blocks))

// Calculate total number of shards across all blocks
totalShards := 0
for _, block := range blocks {
numShards := 1 // Default to 1 shard for backward compatibility
if block.Parquet != nil && block.Parquet.Shards > 0 {
numShards = block.Parquet.Shards
}
totalShards += numShards
}

shards := make([]parquet_storage.ParquetShard, totalShards)
errGroup := &errgroup.Group{}

span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.OpenShards")
defer span.Finish()

for i, block := range blocks {
errGroup.Go(func() error {
cacheKey := fmt.Sprintf("%v-%v", userID, block.ID)
shard := cache.Get(cacheKey)
if shard == nil {
// we always only have 1 shard - shard 0
// Use context.Background() here as the file can be cached and live after the request ends.
shard, err = parquet_storage.NewParquetShardOpener(
context.WithoutCancel(ctx),
block.ID.String(),
bucketOpener,
bucketOpener,
0,
parquet_storage.WithFileOptions(
parquet.SkipMagicBytes(true),
parquet.ReadBufferSize(100*1024),
parquet.SkipBloomFilters(true),
parquet.OptimisticRead(true),
),
)
if err != nil {
return errors.Wrapf(err, "failed to open parquet shard. block: %v", block.ID.String())
shardIdx := 0
for _, block := range blocks {
numShards := 1 // Default to 1 shard for backward compatibility
if block.Parquet != nil && block.Parquet.Shards > 0 {
numShards = block.Parquet.Shards
}

for shardID := 0; shardID < numShards; shardID++ {
idx := shardIdx
shardIdx++
blockID := block.ID
currentShardID := shardID

errGroup.Go(func() error {
cacheKey := fmt.Sprintf("%v-%v-%v", userID, blockID, currentShardID)
shard := cache.Get(cacheKey)
if shard == nil {
// Use context.Background() here as the file can be cached and live after the request ends.
var err error
shard, err = parquet_storage.NewParquetShardOpener(
context.WithoutCancel(ctx),
blockID.String(),
bucketOpener,
bucketOpener,
currentShardID,
parquet_storage.WithFileOptions(
parquet.SkipMagicBytes(true),
parquet.ReadBufferSize(100*1024),
parquet.SkipBloomFilters(true),
parquet.OptimisticRead(true),
),
)
if err != nil {
return errors.Wrapf(err, "failed to open parquet shard. block: %v, shard: %v", blockID.String(), currentShardID)
}
cache.Set(cacheKey, shard)
}
cache.Set(cacheKey, shard)
}

shards[i] = shard
return nil
})
shards[idx] = shard
return nil
})
}
}

return shards, errGroup.Wait()
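
The querier now opens one parquet shard per (block, shard) pair instead of always opening shard 0, and its cache key carries the shard ID. A minimal sketch of that indexing scheme, using a hypothetical `blockInfo` type in place of the real bucket-index block entry: blocks whose markers predate the `shards` field report 0 and are treated as a single shard for backward compatibility.

```go
package main

import "fmt"

// blockInfo is a hypothetical, simplified stand-in for the bucket-index
// block entry; only the fields relevant to shard counting are kept.
type blockInfo struct {
	ID     string
	Shards int // 0 when the converter marker predates the shards field
}

// shardCount defaults to 1 shard for markers written before sharding existed.
func shardCount(b blockInfo) int {
	if b.Shards > 0 {
		return b.Shards
	}
	return 1
}

func main() {
	blocks := []blockInfo{
		{ID: "block-a", Shards: 3}, // new-style marker with a shard count
		{ID: "block-b"},            // old-style marker, no shards recorded
	}

	// First pass: size the flat slice that holds every (block, shard) pair.
	total := 0
	for _, b := range blocks {
		total += shardCount(b)
	}
	openOrder := make([]string, 0, total)

	// Second pass: one entry per shard, with a per-shard cache key.
	for _, b := range blocks {
		for shardID := 0; shardID < shardCount(b); shardID++ {
			cacheKey := fmt.Sprintf("%v-%v-%v", "user-1", b.ID, shardID)
			openOrder = append(openOrder, cacheKey)
		}
	}

	fmt.Println(total)     // 4
	fmt.Println(openOrder) // [user-1-block-a-0 user-1-block-a-1 user-1-block-a-2 user-1-block-b-0]
}
```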
4 changes: 2 additions & 2 deletions pkg/querier/parquet_queryable_test.go
@@ -569,7 +569,7 @@ func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient o
convert.WithName(blockID.String()),
}

_, err = convert.ConvertTSDBBlock(
numShards, err := convert.ConvertTSDBBlock(
ctx,
userBucketClient,
tsdbBlock.MinTime(),
@@ -583,7 +583,7 @@ func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient o
_ = tsdbBlock.Close()

// Write parquet converter marker
err = parquet.WriteConverterMark(ctx, blockID, userBucketClient)
err = parquet.WriteConverterMark(ctx, blockID, userBucketClient, numShards)
require.NoError(t, err)

return nil
9 changes: 8 additions & 1 deletion pkg/storage/parquet/converter_marker.go
@@ -29,6 +29,9 @@ const (

type ConverterMark struct {
Version int `json:"version"`
// Shards is the number of parquet shards created for this block.
// This field is optional for backward compatibility.
Shards int `json:"shards,omitempty"`
}

func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*ConverterMark, error) {
@@ -53,9 +56,10 @@ func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Instr
return &marker, err
}

func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Bucket) error {
func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Bucket, shards int) error {
marker := ConverterMark{
Version: CurrentVersion,
Shards: shards,
}
markerPath := path.Join(id.String(), ConverterMarkerFileName)
b, err := json.Marshal(marker)
@@ -68,6 +72,9 @@ func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Buck
// ConverterMarkMeta is used in Bucket Index. It might not be the same as ConverterMark.
type ConverterMarkMeta struct {
Version int `json:"version"`
// Shards is the number of parquet shards created for this block.
// This field is optional for backward compatibility.
Shards int `json:"shards,omitempty"`
}

func ValidConverterMarkVersion(version int) bool {
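
Because `shards` is marshalled with `omitempty` and is absent from older markers, decoding an old marker leaves `Shards` at zero, which downstream readers (such as the querier change above) interpret as a single shard. A small sketch of that behaviour, again using a local copy of the struct rather than the real type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ConverterMark mirrors the struct in pkg/storage/parquet/converter_marker.go.
type ConverterMark struct {
	Version int `json:"version"`
	Shards  int `json:"shards,omitempty"`
}

func main() {
	// Old marker written before the shards field existed.
	oldMarker := []byte(`{"version": 1}`)
	// New marker written by WriteConverterMark with a shard count.
	newMarker := []byte(`{"version": 1, "shards": 3}`)

	var oldM, newM ConverterMark
	_ = json.Unmarshal(oldMarker, &oldM)
	_ = json.Unmarshal(newMarker, &newM)

	fmt.Println(oldM.Shards) // 0 -> readers fall back to a single shard
	fmt.Println(newM.Shards) // 3
}
```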
1 change: 1 addition & 0 deletions pkg/storage/tsdb/bucketindex/updater.go
@@ -205,6 +205,7 @@ func (w *Updater) updateParquetBlockIndexEntry(ctx context.Context, id ulid.ULID

block.Parquet = &parquet.ConverterMarkMeta{
Version: marker.Version,
Shards: marker.Shards,
}
return nil
}
34 changes: 25 additions & 9 deletions pkg/storage/tsdb/bucketindex/updater_test.go
@@ -316,16 +316,16 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20)
block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30)
block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2)
// Add parquet marker to block 1.
block1ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block1)
// Add parquet marker to block 1 with 3 shards.
block1ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block1, 3)

w := NewUpdater(bkt, userID, nil, logger).EnableParquet()
returnedIdx, _, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
[]tsdb.BlockMeta{block1, block2},
[]*metadata.DeletionMark{block2Mark}, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1ParquetMark.Version},
block1.ULID.String(): {Version: block1ParquetMark.Version, Shards: block1ParquetMark.Shards},
})

// Create new blocks, and update the index.
@@ -339,7 +339,7 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
[]tsdb.BlockMeta{block1, block2, block3, block4},
[]*metadata.DeletionMark{block2Mark, block4Mark},
map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1ParquetMark.Version},
block1.ULID.String(): {Version: block1ParquetMark.Version, Shards: block1ParquetMark.Shards},
})

// Hard delete a block and update the index.
Expand All @@ -350,18 +350,18 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
[]tsdb.BlockMeta{block1, block3, block4},
[]*metadata.DeletionMark{block4Mark}, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1ParquetMark.Version},
block1.ULID.String(): {Version: block1ParquetMark.Version, Shards: block1ParquetMark.Shards},
})

// Upload parquet marker to an old block and update index
block3ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block3)
// Upload parquet marker to an old block and update index with 5 shards
block3ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block3, 5)
returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
[]tsdb.BlockMeta{block1, block3, block4},
[]*metadata.DeletionMark{block4Mark}, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1ParquetMark.Version},
block3.ULID.String(): {Version: block3ParquetMark.Version},
block1.ULID.String(): {Version: block1ParquetMark.Version, Shards: block1ParquetMark.Shards},
block3.ULID.String(): {Version: block3ParquetMark.Version, Shards: block3ParquetMark.Shards},
})
}

@@ -392,6 +392,22 @@ func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) {
expectParquet: true,
expectParquetMeta: &parquet.ConverterMarkMeta{Version: 1},
},
{
name: "should successfully read parquet marker with shards",
setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
parquetMark := parquet.ConverterMarkMeta{
Version: 2,
Shards: 4,
}
data, err := json.Marshal(parquetMark)
require.NoError(t, err)
require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), parquet.ConverterMarkerFileName), bytes.NewReader(data)))
return bkt
},
expectedError: nil,
expectParquet: true,
expectParquetMeta: &parquet.ConverterMarkMeta{Version: 2, Shards: 4},
},
{
name: "should handle missing parquet marker",
setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
3 changes: 2 additions & 1 deletion pkg/storegateway/parquet_bucket_store.go
@@ -65,7 +65,8 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher
bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket)
noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx))
for _, blockID := range blockIDs {
block, err := p.newParquetBlock(ctx, blockID, bucketOpener, bucketOpener, p.chunksDecoder, noopQuota, noopQuota, noopQuota)
// TODO: support shard ID > 0 later.
block, err := p.newParquetBlock(ctx, blockID, 0, bucketOpener, bucketOpener, p.chunksDecoder, noopQuota, noopQuota, noopQuota)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions pkg/storegateway/parquet_bucket_stores.go
@@ -274,13 +274,13 @@ type parquetBlock struct {
concurrency int
}

func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) {
func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, shardID int, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) {
userID, err := users.TenantID(ctx)
if err != nil {
return nil, err
}

cacheKey := fmt.Sprintf("%v-%v", userID, name)
cacheKey := fmt.Sprintf("%v-%v-%v", userID, name, shardID)
shard := p.parquetShardCache.Get(cacheKey)

if shard == nil {
5 changes: 3 additions & 2 deletions pkg/util/testutil/block_mock.go
@@ -89,9 +89,10 @@ func MockStorageNonCompactionMark(t testing.TB, bucket objstore.Bucket, userID s
return &mark
}

func MockStorageParquetConverterMark(t testing.TB, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *parquet.ConverterMark {
func MockStorageParquetConverterMark(t testing.TB, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta, shards int) *parquet.ConverterMark {
mark := parquet.ConverterMark{
Version: 1,
Version: parquet.CurrentVersion,
Shards: shards,
}

markContent, err := json.Marshal(mark)