diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index d55ac52ebfdf5d..e26f52413dbd20 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1011,6 +1011,19 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int tablet_further_repair_max_times = 5; + /** + * if tablet loaded txn failed recently, it will get higher priority to repair. + */ + @ConfField(mutable = true, masterOnly = true) + public static long tablet_recent_load_failed_second = 30 * 60; + + /** + * base time for higher tablet scheduler task, + * set this config value bigger if want the high priority effect last longer. + */ + @ConfField(mutable = true, masterOnly = true) + public static long tablet_schedule_high_priority_second = 30 * 60; + /** * the default slot number per path for hdd in tablet scheduler * TODO(cmy): remove this config and dynamically adjust it by clone task statistic diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 12550899a463d6..cfc57a989918ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -34,7 +34,6 @@ import org.apache.doris.catalog.Partition.PartitionState; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.Tablet.TabletStatus; -import org.apache.doris.clone.TabletSchedCtx; import org.apache.doris.clone.TabletScheduler; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -1841,11 +1840,11 @@ public boolean isStable(SystemInfoService infoService, TabletScheduler tabletSch return false; } - Pair statusPair = tablet.getHealthStatusWithPriority( - infoService, 
visibleVersion, replicaAlloc, aliveBeIds); - if (statusPair.first != TabletStatus.HEALTHY) { + TabletStatus status = tablet.getHealth(infoService, visibleVersion, + replicaAlloc, aliveBeIds).status; + if (status != TabletStatus.HEALTHY) { LOG.info("table {} is not stable because tablet {} status is {}. replicas: {}", - id, tablet.getId(), statusPair.first, tablet.getReplicas()); + id, tablet.getId(), status, tablet.getReplicas()); return false; } } @@ -2482,6 +2481,10 @@ public boolean getEnableUniqueKeyMergeOnWrite() { return tableProperty.getEnableUniqueKeyMergeOnWrite(); } + public boolean isUniqKeyMergeOnWrite() { + return getKeysType() == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite(); + } + public boolean isDuplicateWithoutKey() { return getKeysType() == KeysType.DUP_KEYS && getKeysNum() == 0; } @@ -2573,8 +2576,7 @@ public Set getPartitionKeys() { } public boolean isDupKeysOrMergeOnWrite() { - return keysType == KeysType.DUP_KEYS - || (keysType == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite()); + return keysType == KeysType.DUP_KEYS || isUniqKeyMergeOnWrite(); } public void initAutoIncrementGenerator(long dbId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 08c89e14c375bd..9714ef15719d2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -81,6 +81,36 @@ public enum TabletStatus { REPLICA_COMPACTION_TOO_SLOW // one replica's version count is much more than other replicas; } + public static class TabletHealth { + public TabletStatus status; + public TabletSchedCtx.Priority priority; + + // num of alive replica with version complete + public int aliveAndVersionCompleteNum; + + // NEED_FURTHER_REPAIR replica id + public long needFurtherRepairReplicaId; + + // has alive replica with version incomplete, prior to repair these replica + public boolean 
hasAliveAndVersionIncomplete; + + // a recent load into this tablet failed, so increase its sched priority + public boolean hasRecentLoadFailed; + + // this tablet wants to add a new replica, but no target backend was found. + public boolean noPathForNewReplica; + + public TabletHealth() { + status = null; // don't set for balance task + priority = TabletSchedCtx.Priority.NORMAL; + aliveAndVersionCompleteNum = 0; + needFurtherRepairReplicaId = -1L; + hasAliveAndVersionIncomplete = false; + hasRecentLoadFailed = false; + noPathForNewReplica = false; + } + } + @SerializedName(value = "id") private long id; @SerializedName(value = "replicas") @@ -104,6 +134,16 @@ public enum TabletStatus { // no need to persist private long lastStatusCheckTime = -1; + // last time a data load failed on this tablet + private long lastLoadFailedTime = -1; + + // If the tablet wants to add a new replica but can't find any backend to host it, + // mark the tablet with the current time. Later repair attempts will keep failing in the scheduler. + // For example, a tablet has 3 replicas and 1 backend is dead, so the tablet's health status + // is REPLICA_MISSING. But if no other backend can hold the new replica, scheduling always fails. + // So don't increase this tablet's sched priority if it has no path for new replica. + private long lastTimeNoPathForNewReplica = -1; + public Tablet() { this(0L, new ArrayList<>()); } @@ -466,10 +506,8 @@ public long getRowCount(boolean singleReplica) { * 1. healthy replica num is equal to replicationNum * 2. 
all healthy replicas are in right tag */ - public Pair getHealthStatusWithPriority(SystemInfoService systemInfoService, + public TabletHealth getHealth(SystemInfoService systemInfoService, long visibleVersion, ReplicaAllocation replicaAlloc, List aliveBeIds) { - - Map allocMap = replicaAlloc.getAllocMap(); Map stableAllocMap = Maps.newHashMap(); Map stableVersionCompleteAllocMap = Maps.newHashMap(); @@ -480,16 +518,12 @@ public Pair getHealthStatusWithPriority(S int stable = 0; Replica needFurtherRepairReplica = null; + boolean hasAliveAndVersionIncomplete = false; Set hosts = Sets.newHashSet(); ArrayList versions = new ArrayList<>(); for (Replica replica : replicas) { Backend backend = systemInfoService.getBackend(replica.getBackendId()); - if (backend == null || !backend.isAlive() || !replica.isAlive() - || checkHost(hosts, backend) || replica.tooSlow() || !backend.isMixNode()) { - // this replica is not alive, - // or if this replica is on same host with another replica, we also treat it as 'dead', - // so that Tablet Scheduler will create a new replica on different host. - // ATTN: Replicas on same host is a bug of previous Doris version, so we fix it by this way. + if (!isReplicaAndBackendAlive(replica, backend, hosts)) { continue; } @@ -514,13 +548,30 @@ public Pair getHealthStatusWithPriority(S allocNum = stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0); stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum + 1)); + } else { + hasAliveAndVersionIncomplete = true; } } } + TabletHealth tabletHealth = new TabletHealth(); + initTabletHealth(tabletHealth); + tabletHealth.aliveAndVersionCompleteNum = aliveAndVersionComplete; + tabletHealth.hasAliveAndVersionIncomplete = hasAliveAndVersionIncomplete; + if (needFurtherRepairReplica != null) { + tabletHealth.needFurtherRepairReplicaId = needFurtherRepairReplica.getId(); + } + // 0. We can not choose a good replica as src to repair this tablet. 
if (aliveAndVersionComplete == 0) { - return Pair.of(TabletStatus.UNRECOVERABLE, Priority.VERY_HIGH); + tabletHealth.status = TabletStatus.UNRECOVERABLE; + return tabletHealth; + } else if (aliveAndVersionComplete < replicationNum && hasAliveAndVersionIncomplete) { + // not enough good replica, and there exists schedule available replicas and version incomplete, + // no matter whether they tag is proper right, fix them immediately. + tabletHealth.status = TabletStatus.VERSION_INCOMPLETE; + tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH; + return tabletHealth; } // 1. alive replicas are not enough @@ -536,24 +587,32 @@ public Pair getHealthStatusWithPriority(S // 3. aliveBackendsNum >= replicationNum: make sure after deleting, // there will be at least one backend for new replica. // 4. replicationNum > 1: if replication num is set to 1, do not delete any replica, for safety reason - return Pair.of(TabletStatus.FORCE_REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH); - } else if (alive < (replicationNum / 2) + 1) { - return Pair.of(TabletStatus.REPLICA_MISSING, TabletSchedCtx.Priority.HIGH); + tabletHealth.status = TabletStatus.FORCE_REDUNDANT; + tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH; + return tabletHealth; } else if (alive < replicationNum) { - return Pair.of(TabletStatus.REPLICA_MISSING, TabletSchedCtx.Priority.NORMAL); + tabletHealth.status = TabletStatus.REPLICA_MISSING; + tabletHealth.priority = alive < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.VERY_HIGH + : TabletSchedCtx.Priority.NORMAL; + return tabletHealth; } // 2. 
version complete replicas are not enough - if (aliveAndVersionComplete < (replicationNum / 2) + 1) { - return Pair.of(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.HIGH); - } else if (aliveAndVersionComplete < replicationNum) { - return Pair.of(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.NORMAL); + if (aliveAndVersionComplete < replicationNum) { + tabletHealth.status = TabletStatus.VERSION_INCOMPLETE; + tabletHealth.priority = aliveAndVersionComplete < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.HIGH + : TabletSchedCtx.Priority.NORMAL; + return tabletHealth; } else if (aliveAndVersionComplete > replicationNum) { if (needFurtherRepairReplica != null) { - return Pair.of(TabletStatus.NEED_FURTHER_REPAIR, TabletSchedCtx.Priority.HIGH); + tabletHealth.status = TabletStatus.NEED_FURTHER_REPAIR; + tabletHealth.priority = TabletSchedCtx.Priority.HIGH; + } else { + // we set REDUNDANT as VERY_HIGH, because deleting redundant replicas can free the space quickly. + tabletHealth.status = TabletStatus.REDUNDANT; + tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH; } - // we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly. - return Pair.of(TabletStatus.REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH); + return tabletHealth; } // 3. replica is under relocating @@ -564,14 +623,17 @@ public Pair getHealthStatusWithPriority(S if (replicaBeIds.containsAll(availableBeIds) && availableBeIds.size() >= replicationNum && replicationNum > 1) { // No BE can be choose to create a new replica - return Pair.of(TabletStatus.FORCE_REDUNDANT, - stable < (replicationNum / 2) + 1 - ? TabletSchedCtx.Priority.NORMAL : TabletSchedCtx.Priority.LOW); + tabletHealth.status = TabletStatus.FORCE_REDUNDANT; + tabletHealth.priority = stable < (replicationNum / 2) + 1 + ? 
TabletSchedCtx.Priority.NORMAL : TabletSchedCtx.Priority.LOW; + return tabletHealth; } - if (stable < (replicationNum / 2) + 1) { - return Pair.of(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.NORMAL); - } else if (stable < replicationNum) { - return Pair.of(TabletStatus.REPLICA_RELOCATING, TabletSchedCtx.Priority.LOW); + + if (stable < replicationNum) { + tabletHealth.status = TabletStatus.REPLICA_RELOCATING; + tabletHealth.priority = stable < (replicationNum / 2) + 1 ? TabletSchedCtx.Priority.NORMAL + : TabletSchedCtx.Priority.LOW; + return tabletHealth; } } @@ -579,19 +641,25 @@ public Pair getHealthStatusWithPriority(S for (Map.Entry alloc : allocMap.entrySet()) { if (stableVersionCompleteAllocMap.getOrDefault(alloc.getKey(), (short) 0) < alloc.getValue()) { if (stableAllocMap.getOrDefault(alloc.getKey(), (short) 0) >= alloc.getValue()) { - return Pair.of(TabletStatus.VERSION_INCOMPLETE, TabletSchedCtx.Priority.NORMAL); + tabletHealth.status = TabletStatus.VERSION_INCOMPLETE; } else { - return Pair.of(TabletStatus.REPLICA_MISSING_FOR_TAG, TabletSchedCtx.Priority.NORMAL); + tabletHealth.status = TabletStatus.REPLICA_MISSING_FOR_TAG; } + tabletHealth.priority = TabletSchedCtx.Priority.NORMAL; + return tabletHealth; } } if (replicas.size() > replicationNum) { if (needFurtherRepairReplica != null) { - return Pair.of(TabletStatus.NEED_FURTHER_REPAIR, TabletSchedCtx.Priority.HIGH); + tabletHealth.status = TabletStatus.NEED_FURTHER_REPAIR; + tabletHealth.priority = TabletSchedCtx.Priority.HIGH; + } else { + // we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly. + tabletHealth.status = TabletStatus.REDUNDANT; + tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH; } - // we set REDUNDANT as VERY_HIGH, because delete redundant replicas can free the space quickly. - return Pair.of(TabletStatus.REDUNDANT, TabletSchedCtx.Priority.VERY_HIGH); + return tabletHealth; } // 5. 
find a replica's version count is much more than others, and drop it @@ -603,12 +671,36 @@ public Pair getHealthStatusWithPriority(S double ratio = (double) delta / versions.get(versions.size() - 1); if (versions.get(versions.size() - 1) >= Config.min_version_count_indicate_replica_compaction_too_slow && ratio > Config.valid_version_count_delta_ratio_between_replicas) { - return Pair.of(TabletStatus.REPLICA_COMPACTION_TOO_SLOW, Priority.HIGH); + tabletHealth.status = TabletStatus.REPLICA_COMPACTION_TOO_SLOW; + tabletHealth.priority = Priority.HIGH; + return tabletHealth; } } // 6. healthy - return Pair.of(TabletStatus.HEALTHY, TabletSchedCtx.Priority.NORMAL); + tabletHealth.status = TabletStatus.HEALTHY; + tabletHealth.priority = TabletSchedCtx.Priority.NORMAL; + + return tabletHealth; + } + + private void initTabletHealth(TabletHealth tabletHealth) { + long endTime = System.currentTimeMillis() - Config.tablet_recent_load_failed_second * 1000L; + tabletHealth.hasRecentLoadFailed = lastLoadFailedTime > endTime; + tabletHealth.noPathForNewReplica = lastTimeNoPathForNewReplica > endTime; + } + + private boolean isReplicaAndBackendAlive(Replica replica, Backend backend, Set hosts) { + if (backend == null || !backend.isAlive() || !replica.isAlive() + || checkHost(hosts, backend) || replica.tooSlow() || !backend.isMixNode()) { + // this replica is not alive, + // or if this replica is on same host with another replica, we also treat it as 'dead', + // so that Tablet Scheduler will create a new replica on different host. + // ATTN: Replicas on same host is a bug of previous Doris version, so we fix it by this way. + return false; + } else { + return true; + } } private boolean checkHost(Set hosts, Backend backend) { @@ -637,8 +729,49 @@ private boolean checkHost(Set hosts, Backend backend) { * No need to check if backend is available. We consider all backends in 'backendsSet' are available, * If not, unavailable backends will be relocated by CalocateTableBalancer first. 
*/ - public TabletStatus getColocateHealthStatus(long visibleVersion, + public TabletHealth getColocateHealth(long visibleVersion, ReplicaAllocation replicaAlloc, Set backendsSet) { + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); + short replicationNum = replicaAlloc.getTotalReplicaNum(); + boolean hasAliveAndVersionIncomplete = false; + int aliveAndVersionComplete = 0; + Set hosts = Sets.newHashSet(); + for (Replica replica : replicas) { + Backend backend = systemInfoService.getBackend(replica.getBackendId()); + if (!isReplicaAndBackendAlive(replica, backend, hosts)) { + continue; + } + + boolean versionCompleted = replica.getLastFailedVersion() < 0 && replica.getVersion() >= visibleVersion; + if (versionCompleted) { + aliveAndVersionComplete++; + } + + if (replica.isScheduleAvailable()) { + if (!versionCompleted) { + hasAliveAndVersionIncomplete = true; + } + } + } + + TabletHealth tabletHealth = new TabletHealth(); + initTabletHealth(tabletHealth); + tabletHealth.aliveAndVersionCompleteNum = aliveAndVersionComplete; + tabletHealth.hasAliveAndVersionIncomplete = hasAliveAndVersionIncomplete; + tabletHealth.priority = TabletSchedCtx.Priority.NORMAL; + + // 0. We can not choose a good replica as src to repair this tablet. + if (aliveAndVersionComplete == 0) { + tabletHealth.status = TabletStatus.UNRECOVERABLE; + return tabletHealth; + } else if (aliveAndVersionComplete < replicationNum && hasAliveAndVersionIncomplete) { + // not enough good replica, and there exists schedule available replicas and version incomplete, + // no matter whether they tag is proper right, fix them immediately. + tabletHealth.status = TabletStatus.VERSION_INCOMPLETE; + tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH; + return tabletHealth; + } + // Here we don't need to care about tag. Because the replicas of the colocate table has been confirmed // in ColocateTableCheckerAndBalancer. 
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum(); @@ -647,7 +780,8 @@ public TabletStatus getColocateHealthStatus(long visibleVersion, // Because if the following check doesn't pass, the COLOCATE_MISMATCH will return. Set replicaBackendIds = getBackendIds(); if (!replicaBackendIds.containsAll(backendsSet)) { - return TabletStatus.COLOCATE_MISMATCH; + tabletHealth.status = TabletStatus.COLOCATE_MISMATCH; + return tabletHealth; } // 2. check version completeness @@ -663,27 +797,31 @@ public TabletStatus getColocateHealthStatus(long visibleVersion, if (replica.isBad()) { // If this replica is bad but located on one of backendsSet, // we have drop it first, or we can find any other BE for new replica. - return TabletStatus.COLOCATE_REDUNDANT; + tabletHealth.status = TabletStatus.COLOCATE_REDUNDANT; } else { // maybe in replica's DECOMMISSION state // Here we return VERSION_INCOMPLETE, // and the tablet scheduler will finally set it's state to NORMAL. - return TabletStatus.VERSION_INCOMPLETE; + tabletHealth.status = TabletStatus.VERSION_INCOMPLETE; } + return tabletHealth; } if (replica.getLastFailedVersion() > 0 || replica.getVersion() < visibleVersion) { // this replica is alive but version incomplete - return TabletStatus.VERSION_INCOMPLETE; + tabletHealth.status = TabletStatus.VERSION_INCOMPLETE; + return tabletHealth; } } // 3. 
check redundant if (replicas.size() > totalReplicaNum) { - return TabletStatus.COLOCATE_REDUNDANT; + tabletHealth.status = TabletStatus.COLOCATE_REDUNDANT; + return tabletHealth; } - return TabletStatus.HEALTHY; + tabletHealth.status = TabletStatus.HEALTHY; + return tabletHealth; } /** @@ -744,4 +882,16 @@ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.P public void setLastStatusCheckTime(long lastStatusCheckTime) { this.lastStatusCheckTime = lastStatusCheckTime; } + + public long getLastLoadFailedTime() { + return lastLoadFailedTime; + } + + public void setLastLoadFailedTime(long lastLoadFailedTime) { + this.lastLoadFailedTime = lastLoadFailedTime; + } + + public void setLastTimeNoPathForNewReplica(long lastTimeNoPathForNewReplica) { + this.lastTimeNoPathForNewReplica = lastTimeNoPathForNewReplica; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 292013ec05aebc..4febec9e922706 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Tablet.TabletHealth; import org.apache.doris.catalog.Tablet.TabletStatus; import org.apache.doris.clone.TabletChecker.CheckerCounter; import org.apache.doris.clone.TabletSchedCtx.Priority; @@ -334,7 +335,7 @@ public void addGroup(GroupId groupId, ReplicaAllocation replicaAlloc, List Config.max_scheduling_tablets @@ -357,6 +358,7 @@ private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Part return LoopControlStatus.CONTINUE; } boolean prioPartIsHealthy = true; + boolean isUniqKeyMergeOnWrite = tbl.isUniqKeyMergeOnWrite(); /* * 
Tablet in SHADOW index can not be repaired of balanced */ @@ -369,26 +371,25 @@ private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Part continue; } - Pair statusWithPrio = tablet.getHealthStatusWithPriority( - infoService, partition.getVisibleVersion(), + TabletHealth tabletHealth = tablet.getHealth(infoService, partition.getVisibleVersion(), tbl.getPartitionInfo().getReplicaAllocation(partition.getId()), aliveBeIds); - if (statusWithPrio.first == TabletStatus.HEALTHY) { + if (tabletHealth.status == TabletStatus.HEALTHY) { // Only set last status check time when status is healthy. tablet.setLastStatusCheckTime(startTime); continue; - } else if (statusWithPrio.first == TabletStatus.UNRECOVERABLE) { + } else if (tabletHealth.status == TabletStatus.UNRECOVERABLE) { // This tablet is not recoverable, do not set it into tablet scheduler // all UNRECOVERABLE tablet can be seen from "show proc '/statistic'" counter.unhealthyTabletNum++; continue; } else if (isInPrios) { - statusWithPrio.second = TabletSchedCtx.Priority.VERY_HIGH; + tabletHealth.priority = TabletSchedCtx.Priority.VERY_HIGH; prioPartIsHealthy = false; } counter.unhealthyTabletNum++; - if (!tablet.readyToBeRepaired(infoService, statusWithPrio.second)) { + if (!tablet.readyToBeRepaired(infoService, tabletHealth.priority)) { continue; } @@ -399,8 +400,8 @@ private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Part tbl.getPartitionInfo().getReplicaAllocation(partition.getId()), System.currentTimeMillis()); // the tablet status will be set again when being scheduled - tabletCtx.setTabletStatus(statusWithPrio.first); - tabletCtx.setPriority(statusWithPrio.second); + tabletCtx.setTabletHealth(tabletHealth); + tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite); AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index e676774afcf11b..d004d21f79c82a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Tablet.TabletHealth; import org.apache.doris.catalog.Tablet.TabletStatus; import org.apache.doris.clone.SchedException.Status; import org.apache.doris.clone.SchedException.SubCode; @@ -134,8 +135,6 @@ public enum State { private Type type; private BalanceType balanceType; - private Priority priority; - // we change the dynamic priority based on how many times it fails to be scheduled private int failedSchedCounter = 0; // clone task failed counter @@ -161,7 +160,7 @@ public enum State { private long taskTimeoutMs = 0; private State state; - private TabletStatus tabletStatus; + private TabletHealth tabletHealth; private long decommissionTime = -1; @@ -213,6 +212,8 @@ public enum State { private SubCode schedFailedCode; + private boolean isUniqKeyMergeOnWrite = false; + public TabletSchedCtx(Type type, long dbId, long tblId, long partId, long idxId, long tabletId, ReplicaAllocation replicaAlloc, long createTime) { this.type = type; @@ -227,6 +228,7 @@ public TabletSchedCtx(Type type, long dbId, long tblId, long partId, this.replicaAlloc = replicaAlloc; this.balanceType = BalanceType.BE_BALANCE; this.schedFailedCode = SubCode.NONE; + this.tabletHealth = new TabletHealth(); } public ReplicaAllocation getReplicaAlloc() { @@ -262,11 +264,19 @@ public BalanceType getBalanceType() { } public Priority getPriority() { - return priority; + return tabletHealth.priority; } public void setPriority(Priority priority) { - this.priority = priority; + this.tabletHealth.priority = 
priority; + } + + public void setTabletHealth(TabletHealth tabletHealth) { + this.tabletHealth = tabletHealth; + } + + public void setIsUniqKeyMergeOnWrite(boolean isUniqKeyMergeOnWrite) { + this.isUniqKeyMergeOnWrite = isUniqKeyMergeOnWrite; } public int getFinishedCounter() { @@ -345,11 +355,11 @@ public void setState(State state) { } public void setTabletStatus(TabletStatus tabletStatus) { - this.tabletStatus = tabletStatus; + this.tabletHealth.status = tabletStatus; } public TabletStatus getTabletStatus() { - return tabletStatus; + return tabletHealth.status; } public long getDbId() { @@ -739,7 +749,7 @@ public void chooseDestReplicaForVersionIncomplete(Map backendsWo if (replica.getLastFailedVersion() <= 0 && replica.getVersion() >= visibleVersion) { - if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR && replica.needFurtherRepair()) { + if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR && replica.needFurtherRepair()) { furtherRepairs.add(replica); } @@ -1016,10 +1026,10 @@ public CloneTask createCloneReplicaAndTask() throws SchedException { // REPLICA_MISSING/REPLICA_RELOCATING, // we create a new replica with state CLONE Replica replica = null; - if (tabletStatus == TabletStatus.REPLICA_MISSING - || tabletStatus == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE - || tabletStatus == TabletStatus.COLOCATE_MISMATCH - || tabletStatus == TabletStatus.REPLICA_MISSING_FOR_TAG) { + if (tabletHealth.status == TabletStatus.REPLICA_MISSING + || tabletHealth.status == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE + || tabletHealth.status == TabletStatus.COLOCATE_MISMATCH + || tabletHealth.status == TabletStatus.REPLICA_MISSING_FOR_TAG) { replica = new Replica( Env.getCurrentEnv().getNextId(), destBackendId, -1 /* version */, schemaHash, @@ -1054,7 +1064,7 @@ public CloneTask createCloneReplicaAndTask() throws SchedException { destOldVersion = replica.getVersion(); cloneTask.setPathHash(srcPathHash, destPathHash); LOG.info("create clone 
task to repair replica, tabletId={}, replica={}, visible version {}, tablet status {}", - tabletId, replica, visibleVersion, tabletStatus); + tabletId, replica, visibleVersion, tabletHealth.status); this.state = State.RUNNING; return cloneTask; @@ -1062,7 +1072,7 @@ public CloneTask createCloneReplicaAndTask() throws SchedException { // for storage migration or cloning a new replica public long getDestEstimatedCopingSize() { - if ((cloneTask != null && tabletStatus != TabletStatus.VERSION_INCOMPLETE) + if ((cloneTask != null && tabletHealth.status != TabletStatus.VERSION_INCOMPLETE) || storageMediaMigrationTask != null) { return Math.max(getTabletSize(), 10L); } else { @@ -1149,10 +1159,8 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) List aliveBeIds = infoService.getAllBackendIds(true); ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partitionId); - Pair pair = tablet.getHealthStatusWithPriority( - infoService, visibleVersion, replicaAlloc, - aliveBeIds); - if (pair.first == TabletStatus.HEALTHY) { + TabletStatus status = tablet.getHealth(infoService, visibleVersion, replicaAlloc, aliveBeIds).status; + if (status == TabletStatus.HEALTHY) { throw new SchedException(Status.FINISHED, "tablet is healthy"); } @@ -1266,10 +1274,13 @@ public List getBrief() { result.add(String.valueOf(tabletId)); result.add(type.name()); result.add(storageMedium == null ? FeConstants.null_string : storageMedium.name()); - result.add(tabletStatus == null ? FeConstants.null_string : tabletStatus.name()); + result.add(tabletHealth.status == null ? FeConstants.null_string : tabletHealth.status.name()); result.add(state.name()); result.add(schedFailedCode.name()); - result.add(priority.name()); + result.add(tabletHealth.priority == null ? FeConstants.null_string : tabletHealth.priority.name()); + // show the real priority value, higher this value, higher sched priority. Add 10 hour to make it + // to be a positive value. 
+ result.add(String.valueOf((System.currentTimeMillis() - getCompareValue()) / 1000 + 10 * 3600L)); result.add(srcReplica == null ? "-1" : String.valueOf(srcReplica.getBackendId())); result.add(String.valueOf(srcPathHash)); result.add(String.valueOf(destBackendId)); @@ -1299,26 +1310,44 @@ public int compareTo(TabletSchedCtx o) { return Long.compare(getCompareValue(), o.getCompareValue()); } + // smaller compare value, higher priority private long getCompareValue() { long value = createTime; if (lastVisitedTime > 0) { value = lastVisitedTime; } - value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60 * 1000L; + value += (Priority.VERY_HIGH.ordinal() - tabletHealth.priority.ordinal() + 1) * 60 * 1000L; value += 5000L * (failedSchedCounter / 10); if (schedFailedCode == SubCode.WAITING_DECOMMISSION) { value += 5 * 1000L; } + long baseTime = Config.tablet_schedule_high_priority_second * 1000L; // repair tasks always prior than balance if (type == Type.BALANCE) { - value += 5 * 3600 * 1000L; // 5 hour + value += 10 * baseTime; + } else { + int replicaNum = replicaAlloc.getTotalReplicaNum(); + if (tabletHealth.aliveAndVersionCompleteNum < replicaNum && !tabletHealth.noPathForNewReplica) { + if (tabletHealth.aliveAndVersionCompleteNum < (replicaNum / 2 + 1)) { + value -= 3 * baseTime; + if (tabletHealth.hasRecentLoadFailed) { + value -= 3 * baseTime; + } + } + if (tabletHealth.hasAliveAndVersionIncomplete) { + value -= 1 * baseTime; + if (isUniqKeyMergeOnWrite) { + value -= 1 * baseTime; + } + } + } } - if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR) { - value -= 3600 * 1000L; // 1 hour + if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR) { + value -= 1 * baseTime; } return value; @@ -1328,8 +1357,8 @@ private long getCompareValue() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("tablet id: ").append(tabletId); - if (tabletStatus != null) { - sb.append(", status: ").append(tabletStatus.name()); + if 
(tabletHealth.status != null) { + sb.append(", status: ").append(tabletHealth.status.name()); } if (state != null) { sb.append(", state: ").append(state.name()); @@ -1340,9 +1369,9 @@ public String toString() { if (type == Type.BALANCE && balanceType != null) { sb.append(", balance: ").append(balanceType.name()); } - if (priority != null) { - sb.append(", priority: ").append(priority.name()); - } + if (tabletHealth.priority != null) { + sb.append(", priority: ").append(tabletHealth.priority.name()); + } sb.append(", tablet size: ").append(tabletSize); if (srcReplica != null) { sb.append(", from backend: ").append(srcReplica.getBackendId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 10a39af8a15f6c..a3a3a93e0fabab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -34,6 +34,7 @@ import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Tablet.TabletHealth; import org.apache.doris.catalog.Tablet.TabletStatus; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair; @@ -45,7 +46,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.persist.ReplicaPersistInfo; @@ -167,6 +167,17 @@ public TabletScheduler(Env env, SystemInfoService infoService, TabletInvertedInd this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex, backendsWorkingSlots); } + // for fe ut + public synchronized void clear() { + pendingTablets.clear(); + allTabletTypes.clear(); + 
runningTablets.clear(); + schedHistory.clear(); + + lastStatUpdateTime = 0; + lastSlotAdjustTime = 0; + } + public TabletSchedulerStat getStat() { return stat; } @@ -322,7 +333,7 @@ public synchronized void changeTabletsPriorityToVeryHigh(long dbId, long tblId, * */ @Override - protected void runAfterCatalogReady() { + public void runAfterCatalogReady() { if (!updateWorkingSlots()) { return; } @@ -481,7 +492,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) tabletCtx.setLastVisitedTime(currentTime); stat.counterTabletScheduled.incrementAndGet(); - Pair statusPair; + TabletHealth tabletHealth; Database db = Env.getCurrentInternalCatalog().getDbOrException(tabletCtx.getDbId(), s -> new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "db " + tabletCtx.getDbId() + " does not exist")); @@ -530,15 +541,13 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) Preconditions.checkState(tabletOrderIdx != -1); Set backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx); - TabletStatus st = tablet.getColocateHealthStatus( - partition.getVisibleVersion(), replicaAlloc, backendsSet); - statusPair = Pair.of(st, Priority.HIGH); + tabletHealth = tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc, backendsSet); + tabletHealth.priority = Priority.HIGH; tabletCtx.setColocateGroupBackendIds(backendsSet); } else { replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); List aliveBeIds = infoService.getAllBackendIds(true); - statusPair = tablet.getHealthStatusWithPriority( - infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds); + tabletHealth = tablet.getHealth(infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds); } if (tabletCtx.getType() != allTabletTypes.get(tabletId)) { @@ -576,7 +585,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) } } - if (statusPair.first != 
TabletStatus.VERSION_INCOMPLETE + if (tabletHealth.status != TabletStatus.VERSION_INCOMPLETE && (partition.getState() != PartitionState.NORMAL || tableState != OlapTableState.NORMAL) && tableState != OlapTableState.WAITING_STABLE) { // If table is under ALTER process(before FINISHING), do not allow to add or delete replica. @@ -585,13 +594,14 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) // executing an alter job, but the alter job is in a PENDING state and is waiting for // the table to become stable. In this case, we allow the tablet repair to proceed. throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, - "table is in alter process, but tablet status is " + statusPair.first.name()); + "table is in alter process, but tablet status is " + tabletHealth.status.name()); } - tabletCtx.setTabletStatus(statusPair.first); - if (statusPair.first == TabletStatus.HEALTHY && tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) { + tabletCtx.setTabletHealth(tabletHealth); + tabletCtx.setIsUniqKeyMergeOnWrite(tbl.isUniqKeyMergeOnWrite()); + if (tabletHealth.status == TabletStatus.HEALTHY && tabletCtx.getType() == TabletSchedCtx.Type.REPAIR) { throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "tablet is healthy"); - } else if (statusPair.first != TabletStatus.HEALTHY + } else if (tabletHealth.status != TabletStatus.HEALTHY && tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) { // we select an unhealthy tablet to do balance, which is not right. // so here we stop this task. 
@@ -612,7 +622,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId())); tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium()); - handleTabletByTypeAndStatus(statusPair.first, tabletCtx, batchTask); + handleTabletByTypeAndStatus(tabletHealth.status, tabletCtx, batchTask); } finally { tbl.writeUnlock(); } @@ -1379,6 +1389,24 @@ private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throw // if forColocate is false, the tag must be set. private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, Tag tag, boolean forColocate) throws SchedException { + boolean noPathForNewReplica = false; + try { + return doChooseAvailableDestPath(tabletCtx, tag, forColocate); + } catch (SchedException e) { + if (e.getStatus() == Status.UNRECOVERABLE) { + noPathForNewReplica = true; + } + throw e; + } finally { + Tablet tablet = tabletCtx.getTablet(); + if (tablet != null) { + tablet.setLastTimeNoPathForNewReplica(noPathForNewReplica ? 
System.currentTimeMillis() : -1L); + } + } + } + + private RootPathLoadStatistic doChooseAvailableDestPath(TabletSchedCtx tabletCtx, Tag tag, boolean forColocate) + throws SchedException { List beStatistics; if (tag != null) { Preconditions.checkState(!forColocate); @@ -1552,13 +1580,11 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, } } - private void addBackToPendingTablets(TabletSchedCtx tabletCtx) { Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.PENDING); addTablet(tabletCtx, true /* force */); } - private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) { if (state == TabletSchedCtx.State.CANCELLED || state == TabletSchedCtx.State.UNEXPECTED) { if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE @@ -1595,8 +1621,6 @@ private void tryAddAfterFinished(TabletSchedCtx tabletCtx) { if (tbl == null) { return; } - Pair statusPair; - ReplicaAllocation replicaAlloc = null; tbl.readLock(); try { Partition partition = tbl.getPartition(tabletCtx.getPartitionId()); @@ -1614,67 +1638,79 @@ private void tryAddAfterFinished(TabletSchedCtx tabletCtx) { return; } - boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId()); - if (isColocateTable) { - GroupId groupId = colocateTableIndex.getGroup(tbl.getId()); - if (groupId == null) { - return; - } - ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId); - if (groupSchema == null) { - return; - } + tryAddRepairTablet(tablet, tabletCtx.getDbId(), tbl, partition, idx, finishedCounter); + } finally { + tbl.readUnlock(); + } + } - replicaAlloc = groupSchema.getReplicaAlloc(); - int tabletOrderIdx = tabletCtx.getTabletOrderIdx(); - if (tabletOrderIdx == -1) { - tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId()); - } - Preconditions.checkState(tabletOrderIdx != -1); + public void tryAddRepairTablet(Tablet tablet, long dbId, OlapTable table, Partition partition, + 
MaterializedIndex idx, int finishedCounter) { + if (Config.disable_tablet_scheduler) { + return; + } - Set backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx); - TabletStatus st = tablet.getColocateHealthStatus( - partition.getVisibleVersion(), replicaAlloc, backendsSet); - statusPair = Pair.of(st, Priority.HIGH); - tabletCtx.setColocateGroupBackendIds(backendsSet); - } else { - replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId()); - List aliveBeIds = infoService.getAllBackendIds(true); - statusPair = tablet.getHealthStatusWithPriority( - infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds); + TabletHealth tabletHealth; + ReplicaAllocation replicaAlloc; + Set colocateBackendIds = null; + boolean isColocateTable = colocateTableIndex.isColocateTable(table.getId()); + if (isColocateTable) { + GroupId groupId = colocateTableIndex.getGroup(table.getId()); + if (groupId == null) { + return; + } + ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId); + if (groupSchema == null) { + return; + } - if (statusPair.second.ordinal() < tabletCtx.getPriority().ordinal()) { - statusPair.second = tabletCtx.getPriority(); - } + replicaAlloc = groupSchema.getReplicaAlloc(); + int tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId()); + if (tabletOrderIdx == -1) { + LOG.warn("Unknow colocate tablet order idx: group {}, table {}, partition {}, index {}, tablet {}", + groupId, table.getId(), partition.getId(), idx.getId(), tablet.getId()); + return; } - if (statusPair.first == TabletStatus.NEED_FURTHER_REPAIR) { + colocateBackendIds = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx); + tabletHealth = tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc, colocateBackendIds); + tabletHealth.priority = Priority.HIGH; + } else { + replicaAlloc = table.getPartitionInfo().getReplicaAllocation(partition.getId()); + List aliveBeIds = 
infoService.getAllBackendIds(true); + tabletHealth = tablet.getHealth(infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds); + } + + if (tabletHealth.status == TabletStatus.HEALTHY || tabletHealth.status == TabletStatus.UNRECOVERABLE) { + return; + } + + // first time found this tablet is unhealthy + if (finishedCounter == 0) { + if (!tablet.readyToBeRepaired(Env.getCurrentSystemInfo(), tabletHealth.priority)) { + return; + } + } else { + if (tabletHealth.status == TabletStatus.NEED_FURTHER_REPAIR) { // replica is just waiting for finishing txns before furtherRepairWatermarkTxnTd, - // no need to add it immediately - Replica replica = tablet.getReplicaByBackendId(tabletCtx.getDestBackendId()); + // no need to re add it immediately, can wait a little + Replica replica = tablet.getReplicaById(tabletHealth.needFurtherRepairReplicaId); if (replica != null && replica.getVersion() >= partition.getVisibleVersion() && replica.getLastFailedVersion() < 0) { return; } } - } finally { - tbl.readUnlock(); } - if (statusPair.first == TabletStatus.HEALTHY) { - return; - } + TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, dbId, table.getId(), + partition.getId(), idx.getId(), tablet.getId(), replicaAlloc, System.currentTimeMillis()); - TabletSchedCtx newTabletCtx = new TabletSchedCtx( - TabletSchedCtx.Type.REPAIR, tabletCtx.getDbId(), tabletCtx.getTblId(), - tabletCtx.getPartitionId(), tabletCtx.getIndexId(), tabletCtx.getTabletId(), - replicaAlloc, System.currentTimeMillis()); - - newTabletCtx.setTabletStatus(statusPair.first); - newTabletCtx.setPriority(statusPair.second); - newTabletCtx.setFinishedCounter(finishedCounter); + tabletCtx.setTabletHealth(tabletHealth); + tabletCtx.setFinishedCounter(finishedCounter); + tabletCtx.setColocateGroupBackendIds(colocateBackendIds); + tabletCtx.setIsUniqKeyMergeOnWrite(table.isUniqKeyMergeOnWrite()); - addTablet(newTabletCtx, false); + addTablet(tabletCtx, false); } private void 
releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, boolean resetReplicaState) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java index e28c74c327eff9..8aed3600741054 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java @@ -29,10 +29,8 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Tablet; -import org.apache.doris.clone.TabletSchedCtx; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.Pair; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskQueue; @@ -181,6 +179,7 @@ static class DBTabletStatistic { olapTable.readLock(); try { for (Partition partition : olapTable.getAllPartitions()) { + long visibleVersion = partition.getVisibleVersion(); ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo() .getReplicaAllocation(partition.getId()); for (MaterializedIndex materializedIndex : partition.getMaterializedIndices( @@ -196,13 +195,10 @@ static class DBTabletStatistic { replicaAlloc = groupSchema.getReplicaAlloc(); } Set backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i); - res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc, - backendsSet); + res = tablet.getColocateHealth(visibleVersion, replicaAlloc, backendsSet).status; } else { - Pair pair - = tablet.getHealthStatusWithPriority(infoService, - partition.getVisibleVersion(), replicaAlloc, aliveBeIds); - res = pair.first; + res = tablet.getHealth(infoService, visibleVersion, replicaAlloc, + aliveBeIds).status; } switch (res) { // CHECKSTYLE IGNORE THIS LINE: missing switch default case 
HEALTHY: diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java index 67e24870f9fc1b..060516eb7ba88b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java @@ -35,7 +35,8 @@ */ public class TabletSchedulerDetailProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("TabletId") - .add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe") + .add("Type").add("Medium").add("Status").add("State").add("SchedCode") + .add("Priority").add("RealPriorityVal").add("SrcBe") .add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit") .add("Finished").add("ReplicaSize").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer") .add("CmtVer").add("ErrMsg") diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 635a8bb675f249..de56b4ce9b6193 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -38,7 +38,6 @@ import org.apache.doris.catalog.Tablet.TabletStatus; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.clone.TabletSchedCtx; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; @@ -1252,8 +1251,7 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI if (groupSchema != null) { replicaAlloc = groupSchema.getReplicaAlloc(); } - TabletStatus status = - 
tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, backendsSet); + TabletStatus status = tablet.getColocateHealth(visibleVersion, replicaAlloc, backendsSet).status; if (status == TabletStatus.HEALTHY) { return false; } @@ -1265,8 +1263,7 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI SystemInfoService infoService = Env.getCurrentSystemInfo(); List aliveBeIds = infoService.getAllBackendIds(true); - Pair status = tablet.getHealthStatusWithPriority(infoService, - visibleVersion, replicaAlloc, aliveBeIds); + TabletStatus status = tablet.getHealth(infoService, visibleVersion, replicaAlloc, aliveBeIds).status; // FORCE_REDUNDANT is a specific missing case. // So it can add replica when it's in FORCE_REDUNDANT. @@ -1275,16 +1272,16 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI // it's safe to add this replica. // Because if the tablet scheduler want to delete a replica, it will choose the sched // unavailable replica and avoid the repeating loop as above. - boolean canAddForceRedundant = status.first == TabletStatus.FORCE_REDUNDANT + boolean canAddForceRedundant = status == TabletStatus.FORCE_REDUNDANT && infoService.checkBackendScheduleAvailable(backendId) && tablet.getReplicas().stream().anyMatch( r -> !infoService.checkBackendScheduleAvailable(r.getBackendId())); if (isColocateBackend || canAddForceRedundant - || status.first == TabletStatus.VERSION_INCOMPLETE - || status.first == TabletStatus.REPLICA_MISSING - || status.first == TabletStatus.UNRECOVERABLE) { + || status == TabletStatus.VERSION_INCOMPLETE + || status == TabletStatus.REPLICA_MISSING + || status == TabletStatus.UNRECOVERABLE) { long lastFailedVersion = -1L; // For some partition created by old version's Doris @@ -1360,7 +1357,7 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI LOG.info("add replica[{}-{}] to catalog. 
backend[{}], tablet status {}, tablet size {}, " + "is colocate backend {}", - tabletId, replicaId, backendId, status.first.name(), tablet.getReplicas().size(), + tabletId, replicaId, backendId, status.name(), tablet.getReplicas().size(), isColocateBackend); return true; } else { @@ -1374,7 +1371,7 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI } LOG.warn("no add replica [{}-{}] cause it is enough[{}-{}], tablet status {}", tabletId, replicaId, tablet.getReplicas().size(), replicaAlloc.toCreateStmt(), - status.first.name()); + status.name()); return false; } } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 04065db8d3260e..0dcbfee9c4b20c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -177,7 +177,7 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys database.getId(), olapTableSink.getDstTable(), analyzer)); dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor()); List locationParams = olapTableSink - .createLocation(olapTableSink.getDstTable()); + .createLocation(database.getId(), olapTableSink.getDstTable()); dataStreamSink.setTabletSinkLocationParam(locationParams.get(0)); dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index e3195eec13549d..4196748cb8e8b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -213,7 +213,7 @@ public void 
complete(Analyzer analyzer) throws UserException { tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup()); tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer)); tSink.setPartition(createPartition(tSink.getDbId(), dstTable, analyzer)); - List locationParams = createLocation(dstTable); + List locationParams = createLocation(tSink.getDbId(), dstTable); tSink.setLocation(locationParams.get(0)); if (singleReplicaLoad) { tSink.setSlaveLocation(locationParams.get(1)); @@ -604,7 +604,7 @@ public List createDummyLocation(OlapTable table) throws return Arrays.asList(locationParam, slaveLocationParam); } - public List createLocation(OlapTable table) throws UserException { + public List createLocation(long dbId, OlapTable table) throws UserException { if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) { return createDummyLocation(table); } @@ -622,6 +622,13 @@ public List createLocation(OlapTable table) throws User for (Tablet tablet : index.getTablets()) { Multimap bePathsMap = tablet.getNormalReplicaBackendPathMap(); if (bePathsMap.keySet().size() < loadRequiredReplicaNum) { + long now = System.currentTimeMillis(); + long lastLoadFailedTime = tablet.getLastLoadFailedTime(); + tablet.setLastLoadFailedTime(now); + if (now - lastLoadFailedTime >= 5000L) { + Env.getCurrentEnv().getTabletScheduler().tryAddRepairTablet( + tablet, dbId, table, partition, index, 0); + } throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "tablet " + tablet.getId() + " alive replica num " + bePathsMap.keySet().size() + " < load required replica num " + loadRequiredReplicaNum diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index db34dc2be3c84d..cc4ba2110846de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -621,6 +621,14 @@ private void checkCommitStatus(List tableList, TransactionState transacti int successReplicaNum = tabletSuccReplicas.size(); if (successReplicaNum < loadRequiredReplicaNum) { + long now = System.currentTimeMillis(); + long lastLoadFailedTime = tablet.getLastLoadFailedTime(); + tablet.setLastLoadFailedTime(now); + if (now - lastLoadFailedTime >= 5000L) { + Env.getCurrentEnv().getTabletScheduler().tryAddRepairTablet( + tablet, db.getId(), table, partition, index, 0); + } + String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, tabletVersionFailedReplicas); @@ -1271,6 +1279,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, partitionId, partition.getCommittedVersion(), writeDetail)); } + tablet.setLastLoadFailedTime(-1L); continue; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java index d7fdb2694a8282..3d54a772853f46 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java @@ -20,6 +20,8 @@ import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Sets; @@ -42,6 +44,7 @@ public class TabletTest { private Replica replica3; private TabletInvertedIndex invertedIndex; + private SystemInfoService infoService; @Mocked private Env env; @@ -49,6 +52,12 @@ public class TabletTest { @Before public void makeTablet() { invertedIndex = new TabletInvertedIndex(); + infoService = new 
SystemInfoService(); + for (long beId = 1L; beId <= 4L; beId++) { + Backend be = new Backend(beId, "127.0.0." + beId, 8030); + be.setAlive(true); + infoService.addBackend(be); + } new Expectations(env) { { Env.getCurrentEnvJournalVersion(); @@ -59,6 +68,10 @@ public void makeTablet() { minTimes = 0; result = invertedIndex; + Env.getCurrentSystemInfo(); + minTimes = 0; + result = infoService; + Env.isCheckpointThread(); minTimes = 0; result = false; @@ -170,8 +183,8 @@ private final void testTabletColocateHealthStatus0(Tablet.TabletStatus exceptedT tablet.addReplica(new Replica(replicaId++, pair.first, versionAndSuccessVersion, 0, 200000L, 0, 3000L, ReplicaState.NORMAL, lastFailVersion, versionAndSuccessVersion)); } - Assert.assertEquals(tablet.getColocateHealthStatus(100L, new ReplicaAllocation((short) 3), - Sets.newHashSet(1L, 2L, 3L)), exceptedTabletStatus); + Assert.assertEquals(tablet.getColocateHealth(100L, new ReplicaAllocation((short) 3), + Sets.newHashSet(1L, 2L, 3L)).status, exceptedTabletStatus); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java new file mode 100644 index 00000000000000..b22925e5d89270 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletHealthTest.java @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.catalog.ColocateTableIndex; +import org.apache.doris.catalog.ColocateTableIndex.GroupId; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex.IndexExtState; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Tablet.TabletHealth; +import org.apache.doris.catalog.Tablet.TabletStatus; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class TabletHealthTest extends TestWithFeService { + + private Database db; + + @Override + protected void beforeCreatingConnectContext() throws Exception { + Config.enable_debug_points = true; + Config.disable_balance = true; + Config.disable_colocate_balance_between_groups = true; + 
Config.drop_backend_after_decommission = false; + Config.colocate_group_relocate_delay_second = -1000; // be dead will imm relocate + Config.tablet_schedule_interval_ms = 7200_000L; //disable schedule + Config.tablet_checker_interval_ms = 7200_000L; //disable checker + } + + @Override + protected int backendNum() { + return 3; + } + + @Override + protected void runBeforeAll() throws Exception { + Thread.sleep(1000); + createDatabase("test"); + useDatabase("test"); + db = Env.getCurrentInternalCatalog().getDbOrMetaException("test"); + } + + @Override + protected void runBeforeEach() throws Exception { + for (Table table : db.getTables()) { + dropTable(table.getName(), true); + } + for (Backend be : Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG)) { + be.setDecommissioned(false); + } + Env.getCurrentEnv().getTabletScheduler().clear(); + DebugPointUtil.clearDebugPoints(); + Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG))); + } + + private void shutdownBackends(List backendIds) throws Exception { + if (backendIds.isEmpty()) { + return; + } + Map params = Maps.newHashMap(); + params.put("deadBeIds", Joiner.on(",").join(backendIds)); + DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler", params); + List backends = backendIds.stream().map(beId -> Env.getCurrentSystemInfo().getBackend(beId)) + .collect(Collectors.toList()); + Assertions.assertTrue(checkBELostHeartbeat(backends)); + } + + private void doRepair() throws Exception { + RebalancerTestUtil.updateReplicaPathHash(); + for (int i = 0; i < 10; i++) { + Env.getCurrentEnv().getTabletChecker().runAfterCatalogReady(); + ColocateTableCheckerAndBalancer.getInstance().runAfterCatalogReady(); + if (Env.getCurrentEnv().getTabletScheduler().getPendingNum() == 0) { + break; + } + + Env.getCurrentEnv().getTabletScheduler().runAfterCatalogReady(); + Thread.sleep(500); + } + } + + private void checkTabletStatus(Tablet 
tablet, TabletStatus status, + OlapTable table, Partition partition) throws Exception { + SystemInfoService infoService = Env.getCurrentSystemInfo(); + ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); + TabletHealth health; + if (colocateTableIndex.isColocateTable(table.getId())) { + GroupId groupId = colocateTableIndex.getGroup(table.getId()); + ReplicaAllocation replicaAlloc = colocateTableIndex.getGroupSchema(groupId).getReplicaAlloc(); + Set colocateBackends = colocateTableIndex.getTabletBackendsByGroup(groupId, 0); + health = tablet.getColocateHealth(partition.getVisibleVersion(), replicaAlloc, colocateBackends); + } else { + ReplicaAllocation replicaAlloc = table.getPartitionInfo().getReplicaAllocation(partition.getId()); + health = tablet.getHealth(infoService, partition.getVisibleVersion(), + replicaAlloc, infoService.getAllBackendIds(true)); + } + Assertions.assertEquals(status, health.status); + } + + private void checkTabletIsHealth(Tablet tablet, OlapTable table, Partition partition) throws Exception { + checkTabletStatus(tablet, TabletStatus.HEALTHY, table, partition); + ReplicaAllocation replicaAlloc = table.getPartitionInfo().getReplicaAllocation(partition.getId()); + Assertions.assertEquals((int) replicaAlloc.getTotalReplicaNum(), tablet.getReplicas().size()); + for (Replica replica : tablet.getReplicas()) { + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + Assertions.assertTrue(replica.isScheduleAvailable()); + Assertions.assertTrue(replica.isAlive()); + } + ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); + if (colocateTableIndex.isColocateTable(table.getId())) { + GroupId groupId = colocateTableIndex.getGroup(table.getId()); + Assertions.assertFalse(colocateTableIndex.isGroupUnstable(groupId)); + } + } + + @Test + public void testTabletHealth() throws Exception { + createTable("CREATE TABLE tbl1 (k INT) 
DISTRIBUTED BY HASH(k) BUCKETS 1" + + " PROPERTIES ('replication_num' = '3')"); + + OlapTable table = (OlapTable) db.getTableOrMetaException("tbl1"); + Partition partition = table.getPartitions().iterator().next(); + Tablet tablet = partition.getMaterializedIndices(IndexExtState.ALL).iterator().next() + .getTablets().iterator().next(); + + partition.updateVisibleVersion(10L); + tablet.getReplicas().forEach(replica -> replica.updateVersion(10L)); + + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + // 1 replica miss version + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + tablet.deleteReplicaByBackendId(tablet.getReplicas().get(2).getBackendId()); + Assertions.assertEquals(2, tablet.getReplicas().size()); + // 1 replica miss version, 1 replica lost + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + tablet.getReplicas().get(1).setBad(true); + // 1 replica miss version, 1 replica bad + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.deleteReplicaByBackendId(tablet.getReplicas().get(2).getBackendId()); + Assertions.assertEquals(2, tablet.getReplicas().size()); + // 1 replica lost + checkTabletStatus(tablet, TabletStatus.REPLICA_MISSING, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(2).setBad(true); + // 1 replica bad + checkTabletStatus(tablet, TabletStatus.FORCE_REDUNDANT, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + Assertions.assertEquals(8L, tablet.getReplicas().get(0).getVersion()); + // 1 replica miss version + 
checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + + shutdownBackends(Lists.newArrayList(tablet.getReplicas().get(2).getBackendId())); + // 1 replica miss version, 1 replica dead + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + doRepair(); + Assertions.assertEquals(10L, tablet.getReplicas().get(0).getVersion()); + // 1 replica dead + checkTabletStatus(tablet, TabletStatus.REPLICA_MISSING, table, partition); + + // be alive again + DebugPointUtil.clearDebugPoints(); + Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG))); + + alterTableSync("ALTER TABLE tbl1 MODIFY PARTITION(*) SET ('replication_num' = '2')"); + ReplicaAllocation replicaAlloc = table.getPartitionInfo().getReplicaAllocation(partition.getId()); + Assertions.assertEquals(2, (int) replicaAlloc.getTotalReplicaNum()); + + checkTabletStatus(tablet, TabletStatus.REDUNDANT, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(1).setBad(true); + // 1 replica bad + checkTabletStatus(tablet, TabletStatus.REPLICA_MISSING, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + Backend decommissionBe = Env.getCurrentSystemInfo().getBackend(tablet.getReplicas().get(0).getBackendId()); + decommissionBe.setDecommissioned(true); + // 1 replica's backend is decommission + checkTabletStatus(tablet, TabletStatus.REPLICA_RELOCATING, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + decommissionBe.setDecommissioned(false); + + shutdownBackends(Lists.newArrayList(tablet.getReplicas().get(1).getBackendId())); + // 1 replica dead + checkTabletStatus(tablet, TabletStatus.FORCE_REDUNDANT, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + shutdownBackends(Lists.newArrayList(tablet.getBackendIds())); + doRepair(); + // all replica dead + 
checkTabletStatus(tablet, TabletStatus.UNRECOVERABLE, table, partition); + Assertions.assertEquals(0, Env.getCurrentEnv().getTabletScheduler().getPendingNum()); + + dropTable(table.getName(), true); + } + + @Test + public void testColocateTabletHealth() throws Exception { + createTable("CREATE TABLE tbl2 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 1" + + " PROPERTIES ('replication_num' = '3', 'colocate_with' = 'foo')"); + + OlapTable table = (OlapTable) db.getTableOrMetaException("tbl2"); + Partition partition = table.getPartitions().iterator().next(); + Tablet tablet = partition.getMaterializedIndices(IndexExtState.ALL).iterator().next() + .getTablets().iterator().next(); + + Assertions.assertTrue(Env.getCurrentColocateIndex().isColocateTable(table.getId())); + + partition.updateVisibleVersion(10L); + tablet.getReplicas().forEach(replica -> replica.updateVersion(10L)); + + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + // 1 replica miss version + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + + tablet.deleteReplicaByBackendId(tablet.getReplicas().get(2).getBackendId()); + Assertions.assertEquals(2, tablet.getReplicas().size()); + // 1 replica miss version, 1 replica lost + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.deleteReplicaByBackendId(tablet.getReplicas().get(2).getBackendId()); + Assertions.assertEquals(2, tablet.getReplicas().size()); + // 1 replica lost + checkTabletStatus(tablet, TabletStatus.COLOCATE_MISMATCH, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(0).adminUpdateVersionInfo(8L, null, null, 0L); + Assertions.assertEquals(8L, tablet.getReplicas().get(0).getVersion()); + // 1 replica miss version + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); 
+ tablet.getReplicas().get(2).setBad(true); + // 1 replica miss version, 1 replica bad + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(2).setBad(true); + // 1 replica bad + checkTabletStatus(tablet, TabletStatus.COLOCATE_REDUNDANT, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + Assertions.assertNotNull(getSqlStmtExecutor("ALTER COLOCATE GROUP foo SET ('replication_num' = '2')")); + ReplicaAllocation replicaAlloc = table.getPartitionInfo().getReplicaAllocation(partition.getId()); + Assertions.assertEquals(2, (int) replicaAlloc.getTotalReplicaNum()); + + checkTabletStatus(tablet, TabletStatus.COLOCATE_REDUNDANT, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + tablet.getReplicas().get(1).setBad(true); + // 1 replica bad, first delete it, then re-add it + checkTabletStatus(tablet, TabletStatus.COLOCATE_REDUNDANT, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + long deleteBeId = tablet.getReplicas().get(1).getBackendId(); + shutdownBackends(Lists.newArrayList(deleteBeId)); + ColocateTableCheckerAndBalancer.getInstance().runAfterCatalogReady(); // colocate relocate + // 1 replica dead + checkTabletStatus(tablet, TabletStatus.COLOCATE_MISMATCH, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + + // be alive again + DebugPointUtil.clearDebugPoints(); + Assertions.assertTrue(checkBEHeartbeat(Env.getCurrentSystemInfo().getBackendsByTag(Tag.DEFAULT_BACKEND_TAG))); + + // temporary delete replica 1 + tablet.deleteReplica(tablet.getReplicas().get(1)); + Assertions.assertFalse(tablet.getBackendIds().contains(deleteBeId)); + Replica replica = new Replica(1234567890L, deleteBeId, Replica.ReplicaState.NORMAL, 8L, 0); + // add a `error` replica on other backend + tablet.addReplica(replica); + // colocate don't 
relocate because no be dead + ColocateTableCheckerAndBalancer.getInstance().runAfterCatalogReady(); + // first repair the replica on deleteBeId, then add a new replica on the located backend, + // then drop the replica on deleteBeId + checkTabletStatus(tablet, TabletStatus.VERSION_INCOMPLETE, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + Assertions.assertFalse(tablet.getBackendIds().contains(deleteBeId)); + + Backend decommissionBe = Env.getCurrentSystemInfo().getBackend(tablet.getReplicas().get(0).getBackendId()); + decommissionBe.setDecommissioned(true); + // 1 replica's backend is decommission + ColocateTableCheckerAndBalancer.getInstance().runAfterCatalogReady(); + checkTabletStatus(tablet, TabletStatus.COLOCATE_MISMATCH, table, partition); + doRepair(); + checkTabletIsHealth(tablet, table, partition); + decommissionBe.setDecommissioned(false); + + shutdownBackends(Lists.newArrayList(tablet.getBackendIds())); + doRepair(); + // all replica dead + checkTabletStatus(tablet, TabletStatus.UNRECOVERABLE, table, partition); + Assertions.assertEquals(0, Env.getCurrentEnv().getTabletScheduler().getPendingNum()); + + dropTable(table.getName(), true); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index 8916cc44e9187a..a8d72eada362cb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -98,7 +98,7 @@ public void testDecommissionBackend() throws Exception { } Assertions.assertNotNull(srcBackend); - String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\""; + String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) 
parseAndAnalyzeStmt(decommissionStmtStr); Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); @@ -213,7 +213,7 @@ public void testDecommissionBackendWithDropTable() throws Exception { dropTable("db3.tbl1", false); // 6. execute decommission - String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\""; + String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); @@ -313,8 +313,7 @@ public void testDecommissionBackendWithMTMV() throws Exception { // 4. query tablet num int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size(); - String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" - + srcBackend.getHeartbeatPort() + "\""; + String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 063ab21d8bcf62..510b2ba00eb159 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -412,7 +412,7 @@ protected int startFEServerWithoutRetry(String runningDir) protected void createDorisCluster() throws InterruptedException, NotInitException, IOException, DdlException, EnvVarNotSetException, FeStartException { - createDorisCluster(runningDir, backendNum()); + createDorisClusterWithMultiTag(runningDir, backendNum()); 
} protected void createDorisCluster(String runningDir, int backendNum) @@ -425,26 +425,13 @@ protected void createDorisCluster(String runningDir, int backendNum) bes.add(createBackend("127.0.0.1", feRpcPort)); } System.out.println("after create backend"); - checkBEHeartbeat(bes); + if (!checkBEHeartbeat(bes)) { + System.out.println("Some backends dead, all backends: " + bes); + } // Thread.sleep(2000); System.out.println("after create backend2"); } - private void checkBEHeartbeat(List bes) throws InterruptedException { - int maxTry = Config.heartbeat_interval_second + 2; - boolean allAlive = false; - while (maxTry-- > 0 && !allAlive) { - Thread.sleep(1000); - boolean hasDead = false; - for (Backend be : bes) { - if (!be.isAlive()) { - hasDead = true; - } - } - allAlive = !hasDead; - } - } - // Create multi backends with different host for unit test. // the host of BE will be "127.0.0.1", "127.0.0.2" protected void createDorisClusterWithMultiTag(String runningDir, int backendNum) @@ -452,14 +439,45 @@ protected void createDorisClusterWithMultiTag(String runningDir, int backendNum) InterruptedException { // set runningUnitTest to true, so that for ut, the agent task will be send to "127.0.0.1" // to make cluster running well. - FeConstants.runningUnitTest = true; + if (backendNum > 1) { + FeConstants.runningUnitTest = true; + } int feRpcPort = startFEServer(runningDir); List bes = Lists.newArrayList(); + System.out.println("start create backend, backend num " + backendNum); for (int i = 0; i < backendNum; i++) { String host = "127.0.0." 
+ (i + 1); bes.add(createBackend(host, feRpcPort)); } - checkBEHeartbeat(bes); + System.out.println("after create backend"); + if (!checkBEHeartbeat(bes)) { + System.out.println("Some backends dead, all backends: " + bes); + } + System.out.println("after create backend2"); + } + + protected boolean checkBEHeartbeat(List bes) { + return checkBEHeartbeatStatus(bes, true); + } + + protected boolean checkBELostHeartbeat(List bes) { + return checkBEHeartbeatStatus(bes, false); + } + + private boolean checkBEHeartbeatStatus(List bes, boolean isAlive) { + int maxTry = Config.heartbeat_interval_second + 2; + while (maxTry-- > 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // no exception + } + if (bes.stream().allMatch(be -> be.isAlive() == isAlive)) { + return true; + } + } + + return false; } protected Backend addNewBackend() throws IOException, InterruptedException {