diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 41b7de75e3f27b..2fe81d964f373e 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -356,6 +356,20 @@ Status OlapScanNode::close(RuntimeState* state) { return ScanNode::close(state); } +// PlanFragmentExecutor will call this method to set the scan ranges +// Doris scan range is defined in the thrift file like this +// struct TPaloScanRange { +// 1: required list hosts +// 2: required string schema_hash +// 3: required string version +// 4: required string version_hash +// 5: required Types.TTabletId tablet_id +// 6: required string db_name +// 7: optional list partition_column_ranges +// 8: optional string index_name +// 9: optional string table_name +//} +// every doris_scan_range corresponds to one tablet, so one olap scan node covers multiple tablets Status OlapScanNode::set_scan_ranges(const std::vector& scan_ranges) { for (auto& scan_range : scan_ranges) { DCHECK(scan_range.scan_range.__isset.palo_scan_range); @@ -588,6 +602,8 @@ Status OlapScanNode::split_scan_range() { std::vector sub_ranges; VLOG(1) << "_doris_scan_ranges.size()=" << _doris_scan_ranges.size(); + // each doris scan range corresponds to one tablet + // split the scan range of every tablet for (auto scan_range : _doris_scan_ranges) { sub_ranges.clear(); RETURN_IF_ERROR(get_sub_scan_range(scan_range, &sub_ranges)); @@ -597,6 +613,7 @@ Status OlapScanNode::split_scan_range() { << sub_range.begin_scan_range << " : " << sub_range.end_scan_range << (sub_range.end_include ? "]" : ")"); + // just to get the scan_range related to the sub_range? why not create an object?
_query_key_ranges.push_back(sub_range); _query_scan_ranges.push_back(scan_range); } diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 195d66bf8e0526..27ae379e36600c 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -92,28 +92,42 @@ Status OlapScanner::_prepare( { ReadLock rdlock(_tablet->get_header_lock_ptr()); const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); - if (rowset == NULL) { + if (rowset == nullptr) { std::stringstream ss; ss << "fail to get latest version of tablet: " << tablet_id; - OLAP_LOG_WARNING(ss.str().c_str()); + LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } if (rowset->end_version() == _version && rowset->version_hash() != version_hash) { - OLAP_LOG_WARNING("fail to check latest version hash. " - "[tablet_id=%ld version_hash=%ld request_version_hash=%ld]", - tablet_id, rowset->version_hash(), version_hash); + LOG(WARNING) << "fail to check latest version hash. " + << " tablet_id=" << tablet_id + << " version_hash=" << rowset->version_hash() + << " request_version_hash=" << version_hash; std::stringstream ss; ss << "fail to check version hash of tablet: " << tablet_id; return Status::InternalError(ss.str()); } + + // acquire tablet rowset readers at the beginning of the scan node + // to prevent this case: when there are lots of olap scanners to run, for example 10000, + // the rowsets may be compacted by the time the last olap scanner starts + Version rd_version(0, _version); + OLAPStatus acquire_reader_st = _tablet->capture_rs_readers(rd_version, &_params.rs_readers); + if (acquire_reader_st != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; + std::stringstream ss; + ss << "failed to initialize storage reader.
tablet=" << _params.tablet->full_name() + << ", res=" << acquire_reader_st << ", backend=" << BackendOptions::get_localhost(); + return Status::InternalError(ss.str().c_str()); + } } } - - // Initialize _params + { + // Initialize _params RETURN_IF_ERROR(_init_params(key_ranges, filters, is_nulls)); } @@ -139,6 +153,7 @@ Status OlapScanner::open() { return Status::OK(); } +// it will be called under the tablet read lock because capturing rs readers requires it Status OlapScanner::_init_params( const std::vector& key_ranges, const std::vector& filters, @@ -474,6 +489,13 @@ Status OlapScanner::close(RuntimeState* state) { if (_is_closed) { return Status::OK(); } + // olap scan node will call scanner.close() when finished + // resources are released here + // if the rowset readers in read_params are not cleared here, + // they will be released when the runtime state is destructed, but + // the destructor in reader references the runtime state, + // so it will core dump + _params.rs_readers.clear(); update_counter(); _reader.reset(); Expr::close(_conjunct_ctxs, state); diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 464b4ba2c50964..28fdc10eb0845c 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -449,23 +449,11 @@ void Reader::close() { } OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { - const std::vector* rs_readers; - if (read_params.reader_type == READER_ALTER_TABLE - || read_params.reader_type == READER_BASE_COMPACTION - || read_params.reader_type == READER_CUMULATIVE_COMPACTION) { - rs_readers = &read_params.rs_readers; - } else { - _tablet->obtain_header_rdlock(); - OLAPStatus status = _tablet->capture_rs_readers(_version, &_own_rs_readers); - _tablet->release_header_lock(); - RETURN_NOT_OK(status); - - if (_own_rs_readers.size() < 1) { - LOG(WARNING) << "fail to acquire data sources.
tablet=" << _tablet->full_name() - << ", version=" << _version.first << "-" << _version.second; - return OLAP_ERR_VERSION_NOT_EXIST; - } - rs_readers = &_own_rs_readers; + const std::vector* rs_readers = &read_params.rs_readers; + if (rs_readers->size() < 1) { + LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name() + << ", version=" << _version.first << "-" << _version.second; + return OLAP_ERR_VERSION_NOT_EXIST; } // do not use index stream cache when be/ce/alter/checksum, diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index c8d66df6fe5078..823d2bf2765591 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -56,6 +56,13 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { return OLAP_ERR_TABLE_NOT_FOUND; } + + Reader reader; + ReaderParams reader_params; + reader_params.tablet = tablet; + reader_params.reader_type = READER_CHECKSUM; + reader_params.version = Version(0, _version); + { ReadLock rdlock(tablet->get_header_lock_ptr()); const RowsetSharedPtr message = tablet->rowset_with_max_version(); @@ -71,14 +78,14 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { res, _tablet_id, message->version_hash(), _version_hash); return OLAP_ERR_CE_CMD_PARAMS_ERROR; } + OLAPStatus acquire_reader_st = tablet->capture_rs_readers(reader_params.version, &reader_params.rs_readers); + if (acquire_reader_st != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init reader. 
tablet=" << tablet->full_name() + << "res=" << acquire_reader_st; + return acquire_reader_st; + } } - Reader reader; - ReaderParams reader_params; - reader_params.tablet = tablet; - reader_params.reader_type = READER_CHECKSUM; - reader_params.version = Version(0, _version); - // ignore float and double type considering to precision lose for (size_t i = 0; i < tablet->tablet_schema().num_columns(); ++i) { FieldType type = tablet->tablet_schema().column(i).type(); diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 4a9fad23ee52f6..84ca11d472b81d 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -173,6 +173,8 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate( break; } + // it will change rowset id and its create time + // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load res = SnapshotManager::instance()->convert_rowset_ids(*(stores[0]), schema_hash_path, tablet_id, schema_hash, nullptr); if (res != OLAP_SUCCESS) { LOG(WARNING) << "failed to convert rowset id when do storage migration" diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 9e8359bc15349d..822b5eb67a0f70 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -483,9 +483,8 @@ public class Config extends ConfigBase { * After dropping database(table/partition), you can recover it by using RECOVER stmt. * And this specifies the maximal data retention time. After time, the data will be deleted permanently. */ - // TODO(ygl): temp modify it for test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
@ConfField(mutable = true, masterOnly = true) - public static long catalog_trash_expire_second = 10L; // 1day + public static long catalog_trash_expire_second = 86400L; // 1day /* * Maximal bytes that a single broker scanner will read. * Do not set this if you know what you are doing.