diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 9cb883540127b4..236bfde88ad68f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -219,6 +219,24 @@ public class Config extends ConfigBase {
             "The log roll size of BDBJE. When the number of log entries exceeds this value, the log will be rolled"})
     public static int edit_log_roll_num = 50000;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "批量 BDBJE 日志包含的最大条目数", "The max number of log entries in a BDBJE batch"})
+    public static int batch_edit_log_max_item_num = 100;
+
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "批量 BDBJE 日志包含的最大长度", "The max byte size of a BDBJE batch"})
+    public static long batch_edit_log_max_byte_size = 640 * 1024L;
+
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "连续写多批 BDBJE 日志后的停顿时间", "The sleep time after continuously writing multiple BDBJE batches"})
+    public static long batch_edit_log_rest_time_ms = 10;
+
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "连续写多批 BDBJE 日志后需要短暂停顿。这里是最大的连写次数。",
+            "After continuously writing multiple BDBJE batches, a short rest is needed. "
+                    + "Indicates the number of batch writes before a rest"})
+    public static long batch_edit_log_continuous_count_for_rest = 1000;
+
     @ConfField(description = {"元数据同步的容忍延迟时间,单位为秒。如果元数据的延迟超过这个值,非主 FE 会停止提供服务",
             "The toleration delay time of meta data synchronization, in seconds. "
                     + "If the delay of meta data exceeds this value, non-master FE will stop offering service"})
@@ -3033,8 +3051,14 @@ public static int metaServiceRpcRetryTimes() {
     @ConfField(mutable = true, description = {"存算分离模式下是否开启大事务提交,默认false"})
     public static boolean enable_cloud_txn_lazy_commit = false;
 
-    @ConfField(mutable = true, description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认true"})
-    public static boolean enable_immediate_be_assign = true;
+    @ConfField(mutable = true, masterOnly = true,
+            description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认false"})
+    public static boolean enable_immediate_be_assign = false;
+
+    @ConfField(mutable = true, masterOnly = false,
+            description = { "存算分离模式下,一个BE挂掉多长时间后,它的tablet彻底转移到其他BE上" })
+    public static int rehash_tablet_after_be_dead_seconds = 3600;
+
     @ConfField(mutable = true, description = {"存算分离模式下是否启用自动启停功能,默认true",
             "Whether to enable the automatic start-stop feature in cloud model, default is true."})
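
The new `rehash_tablet_after_be_dead_seconds` knob defines a grace window: a BE that is currently dead but heartbeated more recently than the cutoff still takes part in tablet hashing, so a brief BE outage does not trigger a mass tablet migration. A minimal standalone sketch of the window check used throughout this patch (illustrative names, not the FE implementation):

    // Sketch of the dead-BE grace window; mirrors
    // "System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L".
    final class RehashWindowSketch {
        static long rehashAfterDeadSeconds = 3600; // Config.rehash_tablet_after_be_dead_seconds

        // A BE stays in the hash ring if it is alive, or died more recently than the cutoff.
        static boolean keepInHashRing(boolean alive, long lastHeartbeatMs) {
            long cutoffMs = System.currentTimeMillis() - rehashAfterDeadSeconds * 1000L;
            return alive || lastHeartbeatMs > cutoffMs;
        }

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            System.out.println(keepInHashRing(false, now - 60_000L));    // true: dead for 1 minute
            System.out.println(keepInHashRing(false, now - 7_200_000L)); // false: dead for 2 hours
        }
    }

Because the batch knobs and the rehash window are `mutable`, they can be changed at runtime; the regression test at the end of this patch does exactly that with `ADMIN SET ALL FRONTENDS CONFIG ('rehash_tablet_after_be_dead_seconds' = '1')`.
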
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index a3653fefd246aa..ff786236cbdfc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.cloud.catalog;
 
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
@@ -31,6 +32,7 @@
 import org.apache.doris.system.Backend;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
 import com.google.gson.annotations.SerializedName;
@@ -44,12 +46,14 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 public class CloudReplica extends Replica {
     private static final Logger LOG = LogManager.getLogger(CloudReplica.class);
 
     // In the future, a replica may be mapped to multiple BEs in a cluster,
     // so this value is a BE list
+    @SerializedName(value = "bes")
     private Map<String, List<Long>> primaryClusterToBackends = new ConcurrentHashMap<String, List<Long>>();
     @SerializedName(value = "dbId")
     private long dbId = -1;
@@ -93,51 +97,115 @@ private boolean isColocated() {
         return Env.getCurrentColocateIndex().isColocateTable(tableId);
     }
 
-    private long getColocatedBeId(String cluster) throws ComputeGroupException {
-        List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getBackendsByClusterId(cluster);
-        String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterNameByClusterId(cluster);
+    public long getColocatedBeId(String clusterId) throws ComputeGroupException {
+        CloudSystemInfoService infoService = ((CloudSystemInfoService) Env.getCurrentSystemInfo());
+        List<Backend> bes = infoService.getBackendsByClusterId(clusterId).stream()
+                .filter(be -> !be.isQueryDisabled()).collect(Collectors.toList());
+        String clusterName = infoService.getClusterNameByClusterId(clusterId);
         if (bes.isEmpty()) {
+            LOG.warn("failed to get available be, cluster: {}-{}", clusterName, clusterId);
             throw new ComputeGroupException(
                     String.format("There are no Backend nodes in the current compute group %s", clusterName),
                     ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_NO_BE);
         }
 
         List<Backend> availableBes = new ArrayList<>();
+        List<Backend> decommissionAvailBes = new ArrayList<>();
         for (Backend be : bes) {
             if (be.isAlive()) {
-                availableBes.add(be);
+                if (be.isDecommissioned()) {
+                    decommissionAvailBes.add(be);
+                } else {
+                    availableBes.add(be);
+                }
             }
         }
-        if (availableBes.isEmpty()) {
-            LOG.warn("failed to get available be, clusterId: {}", cluster);
+        if (availableBes.isEmpty()) {
+            availableBes = decommissionAvailBes;
+        }
+        if (availableBes.isEmpty()) {
+            LOG.warn("failed to get available backend due to all backend dead, cluster: {}-{}", clusterName, clusterId);
             throw new ComputeGroupException(
                     String.format("All the Backend nodes in the current compute group %s are in an abnormal state",
-                            clusterName),
+                        clusterName),
                     ComputeGroupException.FailedTypeEnum.COMPUTE_GROUPS_NO_ALIVE_BE);
         }
 
+        GroupId groupId = Env.getCurrentColocateIndex().getGroup(tableId);
+        HashCode hashCode = Hashing.murmur3_128().hashLong(groupId.grpId);
+
+        if (availableBes.size() != bes.size()) {
+            // some be is dead recently, still hash tablets on all backends.
+            long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
+            if (bes.stream().anyMatch(be -> !be.isAlive() && be.getLastUpdateMs() > needRehashDeadTime)) {
+                List<Backend> beAliveOrDeadShort = bes.stream()
+                        .filter(be -> be.isAlive() || be.getLastUpdateMs() > needRehashDeadTime)
+                        .collect(Collectors.toList());
+                long index = getIndexByBeNum(hashCode.asLong() + idx, beAliveOrDeadShort.size());
+                Backend be = beAliveOrDeadShort.get((int) index);
+                if (be.isAlive() && !be.isDecommissioned()) {
+                    return be.getId();
+                }
+            }
+        }
+
         // Tablets with the same idx will be hashed to the same BE, which
         // meets the requirements of colocated table.
-        long index = idx % availableBes.size();
+        long index = getIndexByBeNum(hashCode.asLong() + idx, availableBes.size());
         long pickedBeId = availableBes.get((int) index).getId();
 
         return pickedBeId;
     }
 
+    @Override
+    public long getBackendId() throws ComputeGroupException {
+        return getBackendIdImpl(getCurrentClusterId());
+    }
+
     public long getBackendId(String beEndpoint) {
-        String cluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterIdByBeAddr(beEndpoint);
+        String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getClusterNameByBeAddr(beEndpoint);
         try {
-            return getBackendIdImpl(cluster);
+            String clusterId = getCloudClusterIdByName(clusterName);
+            return getBackendIdImpl(clusterId);
         } catch (ComputeGroupException e) {
-            LOG.warn("failed to get compute group name {}", cluster, e);
+            LOG.warn("failed to get compute group name {}", clusterName, e);
             return -1;
         }
     }
 
-    @Override
-    public long getBackendId() throws ComputeGroupException {
-        String cluster = null;
+    public long getPrimaryBackendId() {
+        String clusterId;
+        try {
+            clusterId = getCurrentClusterId();
+        } catch (ComputeGroupException e) {
+            return -1L;
+        }
+
+        if (Strings.isNullOrEmpty(clusterId)) {
+            return -1L;
+        }
+
+        return getClusterPrimaryBackendId(clusterId);
+    }
+
+    public long getClusterPrimaryBackendId(String clusterId) {
+        if (isColocated()) {
+            try {
+                return getColocatedBeId(clusterId);
+            } catch (ComputeGroupException e) {
+                return -1L;
+            }
+        }
+
+        List<Long> backendIds = primaryClusterToBackends.get(clusterId);
+        if (backendIds != null && !backendIds.isEmpty()) {
+            return backendIds.get(0);
+        }
+
+        return -1L;
+    }
+
+    private String getCurrentClusterId() throws ComputeGroupException {
         // Not in a connect session
+        String cluster = null;
         ConnectContext context = ConnectContext.get();
         if (context != null) {
             if (!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
@@ -147,7 +215,7 @@ public long getBackendId() throws ComputeGroupException {
                 } catch (Exception e) {
                     LOG.warn("get compute group by session context exception");
                     throw new ComputeGroupException(String.format("default compute group %s check auth failed",
-                            cluster),
+                        cluster),
                             ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
                 }
                 if (LOG.isDebugEnabled()) {
@@ -176,10 +244,11 @@ public long getBackendId() throws ComputeGroupException {
             throw new ComputeGroupException("connect context not set",
                     ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
         }
-        return getBackendIdImpl(cluster);
+
+        return getCloudClusterIdByName(cluster);
     }
 
-    private long getBackendIdImpl(String cluster) throws ComputeGroupException {
+    private String getCloudClusterIdByName(String cluster) throws ComputeGroupException {
         // if cluster is SUSPENDED, wait
         String wakeUPCluster = "";
         try {
@@ -209,7 +278,13 @@ private long getBackendIdImpl(String cluster) throws ComputeGroupException {
                     ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_NOT_EXIST);
         }
 
-        String clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster);
+        return ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster);
+    }
+
+    private long getBackendIdImpl(String clusterId) throws ComputeGroupException {
+        if (Strings.isNullOrEmpty(clusterId)) {
+            return -1L;
+        }
 
         if (isColocated()) {
             return getColocatedBeId(clusterId);
@@ -254,16 +329,16 @@ private long getBackendIdImpl(String cluster) throws ComputeGroupException {
         }
 
         // use primaryClusterToBackends, if find be normal
-        long pickBeId = getAvaliableBeId(clusterId, primaryClusterToBackends);
-        if (pickBeId != -1) {
-            return pickBeId;
+        Backend be = getPrimaryBackend(clusterId);
+        if (be != null && be.isQueryAvailable()) {
+            return be.getId();
         }
 
         if (!Config.enable_immediate_be_assign) {
             // use secondaryClusterToBackends, if find be normal
-            pickBeId = getAvaliableBeId(clusterId, secondaryClusterToBackends);
-            if (pickBeId != -1) {
-                return pickBeId;
+            be = getSecondaryBackend(clusterId);
+            if (be != null && be.isQueryAvailable()) {
+                return be.getId();
             }
         }
 
@@ -273,27 +348,32 @@ private long getBackendIdImpl(String cluster) throws ComputeGroupException {
         }
 
         // be abnormal, rehash it. configure settings to different maps
-        pickBeId = hashReplicaToBe(clusterId, false);
-        updateClusterToBe(clusterId, pickBeId, Config.enable_immediate_be_assign);
+        long pickBeId = hashReplicaToBe(clusterId, false);
+        if (Config.enable_immediate_be_assign) {
+            updateClusterToPrimaryBe(clusterId, pickBeId);
+        } else {
+            updateClusterToSecondaryBe(clusterId, pickBeId);
+        }
         return pickBeId;
     }
 
-    private long getAvaliableBeId(String clusterId, Map<String, List<Long>> clusterToBackends) {
-        List<Long> backendIds = clusterToBackends.get(clusterId);
+    public Backend getPrimaryBackend(String clusterId) {
+        long beId = getClusterPrimaryBackendId(clusterId);
+        if (beId != -1L) {
+            return Env.getCurrentSystemInfo().getBackend(beId);
+        } else {
+            return null;
+        }
+    }
+
+    public Backend getSecondaryBackend(String clusterId) {
+        List<Long> backendIds = secondaryClusterToBackends.get(clusterId);
         if (backendIds == null || backendIds.isEmpty()) {
-            return -1;
+            return null;
         }
         long backendId = backendIds.get(0);
-        Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
-        if (be != null && be.isQueryAvailable()) {
-            // be normal
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("backendId={} ", backendId);
-            }
-            return backendId;
-        }
-        return -1;
+        return Env.getCurrentSystemInfo().getBackend(backendId);
     }
 
     public long hashReplicaToBe(String clusterId, boolean isBackGround) throws ComputeGroupException {
@@ -308,15 +388,23 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) throws Compu
         }
         // use alive be to exec sql
         List<Backend> availableBes = new ArrayList<>();
+        List<Backend> decommissionAvailBes = new ArrayList<>();
         for (Backend be : clusterBes) {
             long lastUpdateMs = be.getLastUpdateMs();
             long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
             // be core or restart must in heartbeat_interval_second
             if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
                     && !be.isSmoothUpgradeSrc()) {
-                availableBes.add(be);
+                if (be.isDecommissioned()) {
+                    decommissionAvailBes.add(be);
+                } else {
+                    availableBes.add(be);
+                }
             }
         }
+        if (availableBes.isEmpty()) {
+            availableBes = decommissionAvailBes;
+        }
         if (availableBes.isEmpty()) {
             if (!isBackGround) {
                 LOG.warn("failed to get available be, clusterId: {}", clusterId);
@@ -335,11 +423,7 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) throws Compu
             index = getId() % availableBes.size();
         } else {
             hashCode = Hashing.murmur3_128().hashLong(partitionId);
-            int beNum = availableBes.size();
-            // (hashCode.asLong() + idx) % beNum may be a negative value, so we
-            // need to take the modulus of beNum again to ensure that index is
-            // a positive value
-            index = ((hashCode.asLong() + idx) % beNum + beNum) % beNum;
+            index = getIndexByBeNum(hashCode.asLong() + idx, availableBes.size());
         }
         long pickedBeId = availableBes.get((int) index).getId();
         LOG.info("picked beId {}, replicaId {}, partitionId {}, beNum {}, replicaIdx {}, picked Index {}, hashVal {}",
@@ -349,6 +433,13 @@ pickedBeId, getId(), partitionId, availableBes.size(), idx, index,
         return pickedBeId;
     }
 
+    private long getIndexByBeNum(long hashValue, int beNum) {
+        // hashValue may be a negative value, so we need to take the
+        // modulus of beNum again to ensure that the result is positive
+        return (hashValue % beNum + beNum) % beNum;
+    }
+
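
A note on `getIndexByBeNum`, which the hunks above switch to: Java's `%` keeps the sign of the dividend, and `murmur3_128().hashLong(...).asLong()` is negative about half the time, so a single modulus could produce a negative list index. A small sketch demonstrating the double-modulus trick (same Guava API as the surrounding code; the values are illustrative):

    import com.google.common.hash.Hashing;

    // Shows that raw "hash % n" may be negative, while
    // "(hash % n + n) % n" always lands in [0, n).
    public class IndexByBeNumDemo {
        static long getIndexByBeNum(long hashValue, int beNum) {
            return (hashValue % beNum + beNum) % beNum;
        }

        public static void main(String[] args) {
            long hash = Hashing.murmur3_128().hashLong(42L).asLong(); // may be negative
            int beNum = 3;
            System.out.println(hash % beNum);                 // possibly negative
            System.out.println(getIndexByBeNum(hash, beNum)); // always 0, 1, or 2
        }
    }
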
     public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int replicaNum)
             throws ComputeGroupException {
         // TODO(luwei) list should be sorted
@@ -362,15 +453,23 @@ public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int r
         }
         // use alive be to exec sql
         List<Backend> availableBes = new ArrayList<>();
+        List<Backend> decommissionAvailBes = new ArrayList<>();
         for (Backend be : clusterBes) {
             long lastUpdateMs = be.getLastUpdateMs();
             long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
             // be core or restart must in heartbeat_interval_second
             if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
                     && !be.isSmoothUpgradeSrc()) {
-                availableBes.add(be);
+                if (be.isDecommissioned()) {
+                    decommissionAvailBes.add(be);
+                } else {
+                    availableBes.add(be);
+                }
             }
         }
+        if (availableBes.isEmpty()) {
+            availableBes = decommissionAvailBes;
+        }
         if (availableBes.isEmpty()) {
             if (!isBackGround) {
                 LOG.warn("failed to get available be, clusterId: {}", clusterId);
@@ -393,11 +492,7 @@ public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int r
                 index = getId() % availableBes.size();
             } else {
                 hashCode = Hashing.murmur3_128().hashLong(partitionId + i);
-                int beNum = availableBes.size();
-                // (hashCode.asLong() + idx) % beNum may be a negative value, so we
-                // need to take the modulus of beNum again to ensure that index is
-                // a positive value
-                index = ((hashCode.asLong() + idx) % beNum + beNum) % beNum;
+                index = getIndexByBeNum(hashCode.asLong() + idx, availableBes.size());
             }
             long pickedBeId = availableBes.get((int) index).getId();
             availableBes.remove((int) index);
@@ -480,19 +575,17 @@ public long getIdx() {
         return idx;
     }
 
-    public Map<String, List<Long>> getprimaryClusterToBackends() {
-        return primaryClusterToBackends;
+    public void updateClusterToPrimaryBe(String cluster, long beId) {
+        primaryClusterToBackends.put(cluster, Lists.newArrayList(beId));
+        secondaryClusterToBackends.remove(cluster);
     }
 
-    // save to primaryClusterToBackends or secondaryClusterToBackends map
-    public void updateClusterToBe(String cluster, long beId, boolean isUpdatePrimary) {
-        // write lock
-        List<Long> bes = new ArrayList<Long>();
-        bes.add(beId);
-        if (isUpdatePrimary) {
-            primaryClusterToBackends.put(cluster, bes);
-        } else {
-            secondaryClusterToBackends.put(cluster, bes);
-        }
+    private void updateClusterToSecondaryBe(String cluster, long beId) {
+        secondaryClusterToBackends.put(cluster, Lists.newArrayList(beId));
+    }
+
+    public void clearClusterToBe(String cluster) {
+        primaryClusterToBackends.remove(cluster);
+        secondaryClusterToBackends.remove(cluster);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 051a7a3da62c87..dccce2c3dcb5f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -47,6 +47,7 @@
 import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -69,14 +70,15 @@ public class CloudTabletRebalancer extends MasterDaemon {
     private volatile ConcurrentHashMap<Long, List<Tablet>> beToTabletsGlobal =
             new ConcurrentHashMap<Long, List<Tablet>>();
 
+    private volatile ConcurrentHashMap<Long, List<Tablet>> beToColocateTabletsGlobal =
+            new ConcurrentHashMap<Long, List<Tablet>>();
+
     private Map<Long, List<Tablet>> futureBeToTabletsGlobal;
 
     private Map<String, List<Long>> clusterToBes;
 
     private Set<Long> allBes;
 
-    private List<UpdateCloudReplicaInfo> replicaInfos;
-
     // partitionId -> indexId -> be -> tablet
     private Map<Long, Map<Long, Map<Long, List<Tablet>>>> partitionToTablets;
 
@@ -99,8 +101,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     private Map<InfightTablet, InfightTask> tabletToInfightTask = new HashMap<>();
 
-    private long assignedErrNum = 0;
-
     private CloudSystemInfoService cloudSystemInfoService;
 
     public CloudTabletRebalancer(CloudSystemInfoService cloudSystemInfoService) {
@@ -165,14 +165,30 @@ private class TransferPairInfo {
     }
 
     public Set<Long> getSnapshotTabletsByBeId(Long beId) {
-        Set<Long> snapshotTablets = new HashSet<Long>();
-        if (beToTabletsGlobal == null || !beToTabletsGlobal.containsKey(beId)) {
-            LOG.warn("beToTabletsGlobal null or not contain beId {}", beId);
-            return snapshotTablets;
+        Set<Long> tabletIds = Sets.newHashSet();
+        List<Tablet> tablets = beToTabletsGlobal.get(beId);
+        if (tablets != null) {
+            for (Tablet tablet : tablets) {
+                tabletIds.add(tablet.getId());
+            }
         }
 
-        beToTabletsGlobal.get(beId).forEach(tablet -> snapshotTablets.add(tablet.getId()));
-        return snapshotTablets;
+        tablets = beToColocateTabletsGlobal.get(beId);
+        if (tablets != null) {
+            for (Tablet tablet : tablets) {
+                tabletIds.add(tablet.getId());
+            }
+        }
+
+        return tabletIds;
+    }
+
+    public int getTabletNumByBackendId(long beId) {
+        List<Tablet> tablets = beToTabletsGlobal.get(beId);
+        List<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
+
+        return (tablets == null ? 0 : tablets.size())
+                + (colocateTablets == null ? 0 : colocateTablets.size());
+    }
 
     // 1 build cluster to backends info
@@ -211,14 +227,7 @@ protected void runAfterCatalogReady() {
         LOG.info("cluster to backends {}", clusterToBes);
 
         // 2 complete route info
-        replicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
-        completeRouteInfo();
-        LOG.info("collect to editlog route {} infos", replicaInfos.size());
-        try {
-            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(replicaInfos);
-        } catch (Exception e) {
-            LOG.warn("failed to update cloud replicas", e);
-            // edit log failed, try next time
+        if (!completeRouteInfo()) {
             return;
         }
 
@@ -381,10 +390,11 @@ public void checkInflghtWarmUpCacheAsync() {
         }
 
         List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
+        long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
         for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
             LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
             Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
-            if (destBackend == null) {
+            if (destBackend == null || (!destBackend.isAlive() && destBackend.getLastUpdateMs() < needRehashDeadTime)) {
                 for (InfightTask task : entry.getValue()) {
                     for (InfightTablet key : tabletToInfightTask.keySet()) {
                         tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId));
                     }
                 }
                 continue;
             }
+            if (!destBackend.isAlive()) {
+                continue;
+            }
             List<Long> tablets = entry.getValue().stream()
                     .map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
             Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
@@ -440,9 +453,9 @@ public void checkInflghtWarmUpCacheAsync() {
     public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
         for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
             List<Long> beList = entry.getValue();
-            long tabletNum = 0L;
             for (long beId : beList) {
-                tabletNum = beToTabletsGlobal.get(beId) == null ? 0 : beToTabletsGlobal.get(beId).size();
+                List<Tablet> tablets = beToTabletsGlobal.get(beId);
+                int tabletNum = tablets == null ? 0 : tablets.size();
                 Backend backend = cloudSystemInfoService.getBackend(beId);
                 if (backend == null) {
                     LOG.info("backend {} not found", beId);
@@ -492,53 +505,91 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
         }
     }
 
-    private void completeRouteInfo() {
-        assignedErrNum = 0L;
+    private boolean completeRouteInfo() {
+        List<UpdateCloudReplicaInfo> updateReplicaInfos = new ArrayList<UpdateCloudReplicaInfo>();
+        long[] assignedErrNum = {0L};
+        long needRehashDeadTime = System.currentTimeMillis() - Config.rehash_tablet_after_be_dead_seconds * 1000L;
         loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
             boolean assigned = false;
             List<Long> beIds = new ArrayList<Long>();
             List<Long> tabletIds = new ArrayList<Long>();
+            boolean isColocated = Env.getCurrentColocateIndex().isColocateTable(table.getId());
             for (Tablet tablet : index.getTablets()) {
-                for (Replica replica : tablet.getReplicas()) {
-                    Map<String, List<Long>> primaryClusterToBackends =
-                            ((CloudReplica) replica).getprimaryClusterToBackends();
-                    if (!primaryClusterToBackends.containsKey(cluster)) {
-                        long beId;
-                        try {
-                            beId = ((CloudReplica) replica).hashReplicaToBe(cluster, true);
-                        } catch (ComputeGroupException e) {
-                            LOG.warn("failed to hash replica to be {}", cluster, e);
-                            beId = -1;
-                        }
-                        if (beId <= 0) {
-                            assignedErrNum++;
-                            continue;
-                        }
-                        ((CloudReplica) replica).updateClusterToBe(cluster, beId, true);
-                        List<Long> bes = new ArrayList<Long>();
-                        bes.add(beId);
-                        primaryClusterToBackends.put(cluster, bes);
+                for (Replica r : tablet.getReplicas()) {
+                    CloudReplica replica = (CloudReplica) r;
+                    InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster);
+                    // colocate table no need to update primary backends
+                    if (isColocated) {
+                        replica.clearClusterToBe(cluster);
+                        tabletToInfightTask.remove(taskKey);
+                        continue;
+                    }
 
-                        assigned = true;
-                        beIds.add(beId);
+                    // primary backend is alive or dead not long
+                    Backend be = replica.getPrimaryBackend(cluster);
+                    if (be != null && (be.isQueryAvailable()
+                            || (!be.isQueryDisabled() && be.getLastUpdateMs() > needRehashDeadTime))) {
+                        beIds.add(be.getId());
                         tabletIds.add(tablet.getId());
+                        continue;
+                    }
+
+                    // primary backend not available too long, change one
+                    long beId = -1L;
+                    be = replica.getSecondaryBackend(cluster);
+                    if (be != null && be.isQueryAvailable()) {
+                        beId = be.getId();
                     } else {
-                        beIds.add(primaryClusterToBackends.get(cluster).get(0));
-                        tabletIds.add(tablet.getId());
+                        InfightTask task = tabletToInfightTask.get(taskKey);
+                        be = task == null ? null : Env.getCurrentSystemInfo().getBackend(task.destBe);
+                        if (be != null && be.isQueryAvailable()) {
+                            beId = be.getId();
+                        } else {
+                            try {
+                                beId = ((CloudReplica) replica).hashReplicaToBe(cluster, true);
+                            } catch (ComputeGroupException e) {
+                                LOG.warn("failed to hash replica to be {}", cluster, e);
+                                beId = -1;
+                            }
+                        }
+                    }
+
+                    if (beId <= 0) {
+                        assignedErrNum[0]++;
+                        continue;
                     }
+
+                    tabletToInfightTask.remove(taskKey);
+
+                    ((CloudReplica) replica).updateClusterToPrimaryBe(cluster, beId);
+                    beIds.add(beId);
+                    tabletIds.add(tablet.getId());
+                    assigned = true;
                 }
             }
 
             if (assigned) {
                 UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(db.getId(), table.getId(),
                         partition.getId(), index.getId(), cluster, beIds, tabletIds);
-                replicaInfos.add(info);
+                updateReplicaInfos.add(info);
             }
         });
 
-        if (assignedErrNum > 0) {
-            LOG.warn("completeRouteInfo error num {}", assignedErrNum);
+        LOG.info("collect to editlog route {} infos, error num {}", updateReplicaInfos.size(), assignedErrNum[0]);
+
+        if (updateReplicaInfos.isEmpty()) {
+            return true;
         }
+
+        try {
+            Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(updateReplicaInfos);
+        } catch (Exception e) {
+            LOG.warn("failed to update cloud replicas", e);
+            // edit log failed, try next time
+            return false;
+        }
+
+        return true;
     }
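
`completeRouteInfo` now counts assignment errors inside the lambda passed to `loopCloudReplica`. A lambda can only capture effectively final locals, which is why the patch holds the counter in a one-element `long[]` rather than a plain `long`. A minimal illustration of the idiom:

    import java.util.Arrays;

    public class LambdaCounterDemo {
        public static void main(String[] args) {
            long[] errNum = {0L}; // the array reference is effectively final; its cell stays mutable
            Arrays.asList(-1L, 5L, -2L).forEach(v -> {
                if (v <= 0) {
                    errNum[0]++; // allowed: mutates the cell, not the captured variable
                }
            });
            System.out.println(errNum[0]); // prints 2
        }
    }
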
 
     public void fillBeToTablets(long be, long tableId, long partId, long indexId, Tablet tablet,
@@ -566,6 +617,9 @@ public void fillBeToTablets(long be, long tableId, long partId, long indexId, Ta
     public void statRouteInfo() {
         ConcurrentHashMap<Long, List<Tablet>> tmpBeToTabletsGlobal = new ConcurrentHashMap<Long, List<Tablet>>();
 
+        ConcurrentHashMap<Long, List<Tablet>> tmpBeToColocateTabletsGlobal
+                = new ConcurrentHashMap<Long, List<Tablet>>();
+
         futureBeToTabletsGlobal = new HashMap<Long, List<Tablet>>();
 
         partitionToTablets = new HashMap<Long, Map<Long, Map<Long, List<Tablet>>>>();
@@ -575,39 +629,48 @@ public void statRouteInfo() {
         futureBeToTabletsInTable = new HashMap<Long, Map<Long, List<Tablet>>>();
 
         loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
+            boolean isColocated = Env.getCurrentColocateIndex().isColocateTable(table.getId());
             for (Tablet tablet : index.getTablets()) {
-                for (Replica replica : tablet.getReplicas()) {
-                    Map<String, List<Long>> primaryClusterToBackends =
-                            ((CloudReplica) replica).getprimaryClusterToBackends();
-                    for (Map.Entry<String, List<Long>> entry : primaryClusterToBackends.entrySet()) {
-                        if (!cluster.equals(entry.getKey())) {
+                for (Replica r : tablet.getReplicas()) {
+                    CloudReplica replica = (CloudReplica) r;
+                    if (isColocated) {
+                        long beId = -1L;
+                        try {
+                            beId = replica.getColocatedBeId(cluster);
+                        } catch (ComputeGroupException e) {
                             continue;
                         }
-
-                        List<Long> bes = entry.getValue();
-                        if (!allBes.contains(bes.get(0))) {
-                            continue;
+                        if (allBes.contains(beId)) {
+                            List<Tablet> colocateTablets = tmpBeToColocateTabletsGlobal.get(beId);
+                            if (colocateTablets == null) {
+                                colocateTablets = new ArrayList<Tablet>();
+                                tmpBeToColocateTabletsGlobal.put(beId, colocateTablets);
+                            }
+                            colocateTablets.add(tablet);
                         }
+                        continue;
+                    }
 
-                        fillBeToTablets(bes.get(0), table.getId(), partition.getId(), index.getId(), tablet,
-                                tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets);
+                    Backend be = replica.getPrimaryBackend(cluster);
+                    long beId = be == null ? -1L : be.getId();
+                    if (!allBes.contains(beId)) {
+                        continue;
+                    }
 
-                        InfightTask task = tabletToInfightTask
-                                .getOrDefault(new InfightTablet(tablet.getId(), cluster), null);
+                    InfightTablet taskKey = new InfightTablet(tablet.getId(), cluster);
+                    InfightTask task = tabletToInfightTask.get(taskKey);
+                    long futureBeId = task == null ? beId : task.destBe;
+                    fillBeToTablets(beId, table.getId(), partition.getId(), index.getId(), tablet,
+                            tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets);
 
-                    if (task != null) {
-                        fillBeToTablets(task.destBe, table.getId(), partition.getId(), index.getId(), tablet,
-                                futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
-                    } else {
-                        fillBeToTablets(bes.get(0), table.getId(), partition.getId(), index.getId(), tablet,
-                                futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
-                    }
-                }
+                    fillBeToTablets(futureBeId, table.getId(), partition.getId(), index.getId(), tablet,
+                            futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
                 }
             }
         });
 
         beToTabletsGlobal = tmpBeToTabletsGlobal;
+        beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;
     }
 
     public void loopCloudReplica(Operator operator) {
@@ -734,7 +797,6 @@ private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, Bal
     private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId,
             List<UpdateCloudReplicaInfo> infos) {
         CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
-        cloudReplica.updateClusterToBe(clusterId, destBe, true);
         Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
         if (db == null) {
             return;
@@ -751,6 +813,7 @@ private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clust
             return;
         }
 
+        cloudReplica.updateClusterToPrimaryBe(clusterId, destBe);
         UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(), cloudReplica.getTableId(),
                 cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
                 pickedTablet.getId(), cloudReplica.getId(), clusterId, destBe);
@@ -901,7 +964,9 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet>>
 
-        List<Tablet> tablets = new ArrayList<>();
-        if (!beToTabletsGlobal.containsKey(srcBe)) {
-            LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
-            ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
-            return;
-        }
-        tablets = beToTabletsGlobal.get(srcBe);
-        if (tablets.isEmpty()) {
+        List<Tablet> tablets = beToTabletsGlobal.get(srcBe);
+        if (tablets == null || tablets.isEmpty()) {
             LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe);
             ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
             return;
         }
@@ -975,13 +1034,6 @@ private void migrateTablets(Long srcBe, Long dstBe) {
                 LOG.info("src backend {} not found", srcBe);
                 continue;
             }
-            String clusterId = be.getCloudClusterId();
-            String clusterName = be.getCloudClusterName();
-            // update replica location info
-            cloudReplica.updateClusterToBe(clusterId, dstBe, true);
-            LOG.info("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}",
-                    tablet.getId(), srcBe, dstBe, clusterId, clusterName);
-
             // populate to followers
             Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
             if (db == null) {
@@ -1001,11 +1053,16 @@ private void migrateTablets(Long srcBe, Long dstBe) {
                 continue;
             }
 
+            String clusterId = be.getCloudClusterId();
+            String clusterName = be.getCloudClusterName();
+
             table.readLock();
             try {
                 if (db.getTableNullable(cloudReplica.getTableId()) == null) {
                     continue;
                 }
+                // update replica location info
+                cloudReplica.updateClusterToPrimaryBe(clusterId, dstBe);
                 UpdateCloudReplicaInfo info = new UpdateCloudReplicaInfo(cloudReplica.getDbId(),
                         cloudReplica.getTableId(), cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
                         tablet.getId(), cloudReplica.getId(), clusterId, dstBe);
@@ -1013,6 +1070,9 @@ private void migrateTablets(Long srcBe, Long dstBe) {
             } finally {
                 table.readUnlock();
             }
+
+            LOG.info("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}",
+                    tablet.getId(), srcBe, dstBe, clusterId, clusterName);
         }
         long oldSize = infos.size();
         infos = batchUpdateCloudReplicaInfoEditlogs(infos);
@@ -1091,5 +1151,6 @@ private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<UpdateCloudReplicaInfo> infos) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
     // clusterId -> List<Backend>
+    // pls make sure each cluster's backend list is sorted by backendId
     protected Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>();
     // clusterName -> clusterId
     protected Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
@@ -243,9 +246,12 @@ public void updateCloudClusterMapNoLock(List<Backend> toAdd, List<Backend> toDel
                         clusterName, clusterId, be.size(), b);
                 continue;
             }
-            be.add(b);
+            List<Backend> sortBackends = Lists.newArrayList(be);
+            sortBackends.add(b);
+            Collections.sort(sortBackends, Comparator.comparing(Backend::getId));
+            clusterIdToBackend.put(clusterId, sortBackends);
             LOG.info("update (add) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
-                    clusterName, clusterId, be.size(), b);
+                    clusterName, clusterId, sortBackends.size(), sortBackends);
         }
 
         for (Backend b : toDel) {
@@ -508,6 +514,13 @@ public int getMinPipelineExecutorSize() {
         return super.getMinPipelineExecutorSize();
     }
 
+    @Override
+    public int getTabletNumByBackendId(long beId) {
+        return ((CloudEnv) Env.getCurrentEnv())
+                .getCloudTabletRebalancer()
+                .getTabletNumByBackendId(beId);
+    }
+
     @Override
     public ImmutableMap<Long, Backend> getBackendsByCurrentCluster() throws AnalysisException {
         ConnectContext ctx = ConnectContext.get();
@@ -557,7 +570,7 @@ public List<Backend> getBackendsByClusterId(final String clusterId) {
         }
     }
 
-    public String getClusterIdByBeAddr(String beEndpoint) {
+    public String getClusterNameByBeAddr(String beEndpoint) {
         rlock.lock();
         try {
             for (Map.Entry<String, List<Backend>> idBe : clusterIdToBackend.entrySet()) {
@@ -629,6 +642,10 @@ public void updateClusterNameToId(final String newName,
         }
     }
 
+    public String getCloudClusterIdByName(String clusterName) {
+        return clusterNameToId.get(clusterName);
+    }
+
     public String getClusterNameByClusterId(final String clusterId) {
         rlock.lock();
         try {
@@ -766,15 +783,6 @@ public Map<String, List<Backend>> getCloudClusterIdToBackend() {
         }
     }
 
-    public String getCloudClusterIdByName(String clusterName) {
-        rlock.lock();
-        try {
-            return clusterNameToId.get(clusterName);
-        } finally {
-            rlock.unlock();
-        }
-    }
-
     public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName) {
         rlock.lock();
         try {
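
The "sorted by backendId" invariant above matters because every FE resolves a replica to a BE by indexing into this list; the hash-based mapping only agrees across FEs if they all see the backends of a compute group in the same order. The add path keeps the invariant with a copy-sort-swap, so readers of the ConcurrentHashMap never observe a half-built list. A condensed sketch of the pattern (bare ids in place of Backend objects, for self-containment):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Copy-on-write insert that keeps each cluster's backend list sorted.
    class SortedBackendMapSketch {
        final Map<String, List<Long>> clusterIdToBackend = new ConcurrentHashMap<>();

        void addBackend(String clusterId, long backendId) {
            List<Long> old = clusterIdToBackend.getOrDefault(clusterId, new ArrayList<>());
            List<Long> copy = new ArrayList<>(old); // never mutate the published list
            copy.add(backendId);
            Collections.sort(copy);
            clusterIdToBackend.put(clusterId, copy); // atomic swap for readers
        }
    }
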
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
index d029a8c2383c2c..c5273304137d7c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
@@ -103,7 +103,7 @@ public static List<List<String>> getBackendInfos() {
             }
 
             watch.start();
-            Integer tabletNum = Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
+            Integer tabletNum = systemInfoService.getTabletNumByBackendId(backendId);
             watch.stop();
             List<String> backendInfo = Lists.newArrayList();
             backendInfo.add(String.valueOf(backendId));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DiagnoseClusterBalanceProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DiagnoseClusterBalanceProcDir.java
index c848b369369f78..31db0455aeda41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DiagnoseClusterBalanceProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DiagnoseClusterBalanceProcDir.java
@@ -19,7 +19,6 @@
 
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.clone.BackendLoadStatistic;
 import org.apache.doris.clone.BackendLoadStatistic.Classification;
 import org.apache.doris.clone.LoadStatisticForTag;
@@ -140,10 +139,9 @@ private DiagnoseItem diagnoseBaseBalance(boolean schedReady, boolean schedRecent
                 .collect(Collectors.toList());
         boolean isPartitionBal = Config.tablet_rebalancer_type.equalsIgnoreCase("partition");
         if (isPartitionBal) {
-            TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
             baseBalance.name = "Partition Balance";
             List<Integer> tabletNums = availableBeIds.stream()
-                    .map(beId -> invertedIndex.getTabletNumByBackendId(beId))
+                    .map(beId -> infoService.getTabletNumByBackendId(beId))
                     .collect(Collectors.toList());
             int minTabletNum = tabletNums.stream().mapToInt(v -> v).min().orElse(0);
             int maxTabletNum = tabletNums.stream().mapToInt(v -> v).max().orElse(0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
index dabd3c28cc2e2a..cba5432cc14c80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
@@ -23,6 +23,7 @@
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cloud.catalog.CloudReplica;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.NetUtils;
@@ -41,13 +42,23 @@
  * show replicas' detail info within a tablet
  */
 public class ReplicasProcNode implements ProcNodeInterface {
-    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("ReplicaId")
-            .add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
-            .add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad")
-            .add("IsUserDrop")
-            .add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
-            .add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
-            .add("CooldownMetaId").add("QueryHits").build();
+    public static final ImmutableList<String> TITLE_NAMES;
+
+    static {
+        ImmutableList.Builder<String> builder = new ImmutableList.Builder<String>().add("ReplicaId")
+                .add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
+                .add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad")
+                .add("IsUserDrop")
+                .add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
+                .add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
+                .add("CooldownMetaId").add("QueryHits");
+
+        if (Config.isCloudMode()) {
+            builder.add("PrimaryBackendId");
+        }
+
+        TITLE_NAMES = builder.build();
+    }
 
     private long tabletId;
     private List<Replica> replicas;
@@ -105,28 +116,32 @@ public ProcResult fetchResult() throws AnalysisException {
             if (Config.enable_query_hit_stats) {
                 queryHits = QueryStatsUtil.getMergedReplicaStats(replica.getId());
             }
-            result.addRow(Arrays.asList(String.valueOf(replica.getId()),
-                    String.valueOf(replica.getBackendIdWithoutException()),
-                    String.valueOf(replica.getVersion()),
-                    String.valueOf(replica.getLastSuccessVersion()),
-                    String.valueOf(replica.getLastFailedVersion()),
-                    TimeUtils.longToTimeString(replica.getLastFailedTimestamp()),
-                    String.valueOf(replica.getSchemaHash()),
-                    String.valueOf(replica.getDataSize()),
-                    String.valueOf(replica.getRemoteDataSize()),
-                    String.valueOf(replica.getRowCount()),
-                    String.valueOf(replica.getState()),
-                    String.valueOf(replica.isBad()),
-                    String.valueOf(replica.isUserDrop()),
-                    String.valueOf(replica.getVisibleVersionCount()),
-                    String.valueOf(replica.getTotalVersionCount()),
-                    String.valueOf(replica.getPathHash()),
-                    path,
-                    metaUrl,
-                    compactionUrl,
-                    String.valueOf(tablet.getCooldownConf().first),
-                    cooldownMetaId,
-                    String.valueOf(queryHits)));
+            // note: Arrays.asList alone would be fixed-size; wrap it so the cloud-mode add() below is legal
+            List<String> replicaInfo = new ArrayList<>(Arrays.asList(String.valueOf(replica.getId()),
+                    String.valueOf(replica.getBackendIdWithoutException()),
+                    String.valueOf(replica.getVersion()),
+                    String.valueOf(replica.getLastSuccessVersion()),
+                    String.valueOf(replica.getLastFailedVersion()),
+                    TimeUtils.longToTimeString(replica.getLastFailedTimestamp()),
+                    String.valueOf(replica.getSchemaHash()),
+                    String.valueOf(replica.getDataSize()),
+                    String.valueOf(replica.getRemoteDataSize()),
+                    String.valueOf(replica.getRowCount()),
+                    String.valueOf(replica.getState()),
+                    String.valueOf(replica.isBad()),
+                    String.valueOf(replica.isUserDrop()),
+                    String.valueOf(replica.getVisibleVersionCount()),
+                    String.valueOf(replica.getTotalVersionCount()),
+                    String.valueOf(replica.getPathHash()),
+                    path,
+                    metaUrl,
+                    compactionUrl,
+                    String.valueOf(tablet.getCooldownConf().first),
+                    cooldownMetaId,
+                    String.valueOf(queryHits)));
+            if (Config.isCloudMode()) {
+                replicaInfo.add(String.valueOf(((CloudReplica) replica).getPrimaryBackendId()));
+            }
+            result.addRow(replicaInfo);
         }
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index a7591af19cdfa9..8eca5f84faa0b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -23,6 +23,7 @@
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.catalog.CloudReplica;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -47,15 +48,24 @@
  * show tablets' detail info within an index
 */
 public class TabletsProcDir implements ProcDirInterface {
-    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
-            .add("TabletId").add("ReplicaId").add("BackendId").add("SchemaHash").add("Version")
-            .add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
-            .add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State")
-            .add("LstConsistencyCheckTime").add("CheckVersion")
-            .add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path")
-            .add("MetaUrl").add("CompactionStatus")
-            .add("CooldownReplicaId").add("CooldownMetaId")
-            .build();
+    public static final ImmutableList<String> TITLE_NAMES;
+
+    static {
+        ImmutableList.Builder<String> builder = new ImmutableList.Builder<String>()
+                .add("TabletId").add("ReplicaId").add("BackendId").add("SchemaHash").add("Version")
+                .add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State") + .add("LstConsistencyCheckTime").add("CheckVersion") + .add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path") + .add("MetaUrl").add("CompactionStatus") + .add("CooldownReplicaId").add("CooldownMetaId"); + + if (Config.isCloudMode()) { + builder.add("PrimaryBackendId"); + } + + TITLE_NAMES = builder.build(); + } private Table table; private MaterializedIndex index; @@ -124,6 +134,9 @@ public List> fetchComparableResult(long version, long backendId tabletInfo.add(FeConstants.null_string); // compaction status tabletInfo.add(-1); // cooldown replica id tabletInfo.add(""); // cooldown meta id + if (Config.isCloudMode()) { + tabletInfo.add(-1L); // primary backend id + } tabletInfos.add(tabletInfo); } else { @@ -170,6 +183,9 @@ public List> fetchComparableResult(long version, long backendId } else { tabletInfo.add(replica.getCooldownMetaId().toString()); } + if (Config.isCloudMode()) { + tabletInfo.add(((CloudReplica) replica).getPrimaryBackendId()); + } tabletInfos.add(tabletInfo); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index 70da86cae4f8ab..5e8748db9fd17b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -20,7 +20,6 @@ import org.apache.doris.alter.Alter; import org.apache.doris.alter.AlterJobV2.JobType; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; @@ -646,7 +645,6 @@ public static void generateBackendsTabletMetrics() { DORIS_METRIC_REGISTER.removeMetrics(TABLET_MAX_COMPACTION_SCORE); SystemInfoService infoService = Env.getCurrentSystemInfo(); - TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Long beId : infoService.getAllBackendIds(false)) { Backend be = infoService.getBackend(beId); @@ -661,7 +659,7 @@ public Long getValue() { if (!Env.getCurrentEnv().isMaster()) { return 0L; } - return (long) invertedIndex.getTabletNumByBackendId(beId); + return (long) infoService.getTabletNumByBackendId(beId); } }; tabletNum.addLabel(new MetricLabel("backend", diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 0e7dba0bba7659..0a252a1e98e56f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -1268,13 +1268,28 @@ public void rollEditLog() { journal.rollJournal(); } - private synchronized void logEdit(short op, List entries) throws IOException { - JournalBatch batch = new JournalBatch(35); + // NOTICE: No guarantee atomicity of entries + private void logEdit(short op, List entries) throws IOException { + int itemNum = Math.max(1, Math.min(Config.batch_edit_log_max_item_num, entries.size())); + JournalBatch batch = new JournalBatch(itemNum); + long batchCount = 0; for (T entry : entries) { - // the number of batch entities to less than 32 and the batch data size to less than 640KB - if (batch.getJournalEntities().size() >= 32 || batch.getSize() >= 640 * 1024) { + if (batch.getJournalEntities().size() >= Config.batch_edit_log_max_item_num + || batch.getSize() >= Config.batch_edit_log_max_byte_size) { 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 6126cc7a7021b7..d7ebef150e0186 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -1018,4 +1018,10 @@ public int getMinPipelineExecutorSize() {
         }
         return minPipelineExecutorSize;
     }
+
+    // overridden by CloudSystemInfoService in cloud mode
+    public int getTabletNumByBackendId(long beId) {
+        return Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId);
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 103c659a990645..ce2e564716f8c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -375,7 +375,7 @@ private static TFetchSchemaTableDataResult backendsMetadataResult(TMetadataTable
         }
 
         watch.start();
-        Integer tabletNum = Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
+        Integer tabletNum = systemInfoService.getTabletNumByBackendId(backendId);
         watch.stop();
 
         TRow trow = new TRow();
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 2db2c7c07490dd..da7259dfef5e8e 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -43,10 +43,14 @@ class ClusterOptions {
 
     int waitTimeout = 180
 
+    // don't add whitespace in feConfigs items;
+    // for example, ' xx = yy ' is bad, use 'xx=yy' instead
     List<String> feConfigs = [
         'heartbeat_interval_second=5',
     ]
 
+    // don't add whitespace in beConfigs items;
+    // for example, ' xx = yy ' is bad, use 'xx=yy' instead
     List<String> beConfigs = [
         'report_disk_state_interval_seconds=2',
         'report_random_wait=false',
@@ -268,7 +272,7 @@ class SuiteCluster {
     final String name
     final Config config
     private boolean running
-    private boolean sqlModeNodeMgr = false;
+    private boolean sqlModeNodeMgr = false
 
     SuiteCluster(String name, Config config) {
         this.name = name
@@ -327,7 +331,7 @@ class SuiteCluster {
 
         cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)]
 
-        sqlModeNodeMgr = options.sqlModeNodeMgr;
+        sqlModeNodeMgr = options.sqlModeNodeMgr
 
         runCmd(cmd.join(' '), -1)
 
@@ -431,7 +435,7 @@ class SuiteCluster {
         def data = runCmd(cmd)
         assert data instanceof List
         def rows = (List<List<Object>>) data
-        logger.info("get all nodes {}", rows);
+        logger.info('get all nodes {}', rows)
         def header = new ListHeader(rows.get(0))
         for (int i = 1; i < rows.size(); i++) {
             def row = (List<Object>) rows.get(i)
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
new file mode 100644
index 00000000000000..9ef469ec6fed1b
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
+    def tbl1 = 'tbl_1_test_fe_tablet_same_backend'
+    def tbl2 = 'tbl_2_test_fe_tablet_same_backend'
+    def bucketNum = 6
+
+    def choseDeadBeIndex = 1
+
+    def getTabletBeIdsForEachFe = { tbl, isPrimaryBe ->
+        def result = []
+        def frontends = cluster.getAllFrontends()
+        for (def fe : frontends) {
+            def feUrl = "jdbc:mysql://${fe.host}:${fe.queryPort}/?useLocalSessionState=false&allowLoadLocalInfile=false"
+            feUrl = context.config.buildUrlWithDb(feUrl, context.dbName)
+            connect('root', '', feUrl) {
+                sql 'SET forward_to_master=false'
+                sql "SELECT * FROM ${tbl}"
+                def tablets = sql_return_maparray "SHOW TABLETS FROM ${tbl}"
+                result.add(tablets.collectEntries {
+                    def tabletId = it.TabletId as long
+                    def backendId = (isPrimaryBe ? it.PrimaryBackendId : it.BackendId) as long
+                    [tabletId, backendId]
+                })
+            }
+        }
+        return result
+    }
+
+    def checkOneTable = { tbl, isColocateTbl, isAllBeAliveOrDeadLong, isAwaiting ->
+        def feToCurBeIds = getTabletBeIdsForEachFe(tbl, false)
+        def feToPrimaryBeIds = getTabletBeIdsForEachFe(tbl, true)
+        def succ = true
+
+        logger.info('check table got: cur backends {}, primary backends {}, isColocateTbl {}, isAllBeAliveOrDeadLong {}',
+                feToCurBeIds, feToPrimaryBeIds, isColocateTbl, isAllBeAliveOrDeadLong)
+
+        // check whether 3 frontends are consistent
+        for (def feToBeIds : [ feToCurBeIds, feToPrimaryBeIds ]) {
+            assertEquals(3, feToBeIds.size())
+            for (def tablets : feToBeIds) {
+                assertEquals(bucketNum, tablets.size())
+            }
+            for (def i = 1; i <= 2; i++) {
+                if (feToBeIds[0] != feToBeIds[i]) {
+                    succ = false
+                    if (!isAwaiting) {
+                        assertEquals(feToBeIds[0], feToBeIds[i],
+                                "3 fe inconsistent backends: 3 fe to backends ${feToCurBeIds}, isColocateTbl ${isColocateTbl}")
+                    }
+                }
+            }
+        }
+
+        // check whether primary be ids equal the current be ids
+        def chosenBe = cluster.getBeByIndex(choseDeadBeIndex)
+        def primaryTabletNum = 0
+        feToPrimaryBeIds[0].each { if (it.value == chosenBe.backendId) { primaryTabletNum++ } }
+        if (isColocateTbl) {
+            assertEquals(feToPrimaryBeIds[0], feToCurBeIds[0])
+            assertEquals(chosenBe.alive ? 2 : 0, primaryTabletNum)
+        } else {
+            if (isAllBeAliveOrDeadLong) {
+                assertEquals(feToPrimaryBeIds[0], feToCurBeIds[0])
+            } else {
+                assertNotEquals(feToPrimaryBeIds[0], feToCurBeIds[0])
+            }
+            assertEquals(chosenBe.alive || !isAllBeAliveOrDeadLong ? 2 : 0, primaryTabletNum)
+        }
+
+        def curTabletNum = 0
+        feToCurBeIds[0].each { if (it.value == chosenBe.backendId) { curTabletNum++ } }
+        assertEquals(chosenBe.alive ? 2 : 0, curTabletNum)
+
+        return succ
+    }
+
+    def checkAllTableImpl = { isAllBeAliveOrDeadLong, isAwaiting ->
+        return checkOneTable(tbl1, false, isAllBeAliveOrDeadLong, isAwaiting)
+            && checkOneTable(tbl2, true, isAllBeAliveOrDeadLong, isAwaiting)
+    }
+
+    def checkAllTable = { isAllBeAliveOrDeadLong ->
+        dockerAwaitUntil(30) {
+            checkAllTableImpl(isAllBeAliveOrDeadLong, true)
+        }
+        checkAllTableImpl(isAllBeAliveOrDeadLong, false)
+    }
+
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'enable_cloud_warm_up_for_rebalance=true',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'cloud_balance_tablet_percent_per_run=1.0',
+    ]
+    options.setFeNum(3)
+    options.setBeNum(3)
+    options.cloudMode = true
+    options.enableDebugPoints()
+
+    docker(options) {
+        sql "ADMIN SET ALL FRONTENDS CONFIG ('rehash_tablet_after_be_dead_seconds' = '3600')"
+
+        sql "CREATE TABLE ${tbl1} (a INT) DISTRIBUTED BY HASH(a) BUCKETS ${bucketNum}"
+        sql "CREATE TABLE ${tbl2} (a INT) DISTRIBUTED BY HASH(a) BUCKETS ${bucketNum} PROPERTIES ('colocate_with' = 'foo')"
+        sql "INSERT INTO ${tbl1} VALUES (1)"
+        sql "INSERT INTO ${tbl2} VALUES (1)"
+
+        // all be alive
+        checkAllTable(true)
+
+        cluster.stopBackends(choseDeadBeIndex)
+        dockerAwaitUntil(10) {
+            def chosenBe = cluster.getBeByIndex(choseDeadBeIndex)
+            !chosenBe.alive
+        }
+
+        // be-1 dead, but not dead for a long time
+        checkAllTable(false)
+
+        sql "ADMIN SET ALL FRONTENDS CONFIG ('rehash_tablet_after_be_dead_seconds' = '1')"
+
+        sleep(2 * 1000)
+        // be-1 dead, and dead for a long time
+        checkAllTable(true)
+
+        def choseRestartFeIndex = cluster.getOneFollowerFe().index
+        cluster.stopFrontends(choseRestartFeIndex)
+        dockerAwaitUntil(10) {
+            def chosenFe = cluster.getFeByIndex(choseRestartFeIndex)
+            !chosenFe.alive
+        }
+
+        cluster.startFrontends(choseRestartFeIndex)
+        dockerAwaitUntil(10) {
+            def chosenFe = cluster.getFeByIndex(choseRestartFeIndex)
+            chosenFe.alive
+        }
+
+        def frontends = cluster.getAllFrontends()
+        for (def fe : frontends) {
+            def feUrl = "jdbc:mysql://${fe.host}:${fe.queryPort}/?useLocalSessionState=false&allowLoadLocalInfile=false"
+            feUrl = context.config.buildUrlWithDb(feUrl, context.dbName)
+            connect('root', '', feUrl) {
+                sql 'SET forward_to_master=false'
+                sql "ADMIN SET FRONTEND CONFIG ('rehash_tablet_after_be_dead_seconds' = '1')"
+            }
+        }
+
+        // be-1 dead, and dead for a long time
+        checkAllTable(true)
+    }
+}
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
index 0aa2e83ccc2bfd..c0b63827371d1c 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -16,9 +16,6 @@
 // under the License.
 
 import org.apache.doris.regression.suite.ClusterOptions
-import groovy.json.JsonSlurper
-import org.awaitility.Awaitility;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
 suite('test_rebalance_in_cloud', 'multi_cluster') {
     if (!isCloudMode()) {
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
index 71ee67076b6917..b7de0c501618e8 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
@@ -16,9 +16,6 @@
 // under the License.
 
 import org.apache.doris.regression.suite.ClusterOptions
-import groovy.json.JsonSlurper
-import org.awaitility.Awaitility;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import org.codehaus.groovy.runtime.IOGroovyMethods
 
 suite('test_warmup_rebalance_in_cloud', 'multi_cluster, docker') {
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
index 77f7d05ff299c0..7405cb864d889d 100644
--- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
+++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
@@ -18,7 +18,7 @@
 import org.apache.doris.regression.suite.ClusterOptions
 import groovy.json.JsonSlurper
 
-suite('test_sql_mode_node_mgr', 'docker,p1') {
+suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
     if (!isCloudMode()) {
         return;
     }
@@ -492,4 +492,4 @@ suite('test_sql_mode_node_mgr', 'docker,p1') {
     }
 }
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/demo_p0/docker_action.groovy b/regression-test/suites/demo_p0/docker_action.groovy
index 6d62d6ea7bea8d..6b868b81ae9feb 100644
--- a/regression-test/suites/demo_p0/docker_action.groovy
+++ b/regression-test/suites/demo_p0/docker_action.groovy
@@ -17,7 +17,34 @@
 
 import org.apache.doris.regression.suite.ClusterOptions
 
-suite('docker_action') {
+// Steps to run a docker suite:
+// 1. Read 'docker/runtime/doris-compose/Readme.md' and make sure you can set up a doris docker cluster;
+// 2. Update regression-conf-custom.groovy with the config:
+//    image = "xxxx"                 // your doris docker image
+//    excludeDockerTest = false      // do run docker suites; the default is true
+//    dockerEndDeleteFiles = false   // whether to delete the containers' logs and data under '/tmp/doris/' after the suite ends
+
+// When running a docker suite, no external doris cluster is needed.
+// But whether a docker suite actually runs requires further checks.
+// First, determine the pipeline's run mode (cloud or not_cloud):
+//    If there is an external doris cluster, fetch the pipeline's runMode from it.
+//    If there is no external doris cluster, set the pipeline's runMode with command args,
+//    for example: sh run-regression-test.sh --run docker_action -runMode=cloud/not_cloud
+// Second, compare ClusterOptions.cloudMode with the pipeline's runMode:
+//    If ClusterOptions.cloudMode = null, set ClusterOptions.cloudMode = the pipeline's cloudMode and run the docker suite.
+//    If ClusterOptions.cloudMode = true or false, run the docker suite only when cloudMode == the pipeline's cloudMode
+//    or the pipeline's cloudMode is unknown; otherwise skip it.

+// NOTICE:
+// 1. Add 'docker' to the suite's group, and don't add 'nonConcurrent' to it;
+// 2. In the docker closure:
+//    a. Don't use 'Awaitility.await()...until(f)'; use 'dockerAwaitUntil(..., f)' instead;
+// 3. There's no need to write ` if (isCloudMode()) { return } ` in docker suites;
+//    setting `ClusterOptions.cloudMode = true/false` is enough.
+//    When running a docker suite without an external doris cluster, a suite that calls `isCloudMode()`
+//    needs an explicit -runMode=cloud/not_cloud, whereas `ClusterOptions.cloudMode = true/false`
+//    works without -runMode when no external doris cluster exists.
+
+suite('docker_action', 'docker') {
     // run a new docker
     docker {
         sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10'''