Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,10 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
status_code = TStatusCode::RUNTIME_ERROR;
} else {
++_s_report_version;
TReplicaId replica_id = create_tablet_req.__isset.replica_id ? create_tablet_req.replica_id : 0;
// get path hash of the created tablet
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
create_tablet_req.tablet_id, create_tablet_req.tablet_schema.schema_hash);
create_tablet_req.tablet_id, create_tablet_req.tablet_schema.schema_hash, replica_id);
DCHECK(tablet != nullptr);
TTabletInfo tablet_info;
tablet_info.tablet_id = tablet->table_id();
Expand All @@ -383,6 +384,7 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
tablet_info.row_count = 0;
tablet_info.data_size = 0;
tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
tablet_info.replica_id = tablet->replica_id();
finish_tablet_infos.push_back(tablet_info);
}
TRACE("StorageEngine create tablet finish, status: $0", create_status);
Expand Down Expand Up @@ -425,11 +427,18 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
std::vector<string> error_msgs;
TStatus task_status;
string err;
TReplicaId replica_id = drop_tablet_req.__isset.replica_id ? drop_tablet_req.replica_id : 0;
TabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
drop_tablet_req.tablet_id, drop_tablet_req.schema_hash, false, &err);
drop_tablet_req.tablet_id, drop_tablet_req.schema_hash, replica_id, false, &err);
if (dropped_tablet != nullptr) {
if (dropped_tablet->clone_mode()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just do this check in tablet_manager()->drop_tablet?

LOG(WARNING) << "drop table cancelled as tablet is in clone mode! signature: " << agent_task_req.signature;
error_msgs.push_back("drop table cancelled as tablet is in clone mode! signature: " + agent_task_req.signature);
status_code = TStatusCode::CANCELLED;
}

OLAPStatus drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet(
drop_tablet_req.tablet_id, drop_tablet_req.schema_hash);
drop_tablet_req.tablet_id, replica_id, drop_tablet_req.schema_hash);
if (drop_status != OLAP_SUCCESS) {
LOG(WARNING) << "drop table failed! signature: " << agent_task_req.signature;
error_msgs.push_back("drop table failed!");
Expand Down Expand Up @@ -901,6 +910,14 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
DorisMetrics::instance()->clone_requests_total->increment(1);
LOG(INFO) << "get clone task. signature:" << agent_task_req.signature;

TReplicaId replica_id = clone_req.__isset.replica_id ? clone_req.replica_id : 0;
// check tablet with the same tabletId existance, if exist, set tablet in clone mode
TabletSharedPtr exist_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do this in EngineCloneTask.

clone_req.tablet_id, clone_req.schema_hash, replica_id);
if (exist_tablet != nullptr) {
exist_tablet->set_clone_mode(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not checking replica id here?

}

std::vector<string> error_msgs;
std::vector<TTabletInfo> tablet_infos;
EngineCloneTask engine_task(clone_req, _master_info, agent_task_req.signature, &error_msgs,
Expand Down Expand Up @@ -928,6 +945,14 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
task_status.__set_error_msgs(error_msgs);
finish_task_request.__set_task_status(task_status);

// clone done, set clone mode false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do this in clone task.

// Retrieve once again to prevent tablet from being dropped
exist_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
clone_req.tablet_id, clone_req.schema_hash, replica_id);
if (exist_tablet != nullptr) {
exist_tablet->set_clone_mode(false);
}

_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
Expand Down Expand Up @@ -991,6 +1016,7 @@ OLAPStatus TaskWorkerPool::_check_migrate_requset(const TStorageMediumMigrateReq
TabletSharedPtr& tablet, DataDir** dest_store) {
int64_t tablet_id = req.tablet_id;
int32_t schema_hash = req.schema_hash;
// tablet migration no need to know replica_id
tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "can't find tablet. tablet_id= " << tablet_id
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ Status OlapScanNode::get_hints(const TPaloScanRange& scan_range, int block_row_c
int32_t schema_hash = strtoul(scan_range.schema_hash.c_str(), NULL, 10);
std::string err;
TabletSharedPtr table = StorageEngine::instance()->tablet_manager()->get_tablet(
tablet_id, schema_hash, true, &err);
tablet_id, schema_hash, 0 /*replica_id*/, true, &err);
if (table == nullptr) {
std::stringstream ss;
ss << "failed to get tablet: " << tablet_id << " with schema hash: " << schema_hash
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Status OlapScanner::prepare(
_version = strtoul(scan_range.version.c_str(), nullptr, 10);
{
std::string err;
_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash,
_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash, 0 /*replica_id*/,
true, &err);
if (_tablet.get() == nullptr) {
std::stringstream ss;
Expand Down
14 changes: 11 additions & 3 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
inline const std::string full_name() const;
inline int64_t partition_id() const;
inline int64_t tablet_id() const;
inline int64_t replica_id() const;
inline int32_t schema_hash() const;
inline int16_t shard_id();
inline const int64_t creation_time() const;
inline void set_creation_time(int64_t creation_time);
inline bool equal(int64_t tablet_id, int32_t schema_hash);
inline bool equal(int64_t tablet_id, int32_t schema_hash, int64_t replica_id = 0);

// properties encapsulated in TabletSchema
inline const TabletSchema& tablet_schema() const;
Expand Down Expand Up @@ -125,6 +126,10 @@ inline int64_t BaseTablet::tablet_id() const {
return _tablet_meta->tablet_id();
}

inline int64_t BaseTablet::replica_id() const {
return _tablet_meta->replica_id();
}

inline int32_t BaseTablet::schema_hash() const {
return _tablet_meta->schema_hash();
}
Expand All @@ -141,8 +146,11 @@ inline void BaseTablet::set_creation_time(int64_t creation_time) {
_tablet_meta->set_creation_time(creation_time);
}

inline bool BaseTablet::equal(int64_t id, int32_t hash) {
return (tablet_id() == id) && (schema_hash() == hash);
inline bool BaseTablet::equal(int64_t id, int32_t hash, int64_t r_id) {
// For compatibility with older data, there is no replica id in the old version of the tablet meta
// For new data with replica_id in the meta, there are some tasks that do not need to check the replica_id
// Only check replica_id in creat/drop/clone tablet tasks
return (tablet_id() == id) && ((replica_id() == 0 || r_id == 0) ? true : (replica_id() == r_id)) && (schema_hash() == hash);
}

inline const TabletSchema& BaseTablet::tablet_schema() const {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
// TODO support beta rowset
// For now, alpha and beta rowset meta have same fields, so we can just use
// AlphaRowsetMeta here.
OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, int64_t tablet_id,
OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, int64_t tablet_id, int64_t replica_id,
const int32_t& schema_hash) {
OLAPStatus res = OLAP_SUCCESS;
// check clone dir existed
Expand Down Expand Up @@ -154,6 +154,7 @@ OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, int64_t
// equal to tablet id in meta
new_tablet_meta_pb.set_tablet_id(tablet_id);
new_tablet_meta_pb.set_schema_hash(schema_hash);
new_tablet_meta_pb.set_replica_id(replica_id);
TabletSchema tablet_schema;
tablet_schema.init_from_pb(new_tablet_meta_pb.schema());

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class SnapshotManager {

static SnapshotManager* instance();

OLAPStatus convert_rowset_ids(const string& clone_dir, int64_t tablet_id,
OLAPStatus convert_rowset_ids(const string& clone_dir, int64_t tablet_id, int64_t replica_id,
const int32_t& schema_hash);

private:
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,14 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
return true;
}

bool Tablet::clone_mode() {
return _tablet_meta->in_clone_mode();
}

void Tablet::set_clone_mode(bool clone_mode) {
_tablet_meta->set_in_clone_mode(clone_mode);
}

uint32_t Tablet::calc_compaction_score(
CompactionType compaction_type,
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
Expand Down Expand Up @@ -1279,6 +1287,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
tablet_info->__set_version_count(_tablet_meta->version_count());
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory());
tablet_info->__set_replica_id(_tablet_meta->replica_id());
}

// should use this method to get a copy of current tablet meta
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ class Tablet : public BaseTablet {
bool version_for_delete_predicate(const Version& version);
bool version_for_load_deletion(const Version& version);

// message for clone task
bool clone_mode();
void set_clone_mode(bool clone_mode);

// meta lock
inline void obtain_header_rdlock() { _meta_lock.rdlock(); }
inline void obtain_header_wrlock() { _meta_lock.wrlock(); }
Expand Down
Loading