From 608deec2a0ba7303ebff83d5ce4845928b6f83c3 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 27 Jun 2019 12:25:11 +0800 Subject: [PATCH 1/5] Acquire rs readers at the beginning of the olapscanner --- be/src/exec/olap_scan_node.cpp | 17 +++++++++++++++ be/src/exec/olap_scanner.cpp | 21 +++++++++++++++++++ be/src/olap/reader.cpp | 3 ++- .../task/engine_storage_migration_task.cpp | 2 ++ .../java/org/apache/doris/common/Config.java | 3 +-- 5 files changed, 43 insertions(+), 3 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 41b7de75e3f27b..2fe81d964f373e 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -356,6 +356,20 @@ Status OlapScanNode::close(RuntimeState* state) { return ScanNode::close(state); } +// PlanFragmentExecutor will call this method to set scan range +// Doris scan range is defined in thrift file like this +// struct TPaloScanRange { +// 1: required list hosts +// 2: required string schema_hash +// 3: required string version +// 4: required string version_hash +// 5: required Types.TTabletId tablet_id +// 6: required string db_name +// 7: optional list partition_column_ranges +// 8: optional string index_name +// 9: optional string table_name +//} +// every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet Status OlapScanNode::set_scan_ranges(const std::vector& scan_ranges) { for (auto& scan_range : scan_ranges) { DCHECK(scan_range.scan_range.__isset.palo_scan_range); @@ -588,6 +602,8 @@ Status OlapScanNode::split_scan_range() { std::vector sub_ranges; VLOG(1) << "_doris_scan_ranges.size()=" << _doris_scan_ranges.size(); + // doris scan range is related with one tablet + // split scan range for every tablet for (auto scan_range : _doris_scan_ranges) { sub_ranges.clear(); RETURN_IF_ERROR(get_sub_scan_range(scan_range, &sub_ranges)); @@ -597,6 +613,7 @@ Status OlapScanNode::split_scan_range() { << sub_range.begin_scan_range << " : " << sub_range.end_scan_range << (sub_range.end_include ? "]" : ")"); + // just to get sub_range related scan_range? why not create a object? _query_key_ranges.push_back(sub_range); _query_scan_ranges.push_back(scan_range); } diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 195d66bf8e0526..13fbd1a4ebbda7 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -150,6 +150,20 @@ Status OlapScanner::_init_params( _params.aggregation = _aggregation; _params.version = Version(0, _version); + // acquire tablet rowset readers at the beginning of the scan node + // to prevent this case: when there are lots of olap scanners to run for example 10000 + // the rowsets maybe compacted when the last olap scanner starts + _tablet->obtain_header_rdlock(); + OLAPStatus acquire_reader_st = _tablet->capture_rs_readers(_params.version, &_params.rs_readers); + _tablet->release_header_lock(); + if (acquire_reader_st != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; + std::stringstream ss; + ss << "failed to initialize storage reader. tablet=" << _params.tablet->full_name() + << ", res=" << acquire_reader_st << ", backend=" << BackendOptions::get_localhost(); + return Status::InternalError(ss.str().c_str()); + } + // Condition for (auto& filter : filters) { _params.conditions.push_back(filter); @@ -474,6 +488,13 @@ Status OlapScanner::close(RuntimeState* state) { if (_is_closed) { return Status::OK(); } + // olap scan node will call scanner.close() when finished + // will release resources here + // if not clear rowset readers in read_params here + // readers will be release when runtime state deconstructed but + // deconstructor in reader references runtime state + // so that it will core + _params.rs_readers.clear(); update_counter(); _reader.reset(); Expr::close(_conjunct_ctxs, state); diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 464b4ba2c50964..8e604debee6313 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -452,7 +452,8 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { const std::vector* rs_readers; if (read_params.reader_type == READER_ALTER_TABLE || read_params.reader_type == READER_BASE_COMPACTION - || read_params.reader_type == READER_CUMULATIVE_COMPACTION) { + || read_params.reader_type == READER_CUMULATIVE_COMPACTION + || read_params.reader_type == READER_QUERY) { rs_readers = &read_params.rs_readers; } else { _tablet->obtain_header_rdlock(); diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 4a9fad23ee52f6..84ca11d472b81d 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -173,6 +173,8 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate( break; } + // it will change rowset id and its create time + // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load res = SnapshotManager::instance()->convert_rowset_ids(*(stores[0]), schema_hash_path, tablet_id, schema_hash, nullptr); if (res != OLAP_SUCCESS) { LOG(WARNING) << "failed to convert rowset id when do storage migration" diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 9e8359bc15349d..822b5eb67a0f70 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -483,9 +483,8 @@ public class Config extends ConfigBase { * After dropping database(table/partition), you can recover it by using RECOVER stmt. * And this specifies the maximal data retention time. After time, the data will be deleted permanently. */ - // TODO(ygl): temp modify it for test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! @ConfField(mutable = true, masterOnly = true) - public static long catalog_trash_expire_second = 10L; // 1day + public static long catalog_trash_expire_second = 86400L; // 1day /* * Maximal bytes that a single broker scanner will read. * Do not set this if you know what you are doing. From 26415c5ffea1ceaa2751e0efffa01898280ad8b2 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 27 Jun 2019 14:17:22 +0800 Subject: [PATCH 2/5] Init params under tablet rdlock --- be/src/exec/olap_scanner.cpp | 65 ++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 13fbd1a4ebbda7..74ed596026d07b 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -78,44 +78,38 @@ Status OlapScanner::_prepare( strtoul(scan_range->scan_range().version.c_str(), nullptr, 10); VersionHash version_hash = strtoul(scan_range->scan_range().version_hash.c_str(), nullptr, 10); - { - std::string err; - _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash, true, &err); - if (_tablet.get() == nullptr) { - std::stringstream ss; - ss << "failed to get tablet. tablet_id=" << tablet_id - << ", with schema_hash=" << schema_hash - << ", reason=" << err; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - { - ReadLock rdlock(_tablet->get_header_lock_ptr()); - const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); - if (rowset == NULL) { - std::stringstream ss; - ss << "fail to get latest version of tablet: " << tablet_id; - OLAP_LOG_WARNING(ss.str().c_str()); - return Status::InternalError(ss.str()); - } + std::string err; + _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash, true, &err); + if (_tablet.get() == nullptr) { + std::stringstream ss; + ss << "failed to get tablet. tablet_id=" << tablet_id + << ", with schema_hash=" << schema_hash + << ", reason=" << err; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + ReadLock rdlock(_tablet->get_header_lock_ptr()); + const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); + if (rowset == nullptr) { + std::stringstream ss; + ss << "fail to get latest version of tablet: " << tablet_id; + OLAP_LOG_WARNING(ss.str().c_str()); + return Status::InternalError(ss.str()); + } - if (rowset->end_version() == _version - && rowset->version_hash() != version_hash) { - OLAP_LOG_WARNING("fail to check latest version hash. " - "[tablet_id=%ld version_hash=%ld request_version_hash=%ld]", - tablet_id, rowset->version_hash(), version_hash); + if (rowset->end_version() == _version + && rowset->version_hash() != version_hash) { + OLAP_LOG_WARNING("fail to check latest version hash. " + "[tablet_id=%ld version_hash=%ld request_version_hash=%ld]", + tablet_id, rowset->version_hash(), version_hash); - std::stringstream ss; - ss << "fail to check version hash of tablet: " << tablet_id; - return Status::InternalError(ss.str()); - } - } + std::stringstream ss; + ss << "fail to check version hash of tablet: " << tablet_id; + return Status::InternalError(ss.str()); } - // Initialize _params - { - RETURN_IF_ERROR(_init_params(key_ranges, filters, is_nulls)); - } + // Initialize _params under read lock + RETURN_IF_ERROR(_init_params(key_ranges, filters, is_nulls)); return Status::OK(); } @@ -139,6 +133,7 @@ Status OlapScanner::open() { return Status::OK(); } +// it will be called under tablet read lock because capture rs readers need rdlock Status OlapScanner::_init_params( const std::vector& key_ranges, const std::vector& filters, @@ -153,9 +148,7 @@ Status OlapScanner::_init_params( // acquire tablet rowset readers at the beginning of the scan node // to prevent this case: when there are lots of olap scanners to run for example 10000 // the rowsets maybe compacted when the last olap scanner starts - _tablet->obtain_header_rdlock(); OLAPStatus acquire_reader_st = _tablet->capture_rs_readers(_params.version, &_params.rs_readers); - _tablet->release_header_lock(); if (acquire_reader_st != OLAP_SUCCESS) { LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; std::stringstream ss; From 016dede5055ec957aebfc0b4f6f323ef5c01ed9f Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 27 Jun 2019 14:46:47 +0800 Subject: [PATCH 3/5] Acquire rs reader in checksum task --- be/src/exec/olap_scanner.cpp | 91 ++++++++++++----------- be/src/olap/reader.cpp | 23 ++---- be/src/olap/task/engine_checksum_task.cpp | 19 +++-- 3 files changed, 67 insertions(+), 66 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 74ed596026d07b..353430af083eda 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -78,38 +78,57 @@ Status OlapScanner::_prepare( strtoul(scan_range->scan_range().version.c_str(), nullptr, 10); VersionHash version_hash = strtoul(scan_range->scan_range().version_hash.c_str(), nullptr, 10); - std::string err; - _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash, true, &err); - if (_tablet.get() == nullptr) { - std::stringstream ss; - ss << "failed to get tablet. tablet_id=" << tablet_id - << ", with schema_hash=" << schema_hash - << ", reason=" << err; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - ReadLock rdlock(_tablet->get_header_lock_ptr()); - const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); - if (rowset == nullptr) { - std::stringstream ss; - ss << "fail to get latest version of tablet: " << tablet_id; - OLAP_LOG_WARNING(ss.str().c_str()); - return Status::InternalError(ss.str()); - } + { + std::string err; + _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash, true, &err); + if (_tablet.get() == nullptr) { + std::stringstream ss; + ss << "failed to get tablet. tablet_id=" << tablet_id + << ", with schema_hash=" << schema_hash + << ", reason=" << err; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + { + ReadLock rdlock(_tablet->get_header_lock_ptr()); + const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); + if (rowset == nullptr) { + std::stringstream ss; + ss << "fail to get latest version of tablet: " << tablet_id; + OLAP_LOG_WARNING(ss.str().c_str()); + return Status::InternalError(ss.str()); + } - if (rowset->end_version() == _version - && rowset->version_hash() != version_hash) { - OLAP_LOG_WARNING("fail to check latest version hash. " - "[tablet_id=%ld version_hash=%ld request_version_hash=%ld]", - tablet_id, rowset->version_hash(), version_hash); + if (rowset->end_version() == _version + && rowset->version_hash() != version_hash) { + OLAP_LOG_WARNING("fail to check latest version hash. " + "[tablet_id=%ld version_hash=%ld request_version_hash=%ld]", + tablet_id, rowset->version_hash(), version_hash); - std::stringstream ss; - ss << "fail to check version hash of tablet: " << tablet_id; - return Status::InternalError(ss.str()); - } + std::stringstream ss; + ss << "fail to check version hash of tablet: " << tablet_id; + return Status::InternalError(ss.str()); + } - // Initialize _params under read lock - RETURN_IF_ERROR(_init_params(key_ranges, filters, is_nulls)); + // acquire tablet rowset readers at the beginning of the scan node + // to prevent this case: when there are lots of olap scanners to run for example 10000 + // the rowsets maybe compacted when the last olap scanner starts + Version rd_version(0, _version); + OLAPStatus acquire_reader_st = _tablet->capture_rs_readers(rd_version, &_params.rs_readers); + if (acquire_reader_st != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; + std::stringstream ss; + ss << "failed to initialize storage reader. tablet=" << _params.tablet->full_name() + << ", res=" << acquire_reader_st << ", backend=" << BackendOptions::get_localhost(); + return Status::InternalError(ss.str().c_str()); + } + } + } + + { + // Initialize _params + RETURN_IF_ERROR(_init_params(key_ranges, filters, is_nulls)); + } return Status::OK(); } @@ -133,7 +152,7 @@ Status OlapScanner::open() { return Status::OK(); } -// it will be called under tablet read lock because capture rs readers need rdlock +// it will be called under tablet read lock because capture rs readers need Status OlapScanner::_init_params( const std::vector& key_ranges, const std::vector& filters, @@ -145,18 +164,6 @@ Status OlapScanner::_init_params( _params.aggregation = _aggregation; _params.version = Version(0, _version); - // acquire tablet rowset readers at the beginning of the scan node - // to prevent this case: when there are lots of olap scanners to run for example 10000 - // the rowsets maybe compacted when the last olap scanner starts - OLAPStatus acquire_reader_st = _tablet->capture_rs_readers(_params.version, &_params.rs_readers); - if (acquire_reader_st != OLAP_SUCCESS) { - LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; - std::stringstream ss; - ss << "failed to initialize storage reader. tablet=" << _params.tablet->full_name() - << ", res=" << acquire_reader_st << ", backend=" << BackendOptions::get_localhost(); - return Status::InternalError(ss.str().c_str()); - } - // Condition for (auto& filter : filters) { _params.conditions.push_back(filter); diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 8e604debee6313..28fdc10eb0845c 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -449,24 +449,11 @@ void Reader::close() { } OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { - const std::vector* rs_readers; - if (read_params.reader_type == READER_ALTER_TABLE - || read_params.reader_type == READER_BASE_COMPACTION - || read_params.reader_type == READER_CUMULATIVE_COMPACTION - || read_params.reader_type == READER_QUERY) { - rs_readers = &read_params.rs_readers; - } else { - _tablet->obtain_header_rdlock(); - OLAPStatus status = _tablet->capture_rs_readers(_version, &_own_rs_readers); - _tablet->release_header_lock(); - RETURN_NOT_OK(status); - - if (_own_rs_readers.size() < 1) { - LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name() - << ", version=" << _version.first << "-" << _version.second; - return OLAP_ERR_VERSION_NOT_EXIST; - } - rs_readers = &_own_rs_readers; + const std::vector* rs_readers = &read_params.rs_readers; + if (rs_readers->size() < 1) { + LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name() + << ", version=" << _version.first << "-" << _version.second; + return OLAP_ERR_VERSION_NOT_EXIST; } // do not use index stream cache when be/ce/alter/checksum, diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index c8d66df6fe5078..823d2bf2765591 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -56,6 +56,13 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { return OLAP_ERR_TABLE_NOT_FOUND; } + + Reader reader; + ReaderParams reader_params; + reader_params.tablet = tablet; + reader_params.reader_type = READER_CHECKSUM; + reader_params.version = Version(0, _version); + { ReadLock rdlock(tablet->get_header_lock_ptr()); const RowsetSharedPtr message = tablet->rowset_with_max_version(); @@ -71,14 +78,14 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { res, _tablet_id, message->version_hash(), _version_hash); return OLAP_ERR_CE_CMD_PARAMS_ERROR; } + OLAPStatus acquire_reader_st = tablet->capture_rs_readers(reader_params.version, &reader_params.rs_readers); + if (acquire_reader_st != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init reader. tablet=" << tablet->full_name() + << "res=" << acquire_reader_st; + return acquire_reader_st; + } } - Reader reader; - ReaderParams reader_params; - reader_params.tablet = tablet; - reader_params.reader_type = READER_CHECKSUM; - reader_params.version = Version(0, _version); - // ignore float and double type considering to precision lose for (size_t i = 0; i < tablet->tablet_schema().num_columns(); ++i) { FieldType type = tablet->tablet_schema().column(i).type(); From e5bf5a8d4468f98f55ebb5f4a8e37b4ef5c0808a Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 27 Jun 2019 14:58:40 +0800 Subject: [PATCH 4/5] Fix log format --- be/src/exec/olap_scanner.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 353430af083eda..6146003df8ac04 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -95,15 +95,16 @@ Status OlapScanner::_prepare( if (rowset == nullptr) { std::stringstream ss; ss << "fail to get latest version of tablet: " << tablet_id; - OLAP_LOG_WARNING(ss.str().c_str()); + LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } if (rowset->end_version() == _version && rowset->version_hash() != version_hash) { - OLAP_LOG_WARNING("fail to check latest version hash. " - "[tablet_id=%ld version_hash=%ld request_version_hash=%ld]", - tablet_id, rowset->version_hash(), version_hash); + LOG(WARNING) << "fail to check latest version hash. " + << " tablet_id=" << tablet_id + << " version_hash=" << rowset->version_hash() + << " request_version_hash=" << version_hash; std::stringstream ss; ss << "fail to check version hash of tablet: " << tablet_id; From 92d386187971d88defa8c61168f08da674c8997d Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 27 Jun 2019 15:00:50 +0800 Subject: [PATCH 5/5] Fix log format --- be/src/exec/olap_scanner.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 6146003df8ac04..27ae379e36600c 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -84,8 +84,8 @@ Status OlapScanner::_prepare( if (_tablet.get() == nullptr) { std::stringstream ss; ss << "failed to get tablet. tablet_id=" << tablet_id - << ", with schema_hash=" << schema_hash - << ", reason=" << err; + << ", with schema_hash=" << schema_hash + << ", reason=" << err; LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); }