From 5288bd6a49f90a8c904035bb66589394a06a1bff Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 31 Dec 2024 17:28:09 +0800 Subject: [PATCH] [fix](group commit) fix group commit use prepared statement and connect to observer --- .../commands/insert/InsertIntoTableCommand.java | 10 ++++++++++ .../insert/OlapGroupCommitInsertExecutor.java | 14 ++++++++++++++ .../java/org/apache/doris/qe/StmtExecutor.java | 2 ++ .../org/apache/doris/regression/suite/Suite.groovy | 6 +++++- .../insert_group_commit_with_exception.groovy | 4 ++-- .../insert_group_commit_with_prepare_stmt.groovy | 2 +- 6 files changed, 34 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 96d5d56a7e10ff..5d5dbca99fc807 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; +import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.StmtType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; @@ -452,6 +453,15 @@ public StmtType stmtType() { return StmtType.INSERT; } + @Override + public RedirectStatus toRedirectStatus() { + if (ConnectContext.get().isGroupCommit()) { + return RedirectStatus.NO_FORWARD; + } else { + return RedirectStatus.FORWARD_WITH_SYNC; + } + } + /** * this factory is used to delay create the AbstractInsertExecutor until the DistributePlan is generated * by NereidsPlanner diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index 0f3e320edcd4bf..4cf4dfe536c212 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -66,6 +66,20 @@ public OlapGroupCommitInsertExecutor(ConnectContext ctx, Table table, this.groupCommitBackend = backend; } + /** + * check if the sql can run in group commit mode + * @param logicalPlan plan of sql + */ + public static void analyzeGroupCommit(LogicalPlan logicalPlan) { + ConnectContext ctx = ConnectContext.get(); + if (ctx.getSessionVariable().isEnableInsertGroupCommit() && logicalPlan instanceof InsertIntoTableCommand) { + LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery(); + TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); + OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, logicalQuery, + Optional.empty()); + } + } + protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, LogicalPlan logicalQuery, Optional insertCtx) { // The flag is set to false before execute sql, if it is true, this is a http stream diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index b155b468a7ec18..021e0a58cd9690 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -161,6 +161,7 @@ import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor; import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; @@ -724,6 +725,7 @@ private void executeByNereids(TUniqueId queryId) throws Exception { } if (logicalPlan instanceof Command) { if (logicalPlan instanceof Redirect) { + OlapGroupCommitInsertExecutor.analyzeGroupCommit(logicalPlan); redirectStatus = ((Redirect) logicalPlan).toRedirectStatus(); if (isForwardToMaster()) { // before forward to master, we also need to set profileType in this node diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 29f5631c4ca681..e232e46d55240f 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1407,8 +1407,12 @@ class Suite implements GroovyInterceptable { } String getServerPrepareJdbcUrl(String jdbcUrl, String database) { + return getServerPrepareJdbcUrl(jdbcUrl, database, true) + } + + String getServerPrepareJdbcUrl(String jdbcUrl, String database, boolean useMasterIp) { String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) - def sql_ip = getMasterIp() + def sql_ip = useMasterIp ? getMasterIp() : urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) def sql_port if (urlWithoutSchema.indexOf("/") >= 0) { // e.g: jdbc:mysql://locahost:8080/?a=b diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy index 166d329c455511..7a22039a41cbe9 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy @@ -111,12 +111,12 @@ suite("insert_group_commit_with_exception") { // prepare insert def db = context.config.defaultDb + "_insert_p0" - String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db) + String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db, false) try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) { Statement statement = connection.createStatement(); statement.execute("use ${db}"); - statement.execute("set group_commit = eventual_consistency;"); + statement.execute("set group_commit = sync_mode"); statement.execute("set enable_server_side_prepared_statement = true") // without column try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) { diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy index 7f2919f8118d10..e93e157aa5d7af 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy @@ -127,7 +127,7 @@ suite("insert_group_commit_with_prepare_stmt") { return serverStatementIds } - def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb) + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb, false) logger.info("url: " + url) def result1 = connect(user, password, url + "&sessionVariables=group_commit=async_mode") {