From 9e184334e576474148b2e15b120c409a39d14712 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Sun, 4 Aug 2024 18:33:55 +0800 Subject: [PATCH 1/2] [Fix](group commit) Fix multiple cluster group commit BE select strategy (#38644) --- .../apache/doris/load/GroupCommitManager.java | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 1ec6a06179e443..855b37cb622262 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -53,10 +53,10 @@ public class GroupCommitManager { private Set blockedTableIds = new HashSet<>(); - // Table id to BE id map. Only for group commit. - private Map tableToBeMap = new ConcurrentHashMap<>(); - // BE id to pressure map. Only for group commit. - private Map tablePressureMap = new ConcurrentHashMap<>(); + // Encoded to BE id map. Only for group commit. + private final Map tableToBeMap = new ConcurrentHashMap<>(); + // Table id to pressure map. Only for group commit. + private final Map tableToPressureMap = new ConcurrentHashMap<>(); public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); @@ -237,8 +237,8 @@ public long selectBackendForGroupCommitInternal(long tableId) private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), - tablePressureMap.toString()); - Long cachedBackendId = getCachedBackend(tableId); + tableToPressureMap.toString()); + Long cachedBackendId = getCachedBackend(null, tableId); if (cachedBackendId != null) { return cachedBackendId; } @@ -249,7 +249,7 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE } // If the cached backend is not active or decommissioned, select a random new backend. - Long randomBackendId = getRandomBackend(tableId, backends); + Long randomBackendId = getRandomBackend(null, tableId, backends); if (randomBackendId != null) { return randomBackendId; } @@ -261,25 +261,35 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE } @Nullable - private Long getCachedBackend(long tableId) { + private Long getCachedBackend(String cluster, long tableId) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); - if (tableToBeMap.containsKey(tableId)) { - if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { - Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); - if (backend.isAlive() && !backend.isDecommissioned()) { + if (tableToBeMap.containsKey(encode(cluster, tableId))) { + if (tableToPressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { + // There are multiple threads getting cached backends for the same table. + // Maybe one thread removes the tableId from the tableToBeMap. + // Another thread gets the same tableId but can not find this tableId. + // So another thread needs to get the random backend. + Long backendId = tableToBeMap.get(encode(cluster, tableId)); + Backend backend; + if (backendId != null) { + backend = Env.getCurrentSystemInfo().getBackend(backendId); + } else { + return null; + } + if (backend.isActive() && !backend.isDecommissioned()) { return backend.getId(); } else { - tableToBeMap.remove(tableId); + tableToBeMap.remove(encode(cluster, tableId)); } } else { - tableToBeMap.remove(tableId); + tableToBeMap.remove(encode(cluster, tableId)); } } return null; } @Nullable - private Long getRandomBackend(long tableId, List backends) { + private Long getRandomBackend(String cluster, long tableId, List backends) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); Collections.shuffle(backends); for (Backend backend : backends) { @@ -315,10 +325,10 @@ public void updateLoadData(long tableId, long receiveData) { } public void updateLoadDataInternal(long tableId, long receiveData) { - if (tablePressureMap.containsKey(tableId)) { - tablePressureMap.get(tableId).add(receiveData); + if (tableToPressureMap.containsKey(tableId)) { + tableToPressureMap.get(tableId).add(receiveData); LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData, - tablePressureMap.toString()); + tableToPressureMap.toString()); } else { LOG.warn("can not find backend id: {}", tableId); } From d910602e00dc8754787825596502d61b1db9f168 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 7 Aug 2024 14:50:59 +0800 Subject: [PATCH 2/2] 2 --- .../apache/doris/load/GroupCommitManager.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 855b37cb622262..b6cf6cbb0a84a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -53,10 +53,10 @@ public class GroupCommitManager { private Set blockedTableIds = new HashSet<>(); - // Encoded to BE id map. Only for group commit. - private final Map tableToBeMap = new ConcurrentHashMap<>(); + // Table id to BE id map. Only for group commit. + private Map tableToBeMap = new ConcurrentHashMap<>(); // Table id to pressure map. Only for group commit. - private final Map tableToPressureMap = new ConcurrentHashMap<>(); + private Map tableToPressureMap = new ConcurrentHashMap<>(); public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); @@ -236,9 +236,9 @@ public long selectBackendForGroupCommitInternal(long tableId) } private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { - LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + LOG.debug("group commit select be info, tableToBeMap {}, tableToPressureMap {}", tableToBeMap.toString(), tableToPressureMap.toString()); - Long cachedBackendId = getCachedBackend(null, tableId); + Long cachedBackendId = getCachedBackend(tableId); if (cachedBackendId != null) { return cachedBackendId; } @@ -249,7 +249,7 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE } // If the cached backend is not active or decommissioned, select a random new backend. - Long randomBackendId = getRandomBackend(null, tableId, backends); + Long randomBackendId = getRandomBackend(tableId, backends); if (randomBackendId != null) { return randomBackendId; } @@ -261,41 +261,41 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE } @Nullable - private Long getCachedBackend(String cluster, long tableId) { + private Long getCachedBackend(long tableId) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); - if (tableToBeMap.containsKey(encode(cluster, tableId))) { + if (tableToBeMap.containsKey(tableId)) { if (tableToPressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { // There are multiple threads getting cached backends for the same table. // Maybe one thread removes the tableId from the tableToBeMap. // Another thread gets the same tableId but can not find this tableId. // So another thread needs to get the random backend. - Long backendId = tableToBeMap.get(encode(cluster, tableId)); + Long backendId = tableToBeMap.get(tableId); Backend backend; if (backendId != null) { backend = Env.getCurrentSystemInfo().getBackend(backendId); } else { return null; } - if (backend.isActive() && !backend.isDecommissioned()) { + if (backend.isAlive() && !backend.isDecommissioned()) { return backend.getId(); } else { - tableToBeMap.remove(encode(cluster, tableId)); + tableToBeMap.remove(tableId); } } else { - tableToBeMap.remove(encode(cluster, tableId)); + tableToBeMap.remove(tableId); } } return null; } @Nullable - private Long getRandomBackend(String cluster, long tableId, List backends) { + private Long getRandomBackend(long tableId, List backends) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); Collections.shuffle(backends); for (Backend backend : backends) { if (backend.isAlive() && !backend.isDecommissioned()) { tableToBeMap.put(tableId, backend.getId()); - tablePressureMap.put(tableId, + tableToPressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); return backend.getId(); }