8 changes: 7 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
@@ -39,6 +39,7 @@
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
@@ -410,6 +411,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
// Let the backend expand rowset schemas from the dict instead of the meta service
if (config::variant_use_cloud_schema_dict) {
req.set_schema_op(GetRowsetRequest::RETURN_DICT);
}
VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();

stub->get_rowset(&cntl, &req, &resp, nullptr);
@@ -524,7 +529,8 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) {
continue; // Same rowset, skip it
}
RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(
cloud_rs_meta_pb, resp.has_schema_dict() ? &resp.schema_dict() : nullptr);
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->init_from_pb(meta_pb);
RowsetSharedPtr rowset;
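Taken together, the backend half of this change is a two-step handshake: the request opts in to receiving the dict, and the response's dict (when present) is threaded into the meta conversion. A condensed sketch of the flow, reusing the names visible in this hunk (stub, cntl, req, resp are assumed from the surrounding function):

// Sketch only: BE-side flow for dict-based schema expansion.
GetRowsetRequest req;
GetRowsetResponse resp;
if (config::variant_use_cloud_schema_dict) {
    req.set_schema_op(GetRowsetRequest::RETURN_DICT); // ask the MS to ship the dict
}
stub->get_rowset(&cntl, &req, &resp, nullptr);
for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) {
    // Expand each rowset meta locally, with the dict if the MS returned one.
    RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(
            cloud_rs_meta_pb, resp.has_schema_dict() ? &resp.schema_dict() : nullptr);
}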
55 changes: 49 additions & 6 deletions be/src/cloud/pb_convert.cpp
@@ -17,6 +17,7 @@

#include "cloud/pb_convert.h"

#include <common/logging.h>
#include <gen_cpp/olap_file.pb.h>

#include <utility>
@@ -138,19 +139,54 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
}

RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) {
static void fill_schema_with_dict(const RowsetMetaCloudPB& in, RowsetMetaPB* out,
const SchemaCloudDictionary& dict) {
std::unordered_map<int32_t, ColumnPB*> unique_id_map;
// build a map from column unique id to its ColumnPB
for (ColumnPB& column : *out->mutable_tablet_schema()->mutable_column()) {
unique_id_map[column.unique_id()] = &column;
}
// column info
for (size_t i = 0; i < in.schema_dict_key_list().column_dict_key_list_size(); ++i) {
int dict_key = in.schema_dict_key_list().column_dict_key_list(i);
const ColumnPB& dict_val = dict.column_dict().at(dict_key);
ColumnPB& to_add = *out->mutable_tablet_schema()->add_column();
to_add = dict_val;
VLOG_DEBUG << "fill dict column " << dict_val.ShortDebugString();
}

// index info
for (size_t i = 0; i < in.schema_dict_key_list().index_info_dict_key_list_size(); ++i) {
int dict_key = in.schema_dict_key_list().index_info_dict_key_list(i);
const TabletIndexPB& dict_val = dict.index_dict().at(dict_key);
*out->mutable_tablet_schema()->add_index() = dict_val;
VLOG_DEBUG << "fill dict index " << dict_val.ShortDebugString();
}

// sparse column info
for (size_t i = 0; i < in.schema_dict_key_list().sparse_column_dict_key_list_size(); ++i) {
int dict_key = in.schema_dict_key_list().sparse_column_dict_key_list(i);
const ColumnPB& dict_val = dict.column_dict().at(dict_key);
*unique_id_map.at(dict_val.parent_unique_id())->add_sparse_columns() = dict_val;
VLOG_DEBUG << "fill dict sparse column" << dict_val.ShortDebugString();
}
}

RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in,
const SchemaCloudDictionary* dict) {
RowsetMetaPB out;
cloud_rowset_meta_to_doris(&out, in);
cloud_rowset_meta_to_doris(&out, in, dict);
return out;
}

RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&& in) {
RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&& in, const SchemaCloudDictionary* dict) {
RowsetMetaPB out;
cloud_rowset_meta_to_doris(&out, std::move(in));
cloud_rowset_meta_to_doris(&out, std::move(in), dict);
return out;
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in) {
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
const SchemaCloudDictionary* dict) {
// ATTN: please keep the set order aligned with the definition of proto `TabletSchemaCloudPB`.
out->set_rowset_id(in.rowset_id());
out->set_partition_id(in.partition_id());
@@ -185,6 +221,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
if (in.has_tablet_schema()) {
cloud_tablet_schema_to_doris(out->mutable_tablet_schema(), in.tablet_schema());
}
if (dict != nullptr) {
fill_schema_with_dict(in, out, *dict);
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
@@ -198,7 +237,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
const SchemaCloudDictionary* dict) {
// ATTN: please keep the set order aligned with the definition of proto `TabletSchemaCloudPB`.
out->set_rowset_id(in.rowset_id());
out->set_partition_id(in.partition_id());
@@ -234,6 +274,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
cloud_tablet_schema_to_doris(out->mutable_tablet_schema(),
std::move(*in.mutable_tablet_schema()));
}
if (dict != nullptr) {
fill_schema_with_dict(in, out, *dict);
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
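The dict applied above is plain keyed indirection: a rowset meta stores only integer dict keys, and SchemaCloudDictionary maps each key back to a full ColumnPB or TabletIndexPB. A hand-built toy example (all values hypothetical, sketch only):

// Sketch: resolving a dict key back into a full column entry.
SchemaCloudDictionary dict;
ColumnPB col;
col.set_unique_id(10);
col.set_name("v.field1");
(*dict.mutable_column_dict())[7] = col; // dict key 7 -> full ColumnPB

RowsetMetaCloudPB in;
in.mutable_schema_dict_key_list()->add_column_dict_key_list(7);

RowsetMetaPB out = cloud_rowset_meta_to_doris(in, &dict);
// out.tablet_schema() now contains the reconstructed "v.field1" column.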
12 changes: 8 additions & 4 deletions be/src/cloud/pb_convert.h
@@ -24,10 +24,14 @@ RowsetMetaCloudPB doris_rowset_meta_to_cloud(const RowsetMetaPB&);
RowsetMetaCloudPB doris_rowset_meta_to_cloud(RowsetMetaPB&&);
void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in);
void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in);
RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB&);
RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&&);
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in);
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in);
RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB&,
const SchemaCloudDictionary* dict = nullptr);
RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&&,
const SchemaCloudDictionary* dict = nullptr);
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
const SchemaCloudDictionary* dict = nullptr);
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
const SchemaCloudDictionary* dict = nullptr);

// TabletSchemaPB <=> TabletSchemaCloudPB
TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB&);
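Since the new dict parameter defaults to nullptr, every existing call site keeps compiling and behaving exactly as before; only callers that actually obtained a dict need to change. For instance (sketch):

RowsetMetaPB a = cloud_rowset_meta_to_doris(cloud_pb);               // old behavior
RowsetMetaPB b = cloud_rowset_meta_to_doris(cloud_pb, &schema_dict); // dict-aware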
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -1003,6 +1003,8 @@ DEFINE_Int32(pipeline_executor_size, "0");
DEFINE_Bool(enable_workload_group_for_scan, "false");
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");

// Whether to fill rowset schemas with the schema dict on the backend side instead of on the MetaService side (cloud mode)
DEFINE_mBool(variant_use_cloud_schema_dict, "true");
DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
1 change: 1 addition & 0 deletions be/src/common/config.h
@@ -1199,6 +1199,7 @@ DECLARE_mInt64(LZ4_HC_compression_level);
// Threshold of a column as sparse column
// Notice: TEST ONLY
DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
DECLARE_mBool(variant_use_cloud_schema_dict);
// Threshold to estimate a column is sparsed
// Notice: TEST ONLY
DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -666,8 +666,10 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
_opts.rowset_ctx->merged_tablet_schema = _opts.rowset_ctx->tablet_schema;
}
TabletSchemaSPtr update_schema;
bool check_schema_size = true;
RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
{_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema));
{_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema,
check_schema_size));
CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns())
<< "Rowset merge schema columns count is " << update_schema->num_columns()
<< ", but flush_schema is larger " << _flush_schema->num_columns()
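The extra check_schema_size argument asks get_least_common_schema to fail fast rather than silently produce an oversized merged schema. A hedged sketch of the kind of guard this enables inside schema_util (the limit name below is illustrative, not the real config):

// Illustrative only: the guard that check_schema_size turns on.
if (check_schema_size && output_schema->num_columns() > kMaxColumnLimit) {
    return Status::InternalError("Reached max column size limit {}", kMaxColumnLimit);
}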
6 changes: 4 additions & 2 deletions cloud/src/common/config.h
@@ -135,8 +135,10 @@ CONF_mBool(snapshot_get_tablet_stats, "true");
// Value codec version
CONF_mInt16(meta_schema_value_version, "1");

// Limit kv size of Schema SchemaDictKeyList, default 10MB
CONF_mInt32(schema_dict_kv_size_limit, "10485760");
// Limit kv size of Schema SchemaDictKeyList, default 5MB
CONF_mInt32(schema_dict_kv_size_limit, "5242880");
// Limit the count of columns in schema dict value, default 4K
CONF_mInt32(schema_dict_key_count_limit, "4096");

// For instance check interval
CONF_Int64(reserved_buffer_days, "3");
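The two limits guard different dimensions of the same dict value: total serialized bytes and number of column entries; exceeding either one trips the INVALID_ARGUMENT paths added in meta_service_schema.cpp below. A quick sanity check of the new byte constant (sketch):

static_assert(5 * 1024 * 1024 == 5242880, "schema_dict_kv_size_limit default is 5 MiB");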
4 changes: 2 additions & 2 deletions cloud/src/meta-service/meta_service.cpp
@@ -1587,8 +1587,8 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
}

if (need_read_schema_dict) {
read_schema_from_dict(code, msg, instance_id, idx.index_id(), txn.get(),
response->mutable_rowset_meta());
read_schema_dict(code, msg, instance_id, idx.index_id(), txn.get(), response,
request->schema_op());
if (code != MetaServiceCode::OK) return;
}
TEST_SYNC_POINT_CALLBACK("get_rowset::finish", &response);
25 changes: 20 additions & 5 deletions cloud/src/meta-service/meta_service_schema.cpp
@@ -292,11 +292,20 @@ void write_schema_dict(MetaServiceCode& code, std::string& msg, const std::strin
}
// Limit the size of dict value
if (dict_val.size() > config::schema_dict_kv_size_limit) {
code = MetaServiceCode::KV_TXN_COMMIT_ERR;
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "Failed to write dictionary for saving, txn_id=" << rowset_meta->txn_id()
<< ", reached the limited size threshold of SchemaDictKeyList "
<< config::schema_dict_kv_size_limit;
msg = ss.str();
return;
}
// Limit the count of dict keys
if (dict.column_dict_size() > config::schema_dict_key_count_limit) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "Reached max column size limit " << config::schema_dict_key_count_limit
<< ", txn_id=" << rowset_meta->txn_id();
msg = ss.str();
return;
}
// split large values (>90*1000 bytes) into multiple KVs
cloud::put(txn, dict_key, dict_val, 0);
@@ -307,9 +316,9 @@
}
}

void read_schema_from_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
int64_t index_id, Transaction* txn,
google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>* rowset_metas) {
void read_schema_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
int64_t index_id, Transaction* txn, GetRowsetResponse* response,
GetRowsetRequest::SchemaOp schema_op) {
std::stringstream ss;

// read dict if any rowset has dict key list
Expand All @@ -331,6 +340,12 @@ void read_schema_from_dict(MetaServiceCode& code, std::string& msg, const std::s
LOG(INFO) << "Get schema_dict, column size=" << dict.column_dict_size()
<< ", index size=" << dict.index_dict_size();

// Return the dict directly and let the backend fill schemas with it
if (schema_op == GetRowsetRequest::RETURN_DICT) {
response->mutable_schema_dict()->Swap(&dict);
return;
}

auto fill_schema_with_dict = [&](RowsetMetaCloudPB* out) {
std::unordered_map<int32_t, ColumnPB*> unique_id_map;
// build a map from column unique id to its ColumnPB
@@ -366,7 +381,7 @@ void read_schema_from_dict(MetaServiceCode& code, std::string& msg, const std::s
};

// fill each rowset's schema with dict info
for (auto& rowset_meta : *rowset_metas) {
for (auto& rowset_meta : *response->mutable_rowset_meta()) {
if (rowset_meta.has_schema_dict_key_list()) {
fill_schema_with_dict(&rowset_meta);
}
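After this change the meta-service effectively dispatches three ways on schema_op; NO_DICT simply skips reading the dict altogether. A condensed sketch of the decision (not verbatim code):

switch (schema_op) {
case GetRowsetRequest::RETURN_DICT:
    // Ship the raw dict; the BE expands schemas itself via
    // fill_schema_with_dict in be/src/cloud/pb_convert.cpp.
    response->mutable_schema_dict()->Swap(&dict);
    break;
case GetRowsetRequest::FILL_WITH_DICT:
    // Default/legacy path: expand every rowset meta on the MS side.
    for (auto& rs : *response->mutable_rowset_meta()) {
        if (rs.has_schema_dict_key_list()) fill_schema_with_dict(&rs);
    }
    break;
case GetRowsetRequest::NO_DICT:
    // Caller relies on its locally cached dict; nothing to do here.
    break;
}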
6 changes: 3 additions & 3 deletions cloud/src/meta-service/meta_service_schema.h
@@ -35,8 +35,8 @@ void write_schema_dict(MetaServiceCode& code, std::string& msg, const std::strin
Transaction* txn, RowsetMetaCloudPB* rowset_meta);

// Read the schema dict and either expand the rowset metas in the response or return the dict itself, depending on schema_op
void read_schema_from_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
int64_t index_id, Transaction* txn,
google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>* rowset_metas);
void read_schema_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
int64_t index_id, Transaction* txn, GetRowsetResponse* response,
GetRowsetRequest::SchemaOp schema_op);

} // namespace doris::cloud
11 changes: 10 additions & 1 deletion gensrc/proto/cloud.proto
@@ -961,20 +961,29 @@ message CreateRowsetResponse {
}

message GetRowsetRequest {
enum SchemaOp {
FILL_WITH_DICT = 0; // expand rowset schemas with SchemaCloudDictionary values on the meta service
RETURN_DICT = 1; // do not expand the dict in the meta service; return the SchemaCloudDictionary directly
NO_DICT = 2; // do not read dict info; the caller uses its locally cached SchemaCloudDictionary instead
}
optional string cloud_unique_id = 1; // For auth
optional TabletIndexPB idx = 2;
optional int64 start_version = 3;
optional int64 end_version = 4;
optional int64 base_compaction_cnt = 5;
optional int64 cumulative_compaction_cnt = 6;
optional int64 cumulative_point = 7;
// TODO: There may be more fields TBD
// Controls how the rowset schema is returned; used directly by the variant type.
// For compatibility, FILL_WITH_DICT is the default.
optional SchemaOp schema_op = 8 [default = FILL_WITH_DICT];
}

message GetRowsetResponse {
optional MetaServiceResponseStatus status = 1;
repeated doris.RowsetMetaCloudPB rowset_meta = 2;
optional TabletStatsPB stats = 3;
// Returned dict value when SchemaOp is RETURN_DICT
optional SchemaCloudDictionary schema_dict = 4;
}

message IndexRequest {
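Because schema_op is optional with default FILL_WITH_DICT, an old backend that never sets field 8 gets the pre-change behavior from an upgraded meta-service. A minimal compatibility sketch:

GetRowsetRequest req;                                        // field 8 unset
assert(req.schema_op() == GetRowsetRequest::FILL_WITH_DICT); // proto default
req.set_schema_op(GetRowsetRequest::RETURN_DICT);            // upgraded BE opts in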
33 changes: 10 additions & 23 deletions regression-test/suites/variant_p0/column_size_limit.groovy
@@ -16,17 +16,7 @@
// under the License.
import groovy.json.JsonBuilder

suite("regression_test_variant_column_limit", "nonConcurrent"){
def set_be_config = { key, value ->
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
suite("regression_test_variant_column_limit"){
def table_name = "var_column_limit"
sql "DROP TABLE IF EXISTS ${table_name}"
sql """
@@ -38,21 +28,18 @@ suite("regression_test_variant_column_limit"){
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "1", "disable_auto_compaction" = "false");
"""
try {
def jsonBuilder = new JsonBuilder()
def root = jsonBuilder {
// Generate 2049 fields
(1..2049).each { fieldNumber ->
"field$fieldNumber" fieldNumber
}
def jsonBuilder = new JsonBuilder()
def root = jsonBuilder {
// Generate 4097 fields
(1..4097).each { fieldNumber ->
"field$fieldNumber" fieldNumber
}
}

String jsonString = jsonBuilder.toPrettyString()
String jsonString = jsonBuilder.toPrettyString()
test {
sql """insert into ${table_name} values (1, '$jsonString')"""
} catch(Exception ex) {
logger.info("""INSERT INTO ${table_name} failed: """ + ex)
assertTrue(ex.toString().contains("Reached max column"));
} finally {
exception("Reached max column size limit")
}
sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 2, "c" : 3}')"""
