Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ Status OlapScanner::_prepare(
ss << "fail to check version hash of tablet: " << tablet_id;
return Status::InternalError(ss.str());
}

Version rd_version(0, _version);
_olap_table->acquire_data_sources(rd_version, &_params.olap_data_arr);
if (_params.olap_data_arr.empty()) {
std::stringstream ss;
ss << "failed to initialize storage reader. tablet=" << _olap_table->full_name()
<< ", version=" << _version << ", backend=" << BackendOptions::get_localhost();
LOG(WARNING) << ss.str();
return Status(ss.str());
}

}
}

Expand Down Expand Up @@ -477,6 +488,7 @@ Status OlapScanner::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}
_olap_table->release_data_sources(&_params.olap_data_arr);
update_counter();
_reader.reset();
Expr::close(_conjunct_ctxs, state);
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,6 @@ OLAPStatus CumulativeCompaction::_roll_back(const vector<SegmentGroup*>& old_ola
// unused_indices will only contain new cumulative index
// we don't need to delete it here; we will delete new cumulative index in the end.
vector<SegmentGroup*> unused_indices;

OLAPStatus res = OLAP_SUCCESS;
res = _table->replace_data_sources(&need_remove_version, &old_olap_indices, &unused_indices);
if (res != OLAP_SUCCESS) {
Expand Down
22 changes: 16 additions & 6 deletions be/src/olap/olap_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2590,6 +2590,12 @@ OLAPStatus OLAPEngine::compute_checksum(
return OLAP_ERR_TABLE_NOT_FOUND;
}

Reader reader;
ReaderParams reader_params;
reader_params.olap_table = tablet;
reader_params.reader_type = READER_CHECKSUM;
reader_params.version = Version(0, version);

{
ReadLock rdlock(tablet->get_header_lock_ptr());
const PDelta* message = tablet->lastest_version();
Expand All @@ -2605,13 +2611,13 @@ OLAPStatus OLAPEngine::compute_checksum(
res, tablet_id, message->version_hash(), version_hash);
return OLAP_ERR_CE_CMD_PARAMS_ERROR;
}
}

Reader reader;
ReaderParams reader_params;
reader_params.olap_table = tablet;
reader_params.reader_type = READER_CHECKSUM;
reader_params.version = Version(0, version);
tablet->acquire_data_sources(reader_params.version, &reader_params.olap_data_arr);
if (reader_params.olap_data_arr.empty()) {
LOG(WARNING) << "fail to init reader. tablet=" << tablet->full_name() << ", version=" << version;
return OLAP_ERR_VERSION_NOT_EXIST;
}
}

// ignore float and double type considering to precision lose
for (size_t i = 0; i < tablet->tablet_schema().size(); ++i) {
Expand All @@ -2626,13 +2632,15 @@ OLAPStatus OLAPEngine::compute_checksum(
res = reader.init(reader_params);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("initiate reader fail. [res=%d]", res);
tablet->release_data_sources(&reader_params.olap_data_arr);
return res;
}

RowCursor row;
res = row.init(tablet->tablet_schema(), reader_params.return_columns);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("failed to init row cursor. [res=%d]", res);
tablet->release_data_sources(&reader_params.olap_data_arr);
return res;
}
row.allocate_memory_for_string_type(tablet->tablet_schema());
Expand All @@ -2652,6 +2660,8 @@ OLAPStatus OLAPEngine::compute_checksum(
row_checksum = row.hash_code(row_checksum);
}

tablet->release_data_sources(&reader_params.olap_data_arr);

LOG(INFO) << "success to finish compute checksum. checksum=" << row_checksum;
*checksum = row_checksum;
return OLAP_SUCCESS;
Expand Down
22 changes: 5 additions & 17 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,6 @@ void Reader::close() {
VLOG(3) << "merged rows:" << _merged_rows;
_conditions.finalize();
_delete_handler.finalize();
_olap_table->release_data_sources(&_own_data_sources);

for (auto pred : _col_predicates) {
delete pred;
Expand All @@ -479,22 +478,11 @@ void Reader::close() {
}

OLAPStatus Reader::_acquire_data_sources(const ReaderParams& read_params) {
const std::vector<ColumnData*>* data_sources;
if (read_params.reader_type == READER_ALTER_TABLE
|| read_params.reader_type == READER_BASE_COMPACTION
|| read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
data_sources = &read_params.olap_data_arr;
} else {
_olap_table->obtain_header_rdlock();
_olap_table->acquire_data_sources(_version, &_own_data_sources);
_olap_table->release_header_lock();

if (_own_data_sources.size() < 1) {
LOG(WARNING) << "fail to acquire data sources. [table_name='" << _olap_table->full_name()
<< "' version=" << _version.first << "-" << _version.second << "]";
return OLAP_ERR_VERSION_NOT_EXIST;
}
data_sources = &_own_data_sources;
const std::vector<ColumnData*>* data_sources = &read_params.olap_data_arr;
if (data_sources->empty()) {
LOG(WARNING) << "fail to acquire data sources. [table_name='" << _olap_table->full_name()
<< "' version=" << _version.first << "-" << _version.second << "]";
return OLAP_ERR_VERSION_NOT_EXIST;
}

// do not use index stream cache when be/ce/alter/checksum,
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,6 @@ class Reader {

OLAPTablePtr _olap_table;

// _own_data_sources is data source that reader aquire from olap_table, so we need to
// release these when reader closing
std::vector<ColumnData*> _own_data_sources;
std::vector<ColumnData*> _data_sources;

KeysParam _keys_param;
Expand Down