diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index 8fcba76f0f7c8b..f15cd81bc35cad 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -41,6 +41,10 @@ class Config { public String jdbcPassword public String defaultDb + public String ccrDownstreamUrl + public String ccrDownstreamUser + public String ccrDownstreamPassword + public String feSourceThriftAddress public String feTargetThriftAddress public String feSyncerUser @@ -309,6 +313,9 @@ class Config { configToString(obj.sslCertificatePath) ) + config.ccrDownstreamUrl = configToString(obj.ccrDownstreamUrl) + config.ccrDownstreamUser = configToString(obj.ccrDownstreamUser) + config.ccrDownstreamPassword = configToString(obj.ccrDownstreamPassword) config.image = configToString(obj.image) config.dockerEndDeleteFiles = configToBoolean(obj.dockerEndDeleteFiles) config.excludeDockerTest = configToBoolean(obj.excludeDockerTest) @@ -533,6 +540,49 @@ class Config { return DriverManager.getConnection(dbUrl, jdbcUser, jdbcPassword) } + public static String buildUrlWithDbImpl(String jdbcUrl, String dbName) { + String urlWithDb = jdbcUrl + String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + if (urlWithoutSchema.indexOf("/") >= 0) { + if (jdbcUrl.contains("?")) { + // e.g: jdbc:mysql://locahost:8080/?a=b + urlWithDb = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("?")) + urlWithDb = urlWithDb.substring(0, urlWithDb.lastIndexOf("/")) + urlWithDb += ("/" + dbName) + jdbcUrl.substring(jdbcUrl.lastIndexOf("?")) + } else { + // e.g: jdbc:mysql://locahost:8080/ + urlWithDb += dbName + } + } else { + // e.g: jdbc:mysql://locahost:8080 + urlWithDb += ("/" + dbName) + } + + return urlWithDb + } + + Connection getConnectionByArrowFlightSql(String dbName) { + Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver") + String arrowFlightSqlHost = otherConfigs.get("extArrowFlightSqlHost") + String arrowFlightSqlPort = otherConfigs.get("extArrowFlightSqlPort") + String arrowFlightSqlUrl = "jdbc:arrow-flight-sql://${arrowFlightSqlHost}:${arrowFlightSqlPort}" + + "/?useServerPrepStmts=false&useSSL=false&useEncryption=false" + // TODO jdbc:arrow-flight-sql not support connect db + String dbUrl = buildUrlWithDbImpl(arrowFlightSqlUrl, dbName) + tryCreateDbIfNotExist(dbName) + log.info("connect to ${dbUrl}".toString()) + String arrowFlightSqlJdbcUser = otherConfigs.get("extArrowFlightSqlUser") + String arrowFlightSqlJdbcPassword = otherConfigs.get("extArrowFlightSqlPassword") + return DriverManager.getConnection(dbUrl, arrowFlightSqlJdbcUser, arrowFlightSqlJdbcPassword) + } + + Connection getDownstreamConnectionByDbName(String dbName) { + String dbUrl = buildUrlWithDb(ccrDownstreamUrl, dbName) + tryCreateDbIfNotExist(dbName) + log.info("connect to ${dbUrl}".toString()) + return DriverManager.getConnection(dbUrl, ccrDownstreamUser, ccrDownstreamPassword) + } + String getDbNameByFile(File suiteFile) { String dir = new File(suitePath).relativePath(suiteFile.parentFile) // We put sql files under sql dir, so dbs and tables used by cases diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index f347a6cc9413ac..a7f0a94c7c833a 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -51,6 +51,7 @@ import java.util.stream.Collectors import java.util.stream.LongStream import static org.apache.doris.regression.util.DataUtils.sortByToString +import java.sql.Connection import java.sql.DriverManager import java.sql.PreparedStatement import java.sql.ResultSetMetaData @@ -259,9 +260,34 @@ class Suite implements GroovyInterceptable { return result } - def sql_return_maparray(String sqlStr) { + List> insert_into_sql_impl(Connection conn, String sqlStr, int num) { + logger.info("insert into " + num + " records") + def (result, meta) = JdbcUtils.executeToList(conn, sqlStr) + return result + } + + List> jdbc_insert_into_sql(String sqlStr, int num) { + return insert_into_sql_impl(context.getConnection(), sqlStr, num) + } + + List> arrow_flight_insert_into_sql(String sqlStr, int num) { + return insert_into_sql_impl(context.getArrowFlightSqlConnection(), (String) ("USE ${context.dbName};" + sqlStr), num) + } + + List> insert_into_sql(String sqlStr, int num) { + if (context.useArrowFlightSql()) { + return arrow_flight_insert_into_sql(sqlStr, num) + } else { + return jdbc_insert_into_sql(sqlStr, num) + } + } + + def sql_return_maparray(String sqlStr, Connection conn = null) { logger.info("Execute sql: ${sqlStr}".toString()) - def (result, meta) = JdbcUtils.executeToList(context.getConnection(), sqlStr) + if (conn == null) { + conn = context.getConnection() + } + def (result, meta) = JdbcUtils.executeToList(conn, sqlStr) // get all column names as list List columnNames = new ArrayList<>() @@ -516,6 +542,11 @@ class Suite implements GroovyInterceptable { return lines; } + + Connection getTargetConnection() { + return context.getTargetConnection(this) + } + boolean deleteFile(String filePath) { def file = new File(filePath) file.delete() diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy index 9cc21faae0777c..b2a2cb7ba1e393 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy @@ -152,9 +152,14 @@ class SuiteContext implements Closeable { return subJdbc.substring(0, subJdbc.indexOf("/")) } - private Map getSpec() { + private String getDownstreamJdbcNetInfo() { + String subJdbc = config.ccrDownstreamUrl.substring(config.ccrDownstreamUrl.indexOf("://") + 3) + return subJdbc.substring(0, subJdbc.indexOf("/")) + } + + private Map getSpec(String[] jdbc) { Map spec = Maps.newHashMap() - String[] jdbc = getJdbcNetInfo().split(":") + spec.put("host", jdbc[0]) spec.put("port", jdbc[1]) spec.put("user", config.feSyncerUser) @@ -165,7 +170,8 @@ class SuiteContext implements Closeable { } Map getSrcSpec() { - Map spec = getSpec() + String[] jdbc = getJdbcNetInfo().split(":") + Map spec = getSpec(jdbc) spec.put("thrift_port", config.feSourceThriftNetworkAddress.port.toString()) spec.put("database", dbName) @@ -173,7 +179,8 @@ class SuiteContext implements Closeable { } Map getDestSpec() { - Map spec = getSpec() + String[] jdbc = getDownstreamJdbcNetInfo().split(":") + Map spec = getSpec(jdbc) spec.put("thrift_port", config.feTargetThriftNetworkAddress.port.toString()) spec.put("database", "TEST_" + dbName) @@ -203,7 +210,7 @@ class SuiteContext implements Closeable { Connection getTargetConnection(Suite suite) { def context = getSyncer(suite).context if (context.targetConnection == null) { - context.targetConnection = config.getConnectionByDbName("TEST_" + dbName) + context.targetConnection = config.getDownstreamConnectionByDbName("TEST_" + dbName) } return context.targetConnection } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy index 388904ec2da4d2..92214532fdcf4e 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy @@ -147,8 +147,8 @@ class SyncerContext { return info } - FrontendClientImpl getMasterFrontClient() { - def result = suite.sql_return_maparray "select Host, RpcPort, IsMaster from frontends();" + FrontendClientImpl getMasterFrontClient(Connection conn) { + def result = suite.sql_return_maparray("select Host, RpcPort, IsMaster from frontends();", conn) logger.info("get master fe: ${result}") def masterHost = "" @@ -179,7 +179,7 @@ class SyncerContext { FrontendClientImpl getTargetFrontClient() { if (targetFrontendClient == null) { - targetFrontendClient = getMasterFrontClient() + targetFrontendClient = getMasterFrontClient(suite.getTargetConnection()) } return targetFrontendClient } diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy index 8cfe8811993c0f..615cb4f81c09d6 100644 --- a/regression-test/pipeline/p0/conf/regression-conf.groovy +++ b/regression-test/pipeline/p0/conf/regression-conf.groovy @@ -25,6 +25,11 @@ targetJdbcUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLo jdbcUser = "root" jdbcPassword = "" +ccrDownstreamUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLoadLocalInfile=true" +ccrDownstreamUser = "root" +ccrDownstreamPassword = "" +ccrDownstreamFeThriftAddress = "127.0.0.1:9020" + feSourceThriftAddress = "127.0.0.1:9020" feTargetThriftAddress = "127.0.0.1:9020" feSyncerUser = "root" diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy index 8669e8fb5bd3ed..98f0d9173b218f 100644 --- a/regression-test/pipeline/p1/conf/regression-conf.groovy +++ b/regression-test/pipeline/p1/conf/regression-conf.groovy @@ -25,6 +25,11 @@ targetJdbcUrl = "jdbc:mysql://172.19.0.2:9132/?useLocalSessionState=true&allowLo jdbcUser = "root" jdbcPassword = "" +ccrDownstreamUrl = "jdbc:mysql://172.19.0.2:9132/?useLocalSessionState=true&allowLoadLocalInfile=true" +ccrDownstreamUser = "root" +ccrDownstreamPassword = "" +ccrDownstreamFeThriftAddress = "127.0.0.1:9020" + feSourceThriftAddress = "127.0.0.1:9020" feTargetThriftAddress = "127.0.0.1:9020" feSyncerUser = "root"