Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,24 @@ public class Config extends ConfigBase {
"The log roll size of BDBJE. When the number of log entries exceeds this value, the log will be rolled"})
public static int edit_log_roll_num = 50000;

@ConfField(mutable = true, masterOnly = true, description = {
"批量 BDBJE 日志包含的最大条目数", "The max number of log entries for batching BDBJE"})
public static int batch_edit_log_max_item_num = 100;

@ConfField(mutable = true, masterOnly = true, description = {
"批量 BDBJE 日志包含的最大长度", "The max size for batching BDBJE"})
public static long batch_edit_log_max_byte_size = 640 * 1024L;

@ConfField(mutable = true, masterOnly = true, description = {
"连续写多批 BDBJE 日志后的停顿时间", "The sleep time after writting multiple batching BDBJE continuously"})
public static long batch_edit_log_rest_time_ms = 10;

@ConfField(mutable = true, masterOnly = true, description = {
"连续写多批 BDBJE 日志后需要短暂停顿。这里最大的连写次数。",
"After writting multiple batching BDBJE continuously, need a short rest. "
+ "Indicates the writting count before a rest"})
public static long batch_edit_log_continuous_count_for_rest = 1000;

@ConfField(description = {"元数据同步的容忍延迟时间,单位为秒。如果元数据的延迟超过这个值,非主 FE 会停止提供服务",
"The toleration delay time of meta data synchronization, in seconds. "
+ "If the delay of meta data exceeds this value, non-master FE will stop offering service"})
Expand Down Expand Up @@ -3033,8 +3051,14 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, description = {"存算分离模式下是否开启大事务提交,默认false"})
public static boolean enable_cloud_txn_lazy_commit = false;

@ConfField(mutable = true, description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认true"})
public static boolean enable_immediate_be_assign = true;
@ConfField(mutable = true, masterOnly = true,
description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认false"})
public static boolean enable_immediate_be_assign = false;

@ConfField(mutable = true, masterOnly = false,
description = { "存算分离模式下,一个BE挂掉多长时间后,它的tablet彻底转移到其他BE上" })
public static int rehash_tablet_after_be_dead_seconds = 3600;


@ConfField(mutable = true, description = {"存算分离模式下是否启用自动启停功能,默认true",
"Whether to enable the automatic start-stop feature in cloud model, default is true."})
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ private void unprotectUpdateCloudReplica(OlapTable olapTable, UpdateCloudReplica
clusterId = realClusterId;
}

((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeId(), true);
((CloudReplica) replica).updateClusterToPrimaryBe(clusterId, info.getBeId());

LOG.debug("update single cloud replica cluster {} replica {} be {}", info.getClusterId(),
replica.getId(), info.getBeId());
Expand All @@ -1062,7 +1062,7 @@ private void unprotectUpdateCloudReplica(OlapTable olapTable, UpdateCloudReplica

LOG.debug("update cloud replica cluster {} replica {} be {}", info.getClusterId(),
replica.getId(), info.getBeIds().get(i));
((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeIds().get(i), true);
((CloudReplica) replica).updateClusterToPrimaryBe(clusterId, info.getBeIds().get(i));
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
Expand All @@ -80,6 +82,7 @@ public class CloudSystemInfoService extends SystemInfoService {

// for show cluster and cache user owned cluster
// clusterId -> List<Backend>
// pls make sure each cluster's backend list is sorted by backendId
protected Map<String, List<Backend>> clusterIdToBackend = new ConcurrentHashMap<>();
// clusterName -> clusterId
protected Map<String, String> clusterNameToId = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -243,9 +246,12 @@ public void updateCloudClusterMapNoLock(List<Backend> toAdd, List<Backend> toDel
clusterName, clusterId, be.size(), b);
continue;
}
be.add(b);
List<Backend> sortBackends = Lists.newArrayList(be);
sortBackends.add(b);
Collections.sort(sortBackends, Comparator.comparing(Backend::getId));
clusterIdToBackend.put(clusterId, sortBackends);
LOG.info("update (add) cloud cluster map, clusterName={} clusterId={} backendNum={} current backend={}",
clusterName, clusterId, be.size(), b);
clusterName, clusterId, sortBackends.size(), sortBackends);
}

for (Backend b : toDel) {
Expand Down Expand Up @@ -508,6 +514,13 @@ public int getMinPipelineExecutorSize() {
return super.getMinPipelineExecutorSize();
}

@Override
public int getTabletNumByBackendId(long beId) {
return ((CloudEnv) Env.getCurrentEnv())
.getCloudTabletRebalancer()
.getTabletNumByBackendId(beId);
}

@Override
public ImmutableMap<Long, Backend> getBackendsByCurrentCluster() throws AnalysisException {
ConnectContext ctx = ConnectContext.get();
Expand Down Expand Up @@ -557,7 +570,7 @@ public List<Backend> getBackendsByClusterId(final String clusterId) {
}
}

public String getClusterIdByBeAddr(String beEndpoint) {
public String getClusterNameByBeAddr(String beEndpoint) {
rlock.lock();
try {
for (Map.Entry<String, List<Backend>> idBe : clusterIdToBackend.entrySet()) {
Expand Down Expand Up @@ -629,6 +642,10 @@ public void updateClusterNameToId(final String newName,
}
}

public String getCloudClusterIdByName(String clusterName) {
return clusterNameToId.get(clusterName);
}

public String getClusterNameByClusterId(final String clusterId) {
rlock.lock();
try {
Expand Down Expand Up @@ -766,15 +783,6 @@ public Map<String, List<Backend>> getCloudClusterIdToBackend() {
}
}

public String getCloudClusterIdByName(String clusterName) {
rlock.lock();
try {
return clusterNameToId.get(clusterName);
} finally {
rlock.unlock();
}
}

public ImmutableMap<Long, Backend> getCloudIdToBackend(String clusterName) {
rlock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public static List<List<String>> getBackendInfos() {
}

watch.start();
Integer tabletNum = Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
Integer tabletNum = systemInfoService.getTabletNumByBackendId(backendId);
watch.stop();
List<Comparable> backendInfo = Lists.newArrayList();
backendInfo.add(String.valueOf(backendId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.BackendLoadStatistic;
import org.apache.doris.clone.BackendLoadStatistic.Classification;
import org.apache.doris.clone.LoadStatisticForTag;
Expand Down Expand Up @@ -140,10 +139,9 @@ private DiagnoseItem diagnoseBaseBalance(boolean schedReady, boolean schedRecent
.collect(Collectors.toList());
boolean isPartitionBal = Config.tablet_rebalancer_type.equalsIgnoreCase("partition");
if (isPartitionBal) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
baseBalance.name = "Partition Balance";
List<Integer> tabletNums = availableBeIds.stream()
.map(beId -> invertedIndex.getTabletNumByBackendId(beId))
.map(beId -> infoService.getTabletNumByBackendId(beId))
.collect(Collectors.toList());
int minTabletNum = tabletNums.stream().mapToInt(v -> v).min().orElse(0);
int maxTabletNum = tabletNums.stream().mapToInt(v -> v).max().orElse(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.NetUtils;
Expand All @@ -41,13 +42,23 @@
* show replicas' detail info within a tablet
*/
public class ReplicasProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("ReplicaId")
.add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad")
.add("IsUserDrop")
.add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
.add("CooldownMetaId").add("QueryHits").build();
public static final ImmutableList<String> TITLE_NAMES;

static {
ImmutableList.Builder<String> builder = new ImmutableList.Builder<String>().add("ReplicaId")
.add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad")
.add("IsUserDrop")
.add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
.add("CooldownMetaId").add("QueryHits");

if (Config.isCloudMode()) {
builder.add("PrimaryBackendId");
}

TITLE_NAMES = builder.build();
}

private long tabletId;
private List<Replica> replicas;
Expand Down Expand Up @@ -105,28 +116,32 @@ public ProcResult fetchResult() throws AnalysisException {
if (Config.enable_query_hit_stats) {
queryHits = QueryStatsUtil.getMergedReplicaStats(replica.getId());
}
result.addRow(Arrays.asList(String.valueOf(replica.getId()),
String.valueOf(replica.getBackendIdWithoutException()),
String.valueOf(replica.getVersion()),
String.valueOf(replica.getLastSuccessVersion()),
String.valueOf(replica.getLastFailedVersion()),
TimeUtils.longToTimeString(replica.getLastFailedTimestamp()),
String.valueOf(replica.getSchemaHash()),
String.valueOf(replica.getDataSize()),
String.valueOf(replica.getRemoteDataSize()),
String.valueOf(replica.getRowCount()),
String.valueOf(replica.getState()),
String.valueOf(replica.isBad()),
String.valueOf(replica.isUserDrop()),
String.valueOf(replica.getVisibleVersionCount()),
String.valueOf(replica.getTotalVersionCount()),
String.valueOf(replica.getPathHash()),
path,
metaUrl,
compactionUrl,
String.valueOf(tablet.getCooldownConf().first),
cooldownMetaId,
String.valueOf(queryHits)));
List<String> replicaInfo = Arrays.asList(String.valueOf(replica.getId()),
String.valueOf(replica.getBackendIdWithoutException()),
String.valueOf(replica.getVersion()),
String.valueOf(replica.getLastSuccessVersion()),
String.valueOf(replica.getLastFailedVersion()),
TimeUtils.longToTimeString(replica.getLastFailedTimestamp()),
String.valueOf(replica.getSchemaHash()),
String.valueOf(replica.getDataSize()),
String.valueOf(replica.getRemoteDataSize()),
String.valueOf(replica.getRowCount()),
String.valueOf(replica.getState()),
String.valueOf(replica.isBad()),
String.valueOf(replica.isUserDrop()),
String.valueOf(replica.getVisibleVersionCount()),
String.valueOf(replica.getTotalVersionCount()),
String.valueOf(replica.getPathHash()),
path,
metaUrl,
compactionUrl,
String.valueOf(tablet.getCooldownConf().first),
cooldownMetaId,
String.valueOf(queryHits));
if (Config.isCloudMode()) {
replicaInfo.add(String.valueOf(((CloudReplica) replica).getPrimaryBackendId()));
}
result.addRow(replicaInfo);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
Expand All @@ -47,15 +48,24 @@
* show tablets' detail info within an index
*/
public class TabletsProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("TabletId").add("ReplicaId").add("BackendId").add("SchemaHash").add("Version")
.add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State")
.add("LstConsistencyCheckTime").add("CheckVersion")
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus")
.add("CooldownReplicaId").add("CooldownMetaId")
.build();
public static final ImmutableList<String> TITLE_NAMES;

static {
ImmutableList.Builder<String> builder = new ImmutableList.Builder<String>()
.add("TabletId").add("ReplicaId").add("BackendId").add("SchemaHash").add("Version")
.add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State")
.add("LstConsistencyCheckTime").add("CheckVersion")
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus")
.add("CooldownReplicaId").add("CooldownMetaId");

if (Config.isCloudMode()) {
builder.add("PrimaryBackendId");
}

TITLE_NAMES = builder.build();
}

private Table table;
private MaterializedIndex index;
Expand Down Expand Up @@ -124,6 +134,9 @@ public List<List<Comparable>> fetchComparableResult(long version, long backendId
tabletInfo.add(FeConstants.null_string); // compaction status
tabletInfo.add(-1); // cooldown replica id
tabletInfo.add(""); // cooldown meta id
if (Config.isCloudMode()) {
tabletInfo.add(-1L); // primary backend id
}

tabletInfos.add(tabletInfo);
} else {
Expand Down Expand Up @@ -170,6 +183,9 @@ public List<List<Comparable>> fetchComparableResult(long version, long backendId
} else {
tabletInfo.add(replica.getCooldownMetaId().toString());
}
if (Config.isCloudMode()) {
tabletInfo.add(((CloudReplica) replica).getPrimaryBackendId());
}
tabletInfos.add(tabletInfo);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.doris.alter.Alter;
import org.apache.doris.alter.AlterJobV2.JobType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
Expand Down Expand Up @@ -646,7 +645,6 @@ public static void generateBackendsTabletMetrics() {
DORIS_METRIC_REGISTER.removeMetrics(TABLET_MAX_COMPACTION_SCORE);

SystemInfoService infoService = Env.getCurrentSystemInfo();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();

for (Long beId : infoService.getAllBackendIds(false)) {
Backend be = infoService.getBackend(beId);
Expand All @@ -661,7 +659,7 @@ public Long getValue() {
if (!Env.getCurrentEnv().isMaster()) {
return 0L;
}
return (long) invertedIndex.getTabletNumByBackendId(beId);
return (long) infoService.getTabletNumByBackendId(beId);
}
};
tabletNum.addLabel(new MetricLabel("backend",
Expand Down
Loading