Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class Config {
public String jdbcPassword
public String defaultDb

public String ccrDownstreamUrl
public String ccrDownstreamUser
public String ccrDownstreamPassword

public String feSourceThriftAddress
public String feTargetThriftAddress
public String feSyncerUser
Expand Down Expand Up @@ -309,6 +313,9 @@ class Config {
configToString(obj.sslCertificatePath)
)

config.ccrDownstreamUrl = configToString(obj.ccrDownstreamUrl)
config.ccrDownstreamUser = configToString(obj.ccrDownstreamUser)
config.ccrDownstreamPassword = configToString(obj.ccrDownstreamPassword)
config.image = configToString(obj.image)
config.dockerEndDeleteFiles = configToBoolean(obj.dockerEndDeleteFiles)
config.excludeDockerTest = configToBoolean(obj.excludeDockerTest)
Expand Down Expand Up @@ -533,6 +540,49 @@ class Config {
return DriverManager.getConnection(dbUrl, jdbcUser, jdbcPassword)
}

public static String buildUrlWithDbImpl(String jdbcUrl, String dbName) {
String urlWithDb = jdbcUrl
String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
if (urlWithoutSchema.indexOf("/") >= 0) {
if (jdbcUrl.contains("?")) {
// e.g: jdbc:mysql://locahost:8080/?a=b
urlWithDb = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("?"))
urlWithDb = urlWithDb.substring(0, urlWithDb.lastIndexOf("/"))
urlWithDb += ("/" + dbName) + jdbcUrl.substring(jdbcUrl.lastIndexOf("?"))
} else {
// e.g: jdbc:mysql://locahost:8080/
urlWithDb += dbName
}
} else {
// e.g: jdbc:mysql://locahost:8080
urlWithDb += ("/" + dbName)
}

return urlWithDb
}

Connection getConnectionByArrowFlightSql(String dbName) {
Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver")
String arrowFlightSqlHost = otherConfigs.get("extArrowFlightSqlHost")
String arrowFlightSqlPort = otherConfigs.get("extArrowFlightSqlPort")
String arrowFlightSqlUrl = "jdbc:arrow-flight-sql://${arrowFlightSqlHost}:${arrowFlightSqlPort}" +
"/?useServerPrepStmts=false&useSSL=false&useEncryption=false"
// TODO jdbc:arrow-flight-sql not support connect db
String dbUrl = buildUrlWithDbImpl(arrowFlightSqlUrl, dbName)
tryCreateDbIfNotExist(dbName)
log.info("connect to ${dbUrl}".toString())
String arrowFlightSqlJdbcUser = otherConfigs.get("extArrowFlightSqlUser")
String arrowFlightSqlJdbcPassword = otherConfigs.get("extArrowFlightSqlPassword")
return DriverManager.getConnection(dbUrl, arrowFlightSqlJdbcUser, arrowFlightSqlJdbcPassword)
}

Connection getDownstreamConnectionByDbName(String dbName) {
String dbUrl = buildUrlWithDb(ccrDownstreamUrl, dbName)
tryCreateDbIfNotExist(dbName)
log.info("connect to ${dbUrl}".toString())
return DriverManager.getConnection(dbUrl, ccrDownstreamUser, ccrDownstreamPassword)
}

String getDbNameByFile(File suiteFile) {
String dir = new File(suitePath).relativePath(suiteFile.parentFile)
// We put sql files under sql dir, so dbs and tables used by cases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import java.util.stream.Collectors
import java.util.stream.LongStream
import static org.apache.doris.regression.util.DataUtils.sortByToString

import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSetMetaData
Expand Down Expand Up @@ -259,9 +260,34 @@ class Suite implements GroovyInterceptable {
return result
}

def sql_return_maparray(String sqlStr) {
List<List<Object>> insert_into_sql_impl(Connection conn, String sqlStr, int num) {
logger.info("insert into " + num + " records")
def (result, meta) = JdbcUtils.executeToList(conn, sqlStr)
return result
}

List<List<Object>> jdbc_insert_into_sql(String sqlStr, int num) {
return insert_into_sql_impl(context.getConnection(), sqlStr, num)
}

List<List<Object>> arrow_flight_insert_into_sql(String sqlStr, int num) {
return insert_into_sql_impl(context.getArrowFlightSqlConnection(), (String) ("USE ${context.dbName};" + sqlStr), num)
}

List<List<Object>> insert_into_sql(String sqlStr, int num) {
if (context.useArrowFlightSql()) {
return arrow_flight_insert_into_sql(sqlStr, num)
} else {
return jdbc_insert_into_sql(sqlStr, num)
}
}

def sql_return_maparray(String sqlStr, Connection conn = null) {
logger.info("Execute sql: ${sqlStr}".toString())
def (result, meta) = JdbcUtils.executeToList(context.getConnection(), sqlStr)
if (conn == null) {
conn = context.getConnection()
}
def (result, meta) = JdbcUtils.executeToList(conn, sqlStr)

// get all column names as list
List<String> columnNames = new ArrayList<>()
Expand Down Expand Up @@ -516,6 +542,11 @@ class Suite implements GroovyInterceptable {
return lines;
}


Connection getTargetConnection() {
return context.getTargetConnection(this)
}

boolean deleteFile(String filePath) {
def file = new File(filePath)
file.delete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,14 @@ class SuiteContext implements Closeable {
return subJdbc.substring(0, subJdbc.indexOf("/"))
}

private Map<String, String> getSpec() {
private String getDownstreamJdbcNetInfo() {
String subJdbc = config.ccrDownstreamUrl.substring(config.ccrDownstreamUrl.indexOf("://") + 3)
return subJdbc.substring(0, subJdbc.indexOf("/"))
}

private Map<String, String> getSpec(String[] jdbc) {
Map<String, String> spec = Maps.newHashMap()
String[] jdbc = getJdbcNetInfo().split(":")

spec.put("host", jdbc[0])
spec.put("port", jdbc[1])
spec.put("user", config.feSyncerUser)
Expand All @@ -165,15 +170,17 @@ class SuiteContext implements Closeable {
}

Map<String, String> getSrcSpec() {
Map<String, String> spec = getSpec()
String[] jdbc = getJdbcNetInfo().split(":")
Map<String, String> spec = getSpec(jdbc)
spec.put("thrift_port", config.feSourceThriftNetworkAddress.port.toString())
spec.put("database", dbName)

return spec
}

Map<String, String> getDestSpec() {
Map<String, String> spec = getSpec()
String[] jdbc = getDownstreamJdbcNetInfo().split(":")
Map<String, String> spec = getSpec(jdbc)
spec.put("thrift_port", config.feTargetThriftNetworkAddress.port.toString())
spec.put("database", "TEST_" + dbName)

Expand Down Expand Up @@ -203,7 +210,7 @@ class SuiteContext implements Closeable {
Connection getTargetConnection(Suite suite) {
def context = getSyncer(suite).context
if (context.targetConnection == null) {
context.targetConnection = config.getConnectionByDbName("TEST_" + dbName)
context.targetConnection = config.getDownstreamConnectionByDbName("TEST_" + dbName)
}
return context.targetConnection
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ class SyncerContext {
return info
}

FrontendClientImpl getMasterFrontClient() {
def result = suite.sql_return_maparray "select Host, RpcPort, IsMaster from frontends();"
FrontendClientImpl getMasterFrontClient(Connection conn) {
def result = suite.sql_return_maparray("select Host, RpcPort, IsMaster from frontends();", conn)
logger.info("get master fe: ${result}")

def masterHost = ""
Expand Down Expand Up @@ -179,7 +179,7 @@ class SyncerContext {

FrontendClientImpl getTargetFrontClient() {
if (targetFrontendClient == null) {
targetFrontendClient = getMasterFrontClient()
targetFrontendClient = getMasterFrontClient(suite.getTargetConnection())
}
return targetFrontendClient
}
Expand Down
5 changes: 5 additions & 0 deletions regression-test/pipeline/p0/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ targetJdbcUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLo
jdbcUser = "root"
jdbcPassword = ""

ccrDownstreamUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLoadLocalInfile=true"
ccrDownstreamUser = "root"
ccrDownstreamPassword = ""
ccrDownstreamFeThriftAddress = "127.0.0.1:9020"

feSourceThriftAddress = "127.0.0.1:9020"
feTargetThriftAddress = "127.0.0.1:9020"
feSyncerUser = "root"
Expand Down
5 changes: 5 additions & 0 deletions regression-test/pipeline/p1/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ targetJdbcUrl = "jdbc:mysql://172.19.0.2:9132/?useLocalSessionState=true&allowLo
jdbcUser = "root"
jdbcPassword = ""

ccrDownstreamUrl = "jdbc:mysql://172.19.0.2:9132/?useLocalSessionState=true&allowLoadLocalInfile=true"
ccrDownstreamUser = "root"
ccrDownstreamPassword = ""
ccrDownstreamFeThriftAddress = "127.0.0.1:9020"

feSourceThriftAddress = "127.0.0.1:9020"
feTargetThriftAddress = "127.0.0.1:9020"
feSyncerUser = "root"
Expand Down