Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 78 additions & 10 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,19 +916,26 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
}

TStatusCode::type status_code = TStatusCode::OK;
vector<string> error_msgs;
TStatus task_status;
EngineStorageMigrationTask engine_task(storage_medium_migrate_req);
OLAPStatus res = _env->storage_engine()->execute_task(&engine_task);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "storage media migrate failed. status: " << res
<< ", signature: " << agent_task_req.signature;
status_code = TStatusCode::RUNTIME_ERROR;
// check request and get info
TabletSharedPtr tablet;
DataDir* dest_store;
if (_check_migrate_requset(storage_medium_migrate_req, tablet, &dest_store) != OLAP_SUCCESS) {
status_code = TStatusCode::RUNTIME_ERROR;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it print out the reason for the check failure?

} else {
LOG(INFO) << "storage media migrate success. status:" << res << ","
<< ", signature:" << agent_task_req.signature;
EngineStorageMigrationTask engine_task(tablet, dest_store);
OLAPStatus res = _env->storage_engine()->execute_task(&engine_task);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "storage media migrate failed. status: " << res
<< ", signature: " << agent_task_req.signature;
status_code = TStatusCode::RUNTIME_ERROR;
} else {
LOG(INFO) << "storage media migrate success. status:" << res << ","
<< ", signature:" << agent_task_req.signature;
}
}

TStatus task_status;
vector<string> error_msgs;
task_status.__set_status_code(status_code);
task_status.__set_error_msgs(error_msgs);

Expand All @@ -943,6 +950,67 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
}
}

OLAPStatus TaskWorkerPool::_check_migrate_requset(
const TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
DataDir** dest_store) {

int64_t tablet_id = req.tablet_id;
int32_t schema_hash = req.schema_hash;
tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "can't find tablet. tablet_id= " << tablet_id
<< " schema_hash=" << schema_hash;
return OLAP_ERR_TABLE_NOT_FOUND;
}

if (req.__isset.data_dir) {
// request specify the data dir
*dest_store = StorageEngine::instance()->get_store(req.data_dir);
if (*dest_store == nullptr) {
LOG(WARNING) << "data dir not found: " << req.data_dir;
return OLAP_ERR_DIR_NOT_EXIST;
}
} else {
// this is a storage medium
// get data dir by storage medium

// judge case when no need to migrate
uint32_t count = StorageEngine::instance()->available_storage_medium_type_count();
if (count <= 1) {
LOG(INFO) << "available storage medium type count is less than 1, "
<< "no need to migrate. count=" << count;
return OLAP_REQUEST_FAILED;
}
// check current tablet storage medium
TStorageMedium::type storage_medium = req.storage_medium;
TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium();
if (src_storage_medium == storage_medium) {
LOG(INFO) << "tablet is already on specified storage medium. "
<< "storage_medium=" << storage_medium;
return OLAP_REQUEST_FAILED;
}
// get a random store of specified storage medium
auto stores = StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
if (stores.empty()) {
LOG(WARNING) << "fail to get root path for create tablet.";
return OLAP_ERR_INVALID_ROOT_PATH;
}

*dest_store = stores[0];
}

// check disk capacity
int64_t tablet_size = tablet->tablet_footprint();
if ((*dest_store)->reach_capacity_limit(tablet_size)) {
LOG(WARNING) << "reach the capacity limit of path: " << (*dest_store)->path()
<< ", tablet size: " << tablet_size;
return OLAP_ERR_DISK_REACH_CAPACITY_LIMIT;
}

return OLAP_SUCCESS;
}

void TaskWorkerPool::_check_consistency_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
Expand Down
5 changes: 5 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ class TaskWorkerPool {
bool overwrite,
std::vector<std::string>* error_msgs);

OLAPStatus _check_migrate_requset(
const TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
DataDir** dest_store);

private:
std::string _name;

Expand Down
16 changes: 14 additions & 2 deletions be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,13 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
}

// TODO support beta rowset
OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, int64_t tablet_id,
const int32_t& schema_hash) {
// For now, alpha and beta rowset meta have same fields, so we can just use
// AlphaRowsetMeta here.
OLAPStatus SnapshotManager::convert_rowset_ids(
const string& clone_dir,
int64_t tablet_id,
const int32_t& schema_hash) {

OLAPStatus res = OLAP_SUCCESS;
// check clone dir existed
if (!FileUtils::check_exist(clone_dir)) {
Expand Down Expand Up @@ -198,6 +203,11 @@ OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb, co
TabletSchema& tablet_schema, const RowsetId& rowset_id, RowsetMetaPB* new_rs_meta_pb) {
OLAPStatus res = OLAP_SUCCESS;
// TODO use factory to obtain RowsetMeta when SnapshotManager::convert_rowset_ids supports beta rowset
// TODO(cmy): now we only has AlphaRowsetMeta, and no BetaRowsetMeta.
// AlphaRowsetMeta only add some functions about segment group, and no addition fields.
// So we can use AlphaRowsetMeta here even if this is a beta rowset.
// And the `rowset_type` field indicates the real type of rowset, so that the correct rowset
// can be created.
RowsetMetaSharedPtr alpha_rowset_meta(new AlphaRowsetMeta());
alpha_rowset_meta->init_from_pb(rs_meta_pb);
RowsetSharedPtr org_rowset;
Expand Down Expand Up @@ -271,6 +281,8 @@ OLAPStatus SnapshotManager::_calc_snapshot_id_path(
return res;
}

// location: /path/to/data/DATA_PREFIX/shard_id
// return: /path/to/data/DATA_PREFIX/shard_id/tablet_id/schema_hash
string SnapshotManager::get_schema_hash_full_path(
const TabletSharedPtr& ref_tablet,
const string& location) const {
Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,21 +216,21 @@ OLAPStatus TabletMeta::create_from_file(const string& file_path) {
return OLAP_SUCCESS;
}

OLAPStatus TabletMeta::reset_tablet_uid(const string& file_path) {
OLAPStatus TabletMeta::reset_tablet_uid(const string& header_file) {
OLAPStatus res = OLAP_SUCCESS;
TabletMeta tmp_tablet_meta;
if ((res = tmp_tablet_meta.create_from_file(file_path)) != OLAP_SUCCESS) {
if ((res = tmp_tablet_meta.create_from_file(header_file)) != OLAP_SUCCESS) {
LOG(WARNING) << "fail to load tablet meta from file"
<< ", meta_file=" << file_path;
<< ", meta_file=" << header_file;
return res;
}
TabletMetaPB tmp_tablet_meta_pb;
tmp_tablet_meta.to_meta_pb(&tmp_tablet_meta_pb);
*(tmp_tablet_meta_pb.mutable_tablet_uid()) = TabletUid::gen_uid().to_proto();
res = save(file_path, tmp_tablet_meta_pb);
res = save(header_file, tmp_tablet_meta_pb);
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save tablet meta pb to "
<< " meta_file=" << file_path;
<< " meta_file=" << header_file;
return res;
}
return res;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ using rocksdb::kDefaultColumnFamilyName;

namespace doris {

// should use tablet's generate tablet meta copy method to get a copy of current tablet meta
// should use tablet->generate_tablet_meta_copy() method to get a copy of current tablet meta
// there are some rowset meta in local meta store and in in-memory tablet meta
// but not in tablet meta in local meta store
OLAPStatus TabletMetaManager::get_meta(
Expand Down
Loading