Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
}
content_size = 0;
hollow_size = 0;
double amplified_ratio = config::max_amplified_read_ratio;
std::vector<std::pair<double, size_t>> ratio_and_size;
// Calculate the read amplification ratio of each candidate merge operation and the size of the merged data.
// Find the largest merged data size whose amplification ratio is within the limit set by config::max_amplified_read_ratio
Expand All @@ -168,9 +167,12 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
}
}
size_t best_merged_size = 0;
for (const std::pair<double, size_t>& rs : ratio_and_size) {
for (int i = 0; i < ratio_and_size.size(); ++i) {
const std::pair<double, size_t>& rs = ratio_and_size[i];
size_t equivalent_size = rs.second / (i + 1);
if (rs.second > best_merged_size) {
if (rs.first < amplified_ratio || rs.second <= MIN_READ_SIZE) {
if (rs.first <= _max_amplified_ratio ||
(_max_amplified_ratio < 1 && equivalent_size <= _equivalent_io_size)) {
best_merged_size = rs.second;
}
}
Expand Down
11 changes: 10 additions & 1 deletion be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ class MergeRangeFileReader : public io::FileReader {
static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
static constexpr size_t HDFS_MIN_IO_SIZE = 4 * 1024; // 4KB
static constexpr size_t OSS_MIN_IO_SIZE = 512 * 1024; // 512KB
static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
static constexpr size_t MIN_READ_SIZE = 4096; // 4KB

MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
const std::vector<PrefetchRange>& random_access_ranges)
Expand All @@ -142,6 +143,11 @@ class MergeRangeFileReader : public io::FileReader {
_range_cached_data.resize(random_access_ranges.size());
_size = _reader->size();
_remaining = TOTAL_BUFFER_SIZE;
_is_oss = typeid_cast<io::S3FileReader*>(_reader.get()) != nullptr;
_max_amplified_ratio = config::max_amplified_read_ratio;
// Minimum equivalent size of each IO needed to reach the maximum storage speed limit:
// 512KB for OSS (the underlying reader is an S3FileReader), 4KB otherwise (e.g. HDFS)
_equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
if (_profile != nullptr) {
const char* random_profile = "MergedSmallIO";
ADD_TIMER(_profile, random_profile);
Expand Down Expand Up @@ -235,6 +241,9 @@ class MergeRangeFileReader : public io::FileReader {
int16 _last_box_ref = -1;
uint32 _last_box_usage = 0;
std::vector<int16> _box_ref;
bool _is_oss;
double _max_amplified_ratio;
size_t _equivalent_io_size;

Statistics _statistics;
};
Expand Down