Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,36 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}

Status CloudTablet::merge_rowsets_schema() {
// Find the rowset with the max version
auto max_version_rowset =
std::max_element(
_rs_version_map.begin(), _rs_version_map.end(),
[](const auto& a, const auto& b) {
return !a.second->tablet_schema()
? true
: (!b.second->tablet_schema()
? false
: a.second->tablet_schema()->schema_version() <
b.second->tablet_schema()
->schema_version());
})
->second;
TabletSchemaSPtr max_version_schema = max_version_rowset->tablet_schema();
// If the schema has variant columns, perform a merge to create a wide tablet schema
if (max_version_schema->num_variant_columns() > 0) {
std::vector<TabletSchemaSPtr> schemas;
std::transform(_rs_version_map.begin(), _rs_version_map.end(), std::back_inserter(schemas),
[](const auto& rs_meta) { return rs_meta.second->tablet_schema(); });
// Merge the collected schemas to obtain the least common schema
RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
max_version_schema));
VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema();
_merged_tablet_schema = max_version_schema;
}
return Status::OK();
}

// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
Expand All @@ -133,6 +163,10 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}

// Merge all rowset schemas within a CloudTablet
RETURN_IF_ERROR(merge_rowsets_schema());

return st;
}

Expand Down Expand Up @@ -188,16 +222,7 @@ Status CloudTablet::sync_if_not_running() {
}

TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
std::shared_lock rdlock(_meta_lock);
TabletSchemaSPtr target_schema;
std::vector<TabletSchemaSPtr> schemas;
for (const auto& [_, rowset] : _rs_version_map) {
schemas.push_back(rowset->tablet_schema());
}
// get the max version schema and merge all schema
static_cast<void>(
vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema));
return target_schema;
return _merged_tablet_schema;
}

void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ class CloudTablet final : public BaseTablet {

Status sync_if_not_running();

// Merge all rowset schemas within a CloudTablet
Status merge_rowsets_schema();

CloudStorageEngine& _engine;

// this mutex MUST ONLY be used when sync meta
Expand Down Expand Up @@ -246,6 +249,9 @@ class CloudTablet final : public BaseTablet {
std::mutex _base_compaction_lock;
std::mutex _cumulative_compaction_lock;
mutable std::mutex _rowset_update_lock;

// Schema will be merged from all rowsets when sync_rowsets
TabletSchemaSPtr _merged_tablet_schema;
};

using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
Expand Down
5 changes: 4 additions & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,10 @@ void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle
LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id;
continue;
}
tablet_schemas.push_back(res.value()->merged_tablet_schema());
auto schema = res.value()->merged_tablet_schema();
if (schema != nullptr) {
tablet_schemas.push_back(schema);
}
}
if (!tablet_schemas.empty()) {
// merge all
Expand Down