Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ void PrefetchBuffer::reset_offset(size_t offset) {
} else {
_exceed = false;
}
static_cast<void>(ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
[buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }));
_prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
[buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
}

// only this function would run concurrently in another thread
Expand Down
1 change: 0 additions & 1 deletion be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ class MergeRangeFileReader : public io::FileReader {
for (char* box : _boxes) {
delete[] box;
}
static_cast<void>(close());
}

Status close() override {
Expand Down
7 changes: 2 additions & 5 deletions be/src/olap/wal/wal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,8 @@ Status WalReader::init() {
}

Status WalReader::finalize() {
if (file_reader != nullptr) {
auto st = file_reader->close();
if (!st.ok()) {
LOG(WARNING) << "fail to close wal " << _file_name << " st= " << st.to_string();
}
if (file_reader) {
return file_reader->close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@w41ter PTAL, I not sure why we ignore the error before

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should have been returned but was left out. This change is LGTM.

}
return Status::OK();
}
Expand Down
8 changes: 2 additions & 6 deletions be/src/vec/exec/format/avro/avro_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,12 @@ AvroJNIReader::AvroJNIReader(RuntimeState* state, RuntimeProfile* profile,
const TFileScanRangeParams& params,
const std::vector<SlotDescriptor*>& file_slot_descs,
const TFileRangeDesc& range)
: _file_slot_descs(file_slot_descs),
_state(state),
_profile(profile),
_params(params),
_range(range) {}
: JniReader(file_slot_descs, state, profile), _params(params), _range(range) {}

AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs)
: _file_slot_descs(file_slot_descs), _profile(profile), _params(params), _range(range) {}
: JniReader(file_slot_descs, nullptr, profile), _params(params), _range(range) {}

AvroJNIReader::~AvroJNIReader() = default;

Expand Down
11 changes: 2 additions & 9 deletions be/src/vec/exec/format/avro/avro_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@

#include "common/status.h"
#include "exec/olap_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/jni_connector.h"
#include "vec/exec/format/jni_reader.h"

namespace doris {
class RuntimeProfile;
Expand All @@ -48,7 +47,7 @@ namespace doris::vectorized {
/**
* Read avro-format file
*/
class AvroJNIReader : public GenericReader {
class AvroJNIReader : public JniReader {
ENABLE_FACTORY_CREATOR(AvroJNIReader);

public:
Expand Down Expand Up @@ -83,16 +82,10 @@ class AvroJNIReader : public GenericReader {

TypeDescriptor convert_to_doris_type(const rapidjson::Value& column_schema);

TypeDescriptor convert_complex_type(const rapidjson::Document::ConstObject child_schema);

private:
const std::vector<SlotDescriptor*>& _file_slot_descs;
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
const TFileScanRangeParams _params;
const TFileRangeDesc _range;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
std::unique_ptr<JniConnector> _jni_connector;
};

} // namespace doris::vectorized
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace doris::vectorized {

MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile)
: _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
: JniReader(file_slot_descs, state, profile) {
std::ostringstream required_fields;
std::ostringstream columns_types;
std::vector<std::string> column_names;
Expand Down
35 changes: 30 additions & 5 deletions be/src/vec/exec/format/jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,35 @@ struct TypeDescriptor;

namespace doris::vectorized {

class JniReader : public GenericReader {
public:
JniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile)
: _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {};

~JniReader() override = default;

Status close() override {
if (_jni_connector) {
return _jni_connector->close();
}
return Status::OK();
}

protected:
const std::vector<SlotDescriptor*>& _file_slot_descs;
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
std::unique_ptr<JniConnector> _jni_connector;
};

/**
* The demo usage of JniReader, showing how to read data from java scanner.
* The java side is also a mock reader that provide values for each type.
* This class will only be retained during the functional testing phase to verify that
* the communication and data exchange with the jvm are correct.
*/
class MockJniReader : public GenericReader {
class MockJniReader : public JniReader {
public:
MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile);
Expand All @@ -63,6 +85,13 @@ class MockJniReader : public GenericReader {
Status init_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

Status close() override {
if (_jni_connector) {
return _jni_connector->close();
}
return Status::OK();
}

protected:
void _collect_profile_before_close() override {
if (_jni_connector != nullptr) {
Expand All @@ -71,11 +100,7 @@ class MockJniReader : public GenericReader {
}

private:
const std::vector<SlotDescriptor*>& _file_slot_descs;
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
std::unique_ptr<JniConnector> _jni_connector;
};

} // namespace doris::vectorized
36 changes: 23 additions & 13 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ Status OrcReader::set_fill_columns(
_batch = _row_reader->createRowBatch(_batch_size);
auto& selected_type = _row_reader->getSelectedType();
int idx = 0;
static_cast<void>(_init_select_types(selected_type, idx));
RETURN_IF_ERROR(_init_select_types(selected_type, idx));

_remaining_rows = _row_reader->getNumberOfRows();

Expand Down Expand Up @@ -906,7 +906,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) {
const orc::Type* sub_type = type.getSubtype(i);
_col_orc_type.push_back(sub_type);
if (_is_acid && sub_type->getKind() == orc::TypeKind::STRUCT) {
static_cast<void>(_init_select_types(*sub_type, idx));
RETURN_IF_ERROR(_init_select_types(*sub_type, idx));
}
}
return Status::OK();
Expand Down Expand Up @@ -1527,6 +1527,17 @@ std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int
}

Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof));
if (_orc_filter) {
RETURN_IF_ERROR(_orc_filter->get_status());
}
if (_string_dict_filter) {
RETURN_IF_ERROR(_string_dict_filter->get_status());
}
return Status::OK();
}

Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof) {
if (_io_ctx && _io_ctx->should_stop) {
*eof = true;
*read_rows = 0;
Expand Down Expand Up @@ -1602,7 +1613,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
_fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns));

if (block->rows() == 0) {
static_cast<void>(_convert_dict_cols_to_string_cols(block, nullptr));
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
*eof = true;
*read_rows = 0;
return Status::OK();
Expand All @@ -1614,14 +1625,14 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
Block::filter_block_internal(block, columns_to_filter, *_filter));
}
if (!_not_single_slot_filter_conjuncts.empty()) {
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter,
column_to_keep)));
} else {
Block::erase_useless_column(block, column_to_keep);
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
*read_rows = block->rows();
} else {
Expand Down Expand Up @@ -1694,7 +1705,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
_fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns));

if (block->rows() == 0) {
static_cast<void>(_convert_dict_cols_to_string_cols(block, nullptr));
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
*eof = true;
*read_rows = 0;
return Status::OK();
Expand Down Expand Up @@ -1735,8 +1746,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
Block::erase_useless_column(block, column_to_keep);
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
return Status::OK();
return _convert_dict_cols_to_string_cols(block, &batch_vec);
}
_execute_filter_position_delete_rowids(result_filter);
{
Expand All @@ -1745,14 +1755,14 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
Block::filter_block_internal(block, columns_to_filter, result_filter));
}
if (!_not_single_slot_filter_conjuncts.empty()) {
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, nullptr, block,
columns_to_filter, column_to_keep)));
} else {
Block::erase_useless_column(block, column_to_keep);
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
} else {
if (_delete_rows_filter_ptr) {
Expand All @@ -1768,7 +1778,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
Block::filter_block_internal(block, columns_to_filter, (*filter)));
}
Block::erase_useless_column(block, column_to_keep);
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
*read_rows = block->rows();
}
Expand Down Expand Up @@ -1909,7 +1919,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
block->get_by_name(col.first).column->assume_mutable()->clear();
}
Block::erase_useless_column(block, origin_column_num);
static_cast<void>(_convert_dict_cols_to_string_cols(block, nullptr));
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
}

uint16_t new_size = 0;
Expand Down Expand Up @@ -2145,7 +2155,7 @@ Status OrcReader::on_string_dicts_loaded(
}

// 4. Rewrite conjuncts.
static_cast<void>(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
++it;
}
return Status::OK();
Expand Down
29 changes: 21 additions & 8 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class OrcReader : public GenericReader {

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Status get_next_block_impl(Block* block, size_t* read_rows, bool* eof);

void _fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
orc::ColumnVectorBatch* batch, int idx);

Expand Down Expand Up @@ -228,15 +230,19 @@ class OrcReader : public GenericReader {

class ORCFilterImpl : public orc::ORCFilter {
public:
ORCFilterImpl(OrcReader* orcReader) : orcReader(orcReader) {}
ORCFilterImpl(OrcReader* orcReader) : _orcReader(orcReader) {}
~ORCFilterImpl() override = default;
void filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size,
void* arg) const override {
static_cast<void>(orcReader->filter(data, sel, size, arg));
if (_status.ok()) {
_status = _orcReader->filter(data, sel, size, arg);
}
}
Status get_status() { return _status; }

private:
OrcReader* orcReader = nullptr;
mutable Status _status = Status::OK();
OrcReader* _orcReader = nullptr;
};

class StringDictFilterImpl : public orc::StringDictFilter {
Expand All @@ -247,17 +253,24 @@ class OrcReader : public GenericReader {
virtual void fillDictFilterColumnNames(
std::unique_ptr<orc::StripeInformation> current_strip_information,
std::list<std::string>& column_names) const override {
static_cast<void>(_orc_reader->fill_dict_filter_column_names(
std::move(current_strip_information), column_names));
if (_status.ok()) {
_status = _orc_reader->fill_dict_filter_column_names(
std::move(current_strip_information), column_names);
}
}
virtual void onStringDictsLoaded(
std::unordered_map<std::string, orc::StringDictionary*>& column_name_to_dict_map,
bool* is_stripe_filtered) const override {
static_cast<void>(_orc_reader->on_string_dicts_loaded(column_name_to_dict_map,
is_stripe_filtered));
if (_status.ok()) {
_status = _orc_reader->on_string_dicts_loaded(column_name_to_dict_map,
is_stripe_filtered);
}
}

Status get_status() { return _status; }

private:
mutable Status _status = Status::OK();
OrcReader* _orc_reader = nullptr;
};

Expand Down Expand Up @@ -596,7 +609,7 @@ class OrcReader : public GenericReader {
// std::pair<col_name, slot_id>
std::vector<std::pair<std::string, int>> _dict_filter_cols;
std::shared_ptr<ObjectPool> _obj_pool;
std::unique_ptr<orc::StringDictFilter> _string_dict_filter;
std::unique_ptr<StringDictFilterImpl> _string_dict_filter;
bool _dict_cols_has_converted = false;
bool _has_complex_type = false;
std::vector<orc::TypeKind>* _unsupported_pushdown_types;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ Status RowGroupReader::_rewrite_dict_predicates() {
}

// 4. Rewrite conjuncts.
static_cast<void>(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
++it;
}
return Status::OK();
Expand Down
10 changes: 5 additions & 5 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,17 +774,17 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
auto& conjuncts = conjunct_iter->second;
std::vector<int> skipped_page_range;
const FieldSchema* col_schema = schema_desc.get_column(read_col);
static_cast<void>(page_index.collect_skipped_page_range(
&column_index, conjuncts, col_schema, skipped_page_range, *_ctz));
RETURN_IF_ERROR(page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema,
skipped_page_range, *_ctz));
if (skipped_page_range.empty()) {
continue;
}
tparquet::OffsetIndex offset_index;
RETURN_IF_ERROR(page_index.parse_offset_index(chunk, off_index_buff.data(), &offset_index));
for (int page_id : skipped_page_range) {
RowRange skipped_row_range;
static_cast<void>(page_index.create_skipped_row_range(offset_index, row_group.num_rows,
page_id, &skipped_row_range));
RETURN_IF_ERROR(page_index.create_skipped_row_range(offset_index, row_group.num_rows,
page_id, &skipped_row_range));
// use the union row range
skipped_row_ranges.emplace_back(skipped_row_range);
}
Expand Down Expand Up @@ -826,7 +826,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,

Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& row_group,
bool* filter_group) {
static_cast<void>(_process_column_stat_filter(row_group.columns, filter_group));
RETURN_IF_ERROR(_process_column_stat_filter(row_group.columns, filter_group));
_init_chunk_dicts();
RETURN_IF_ERROR(_process_dict_filter(filter_group));
_init_bloom_filter();
Expand Down
Loading