Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ Status OrcReader::init_reader(
_orc_tiny_stripe_threshold_bytes = _state->query_options().orc_tiny_stripe_threshold_bytes;
_orc_once_max_read_bytes = _state->query_options().orc_once_max_read_bytes;
_orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes;
_orc_tiny_stripe_amplification_factor =
_state->query_options().orc_tiny_stripe_amplification_factor;
}

{
Expand Down Expand Up @@ -1191,19 +1193,38 @@ Status OrcReader::set_fill_columns(
tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
}
if (all_tiny_stripes && number_of_stripes > 0) {
std::vector<io::PrefetchRange> prefetch_merge_ranges =
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
_orc_max_merge_distance_bytes,
_orc_once_max_read_bytes);
auto range_finder =
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));

auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
orc_input_stream_ptr->set_all_tiny_stripes();
auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
orc_file_reader = std::make_shared<io::RangeCacheFileReader>(_profile, orc_inner_reader,
range_finder);
//The first stripe in the read file is selected to calculate an approximate
// read amplification factor using the tiny stripe optimization.
std::vector<bool> selected_columns;
_reader->getSelectedColumns(_read_cols, selected_columns);
std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(0);
size_t need_stream_bytes = 0;
for (uint64_t stream_id = 0; stream_id < strip_info->getNumberOfStreams();
stream_id++) {
std::unique_ptr<orc::StreamInformation> stream_info =
strip_info->getStreamInformation(stream_id);
if (selected_columns[stream_info->getColumnId()]) {
need_stream_bytes += stream_info->getLength();
}
}

// If the read amplification factor is too large, the optimization is not used.
if ((double)(need_stream_bytes + strip_info->getFooterLength()) >=
(double)strip_info->getLength() * _orc_tiny_stripe_amplification_factor) {
std::vector<io::PrefetchRange> prefetch_merge_ranges =
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
_orc_max_merge_distance_bytes,
_orc_once_max_read_bytes);
auto range_finder = std::make_shared<io::LinearProbeRangeFinder>(
std::move(prefetch_merge_ranges));

auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
orc_input_stream_ptr->set_all_tiny_stripes();
auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
orc_file_reader = std::make_shared<io::RangeCacheFileReader>(
_profile, orc_inner_reader, range_finder);
}
}

if (!_lazy_read_ctx.can_lazy_read) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,12 @@ class OrcReader : public GenericReader {

std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>
_row_id_column_iterator_pair = {nullptr, -1};

// When using the tiny stripe read optimization, if there are many columns in the orc file and only a
// few of them are used in a query, the tiny stripe optimization will cause serious read amplification.
// When the proportion of the actual number of bytes to be read in the entire stripe is greater than
// this parameter, read optimization is used.
double _orc_tiny_stripe_amplification_factor = 0.4;
};

class StripeStreamInputStream : public orc::InputStream, public ProfileCollector {
Expand Down
32 changes: 32 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ORC_MAX_MERGE_DISTANCE_BYTES = "orc_max_merge_distance_bytes";

public static final String ORC_TINY_STRIPE_AMPLIFICATION_FACTOR = "orc_tiny_stripe_amplification_factor";

public static final String ENABLE_PARQUET_FILTER_BY_MIN_MAX = "enable_parquet_filter_by_min_max";

public static final String ENABLE_ORC_FILTER_BY_MIN_MAX = "enable_orc_filter_by_min_max";
Expand Down Expand Up @@ -1954,6 +1956,19 @@ public boolean isEnableHboNonStrictMatchingMode() {
public long orcMaxMergeDistanceBytes = 1024L * 1024L;


@VariableMgr.VarAttr(
name = ORC_TINY_STRIPE_AMPLIFICATION_FACTOR,
description = {"在使用tiny stripe读取优化的时候,如果orc文件中有非常多的列,某次查询只用到了其中几列,tiny stripe 优化会"
+ "带来严重的读放大。当实际需要读取的字节数占整个stripe size的比例大于该参数时,使用读取优化",
"When using the tiny stripe read optimization, if there are many columns in the orc file "
+ "and only a few of them are used in a query, the tiny stripe optimization will "
+ "cause serious read amplification. When the proportion of the actual number of "
+ "bytes to be read in the entire stripe is greater than this parameter, read "
+ "optimization is used."},
needForward = true,
setter = "setOrcTinyStripeAmplificationFactor")
public double orcTinyStripeAmplificationFactor = 0.4;

@VariableMgr.VarAttr(
name = ENABLE_PARQUET_FILTER_BY_MIN_MAX,
description = {"控制 parquet reader 是否启用 min-max 值过滤。默认为 true。",
Expand Down Expand Up @@ -3144,6 +3159,22 @@ public void setOrcMaxMergeDistanceBytes(String value) throws Exception {
this.orcMaxMergeDistanceBytes = val;
}

public void setOrcTinyStripeAmplificationFactor(String value) throws Exception {
double val = checkFieldDoubleValue(ORC_TINY_STRIPE_AMPLIFICATION_FACTOR, 0, value);
this.orcTinyStripeAmplificationFactor = val;
}

private double checkFieldDoubleValue(String variableName, double minValue, String value) throws Exception {
double val = Double.parseDouble(value);
if (val < minValue) {
throw new Exception(
variableName + " value should greater than or equal " + String.valueOf(minValue)
+ ", you set value is: " + value);
}
return val;
}


private long checkFieldLongValue(String variableName, long minValue, String value) throws Exception {
long val = Long.parseLong(value);
if (val < minValue) {
Expand Down Expand Up @@ -4248,6 +4279,7 @@ public TQueryOptions toThrift() {
tResult.setOrcTinyStripeThresholdBytes(orcTinyStripeThresholdBytes);
tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes);
tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes);
tResult.setOrcTinyStripeAmplificationFactor(orcTinyStripeAmplificationFactor);
tResult.setIgnoreRuntimeFilterError(ignoreRuntimeFilterError);
tResult.setEnableFixedLenToUint32V2(enableFixedLenToUint32V2);
tResult.setProfileLevel(getProfileLevel());
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ struct TQueryOptions {
164: optional bool check_orc_init_sargs_success = false
165: optional i32 exchange_multi_blocks_byte_size = 262144

166: optional double orc_tiny_stripe_amplification_factor = 0.4

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ suite("test_orc_exception_files","external,hive,tvf,external_docker") {
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
"format" = "orc"); """
exception "Orc row reader nextBatch failed. reason = bad StripeFooter from zlib"
exception "Failed to parse the stripe footer"
}
}
}
Loading