From 253bfbf589d535a430252375e68eff15f4027bfd Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Thu, 20 Jun 2024 11:39:13 +0800 Subject: [PATCH 1/6] [improvement](cloud) Accelerate creating table by batching RPC --- cloud/src/common/bvars.cpp | 2 + cloud/src/common/bvars.h | 1 + cloud/src/meta-service/meta_service.h | 15 +++ .../meta-service/meta_service_partition.cpp | 91 +++++++++++++++ .../java/org/apache/doris/alter/Alter.java | 2 +- .../java/org/apache/doris/catalog/Env.java | 19 ++- .../clone/DynamicPartitionScheduler.java | 108 +++++++++++++++++- .../datasource/CloudInternalCatalog.java | 86 +++++++++++++- .../doris/cloud/rpc/MetaServiceClient.java | 4 + .../doris/cloud/rpc/MetaServiceProxy.java | 9 ++ .../doris/datasource/InternalCatalog.java | 103 +++++++++++------ .../apache/doris/mtmv/MTMVPartitionUtil.java | 3 +- .../doris/service/FrontendServiceImpl.java | 2 +- .../doris/alter/InternalSchemaAlterTest.java | 4 +- .../doris/catalog/CreateTableLikeTest.java | 2 + .../apache/doris/catalog/CreateTableTest.java | 2 + .../doris/catalog/ModifyBackendTest.java | 2 + .../org/apache/doris/catalog/RecoverTest.java | 2 + .../apache/doris/planner/QueryPlanTest.java | 1 + gensrc/proto/cloud.proto | 25 ++++ 20 files changed, 436 insertions(+), 47 deletions(-) diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index 43acb47e365e46..dc401398f68c35 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -79,6 +79,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job" BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_status"); BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status"); +BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv"); + // txn_kv's bvars bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get"); bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index e5b50262104842..f2957e35940334 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -174,6 +174,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance; extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach; extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id; +extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv; // txn_kv's bvars extern bvar::LatencyRecorder g_bvar_txn_kv_get; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 6ba3d5b45eb320..9b54bd538202a0 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -145,6 +145,9 @@ class MetaServiceImpl : public cloud::MetaService { void drop_index(::google::protobuf::RpcController* controller, const IndexRequest* request, IndexResponse* response, ::google::protobuf::Closure* done) override; + void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request, + CheckKVResponse* response, ::google::protobuf::Closure* done) override; + void prepare_partition(::google::protobuf::RpcController* controller, const PartitionRequest* request, PartitionResponse* response, ::google::protobuf::Closure* done) override; @@ -280,6 +283,13 @@ class MetaServiceImpl : public cloud::MetaService { const AlterInstanceRequest* request, std::function(InstanceInfoPB*)> action); + using check_create_table_type = std::function, std::string, + std::function>(const CheckKVRequest* request)>; + void check_create_table(std::string instance_id, const CheckKVRequest* request, + CheckKVResponse* response, MetaServiceCode* code, std::string* msg, + check_create_table_type get_check_info); + std::shared_ptr txn_kv_; std::shared_ptr resource_mgr_; std::shared_ptr rate_limiter_; @@ -426,6 +436,11 @@ class MetaServiceProxy final : public MetaService { call_impl(&cloud::MetaService::drop_index, controller, request, response, done); } + void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request, + CheckKVResponse* response, ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::check_kv, controller, request, response, done); + } + void prepare_partition(::google::protobuf::RpcController* controller, const PartitionRequest* request, PartitionResponse* response, ::google::protobuf::Closure* done) override { diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index aae11be5edda47..594a412533dc00 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -67,6 +67,11 @@ static TxnErrorCode index_exists(Transaction* txn, const std::string& instance_i return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND; } +static TxnErrorCode check_recycle_key_exist(Transaction* txn, const std::string& key) { + std::string val; + return txn->get(key, &val); +} + void MetaServiceImpl::prepare_index(::google::protobuf::RpcController* controller, const IndexRequest* request, IndexResponse* response, ::google::protobuf::Closure* done) { @@ -614,4 +619,90 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll } } +void MetaServiceImpl::check_create_table(std::string instance_id, const CheckKVRequest* request, + CheckKVResponse* response, MetaServiceCode* code, + std::string* msg, check_create_table_type get_check_info) { + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + *code = cast_as(err); + *msg = "failed to create txn"; + return; + } + auto& [keys, hint, key_func] = get_check_info(request); + + if (keys.empty()) { + *code = MetaServiceCode::INVALID_ARGUMENT; + *msg = "empty partition_ids"; + return; + } + + for (auto id : keys) { + auto key = key_func(instance_id, id); + err = check_recycle_key_exist(txn.get(), key); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + continue; + } else if (err == TxnErrorCode::TXN_OK) { + // find not match, prepare commit + *code = MetaServiceCode::INVALID_ARGUMENT; + *msg = "prepare and commit rpc not match, recycle key remained"; + return; + } else { + // err != TXN_OK, fdb read err + *code = MetaServiceCode::INVALID_ARGUMENT; + *msg = "ms read key error"; + return; + } + } + LOG_INFO("check {} success request={}", hint, request->ShortDebugString()); + return; +} + +void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller, + const CheckKVRequest* request, CheckKVResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(check_kv); + instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + return; + } + if (!request->has_op()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "op not given"; + return; + } + if (!request->has_check_keys()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty check keys"; + return; + } + RPC_RATE_LIMIT(check_kv); + switch (request->op()) { + case CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT: { + check_create_table( + instance_id, request, response, &code, &msg, [](const CheckKVRequest* request) { + return std::make_tuple(request->check_keys().index_ids(), "index", + [](std::string instance_id, int64_t id) { + return recycle_index_key({instance_id, id}); + }); + }); + break; + } + case CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT: { + check_create_table( + instance_id, request, response, &code, &msg, [](const CheckKVRequest* request) { + return std::make_tuple(request->check_keys().partition_ids(), "partition", + [](std::string instance_id, int64_t id) { + return recycle_partition_key({instance_id, id}); + }); + }); + break; + } + default: + DCHECK(false); + }; +} + } // namespace doris::cloud diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index a029f604dfdc61..2ed50a27673b90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -483,7 +483,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException { DynamicPartitionUtil.checkAlterAllowed( (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP)); } - Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause); + Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true); } else if (alterClause instanceof AddPartitionLikeClause) { if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) { DynamicPartitionUtil.checkAlterAllowed( diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 207a16a267855d..8821a4b62f281d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3240,8 +3240,23 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio getInternalCatalog().createTableAsSelect(stmt); } - public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException { - getInternalCatalog().addPartition(db, tableName, addPartitionClause); + /** + * Adds a partition to a table + * + * @param db + * @param tableName + * @param addPartitionClause clause in the CreateTableStmt + * @param isCreateTable this call is for creating table + * @param generatedPartitionId the preset partition id for the partition to add + * @param writeEditLog whether to write an edit log for this addition + * @return PartitionPersistInfo to be written to editlog. It may be null if no partitions added. + * @throws DdlException + */ + public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause, + boolean isCreateTable, long generatedPartitionId, + boolean writeEditLog) throws DdlException { + return getInternalCatalog().addPartition(db, tableName, addPartitionClause, + isCreateTable, generatedPartitionId, writeEditLog); } public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index d17af1836fe762..b8910d1d2563b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.MetaIdGenerator; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionItem; @@ -45,11 +46,14 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.util.AutoBucketUtils; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.RangeUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.persist.PartitionPersistInfo; import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TStorageMedium; @@ -615,9 +619,60 @@ public void executeDynamicPartition(Collection> dynamicPartitio } if (!skipAddPartition) { - for (AddPartitionClause addPartitionClause : addPartitionClauses) { + // get partitionIds and indexIds + List indexIds = new ArrayList<>(olapTable.getCopiedIndexIdToMeta().keySet()); + List generatedPartitionIds = new ArrayList<>(); + if (executeFirstTime && Config.isCloudMode() && !addPartitionClauses.isEmpty()) { + AddPartitionClause addPartitionClause = addPartitionClauses.get(0); + DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc(); try { - Env.getCurrentEnv().addPartition(db, tableName, addPartitionClause); + DistributionInfo distributionInfo = distributionDesc + .toDistributionInfo(olapTable.getBaseSchema()); + if (distributionDesc == null) { + distributionInfo = olapTable.getDefaultDistributionInfo() + .toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema()); + } + long allPartitionBufferSize = 0; + for (int i = 0; i < addPartitionClauses.size(); i++) { + long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(), + distributionInfo.getBucketNum(), + addPartitionClause.getSingeRangePartitionDesc() + .getReplicaAlloc().getTotalReplicaNum(), + db, tableName); + allPartitionBufferSize += bufferSize; + } + MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv() + .getIdGeneratorBuffer(allPartitionBufferSize); + addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId())); + // executeFirstTime true + Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(), + generatedPartitionIds, indexIds, true); + } catch (Exception e) { + LOG.warn("cloud in prepare step, dbName {}, tableName {}, tableId {} indexId {} exception {}", + db.getFullName(), tableName, olapTable.getId(), indexIds, e.getMessage()); + recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); + throw new DdlException("cloud in prepare step err"); + } + } + + List partsInfo = new ArrayList<>(); + for (int i = 0; i < addPartitionClauses.size(); i++) { + try { + boolean needWriteEditLog = true; + // ATTN: !executeFirstTime, needWriteEditLog + // here executeFirstTime is create table, so in cloud edit log will postpone + if (Config.isCloudMode()) { + needWriteEditLog = !executeFirstTime; + } + PartitionPersistInfo info = + Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i), + executeFirstTime, + executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0, + needWriteEditLog); + if (info == null) { + throw new Exception("null persisted partition returned"); + } + partsInfo.add(info); clearCreatePartitionFailedMsg(olapTable.getId()); } catch (Exception e) { recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); @@ -627,6 +682,55 @@ public void executeDynamicPartition(Collection> dynamicPartitio } } } + List succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo + -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList()); + if (executeFirstTime && Config.isCloudMode() && !addPartitionClauses.isEmpty()) { + try { + // ATTN: failedPids = generatedPartitionIds - succeedPartitionIds, + // means some partitions failed when addPartition, failedPids will be recycled by recycler + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) { + LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e"); + // not commit, not log edit + throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); + } + Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(), + succeedPartitionIds, indexIds, true); + LOG.info("begin write edit log to add partitions in batch, " + + "numPartitions: {}, db: {}, table: {}, tableId: {}", + partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); + // ATTN: here, edit log must after commit cloud partition, + // prevent commit RPC failure from causing data loss + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) { + LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e"); + // committed, but not log edit + throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); + } + for (int i = 0; i < partsInfo.size(); i++) { + Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i)); + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) { + if (i == partsInfo.size() / 2) { + LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e"); + // committed, but log some edit, others failed + throw new Exception("debug point FE.DynamicPartitionScheduler" + + ".in.commitCloudPartition"); + } + } + } + LOG.info("finish write edit log to add partitions in batch, " + + "numPartitions: {}, db: {}, table: {}, tableId: {}", + partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); + } catch (Exception e) { + LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}", + db.getFullName(), tableName, olapTable.getId(), e.getMessage()); + recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); + throw new DdlException("cloud in commit step err"); + } + } + // cloud mode, check recycle key not remained + if (Config.isCloudMode() && executeFirstTime) { + Env.getCurrentInternalCatalog().checkCreatePartitions(db.getId(), olapTable.getId(), + succeedPartitionIds, indexIds); + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 879621faef914d..060cc53f0f27d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -395,7 +395,8 @@ private void createCloudTablets(MaterializedIndex index, ReplicaState replicaSta } @Override - protected void beforeCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds) + public void beforeCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, + boolean isCreateTable) throws DdlException { if (partitionIds == null) { prepareMaterializedIndex(tableId, indexIds, 0); @@ -405,7 +406,7 @@ protected void beforeCreatePartitions(long dbId, long tableId, List partit } @Override - protected void afterCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, + public void afterCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, boolean isCreateTable) throws DdlException { if (partitionIds == null) { @@ -415,6 +416,15 @@ protected void afterCreatePartitions(long dbId, long tableId, List partiti } } + public void checkCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds) + throws DdlException { + if (partitionIds == null) { + checkMaterializedIndex(dbId, tableId, indexIds); + } else { + checkPartition(dbId, tableId, partitionIds); + } + } + private void preparePartition(long dbId, long tableId, List partitionIds, List indexIds) throws DdlException { Cloud.PartitionRequest.Builder partitionRequestBuilder = Cloud.PartitionRequest.newBuilder(); @@ -549,6 +559,78 @@ public void commitMaterializedIndex(long dbId, long tableId, List indexIds } } + private void checkPartition(long dbId, long tableId, List partitionIds) + throws DdlException { + Cloud.CheckKeyInfos.Builder checkKeyInfosBuilder = Cloud.CheckKeyInfos.newBuilder(); + checkKeyInfosBuilder.addAllIndexIds(partitionIds); + // for ms log + checkKeyInfosBuilder.addDbIds(dbId); + checkKeyInfosBuilder.addTableIds(tableId); + + Cloud.CheckKVRequest.Builder checkKvRequestBuilder = Cloud.CheckKVRequest.newBuilder(); + checkKvRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); + checkKvRequestBuilder.setCheckKeys(checkKeyInfosBuilder.build()); + final Cloud.CheckKVRequest checkKVRequest = checkKvRequestBuilder.build(); + + Cloud.CheckKVResponse response = null; + int tryTimes = 0; + while (tryTimes++ < Config.metaServiceRpcRetryTimes()) { + try { + response = MetaServiceProxy.getInstance().checkKv(checkKVRequest); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + } catch (RpcException e) { + LOG.warn("tryTimes:{}, checkPartition RpcException", tryTimes, e); + if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) { + throw new DdlException(e.getMessage()); + } + } + sleepSeveralMs(); + } + + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("checkPartition response: {} ", response); + throw new DdlException(response.getStatus().getMsg()); + } + } + + public void checkMaterializedIndex(long dbId, long tableId, List indexIds) + throws DdlException { + Cloud.CheckKeyInfos.Builder checkKeyInfosBuilder = Cloud.CheckKeyInfos.newBuilder(); + checkKeyInfosBuilder.addAllIndexIds(indexIds); + // for ms log + checkKeyInfosBuilder.addDbIds(dbId); + checkKeyInfosBuilder.addTableIds(tableId); + + Cloud.CheckKVRequest.Builder checkKvRequestBuilder = Cloud.CheckKVRequest.newBuilder(); + checkKvRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); + checkKvRequestBuilder.setCheckKeys(checkKeyInfosBuilder.build()); + final Cloud.CheckKVRequest checkKVRequest = checkKvRequestBuilder.build(); + + Cloud.CheckKVResponse response = null; + int tryTimes = 0; + while (tryTimes++ < Config.metaServiceRpcRetryTimes()) { + try { + response = MetaServiceProxy.getInstance().checkKv(checkKVRequest); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + } catch (RpcException e) { + LOG.warn("tryTimes:{}, checkIndex RpcException", tryTimes, e); + if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) { + throw new DdlException(e.getMessage()); + } + } + sleepSeveralMs(); + } + + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("checkIndex response: {} ", response); + throw new DdlException(response.getStatus().getMsg()); + } + } + public Cloud.CreateTabletsResponse sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder requestBuilder) throws DdlException { requestBuilder.setCloudUniqueId(Config.cloud_unique_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index f7a178deb013cc..d5cdc79eb7f7d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -255,6 +255,10 @@ public Cloud.PartitionResponse commitPartition(Cloud.PartitionRequest request) { return blockingStub.commitPartition(request); } + public Cloud.CheckKVResponse checkKv(Cloud.CheckKVRequest request) { + return blockingStub.checkKv(request); + } + public Cloud.PartitionResponse dropPartition(Cloud.PartitionRequest request) { return blockingStub.dropPartition(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index b321b6cffc5b07..7c7747f19a20b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -324,6 +324,15 @@ public Cloud.IndexResponse commitIndex(Cloud.IndexRequest request) throws RpcExc } } + public Cloud.CheckKVResponse checkKv(Cloud.CheckKVRequest request) throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.checkKv(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } + public Cloud.IndexResponse dropIndex(Cloud.IndexRequest request) throws RpcException { try { final MetaServiceClient client = getProxy(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 68d7db6256cce9..a82c3f009eb680 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1479,7 +1479,7 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeClau } finally { table.readUnlock(); } - addPartition(db, tableName, clause); + addPartition(db, tableName, clause, false, 0, true); } catch (UserException e) { throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeClause.getPartitionName() @@ -1487,7 +1487,25 @@ public void addPartitionLike(Database db, String tableName, AddPartitionLikeClau } } - public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException { + public static long checkAndGetBufferSize(long indexNum, long bucketNum, + long replicaNum, Database db, String tableName) throws DdlException { + long totalReplicaNum = indexNum * bucketNum * replicaNum; + if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { + throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing " + + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); + } + return 1 + totalReplicaNum + indexNum * bucketNum; + } + + public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause, + boolean isCreateTable, long generatedPartitionId, + boolean writeEditLog) throws DdlException { + // in cloud mode, isCreateTable == true, create dynamic partition use, so partitionId must have been generated. + // isCreateTable == false, other case, partitionId generate in below, must be set 0 + if (!FeConstants.runningUnitTest && Config.isCloudMode() + && (isCreateTable && generatedPartitionId == 0) || (!isCreateTable && generatedPartitionId != 0)) { + throw new DdlException("not impossible"); + } SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc(); DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc(); boolean isTempPartition = addPartitionClause.isTempPartition(); @@ -1510,7 +1528,7 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa if (singlePartitionDesc.isSetIfNotExists()) { LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName); if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) { - return; + return null; } } else { ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName); @@ -1660,17 +1678,10 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty(); Preconditions.checkNotNull(dataProperty); // check replica quota if this operation done - long indexNum = indexIdToMeta.size(); - long bucketNum = distributionInfo.getBucketNum(); - long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum(); - long totalReplicaNum = indexNum * bucketNum * replicaNum; - if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) { - throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing " - + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); - } - Set tabletIdSet = new HashSet<>(); - long bufferSize = 1 + totalReplicaNum + indexNum * bucketNum; + long bufferSize = checkAndGetBufferSize(indexIdToMeta.size(), distributionInfo.getBucketNum(), + singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum(), db, tableName); IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); + Set tabletIdSet = new HashSet<>(); String storagePolicy = olapTable.getStoragePolicy(); if (!Strings.isNullOrEmpty(dataProperty.getStoragePolicy())) { storagePolicy = dataProperty.getStoragePolicy(); @@ -1681,10 +1692,13 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa } }; try { - long partitionId = idGeneratorBuffer.getNextId(); + long partitionId = Config.isCloudMode() && !FeConstants.runningUnitTest && isCreateTable + ? generatedPartitionId : idGeneratorBuffer.getNextId(); List partitionIds = Lists.newArrayList(partitionId); - List indexIds = indexIdToMeta.keySet().stream().collect(Collectors.toList()); - beforeCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds); + List indexIds = new ArrayList<>(indexIdToMeta.keySet()); + if (!isCreateTable) { + beforeCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable); + } Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName, indexIdToMeta, distributionInfo, dataProperty, singlePartitionDesc.getReplicaAlloc(), @@ -1693,7 +1707,6 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa singlePartitionDesc.getTabletType(), storagePolicy, idGeneratorBuffer, binlogConfig, dataProperty.isStorageMediumSpecified(), null); - // TODO cluster key ids // check again olapTable = db.getOlapTableOrDdlException(tableName); @@ -1705,7 +1718,7 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName); if (singlePartitionDesc.isSetIfNotExists()) { failedCleanCallback.run(); - return; + return null; } else { ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName); } @@ -1773,25 +1786,34 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa PartitionPersistInfo info = null; if (partitionInfo.getType() == PartitionType.RANGE) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else if (partitionInfo.getType() == PartitionType.LIST) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } - afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, false); - Env.getCurrentEnv().getEditLog().logAddPartition(info); - - LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition); + if (!isCreateTable) { + afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable); + } + if (writeEditLog) { + Env.getCurrentEnv().getEditLog().logAddPartition(info); + LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition); + } else { + LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition); + } + if (!isCreateTable) { + checkCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds); + } + return info; } finally { olapTable.writeUnlock(); } @@ -2143,15 +2165,21 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa return partition; } - protected void beforeCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds) + public void beforeCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, + boolean isCreateTable) throws DdlException { } - protected void afterCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, + public void afterCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, boolean isCreateTable) throws DdlException { } + public void checkCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds) + throws DdlException { + + } + public void checkAvailableCapacity(Database db) throws DdlException { // check cluster capacity Env.getCurrentSystemInfo().checkAvailableCapacity(); @@ -2827,7 +2855,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx "Database " + db.getFullName() + " create unpartitioned table " + tableName + " increasing " + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } - beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList()); + beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName, olapTable.getIndexIdToMeta(), partitionDistributionInfo, @@ -2841,6 +2869,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx keysDesc.getClusterKeysColumnIds()); afterCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); + checkCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList()); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { @@ -2891,7 +2920,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } - beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList()); + beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); // this is a 2-level partitioned tables for (Map.Entry entry : partitionNameToId.entrySet()) { @@ -2928,6 +2957,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx } afterCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); + checkCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList()); } else { throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name()); } @@ -3354,7 +3384,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } List indexIds = copiedTbl.getIndexIdToMeta().keySet().stream().collect(Collectors.toList()); - beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds); + beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); for (Map.Entry entry : origPartitions.entrySet()) { // the new partition must use new id @@ -3380,6 +3410,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } afterCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); + checkCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds); } catch (DdlException e) { // create partition failed, remove all newly created tablets diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 6595afb70f7419..3d77f42a1ccb40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -394,7 +394,8 @@ private static void addPartition(MTMV mtmv, PartitionKeyDesc oldPartitionKeyDesc AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc, mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false); - Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause); + Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause, + false, 0, true); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1b106576720453..72e39dcca9526b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3317,7 +3317,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) { try { // here maybe check and limit created partitions num - Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause); + Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause, false, 0, true); } catch (DdlException e) { LOG.warn(e); errorStatus.setErrorMsgs( diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java index 27c00162d5fdce..5e7f5387f57f25 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java @@ -42,10 +42,10 @@ protected int backendNum() { @Override protected void runBeforeAll() throws Exception { - InternalSchemaInitializer.createDb(); - InternalSchemaInitializer.createTbl(); Config.allow_replica_on_same_host = true; FeConstants.runningUnitTest = true; + InternalSchemaInitializer.createDb(); + InternalSchemaInitializer.createTbl(); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java index 24ab4d99caee14..b4a63e5d344df1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java @@ -23,6 +23,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; import org.apache.doris.qe.ConnectContext; import org.apache.doris.utframe.UtFrameUtils; @@ -48,6 +49,7 @@ public class CreateTableLikeTest { @BeforeClass public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; UtFrameUtils.createDorisCluster(runningDir); // create connect context diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index c4901ced10d321..bdd5a66902c6ae 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -23,6 +23,7 @@ import org.apache.doris.common.ConfigException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.utframe.TestWithFeService; @@ -43,6 +44,7 @@ protected int backendNum() { @Override protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; Config.allow_replica_on_same_host = true; createDatabase("test"); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java index a0575eff548269..e98e6f745455dc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java @@ -24,6 +24,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.DdlExecutor; import org.apache.doris.resource.Tag; @@ -48,6 +49,7 @@ public class ModifyBackendTest { @BeforeClass public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; UtFrameUtils.createDorisCluster(runningDir); // create connect context connectContext = UtFrameUtils.createDefaultCtx(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java index a48652678e767c..d3ec208a341add 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.RecoverPartitionStmt; import org.apache.doris.analysis.RecoverTableStmt; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; import org.apache.doris.qe.ConnectContext; import org.apache.doris.utframe.UtFrameUtils; @@ -45,6 +46,7 @@ public class RecoverTest { @BeforeClass public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; UtFrameUtils.createDorisCluster(runningDir); // create connect context diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 22560492b400cf..632a062ed4a9e8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -61,6 +61,7 @@ public class QueryPlanTest extends TestWithFeService { @Override protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; // disable bucket shuffle join Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false); connectContext.getSessionVariable().setEnableRuntimeFilterPrune(false); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index f285ad3f260ae4..ff9d4362b7b4f5 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1414,6 +1414,28 @@ message GetRLTaskCommitAttachResponse { optional RLTaskTxnCommitAttachmentPB commit_attach = 2; } +message CheckKeyInfos { + repeated int64 db_ids = 1; + repeated int64 table_ids = 2; + repeated int64 index_ids = 3; + repeated int64 partition_ids = 4; +} + +message CheckKVRequest { + enum Operation { + CREATE_INDEX_AFTER_FE_COMMIT = 1; + CREATE_PARTITION_AFTER_FE_COMMIT = 2; + } + optional string cloud_unique_id = 1; // For auth + optional CheckKeyInfos check_keys = 2; + optional Operation op = 3; +} + +message CheckKVResponse { + optional MetaServiceResponseStatus status = 1; + optional CheckKeyInfos bad_keys = 2; +} + service MetaService { rpc begin_txn(BeginTxnRequest) returns (BeginTxnResponse); rpc precommit_txn(PrecommitTxnRequest) returns (PrecommitTxnResponse); @@ -1482,6 +1504,9 @@ service MetaService { // routine load progress rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse); + + // check KV + rpc check_kv(CheckKVRequest) returns (CheckKVResponse); }; service RecyclerService { From 4764a7e843a01ff638f706e347f1294745e75da5 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Thu, 27 Jun 2024 17:50:54 +0800 Subject: [PATCH 2/6] fix --- cloud/src/meta-service/meta_service_partition.cpp | 2 +- .../src/main/java/org/apache/doris/common/Config.java | 4 ++++ .../doris/cloud/datasource/CloudInternalCatalog.java | 7 ++++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 594a412533dc00..79b39603631d4b 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -633,7 +633,7 @@ void MetaServiceImpl::check_create_table(std::string instance_id, const CheckKVR if (keys.empty()) { *code = MetaServiceCode::INVALID_ARGUMENT; - *msg = "empty partition_ids"; + *msg = "empty keys"; return; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 3dc7cccca69a97..78c527958d228c 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2891,6 +2891,10 @@ public static int metaServiceRpcRetryTimes() { "streamload route policy in cloud mode, availale options are public-private and empty string"}) public static String streamload_redirect_policy = ""; + @ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true", + "create table in cloud mode, check recycler key remained, default true"}) + public static boolean check_create_table_recycle_key_remained = true; + //========================================================================== // end of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 060cc53f0f27d8..f7d9f79e3367b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -418,6 +418,9 @@ public void afterCreatePartitions(long dbId, long tableId, List partitionI public void checkCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds) throws DdlException { + if (!Config.check_create_table_recycle_key_remained) { + return; + } if (partitionIds == null) { checkMaterializedIndex(dbId, tableId, indexIds); } else { @@ -562,7 +565,7 @@ public void commitMaterializedIndex(long dbId, long tableId, List indexIds private void checkPartition(long dbId, long tableId, List partitionIds) throws DdlException { Cloud.CheckKeyInfos.Builder checkKeyInfosBuilder = Cloud.CheckKeyInfos.newBuilder(); - checkKeyInfosBuilder.addAllIndexIds(partitionIds); + checkKeyInfosBuilder.addAllPartitionIds(partitionIds); // for ms log checkKeyInfosBuilder.addDbIds(dbId); checkKeyInfosBuilder.addTableIds(tableId); @@ -570,6 +573,7 @@ private void checkPartition(long dbId, long tableId, List partitionIds) Cloud.CheckKVRequest.Builder checkKvRequestBuilder = Cloud.CheckKVRequest.newBuilder(); checkKvRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); checkKvRequestBuilder.setCheckKeys(checkKeyInfosBuilder.build()); + checkKvRequestBuilder.setOp(Cloud.CheckKVRequest.Operation.CREATE_PARTITION_AFTER_FE_COMMIT); final Cloud.CheckKVRequest checkKVRequest = checkKvRequestBuilder.build(); Cloud.CheckKVResponse response = null; @@ -606,6 +610,7 @@ public void checkMaterializedIndex(long dbId, long tableId, List indexIds) Cloud.CheckKVRequest.Builder checkKvRequestBuilder = Cloud.CheckKVRequest.newBuilder(); checkKvRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); checkKvRequestBuilder.setCheckKeys(checkKeyInfosBuilder.build()); + checkKvRequestBuilder.setOp(Cloud.CheckKVRequest.Operation.CREATE_INDEX_AFTER_FE_COMMIT); final Cloud.CheckKVRequest checkKVRequest = checkKvRequestBuilder.build(); Cloud.CheckKVResponse response = null; From b69e696eeb897169e5bea1a70d50688089cdfa7b Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Thu, 27 Jun 2024 23:22:01 +0800 Subject: [PATCH 3/6] add ms ut --- .../meta-service/meta_service_partition.cpp | 4 +- cloud/test/meta_service_test.cpp | 64 +++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 79b39603631d4b..470c2a621b61cf 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -644,12 +644,12 @@ void MetaServiceImpl::check_create_table(std::string instance_id, const CheckKVR continue; } else if (err == TxnErrorCode::TXN_OK) { // find not match, prepare commit - *code = MetaServiceCode::INVALID_ARGUMENT; + *code = MetaServiceCode::ALREADY_EXISTED; *msg = "prepare and commit rpc not match, recycle key remained"; return; } else { // err != TXN_OK, fdb read err - *code = MetaServiceCode::INVALID_ARGUMENT; + *code = cast_as(err); *msg = "ms read key error"; return; } diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index e4b147798143e4..de868108fb14ca 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -5241,6 +5241,70 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); req.add_index_ids(index_id); + // ------------Test check partition----------- + // Normal + req.set_db_id(1); + req.set_table_id(table_id + 1); + req.add_index_ids(index_id + 1); + req.add_partition_ids(partition_id + 1); + meta_service->prepare_partition(&ctrl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + meta_service->commit_partition(&ctrl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + CheckKVRequest req_check; + CheckKVResponse res_check; + meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr); + ASSERT_EQ(res_check.status().code(), MetaServiceCode::INVALID_ARGUMENT); + res_check.Clear(); + req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT); + CheckKeyInfos check_keys_pb; + check_keys_pb.add_table_ids(table_id + 1); + check_keys_pb.add_index_ids(index_id + 1); + check_keys_pb.add_partition_ids(partition_id + 1); + req_check.mutable_check_keys()->CopyFrom(check_keys_pb); + meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr); + ASSERT_EQ(res_check.status().code(), MetaServiceCode::OK); + res_check.Clear(); + // AbNomal not commit + req.Clear(); + req.set_db_id(1); + req.set_table_id(table_id + 2); + req.add_index_ids(index_id + 2); + req.add_partition_ids(partition_id + 2); + meta_service->prepare_partition(&ctrl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + req_check.Clear(); + req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT); + check_keys_pb.Clear(); + check_keys_pb.add_table_ids(table_id + 2); + check_keys_pb.add_index_ids(index_id + 2); + check_keys_pb.add_partition_ids(partition_id + 2); + req_check.mutable_check_keys()->CopyFrom(check_keys_pb); + meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr); + ASSERT_EQ(res_check.status().code(), MetaServiceCode::ALREADY_EXISTED); + + // ------------Test check index----------- + // Normal + IndexRequest req_index; + IndexResponse res_index; + req_index.set_db_id(1); + req_index.set_table_id(table_id + 3); + req_index.add_index_ids(index_id + 3); + meta_service->prepare_index(&ctrl, &req_index, &res_index, nullptr); + ASSERT_EQ(res_index.status().code(), MetaServiceCode::OK); + meta_service->commit_index(&ctrl, &req_index, &res_index, nullptr); + ASSERT_EQ(res_index.status().code(), MetaServiceCode::OK); + req_check.Clear(); + res_check.Clear(); + req_check.set_op(CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT); + check_keys_pb.Clear(); + check_keys_pb.add_table_ids(table_id + 3); + check_keys_pb.add_index_ids(index_id + 3); + req_check.mutable_check_keys()->CopyFrom(check_keys_pb); + meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr); + ASSERT_EQ(res_check.status().code(), MetaServiceCode::OK); + res_check.Clear(); + // ------------Test drop partition------------ reset_meta_service(); req.Clear(); From 06576d5c3e7bb5198c20f3d56e5b2f94967e7f0a Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Thu, 4 Jul 2024 17:20:27 +0800 Subject: [PATCH 4/6] fix review --- cloud/src/meta-service/meta_service.h | 7 - .../meta-service/meta_service_partition.cpp | 41 ++-- .../clone/DynamicPartitionScheduler.java | 176 ++++++++++-------- 3 files changed, 123 insertions(+), 101 deletions(-) diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 9b54bd538202a0..bdcf7fb47c7e08 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -283,13 +283,6 @@ class MetaServiceImpl : public cloud::MetaService { const AlterInstanceRequest* request, std::function(InstanceInfoPB*)> action); - using check_create_table_type = std::function, std::string, - std::function>(const CheckKVRequest* request)>; - void check_create_table(std::string instance_id, const CheckKVRequest* request, - CheckKVResponse* response, MetaServiceCode* code, std::string* msg, - check_create_table_type get_check_info); - std::shared_ptr txn_kv_; std::shared_ptr resource_mgr_; std::shared_ptr rate_limiter_; diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 470c2a621b61cf..1469154ee14055 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -25,7 +25,9 @@ #include "meta_service.h" namespace doris::cloud { - +using check_create_table_type = std::function, std::string, + std::function>(const CheckKVRequest* request)>; // ATTN: xxx_id MUST NOT be reused // // UNKNOWN @@ -619,18 +621,18 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll } } -void MetaServiceImpl::check_create_table(std::string instance_id, const CheckKVRequest* request, - CheckKVResponse* response, MetaServiceCode* code, - std::string* msg, check_create_table_type get_check_info) { +void check_create_table(std::string instance_id, std::shared_ptr txn_kv, + const CheckKVRequest* request, CheckKVResponse* response, + MetaServiceCode* code, std::string* msg, + check_create_table_type get_check_info) { std::unique_ptr txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); + TxnErrorCode err = txn_kv->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { *code = cast_as(err); *msg = "failed to create txn"; return; } auto& [keys, hint, key_func] = get_check_info(request); - if (keys.empty()) { *code = MetaServiceCode::INVALID_ARGUMENT; *msg = "empty keys"; @@ -681,22 +683,25 @@ void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller, RPC_RATE_LIMIT(check_kv); switch (request->op()) { case CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT: { - check_create_table( - instance_id, request, response, &code, &msg, [](const CheckKVRequest* request) { - return std::make_tuple(request->check_keys().index_ids(), "index", - [](std::string instance_id, int64_t id) { - return recycle_index_key({instance_id, id}); - }); - }); + check_create_table(instance_id, txn_kv_, request, response, &code, &msg, + [](const CheckKVRequest* request) { + return std::make_tuple( + request->check_keys().index_ids(), "index", + [](std::string instance_id, int64_t id) { + return recycle_index_key({std::move(instance_id), id}); + }); + }); break; } case CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT: { check_create_table( - instance_id, request, response, &code, &msg, [](const CheckKVRequest* request) { - return std::make_tuple(request->check_keys().partition_ids(), "partition", - [](std::string instance_id, int64_t id) { - return recycle_partition_key({instance_id, id}); - }); + instance_id, txn_kv_, request, response, &code, &msg, + [](const CheckKVRequest* request) { + return std::make_tuple( + request->check_keys().partition_ids(), "partition", + [](std::string instance_id, int64_t id) { + return recycle_partition_key({std::move(instance_id), id}); + }); }); break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index b8910d1d2563b1..0e97b05179cc55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -622,38 +622,8 @@ public void executeDynamicPartition(Collection> dynamicPartitio // get partitionIds and indexIds List indexIds = new ArrayList<>(olapTable.getCopiedIndexIdToMeta().keySet()); List generatedPartitionIds = new ArrayList<>(); - if (executeFirstTime && Config.isCloudMode() && !addPartitionClauses.isEmpty()) { - AddPartitionClause addPartitionClause = addPartitionClauses.get(0); - DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc(); - try { - DistributionInfo distributionInfo = distributionDesc - .toDistributionInfo(olapTable.getBaseSchema()); - if (distributionDesc == null) { - distributionInfo = olapTable.getDefaultDistributionInfo() - .toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema()); - } - long allPartitionBufferSize = 0; - for (int i = 0; i < addPartitionClauses.size(); i++) { - long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(), - distributionInfo.getBucketNum(), - addPartitionClause.getSingeRangePartitionDesc() - .getReplicaAlloc().getTotalReplicaNum(), - db, tableName); - allPartitionBufferSize += bufferSize; - } - MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv() - .getIdGeneratorBuffer(allPartitionBufferSize); - addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId())); - // executeFirstTime true - Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(), - generatedPartitionIds, indexIds, true); - } catch (Exception e) { - LOG.warn("cloud in prepare step, dbName {}, tableName {}, tableId {} indexId {} exception {}", - db.getFullName(), tableName, olapTable.getId(), indexIds, e.getMessage()); - recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); - throw new DdlException("cloud in prepare step err"); - } - } + cloudBatchBeforeCreatePartitions(executeFirstTime, addPartitionClauses, olapTable, indexIds, + db, tableName, generatedPartitionIds); List partsInfo = new ArrayList<>(); for (int i = 0; i < addPartitionClauses.size(); i++) { @@ -682,50 +652,8 @@ public void executeDynamicPartition(Collection> dynamicPartitio } } } - List succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo - -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList()); - if (executeFirstTime && Config.isCloudMode() && !addPartitionClauses.isEmpty()) { - try { - // ATTN: failedPids = generatedPartitionIds - succeedPartitionIds, - // means some partitions failed when addPartition, failedPids will be recycled by recycler - if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) { - LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e"); - // not commit, not log edit - throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); - } - Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(), - succeedPartitionIds, indexIds, true); - LOG.info("begin write edit log to add partitions in batch, " - + "numPartitions: {}, db: {}, table: {}, tableId: {}", - partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); - // ATTN: here, edit log must after commit cloud partition, - // prevent commit RPC failure from causing data loss - if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) { - LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e"); - // committed, but not log edit - throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); - } - for (int i = 0; i < partsInfo.size(); i++) { - Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i)); - if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) { - if (i == partsInfo.size() / 2) { - LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e"); - // committed, but log some edit, others failed - throw new Exception("debug point FE.DynamicPartitionScheduler" - + ".in.commitCloudPartition"); - } - } - } - LOG.info("finish write edit log to add partitions in batch, " - + "numPartitions: {}, db: {}, table: {}, tableId: {}", - partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); - } catch (Exception e) { - LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}", - db.getFullName(), tableName, olapTable.getId(), e.getMessage()); - recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); - throw new DdlException("cloud in commit step err"); - } - } + List succeedPartitionIds = cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo, + addPartitionClauses, db, olapTable, indexIds, tableName); // cloud mode, check recycle key not remained if (Config.isCloudMode() && executeFirstTime) { Env.getCurrentInternalCatalog().checkCreatePartitions(db.getId(), olapTable.getId(), @@ -735,6 +663,102 @@ public void executeDynamicPartition(Collection> dynamicPartitio } } + private List cloudBatchAfterCreatePartitions(boolean executeFirstTime, List partsInfo, + ArrayList addPartitionClauses, Database db, + OlapTable olapTable, List indexIds, + String tableName) throws DdlException { + if (Config.isNotCloudMode()) { + return new ArrayList<>(); + } + List succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo + -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList()); + if (executeFirstTime && !addPartitionClauses.isEmpty()) { + try { + // ATTN: failedPids = generatedPartitionIds - succeedPartitionIds, + // means some partitions failed when addPartition, failedPids will be recycled by recycler + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) { + LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e"); + // not commit, not log edit + throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); + } + Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(), + succeedPartitionIds, indexIds, true); + LOG.info("begin write edit log to add partitions in batch, " + + "numPartitions: {}, db: {}, table: {}, tableId: {}", + partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); + // ATTN: here, edit log must after commit cloud partition, + // prevent commit RPC failure from causing data loss + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) { + LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e"); + // committed, but not log edit + throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); + } + for (int i = 0; i < partsInfo.size(); i++) { + Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i)); + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) { + if (i == partsInfo.size() / 2) { + LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e"); + // committed, but log some edit, others failed + throw new Exception("debug point FE.DynamicPartitionScheduler" + + ".in.commitCloudPartition"); + } + } + } + LOG.info("finish write edit log to add partitions in batch, " + + "numPartitions: {}, db: {}, table: {}, tableId: {}", + partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); + } catch (Exception e) { + LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}", + db.getFullName(), tableName, olapTable.getId(), e.getMessage()); + recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); + throw new DdlException("cloud in commit step err"); + } + } + return succeedPartitionIds; + } + + private void cloudBatchBeforeCreatePartitions(boolean executeFirstTime, + ArrayList addPartitionClauses, + OlapTable olapTable, List indexIds, Database db, + String tableName, List generatedPartitionIds) + throws DdlException { + if (Config.isNotCloudMode()) { + return; + } + if (executeFirstTime && !addPartitionClauses.isEmpty()) { + AddPartitionClause addPartitionClause = addPartitionClauses.get(0); + DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc(); + try { + DistributionInfo distributionInfo = distributionDesc + .toDistributionInfo(olapTable.getBaseSchema()); + if (distributionDesc == null) { + distributionInfo = olapTable.getDefaultDistributionInfo() + .toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema()); + } + long allPartitionBufferSize = 0; + for (int i = 0; i < addPartitionClauses.size(); i++) { + long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(), + distributionInfo.getBucketNum(), + addPartitionClause.getSingeRangePartitionDesc() + .getReplicaAlloc().getTotalReplicaNum(), + db, tableName); + allPartitionBufferSize += bufferSize; + } + MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv() + .getIdGeneratorBuffer(allPartitionBufferSize); + addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId())); + // executeFirstTime true + Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(), + generatedPartitionIds, indexIds, true); + } catch (Exception e) { + LOG.warn("cloud in prepare step, dbName {}, tableName {}, tableId {} indexId {} exception {}", + db.getFullName(), tableName, olapTable.getId(), indexIds, e.getMessage()); + recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); + throw new DdlException("cloud in prepare step err"); + } + } + } + private void recordCreatePartitionFailedMsg(String dbName, String tableName, String msg, long tableId) { LOG.info("dynamic add partition failed: {}, db: {}, table: {}", msg, dbName, tableName); createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.ERROR.toString()); From 67282222733487137b9b66241319485cd23d4d89 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Fri, 5 Jul 2024 20:04:06 +0800 Subject: [PATCH 5/6] fix review --- cloud/src/meta-service/meta_service_partition.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 1469154ee14055..1d917f8245b0a0 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -652,12 +652,11 @@ void check_create_table(std::string instance_id, std::shared_ptr txn_kv, } else { // err != TXN_OK, fdb read err *code = cast_as(err); - *msg = "ms read key error"; + *msg = fmt::format("ms read key error: {}", err); return; } } - LOG_INFO("check {} success request={}", hint, request->ShortDebugString()); - return; + LOG_INFO("check {} success key.size={}", hint, keys.size()); } void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller, @@ -706,7 +705,9 @@ void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller, break; } default: - DCHECK(false); + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "not support op"; + return; }; } From 501b3d4127b2a5db895e5c036e658571b5f951f7 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Wed, 10 Jul 2024 10:56:18 +0800 Subject: [PATCH 6/6] fix review --- .../meta-service/meta_service_partition.cpp | 23 ++- cloud/test/meta_service_test.cpp | 2 +- .../clone/DynamicPartitionScheduler.java | 152 +++++++++--------- .../datasource/CloudInternalCatalog.java | 17 +- .../doris/datasource/InternalCatalog.java | 11 -- 5 files changed, 102 insertions(+), 103 deletions(-) diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 1d917f8245b0a0..d165b8b5e06bb2 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -625,6 +625,7 @@ void check_create_table(std::string instance_id, std::shared_ptr txn_kv, const CheckKVRequest* request, CheckKVResponse* response, MetaServiceCode* code, std::string* msg, check_create_table_type get_check_info) { + StopWatch watch; std::unique_ptr txn; TxnErrorCode err = txn_kv->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -639,16 +640,30 @@ void check_create_table(std::string instance_id, std::shared_ptr txn_kv, return; } - for (auto id : keys) { - auto key = key_func(instance_id, id); + for (int i = 0; i < keys.size();) { + auto key = key_func(instance_id, keys.Get(i)); err = check_recycle_key_exist(txn.get(), key); if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + i++; continue; } else if (err == TxnErrorCode::TXN_OK) { // find not match, prepare commit - *code = MetaServiceCode::ALREADY_EXISTED; + *code = MetaServiceCode::UNDEFINED_ERR; *msg = "prepare and commit rpc not match, recycle key remained"; return; + } else if (err == TxnErrorCode::TXN_TOO_OLD) { + // separate it to several txn for rubustness + txn.reset(); + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + *code = cast_as(err); + *msg = "failed to create txn in cycle"; + return; + } + LOG_INFO("meet txn too long err, gen a new txn, and retry, size={} idx={}", keys.size(), + i); + bthread_usleep(50); + continue; } else { // err != TXN_OK, fdb read err *code = cast_as(err); @@ -656,7 +671,7 @@ void check_create_table(std::string instance_id, std::shared_ptr txn_kv, return; } } - LOG_INFO("check {} success key.size={}", hint, keys.size()); + LOG_INFO("check {} success key.size={}, cost(us)={}", hint, keys.size(), watch.elapsed_us()); } void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller, diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index de868108fb14ca..89a74879c78de1 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -5281,7 +5281,7 @@ TEST(MetaServiceTest, PartitionRequest) { check_keys_pb.add_partition_ids(partition_id + 2); req_check.mutable_check_keys()->CopyFrom(check_keys_pb); meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr); - ASSERT_EQ(res_check.status().code(), MetaServiceCode::ALREADY_EXISTED); + ASSERT_EQ(res_check.status().code(), MetaServiceCode::UNDEFINED_ERR); // ------------Test check index----------- // Normal diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 0e97b05179cc55..f593a915b9dbd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -652,69 +652,65 @@ public void executeDynamicPartition(Collection> dynamicPartitio } } } - List succeedPartitionIds = cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo, + cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo, addPartitionClauses, db, olapTable, indexIds, tableName); - // cloud mode, check recycle key not remained - if (Config.isCloudMode() && executeFirstTime) { - Env.getCurrentInternalCatalog().checkCreatePartitions(db.getId(), olapTable.getId(), - succeedPartitionIds, indexIds); - } } } } - private List cloudBatchAfterCreatePartitions(boolean executeFirstTime, List partsInfo, + private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List partsInfo, ArrayList addPartitionClauses, Database db, OlapTable olapTable, List indexIds, String tableName) throws DdlException { if (Config.isNotCloudMode()) { - return new ArrayList<>(); + return; } List succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList()); - if (executeFirstTime && !addPartitionClauses.isEmpty()) { - try { - // ATTN: failedPids = generatedPartitionIds - succeedPartitionIds, - // means some partitions failed when addPartition, failedPids will be recycled by recycler - if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) { - LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e"); - // not commit, not log edit - throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); - } - Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(), - succeedPartitionIds, indexIds, true); - LOG.info("begin write edit log to add partitions in batch, " - + "numPartitions: {}, db: {}, table: {}, tableId: {}", - partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); - // ATTN: here, edit log must after commit cloud partition, - // prevent commit RPC failure from causing data loss - if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) { - LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e"); - // committed, but not log edit - throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); - } - for (int i = 0; i < partsInfo.size(); i++) { - Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i)); - if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) { - if (i == partsInfo.size() / 2) { - LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e"); - // committed, but log some edit, others failed - throw new Exception("debug point FE.DynamicPartitionScheduler" - + ".in.commitCloudPartition"); - } + if (!executeFirstTime || addPartitionClauses.isEmpty()) { + LOG.info("cloud commit rpc in batch, {}-{}", !executeFirstTime, addPartitionClauses.size()); + return; + } + try { + // ATTN: failedPids = generatedPartitionIds - succeedPartitionIds, + // means some partitions failed when addPartition, failedPids will be recycled by recycler + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) { + LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e"); + // not commit, not log edit + throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); + } + Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(), + succeedPartitionIds, indexIds, true); + LOG.info("begin write edit log to add partitions in batch, " + + "numPartitions: {}, db: {}, table: {}, tableId: {}", + partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); + // ATTN: here, edit log must after commit cloud partition, + // prevent commit RPC failure from causing data loss + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) { + LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e"); + // committed, but not log edit + throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition"); + } + for (int i = 0; i < partsInfo.size(); i++) { + Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i)); + if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) { + if (i == partsInfo.size() / 2) { + LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e"); + // committed, but log some edit, others failed + throw new Exception("debug point FE.DynamicPartitionScheduler" + + ".in.commitCloudPartition"); } } - LOG.info("finish write edit log to add partitions in batch, " - + "numPartitions: {}, db: {}, table: {}, tableId: {}", - partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); - } catch (Exception e) { - LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}", - db.getFullName(), tableName, olapTable.getId(), e.getMessage()); - recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); - throw new DdlException("cloud in commit step err"); } + LOG.info("finish write edit log to add partitions in batch, " + + "numPartitions: {}, db: {}, table: {}, tableId: {}", + partsInfo.size(), db.getFullName(), tableName, olapTable.getId()); + } catch (Exception e) { + LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}", + db.getFullName(), tableName, olapTable.getId(), e.getMessage()); + recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); + throw new DdlException("cloud in commit step err"); } - return succeedPartitionIds; } private void cloudBatchBeforeCreatePartitions(boolean executeFirstTime, @@ -725,37 +721,39 @@ private void cloudBatchBeforeCreatePartitions(boolean executeFirstTime, if (Config.isNotCloudMode()) { return; } - if (executeFirstTime && !addPartitionClauses.isEmpty()) { - AddPartitionClause addPartitionClause = addPartitionClauses.get(0); - DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc(); - try { - DistributionInfo distributionInfo = distributionDesc - .toDistributionInfo(olapTable.getBaseSchema()); - if (distributionDesc == null) { - distributionInfo = olapTable.getDefaultDistributionInfo() - .toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema()); - } - long allPartitionBufferSize = 0; - for (int i = 0; i < addPartitionClauses.size(); i++) { - long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(), - distributionInfo.getBucketNum(), - addPartitionClause.getSingeRangePartitionDesc() - .getReplicaAlloc().getTotalReplicaNum(), - db, tableName); - allPartitionBufferSize += bufferSize; - } - MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv() - .getIdGeneratorBuffer(allPartitionBufferSize); - addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId())); - // executeFirstTime true - Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(), - generatedPartitionIds, indexIds, true); - } catch (Exception e) { - LOG.warn("cloud in prepare step, dbName {}, tableName {}, tableId {} indexId {} exception {}", - db.getFullName(), tableName, olapTable.getId(), indexIds, e.getMessage()); - recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); - throw new DdlException("cloud in prepare step err"); + if (!executeFirstTime || addPartitionClauses.isEmpty()) { + LOG.info("cloud prepare rpc in batch, {}-{}", !executeFirstTime, addPartitionClauses.size()); + return; + } + AddPartitionClause addPartitionClause = addPartitionClauses.get(0); + DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc(); + try { + DistributionInfo distributionInfo = distributionDesc + .toDistributionInfo(olapTable.getBaseSchema()); + if (distributionDesc == null) { + distributionInfo = olapTable.getDefaultDistributionInfo() + .toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema()); } + long allPartitionBufferSize = 0; + for (int i = 0; i < addPartitionClauses.size(); i++) { + long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(), + distributionInfo.getBucketNum(), + addPartitionClause.getSingeRangePartitionDesc() + .getReplicaAlloc().getTotalReplicaNum(), + db, tableName); + allPartitionBufferSize += bufferSize; + } + MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv() + .getIdGeneratorBuffer(allPartitionBufferSize); + addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId())); + // executeFirstTime true + Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(), + generatedPartitionIds, indexIds, true); + } catch (Exception e) { + LOG.warn("cloud in prepare step, dbName {}, tableName {}, tableId {} indexId {} exception {}", + db.getFullName(), tableName, olapTable.getId(), indexIds, e.getMessage()); + recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId()); + throw new DdlException("cloud in prepare step err"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index f7d9f79e3367b4..2b3d04eb927f12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -414,13 +414,14 @@ public void afterCreatePartitions(long dbId, long tableId, List partitionI } else { commitPartition(dbId, tableId, partitionIds, indexIds); } - } - - public void checkCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds) - throws DdlException { if (!Config.check_create_table_recycle_key_remained) { return; } + checkCreatePartitions(dbId, tableId, partitionIds, indexIds); + } + + private void checkCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds) + throws DdlException { if (partitionIds == null) { checkMaterializedIndex(dbId, tableId, indexIds); } else { @@ -581,9 +582,7 @@ private void checkPartition(long dbId, long tableId, List partitionIds) while (tryTimes++ < Config.metaServiceRpcRetryTimes()) { try { response = MetaServiceProxy.getInstance().checkKv(checkKVRequest); - if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) { - break; - } + break; } catch (RpcException e) { LOG.warn("tryTimes:{}, checkPartition RpcException", tryTimes, e); if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) { @@ -618,9 +617,7 @@ public void checkMaterializedIndex(long dbId, long tableId, List indexIds) while (tryTimes++ < Config.metaServiceRpcRetryTimes()) { try { response = MetaServiceProxy.getInstance().checkKv(checkKVRequest); - if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) { - break; - } + break; } catch (RpcException e) { LOG.warn("tryTimes:{}, checkIndex RpcException", tryTimes, e); if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index a82c3f009eb680..c5e78696e0c88c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1810,9 +1810,6 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti } else { LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition); } - if (!isCreateTable) { - checkCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds); - } return info; } finally { olapTable.writeUnlock(); @@ -2175,11 +2172,6 @@ public void afterCreatePartitions(long dbId, long tableId, List partitionI throws DdlException { } - public void checkCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds) - throws DdlException { - - } - public void checkAvailableCapacity(Database db) throws DdlException { // check cluster capacity Env.getCurrentSystemInfo().checkAvailableCapacity(); @@ -2869,7 +2861,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx keysDesc.getClusterKeysColumnIds()); afterCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); - checkCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList()); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { @@ -2957,7 +2948,6 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx } afterCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); - checkCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList()); } else { throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name()); } @@ -3410,7 +3400,6 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } afterCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); - checkCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds); } catch (DdlException e) { // create partition failed, remove all newly created tablets