diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 02497f6a044b91..e198017f17a0d3 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -39,6 +39,7 @@
 #include "cloud/cloud_tablet.h"
 #include "cloud/config.h"
 #include "cloud/pb_convert.h"
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "cpp/sync_point.h"
@@ -410,6 +411,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
         req.set_cumulative_point(tablet->cumulative_layer_point());
     }
     req.set_end_version(-1);
+    // Let the backend side fill schemas with the dict
+    if (config::variant_use_cloud_schema_dict) {
+        req.set_schema_op(GetRowsetRequest::RETURN_DICT);
+    }
     VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();
 
     stub->get_rowset(&cntl, &req, &resp, nullptr);
@@ -524,7 +529,8 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
                     existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) {
                     continue; // Same rowset, skip it
                 }
-                RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
+                RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(
+                        cloud_rs_meta_pb, resp.has_schema_dict() ? &resp.schema_dict() : nullptr);
                 auto rs_meta = std::make_shared<RowsetMeta>();
                 rs_meta->init_from_pb(meta_pb);
                 RowsetSharedPtr rowset;
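These two hunks are the whole BE-side handshake: ask the MetaService for the raw dict with `RETURN_DICT`, then hand the returned dict to the meta conversion. A condensed sketch of that flow, with RPC setup, error handling, and tablet bookkeeping omitted (`fetch_rowsets_with_dict` is an illustrative name, not a function in this patch):

```cpp
#include <brpc/controller.h>
#include <gen_cpp/cloud.pb.h>

#include "cloud/pb_convert.h"
#include "common/config.h"

// Condensed sketch of the flow added above; simplified from sync_tablet_rowsets.
void fetch_rowsets_with_dict(doris::cloud::MetaService_Stub* stub, brpc::Controller* cntl) {
    doris::cloud::GetRowsetRequest req;
    req.set_end_version(-1);
    if (doris::config::variant_use_cloud_schema_dict) {
        // Ask the MetaService to skip schema inflation and ship the raw dict instead.
        req.set_schema_op(doris::cloud::GetRowsetRequest::RETURN_DICT);
    }
    doris::cloud::GetRowsetResponse resp;
    stub->get_rowset(cntl, &req, &resp, nullptr);
    for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) {
        // The dict, if one came back, is applied per rowset on the BE rather
        // than being expanded into every rowset meta on the MetaService.
        doris::RowsetMetaPB meta_pb = doris::cloud::cloud_rowset_meta_to_doris(
                cloud_rs_meta_pb, resp.has_schema_dict() ? &resp.schema_dict() : nullptr);
        // ... feed meta_pb into RowsetMeta::init_from_pb as in the real path
    }
}
```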
diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp
index 550c08c5481d3a..c65d3208be4871 100644
--- a/be/src/cloud/pb_convert.cpp
+++ b/be/src/cloud/pb_convert.cpp
@@ -17,6 +17,7 @@
 
 #include "cloud/pb_convert.h"
 
+#include <unordered_map>
 #include <gen_cpp/olap_file.pb.h>
 #include <utility>
 
@@ -138,19 +139,54 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
     out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
 }
 
-RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) {
+static void fill_schema_with_dict(const RowsetMetaCloudPB& in, RowsetMetaPB* out,
+                                  const SchemaCloudDictionary& dict) {
+    std::unordered_map<int32_t, ColumnPB*> unique_id_map;
+    // init map
+    for (ColumnPB& column : *out->mutable_tablet_schema()->mutable_column()) {
+        unique_id_map[column.unique_id()] = &column;
+    }
+    // column info
+    for (size_t i = 0; i < in.schema_dict_key_list().column_dict_key_list_size(); ++i) {
+        int dict_key = in.schema_dict_key_list().column_dict_key_list(i);
+        const ColumnPB& dict_val = dict.column_dict().at(dict_key);
+        ColumnPB& to_add = *out->mutable_tablet_schema()->add_column();
+        to_add = dict_val;
+        VLOG_DEBUG << "fill dict column " << dict_val.ShortDebugString();
+    }
+
+    // index info
+    for (size_t i = 0; i < in.schema_dict_key_list().index_info_dict_key_list_size(); ++i) {
+        int dict_key = in.schema_dict_key_list().index_info_dict_key_list(i);
+        const TabletIndexPB& dict_val = dict.index_dict().at(dict_key);
+        *out->mutable_tablet_schema()->add_index() = dict_val;
+        VLOG_DEBUG << "fill dict index " << dict_val.ShortDebugString();
+    }
+
+    // sparse column info
+    for (size_t i = 0; i < in.schema_dict_key_list().sparse_column_dict_key_list_size(); ++i) {
+        int dict_key = in.schema_dict_key_list().sparse_column_dict_key_list(i);
+        const ColumnPB& dict_val = dict.column_dict().at(dict_key);
+        *unique_id_map.at(dict_val.parent_unique_id())->add_sparse_columns() = dict_val;
+        VLOG_DEBUG << "fill dict sparse column " << dict_val.ShortDebugString();
+    }
+}
+
+RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in,
+                                        const SchemaCloudDictionary* dict) {
     RowsetMetaPB out;
-    cloud_rowset_meta_to_doris(&out, in);
+    cloud_rowset_meta_to_doris(&out, in, dict);
     return out;
 }
 
-RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&& in) {
+RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&& in, const SchemaCloudDictionary* dict) {
     RowsetMetaPB out;
-    cloud_rowset_meta_to_doris(&out, std::move(in));
+    cloud_rowset_meta_to_doris(&out, std::move(in), dict);
     return out;
 }
 
-void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in) {
+void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
+                                const SchemaCloudDictionary* dict) {
     // ATTN: please keep the set order aligned with the definition of proto `TabletSchemaCloudPB`.
     out->set_rowset_id(in.rowset_id());
     out->set_partition_id(in.partition_id());
@@ -185,6 +221,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
     if (in.has_tablet_schema()) {
         cloud_tablet_schema_to_doris(out->mutable_tablet_schema(), in.tablet_schema());
     }
+    if (dict != nullptr) {
+        fill_schema_with_dict(in, out, *dict);
+    }
     out->set_txn_expiration(in.txn_expiration());
     out->set_segments_overlap_pb(in.segments_overlap_pb());
     out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
@@ -198,7 +237,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
     out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
 }
 
-void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
+void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
+                                const SchemaCloudDictionary* dict) {
     // ATTN: please keep the set order aligned with the definition of proto `TabletSchemaCloudPB`.
     out->set_rowset_id(in.rowset_id());
     out->set_partition_id(in.partition_id());
@@ -234,6 +274,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
         cloud_tablet_schema_to_doris(out->mutable_tablet_schema(),
                                      std::move(*in.mutable_tablet_schema()));
     }
+    if (dict != nullptr) {
+        fill_schema_with_dict(in, out, *dict);
+    }
     out->set_txn_expiration(in.txn_expiration());
     out->set_segments_overlap_pb(in.segments_overlap_pb());
     out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
diff --git a/be/src/cloud/pb_convert.h b/be/src/cloud/pb_convert.h
index 0cfa033f2930a0..31fe43adb11a6d 100644
--- a/be/src/cloud/pb_convert.h
+++ b/be/src/cloud/pb_convert.h
@@ -24,10 +24,14 @@ RowsetMetaCloudPB doris_rowset_meta_to_cloud(const RowsetMetaPB&);
 RowsetMetaCloudPB doris_rowset_meta_to_cloud(RowsetMetaPB&&);
 void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in);
 void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in);
-RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB&);
-RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&&);
-void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in);
-void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in);
+RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB&,
+                                        const SchemaCloudDictionary* dict = nullptr);
+RowsetMetaPB cloud_rowset_meta_to_doris(RowsetMetaCloudPB&&,
+                                        const SchemaCloudDictionary* dict = nullptr);
+void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
+                                const SchemaCloudDictionary* dict = nullptr);
+void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
+                                const SchemaCloudDictionary* dict = nullptr);
 
 // TabletSchemaPB <=> TabletSchemaCloudPB
 TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB&);
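`fill_schema_with_dict` is the MetaService's dict-inflation logic transplanted to the BE: dict keys stored on a rowset meta are indices into the shared `SchemaCloudDictionary`, and sparse columns re-attach to their parent column through `parent_unique_id`. A self-contained toy model of the same passes, with plain structs standing in for the protobuf messages (all types and names here are illustrative, not Doris APIs):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Toy model: dict keys index into a shared dictionary; sparse columns are
// re-attached to their parent variant column via parent_unique_id.
struct Column {
    int32_t unique_id = -1;
    int32_t parent_unique_id = -1; // set only for sparse subcolumns
    std::string name;
    std::vector<Column> sparse_columns;
};

void fill_from_dict(const std::unordered_map<int32_t, Column>& column_dict,
                    const std::vector<int32_t>& column_keys,
                    const std::vector<int32_t>& sparse_keys, std::vector<Column>& schema) {
    // Append the dict-referenced columns first so pointers taken below stay valid.
    for (int32_t key : column_keys) schema.push_back(column_dict.at(key));
    // Index every column by unique id, mirroring unique_id_map.
    std::unordered_map<int32_t, Column*> by_uid;
    for (Column& c : schema) by_uid[c.unique_id] = &c;
    // Attach sparse columns under their parent columns.
    for (int32_t key : sparse_keys) {
        const Column& sc = column_dict.at(key);
        by_uid.at(sc.parent_unique_id)->sparse_columns.push_back(sc);
    }
}

int main() {
    std::unordered_map<int32_t, Column> dict = {
            {0, {10, -1, "v", {}}},      // variant column with unique_id 10
            {1, {11, 10, "v.city", {}}}, // sparse subcolumn whose parent is column 10
    };
    std::vector<Column> schema; // columns carried by the rowset meta itself
    fill_from_dict(dict, /*column_keys=*/{0}, /*sparse_keys=*/{1}, schema);
    std::cout << schema[0].name << " has " << schema[0].sparse_columns.size()
              << " sparse subcolumn(s)\n"; // prints: v has 1 sparse subcolumn(s)
}
```

One ordering difference worth noting: the real code builds `unique_id_map` over the rowset's own columns before appending dict columns, which is safe with protobuf repeated fields because `add_column()` does not move existing elements; the `std::vector` toy above indexes after appending for the same reason.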
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index bb92cf18b73ee5..48d4565c1d3407 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1003,6 +1003,8 @@ DEFINE_Int32(pipeline_executor_size, "0");
 DEFINE_Bool(enable_workload_group_for_scan, "false");
 DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
 
+// Whether to use the schema dict on the backend side instead of the MetaService side (cloud mode)
+DEFINE_mBool(variant_use_cloud_schema_dict, "true");
 DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
 DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
 DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7e70e067f3ae79..27e697b0c800f1 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1199,6 +1199,7 @@ DECLARE_mInt64(LZ4_HC_compression_level);
 // Threshold of a column as sparse column
 // Notice: TEST ONLY
 DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
+DECLARE_mBool(variant_use_cloud_schema_dict);
 // Threshold to estimate a column is sparsed
 // Notice: TEST ONLY
 DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 0769dbe86d2a63..4f94189a6212eb 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -666,8 +666,10 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
         _opts.rowset_ctx->merged_tablet_schema = _opts.rowset_ctx->tablet_schema;
     }
     TabletSchemaSPtr update_schema;
+    bool check_schema_size = true;
     RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
-            {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema));
+            {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema,
+            check_schema_size));
     CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns())
             << "Rowset merge schema columns count is " << update_schema->num_columns()
             << ", but flush_schema is larger " << _flush_schema->num_columns()
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index f6a0205d8a5555..da865650c608ec 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -135,8 +135,10 @@ CONF_mBool(snapshot_get_tablet_stats, "true");
 
 // Value codec version
 CONF_mInt16(meta_schema_value_version, "1");
-// Limit kv size of Schema SchemaDictKeyList, default 10MB
-CONF_mInt32(schema_dict_kv_size_limit, "10485760");
+// Limit the kv size of the schema dict (SchemaDictKeyList), default 5MB
+CONF_mInt32(schema_dict_kv_size_limit, "5242880");
+// Limit the column count in the schema dict value, default 4K
+CONF_mInt32(schema_dict_key_count_limit, "4096");
 
 // For instance check interval
 CONF_Int64(reserved_buffer_days, "3");
diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index 7adbc8ccf12aab..107ca9c0447c13 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1587,8 +1587,8 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
         }
 
         if (need_read_schema_dict) {
-            read_schema_from_dict(code, msg, instance_id, idx.index_id(), txn.get(),
-                                  response->mutable_rowset_meta());
+            read_schema_dict(code, msg, instance_id, idx.index_id(), txn.get(), response,
+                             request->schema_op());
             if (code != MetaServiceCode::OK) return;
         }
         TEST_SYNC_POINT_CALLBACK("get_rowset::finish", &response);
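Together, these caps move the failure earlier: the MetaService now rejects dicts over 5MB or 4096 columns at write time with `INVALID_ARGUMENT`, and the BE writer (via `check_schema_size`) refuses to build a merged variant schema that would blow past the cap. An illustrative guard showing the shape of both checks (the constant and function names are assumptions mirroring the configs, not Doris symbols; the real checks live in `write_schema_dict` and behind `check_schema_size` in `schema_util::get_least_common_schema`):

```cpp
#include <cstddef>
#include <sstream>
#include <string>

constexpr size_t kDictKvSizeLimit = 5 * 1024 * 1024; // schema_dict_kv_size_limit
constexpr size_t kDictKeyCountLimit = 4096;          // schema_dict_key_count_limit

// Returns false and fills msg when either cap is exceeded.
bool dict_within_limits(size_t serialized_bytes, size_t column_count, std::string* msg) {
    if (serialized_bytes > kDictKvSizeLimit) {
        *msg = "reached the size threshold of the schema dict KV";
        return false;
    }
    if (column_count > kDictKeyCountLimit) {
        std::ostringstream ss;
        ss << "Reached max column size limit " << kDictKeyCountLimit;
        *msg = ss.str();
        return false;
    }
    return true;
}
```

The 4096-column cap is what the 4097-field regression test below is written against.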
diff --git a/cloud/src/meta-service/meta_service_schema.cpp b/cloud/src/meta-service/meta_service_schema.cpp
index d99f026d051612..ca0a15d8577b31 100644
--- a/cloud/src/meta-service/meta_service_schema.cpp
+++ b/cloud/src/meta-service/meta_service_schema.cpp
@@ -292,11 +292,20 @@ void write_schema_dict(MetaServiceCode& code, std::string& msg, const std::strin
     }
     // Limit the size of dict value
     if (dict_val.size() > config::schema_dict_kv_size_limit) {
-        code = MetaServiceCode::KV_TXN_COMMIT_ERR;
+        code = MetaServiceCode::INVALID_ARGUMENT;
         ss << "Failed to write dictionary for saving, txn_id=" << rowset_meta->txn_id()
            << ", reached the limited size threshold of SchemaDictKeyList "
            << config::schema_dict_kv_size_limit;
         msg = ss.str();
+        return;
+    }
+    // Limit the count of dict keys
+    if (dict.column_dict_size() > config::schema_dict_key_count_limit) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "Reached max column size limit " << config::schema_dict_key_count_limit
+           << ", txn_id=" << rowset_meta->txn_id();
+        msg = ss.str();
+        return;
     }
     // splitting large values (>90*1000) into multiple KVs
     cloud::put(txn, dict_key, dict_val, 0);
@@ -307,9 +316,9 @@ void write_schema_dict(MetaServiceCode& code, std::string& msg, const std::strin
     }
 }
 
-void read_schema_from_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
-                           int64_t index_id, Transaction* txn,
-                           google::protobuf::RepeatedPtrField<doris::RowsetMetaCloudPB>* rowset_metas) {
+void read_schema_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
+                      int64_t index_id, Transaction* txn, GetRowsetResponse* response,
+                      GetRowsetRequest::SchemaOp schema_op) {
     std::stringstream ss;
 
     // read dict if any rowset has dict key list
@@ -331,6 +340,12 @@
     LOG(INFO) << "Get schema_dict, column size=" << dict.column_dict_size()
               << ", index size=" << dict.index_dict_size();
 
+    // Return the dict and let the backend fill schemas with the dict info
+    if (schema_op == GetRowsetRequest::RETURN_DICT) {
+        response->mutable_schema_dict()->Swap(&dict);
+        return;
+    }
+
     auto fill_schema_with_dict = [&](RowsetMetaCloudPB* out) {
         std::unordered_map<int32_t, ColumnPB*> unique_id_map;
         //init map
@@ -366,7 +381,7 @@
     };
 
     // fill rowsets's schema with dict info
-    for (auto& rowset_meta : *rowset_metas) {
+    for (auto& rowset_meta : *response->mutable_rowset_meta()) {
         if (rowset_meta.has_schema_dict_key_list()) {
             fill_schema_with_dict(&rowset_meta);
         }
diff --git a/cloud/src/meta-service/meta_service_schema.h b/cloud/src/meta-service/meta_service_schema.h
index d44f01f9747128..ec1dcc6731f458 100644
--- a/cloud/src/meta-service/meta_service_schema.h
+++ b/cloud/src/meta-service/meta_service_schema.h
@@ -35,8 +35,8 @@ void write_schema_dict(MetaServiceCode& code, std::string& msg, const std::strin
                        Transaction* txn, RowsetMetaCloudPB* rowset_meta);
 
 // Read schema from dictionary metadata, modified to rowset_metas
-void read_schema_from_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
-                           int64_t index_id, Transaction* txn,
-                           google::protobuf::RepeatedPtrField<doris::RowsetMetaCloudPB>* rowset_metas);
+void read_schema_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
+                      int64_t index_id, Transaction* txn, GetRowsetResponse* response,
+                      GetRowsetRequest::SchemaOp schema_op);
 
 } // namespace doris::cloud
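With the request's `schema_op` threaded through, `read_schema_dict` either ships the dict once or falls back to per-rowset inflation. A simplified view of that branch, with KV reads, pagination, and error propagation omitted (`apply_schema_op` is an illustrative name; the include path is assumed):

```cpp
#include <gen_cpp/cloud.pb.h>

namespace doris::cloud {

// Simplified view of the dispatch inside read_schema_dict.
void apply_schema_op(GetRowsetRequest::SchemaOp schema_op, SchemaCloudDictionary& dict,
                     GetRowsetResponse* response) {
    if (schema_op == GetRowsetRequest::RETURN_DICT) {
        // Ship the dict once; the BE materializes schemas locally.
        response->mutable_schema_dict()->Swap(&dict);
        return;
    }
    // FILL_WITH_DICT, the compatibility default: inflate every rowset meta in
    // place, duplicating dict entries across all returned metas.
    for (auto& rowset_meta : *response->mutable_rowset_meta()) {
        if (rowset_meta.has_schema_dict_key_list()) {
            // fill_schema_with_dict(&rowset_meta); // the lambda shown above
        }
    }
}

} // namespace doris::cloud
```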
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 8ae48851601b99..d116a8106db8da 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -961,6 +961,11 @@ message CreateRowsetResponse {
 }
 
 message GetRowsetRequest {
+    enum SchemaOp {
+        FILL_WITH_DICT = 0; // fill the rowset schemas with SchemaCloudDictionary values in MS
+        RETURN_DICT = 1;    // do not apply the dict in MS; return the SchemaCloudDictionary directly
+        NO_DICT = 2;        // do not read the dict; use the locally cached SchemaCloudDictionary instead
+    }
     optional string cloud_unique_id = 1; // For auth
     optional TabletIndexPB idx = 2;
     optional int64 start_version = 3;
@@ -968,13 +973,17 @@ message GetRowsetRequest {
     optional int64 base_compaction_cnt = 5;
     optional int64 cumulative_compaction_cnt = 6;
     optional int64 cumulative_point = 7;
-    // TODO: There may be more fields TBD
+    // Schema format of the returned rowset metas, used directly by the variant type.
+    // For compatibility we use FILL_WITH_DICT as the default.
+    optional SchemaOp schema_op = 8 [default = FILL_WITH_DICT];
 }
 
 message GetRowsetResponse {
     optional MetaServiceResponseStatus status = 1;
     repeated doris.RowsetMetaCloudPB rowset_meta = 2;
     optional TabletStatsPB stats = 3;
+    // Returns the dict value if SchemaOp is RETURN_DICT
+    optional SchemaCloudDictionary schema_dict = 4;
 }
 
 message IndexRequest {
diff --git a/regression-test/suites/variant_p0/column_size_limit.groovy b/regression-test/suites/variant_p0/column_size_limit.groovy
index 70567d89c07d2a..5e9d05b558e7cc 100644
--- a/regression-test/suites/variant_p0/column_size_limit.groovy
+++ b/regression-test/suites/variant_p0/column_size_limit.groovy
@@ -16,17 +16,7 @@
 // under the License.
 
 import groovy.json.JsonBuilder
-suite("regression_test_variant_column_limit", "nonConcurrent"){
-    def set_be_config = { key, value ->
-        String backend_id;
-        def backendId_to_backendIP = [:]
-        def backendId_to_backendHttpPort = [:]
-        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
-
-        backend_id = backendId_to_backendIP.keySet()[0]
-        def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
-        logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
-    }
+suite("regression_test_variant_column_limit"){
     def table_name = "var_column_limit"
     sql "DROP TABLE IF EXISTS ${table_name}"
     sql """
@@ -38,21 +28,18 @@ suite("regression_test_variant_column_limit", "nonConcurrent"){
         DISTRIBUTED BY HASH(k) BUCKETS 1
         properties("replication_num" = "1", "disable_auto_compaction" = "false");
     """
-    try {
-        def jsonBuilder = new JsonBuilder()
-        def root = jsonBuilder {
-            // Generate 2049 fields
-            (1..2049).each { fieldNumber ->
-                "field$fieldNumber" fieldNumber
-            }
+    def jsonBuilder = new JsonBuilder()
+    def root = jsonBuilder {
+        // Generate 4097 fields
+        (1..4097).each { fieldNumber ->
+            "field$fieldNumber" fieldNumber
         }
+    }
 
-        String jsonString = jsonBuilder.toPrettyString()
+    String jsonString = jsonBuilder.toPrettyString()
+    test {
         sql """insert into ${table_name} values (1, '$jsonString')"""
-    } catch(Exception ex) {
-        logger.info("""INSERT INTO ${table_name} failed: """ + ex)
-        assertTrue(ex.toString().contains("Reached max column"));
-    } finally {
+        exception("Reached max column size limit")
     }
     sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 2, "c" : 3}')"""
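One compatibility note on the proto change: proto2 optional fields report their declared default when unset, so a BE or MetaService that predates `schema_op` keeps the old fill-on-MetaService behavior without any version negotiation. A tiny check illustrating that (include path assumed):

```cpp
#include <cassert>

#include <gen_cpp/cloud.pb.h>

int main() {
    doris::cloud::GetRowsetRequest req;
    // Field unset: proto2 falls back to [default = FILL_WITH_DICT], so older
    // senders implicitly request the MetaService-side fill path.
    assert(!req.has_schema_op());
    assert(req.schema_op() == doris::cloud::GetRowsetRequest::FILL_WITH_DICT);
    return 0;
}
```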