diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 5f989da023b36a..30885fa1ac9218 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -442,6 +442,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ request.__set_db_id(db_id); request.__set_table_id(table_id); request.__set_txnId(txn_id); + request.__set_groupCommit(true); + request.__set_receiveBytes(state->num_bytes_load_total()); + if (_exec_env->master_info()->__isset.backend_id) { + request.__set_backendId(_exec_env->master_info()->backend_id); + } else { + LOG(WARNING) << "_exec_env->master_info not set backend_id"; + } if (state) { request.__set_commitInfos(state->tablet_commit_infos()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java index 6978ec65e72152..782f78e6bc401e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java @@ -20,24 +20,18 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.system.Backend; import org.apache.doris.thrift.TUniqueId; -import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; -import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; public class CloudGroupCommitPlanner extends GroupCommitPlanner { private static final Logger LOG = LogManager.getLogger(CloudGroupCommitPlanner.class); @@ -50,36 +44,11 @@ public CloudGroupCommitPlanner(Database db, OlapTable table, List target @Override protected void selectBackends(ConnectContext ctx) throws DdlException { - backend = ctx.getInsertGroupCommit(this.table.getId()); - if (backend != null && backend.isAlive() && !backend.isDecommissioned() - && backend.getCloudClusterName().equals(ctx.getCloudCluster())) { - return; + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(this.table.getId(), ctx, true); + } catch (LoadException e) { + throw new DdlException("No suitable backend"); } - - String cluster = ctx.getCloudCluster(); - if (Strings.isNullOrEmpty(cluster)) { - ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); - } - - // select be - List backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) - .values().stream().collect(Collectors.toList()); - Collections.shuffle(backends); - for (Backend backend : backends) { - if (backend.isActive() && !backend.isDecommissioned()) { - this.backend = backend; - ctx.setInsertGroupCommit(this.table.getId(), backend); - LOG.debug("choose new be {}", backend.getId()); - return; - } - } - - List backendsInfo = backends.stream() - .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() - + ", decommission=" + be.isDecommissioned() + " }") - .collect(Collectors.toList()); - throw new DdlException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo); } - } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java new file mode 100644 index 00000000000000..787fbb06a2f7bb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import java.util.concurrent.atomic.AtomicLongArray; + +public class SlidingWindowCounter { + private final int windowSizeInSeconds; + private final int numberOfBuckets; + private final AtomicLongArray buckets; + private final AtomicLongArray bucketTimestamps; + + public SlidingWindowCounter(int windowSizeInSeconds) { + this.windowSizeInSeconds = windowSizeInSeconds; + this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second + this.buckets = new AtomicLongArray(numberOfBuckets); + this.bucketTimestamps = new AtomicLongArray(numberOfBuckets); + } + + private int getCurrentBucketIndex() { + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + return (int) (currentTime % numberOfBuckets); + } + + private void updateCurrentBucket() { + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + int currentBucketIndex = getCurrentBucketIndex(); + long bucketTimestamp = bucketTimestamps.get(currentBucketIndex); + + if (currentTime - bucketTimestamp >= 1) { + buckets.set(currentBucketIndex, 0); + bucketTimestamps.set(currentBucketIndex, currentTime); + } + } + + public void add(long value) { + updateCurrentBucket(); + int bucketIndex = getCurrentBucketIndex(); + buckets.addAndGet(bucketIndex, value); + } + + public long get() { + updateCurrentBucket(); + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + long count = 0; + + for (int i = 0; i < numberOfBuckets; i++) { + if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) { + count += buckets.get(i); + } + } + return count; + } + + public String toString() { + return String.valueOf(get()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 5767e303f35ebf..47c495b2913f1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -143,11 +144,16 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse String sql = request.getHeader("sql"); LOG.info("streaming load sql={}", sql); boolean groupCommit = false; + long tableId = -1; String groupCommitStr = request.getHeader("group_commit"); if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { groupCommit = true; try { String[] pair = parseDbAndTb(sql); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); + Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + tableId = tbl.getId(); if (isGroupCommitBlock(pair[0], pair[1])) { String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE; return new RestBaseResult(msg); @@ -165,7 +171,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse } String label = request.getHeader(LABEL_KEY); - TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit); + TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId); LOG.info("redirect load action to destination={}, label: {}", redirectAddr.toString(), label); @@ -287,7 +293,9 @@ private Object executeWithoutPassword(HttpServletRequest request, return new RestBaseResult(e.getMessage()); } } else { - redirectAddr = selectRedirectBackend(request, groupCommit); + long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName) + .get()).getTable(tableName).get()).getId(); + redirectAddr = selectRedirectBackend(request, groupCommit, tableId); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", @@ -320,7 +328,7 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) { return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(request, false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1); LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}", redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation); @@ -352,7 +360,7 @@ private String getCloudClusterName(HttpServletRequest request) { return ""; } - private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit) + private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId) throws LoadException { long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); if (debugBackendId != -1L) { @@ -366,11 +374,12 @@ private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolea } return selectCloudRedirectBackend(cloudClusterName, request, groupCommit); } else { - return selectLocalRedirectBackend(groupCommit); + return selectLocalRedirectBackend(groupCommit, request, tableId); } } - private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws LoadException { + private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId) + throws LoadException { Backend backend = null; BeSelectionPolicy policy = null; String qualifiedUser = ConnectContext.get().getQualifiedUser(); @@ -390,12 +399,17 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws L throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - for (Long backendId : backendIds) { - Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId); - if (!candidateBe.isDecommissioned()) { - backend = candidateBe; - break; - } + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(request.getRemoteAddr()); + ctx.setThreadLocalInfo(); + + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx, false); + } catch (DdlException e) { + throw new RuntimeException(e); } } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); @@ -573,10 +587,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, return new RestBaseResult("No label selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(request, false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1); LOG.info("Redirect load action with auth token to destination={}," - + "stream: {}, db: {}, tbl: {}, label: {}", + + "stream: {}, db: {}, tbl: {}, label: {}", redirectAddr.toString(), isStreamLoad, dbName, tableName, label); URI urlObj = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index c4bf1e03c9cf58..ac365f9166f7b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -18,21 +18,38 @@ package org.apache.doris.load; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.util.SlidingWindowCounter; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.MasterOpExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.stream.Collectors; public class GroupCommitManager { @@ -40,6 +57,11 @@ public class GroupCommitManager { private Set blockedTableIds = new HashSet<>(); + // Table id to BE id map. Only for group commit. + private Map tableToBeMap = new ConcurrentHashMap<>(); + // BE id to pressure map. Only for group commit. + private Map tablePressureMap = new ConcurrentHashMap<>(); + public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); } @@ -163,4 +185,178 @@ public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { return size; } + public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud) + throws LoadException, DdlException { + // If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE + // can select a BE and return this BE id to follower FE. + if (!Env.getCurrentEnv().isMaster()) { + try { + long backendId = new MasterOpExecutor(context) + .getGroupCommitLoadBeId(tableId, context.getCloudCluster(), isCloud); + return Env.getCurrentSystemInfo().getBackend(backendId); + } catch (Exception e) { + throw new LoadException(e.getMessage()); + } + } else { + // Master FE will select BE by itself. + return Env.getCurrentSystemInfo() + .getBackend(selectBackendForGroupCommitInternal(tableId, context.getCloudCluster(), isCloud)); + } + } + + public long selectBackendForGroupCommitInternal(long tableId, String cluster, boolean isCloud) + throws LoadException, DdlException { + // Understanding Group Commit and Backend Selection Logic + // + // Group commit is a server-side technique used for batching data imports. + // The primary purpose of group commit is to enhance import performance by + // reducing the number of versions created for high-frequency, small-batch imports. + // Without batching, each import operation creates a separate version, similar to a rowset in an LSM Tree, + // which can consume significant compaction resources and degrade system performance. + // By batching data, fewer versions are generated from the same amount of data, + // thus minimizing compaction and improving performance. For detailed usage, + // you can refer to the Group Commit Manual + // (https://doris.incubator.apache.org/docs/data-operate/import/group-commit-manual/) . + // + // The specific backend (BE) selection logic for group commits aims to + // direct data belonging to the same table to the same BE for batching. + // This is because group commit batches data imported to the same table + // on the same BE into a single version, which is then flushed periodically. + // For example, if data for the same table is distributed across three BEs, + // it will result in three versions. + // Conversely, if data for four different tables is directed to the same BE, + // it will create four versions. However, + // directing all data for the same table to a single BE will only produce one version. + // + // To optimize performance and avoid overloading a single BE, the strategy for selecting a BE works as follows: + // + // If a BE is already handling imports for table A and is not under significant load, + // the data is sent to this BE. + // If the BE is overloaded or if there is no existing record of a BE handling imports for table A, + // a BE is chosen at random. This BE is then recorded along with the mapping of table A and its load level. + // This approach ensures that group commits can effectively batch data together + // while managing the load on each BE efficiently. + return isCloud ? selectBackendForCloudGroupCommitInternal(tableId, cluster) + : selectBackendForLocalGroupCommitInternal(tableId); + } + + private long selectBackendForCloudGroupCommitInternal(long tableId, String cluster) + throws DdlException, LoadException { + LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + if (Strings.isNullOrEmpty(cluster)) { + ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); + } + + Long cachedBackendId = getCachedBackend(tableId); + if (cachedBackendId != null) { + return cachedBackendId; + } + + List backends = new ArrayList<>( + ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) + .values()); + if (backends.isEmpty()) { + throw new LoadException("No alive backend"); + } + // If the cached backend is not active or decommissioned, select a random new backend. + Long randomBackendId = getRandomBackend(tableId, backends); + if (randomBackendId != null) { + return randomBackendId; + } + List backendsInfo = backends.stream() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + + ", decommission=" + be.isDecommissioned() + " }") + .collect(Collectors.toList()); + throw new LoadException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo); + } + + private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { + LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + Long cachedBackendId = getCachedBackend(tableId); + if (cachedBackendId != null) { + return cachedBackendId; + } + + List backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends()); + if (backends.isEmpty()) { + throw new LoadException("No alive backend"); + } + + // If the cached backend is not active or decommissioned, select a random new backend. + Long randomBackendId = getRandomBackend(tableId, backends); + if (randomBackendId != null) { + return randomBackendId; + } + List backendsInfo = backends.stream() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + + ", decommission=" + be.isDecommissioned() + " }") + .collect(Collectors.toList()); + throw new LoadException("No suitable backend " + ", backends = " + backendsInfo); + } + + @Nullable + private Long getCachedBackend(long tableId) { + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + if (tableToBeMap.containsKey(tableId)) { + if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { + Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); + if (backend.isActive() && !backend.isDecommissioned()) { + return backend.getId(); + } else { + tableToBeMap.remove(tableId); + } + } else { + tableToBeMap.remove(tableId); + } + } + return null; + } + + @Nullable + private Long getRandomBackend(long tableId, List backends) { + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + Collections.shuffle(backends); + for (Backend backend : backends) { + if (backend.isActive() && !backend.isDecommissioned()) { + tableToBeMap.put(tableId, backend.getId()); + tablePressureMap.put(tableId, + new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); + return backend.getId(); + } + } + return null; + } + + public void updateLoadData(long tableId, long receiveData) { + if (tableId == -1) { + LOG.warn("invalid table id: " + tableId); + } + if (!Env.getCurrentEnv().isMaster()) { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + // set user to ADMIN_USER, so that we can get the proper resource tag + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + try { + new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + updateLoadDataInternal(tableId, receiveData); + } + } + + public void updateLoadDataInternal(long tableId, long receiveData) { + if (tablePressureMap.containsKey(tableId)) { + tablePressureMap.get(tableId).add(receiveData); + LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData, + tablePressureMap.toString()); + } else { + LOG.warn("can not find backend id: {}", tableId); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index 984e8b0c8caa65..edf8251f97fab1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -17,12 +17,15 @@ package org.apache.doris.nereids.trees.plans.commands.insert; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.LoadException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.NereidsPlanner; @@ -66,6 +69,16 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Unbo && (tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion)); } + @Override + protected void beforeExec() { + try { + this.coordinator.setGroupCommitBe(Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(table.getId(), ctx, false)); + } catch (LoadException | DdlException e) { + throw new RuntimeException(e); + } + } + @Override public void beginTransaction() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 09c2f72b5ca675..bc3759d4e62867 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.FormatOptions; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; @@ -58,7 +59,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -145,30 +145,13 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, return future.get(); } - // cloud override protected void selectBackends(ConnectContext ctx) throws DdlException { - backend = ctx.getInsertGroupCommit(this.table.getId()); - if (backend != null && backend.isAlive() && !backend.isDecommissioned()) { - return; + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(this.table.getId(), ctx, false); + } catch (LoadException e) { + throw new DdlException("No suitable backend"); } - - List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if (allBackendIds.isEmpty()) { - throw new DdlException("No alive backend"); - } - Collections.shuffle(allBackendIds); - for (Long beId : allBackendIds) { - backend = Env.getCurrentSystemInfo().getBackend(beId); - if (!backend.isDecommissioned()) { - ctx.setInsertGroupCommit(this.table.getId(), backend); - if (LOG.isDebugEnabled()) { - LOG.debug("choose new be {}", backend.getId()); - } - return; - } - } - - throw new DdlException("No suitable backend"); } public Backend getBackend() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8459cb3b68ec78..aed352570bbaec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -268,6 +268,8 @@ public class Coordinator implements CoordInterface { private boolean useNereids = false; + private Backend groupCommitBackend; + // Runtime filter merge instance address and ID public TNetworkAddress runtimeFilterMergeAddr; public TUniqueId runtimeFilterMergeInstanceId; @@ -294,6 +296,10 @@ public class Coordinator implements CoordInterface { // fragmentid -> backendid private MarkedCountDownLatch fragmentsDoneLatch = null; + public void setGroupCommitBe(Backend backend) { + this.groupCommitBackend = backend; + } + public void setTWorkloadGroups(List tWorkloadGroups) { this.tWorkloadGroups = tWorkloadGroups; } @@ -1716,8 +1722,11 @@ protected void computeFragmentHosts() throws Exception { if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) { Reference backendIdRef = new Reference(); TNetworkAddress execHostport; - if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || (isAllExternalScan - && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { + if (groupCommitBackend != null) { + execHostport = getGroupCommitBackend(addressToBackendID); + } else if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || ( + isAllExternalScan + && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { // 2 cases: // case 1: user set resource tag, we need to use the BE with the specified resource tags. // case 2: All scan nodes are external scan node, @@ -1908,7 +1917,9 @@ protected void computeFragmentHosts() throws Exception { if (params.instanceExecParams.isEmpty()) { Reference backendIdRef = new Reference(); TNetworkAddress execHostport; - if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() + if (groupCommitBackend != null) { + execHostport = getGroupCommitBackend(addressToBackendID); + } else if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() && !addressToBackendID.isEmpty()) { // In this case, we only use the BE where the replica selected by the tag is located to // execute this query. Otherwise, except for the scan node, the rest of the execution nodes @@ -1932,6 +1943,14 @@ protected void computeFragmentHosts() throws Exception { } } + private TNetworkAddress getGroupCommitBackend(Map addressToBackendID) { + // Used for Nereids planner Group commit insert BE select. + TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(), + groupCommitBackend.getBePort()); + addressToBackendID.put(execHostport, groupCommitBackend.getId()); + return execHostport; + } + // Traverse the expected runtimeFilterID in each fragment, and establish the corresponding relationship // between runtimeFilterID and fragment instance addr and select the merge instance of runtimeFilter private void assignRuntimeFilterAddr() throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index a14379ddb77b9a..eaab01df55635a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -27,6 +27,7 @@ import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TExprNode; +import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; @@ -101,6 +102,17 @@ public void syncJournal() throws Exception { waitOnReplaying(); } + public long getGroupCommitLoadBeId(long tableId, String cluster, boolean isCloud) throws Exception { + result = forward(buildGetGroupCommitLoadBeIdParmas(tableId, cluster, isCloud)); + waitOnReplaying(); + return result.groupCommitLoadBeId; + } + + public void updateLoadData(long tableId, long receiveData) throws Exception { + result = forward(buildUpdateLoadDataParams(tableId, receiveData)); + waitOnReplaying(); + } + public void cancel() throws Exception { TUniqueId queryId = ctx.queryId(); if (queryId == null) { @@ -232,6 +244,43 @@ private TMasterOpRequest buildSyncJournalParmas() { return params; } + private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId, String cluster, boolean isCloud) { + final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); + groupCommitParams.setGetGroupCommitLoadBeId(true); + groupCommitParams.setGroupCommitLoadTableId(tableId); + groupCommitParams.setCluster(cluster); + groupCommitParams.setIsCloud(isCloud); + + final TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setGroupCommitInfo(groupCommitParams); + params.setDb(ctx.getDatabase()); + params.setUser(ctx.getQualifiedUser()); + // just make the protocol happy + params.setSql(""); + return params; + } + + private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) { + final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); + groupCommitParams.setUpdateLoadData(true); + groupCommitParams.setTableId(tableId); + groupCommitParams.setReceiveData(receiveData); + + final TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setGroupCommitInfo(groupCommitParams); + params.setDb(ctx.getDatabase()); + params.setUser(ctx.getQualifiedUser()); + // just make the protocol happy + params.setSql(""); + return params; + } + public ByteBuffer getOutputPacket() { if (result == null) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 8f34fed5e5f8e9..f9ea0635287cab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -56,6 +56,7 @@ import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.PatternMatcher; @@ -172,6 +173,7 @@ import org.apache.doris.thrift.TGetTablesResult; import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; import org.apache.doris.thrift.TGetTabletReplicaInfosResult; +import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TInitExternalCtlMetaRequest; import org.apache.doris.thrift.TInitExternalCtlMetaResult; import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; @@ -1031,6 +1033,28 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { result.setPacket("".getBytes()); return result; } + if (params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) { + final TGroupCommitInfo info = params.getGroupCommitInfo(); + final TMasterOpResult result = new TMasterOpResult(); + try { + result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, info.cluster, info.isCloud)); + } catch (LoadException | DdlException e) { + throw new TException(e.getMessage()); + } + // just make the protocol happy + result.setPacket("".getBytes()); + return result; + } + if (params.getGroupCommitInfo().isUpdateLoadData()) { + final TGroupCommitInfo info = params.getGroupCommitInfo(); + final TMasterOpResult result = new TMasterOpResult(); + Env.getCurrentEnv().getGroupCommitManager() + .updateLoadData(info.tableId, info.receiveData); + // just make the protocol happy + result.setPacket("".getBytes()); + return result; + } if (params.isSetCancelQeury() && params.isCancelQeury()) { if (!params.isSetQueryId()) { throw new TException("a query id is needed to cancel a query"); @@ -1629,6 +1653,13 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } } + if (request.groupCommit) { + try { + Env.getCurrentEnv().getGroupCommitManager().updateLoadData(request.table_id, request.receiveBytes); + } catch (Exception e) { + LOG.warn("Failed to update group commit load data, {}", e.getMessage()); + } + } // get database Env env = Env.getCurrentEnv(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 2867e15c3c1af9..871a1aee49b4ed 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -540,6 +540,16 @@ struct TTxnLoadInfo { 6: optional list subTxnInfos } +struct TGroupCommitInfo{ + 1: optional bool getGroupCommitLoadBeId + 2: optional i64 groupCommitLoadTableId + 3: optional string cluster + 4: optional bool isCloud + 5: optional bool updateLoadData + 6: optional i64 tableId + 7: optional i64 receiveData +} + struct TMasterOpRequest { 1: required string user 2: required string db @@ -573,6 +583,7 @@ struct TMasterOpRequest { 28: optional map user_variables // transaction load 29: optional TTxnLoadInfo txnLoadInfo + 30: optional TGroupCommitInfo groupCommitInfo // selectdb cloud 1000: optional string cloud_cluster @@ -606,6 +617,7 @@ struct TMasterOpResult { 8: optional list queryResultBufList; // transaction load 9: optional TTxnLoadInfo txnLoadInfo; + 10: optional i64 groupCommitLoadBeId; } struct TUpdateExportTaskStatusRequest { @@ -817,6 +829,9 @@ struct TLoadTxnCommitRequest { 15: optional list tbls 16: optional i64 table_id 17: optional string auth_code_uuid + 18: optional bool groupCommit + 19: optional i64 receiveBytes + 20: optional i64 backendId } struct TLoadTxnCommitResult {