4 changes: 2 additions & 2 deletions be/src/io/fs/buffered_reader.cpp
@@ -106,7 +106,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
 
     // merge small IO
     size_t merge_start = offset + has_read;
-    const size_t merge_end = merge_start + READ_SLICE_SIZE;
+    const size_t merge_end = merge_start + _merged_read_slice_size;
     // <slice_size, is_content>
     std::vector<std::pair<size_t, bool>> merged_slice;
     size_t content_size = 0;
@@ -315,7 +315,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
 Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
                                        size_t* bytes_read, const IOContext* io_ctx) {
     if (!_read_slice) {
-        _read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
+        _read_slice = std::make_unique<OwnedSlice>(_merged_read_slice_size);
     }
 
     *bytes_read = 0;
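The merge_end change above is the whole mechanism: it caps how far ahead small IOs may be merged, so the slice size directly bounds read amplification. A back-of-the-envelope sketch of that trade-off, with illustrative numbers that are not from the PR (the reader also applies its own amplified-ratio limits):

#include <cstdio>

int main() {
    // Two small column chunks with a gap between them: one merged IO must
    // read the gap as well, so amplification = (content + gap) / content.
    double content = 2 * 100.0 * 1024; // two 100KB chunks
    double gap = 6.0 * 1024 * 1024;    // 6MB of unwanted bytes between them
    std::printf("amplification = %.1fx\n", (content + gap) / content); // ~31.7x
    // An 8MB merge window covers both chunks and pays the ~31.7x; shrinking
    // merge_read_slice_size below the gap keeps them as two separate small IOs.
    return 0;
}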
11 changes: 10 additions & 1 deletion be/src/io/fs/buffered_reader.h
@@ -278,7 +278,8 @@ class MergeRangeFileReader : public io::FileReader {
     static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
 
     MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
-                         const std::vector<PrefetchRange>& random_access_ranges)
+                         const std::vector<PrefetchRange>& random_access_ranges,
+                         int64_t merge_read_slice_size = READ_SLICE_SIZE)
             : _profile(profile),
               _reader(std::move(reader)),
               _random_access_ranges(random_access_ranges) {
@@ -291,6 +292,13 @@
         // 1MB for oss, 8KB for hdfs
         _equivalent_io_size =
                 _is_oss ? config::merged_oss_min_io_size : config::merged_hdfs_min_io_size;
+
+        _merged_read_slice_size = merge_read_slice_size;
+
+        if (_merged_read_slice_size < 0) {
+            _merged_read_slice_size = READ_SLICE_SIZE;
+        }
+
         for (const PrefetchRange& range : _random_access_ranges) {
             _statistics.apply_bytes += range.end_offset - range.start_offset;
         }
@@ -391,6 +399,7 @@
     bool _is_oss;
     double _max_amplified_ratio;
     size_t _equivalent_io_size;
+    int64_t _merged_read_slice_size;
 
     Statistics _statistics;
 };
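A minimal sketch (not the Doris source) of how the new constructor argument and the sentinel check compose; the concrete value of READ_SLICE_SIZE is an assumption inferred from the 8388608 defaults used elsewhere in this PR:

#include <cstdint>

constexpr int64_t READ_SLICE_SIZE = 8388608; // assumed 8MB default

// Negative means "no session override", so the compile-time default wins;
// any non-negative value is taken verbatim.
int64_t resolve_merged_slice_size(int64_t merge_read_slice_size) {
    return merge_read_slice_size < 0 ? READ_SLICE_SIZE : merge_read_slice_size;
}

Keeping the parameter signed with a -1 sentinel, rather than size_t, is what lets vparquet_reader.cpp below pass "unset" through without disturbing the default.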
13 changes: 9 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -866,12 +866,17 @@ Status ParquetReader::_next_row_group_reader() {
         size_t avg_io_size = 0;
         const std::vector<io::PrefetchRange> io_ranges =
                 _generate_random_access_ranges(row_group_index, &avg_io_size);
+        int64_t merged_read_slice_size = -1;
+        if (_state != nullptr && _state->query_options().__isset.merge_read_slice_size) {
+            merged_read_slice_size = _state->query_options().merge_read_slice_size;
+        }
         // The underlying page reader will prefetch data in column.
         // Using both MergeRangeFileReader and BufferedStreamReader simultaneously would waste a lot of memory.
-        group_file_reader = avg_io_size < io::MergeRangeFileReader::SMALL_IO
-                                    ? std::make_shared<io::MergeRangeFileReader>(
-                                              _profile, _file_reader, io_ranges)
-                                    : _file_reader;
+        group_file_reader =
+                avg_io_size < io::MergeRangeFileReader::SMALL_IO
+                        ? std::make_shared<io::MergeRangeFileReader>(
+                                  _profile, _file_reader, io_ranges, merged_read_slice_size)
+                        : _file_reader;
     }
     _current_group_reader.reset(
             new RowGroupReader(_io_ctx ? std::make_shared<io::TracingFileReader>(
9 changes: 8 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -859,6 +859,8 @@ public static double getHotValueThreshold() {
     public static final String MULTI_DISTINCT_STRATEGY = "multi_distinct_strategy";
     public static final String AGG_PHASE = "agg_phase";
 
+    public static final String MERGE_IO_READ_SLICE_SIZE = "merge_io_read_slice_size";
+
     public static final String ENABLE_PREFER_CACHED_ROWSET = "enable_prefer_cached_rowset";
     public static final String QUERY_FRESHNESS_TOLERANCE_MS = "query_freshness_tolerance_ms";
 
@@ -2563,6 +2565,11 @@ public void setSkewRewriteAggBucketNum(int num) {
             checker = "checkAggPhase")
     public int aggPhase = 0;
 
+
+    @VariableMgr.VarAttr(name = MERGE_IO_READ_SLICE_SIZE, description = {"调整 READ_SLICE_SIZE 大小,降低 Merge IO 读放大影响",
+            "Make the READ_SLICE_SIZE variable configurable to reduce the impact caused by read amplification."})
+    public int mergeReadSliceSize = 8388608;
+
     public void setAggPhase(int phase) {
         aggPhase = phase;
     }
@@ -4680,7 +4687,7 @@ public TQueryOptions toThrift() {
         tResult.setHnswEfSearch(hnswEFSearch);
         tResult.setHnswCheckRelativeDistance(hnswCheckRelativeDistance);
         tResult.setHnswBoundedQueue(hnswBoundedQueue);
-
+        tResult.setMergeReadSliceSize(mergeReadSliceSize);
         return tResult;
     }
 
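With the variable registered and forwarded in toThrift(), the merge window becomes tunable per session, e.g. SET merge_io_read_slice_size = 4194304; for a 4MB window (a hypothetical usage of the MERGE_IO_READ_SLICE_SIZE name declared above); the value reaches the BE as the thrift field merge_read_slice_size defined next.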
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
@@ -406,6 +406,7 @@ struct TQueryOptions {
 
     172: optional bool enable_prefer_cached_rowset
     173: optional i64 query_freshness_tolerance_ms
+    174: optional i64 merge_read_slice_size = 8388608;
 
     // For cloud, to control if the content would be written into file cache
     // In write path, to control if the content would be written into file cache.
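Note how the defaults layer: both the FE variable and this thrift field default to 8388608 (8MB), and toThrift() always sets the field, so a current FE sends a concrete value every time. The -1 sentinel in vparquet_reader.cpp only fires when the field is absent (for example, an FE built before this change), in which case MergeRangeFileReader falls back to its built-in READ_SLICE_SIZE.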