From 4ed298974424f1ca6d833cdbccc97111962d0922 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 28 May 2024 17:37:48 +0800 Subject: [PATCH 1/9] 1 --- be/src/runtime/group_commit_mgr.cpp | 7 ++ .../java/org/apache/doris/common/Config.java | 6 ++ .../common/util/SlidingWindowCounter.java | 73 ++++++++++++++++ .../apache/doris/httpv2/rest/LoadAction.java | 47 ++++++++--- .../apache/doris/load/GroupCommitManager.java | 84 +++++++++++++++++++ .../insert/OlapGroupCommitInsertExecutor.java | 12 +++ .../doris/planner/GroupCommitPlanner.java | 48 +++++++---- .../java/org/apache/doris/qe/Coordinator.java | 25 +++++- .../org/apache/doris/qe/MasterOpExecutor.java | 40 +++++++++ .../doris/service/FrontendServiceImpl.java | 29 +++++++ gensrc/thrift/FrontendService.thrift | 9 ++ 11 files changed, 346 insertions(+), 34 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 5f989da023b36a..30885fa1ac9218 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -442,6 +442,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ request.__set_db_id(db_id); request.__set_table_id(table_id); request.__set_txnId(txn_id); + request.__set_groupCommit(true); + request.__set_receiveBytes(state->num_bytes_load_total()); + if (_exec_env->master_info()->__isset.backend_id) { + request.__set_backendId(_exec_env->master_info()->backend_id); + } else { + LOG(WARNING) << "_exec_env->master_info not set backend_id"; + } if (state) { request.__set_commitInfos(state->tablet_commit_infos()); } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index db48f445f80a79..c8eb61640a5038 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2660,6 +2660,12 @@ public class Config extends ConfigBase { }) public static boolean enable_advance_next_id = false; + @ConfField(description = { + "是否采用反馈group commit BE选择算法", + "Whether to use feedback group commit BE select strategy." + }) + public static boolean enable_feedback_group_commit_be_select_strategy = false; + // The count threshold to do manual GC when doing checkpoint but not enough memory. // Set zero to disable it. @ConfField(description = { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java new file mode 100644 index 00000000000000..787fbb06a2f7bb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import java.util.concurrent.atomic.AtomicLongArray; + +public class SlidingWindowCounter { + private final int windowSizeInSeconds; + private final int numberOfBuckets; + private final AtomicLongArray buckets; + private final AtomicLongArray bucketTimestamps; + + public SlidingWindowCounter(int windowSizeInSeconds) { + this.windowSizeInSeconds = windowSizeInSeconds; + this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second + this.buckets = new AtomicLongArray(numberOfBuckets); + this.bucketTimestamps = new AtomicLongArray(numberOfBuckets); + } + + private int getCurrentBucketIndex() { + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + return (int) (currentTime % numberOfBuckets); + } + + private void updateCurrentBucket() { + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + int currentBucketIndex = getCurrentBucketIndex(); + long bucketTimestamp = bucketTimestamps.get(currentBucketIndex); + + if (currentTime - bucketTimestamp >= 1) { + buckets.set(currentBucketIndex, 0); + bucketTimestamps.set(currentBucketIndex, currentTime); + } + } + + public void add(long value) { + updateCurrentBucket(); + int bucketIndex = getCurrentBucketIndex(); + buckets.addAndGet(bucketIndex, value); + } + + public long get() { + updateCurrentBucket(); + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + long count = 0; + + for (int i = 0; i < numberOfBuckets; i++) { + if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) { + count += buckets.get(i); + } + } + return count; + } + + public String toString() { + return String.valueOf(get()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 5767e303f35ebf..b8ca1b7fa24265 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -143,11 +144,16 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse String sql = request.getHeader("sql"); LOG.info("streaming load sql={}", sql); boolean groupCommit = false; + long tableId = -1; String groupCommitStr = request.getHeader("group_commit"); if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { groupCommit = true; try { String[] pair = parseDbAndTb(sql); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); + Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + tableId = tbl.getId(); if (isGroupCommitBlock(pair[0], pair[1])) { String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE; return new RestBaseResult(msg); @@ -165,7 +171,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse } String label = request.getHeader(LABEL_KEY); - TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit); + TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId); LOG.info("redirect load action to destination={}, label: {}", redirectAddr.toString(), label); @@ -287,7 +293,9 @@ private Object executeWithoutPassword(HttpServletRequest request, return new RestBaseResult(e.getMessage()); } } else { - redirectAddr = selectRedirectBackend(request, groupCommit); + long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName) + .get()).getTable(tableName).get()).getId(); + redirectAddr = selectRedirectBackend(request, groupCommit, tableId); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", @@ -320,7 +328,7 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) { return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(request, false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1); LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}", redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation); @@ -352,7 +360,7 @@ private String getCloudClusterName(HttpServletRequest request) { return ""; } - private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit) + private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId) throws LoadException { long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); if (debugBackendId != -1L) { @@ -366,11 +374,12 @@ private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolea } return selectCloudRedirectBackend(cloudClusterName, request, groupCommit); } else { - return selectLocalRedirectBackend(groupCommit); + return selectLocalRedirectBackend(groupCommit, request, tableId); } } - private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws LoadException { + private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId) + throws LoadException { Backend backend = null; BeSelectionPolicy policy = null; String qualifiedUser = ConnectContext.get().getQualifiedUser(); @@ -390,11 +399,23 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws L throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - for (Long backendId : backendIds) { - Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId); - if (!candidateBe.isDecommissioned()) { - backend = candidateBe; - break; + if (Config.enable_feedback_group_commit_be_select_strategy) { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(request.getRemoteAddr()); + // set user to ADMIN_USER, so that we can get the proper resource tag + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + + backend = Env.getCurrentEnv().getGroupCommitManager().selectBackendForGroupCommit(tableId, ctx); + } else { + for (Long backendId : backendIds) { + Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId); + if (!candidateBe.isDecommissioned()) { + backend = candidateBe; + break; + } } } } else { @@ -573,10 +594,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, return new RestBaseResult("No label selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(request, false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1); LOG.info("Redirect load action with auth token to destination={}," - + "stream: {}, db: {}, tbl: {}, label: {}", + + "stream: {}, db: {}, tbl: {}, label: {}", redirectAddr.toString(), isStreamLoad, dbName, tableName, label); URI urlObj = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index c4bf1e03c9cf58..09c9df28daf51b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -19,19 +19,28 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.util.SlidingWindowCounter; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.MasterOpExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; public class GroupCommitManager { @@ -40,6 +49,11 @@ public class GroupCommitManager { private Set blockedTableIds = new HashSet<>(); + // Table id to BE id map. Only for group commit. + private Map tableToBeMap = new ConcurrentHashMap<>(); + // BE id to pressure map. Only for group commit. + private Map bePressureMap = new ConcurrentHashMap<>(); + public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); } @@ -163,4 +177,74 @@ public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { return size; } + public Backend selectBackendForGroupCommit(long tableId, ConnectContext context) throws LoadException { + if (!Env.getCurrentEnv().isMaster()) { + try { + long backendId = new MasterOpExecutor(context).getGroupCommitLoadBeId(tableId); + return Env.getCurrentSystemInfo().getBackend(backendId); + } catch (Exception e) { + throw new LoadException(e.getMessage()); + } + } else { + return Env.getCurrentSystemInfo().getBackend(selectBackendForGroupCommitInternal(tableId)); + } + } + + public long selectBackendForGroupCommitInternal(long tableId) throws LoadException { + LOG.info("group commit new strategy select be, tableToBeMap {}, bePressureMap {}", tableToBeMap.toString(), + bePressureMap.toString()); + if (tableToBeMap.containsKey(tableId)) { + if (bePressureMap.get(tableToBeMap.get(tableId)).get() < 1073741824) { + return tableToBeMap.get(tableId); + } else { + tableToBeMap.remove(tableId); + } + } + List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (allBackendIds.isEmpty()) { + throw new LoadException("No alive backend"); + } + + Collections.shuffle(allBackendIds); + for (Long beId : allBackendIds) { + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + if (!backend.isDecommissioned()) { + tableToBeMap.put(tableId, beId); + bePressureMap.put(beId, new SlidingWindowCounter(10)); + return beId; + } + } + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); + } + + public void updateLoadData(long backendId, long receiveData) { + if (backendId == -1) { + LOG.warn("invalid backend id: " + backendId); + } + if (!Env.getCurrentEnv().isMaster()) { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + // set user to ADMIN_USER, so that we can get the proper resource tag + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + try { + new MasterOpExecutor(ctx).updateLoadData(backendId, receiveData); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + updateLoadDataInternal(backendId, receiveData); + } + } + + public void updateLoadDataInternal(long backendId, long receiveData) { + if (bePressureMap.containsKey(backendId)) { + bePressureMap.get(backendId).add(receiveData); + LOG.info("Update load data for backend {}, receiveData {}, bePressureMap {}", backendId, receiveData, + bePressureMap.toString()); + } else { + LOG.warn("can not find backend id: {}", backendId); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index 984e8b0c8caa65..d9bc9c3fec67b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -18,11 +18,13 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.LoadException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.NereidsPlanner; @@ -66,6 +68,16 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Unbo && (tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion)); } + @Override + protected void beforeExec() { + try { + this.coordinator.setGroupCommitBe(Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(table.getId(), ctx)); + } catch (LoadException e) { + throw new RuntimeException(e); + } + } + @Override public void beginTransaction() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 09c2f72b5ca675..21a0c985088377 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -25,8 +25,10 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FormatOptions; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; @@ -147,28 +149,38 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, // cloud override protected void selectBackends(ConnectContext ctx) throws DdlException { - backend = ctx.getInsertGroupCommit(this.table.getId()); - if (backend != null && backend.isAlive() && !backend.isDecommissioned()) { - return; - } + if (Config.enable_feedback_group_commit_be_select_strategy) { + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(this.table.getId(), ctx); + LOG.info("Group commit new strategy select be {}, label is {}", backend.getId(), loadId.toString()); + } catch (LoadException e) { + throw new DdlException("No suitable backend"); + } + } else { + backend = ctx.getInsertGroupCommit(this.table.getId()); + if (backend != null && backend.isAlive() && !backend.isDecommissioned()) { + return; + } - List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (allBackendIds.isEmpty()) { - throw new DdlException("No alive backend"); - } - Collections.shuffle(allBackendIds); - for (Long beId : allBackendIds) { - backend = Env.getCurrentSystemInfo().getBackend(beId); - if (!backend.isDecommissioned()) { - ctx.setInsertGroupCommit(this.table.getId(), backend); - if (LOG.isDebugEnabled()) { - LOG.debug("choose new be {}", backend.getId()); + List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (allBackendIds.isEmpty()) { + throw new DdlException("No alive backend"); + } + Collections.shuffle(allBackendIds); + for (Long beId : allBackendIds) { + backend = Env.getCurrentSystemInfo().getBackend(beId); + if (!backend.isDecommissioned()) { + ctx.setInsertGroupCommit(this.table.getId(), backend); + if (LOG.isDebugEnabled()) { + LOG.debug("choose new be {}", backend.getId()); + } + return; } - return; } - } - throw new DdlException("No suitable backend"); + throw new DdlException("No suitable backend"); + } } public Backend getBackend() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8459cb3b68ec78..aed352570bbaec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -268,6 +268,8 @@ public class Coordinator implements CoordInterface { private boolean useNereids = false; + private Backend groupCommitBackend; + // Runtime filter merge instance address and ID public TNetworkAddress runtimeFilterMergeAddr; public TUniqueId runtimeFilterMergeInstanceId; @@ -294,6 +296,10 @@ public class Coordinator implements CoordInterface { // fragmentid -> backendid private MarkedCountDownLatch fragmentsDoneLatch = null; + public void setGroupCommitBe(Backend backend) { + this.groupCommitBackend = backend; + } + public void setTWorkloadGroups(List tWorkloadGroups) { this.tWorkloadGroups = tWorkloadGroups; } @@ -1716,8 +1722,11 @@ protected void computeFragmentHosts() throws Exception { if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) { Reference backendIdRef = new Reference(); TNetworkAddress execHostport; - if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || (isAllExternalScan - && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { + if (groupCommitBackend != null) { + execHostport = getGroupCommitBackend(addressToBackendID); + } else if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || ( + isAllExternalScan + && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { // 2 cases: // case 1: user set resource tag, we need to use the BE with the specified resource tags. // case 2: All scan nodes are external scan node, @@ -1908,7 +1917,9 @@ protected void computeFragmentHosts() throws Exception { if (params.instanceExecParams.isEmpty()) { Reference backendIdRef = new Reference(); TNetworkAddress execHostport; - if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() + if (groupCommitBackend != null) { + execHostport = getGroupCommitBackend(addressToBackendID); + } else if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() && !addressToBackendID.isEmpty()) { // In this case, we only use the BE where the replica selected by the tag is located to // execute this query. Otherwise, except for the scan node, the rest of the execution nodes @@ -1932,6 +1943,14 @@ protected void computeFragmentHosts() throws Exception { } } + private TNetworkAddress getGroupCommitBackend(Map addressToBackendID) { + // Used for Nereids planner Group commit insert BE select. + TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(), + groupCommitBackend.getBePort()); + addressToBackendID.put(execHostport, groupCommitBackend.getId()); + return execHostport; + } + // Traverse the expected runtimeFilterID in each fragment, and establish the corresponding relationship // between runtimeFilterID and fragment instance addr and select the merge instance of runtimeFilter private void assignRuntimeFilterAddr() throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index a14379ddb77b9a..0aa37310cf2007 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -101,6 +101,17 @@ public void syncJournal() throws Exception { waitOnReplaying(); } + public long getGroupCommitLoadBeId(long tableId) throws Exception { + result = forward(buildGetGroupCommitLoadBeIdParmas(tableId)); + waitOnReplaying(); + return result.groupCommitLoadBeId; + } + + public void updateLoadData(long backendId, long receiveData) throws Exception { + result = forward(buildUpdateLoadDataParams(backendId, receiveData)); + waitOnReplaying(); + } + public void cancel() throws Exception { TUniqueId queryId = ctx.queryId(); if (queryId == null) { @@ -232,6 +243,35 @@ private TMasterOpRequest buildSyncJournalParmas() { return params; } + private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId) { + final TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setGetGroupCommitLoadBeId(true); + params.setGroupCommitLoadTableId(tableId); + params.setDb(ctx.getDatabase()); + params.setUser(ctx.getQualifiedUser()); + // just make the protocol happy + params.setSql(""); + return params; + } + + private TMasterOpRequest buildUpdateLoadDataParams(long backendId, long receiveData) { + final TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setUpdateLoadData(true); + params.setBackendId(backendId); + params.setReceiveData(receiveData); + params.setDb(ctx.getDatabase()); + params.setUser(ctx.getQualifiedUser()); + // just make the protocol happy + params.setSql(""); + return params; + } + public ByteBuffer getOutputPacket() { if (result == null) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 8f34fed5e5f8e9..fda1fee4a6d2e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -56,6 +56,7 @@ import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.PatternMatcher; @@ -1031,6 +1032,26 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { result.setPacket("".getBytes()); return result; } + if (params.isGetGroupCommitLoadBeId()) { + final TMasterOpResult result = new TMasterOpResult(); + try { + result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommitInternal(params.groupCommitLoadTableId)); + } catch (LoadException e) { + throw new TException(e.getMessage()); + } + // just make the protocol happy + result.setPacket("".getBytes()); + return result; + } + if (params.isUpdateLoadData()) { + final TMasterOpResult result = new TMasterOpResult(); + Env.getCurrentEnv().getGroupCommitManager() + .updateLoadData(params.backendId, params.receiveData); + // just make the protocol happy + result.setPacket("".getBytes()); + return result; + } if (params.isSetCancelQeury() && params.isCancelQeury()) { if (!params.isSetQueryId()) { throw new TException("a query id is needed to cancel a query"); @@ -1629,6 +1650,14 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } } + if (request.groupCommit) { + try { + long backendId = request.getBackendId(); + Env.getCurrentEnv().getGroupCommitManager().updateLoadData(backendId, request.receiveBytes); + } catch (Exception e) { + LOG.warn("Failed to update group commit load data, {}", e.getMessage()); + } + } // get database Env env = Env.getCurrentEnv(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 2867e15c3c1af9..5cf69d1e055c10 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -573,6 +573,11 @@ struct TMasterOpRequest { 28: optional map user_variables // transaction load 29: optional TTxnLoadInfo txnLoadInfo + 30: optional bool getGroupCommitLoadBeId + 31: optional i64 groupCommitLoadTableId + 32: optional bool updateLoadData + 33: optional i64 backendId + 34: optional i64 receiveData // selectdb cloud 1000: optional string cloud_cluster @@ -606,6 +611,7 @@ struct TMasterOpResult { 8: optional list queryResultBufList; // transaction load 9: optional TTxnLoadInfo txnLoadInfo; + 10: optional i64 groupCommitLoadBeId; } struct TUpdateExportTaskStatusRequest { @@ -817,6 +823,9 @@ struct TLoadTxnCommitRequest { 15: optional list tbls 16: optional i64 table_id 17: optional string auth_code_uuid + 18: optional bool groupCommit + 19: optional i64 receiveBytes + 20: optional i64 backendId } struct TLoadTxnCommitResult { From 25ccf3707a07343990f4a57e28a968c14f698d9c Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 15 Jul 2024 19:10:41 +0800 Subject: [PATCH 2/9] 2 --- .../plans/commands/insert/OlapGroupCommitInsertExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index d9bc9c3fec67b2..1969ee0eb302d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -17,8 +17,8 @@ package org.apache.doris.nereids.trees.plans.commands.insert; -import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; From 621f858d2d6be947efe027b1e43dd7ba504ea17a Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 18 Jul 2024 16:33:23 +0800 Subject: [PATCH 3/9] 3 --- .../planner/CloudGroupCommitPlanner.java | 47 ++------ .../apache/doris/httpv2/rest/LoadAction.java | 7 +- .../apache/doris/load/GroupCommitManager.java | 104 +++++++++++++----- .../insert/OlapGroupCommitInsertExecutor.java | 5 +- .../doris/planner/GroupCommitPlanner.java | 36 +----- .../org/apache/doris/qe/MasterOpExecutor.java | 25 +++-- .../doris/service/FrontendServiceImpl.java | 13 ++- gensrc/thrift/FrontendService.thrift | 16 ++- 8 files changed, 138 insertions(+), 115 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java index 6978ec65e72152..c28aea4b0ad31d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java @@ -20,24 +20,18 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.system.Backend; import org.apache.doris.thrift.TUniqueId; -import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; -import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; public class CloudGroupCommitPlanner extends GroupCommitPlanner { private static final Logger LOG = LogManager.getLogger(CloudGroupCommitPlanner.class); @@ -50,36 +44,17 @@ public CloudGroupCommitPlanner(Database db, OlapTable table, List target @Override protected void selectBackends(ConnectContext ctx) throws DdlException { - backend = ctx.getInsertGroupCommit(this.table.getId()); - if (backend != null && backend.isAlive() && !backend.isDecommissioned() - && backend.getCloudClusterName().equals(ctx.getCloudCluster())) { - return; - } - - String cluster = ctx.getCloudCluster(); - if (Strings.isNullOrEmpty(cluster)) { - ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); - } - - // select be - List backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) - .values().stream().collect(Collectors.toList()); - Collections.shuffle(backends); - for (Backend backend : backends) { - if (backend.isActive() && !backend.isDecommissioned()) { - this.backend = backend; - ctx.setInsertGroupCommit(this.table.getId(), backend); - LOG.debug("choose new be {}", backend.getId()); - return; + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(this.table.getId(), ctx, true); + if (backend != null && backend.isAlive() && !backend.isDecommissioned() + && backend.getCloudClusterName().equals(ctx.getCloudCluster())) { + LOG.info("Group commit cloud select be {}, label is {}", backend.getId(), loadId.toString()); + } else { + throw new DdlException("No suitable backend"); } + } catch (LoadException e) { + throw new DdlException("No suitable backend"); } - - List backendsInfo = backends.stream() - .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() - + ", decommission=" + be.isDecommissioned() + " }") - .collect(Collectors.toList()); - throw new DdlException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo); } - } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index b8ca1b7fa24265..bc19bb65b1fd39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -408,7 +408,12 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ ctx.setQualifiedUser(Auth.ADMIN_USER); ctx.setThreadLocalInfo(); - backend = Env.getCurrentEnv().getGroupCommitManager().selectBackendForGroupCommit(tableId, ctx); + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx, false); + } catch (DdlException e) { + throw new RuntimeException(e); + } } else { for (Long backendId : backendIds) { Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 09c9df28daf51b..229d6032eb2946 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -18,7 +18,12 @@ package org.apache.doris.load; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.LoadException; import org.apache.doris.common.util.SlidingWindowCounter; import org.apache.doris.mysql.privilege.Auth; @@ -32,9 +37,11 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -42,6 +49,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.stream.Collectors; public class GroupCommitManager { @@ -52,7 +60,7 @@ public class GroupCommitManager { // Table id to BE id map. Only for group commit. private Map tableToBeMap = new ConcurrentHashMap<>(); // BE id to pressure map. Only for group commit. - private Map bePressureMap = new ConcurrentHashMap<>(); + private Map tablePressureMap = new ConcurrentHashMap<>(); public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); @@ -177,44 +185,84 @@ public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { return size; } - public Backend selectBackendForGroupCommit(long tableId, ConnectContext context) throws LoadException { + public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud) + throws LoadException, DdlException { if (!Env.getCurrentEnv().isMaster()) { try { - long backendId = new MasterOpExecutor(context).getGroupCommitLoadBeId(tableId); + long backendId = new MasterOpExecutor(context) + .getGroupCommitLoadBeId(tableId, context.getCloudCluster(), isCloud); return Env.getCurrentSystemInfo().getBackend(backendId); } catch (Exception e) { throw new LoadException(e.getMessage()); } } else { - return Env.getCurrentSystemInfo().getBackend(selectBackendForGroupCommitInternal(tableId)); + return Env.getCurrentSystemInfo() + .getBackend(selectBackendForGroupCommitInternal(tableId, context.getCloudCluster(), isCloud)); } } - public long selectBackendForGroupCommitInternal(long tableId) throws LoadException { - LOG.info("group commit new strategy select be, tableToBeMap {}, bePressureMap {}", tableToBeMap.toString(), - bePressureMap.toString()); - if (tableToBeMap.containsKey(tableId)) { - if (bePressureMap.get(tableToBeMap.get(tableId)).get() < 1073741824) { - return tableToBeMap.get(tableId); - } else { - tableToBeMap.remove(tableId); + public long selectBackendForGroupCommitInternal(long tableId, String cluster, boolean isCloud) + throws LoadException, DdlException { + if (!isCloud) { + LOG.info("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + if (tableToBeMap.containsKey(tableId)) { + if (tablePressureMap.get(tableToBeMap.get(tableId)).get() < table.getGroupCommitDataBytes()) { + return tableToBeMap.get(tableId); + } else { + tableToBeMap.remove(tableId); + } + } + List backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends()); + if (backends.isEmpty()) { + throw new LoadException("No alive backend"); } - } - List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (allBackendIds.isEmpty()) { - throw new LoadException("No alive backend"); - } - Collections.shuffle(allBackendIds); - for (Long beId : allBackendIds) { - Backend backend = Env.getCurrentSystemInfo().getBackend(beId); - if (!backend.isDecommissioned()) { - tableToBeMap.put(tableId, beId); - bePressureMap.put(beId, new SlidingWindowCounter(10)); - return beId; + Collections.shuffle(backends); + for (Backend backend : backends) { + if (backend.isActive() && !backend.isDecommissioned()) { + tableToBeMap.put(tableId, backend.getId()); + tablePressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs())); + return backend.getId(); + } } + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); + } else { + LOG.info("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + if (tableToBeMap.containsKey(tableId)) { + if (tablePressureMap.get(tableToBeMap.get(tableId)).get() < table.getGroupCommitDataBytes()) { + return tableToBeMap.get(tableId); + } else { + tableToBeMap.remove(tableId); + } + } + + if (Strings.isNullOrEmpty(cluster)) { + ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); + } + + // select be + List backends = new ArrayList<>( + ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) + .values()); + Collections.shuffle(backends); + for (Backend backend : backends) { + if (backend.isActive() && !backend.isDecommissioned()) { + tableToBeMap.put(tableId, backend.getId()); + tablePressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs())); + return backend.getId(); + } + } + + List backendsInfo = backends.stream() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + + ", decommission=" + be.isDecommissioned() + " }") + .collect(Collectors.toList()); + throw new DdlException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo); } - throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); } public void updateLoadData(long backendId, long receiveData) { @@ -239,10 +287,10 @@ public void updateLoadData(long backendId, long receiveData) { } public void updateLoadDataInternal(long backendId, long receiveData) { - if (bePressureMap.containsKey(backendId)) { - bePressureMap.get(backendId).add(receiveData); + if (tablePressureMap.containsKey(backendId)) { + tablePressureMap.get(backendId).add(receiveData); LOG.info("Update load data for backend {}, receiveData {}, bePressureMap {}", backendId, receiveData, - bePressureMap.toString()); + tablePressureMap.toString()); } else { LOG.warn("can not find backend id: {}", backendId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index 1969ee0eb302d2..edf8251f97fab1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LoadException; @@ -72,8 +73,8 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Unbo protected void beforeExec() { try { this.coordinator.setGroupCommitBe(Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(table.getId(), ctx)); - } catch (LoadException e) { + .selectBackendForGroupCommit(table.getId(), ctx, false)); + } catch (LoadException | DdlException e) { throw new RuntimeException(e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 21a0c985088377..d410c720d08233 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -25,7 +25,6 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FormatOptions; import org.apache.doris.common.LoadException; @@ -60,7 +59,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -147,38 +145,16 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, return future.get(); } - // cloud override protected void selectBackends(ConnectContext ctx) throws DdlException { - if (Config.enable_feedback_group_commit_be_select_strategy) { - try { - backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(this.table.getId(), ctx); + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(this.table.getId(), ctx, false); + if (backend != null && backend.isAlive() && !backend.isDecommissioned()) { LOG.info("Group commit new strategy select be {}, label is {}", backend.getId(), loadId.toString()); - } catch (LoadException e) { + } else { throw new DdlException("No suitable backend"); } - } else { - backend = ctx.getInsertGroupCommit(this.table.getId()); - if (backend != null && backend.isAlive() && !backend.isDecommissioned()) { - return; - } - - List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (allBackendIds.isEmpty()) { - throw new DdlException("No alive backend"); - } - Collections.shuffle(allBackendIds); - for (Long beId : allBackendIds) { - backend = Env.getCurrentSystemInfo().getBackend(beId); - if (!backend.isDecommissioned()) { - ctx.setInsertGroupCommit(this.table.getId(), backend); - if (LOG.isDebugEnabled()) { - LOG.debug("choose new be {}", backend.getId()); - } - return; - } - } - + } catch (LoadException e) { throw new DdlException("No suitable backend"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 0aa37310cf2007..d9e4857ab42022 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -27,6 +27,7 @@ import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TExprNode; +import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; @@ -101,8 +102,8 @@ public void syncJournal() throws Exception { waitOnReplaying(); } - public long getGroupCommitLoadBeId(long tableId) throws Exception { - result = forward(buildGetGroupCommitLoadBeIdParmas(tableId)); + public long getGroupCommitLoadBeId(long tableId, String cluster, boolean isCloud) throws Exception { + result = forward(buildGetGroupCommitLoadBeIdParmas(tableId, cluster, isCloud)); waitOnReplaying(); return result.groupCommitLoadBeId; } @@ -243,13 +244,18 @@ private TMasterOpRequest buildSyncJournalParmas() { return params; } - private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId) { + private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId, String cluster, boolean isCloud) { + final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); + groupCommitParams.setGetGroupCommitLoadBeId(true); + groupCommitParams.setGroupCommitLoadTableId(tableId); + groupCommitParams.setCluster(cluster); + groupCommitParams.setIsCloud(isCloud); + final TMasterOpRequest params = new TMasterOpRequest(); // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); - params.setGetGroupCommitLoadBeId(true); - params.setGroupCommitLoadTableId(tableId); + params.setGroupCommitInfo(groupCommitParams); params.setDb(ctx.getDatabase()); params.setUser(ctx.getQualifiedUser()); // just make the protocol happy @@ -258,13 +264,16 @@ private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId) { } private TMasterOpRequest buildUpdateLoadDataParams(long backendId, long receiveData) { + final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); + groupCommitParams.setUpdateLoadData(true); + groupCommitParams.setBackendId(backendId); + groupCommitParams.setReceiveData(receiveData); + final TMasterOpRequest params = new TMasterOpRequest(); // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); - params.setUpdateLoadData(true); - params.setBackendId(backendId); - params.setReceiveData(receiveData); + params.setGroupCommitInfo(groupCommitParams); params.setDb(ctx.getDatabase()); params.setUser(ctx.getQualifiedUser()); // just make the protocol happy diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index fda1fee4a6d2e6..f69cc2da79a9cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -173,6 +173,7 @@ import org.apache.doris.thrift.TGetTablesResult; import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; import org.apache.doris.thrift.TGetTabletReplicaInfosResult; +import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TInitExternalCtlMetaRequest; import org.apache.doris.thrift.TInitExternalCtlMetaResult; import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; @@ -1032,22 +1033,24 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { result.setPacket("".getBytes()); return result; } - if (params.isGetGroupCommitLoadBeId()) { + if (params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) { + final TGroupCommitInfo info = params.getGroupCommitInfo(); final TMasterOpResult result = new TMasterOpResult(); try { result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommitInternal(params.groupCommitLoadTableId)); - } catch (LoadException e) { + .selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, info.cluster, info.isCloud)); + } catch (LoadException | DdlException e) { throw new TException(e.getMessage()); } // just make the protocol happy result.setPacket("".getBytes()); return result; } - if (params.isUpdateLoadData()) { + if (params.getGroupCommitInfo().isUpdateLoadData()) { + final TGroupCommitInfo info = params.getGroupCommitInfo(); final TMasterOpResult result = new TMasterOpResult(); Env.getCurrentEnv().getGroupCommitManager() - .updateLoadData(params.backendId, params.receiveData); + .updateLoadData(info.backendId, info.receiveData); // just make the protocol happy result.setPacket("".getBytes()); return result; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5cf69d1e055c10..6b3301d6ea7fcc 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -540,6 +540,16 @@ struct TTxnLoadInfo { 6: optional list subTxnInfos } +struct TGroupCommitInfo{ + 1: optional bool getGroupCommitLoadBeId + 2: optional i64 groupCommitLoadTableId + 3: optional string cluster + 4: optional bool isCloud + 5: optional bool updateLoadData + 6: optional i64 backendId + 7: optional i64 receiveData +} + struct TMasterOpRequest { 1: required string user 2: required string db @@ -573,11 +583,7 @@ struct TMasterOpRequest { 28: optional map user_variables // transaction load 29: optional TTxnLoadInfo txnLoadInfo - 30: optional bool getGroupCommitLoadBeId - 31: optional i64 groupCommitLoadTableId - 32: optional bool updateLoadData - 33: optional i64 backendId - 34: optional i64 receiveData + 30: optional TGroupCommitInfo groupCommitInfo // selectdb cloud 1000: optional string cloud_cluster From 98847191014f65264f4490aceb00fda4795b2320 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 18 Jul 2024 16:38:10 +0800 Subject: [PATCH 4/9] 4 --- .../java/org/apache/doris/common/Config.java | 6 ---- .../apache/doris/httpv2/rest/LoadAction.java | 34 +++++++------------ 2 files changed, 12 insertions(+), 28 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c8eb61640a5038..db48f445f80a79 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2660,12 +2660,6 @@ public class Config extends ConfigBase { }) public static boolean enable_advance_next_id = false; - @ConfField(description = { - "是否采用反馈group commit BE选择算法", - "Whether to use feedback group commit BE select strategy." - }) - public static boolean enable_feedback_group_commit_be_select_strategy = false; - // The count threshold to do manual GC when doing checkpoint but not enough memory. // Set zero to disable it. @ConfField(description = { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index bc19bb65b1fd39..82066334534ed8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -399,29 +399,19 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - if (Config.enable_feedback_group_commit_be_select_strategy) { - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - ctx.setThreadLocalInfo(); - ctx.setRemoteIP(request.getRemoteAddr()); - // set user to ADMIN_USER, so that we can get the proper resource tag - ctx.setQualifiedUser(Auth.ADMIN_USER); - ctx.setThreadLocalInfo(); + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(request.getRemoteAddr()); + // set user to ADMIN_USER, so that we can get the proper resource tag + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); - try { - backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(tableId, ctx, false); - } catch (DdlException e) { - throw new RuntimeException(e); - } - } else { - for (Long backendId : backendIds) { - Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId); - if (!candidateBe.isDecommissioned()) { - backend = candidateBe; - break; - } - } + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx, false); + } catch (DdlException e) { + throw new RuntimeException(e); } } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); From a3844ad3ed3e9801963d2d4082458feea80b00e7 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 18 Jul 2024 17:54:19 +0800 Subject: [PATCH 5/9] 5 --- .../apache/doris/load/GroupCommitManager.java | 24 +++++++++---------- .../org/apache/doris/qe/MasterOpExecutor.java | 8 +++---- .../doris/service/FrontendServiceImpl.java | 5 ++-- gensrc/thrift/FrontendService.thrift | 2 +- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 229d6032eb2946..4308d29de07e58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -208,7 +208,7 @@ public long selectBackendForGroupCommitInternal(long tableId, String cluster, bo tablePressureMap.toString()); OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); if (tableToBeMap.containsKey(tableId)) { - if (tablePressureMap.get(tableToBeMap.get(tableId)).get() < table.getGroupCommitDataBytes()) { + if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { return tableToBeMap.get(tableId); } else { tableToBeMap.remove(tableId); @@ -233,7 +233,7 @@ public long selectBackendForGroupCommitInternal(long tableId, String cluster, bo tablePressureMap.toString()); OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); if (tableToBeMap.containsKey(tableId)) { - if (tablePressureMap.get(tableToBeMap.get(tableId)).get() < table.getGroupCommitDataBytes()) { + if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { return tableToBeMap.get(tableId); } else { tableToBeMap.remove(tableId); @@ -265,9 +265,9 @@ public long selectBackendForGroupCommitInternal(long tableId, String cluster, bo } } - public void updateLoadData(long backendId, long receiveData) { - if (backendId == -1) { - LOG.warn("invalid backend id: " + backendId); + public void updateLoadData(long tableId, long receiveData) { + if (tableId == -1) { + LOG.warn("invalid table id: " + tableId); } if (!Env.getCurrentEnv().isMaster()) { ConnectContext ctx = new ConnectContext(); @@ -277,22 +277,22 @@ public void updateLoadData(long backendId, long receiveData) { ctx.setQualifiedUser(Auth.ADMIN_USER); ctx.setThreadLocalInfo(); try { - new MasterOpExecutor(ctx).updateLoadData(backendId, receiveData); + new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData); } catch (Exception e) { throw new RuntimeException(e); } } else { - updateLoadDataInternal(backendId, receiveData); + updateLoadDataInternal(tableId, receiveData); } } - public void updateLoadDataInternal(long backendId, long receiveData) { - if (tablePressureMap.containsKey(backendId)) { - tablePressureMap.get(backendId).add(receiveData); - LOG.info("Update load data for backend {}, receiveData {}, bePressureMap {}", backendId, receiveData, + public void updateLoadDataInternal(long tableId, long receiveData) { + if (tablePressureMap.containsKey(tableId)) { + tablePressureMap.get(tableId).add(receiveData); + LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData, tablePressureMap.toString()); } else { - LOG.warn("can not find backend id: {}", backendId); + LOG.warn("can not find backend id: {}", tableId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index d9e4857ab42022..eaab01df55635a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -108,8 +108,8 @@ public long getGroupCommitLoadBeId(long tableId, String cluster, boolean isCloud return result.groupCommitLoadBeId; } - public void updateLoadData(long backendId, long receiveData) throws Exception { - result = forward(buildUpdateLoadDataParams(backendId, receiveData)); + public void updateLoadData(long tableId, long receiveData) throws Exception { + result = forward(buildUpdateLoadDataParams(tableId, receiveData)); waitOnReplaying(); } @@ -263,10 +263,10 @@ private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId, String return params; } - private TMasterOpRequest buildUpdateLoadDataParams(long backendId, long receiveData) { + private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) { final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); groupCommitParams.setUpdateLoadData(true); - groupCommitParams.setBackendId(backendId); + groupCommitParams.setTableId(tableId); groupCommitParams.setReceiveData(receiveData); final TMasterOpRequest params = new TMasterOpRequest(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index f69cc2da79a9cd..f9ea0635287cab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1050,7 +1050,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { final TGroupCommitInfo info = params.getGroupCommitInfo(); final TMasterOpResult result = new TMasterOpResult(); Env.getCurrentEnv().getGroupCommitManager() - .updateLoadData(info.backendId, info.receiveData); + .updateLoadData(info.tableId, info.receiveData); // just make the protocol happy result.setPacket("".getBytes()); return result; @@ -1655,8 +1655,7 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce } if (request.groupCommit) { try { - long backendId = request.getBackendId(); - Env.getCurrentEnv().getGroupCommitManager().updateLoadData(backendId, request.receiveBytes); + Env.getCurrentEnv().getGroupCommitManager().updateLoadData(request.table_id, request.receiveBytes); } catch (Exception e) { LOG.warn("Failed to update group commit load data, {}", e.getMessage()); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 6b3301d6ea7fcc..871a1aee49b4ed 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -546,7 +546,7 @@ struct TGroupCommitInfo{ 3: optional string cluster 4: optional bool isCloud 5: optional bool updateLoadData - 6: optional i64 backendId + 6: optional i64 tableId 7: optional i64 receiveData } From d993448b67337c4679e3ecc63023179a84a5b276 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 18 Jul 2024 19:12:18 +0800 Subject: [PATCH 6/9] 6 --- .../java/org/apache/doris/load/GroupCommitManager.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 4308d29de07e58..ac87ea3c338633 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -223,7 +223,8 @@ public long selectBackendForGroupCommitInternal(long tableId, String cluster, bo for (Backend backend : backends) { if (backend.isActive() && !backend.isDecommissioned()) { tableToBeMap.put(tableId, backend.getId()); - tablePressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs())); + tablePressureMap.put(tableId, + new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); return backend.getId(); } } @@ -252,11 +253,11 @@ public long selectBackendForGroupCommitInternal(long tableId, String cluster, bo for (Backend backend : backends) { if (backend.isActive() && !backend.isDecommissioned()) { tableToBeMap.put(tableId, backend.getId()); - tablePressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs())); + tablePressureMap.put(tableId, + new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); return backend.getId(); } } - List backendsInfo = backends.stream() .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + ", decommission=" + be.isDecommissioned() + " }") From bf48013d6f7e11248fbeca4ac41b9ea8dea842b2 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 19 Jul 2024 15:35:41 +0800 Subject: [PATCH 7/9] 7 --- .../apache/doris/httpv2/rest/LoadAction.java | 2 - .../apache/doris/load/GroupCommitManager.java | 138 +++++++++++------- .../doris/planner/GroupCommitPlanner.java | 5 - 3 files changed, 86 insertions(+), 59 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 82066334534ed8..47c495b2913f1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -403,8 +403,6 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ ctx.setEnv(Env.getCurrentEnv()); ctx.setThreadLocalInfo(); ctx.setRemoteIP(request.getRemoteAddr()); - // set user to ADMIN_USER, so that we can get the proper resource tag - ctx.setQualifiedUser(Auth.ADMIN_USER); ctx.setThreadLocalInfo(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index ac87ea3c338633..cfc3af0b754e31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -33,13 +33,13 @@ import org.apache.doris.qe.MasterOpExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; -import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; import java.util.ArrayList; import java.util.Collections; @@ -187,6 +187,8 @@ public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud) throws LoadException, DdlException { + // If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE + // can select a BE and return this BE id to follower FE. if (!Env.getCurrentEnv().isMaster()) { try { long backendId = new MasterOpExecutor(context) @@ -196,6 +198,7 @@ public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, throw new LoadException(e.getMessage()); } } else { + // Master FE will select BE by itself. return Env.getCurrentSystemInfo() .getBackend(selectBackendForGroupCommitInternal(tableId, context.getCloudCluster(), isCloud)); } @@ -203,67 +206,98 @@ public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, public long selectBackendForGroupCommitInternal(long tableId, String cluster, boolean isCloud) throws LoadException, DdlException { - if (!isCloud) { - LOG.info("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), - tablePressureMap.toString()); - OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); - if (tableToBeMap.containsKey(tableId)) { - if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { - return tableToBeMap.get(tableId); - } else { - tableToBeMap.remove(tableId); - } - } - List backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends()); - if (backends.isEmpty()) { - throw new LoadException("No alive backend"); - } + // + return isCloud ? selectBackendForCloudGroupCommitInternal(tableId, cluster) + : selectBackendForLocalGroupCommitInternal(tableId); + } + + private long selectBackendForCloudGroupCommitInternal(long tableId, String cluster) + throws DdlException, LoadException { + LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + if (Strings.isNullOrEmpty(cluster)) { + ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); + } + + Long cachedBackendId = getCachedBackend(tableId); + if (cachedBackendId != null) { + return cachedBackendId; + } + + List backends = new ArrayList<>( + ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) + .values()); + if (backends.isEmpty()) { + throw new LoadException("No alive backend"); + } + // If the cached backend is not active or decommissioned, select a random new backend. + Long randomBackendId = getRandomBackend(tableId, backends); + if (randomBackendId != null) { + return randomBackendId; + } + List backendsInfo = backends.stream() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + + ", decommission=" + be.isDecommissioned() + " }") + .collect(Collectors.toList()); + throw new LoadException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo); + } + + private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { + LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + Long cachedBackendId = getCachedBackend(tableId); + if (cachedBackendId != null) { + return cachedBackendId; + } - Collections.shuffle(backends); - for (Backend backend : backends) { + List backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends()); + if (backends.isEmpty()) { + throw new LoadException("No alive backend"); + } + + // If the cached backend is not active or decommissioned, select a random new backend. + Long randomBackendId = getRandomBackend(tableId, backends); + if (randomBackendId != null) { + return randomBackendId; + } + List backendsInfo = backends.stream() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + + ", decommission=" + be.isDecommissioned() + " }") + .collect(Collectors.toList()); + throw new LoadException("No suitable backend " + ", backends = " + backendsInfo); + } + + @Nullable + private Long getCachedBackend(long tableId) { + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + if (tableToBeMap.containsKey(tableId)) { + if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { + Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); if (backend.isActive() && !backend.isDecommissioned()) { - tableToBeMap.put(tableId, backend.getId()); - tablePressureMap.put(tableId, - new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); return backend.getId(); - } - } - throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); - } else { - LOG.info("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), - tablePressureMap.toString()); - OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); - if (tableToBeMap.containsKey(tableId)) { - if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { - return tableToBeMap.get(tableId); } else { tableToBeMap.remove(tableId); } + } else { + tableToBeMap.remove(tableId); } + } + return null; + } - if (Strings.isNullOrEmpty(cluster)) { - ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); - } - - // select be - List backends = new ArrayList<>( - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) - .values()); - Collections.shuffle(backends); - for (Backend backend : backends) { - if (backend.isActive() && !backend.isDecommissioned()) { - tableToBeMap.put(tableId, backend.getId()); - tablePressureMap.put(tableId, - new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); - return backend.getId(); - } + @Nullable + private Long getRandomBackend(long tableId, List backends) { + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + Collections.shuffle(backends); + for (Backend backend : backends) { + if (backend.isActive() && !backend.isDecommissioned()) { + tableToBeMap.put(tableId, backend.getId()); + tablePressureMap.put(tableId, + new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); + return backend.getId(); } - List backendsInfo = backends.stream() - .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() - + ", decommission=" + be.isDecommissioned() + " }") - .collect(Collectors.toList()); - throw new DdlException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo); } + return null; } public void updateLoadData(long tableId, long receiveData) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index d410c720d08233..bc3759d4e62867 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -149,11 +149,6 @@ protected void selectBackends(ConnectContext ctx) throws DdlException { try { backend = Env.getCurrentEnv().getGroupCommitManager() .selectBackendForGroupCommit(this.table.getId(), ctx, false); - if (backend != null && backend.isAlive() && !backend.isDecommissioned()) { - LOG.info("Group commit new strategy select be {}, label is {}", backend.getId(), loadId.toString()); - } else { - throw new DdlException("No suitable backend"); - } } catch (LoadException e) { throw new DdlException("No suitable backend"); } From eddd52c259ec61248e0f5fd9ab44e3e2cfa34136 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 19 Jul 2024 15:47:40 +0800 Subject: [PATCH 8/9] 8 --- .../src/main/java/org/apache/doris/load/GroupCommitManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index cfc3af0b754e31..6b75687bbd3f0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -206,7 +206,6 @@ public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, public long selectBackendForGroupCommitInternal(long tableId, String cluster, boolean isCloud) throws LoadException, DdlException { - // return isCloud ? selectBackendForCloudGroupCommitInternal(tableId, cluster) : selectBackendForLocalGroupCommitInternal(tableId); } From 9d208b3d8e6533e24895c3b716d4a34ebf9b77f6 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 19 Jul 2024 18:08:57 +0800 Subject: [PATCH 9/9] 9 --- .../planner/CloudGroupCommitPlanner.java | 6 ---- .../apache/doris/load/GroupCommitManager.java | 30 +++++++++++++++++++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java index c28aea4b0ad31d..782f78e6bc401e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java @@ -47,12 +47,6 @@ protected void selectBackends(ConnectContext ctx) throws DdlException { try { backend = Env.getCurrentEnv().getGroupCommitManager() .selectBackendForGroupCommit(this.table.getId(), ctx, true); - if (backend != null && backend.isAlive() && !backend.isDecommissioned() - && backend.getCloudClusterName().equals(ctx.getCloudCluster())) { - LOG.info("Group commit cloud select be {}, label is {}", backend.getId(), loadId.toString()); - } else { - throw new DdlException("No suitable backend"); - } } catch (LoadException e) { throw new DdlException("No suitable backend"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 6b75687bbd3f0d..ac365f9166f7b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -206,6 +206,36 @@ public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, public long selectBackendForGroupCommitInternal(long tableId, String cluster, boolean isCloud) throws LoadException, DdlException { + // Understanding Group Commit and Backend Selection Logic + // + // Group commit is a server-side technique used for batching data imports. + // The primary purpose of group commit is to enhance import performance by + // reducing the number of versions created for high-frequency, small-batch imports. + // Without batching, each import operation creates a separate version, similar to a rowset in an LSM Tree, + // which can consume significant compaction resources and degrade system performance. + // By batching data, fewer versions are generated from the same amount of data, + // thus minimizing compaction and improving performance. For detailed usage, + // you can refer to the Group Commit Manual + // (https://doris.incubator.apache.org/docs/data-operate/import/group-commit-manual/) . + // + // The specific backend (BE) selection logic for group commits aims to + // direct data belonging to the same table to the same BE for batching. + // This is because group commit batches data imported to the same table + // on the same BE into a single version, which is then flushed periodically. + // For example, if data for the same table is distributed across three BEs, + // it will result in three versions. + // Conversely, if data for four different tables is directed to the same BE, + // it will create four versions. However, + // directing all data for the same table to a single BE will only produce one version. + // + // To optimize performance and avoid overloading a single BE, the strategy for selecting a BE works as follows: + // + // If a BE is already handling imports for table A and is not under significant load, + // the data is sent to this BE. + // If the BE is overloaded or if there is no existing record of a BE handling imports for table A, + // a BE is chosen at random. This BE is then recorded along with the mapping of table A and its load level. + // This approach ensures that group commits can effectively batch data together + // while managing the load on each BE efficiently. return isCloud ? selectBackendForCloudGroupCommitInternal(tableId, cluster) : selectBackendForLocalGroupCommitInternal(tableId); }