15 changes: 15 additions & 0 deletions be/src/olap/compaction.cpp
@@ -742,21 +742,36 @@ Status Compaction::do_inverted_index_compaction() {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
}
}

uint64_t inverted_index_file_size = 0;
for (auto& inverted_index_file_writer : inverted_index_file_writers) {
if (Status st = inverted_index_file_writer->close(); !st.ok()) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
} else {
inverted_index_file_size += inverted_index_file_writer->get_index_file_size();
}
}
// check index compaction status. If status is not ok, we should return error and end this compaction round.
if (!status.ok()) {
return status;
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
inverted_index_file_size);

COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ", input_rowset_size=" << _input_rowsets_size
<< ", output_rowset_size=" << _output_rowset->data_disk_size()
<< ", inverted index file size=" << inverted_index_file_size
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";

return Status::OK();
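For readers less familiar with this path, a minimal self-contained sketch of the close-and-accumulate pattern the new block follows (the `Status` and `IndexWriter` types below are simplified stand-ins, not the real Doris classes): every writer is closed even after a failure, a failure is remembered so the caller can end the compaction round, and only writers that closed cleanly contribute to the accumulated index file size.

```cpp
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins for the real Doris types, only to make the pattern compile.
struct Status {
    bool ok_ = true;
    std::string msg_;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_; }
    const std::string& msg() const { return msg_; }
};

struct IndexWriter {
    virtual ~IndexWriter() = default;
    virtual Status close() = 0;
    virtual uint64_t get_index_file_size() const = 0;
};

// Close every writer even if one fails, remember the failure for the caller,
// and sum the file sizes of the writers that closed successfully.
uint64_t close_all_and_sum(std::vector<std::unique_ptr<IndexWriter>>& writers, Status& status) {
    uint64_t inverted_index_file_size = 0;
    for (auto& writer : writers) {
        if (Status st = writer->close(); !st.ok()) {
            status = Status::Error(st.msg());  // keep the error; the caller returns it after the loop
        } else {
            inverted_index_file_size += writer->get_index_file_size();
        }
    }
    return inverted_index_file_size;
}
```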
4 changes: 2 additions & 2 deletions be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -142,8 +142,7 @@ Status VerticalBetaRowsetWriter<T>::_flush_columns(segment_v2::SegmentWriter* se
this->_segment_num_rows.resize(_cur_writer_idx + 1);
this->_segment_num_rows[_cur_writer_idx] = _segment_writers[_cur_writer_idx]->row_count();
}
this->_total_index_size +=
static_cast<int64_t>(index_size) + segment_writer->get_inverted_index_file_size();
this->_total_index_size += static_cast<int64_t>(index_size);
return Status::OK();
}

@@ -217,6 +216,7 @@ Status VerticalBetaRowsetWriter<T>::final_flush() {
return st;
}
this->_total_data_size += segment_size + segment_writer->get_inverted_index_file_size();
this->_total_index_size += segment_writer->get_inverted_index_file_size();
segment_writer.reset();
}
return Status::OK();
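To make the bookkeeping change easier to follow, here is a hedged sketch (field and function names are simplified stand-ins, not the actual `VerticalBetaRowsetWriter` members) of where each size is accumulated after this patch: `_flush_columns()` now adds only the index size reported by the segment writer, while `final_flush()` adds the inverted index file size to both the data and the index totals.

```cpp
#include <cstdint>

// Simplified stand-in for the rowset writer's running totals.
struct WriterTotals {
    int64_t total_data_size = 0;   // segment bytes plus inverted index file bytes
    int64_t total_index_size = 0;  // segment index bytes plus inverted index file bytes
};

// What _flush_columns() contributes after this change: only the index size
// reported by the segment writer, without the inverted index file size.
void on_columns_flushed(WriterTotals& totals, int64_t index_size) {
    totals.total_index_size += index_size;
}

// What final_flush() contributes: segment size and inverted index file size go
// into the data total, and the inverted index file size is also counted once
// in the index total.
void on_final_flush(WriterTotals& totals, int64_t segment_size, int64_t inverted_index_file_size) {
    totals.total_data_size += segment_size + inverted_index_file_size;
    totals.total_index_size += inverted_index_file_size;
}
```

With this split, the inverted index bytes are added to `_total_index_size` exactly once per segment, at final flush, rather than during column flush.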
215 changes: 212 additions & 3 deletions regression-test/suites/inverted_index_p0/test_show_data.groovy
@@ -64,7 +64,7 @@ suite("test_show_data", "p0") {
}

def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1, load_to_single_tablet = 'true' ->
expected_succ_rows = -1 ->

// load the json data
streamLoad {
@@ -261,7 +261,7 @@ suite("test_show_data_for_bkd", "p0") {
}

def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1, load_to_single_tablet = 'true' ->
expected_succ_rows = -1 ->

// load the json data
streamLoad {
@@ -459,7 +459,7 @@ suite("test_show_data_multi_add", "p0") {
}

def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1, load_to_single_tablet = 'true' ->
expected_succ_rows = -1 ->

// load the json data
streamLoad {
@@ -611,3 +611,212 @@ suite("test_show_data_multi_add", "p0") {
//try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}

suite("test_show_data_with_compaction", "p0, nonConcurrent") {
// define a sql table
def tableWithIndexCompaction = "test_with_index_compaction"
def tableWithOutIndexCompaction = "test_without_index_compaction"
def delta_time = 5000
def timeout = 60000
def alter_res = "null"
def useTime = 0
String database = context.config.getDbNameByFile(context.file)
boolean invertedIndexCompactionEnable = true

def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))

logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "inverted_index_compaction_enable") {
invertedIndexCompactionEnable = Boolean.parseBoolean(((List<String>) ele)[2])
logger.info("inverted_index_compaction_enable: ${((List<String>) ele)[2]}")
}
}

def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
(code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
}

def create_table_with_index = {testTablex ->
// multi-line sql
def result = sql """
CREATE TABLE IF NOT EXISTS ${testTablex} (
`@timestamp` int(11) NULL,
`clientip` varchar(20) NULL,
`request` text NULL,
`status` int(11) NULL,
`size` int(11) NULL,
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser"="english") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true"
);
"""
}

def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1 ->

// load the json data
streamLoad {
table "${table_name}"

// set http request header params
set 'label', label + "_" + UUID.randomUUID().toString()
set 'read_json_by_line', read_flag
set 'format', format_flag
file file_name // import json file
time 10000 // limit inflight 10s
if (expected_succ_rows >= 0) {
set 'max_filter_ratio', '1'
}

// If a check callback is declared, the default check condition is ignored,
// so you must verify every condition yourself.
check { result, exception, startTime, endTime ->
if (ignore_failure && expected_succ_rows < 0) { return }
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
if (expected_succ_rows >= 0) {
assertEquals(json.NumberLoadedRows, expected_succ_rows)
} else {
assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
}

def wait_for_show_data_finish = { table_name, OpTimeout, origin_size, maxRetries = 5 ->
def size = origin_size
def retries = 0
def last_size = origin_size

while (retries < maxRetries) {
for (int t = 0; t < OpTimeout; t += delta_time) {
def result = sql """show data from ${database}.${table_name};"""
if (result.size() > 0) {
logger.info(table_name + " show data, detail: " + result[0].toString())
size = result[0][2].replace(" KB", "").toDouble()
}
useTime += delta_time
Thread.sleep(delta_time)

// If size changes, break the for loop to check in the next while iteration
if (size != origin_size && size != last_size) {
break
}
}

if (size != last_size) {
last_size = size
} else {
// If size didn't change during the last OpTimeout period, return size
if (size != origin_size) {
return size
}
}

retries++
}
return "wait_timeout"
}


try {

def run_compaction_and_wait = { tableName ->
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName}; """

// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
logger.info("Compaction was done automatically!")
} else {
assertEquals("success", compactJson.status.toLowerCase())
}
}

// wait until all compactions are done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}

set_be_config.call("inverted_index_compaction_enable", "false")
sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
create_table_with_index.call(tableWithIndexCompaction)

load_httplogs_data.call(tableWithIndexCompaction, '1', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(tableWithIndexCompaction, '2', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(tableWithIndexCompaction, '3', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(tableWithIndexCompaction, '4', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(tableWithIndexCompaction, '5', 'true', 'json', 'documents-1000.json')

sql "sync"

run_compaction_and_wait(tableWithIndexCompaction)
def with_index_size = wait_for_show_data_finish(tableWithIndexCompaction, 60000, 0)
assertTrue(with_index_size != "wait_timeout")

set_be_config.call("inverted_index_compaction_enable", "true")

sql "DROP TABLE IF EXISTS ${tableWithOutIndexCompaction}"
create_table_with_index.call(tableWithOutIndexCompaction)
load_httplogs_data.call(tableWithOutIndexCompaction, '6', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(tableWithOutIndexCompaction, '7', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(tableWithOutIndexCompaction, '8', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(tableWithOutIndexCompaction, '9', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(tableWithOutIndexCompaction, '10', 'true', 'json', 'documents-1000.json')

run_compaction_and_wait(tableWithOutIndexCompaction)
def another_with_index_size = wait_for_show_data_finish(tableWithOutIndexCompaction, 60000, 0)
assertTrue(another_with_index_size != "wait_timeout")

if (!isCloudMode()) {
assertEquals(another_with_index_size, with_index_size)
}
} finally {
// sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
// sql "DROP TABLE IF EXISTS ${tableWithOutIndexCompaction}"
set_be_config.call("inverted_index_compaction_enable", invertedIndexCompactionEnable.toString())
}
}