From b963b0963acc680b299bec455600681e94fca2c4 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Wed, 28 Aug 2024 23:10:44 +0800 Subject: [PATCH 1/3] [Fix](group commit) Fix cloud group commit be select strategy (#39986) In #35558, we optimized be select for group commit. However, we forgot to apply this strategy to cloud. This PR applys it. --- .../apache/doris/httpv2/rest/LoadAction.java | 134 ++++++++++++------ 1 file changed, 87 insertions(+), 47 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 030aced411f3e6..2865f86c8f6079 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -70,7 +70,7 @@ public class LoadAction extends RestBaseController { @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT) public Object load(HttpServletRequest request, HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -87,20 +87,29 @@ public Object load(HttpServletRequest request, HttpServletResponse response, @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT) public Object streamLoad(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request)); boolean groupCommit = false; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { - groupCommit = true; - try { - if (isGroupCommitBlock(db, table)) { - String msg = "insert table " + table + " is blocked on schema change"; - return new RestBaseResult(msg); + if (groupCommitStr != null) { + if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode") + && !groupCommitStr.equalsIgnoreCase("off_mode")) { + return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`."); + } + if (!groupCommitStr.equalsIgnoreCase("off_mode")) { + groupCommit = true; + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + try { + if (isGroupCommitBlock(db, table)) { + String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE; + return new RestBaseResult(msg); + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); + } } - } catch (Exception e) { - LOG.info("exception:" + e); - return new RestBaseResult(e.getMessage()); } } if (needRedirect(request.getScheme())) { @@ -131,21 +140,32 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse boolean groupCommit = false; long tableId = -1; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { - groupCommit = true; - try { - String[] pair = parseDbAndTb(sql); - Database db = Env.getCurrentInternalCatalog() - .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); - Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); - tableId = tbl.getId(); - if (isGroupCommitBlock(pair[0], pair[1])) { - String msg = "insert table " + pair[1] + " is blocked on schema change"; - return new RestBaseResult(msg); + if (groupCommitStr != null) { + if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode") + && !groupCommitStr.equalsIgnoreCase("off_mode")) { + return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`."); + } + if (!groupCommitStr.equalsIgnoreCase("off_mode")) { + try { + groupCommit = true; + String[] pair = parseDbAndTb(sql); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); + Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + tableId = tbl.getId(); + + // async mode needs to write WAL, we need to block load during waiting WAL. + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + if (isGroupCommitBlock(pair[0], pair[1])) { + String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE; + return new RestBaseResult(msg); + } + + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); } - } catch (Exception e) { - LOG.info("exception:" + e); - return new RestBaseResult(e.getMessage()); } } executeCheckPassword(request, response); @@ -207,8 +227,9 @@ private String[] parseDbAndTb(String sql) throws Exception { @RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT) public Object streamLoad2PC(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db) { + LOG.info("streamload action 2PC, db: {}, headers: {}", db, getAllHeaders(request)); if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -219,9 +240,10 @@ public Object streamLoad2PC(HttpServletRequest request, @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT) public Object streamLoad2PC_table(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, - @PathVariable(value = TABLE_KEY) String table) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db, + @PathVariable(value = TABLE_KEY) String table) { + LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request)); if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -361,21 +383,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - ctx.setThreadLocalInfo(); - ctx.setRemoteIP(request.getRemoteAddr()); - // We set this variable to fulfill required field 'user' in - // TMasterOpRequest(FrontendService.thrift) - ctx.setQualifiedUser(Auth.ADMIN_USER); - ctx.setThreadLocalInfo(); - - try { - backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(tableId, ctx, false); - } catch (DdlException e) { - throw new RuntimeException(e); - } + backend = selectBackendForGroupCommit("", request, tableId); } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); } @@ -402,7 +410,7 @@ private boolean checkClusterToken(String token) { // temporarily addressing the users' needs for audit logs. // So this function is not widely tested under general scenario private Object executeWithClusterToken(HttpServletRequest request, String db, - String table, boolean isStreamLoad) { + String table, boolean isStreamLoad) { try { ConnectContext ctx = new ConnectContext(); ctx.setEnv(Env.getCurrentEnv()); @@ -473,4 +481,36 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, ConnectContext.remove(); } } + + private String getAllHeaders(HttpServletRequest request) { + StringBuilder headers = new StringBuilder(); + Enumeration headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String headerName = headerNames.nextElement(); + String headerValue = request.getHeader(headerName); + headers.append(headerName).append(":").append(headerValue).append(", "); + } + return headers.toString(); + } + + private Backend selectBackendForGroupCommit(String clusterName, HttpServletRequest req, long tableId) + throws LoadException { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(req.getRemoteAddr()); + // We set this variable to fulfill required field 'user' in + // TMasterOpRequest(FrontendService.thrift) + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + + Backend backend = null; + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx, isCloud); + } catch (DdlException e) { + throw new LoadException(e.getMessage(), e); + } + return backend; + } } From 89c002d35a347b0dfdfb89bc9331308038b63270 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 3 Sep 2024 15:46:10 +0800 Subject: [PATCH 2/3] 1 --- .../apache/doris/httpv2/rest/LoadAction.java | 24 ++++--------------- .../apache/doris/load/GroupCommitManager.java | 2 +- .../doris/planner/GroupCommitPlanner.java | 10 -------- 3 files changed, 6 insertions(+), 30 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 2865f86c8f6079..c3a77a13b3dc36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -89,7 +89,6 @@ public Object load(HttpServletRequest request, HttpServletResponse response, public Object streamLoad(HttpServletRequest request, HttpServletResponse response, @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { - LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request)); boolean groupCommit = false; String groupCommitStr = request.getHeader("group_commit"); if (groupCommitStr != null) { @@ -102,7 +101,7 @@ public Object streamLoad(HttpServletRequest request, if (groupCommitStr.equalsIgnoreCase("async_mode")) { try { if (isGroupCommitBlock(db, table)) { - String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE; + String msg = "insert table " + table + " is blocked on schema change"; return new RestBaseResult(msg); } } catch (Exception e) { @@ -157,7 +156,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse // async mode needs to write WAL, we need to block load during waiting WAL. if (groupCommitStr.equalsIgnoreCase("async_mode")) { if (isGroupCommitBlock(pair[0], pair[1])) { - String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE; + String msg = "insert table " + pair[1] + " is blocked on schema change"; return new RestBaseResult(msg); } @@ -229,7 +228,6 @@ private String[] parseDbAndTb(String sql) throws Exception { public Object streamLoad2PC(HttpServletRequest request, HttpServletResponse response, @PathVariable(value = DB_KEY) String db) { - LOG.info("streamload action 2PC, db: {}, headers: {}", db, getAllHeaders(request)); if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -243,7 +241,6 @@ public Object streamLoad2PC_table(HttpServletRequest request, HttpServletResponse response, @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { - LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request)); if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -383,7 +380,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - backend = selectBackendForGroupCommit("", request, tableId); + backend = selectBackendForGroupCommit(request, tableId); } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); } @@ -482,18 +479,7 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, } } - private String getAllHeaders(HttpServletRequest request) { - StringBuilder headers = new StringBuilder(); - Enumeration headerNames = request.getHeaderNames(); - while (headerNames.hasMoreElements()) { - String headerName = headerNames.nextElement(); - String headerValue = request.getHeader(headerName); - headers.append(headerName).append(":").append(headerValue).append(", "); - } - return headers.toString(); - } - - private Backend selectBackendForGroupCommit(String clusterName, HttpServletRequest req, long tableId) + private Backend selectBackendForGroupCommit(HttpServletRequest req, long tableId) throws LoadException { ConnectContext ctx = new ConnectContext(); ctx.setEnv(Env.getCurrentEnv()); @@ -507,7 +493,7 @@ private Backend selectBackendForGroupCommit(String clusterName, HttpServletReque Backend backend = null; try { backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(tableId, ctx, isCloud); + .selectBackendForGroupCommit(tableId, ctx); } catch (DdlException e) { throw new LoadException(e.getMessage(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index f8c37d647abe53..656a9a8a0ae7f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -181,7 +181,7 @@ private long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { return size; } - public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud) + public Backend selectBackendForGroupCommit(long tableId, ConnectContext context) throws LoadException, DdlException { // If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE // can select a BE and return this BE id to follower FE. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 0b051aeb88806b..6e235443bc3a4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -30,7 +30,6 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.FormatOptions; -import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; @@ -203,15 +202,6 @@ private static void processExprVal(Expr expr, InternalService.PDataRow.Builder r } } - protected void selectBackends(ConnectContext ctx) throws DdlException { - try { - backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(this.table.getId(), ctx, false); - } catch (LoadException e) { - throw new DdlException("No suitable backend"); - } - } - public Backend getBackend() { return backend; } From e6c345b6ebd0583782c900922c3ee70b2b7b3a9a Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 4 Sep 2024 15:36:43 +0800 Subject: [PATCH 3/3] 2 --- .../apache/doris/planner/GroupCommitPlanner.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 6e235443bc3a4d..9b1044b2f7ef88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.FormatOptions; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; @@ -133,7 +134,7 @@ public GroupCommitPlanner(Database db, OlapTable table, List targetColum public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, List rows) throws DdlException, RpcException, ExecutionException, InterruptedException { - backend = ctx.getInsertGroupCommit(this.table.getId()); + selectBackends(ctx); if (backend == null || !backend.isAlive() || backend.isDecommissioned()) { List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); if (allBackendIds.isEmpty()) { @@ -161,7 +162,7 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, .setRequest(execPlanFragmentParamsBytes) .setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build()) .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo) - .build()).addAllData(rows) + .build()).addAllData(rows) .build(); Future future = BackendServiceProxy.getInstance() .groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); @@ -202,6 +203,15 @@ private static void processExprVal(Expr expr, InternalService.PDataRow.Builder r } } + protected void selectBackends(ConnectContext ctx) throws DdlException { + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(this.table.getId(), ctx); + } catch (LoadException e) { + throw new DdlException("No suitable backend"); + } + } + public Backend getBackend() { return backend; }