From c83879989093db8ca389170bcf84c04ea3fbd112 Mon Sep 17 00:00:00 2001 From: zclllhhjj Date: Thu, 25 Jul 2024 17:46:55 +0800 Subject: [PATCH] [Fix](insert-overwrite) Fix insert overwrite auto detect transaction safe (#38103) Before, if insert overwrite auto detect failed because of some transaction conflicts, it would end up in an unexpected state, with some of the partitions successfully replaced while others retained their old data. Now it either wholly succeeds or wholly fails. --- .../org/apache/doris/catalog/OlapTable.java | 9 +- .../InsertOverwriteManager.java | 66 +++++++++-- .../insertoverwrite/InsertOverwriteUtil.java | 1 + .../insert/InsertOverwriteTableCommand.java | 11 +- .../doris/service/FrontendServiceImpl.java | 107 ++++++++++++------ .../test_iot_auto_detect_concurrent.groovy | 2 + 6 files changed, 143 insertions(+), 53 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index de93324891a1d3..1903e45685ff04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1108,10 +1108,13 @@ public Set getPartitionNames() { return Sets.newHashSet(nameToPartition.keySet()); } - public List uncheckedGetPartNamesById(List partitionIds) { + // for those positions whose elements are equal in both partition id lists, get their partition names. 
+ public List getEqualPartitionNames(List partitionIds1, List partitionIds2) { List names = new ArrayList(); - for (Long id : partitionIds) { - names.add(idToPartition.get(id).getName()); + for (int i = 0; i < partitionIds1.size(); i++) { + if (partitionIds1.get(i).equals(partitionIds2.get(i))) { + names.add(getPartition(partitionIds1.get(i)).getName()); + } } return names; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index e0c46dde920b6e..81524ae020810e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -58,7 +58,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { // but we only change one time and save the relations in partitionPairs. they're protected by taskLocks @SerializedName(value = "taskLocks") private Map taskLocks = Maps.newConcurrentMap(); - // > + // groupId -> (old partition id -> new partition id). no need to care which task a pair belongs to. @SerializedName(value = "partitionPairs") private Map> partitionPairs = Maps.newConcurrentMap(); @@ -91,7 +91,7 @@ public long registerTask(long dbId, long tableId, List tempPartitionName * * @return group id, like a transaction id. */ - public long preRegisterTask() { + public long registerTaskGroup() { long groupId = Env.getCurrentEnv().getNextId(); taskGroups.put(groupId, new ArrayList()); taskLocks.put(groupId, new ReentrantLock()); @@ -107,44 +107,81 @@ public void registerTaskInGroup(long groupId, long taskId) { taskGroups.get(groupId).add(taskId); } - public List tryReplacePartitionIds(long groupId, List oldPartitionIds) { + /** + * this func should be called within the lock scope of getLock(groupId) + * + * @param newIds output list. for each requested id: the recorded new id if it has been replaced, otherwise the id itself. 
+ */ + public boolean tryReplacePartitionIds(long groupId, List oldPartitionIds, List newIds) { Map relations = partitionPairs.get(groupId); - List newIds = new ArrayList(); - for (Long id : oldPartitionIds) { + boolean needReplace = false; + for (int i = 0; i < oldPartitionIds.size(); i++) { + long id = oldPartitionIds.get(i); if (relations.containsKey(id)) { // if we replaced it. then return new one. newIds.add(relations.get(id)); } else { - // otherwise itself. we will deal it soon. newIds.add(id); + needReplace = true; } } - return newIds; + return needReplace; } + // this func should be called within the lock scope of getLock(groupId) public void recordPartitionPairs(long groupId, List oldIds, List newIds) { Map relations = partitionPairs.get(groupId); Preconditions.checkArgument(oldIds.size() == newIds.size()); for (int i = 0; i < oldIds.size(); i++) { relations.put(oldIds.get(i), newIds.get(i)); + if (LOG.isDebugEnabled()) { + LOG.debug("recorded partition pairs: [" + oldIds.get(i) + ", " + newIds.get(i) + "]"); + } } } + // the lock marks that the task group exists. if there is no lock, the group has already failed. public ReentrantLock getLock(long groupId) { return taskLocks.get(groupId); } + // When the group goes into failure, some BEs may not know it yet and still send new requests, + // which would cause ConcurrentModification or NullPointer. public void taskGroupFail(long groupId) { LOG.info("insert overwrite auto detect partition task group [" + groupId + "] failed"); - for (Long taskId : taskGroups.get(groupId)) { - taskFail(taskId); + ReentrantLock lock = getLock(groupId); + lock.lock(); + try { + // will rollback temp partitions in `taskFail` + for (Long taskId : taskGroups.get(groupId)) { + taskFail(taskId); + } + cleanTaskGroup(groupId); + } finally { + lock.unlock(); } - cleanTaskGroup(groupId); } - public void taskGroupSuccess(long groupId) { + // here we make all replacements of this group visible. if any of them fails, nothing happens. 
+ public void taskGroupSuccess(long groupId, OlapTable targetTable) throws DdlException { + try { + Map relations = partitionPairs.get(groupId); + ArrayList oldNames = new ArrayList<>(); + ArrayList newNames = new ArrayList<>(); + for (Entry partitionPair : relations.entrySet()) { + oldNames.add(targetTable.getPartition(partitionPair.getKey()).getName()); + newNames.add(targetTable.getPartition(partitionPair.getValue()).getName()); + } + InsertOverwriteUtil.replacePartition(targetTable, oldNames, newNames); + } catch (Exception e) { + LOG.warn("insert overwrite task making replacement failed because " + e.getMessage() + + "all new partition will not be visible and will be recycled by partition GC."); + throw e; + } LOG.info("insert overwrite auto detect partition task group [" + groupId + "] succeed"); for (Long taskId : taskGroups.get(groupId)) { + Env.getCurrentEnv().getEditLog() + .logInsertOverwrite(new InsertOverwriteLog(taskId, tasks.get(taskId), InsertOverwriteOpType.ADD)); taskSuccess(taskId); } cleanTaskGroup(groupId); @@ -164,6 +201,9 @@ private void cleanTaskGroup(long groupId) { public void taskFail(long taskId) { LOG.info("insert overwrite task [" + taskId + "] failed"); boolean rollback = rollback(taskId); + if (!rollback) { + LOG.warn("roll back task [" + taskId + "] failed"); + } if (rollback) { removeTask(taskId); } else { @@ -192,6 +232,7 @@ public void allTaskFail() { } } + // cancel it. should try to remove them after. private void cancelTask(long taskId) { if (tasks.containsKey(taskId)) { LOG.info("cancel insert overwrite task: {}", tasks.get(taskId)); @@ -201,6 +242,7 @@ private void cancelTask(long taskId) { } } + // task and partitions has been removed. it's safe to remove task. 
private void removeTask(long taskId) { if (tasks.containsKey(taskId)) { LOG.info("remove insert overwrite task: {}", tasks.get(taskId)); @@ -222,7 +264,7 @@ private boolean rollback(long taskId) { try { olapTable = task.getTable(); } catch (DdlException e) { - LOG.warn("can not get table, task: {}", task); + LOG.warn("can not get table, task: {}, reason: {}", task, e.getMessage()); return true; } return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java index c4d3068e09f160..a0e04a35bd921a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java @@ -54,6 +54,7 @@ public static void addTempPartitions(TableIf tableIf, List partitionName for (int i = 0; i < partitionNames.size(); i++) { Env.getCurrentEnv().addPartitionLike((Database) tableIf.getDatabase(), tableIf.getName(), new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true)); + LOG.info("successfully add temp partition [{}] for [{}]", tempPartitionNames.get(i), tableIf.getName()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index bd5af3225f0b28..75b80ade5815a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -169,11 +169,12 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { try { if (isAutoDetectOverwrite()) { // taskId here is a group id. 
it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask(); - // When inserting, BE will call to replace partition by FrontendService. FE do the real - // add&replacement and return replace result. So there's no need to do anything else. + taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + // When inserting, BE will call FrontendService to replace partitions. FE will register new temp + // partitions and return. to be transactional, the replacement only really occurs after the insert succeeds, + // i.e. after `insertInto` finishes. then we call taskGroupSuccess to make the replacement. insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId); + Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); taskId = Env.getCurrentEnv().getInsertOverwriteManager() @@ -184,7 +185,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); } } catch (Exception e) { - LOG.warn("insert into overwrite failed"); + LOG.warn("insert into overwrite failed with task(or group) id " + taskId); if (isAutoDetectOverwrite()) { Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 448b1231ddea86..07590446331d4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -276,7 +276,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import 
java.util.stream.Collectors; -import java.util.stream.IntStream; // Frontend service used to serve all request for this frontend through // thrift protocol @@ -3568,7 +3567,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request LOG.info("Receive replace partition request: {}", request); long dbId = request.getDbId(); long tableId = request.getTableId(); - List partitionIds = request.getPartitionIds(); + List reqPartitionIds = request.getPartitionIds(); long taskGroupId = request.getOverwriteGroupId(); TReplacePartitionResult result = new TReplacePartitionResult(); TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR); @@ -3607,41 +3606,60 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request OlapTable olapTable = (OlapTable) table; InsertOverwriteManager overwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); ReentrantLock taskLock = overwriteManager.getLock(taskGroupId); - List allReqPartNames; // all request partitions + if (taskLock == null) { + errorStatus.setErrorMsgs(Lists + .newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed."))); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + + ArrayList resultPartitionIds = new ArrayList<>(); // [1 2 5 6] -> [7 8 5 6] + ArrayList pendingPartitionIds = new ArrayList<>(); // pending: [1 2] + ArrayList newPartitionIds = new ArrayList<>(); // requested temp partition ids. for [7 8] + boolean needReplace = false; try { taskLock.lock(); + // double check lock. maybe taskLock is not null, but has been removed from the Map. means the task failed. 
+ if (overwriteManager.getLock(taskGroupId) == null) { + errorStatus.setErrorMsgs(Lists + .newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed."))); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + // we dont lock the table. other thread in this txn will be controled by taskLock. - // if we have already replaced. dont do it again, but acquire the recorded new partition directly. + // if we have already replaced, dont do it again, but acquire the recorded new partition directly. // if not by this txn, just let it fail naturally is ok. - List replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); - // here if replacedPartIds still have null. this will throw exception. - allReqPartNames = olapTable.uncheckedGetPartNamesById(replacedPartIds); - - List pendingPartitionIds = IntStream.range(0, partitionIds.size()) - .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced - .mapToObj(partitionIds::get) - .collect(Collectors.toList()); - // from here we ONLY deal the pending partitions. not include the dealed(by others). - if (!pendingPartitionIds.isEmpty()) { - // below two must have same order inner. - List pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds); - List tempPartitionNames = InsertOverwriteUtil - .generateTempPartitionNames(pendingPartitionNames); + needReplace = overwriteManager.tryReplacePartitionIds(taskGroupId, reqPartitionIds, resultPartitionIds); + // request: [1 2 3 4] result: [1 2 5 6] means ONLY 1 and 2 need replace. 
+ if (needReplace) { + // names for [1 2] + List pendingPartitionNames = olapTable.getEqualPartitionNames(reqPartitionIds, + resultPartitionIds); + for (String name : pendingPartitionNames) { + pendingPartitionIds.add(olapTable.getPartition(name).getId()); // put [1 2] + } - long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames); + // names for [7 8] + List newTempNames = InsertOverwriteUtil + .generateTempPartitionNames(pendingPartitionNames); + // a task means one time insert overwrite + long taskId = overwriteManager.registerTask(dbId, tableId, newTempNames); overwriteManager.registerTaskInGroup(taskGroupId, taskId); - InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); - InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); + InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, newTempNames); // now temp partitions are bumped up and use new names. we get their ids and record them. - List newPartitionIds = new ArrayList(); - for (String newPartName : pendingPartitionNames) { - newPartitionIds.add(olapTable.getPartition(newPartName).getId()); + for (String newPartName : newTempNames) { + newPartitionIds.add(olapTable.getPartition(newPartName).getId()); // put [7 8] } overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds); + if (LOG.isDebugEnabled()) { LOG.debug("partition replacement: "); for (int i = 0; i < pendingPartitionIds.size(); i++) { - LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], "); + LOG.debug("[" + pendingPartitionIds.get(i) + " - " + pendingPartitionNames.get(i) + ", " + + newPartitionIds.get(i) + " - " + newTempNames.get(i) + "], "); } } } @@ -3654,15 +3672,38 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request taskLock.unlock(); } - // build partition & tablets. 
now all partitions in allReqPartNames are replaced - // an recorded. - // so they won't be changed again. if other transaction changing it. just let it - // fail. - List partitions = Lists.newArrayList(); - List tablets = Lists.newArrayList(); + // result: [1 2 5 6], make it [7 8 5 6] + int idx = 0; + if (needReplace) { + for (int i = 0; i < reqPartitionIds.size(); i++) { + if (reqPartitionIds.get(i).equals(resultPartitionIds.get(i))) { + resultPartitionIds.set(i, newPartitionIds.get(idx++)); + } + } + } + if (idx != newPartitionIds.size()) { + errorStatus.addToErrorMsgs("changed partition number " + idx + " is not correct"); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("replace partition origin ids: [" + + String.join(", ", reqPartitionIds.stream().map(String::valueOf).collect(Collectors.toList())) + + ']'); + LOG.debug("replace partition result ids: [" + + String.join(", ", resultPartitionIds.stream().map(String::valueOf).collect(Collectors.toList())) + + ']'); + } + + // build partition & tablets. now all ids in resultPartitionIds point to the recorded new partitions, + // so they won't be changed again. if another transaction is changing them, just let it fail. 
+ List partitions = new ArrayList<>(); + List tablets = new ArrayList<>(); PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - for (String partitionName : allReqPartNames) { - Partition partition = table.getPartition(partitionName); + for (long partitionId : resultPartitionIds) { + Partition partition = olapTable.getPartition(partitionId); TOlapTablePartition tPartition = new TOlapTablePartition(); tPartition.setId(partition.getId()); diff --git a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy index 200dd874df9540..96b285ea4cad2e 100644 --- a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy +++ b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy @@ -83,11 +83,13 @@ suite("test_iot_auto_detect_concurrent") { thread5.join() // suppose result: success zero or one if (success_status) { // success zero + log.info("test 1: success zero") result = sql " select count(k0) from test_concurrent_write; " assertEquals(result[0][0], 1000) result = sql " select count(distinct k0) from test_concurrent_write; " assertEquals(result[0][0], 1000) } else { // success one + log.info("test 1: success one") result = sql " select count(k0) from test_concurrent_write; " assertEquals(result[0][0], 100) result = sql " select count(distinct k0) from test_concurrent_write; "