diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 206845e90ce81e..664997088d9835 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -413,8 +413,8 @@ void PrefetchBuffer::reset_offset(size_t offset) { } else { _exceed = false; } - static_cast(ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( - [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); })); + _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( + [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); } // only this function would run concurrently in another thread diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 0195f5d8308c0e..70c8445db233e6 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -173,7 +173,6 @@ class MergeRangeFileReader : public io::FileReader { for (char* box : _boxes) { delete[] box; } - static_cast(close()); } Status close() override { diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp index fa96f0c5a0bad9..6e6a530f8dbd72 100644 --- a/be/src/olap/wal/wal_reader.cpp +++ b/be/src/olap/wal/wal_reader.cpp @@ -50,11 +50,8 @@ Status WalReader::init() { } Status WalReader::finalize() { - if (file_reader != nullptr) { - auto st = file_reader->close(); - if (!st.ok()) { - LOG(WARNING) << "fail to close wal " << _file_name << " st= " << st.to_string(); - } + if (file_reader) { + return file_reader->close(); } return Status::OK(); } diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index ec08c58d18a937..03135aa5c94e8c 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -29,16 +29,12 @@ AvroJNIReader::AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, const std::vector& file_slot_descs, const TFileRangeDesc& range) - : _file_slot_descs(file_slot_descs), - _state(state), - _profile(profile), - _params(params), - _range(range) {} + : JniReader(file_slot_descs, state, profile), _params(params), _range(range) {} AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector& file_slot_descs) - : _file_slot_descs(file_slot_descs), _profile(profile), _params(params), _range(range) {} + : JniReader(file_slot_descs, nullptr, profile), _params(params), _range(range) {} AvroJNIReader::~AvroJNIReader() = default; diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h b/be/src/vec/exec/format/avro/avro_jni_reader.h index 64dac0aba4feca..82388f32915027 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.h +++ b/be/src/vec/exec/format/avro/avro_jni_reader.h @@ -28,8 +28,7 @@ #include "common/status.h" #include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -48,7 +47,7 @@ namespace doris::vectorized { /** * Read avro-format file */ -class AvroJNIReader : public GenericReader { +class AvroJNIReader : public JniReader { ENABLE_FACTORY_CREATOR(AvroJNIReader); public: @@ -83,16 +82,10 @@ class AvroJNIReader : public GenericReader { TypeDescriptor convert_to_doris_type(const rapidjson::Value& column_schema); - TypeDescriptor convert_complex_type(const rapidjson::Document::ConstObject child_schema); - private: - const std::vector& _file_slot_descs; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; const TFileScanRangeParams _params; const TFileRangeDesc _range; std::unordered_map* _colname_to_value_range = nullptr; - std::unique_ptr _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/jni_reader.cpp b/be/src/vec/exec/format/jni_reader.cpp index 625c79ffff2fdc..563f6cbea5181e 100644 --- a/be/src/vec/exec/format/jni_reader.cpp +++ b/be/src/vec/exec/format/jni_reader.cpp @@ -37,7 +37,7 @@ namespace doris::vectorized { MockJniReader::MockJniReader(const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile) - : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { + : JniReader(file_slot_descs, state, profile) { std::ostringstream required_fields; std::ostringstream columns_types; std::vector column_names; diff --git a/be/src/vec/exec/format/jni_reader.h b/be/src/vec/exec/format/jni_reader.h index d3a0f0da4c0312..714bdb96b19b59 100644 --- a/be/src/vec/exec/format/jni_reader.h +++ b/be/src/vec/exec/format/jni_reader.h @@ -42,13 +42,35 @@ struct TypeDescriptor; namespace doris::vectorized { +class JniReader : public GenericReader { +public: + JniReader(const std::vector& file_slot_descs, RuntimeState* state, + RuntimeProfile* profile) + : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {}; + + ~JniReader() override = default; + + Status close() override { + if (_jni_connector) { + return _jni_connector->close(); + } + return Status::OK(); + } + +protected: + const std::vector& _file_slot_descs; + RuntimeState* _state = nullptr; + RuntimeProfile* _profile = nullptr; + std::unique_ptr _jni_connector; +}; + /** * The demo usage of JniReader, showing how to read data from java scanner. * The java side is also a mock reader that provide values for each type. * This class will only be retained during the functional testing phase to verify that * the communication and data exchange with the jvm are correct. */ -class MockJniReader : public GenericReader { +class MockJniReader : public JniReader { public: MockJniReader(const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile); @@ -63,6 +85,13 @@ class MockJniReader : public GenericReader { Status init_reader( std::unordered_map* colname_to_value_range); + Status close() override { + if (_jni_connector) { + return _jni_connector->close(); + } + return Status::OK(); + } + protected: void _collect_profile_before_close() override { if (_jni_connector != nullptr) { @@ -71,11 +100,7 @@ class MockJniReader : public GenericReader { } private: - const std::vector& _file_slot_descs; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; std::unordered_map* _colname_to_value_range; - std::unique_ptr _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 44cfe8b4060ff5..7410bc2b4b05e3 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -862,7 +862,7 @@ Status OrcReader::set_fill_columns( _batch = _row_reader->createRowBatch(_batch_size); auto& selected_type = _row_reader->getSelectedType(); int idx = 0; - static_cast(_init_select_types(selected_type, idx)); + RETURN_IF_ERROR(_init_select_types(selected_type, idx)); _remaining_rows = _row_reader->getNumberOfRows(); @@ -906,7 +906,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) { const orc::Type* sub_type = type.getSubtype(i); _col_orc_type.push_back(sub_type); if (_is_acid && sub_type->getKind() == orc::TypeKind::STRUCT) { - static_cast(_init_select_types(*sub_type, idx)); + RETURN_IF_ERROR(_init_select_types(*sub_type, idx)); } } return Status::OK(); @@ -1527,6 +1527,17 @@ std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof)); + if (_orc_filter) { + RETURN_IF_ERROR(_orc_filter->get_status()); + } + if (_string_dict_filter) { + RETURN_IF_ERROR(_string_dict_filter->get_status()); + } + return Status::OK(); +} + +Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof) { if (_io_ctx && _io_ctx->should_stop) { *eof = true; *read_rows = 0; @@ -1602,7 +1613,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { _fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns)); if (block->rows() == 0) { - static_cast(_convert_dict_cols_to_string_cols(block, nullptr)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr)); *eof = true; *read_rows = 0; return Status::OK(); @@ -1614,14 +1625,14 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { Block::filter_block_internal(block, columns_to_filter, *_filter)); } if (!_not_single_slot_filter_conjuncts.empty()) { - static_cast(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); RETURN_IF_CATCH_EXCEPTION( RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, column_to_keep))); } else { Block::erase_useless_column(block, column_to_keep); - static_cast(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); } *read_rows = block->rows(); } else { @@ -1694,7 +1705,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { _fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns)); if (block->rows() == 0) { - static_cast(_convert_dict_cols_to_string_cols(block, nullptr)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr)); *eof = true; *read_rows = 0; return Status::OK(); @@ -1735,8 +1746,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { std::move(*block->get_by_position(col).column).assume_mutable()->clear(); } Block::erase_useless_column(block, column_to_keep); - static_cast(_convert_dict_cols_to_string_cols(block, &batch_vec)); - return Status::OK(); + return _convert_dict_cols_to_string_cols(block, &batch_vec); } _execute_filter_position_delete_rowids(result_filter); { @@ -1745,14 +1755,14 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { Block::filter_block_internal(block, columns_to_filter, result_filter)); } if (!_not_single_slot_filter_conjuncts.empty()) { - static_cast(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); RETURN_IF_CATCH_EXCEPTION( RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, column_to_keep))); } else { Block::erase_useless_column(block, column_to_keep); - static_cast(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); } } else { if (_delete_rows_filter_ptr) { @@ -1768,7 +1778,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { Block::filter_block_internal(block, columns_to_filter, (*filter))); } Block::erase_useless_column(block, column_to_keep); - static_cast(_convert_dict_cols_to_string_cols(block, &batch_vec)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); } *read_rows = block->rows(); } @@ -1909,7 +1919,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s block->get_by_name(col.first).column->assume_mutable()->clear(); } Block::erase_useless_column(block, origin_column_num); - static_cast(_convert_dict_cols_to_string_cols(block, nullptr)); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr)); } uint16_t new_size = 0; @@ -2145,7 +2155,7 @@ Status OrcReader::on_string_dicts_loaded( } // 4. Rewrite conjuncts. - static_cast(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); + RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); ++it; } return Status::OK(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 08b576fe90c233..605f436aa49002 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -165,6 +165,8 @@ class OrcReader : public GenericReader { Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + Status get_next_block_impl(Block* block, size_t* read_rows, bool* eof); + void _fill_batch_vec(std::vector& result, orc::ColumnVectorBatch* batch, int idx); @@ -228,15 +230,19 @@ class OrcReader : public GenericReader { class ORCFilterImpl : public orc::ORCFilter { public: - ORCFilterImpl(OrcReader* orcReader) : orcReader(orcReader) {} + ORCFilterImpl(OrcReader* orcReader) : _orcReader(orcReader) {} ~ORCFilterImpl() override = default; void filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) const override { - static_cast(orcReader->filter(data, sel, size, arg)); + if (_status.ok()) { + _status = _orcReader->filter(data, sel, size, arg); + } } + Status get_status() { return _status; } private: - OrcReader* orcReader = nullptr; + mutable Status _status = Status::OK(); + OrcReader* _orcReader = nullptr; }; class StringDictFilterImpl : public orc::StringDictFilter { @@ -247,17 +253,24 @@ class OrcReader : public GenericReader { virtual void fillDictFilterColumnNames( std::unique_ptr current_strip_information, std::list& column_names) const override { - static_cast(_orc_reader->fill_dict_filter_column_names( - std::move(current_strip_information), column_names)); + if (_status.ok()) { + _status = _orc_reader->fill_dict_filter_column_names( + std::move(current_strip_information), column_names); + } } virtual void onStringDictsLoaded( std::unordered_map& column_name_to_dict_map, bool* is_stripe_filtered) const override { - static_cast(_orc_reader->on_string_dicts_loaded(column_name_to_dict_map, - is_stripe_filtered)); + if (_status.ok()) { + _status = _orc_reader->on_string_dicts_loaded(column_name_to_dict_map, + is_stripe_filtered); + } } + Status get_status() { return _status; } + private: + mutable Status _status = Status::OK(); OrcReader* _orc_reader = nullptr; }; @@ -596,7 +609,7 @@ class OrcReader : public GenericReader { // std::pair std::vector> _dict_filter_cols; std::shared_ptr _obj_pool; - std::unique_ptr _string_dict_filter; + std::unique_ptr _string_dict_filter; bool _dict_cols_has_converted = false; bool _has_complex_type = false; std::vector* _unsupported_pushdown_types; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 335207070dd367..90b82c52e079fb 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -883,7 +883,7 @@ Status RowGroupReader::_rewrite_dict_predicates() { } // 4. Rewrite conjuncts. - static_cast(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); + RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); ++it; } return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index c7399d9e1221c2..61d66840641967 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -774,8 +774,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, auto& conjuncts = conjunct_iter->second; std::vector skipped_page_range; const FieldSchema* col_schema = schema_desc.get_column(read_col); - static_cast(page_index.collect_skipped_page_range( - &column_index, conjuncts, col_schema, skipped_page_range, *_ctz)); + RETURN_IF_ERROR(page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema, + skipped_page_range, *_ctz)); if (skipped_page_range.empty()) { continue; } @@ -783,8 +783,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, RETURN_IF_ERROR(page_index.parse_offset_index(chunk, off_index_buff.data(), &offset_index)); for (int page_id : skipped_page_range) { RowRange skipped_row_range; - static_cast(page_index.create_skipped_row_range(offset_index, row_group.num_rows, - page_id, &skipped_row_range)); + RETURN_IF_ERROR(page_index.create_skipped_row_range(offset_index, row_group.num_rows, + page_id, &skipped_row_range)); // use the union row range skipped_row_ranges.emplace_back(skipped_row_range); } @@ -826,7 +826,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& row_group, bool* filter_group) { - static_cast(_process_column_stat_filter(row_group.columns, filter_group)); + RETURN_IF_ERROR(_process_column_stat_filter(row_group.columns, filter_group)); _init_chunk_dicts(); RETURN_IF_ERROR(_process_dict_filter(filter_group)); _init_bloom_filter(); diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp index 1cba7da7c65d03..cffa2ce9ac4b71 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp @@ -43,11 +43,9 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params, const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile) - : _scan_params(scan_params), - _hudi_params(hudi_params), - _file_slot_descs(file_slot_descs), - _state(state), - _profile(profile) { + : JniReader(file_slot_descs, state, profile), + _scan_params(scan_params), + _hudi_params(hudi_params) { std::vector required_fields; for (auto& desc : _file_slot_descs) { required_fields.emplace_back(desc->col_name()); diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h index c0438e93289063..e9bb55a69a77e7 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.h +++ b/be/src/vec/exec/format/table/hudi_jni_reader.h @@ -27,8 +27,7 @@ #include "common/status.h" #include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -42,7 +41,7 @@ struct TypeDescriptor; namespace doris::vectorized { -class HudiJniReader : public GenericReader { +class HudiJniReader : public JniReader { ENABLE_FACTORY_CREATOR(HudiJniReader); public: @@ -66,11 +65,7 @@ class HudiJniReader : public GenericReader { private: const TFileScanRangeParams& _scan_params; const THudiFileDesc& _hudi_params; - const std::vector& _file_slot_descs; - RuntimeState* _state; - RuntimeProfile* _profile; std::unordered_map* _colname_to_value_range; - std::unique_ptr _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp index f7dd9c9846f768..d520bd9b295002 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp @@ -42,11 +42,9 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des const std::vector& file_slot_descs, const TFileRangeDesc& range, RuntimeState* state, RuntimeProfile* profile) - : _max_compute_params(max_compute_params), - _file_slot_descs(file_slot_descs), - _range(range), - _state(state), - _profile(profile) { + : JniReader(file_slot_descs, state, profile), + _max_compute_params(max_compute_params), + _range(range) { _table_desc = mc_desc; std::ostringstream required_fields; std::ostringstream columns_types; diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h b/be/src/vec/exec/format/table/max_compute_jni_reader.h index e027678148fd0d..9bfef59432d6d1 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.h +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h @@ -28,8 +28,7 @@ #include "common/status.h" #include "exec/olap_common.h" #include "runtime/descriptors.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -49,7 +48,7 @@ namespace doris::vectorized { * This class will only be retained during the functional testing phase to verify that * the communication and data exchange with the jvm are correct. */ -class MaxComputeJniReader : public GenericReader { +class MaxComputeJniReader : public JniReader { ENABLE_FACTORY_CREATOR(MaxComputeJniReader); public: @@ -71,12 +70,8 @@ class MaxComputeJniReader : public GenericReader { private: const MaxComputeTableDescriptor* _table_desc = nullptr; const TMaxComputeFileDesc& _max_compute_params; - const std::vector& _file_slot_descs; const TFileRangeDesc& _range; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; std::unordered_map* _colname_to_value_range = nullptr; - std::unique_ptr _jni_connector = nullptr; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index 06d24466104e07..ef690c15b684d2 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -40,7 +40,7 @@ const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix. PaimonJniReader::PaimonJniReader(const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, const TFileRangeDesc& range) - : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { + : JniReader(file_slot_descs, state, profile) { std::vector column_names; std::vector column_types; for (auto& desc : _file_slot_descs) { diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h index 162c6ff2cdb3aa..6b6a6907270657 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.h +++ b/be/src/vec/exec/format/table/paimon_jni_reader.h @@ -26,9 +26,7 @@ #include "common/status.h" #include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/format/table/table_format_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -48,7 +46,7 @@ namespace doris::vectorized { * This class will only be retained during the functional testing phase to verify that * the communication and data exchange with the jvm are correct. */ -class PaimonJniReader : public GenericReader { +class PaimonJniReader : public JniReader { ENABLE_FACTORY_CREATOR(PaimonJniReader); public: @@ -67,11 +65,7 @@ class PaimonJniReader : public GenericReader { std::unordered_map* colname_to_value_range); private: - const std::vector& _file_slot_descs; - RuntimeState* _state = nullptr; - RuntimeProfile* _profile = nullptr; std::unordered_map* _colname_to_value_range; - std::unique_ptr _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index 85bfbed071396f..a5756e687e9b87 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -135,7 +135,7 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range) { std::unordered_map> partition_columns; std::unordered_map missing_columns; - static_cast(delete_reader.set_fill_columns(partition_columns, missing_columns)); + RETURN_IF_ERROR(delete_reader.set_fill_columns(partition_columns, missing_columns)); bool eof = false; while (!eof) { diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp index 93e122ae0de964..c9b10e716ca90d 100644 --- a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp +++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp @@ -42,7 +42,7 @@ const std::string TrinoConnectorJniReader::TRINO_CONNECTOR_OPTION_PREFIX = TrinoConnectorJniReader::TrinoConnectorJniReader( const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, const TFileRangeDesc& range) - : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { + : JniReader(file_slot_descs, state, profile) { std::vector column_names; for (const auto& desc : _file_slot_descs) { std::string field = desc->col_name(); diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h b/be/src/vec/exec/format/table/trino_connector_jni_reader.h index 43b2d4fbf9c02d..de0cf21a881129 100644 --- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h +++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.h @@ -27,8 +27,7 @@ #include "common/status.h" #include "exec/olap_common.h" -#include "vec/exec/format/generic_reader.h" -#include "vec/exec/jni_connector.h" +#include "vec/exec/format/jni_reader.h" namespace doris { class RuntimeProfile; @@ -42,7 +41,7 @@ struct TypeDescriptor; namespace doris::vectorized { -class TrinoConnectorJniReader : public GenericReader { +class TrinoConnectorJniReader : public JniReader { ENABLE_FACTORY_CREATOR(TrinoConnectorJniReader); public: @@ -63,9 +62,5 @@ class TrinoConnectorJniReader : public GenericReader { private: Status _set_spi_plugins_dir(); - const std::vector& _file_slot_descs; - RuntimeState* _state; - RuntimeProfile* _profile; - std::unique_ptr _jni_connector; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index 5010f1912abf12..a9e15f6cac52e7 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -30,12 +30,6 @@ WalReader::WalReader(RuntimeState* state) : _state(state) { _wal_id = state->wal_id(); } -WalReader::~WalReader() { - if (_wal_reader.get() != nullptr) { - static_cast(_wal_reader->finalize()); - } -} - Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) { _tuple_descriptor = tuple_descriptor; RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path)); diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h index 09311496c16f29..5834d74efeaced 100644 --- a/be/src/vec/exec/format/wal/wal_reader.h +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -26,12 +26,19 @@ struct ScannerCounter; class WalReader : public GenericReader { public: WalReader(RuntimeState* state); - ~WalReader() override; + ~WalReader() override = default; Status init_reader(const TupleDescriptor* tuple_descriptor); Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; + Status close() override { + if (_wal_reader) { + return _wal_reader->finalize(); + } + return Status::OK(); + } + private: RuntimeState* _state = nullptr; int64_t _wal_id; diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 4b8eb20f22715f..3df8044f66aff9 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -65,10 +65,6 @@ namespace doris::vectorized { M(TypeIndex::DateTime, ColumnVector, Int64) \ M(TypeIndex::DateTimeV2, ColumnVector, UInt64) -JniConnector::~JniConnector() { - static_cast(close()); -} - Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { _state = state; _profile = profile; diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index 22e33f01053c91..52a3fb2e7782ca 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -208,8 +208,7 @@ class JniConnector : public ProfileCollector { _is_table_schema = true; } - /// Should release jni resources if other functions are failed. - ~JniConnector(); + ~JniConnector() override = default; /** * Open java scanner, and get the following scanner methods by jni: diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index e7b7a6b96b9675..4abf1e239dea05 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -280,7 +280,7 @@ Status VJoinNodeBase::open(RuntimeState* state) { std::promise thread_status; try { - static_cast(state->exec_env()->join_node_thread_pool()->submit_func( + RETURN_IF_ERROR(state->exec_env()->join_node_thread_pool()->submit_func( [this, state, thread_status_p = &thread_status] { this->_probe_side_open_thread(state, thread_status_p); })); diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp index afc2412f2b76a7..a1c3488fa3a6df 100644 --- a/be/src/vec/exec/scan/new_es_scanner.cpp +++ b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -232,7 +232,7 @@ Status NewEsScanner::close(RuntimeState* state) { } if (_es_reader != nullptr) { - static_cast(_es_reader->close()); + RETURN_IF_ERROR(_es_reader->close()); } RETURN_IF_ERROR(VScanner::close(state)); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index eba62dcf19a1ea..58ecc4883b1633 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -103,11 +103,11 @@ Status ScannerScheduler::init(ExecEnv* env) { _remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool"); // 3. limited scan thread pool - static_cast(ThreadPoolBuilder("LimitedScanThreadPool") - .set_min_threads(config::doris_scanner_thread_pool_thread_num) - .set_max_threads(config::doris_scanner_thread_pool_thread_num) - .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) - .build(&_limited_scan_thread_pool)); + RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool") + .set_min_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) + .build(&_limited_scan_thread_pool)); _register_metrics(); _is_init = true; return Status::OK(); @@ -246,7 +246,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, scanner->set_opened(); } - static_cast(scanner->try_append_late_arrival_runtime_filter()); + Status rf_status = scanner->try_append_late_arrival_runtime_filter(); + if (!rf_status.ok()) { + LOG(WARNING) << "Failed to append late arrival runtime filter: " << rf_status.to_string(); + } size_t raw_bytes_threshold = config::doris_scanner_row_bytes; size_t raw_bytes_read = 0; diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index b3d02860f9a3d4..f194afe4bb04c2 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -147,11 +147,27 @@ class SimplifiedScanScheduler { return; } if (new_max_thread_num >= cur_max_thread_num) { - static_cast(_scan_thread_pool->set_max_threads(new_max_thread_num)); - static_cast(_scan_thread_pool->set_min_threads(new_min_thread_num)); + Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num); + if (!st_max.ok()) { + LOG(WARNING) << "Failed to set max threads for scan thread pool: " + << st_max.to_string(); + } + Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num); + if (!st_min.ok()) { + LOG(WARNING) << "Failed to set min threads for scan thread pool: " + << st_min.to_string(); + } } else { - static_cast(_scan_thread_pool->set_min_threads(new_min_thread_num)); - static_cast(_scan_thread_pool->set_max_threads(new_max_thread_num)); + Status st_min = _scan_thread_pool->set_min_threads(new_min_thread_num); + if (!st_min.ok()) { + LOG(WARNING) << "Failed to set min threads for scan thread pool: " + << st_min.to_string(); + } + Status st_max = _scan_thread_pool->set_max_threads(new_max_thread_num); + if (!st_max.ok()) { + LOG(WARNING) << "Failed to set max threads for scan thread pool: " + << st_max.to_string(); + } } }