Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,20 @@ Status OlapScanNode::close(RuntimeState* state) {
return ScanNode::close(state);
}

// PlanFragmentExecutor will call this method to set scan range
// Doris scan range is defined in thrift file like this
// struct TPaloScanRange {
// 1: required list<Types.TNetworkAddress> hosts
// 2: required string schema_hash
// 3: required string version
// 4: required string version_hash
// 5: required Types.TTabletId tablet_id
// 6: required string db_name
// 7: optional list<TKeyRange> partition_column_ranges
// 8: optional string index_name
// 9: optional string table_name
//}
// every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet
Status OlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
for (auto& scan_range : scan_ranges) {
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
Expand Down Expand Up @@ -588,6 +602,8 @@ Status OlapScanNode::split_scan_range() {
std::vector<OlapScanRange> sub_ranges;
VLOG(1) << "_doris_scan_ranges.size()=" << _doris_scan_ranges.size();

// doris scan range is related with one tablet
// split scan range for every tablet
for (auto scan_range : _doris_scan_ranges) {
sub_ranges.clear();
RETURN_IF_ERROR(get_sub_scan_range(scan_range, &sub_ranges));
Expand All @@ -597,6 +613,7 @@ Status OlapScanNode::split_scan_range() {
<< sub_range.begin_scan_range
<< " : " << sub_range.end_scan_range <<
(sub_range.end_include ? "]" : ")");
// just to get sub_range related scan_range? why not create a object?
_query_key_ranges.push_back(sub_range);
_query_scan_ranges.push_back(scan_range);
}
Expand Down
36 changes: 29 additions & 7 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,28 +92,42 @@ Status OlapScanner::_prepare(
{
ReadLock rdlock(_tablet->get_header_lock_ptr());
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
if (rowset == NULL) {
if (rowset == nullptr) {
std::stringstream ss;
ss << "fail to get latest version of tablet: " << tablet_id;
OLAP_LOG_WARNING(ss.str().c_str());
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}

if (rowset->end_version() == _version
&& rowset->version_hash() != version_hash) {
OLAP_LOG_WARNING("fail to check latest version hash. "
"[tablet_id=%ld version_hash=%ld request_version_hash=%ld]",
tablet_id, rowset->version_hash(), version_hash);
LOG(WARNING) << "fail to check latest version hash. "
<< " tablet_id=" << tablet_id
<< " version_hash=" << rowset->version_hash()
<< " request_version_hash=" << version_hash;

std::stringstream ss;
ss << "fail to check version hash of tablet: " << tablet_id;
return Status::InternalError(ss.str());
}

// acquire tablet rowset readers at the beginning of the scan node
// to prevent this case: when there are lots of olap scanners to run for example 10000
// the rowsets maybe compacted when the last olap scanner starts
Version rd_version(0, _version);
OLAPStatus acquire_reader_st = _tablet->capture_rs_readers(rd_version, &_params.rs_readers);
if (acquire_reader_st != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
std::stringstream ss;
ss << "failed to initialize storage reader. tablet=" << _params.tablet->full_name()
<< ", res=" << acquire_reader_st << ", backend=" << BackendOptions::get_localhost();
return Status::InternalError(ss.str().c_str());
}
}
}

// Initialize _params

{
// Initialize _params
RETURN_IF_ERROR(_init_params(key_ranges, filters, is_nulls));
}

Expand All @@ -139,6 +153,7 @@ Status OlapScanner::open() {
return Status::OK();
}

// it will be called under tablet read lock because capture rs readers need
Status OlapScanner::_init_params(
const std::vector<OlapScanRange>& key_ranges,
const std::vector<TCondition>& filters,
Expand Down Expand Up @@ -474,6 +489,13 @@ Status OlapScanner::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}
// olap scan node will call scanner.close() when finished
// will release resources here
// if not clear rowset readers in read_params here
// readers will be release when runtime state deconstructed but
// deconstructor in reader references runtime state
// so that it will core
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment make me confusing...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means, we should deconstructor resource in close method. Not in deconstructor method.

_params.rs_readers.clear();
update_counter();
_reader.reset();
Expr::close(_conjunct_ctxs, state);
Expand Down
22 changes: 5 additions & 17 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,23 +449,11 @@ void Reader::close() {
}

OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
const std::vector<RowsetReaderSharedPtr>* rs_readers;
if (read_params.reader_type == READER_ALTER_TABLE
|| read_params.reader_type == READER_BASE_COMPACTION
|| read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
rs_readers = &read_params.rs_readers;
} else {
_tablet->obtain_header_rdlock();
OLAPStatus status = _tablet->capture_rs_readers(_version, &_own_rs_readers);
_tablet->release_header_lock();
RETURN_NOT_OK(status);

if (_own_rs_readers.size() < 1) {
LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name()
<< ", version=" << _version.first << "-" << _version.second;
return OLAP_ERR_VERSION_NOT_EXIST;
}
rs_readers = &_own_rs_readers;
const std::vector<RowsetReaderSharedPtr>* rs_readers = &read_params.rs_readers;
if (rs_readers->size() < 1) {
LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name()
<< ", version=" << _version.first << "-" << _version.second;
return OLAP_ERR_VERSION_NOT_EXIST;
}

// do not use index stream cache when be/ce/alter/checksum,
Expand Down
19 changes: 13 additions & 6 deletions be/src/olap/task/engine_checksum_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
return OLAP_ERR_TABLE_NOT_FOUND;
}


Reader reader;
ReaderParams reader_params;
reader_params.tablet = tablet;
reader_params.reader_type = READER_CHECKSUM;
reader_params.version = Version(0, _version);

{
ReadLock rdlock(tablet->get_header_lock_ptr());
const RowsetSharedPtr message = tablet->rowset_with_max_version();
Expand All @@ -71,14 +78,14 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
res, _tablet_id, message->version_hash(), _version_hash);
return OLAP_ERR_CE_CMD_PARAMS_ERROR;
}
OLAPStatus acquire_reader_st = tablet->capture_rs_readers(reader_params.version, &reader_params.rs_readers);
if (acquire_reader_st != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init reader. tablet=" << tablet->full_name()
<< "res=" << acquire_reader_st;
return acquire_reader_st;
}
}

Reader reader;
ReaderParams reader_params;
reader_params.tablet = tablet;
reader_params.reader_type = READER_CHECKSUM;
reader_params.version = Version(0, _version);

// ignore float and double type considering to precision lose
for (size_t i = 0; i < tablet->tablet_schema().num_columns(); ++i) {
FieldType type = tablet->tablet_schema().column(i).type();
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
break;
}

// it will change rowset id and its create time
// rowset create time is useful when load tablet from meta to check which tablet is the tablet to load
res = SnapshotManager::instance()->convert_rowset_ids(*(stores[0]), schema_hash_path, tablet_id, schema_hash, nullptr);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to convert rowset id when do storage migration"
Expand Down
3 changes: 1 addition & 2 deletions fe/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,8 @@ public class Config extends ConfigBase {
* After dropping database(table/partition), you can recover it by using RECOVER stmt.
* And this specifies the maximal data retention time. After time, the data will be deleted permanently.
*/
// TODO(ygl): temp modify it for test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@ConfField(mutable = true, masterOnly = true)
public static long catalog_trash_expire_second = 10L; // 1day
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do test by using modify configuration file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the past, it is one day。。。。

public static long catalog_trash_expire_second = 86400L; // 1day
/*
* Maximal bytes that a single broker scanner will read.
* Do not set this if you know what you are doing.
Expand Down