From 3ce158e55f7570328a31f3f859ca4e852f730778 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Wed, 14 Aug 2019 18:38:34 +0800 Subject: [PATCH] Acquire data_sources at the beginning of the olapscanner --- be/src/exec/olap_scanner.cpp | 12 ++++++++++++ be/src/olap/cumulative_compaction.cpp | 1 - be/src/olap/olap_engine.cpp | 22 ++++++++++++++++------ be/src/olap/reader.cpp | 22 +++++----------------- be/src/olap/reader.h | 3 --- 5 files changed, 33 insertions(+), 27 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 97257922f3642a..0c388197231f24 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -108,6 +108,17 @@ Status OlapScanner::_prepare( ss << "fail to check version hash of tablet: " << tablet_id; return Status::InternalError(ss.str()); } + + Version rd_version(0, _version); + _olap_table->acquire_data_sources(rd_version, &_params.olap_data_arr); + if (_params.olap_data_arr.empty()) { + std::stringstream ss; + ss << "failed to initialize storage reader. tablet=" << _olap_table->full_name() + << ", version=" << _version << ", backend=" << BackendOptions::get_localhost(); + LOG(WARNING) << ss.str(); + return Status(ss.str()); + } + } } @@ -477,6 +488,7 @@ Status OlapScanner::close(RuntimeState* state) { if (_is_closed) { return Status::OK(); } + _olap_table->release_data_sources(&_params.olap_data_arr); update_counter(); _reader.reset(); Expr::close(_conjunct_ctxs, state); diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 6a208bb13fd1a9..eafe8438a71f02 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -534,7 +534,6 @@ OLAPStatus CumulativeCompaction::_roll_back(const vector& old_ola // unused_indices will only contain new cumulative index // we don't need to delete it here; we will delete new cumulative index in the end. vector unused_indices; - OLAPStatus res = OLAP_SUCCESS; res = _table->replace_data_sources(&need_remove_version, &old_olap_indices, &unused_indices); if (res != OLAP_SUCCESS) { diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp index 656fd437bb3728..8d05c7740ae1d2 100644 --- a/be/src/olap/olap_engine.cpp +++ b/be/src/olap/olap_engine.cpp @@ -2590,6 +2590,12 @@ OLAPStatus OLAPEngine::compute_checksum( return OLAP_ERR_TABLE_NOT_FOUND; } + Reader reader; + ReaderParams reader_params; + reader_params.olap_table = tablet; + reader_params.reader_type = READER_CHECKSUM; + reader_params.version = Version(0, version); + { ReadLock rdlock(tablet->get_header_lock_ptr()); const PDelta* message = tablet->lastest_version(); @@ -2605,13 +2611,13 @@ OLAPStatus OLAPEngine::compute_checksum( res, tablet_id, message->version_hash(), version_hash); return OLAP_ERR_CE_CMD_PARAMS_ERROR; } - } - Reader reader; - ReaderParams reader_params; - reader_params.olap_table = tablet; - reader_params.reader_type = READER_CHECKSUM; - reader_params.version = Version(0, version); + tablet->acquire_data_sources(reader_params.version, &reader_params.olap_data_arr); + if (reader_params.olap_data_arr.empty()) { + LOG(WARNING) << "fail to init reader. tablet=" << tablet->full_name() << ", version=" << version; + return OLAP_ERR_VERSION_NOT_EXIST; + } + } // ignore float and double type considering to precision lose for (size_t i = 0; i < tablet->tablet_schema().size(); ++i) { @@ -2626,6 +2632,7 @@ OLAPStatus OLAPEngine::compute_checksum( res = reader.init(reader_params); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("initiate reader fail. [res=%d]", res); + tablet->release_data_sources(&reader_params.olap_data_arr); return res; } @@ -2633,6 +2640,7 @@ OLAPStatus OLAPEngine::compute_checksum( res = row.init(tablet->tablet_schema(), reader_params.return_columns); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("failed to init row cursor. [res=%d]", res); + tablet->release_data_sources(&reader_params.olap_data_arr); return res; } row.allocate_memory_for_string_type(tablet->tablet_schema()); @@ -2652,6 +2660,8 @@ OLAPStatus OLAPEngine::compute_checksum( row_checksum = row.hash_code(row_checksum); } + tablet->release_data_sources(&reader_params.olap_data_arr); + LOG(INFO) << "success to finish compute checksum. checksum=" << row_checksum; *checksum = row_checksum; return OLAP_SUCCESS; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 9ff0badaf6f23c..358cc99dee91b0 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -469,7 +469,6 @@ void Reader::close() { VLOG(3) << "merged rows:" << _merged_rows; _conditions.finalize(); _delete_handler.finalize(); - _olap_table->release_data_sources(&_own_data_sources); for (auto pred : _col_predicates) { delete pred; @@ -479,22 +478,11 @@ void Reader::close() { } OLAPStatus Reader::_acquire_data_sources(const ReaderParams& read_params) { - const std::vector* data_sources; - if (read_params.reader_type == READER_ALTER_TABLE - || read_params.reader_type == READER_BASE_COMPACTION - || read_params.reader_type == READER_CUMULATIVE_COMPACTION) { - data_sources = &read_params.olap_data_arr; - } else { - _olap_table->obtain_header_rdlock(); - _olap_table->acquire_data_sources(_version, &_own_data_sources); - _olap_table->release_header_lock(); - - if (_own_data_sources.size() < 1) { - LOG(WARNING) << "fail to acquire data sources. [table_name='" << _olap_table->full_name() - << "' version=" << _version.first << "-" << _version.second << "]"; - return OLAP_ERR_VERSION_NOT_EXIST; - } - data_sources = &_own_data_sources; + const std::vector* data_sources = &read_params.olap_data_arr; + if (data_sources->empty()) { + LOG(WARNING) << "fail to acquire data sources. [table_name='" << _olap_table->full_name() + << "' version=" << _version.first << "-" << _version.second << "]"; + return OLAP_ERR_VERSION_NOT_EXIST; } // do not use index stream cache when be/ce/alter/checksum, diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 53f08b1bff1033..e902b9b5111311 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -203,9 +203,6 @@ class Reader { OLAPTablePtr _olap_table; - // _own_data_sources is data source that reader aquire from olap_table, so we need to - // release these when reader closing - std::vector _own_data_sources; std::vector _data_sources; KeysParam _keys_param;