diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index a22b08c06599a5..cae08d284179c4 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -106,7 +106,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
         // merge small IO
         size_t merge_start = offset + has_read;
-        const size_t merge_end = merge_start + READ_SLICE_SIZE;
+        const size_t merge_end = merge_start + _merged_read_slice_size;
         // std::vector> merged_slice;
         size_t content_size = 0;
@@ -315,7 +315,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
 Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
                                        size_t* bytes_read, const IOContext* io_ctx) {
     if (!_read_slice) {
-        _read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
+        _read_slice = std::make_unique<OwnedSlice>(_merged_read_slice_size);
     }
 
     *bytes_read = 0;
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 394c7eb9ad4836..6bcf634aef35ea 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -278,7 +278,8 @@ class MergeRangeFileReader : public io::FileReader {
     static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
 
     MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
-                         const std::vector<PrefetchRange>& random_access_ranges)
+                         const std::vector<PrefetchRange>& random_access_ranges,
+                         int64_t merge_read_slice_size = READ_SLICE_SIZE)
             : _profile(profile),
               _reader(std::move(reader)),
               _random_access_ranges(random_access_ranges) {
@@ -291,6 +292,13 @@ class MergeRangeFileReader : public io::FileReader {
         // 1MB for oss, 8KB for hdfs
         _equivalent_io_size =
                 _is_oss ? config::merged_oss_min_io_size : config::merged_hdfs_min_io_size;
+
+        _merged_read_slice_size = merge_read_slice_size;
+
+        if (_merged_read_slice_size < 0) {
+            _merged_read_slice_size = READ_SLICE_SIZE;
+        }
+
         for (const PrefetchRange& range : _random_access_ranges) {
             _statistics.apply_bytes += range.end_offset - range.start_offset;
         }
@@ -391,6 +399,7 @@ class MergeRangeFileReader : public io::FileReader {
     bool _is_oss;
     double _max_amplified_ratio;
     size_t _equivalent_io_size;
+    int64_t _merged_read_slice_size;
 
     Statistics _statistics;
 };
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index f48ad48d0ba9dc..787161efc22624 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -866,12 +866,17 @@ Status ParquetReader::_next_row_group_reader() {
             size_t avg_io_size = 0;
             const std::vector<io::PrefetchRange> io_ranges =
                     _generate_random_access_ranges(row_group_index, &avg_io_size);
+            int64_t merged_read_slice_size = -1;
+            if (_state != nullptr && _state->query_options().__isset.merge_read_slice_size) {
+                merged_read_slice_size = _state->query_options().merge_read_slice_size;
+            }
             // The underlying page reader will prefetch data in column.
             // Using both MergeRangeFileReader and BufferedStreamReader simultaneously would waste a lot of memory.
-            group_file_reader = avg_io_size < io::MergeRangeFileReader::SMALL_IO
-                                        ? std::make_shared<io::MergeRangeFileReader>(
-                                                  _profile, _file_reader, io_ranges)
-                                        : _file_reader;
+            group_file_reader =
+                    avg_io_size < io::MergeRangeFileReader::SMALL_IO
+                            ? std::make_shared<io::MergeRangeFileReader>(
+                                      _profile, _file_reader, io_ranges, merged_read_slice_size)
+                            : _file_reader;
         }
         _current_group_reader.reset(
                 new RowGroupReader(_io_ctx ? std::make_shared(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index af197bfd73492a..4c3be81121ba59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -859,6 +859,8 @@ public static double getHotValueThreshold() {
     public static final String MULTI_DISTINCT_STRATEGY = "multi_distinct_strategy";
     public static final String AGG_PHASE = "agg_phase";
 
+    public static final String MERGE_IO_READ_SLICE_SIZE = "merge_io_read_slice_size";
+
     public static final String ENABLE_PREFER_CACHED_ROWSET = "enable_prefer_cached_rowset";
 
     public static final String QUERY_FRESHNESS_TOLERANCE_MS = "query_freshness_tolerance_ms";
@@ -2563,6 +2565,11 @@ public void setSkewRewriteAggBucketNum(int num) {
             checker = "checkAggPhase")
     public int aggPhase = 0;
 
+
+    @VariableMgr.VarAttr(name = MERGE_IO_READ_SLICE_SIZE, description = {"调整 READ_SLICE_SIZE 大小,降低 Merge IO 读放大影响",
+            "Make the READ_SLICE_SIZE variable configurable to reduce the impact caused by read amplification."})
+    public int mergeReadSliceSize = 8388608;
+
     public void setAggPhase(int phase) {
         aggPhase = phase;
     }
@@ -4680,7 +4687,7 @@ public TQueryOptions toThrift() {
         tResult.setHnswEfSearch(hnswEFSearch);
         tResult.setHnswCheckRelativeDistance(hnswCheckRelativeDistance);
         tResult.setHnswBoundedQueue(hnswBoundedQueue);
-
+        tResult.setMergeReadSliceSize(mergeReadSliceSize);
         return tResult;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 71179e17c487ae..16cd37c82648a3 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -406,6 +406,7 @@ struct TQueryOptions {
   172: optional bool enable_prefer_cached_rowset
   173: optional i64 query_freshness_tolerance_ms
+  174: optional i64 merge_read_slice_size = 8388608;
 
   // For cloud, to control if the content would be written into file cache
  // In write path, to control if the content would be written into file cache.
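For reference, a minimal standalone sketch (not part of the patch) of how the new knob is intended to flow: the FE session variable merge_io_read_slice_size is serialized into TQueryOptions.merge_read_slice_size, the BE forwards it to MergeRangeFileReader, and a negative value (query option unset) falls back to the built-in READ_SLICE_SIZE default of 8388608 bytes. The helper below is hypothetical and only mirrors the fallback logic added in buffered_reader.h.

#include <cstdint>
#include <iostream>

// Hypothetical constant mirroring MergeRangeFileReader::READ_SLICE_SIZE (8MB).
constexpr int64_t kDefaultReadSliceSize = 8 * 1024 * 1024;

// Mirrors the constructor logic in the patch: only a negative value
// (query option not set) falls back to the default; any non-negative
// session value is used as the merged-read slice size.
int64_t resolve_merged_read_slice_size(int64_t session_value) {
    return session_value < 0 ? kDefaultReadSliceSize : session_value;
}

int main() {
    std::cout << resolve_merged_read_slice_size(-1) << "\n";      // option unset -> 8388608
    std::cout << resolve_merged_read_slice_size(4194304) << "\n"; // e.g. SET merge_io_read_slice_size = 4194304
    return 0;
}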