Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions regression-test/plugins/plugin_curl_requester.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,99 @@
import org.apache.doris.regression.suite.Suite
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.NodeType
@Grab(group='org.apache.httpcomponents', module='httpclient', version='4.5.13')
import org.apache.http.client.methods.*
import org.apache.http.impl.client.*
import org.apache.http.util.EntityUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.conn.ConnectTimeoutException
import org.apache.http.conn.HttpHostConnectException
import org.codehaus.groovy.runtime.IOGroovyMethods

/**
 * Issues an HTTP GET or POST against {@code url} with retries and returns a
 * three-element list {@code [code, out, err]}.
 *
 * Result contract:
 *  - 2xx response: {@code [0, body, ""]} (code 0 keeps compatibility with the old curl function).
 *  - 500 response: returned immediately, without retrying, as {@code [500, body, "Internal Server Error"]}.
 *  - any other status, a connection failure or a read timeout: retried up to 10 times with
 *    exponential backoff (1s doubling up to 60s); a read timeout also adds 10s to the
 *    timeout used by the next attempt.
 *  - any other exception: {@code [500, out, exceptionMessage]} without retrying.
 *  - retries exhausted: {@code [500, lastBody, "Failed after 10 attempts"]}.
 *
 * @param method "GET" or "POST"; anything else throws an Exception
 * @param url    must start with http:// or https://; otherwise an Exception is thrown
 */
Suite.metaClass.http_client = { String method, String url /* param */ ->
    // Kept for parity with the sibling `curl` helper, which performs the same cast.
    Suite suite = delegate as Suite
    if (method != "GET" && method != "POST") {
        throw new Exception("Invalid method: ${method}")
    }
    if (!url || !(url =~ /^https?:\/\/.+/)) {
        throw new Exception("Invalid url: ${url}")
    }

    Integer timeout = 300 // seconds
    Integer maxRetries = 10
    Integer retryCount = 0
    Integer sleepTime = 1000 // milliseconds

    logger.info("HTTP request: ${method} ${url}")

    CloseableHttpClient httpClient = HttpClients.custom()
            .setRetryHandler(new DefaultHttpRequestRetryHandler(3, true))
            .build()

    int code
    String err
    String out

    try {
        while (retryCount < maxRetries) {
            HttpRequestBase request
            if (method == "GET") {
                request = new HttpGet(url)
            } else {
                // method was validated above, so this is always POST
                request = new HttpPost(url)
            }

            // Rebuilt on every attempt so a timeout bumped by a previous read timeout takes effect.
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectTimeout(timeout * 1000)
                    .setSocketTimeout(timeout * 1000)
                    .build()
            request.setConfig(requestConfig)

            try {
                CloseableHttpResponse response = httpClient.execute(request)
                try {
                    code = response.getStatusLine().getStatusCode()
                    // A response may carry no entity (e.g. 204 No Content); EntityUtils.toString
                    // rejects a null entity, which would otherwise be reported as a bogus 500.
                    def entity = response.getEntity()
                    // UTF-8 is only the fallback when the response does not declare a charset.
                    out = (entity != null) ? EntityUtils.toString(entity, "UTF-8") : ""

                    if (code >= 200 && code < 300) {
                        code = 0 // to be compatible with the old curl function
                        err = ""
                        return [code, out, err]
                    } else if (code == 500) {
                        return [code, out, "Internal Server Error"]
                    } else {
                        retryCount++
                        logger.warn("HTTP request failed with status code ${code}, response ${out}, retrying (${retryCount}/${maxRetries})")
                    }
                } finally {
                    response.close()
                }
            } catch (ConnectTimeoutException | HttpHostConnectException e) {
                retryCount++
                logger.warn("Connection failed, retrying (${retryCount}/${maxRetries}): ${e.message}")
            } catch (SocketTimeoutException e) {
                timeout = timeout + 10
                retryCount++
                logger.warn("Read timed out, retrying (${retryCount}/${maxRetries}): ${e.message}")
            } catch (Exception e) {
                code = 500 // Internal Server Error
                logger.error("Error executing HTTP request: ${e.message}")
                err = e.message
                return [code, out, err]
            }

            // Back off only when another attempt will follow; waiting after the
            // final failure just delays the error report.
            if (retryCount < maxRetries) {
                sleep(sleepTime)
                sleepTime = Math.min(sleepTime * 2, 60000)
            }
        }

        logger.error("HTTP request failed after ${maxRetries} attempts")
        err = "Failed after ${maxRetries} attempts"
        code = 500 // Internal Server Error
        return [code, out, err]
    } finally {
        httpClient.close()
    }
}

logger.info("Added 'http_client' function to Suite")

Suite.metaClass.curl = { String method, String url /* param */->
Suite suite = delegate as Suite
if (method != "GET" && method != "POST")
Expand Down Expand Up @@ -161,3 +252,43 @@ Suite.metaClass.be_report_task = { String ip, int port ->
}

logger.info("Added 'be_report_task' function to Suite")

/**
 * Calls the backend's {@code /api/show_nested_index_file} endpoint for a tablet and
 * asserts on the shape of the returned JSON.
 *
 * Checks performed:
 *  - the reported tablet id matches {@code tablet_id};
 *  - the number of rowsets equals {@code expected_rowsets_count};
 *  - every rowset reports {@code index_storage_format == format};
 *  - segment ids within a rowset are 0..n-1 in order;
 *  - every segment lists exactly {@code expected_indices_count} indices.
 *
 * When {@code expected_indices_count} is 0 the tablet may not have an index file at
 * all; in that case a 500 response carrying status "E-6003" and a "not found"
 * message is accepted and the remaining checks are skipped.
 *
 * @param ip                      backend host
 * @param port                    backend HTTP port
 * @param tablet_id               tablet to inspect
 * @param expected_rowsets_count  expected size of the {@code rowsets} array
 * @param expected_indices_count  expected number of indices per segment
 * @param format                  expected {@code index_storage_format}; "V1" is counted as one
 *                                index file per index per segment, anything else as one per segment
 */
Suite.metaClass.check_nested_index_file = { ip, port, tablet_id, expected_rowsets_count, expected_indices_count, format ->
    def (code, out, err) = http_client("GET", String.format("http://%s:%s/api/show_nested_index_file?tablet_id=%s", ip, port, tablet_id))
    logger.info("Run show_nested_index_file_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
    // only when the expected_indices_count is 0, the tablet may not have the index file.
    if (code == 500 && expected_indices_count == 0) {
        // http_client also reports 500 for client-side failures, where the body can be null.
        assertTrue(out != null, "show_nested_index_file returned no body: err=" + err)
        def errorBody = parseJson(out.trim())
        assertEquals("E-6003", errorBody.status)
        assertTrue(errorBody.msg.contains("not found"))
        return
    }
    assertTrue(code == 0, "show_nested_index_file failed: code=" + code + ", out=" + out + ", err=" + err)

    // Parse the response once and reuse it for every assertion below.
    def result = parseJson(out.trim())
    assertEquals(tablet_id.toString(), result.tablet_id.toString())
    def rowsets_count = result.rowsets.size()
    assertEquals(expected_rowsets_count, rowsets_count)

    def index_files_count = 0
    def segment_files_count = 0
    for (def rowset in result.rowsets) {
        assertEquals(format, rowset.index_storage_format)
        for (int i = 0; i < rowset.segments.size(); i++) {
            def segment = rowset.segments[i]
            assertEquals(i, segment.segment_id)
            def indices_count = segment.indices.size()
            assertEquals(expected_indices_count, indices_count)
            if (format == "V1") {
                index_files_count += indices_count
            } else {
                index_files_count++
            }
        }
        segment_files_count += rowset.segments.size()
    }

    // Expected value goes first so a failure message labels the two numbers correctly.
    if (format == "V1") {
        assertEquals(segment_files_count * expected_indices_count, index_files_count)
    } else {
        assertEquals(segment_files_count, index_files_count)
    }
}

logger.info("Added 'check_nested_index_file' function to Suite")
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_add_build_index_with_format_v2", "inverted_index_format_v2"){
def tableName = "test_add_build_index_with_format_v2"

def calc_file_crc_on_tablet = { ip, port, tablet ->
return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
}
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
Expand Down Expand Up @@ -115,22 +112,15 @@ suite("test_add_build_index_with_format_v2", "inverted_index_format_v2"){
String backend_id = tablets[0].BackendId
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
def (code, out, err) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(code == 0)
assertTrue(out.contains("crc_value"))
assertTrue(out.contains("used_time_ms"))
assertEquals("0", parseJson(out.trim()).start_version)
assertEquals("7", parseJson(out.trim()).end_version)
assertEquals("7", parseJson(out.trim()).rowset_count)

// cloud mode is directly schema change, local mode is light schema change.
// cloud mode is 12, local mode is 6
if (isCloudMode()) {
assertEquals("12", parseJson(out.trim()).file_count)
check_nested_index_file(ip, port, tablet_id, 7, 2, "V2")
qt_sql "SELECT * FROM $tableName WHERE name match 'andy' order by id, name, score;"
return
} else {
assertEquals("6", parseJson(out.trim()).file_count)
check_nested_index_file(ip, port, tablet_id, 7, 0, "V2")
}

// build index
Expand All @@ -139,31 +129,15 @@ suite("test_add_build_index_with_format_v2", "inverted_index_format_v2"){
"""
wait_for_build_index_on_partition_finish(tableName, timeout)

(code, out, err) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(code == 0)
assertTrue(out.contains("crc_value"))
assertTrue(out.contains("used_time_ms"))
assertEquals("0", parseJson(out.trim()).start_version)
assertEquals("7", parseJson(out.trim()).end_version)
assertEquals("7", parseJson(out.trim()).rowset_count)
assertEquals("12", parseJson(out.trim()).file_count)
check_nested_index_file(ip, port, tablet_id, 7, 1, "V2")

// build index
sql """
BUILD INDEX idx_score ON ${tableName};
"""
wait_for_build_index_on_partition_finish(tableName, timeout)

(code, out, err) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(code == 0)
assertTrue(out.contains("crc_value"))
assertTrue(out.contains("used_time_ms"))
assertEquals("0", parseJson(out.trim()).start_version)
assertEquals("7", parseJson(out.trim()).end_version)
assertEquals("7", parseJson(out.trim()).rowset_count)
assertEquals("12", parseJson(out.trim()).file_count)
check_nested_index_file(ip, port, tablet_id, 7, 2, "V2")

qt_sql "SELECT * FROM $tableName WHERE name match 'andy' order by id, name, score;"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_create_table_with_format_v2", "inverted_index_format_v2"){
def tableName = "test_create_table_with_format_v2"

def calc_file_crc_on_tablet = { ip, port, tablet ->
return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
}
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
Expand Down Expand Up @@ -60,13 +57,5 @@ suite("test_create_table_with_format_v2", "inverted_index_format_v2"){
String backend_id = tablets[0].BackendId
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
def (code, out, err) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(code == 0)
assertTrue(out.contains("crc_value"))
assertTrue(out.contains("used_time_ms"))
assertEquals("0", parseJson(out.trim()).start_version)
assertEquals("7", parseJson(out.trim()).end_version)
assertEquals("7", parseJson(out.trim()).rowset_count)
assertEquals("12", parseJson(out.trim()).file_count)
check_nested_index_file(ip, port, tablet_id, 7, 2, "V2")
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ suite("test_cumulative_compaction_with_format_v2", "inverted_index_format_v2") {
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
}

def calc_file_crc_on_tablet = { ip, port, tablet ->
return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
}

def calc_segment_count = { tablet ->
int segment_count = 0
String tablet_id = tablet.TabletId
Expand Down Expand Up @@ -173,16 +169,7 @@ suite("test_cumulative_compaction_with_format_v2", "inverted_index_format_v2") {
String port = backendId_to_backendHttpPort.get(backend_id)
int segment_count = calc_segment_count(tablet)
logger.info("TabletId: " + tablet_id + ", segment_count: " + segment_count)
def (c, o, e) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + c + ", out=" + o + ", err=" + e)
assertTrue(c == 0)
assertTrue(o.contains("crc_value"))
assertTrue(o.contains("used_time_ms"))
assertEquals("0", parseJson(o.trim()).start_version)
assertEquals("9", parseJson(o.trim()).end_version)
assertEquals("9", parseJson(o.trim()).rowset_count)
int file_count = segment_count * 2
assertEquals(file_count, Integer.parseInt(parseJson(o.trim()).file_count))
check_nested_index_file(ip, port, tablet_id, 9, 3, "V2")

StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
Expand Down Expand Up @@ -242,17 +229,7 @@ suite("test_cumulative_compaction_with_format_v2", "inverted_index_format_v2") {
String port = backendId_to_backendHttpPort.get(backend_id)
int segment_count = calc_segment_count(tablet)
logger.info("TabletId: " + tablet_id + ", segment_count: " + segment_count)
def (c, o, e) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + c + ", out=" + o + ", err=" + e)
assertTrue(c == 0)
assertTrue(o.contains("crc_value"))
assertTrue(o.contains("used_time_ms"))
assertEquals("0", parseJson(o.trim()).start_version)
assertEquals("9", parseJson(o.trim()).end_version)
// after compaction, there are 2 rwosets.
assertEquals("2", parseJson(o.trim()).rowset_count)
int file_count = segment_count * 2
assertEquals(file_count, Integer.parseInt(parseJson(o.trim()).file_count))
check_nested_index_file(ip, port, tablet_id, 2, 3, "V2")
}

int segmentsCount = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ suite("test_drop_column_with_format_v2", "inverted_index_format_v2"){
}
def tableName = "test_drop_column_with_format_v2"

def calc_file_crc_on_tablet = { ip, port, tablet ->
return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
}
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
Expand Down Expand Up @@ -106,50 +103,23 @@ suite("test_drop_column_with_format_v2", "inverted_index_format_v2"){
String backend_id = tablets[0].BackendId
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
def (code, out, err) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(code == 0)
assertTrue(out.contains("crc_value"))
assertTrue(out.contains("used_time_ms"))
assertEquals("0", parseJson(out.trim()).start_version)
assertEquals("7", parseJson(out.trim()).end_version)
assertEquals("7", parseJson(out.trim()).rowset_count)
assertEquals("12", parseJson(out.trim()).file_count)
check_nested_index_file(ip, port, tablet_id, 7, 2, "V2")

// drop column
sql """ ALTER TABLE ${tableName} DROP COLUMN score; """
wait_for_latest_op_on_table_finish(tableName, timeout)

// select to sync rowset meta in cloud mode
sql """ select * from ${tableName} limit 1; """

tablets = sql_return_maparray """ show tablets from ${tableName}; """
tablet_id = tablets[0].TabletId
(code, out, err) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(code == 0)
assertTrue(out.contains("crc_value"))
assertTrue(out.contains("used_time_ms"))
assertEquals("0", parseJson(out.trim()).start_version)
assertEquals("7", parseJson(out.trim()).end_version)
assertEquals("7", parseJson(out.trim()).rowset_count)
assertEquals("12", parseJson(out.trim()).file_count)
// when drop column, the index files will not be deleted, so the index files count is still 2
check_nested_index_file(ip, port, tablet_id, 7, 2, "V2")

sql """ ALTER TABLE ${tableName} DROP COLUMN name; """
wait_for_latest_op_on_table_finish(tableName, timeout)

// select to sync rowset meta in cloud mode
sql """ select * from ${tableName} limit 1; """

tablets = sql_return_maparray """ show tablets from ${tableName}; """
tablet_id = tablets[0].TabletId
(code, out, err) = calc_file_crc_on_tablet(ip, port, tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
assertTrue(code == 0)
assertTrue(out.contains("crc_value"))
assertTrue(out.contains("used_time_ms"))
assertEquals("0", parseJson(out.trim()).start_version)
assertEquals("7", parseJson(out.trim()).end_version)
assertEquals("7", parseJson(out.trim()).rowset_count)
assertEquals("6", parseJson(out.trim()).file_count)
// when drop column, the index files will not be deleted, so the index files count is still 2
// when all index columns are dropped, the index files will be deleted by GC later
check_nested_index_file(ip, port, tablet_id, 7, 2, "V2")
}
Loading