From 34c4d94003133eba5a6bd33f0ba5095188b1487e Mon Sep 17 00:00:00 2001
From: csun5285
Date: Wed, 3 Jul 2024 18:16:47 +0800
Subject: [PATCH] fix inverted index size

---
 be/src/olap/compaction.cpp                    |  15 ++
 .../rowset/vertical_beta_rowset_writer.cpp    |   4 +-
 .../inverted_index_p0/test_show_data.groovy   | 215 +++++++++++++++++-
 3 files changed, 229 insertions(+), 5 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index b42c23f18742bc..5e835df5f3afbb 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -742,9 +742,13 @@ Status Compaction::do_inverted_index_compaction() {
             status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
         }
     }
+
+    uint64_t inverted_index_file_size = 0;
     for (auto& inverted_index_file_writer : inverted_index_file_writers) {
         if (Status st = inverted_index_file_writer->close(); !st.ok()) {
             status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
+        } else {
+            inverted_index_file_size += inverted_index_file_writer->get_index_file_size();
         }
     }
     // check index compaction status. If status is not ok, we should return error and end this compaction round.
@@ -752,11 +756,22 @@
         return status;
     }
 
+    // index compaction should update total disk size and index disk size
+    _output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
+                                                      inverted_index_file_size);
+    _output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() +
+                                                       inverted_index_file_size);
+    _output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
+                                                       inverted_index_file_size);
+
+    COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());
+
     LOG(INFO) << "succeed to do index compaction"
               << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
               << ", output row number=" << _output_rowset->num_rows()
               << ", input_rowset_size=" << _input_rowsets_size
               << ", output_rowset_size=" << _output_rowset->data_disk_size()
+              << ", inverted index file size=" << inverted_index_file_size
               << ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";
 
     return Status::OK();
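The bookkeeping added above can be read in isolation. Below is a minimal sketch, not Doris code: RowsetSizes and account_index_compaction_output are hypothetical stand-ins for RowsetMeta and the setter sequence in the hunk, assuming data_disk_size() reads back the value just written (which is what the repeated getter calls rely on). The point of the fix is that the merged inverted index files are new on-disk artifacts, so their bytes must be folded into the rowset meta or SHOW DATA under-reports the tablet.

    #include <cstdint>

    // Hypothetical stand-in for RowsetMeta; only the fields touched above.
    struct RowsetSizes {
        uint64_t data_disk_size = 0;
        uint64_t index_disk_size = 0;
        uint64_t total_disk_size = 0;
    };

    // Mirrors the three setter calls in the hunk, in the same order: the
    // merged inverted index bytes are folded into the data size first, then
    // the total and index sizes are derived from the updated values.
    void account_index_compaction_output(RowsetSizes& meta, uint64_t inverted_index_file_size) {
        meta.data_disk_size += inverted_index_file_size;
        meta.total_disk_size = meta.data_disk_size + inverted_index_file_size;
        meta.index_disk_size += inverted_index_file_size;
    }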
elapsed time=" << inverted_watch.get_elapse_second() << "s."; return Status::OK(); diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index 4d7368395f7096..1de7d4f50dce8c 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -142,8 +142,7 @@ Status VerticalBetaRowsetWriter::_flush_columns(segment_v2::SegmentWriter* se this->_segment_num_rows.resize(_cur_writer_idx + 1); this->_segment_num_rows[_cur_writer_idx] = _segment_writers[_cur_writer_idx]->row_count(); } - this->_total_index_size += - static_cast(index_size) + segment_writer->get_inverted_index_file_size(); + this->_total_index_size += static_cast(index_size); return Status::OK(); } @@ -217,6 +216,7 @@ Status VerticalBetaRowsetWriter::final_flush() { return st; } this->_total_data_size += segment_size + segment_writer->get_inverted_index_file_size(); + this->_total_index_size += segment_writer->get_inverted_index_file_size(); segment_writer.reset(); } return Status::OK(); diff --git a/regression-test/suites/inverted_index_p0/test_show_data.groovy b/regression-test/suites/inverted_index_p0/test_show_data.groovy index 17c0a2ad020a25..6622e2bfd6cb0d 100644 --- a/regression-test/suites/inverted_index_p0/test_show_data.groovy +++ b/regression-test/suites/inverted_index_p0/test_show_data.groovy @@ -64,7 +64,7 @@ suite("test_show_data", "p0") { } def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, - expected_succ_rows = -1, load_to_single_tablet = 'true' -> + expected_succ_rows = -1 -> // load the json data streamLoad { @@ -261,7 +261,7 @@ suite("test_show_data_for_bkd", "p0") { } def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, - expected_succ_rows = -1, load_to_single_tablet = 'true' -> + expected_succ_rows = -1 -> // load the json data streamLoad { @@ -459,7 +459,7 @@ suite("test_show_data_multi_add", "p0") { } def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, - expected_succ_rows = -1, load_to_single_tablet = 'true' -> + expected_succ_rows = -1 -> // load the json data streamLoad { @@ -611,3 +611,212 @@ suite("test_show_data_multi_add", "p0") { //try_sql("DROP TABLE IF EXISTS ${testTable}") } } + +suite("test_show_data_with_compaction", "p0, nonConcurrent") { + // define a sql table + def tableWithIndexCompaction = "test_with_index_compaction" + def tableWithOutIndexCompaction = "test_without_index_compaction" + def delta_time = 5000 + def timeout = 60000 + def alter_res = "null" + def useTime = 0 + String database = context.config.getDbNameByFile(context.file) + boolean invertedIndexCompactionEnable = true + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == "inverted_index_compaction_enable") { + invertedIndexCompactionEnable = Boolean.parseBoolean(((List) ele)[2]) + 
logger.info("inverted_index_compaction_enable: ${((List) ele)[2]}") + } + } + + def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def create_table_with_index = {testTablex -> + // multi-line sql + def result = sql """ + CREATE TABLE IF NOT EXISTS ${testTablex} ( + `@timestamp` int(11) NULL, + `clientip` varchar(20) NULL, + `request` text NULL, + `status` int(11) NULL, + `size` int(11) NULL, + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser"="english") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + } + + def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, + expected_succ_rows = -1 -> + + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'label', label + "_" + UUID.randomUUID().toString() + set 'read_json_by_line', read_flag + set 'format', format_flag + file file_name // import json file + time 10000 // limit inflight 10s + if (expected_succ_rows >= 0) { + set 'max_filter_ratio', '1' + } + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (ignore_failure && expected_succ_rows < 0) { return } + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + if (expected_succ_rows >= 0) { + assertEquals(json.NumberLoadedRows, expected_succ_rows) + } else { + assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + } + + def wait_for_show_data_finish = { table_name, OpTimeout, origin_size, maxRetries = 5 -> + def size = origin_size + def retries = 0 + def last_size = origin_size + + while (retries < maxRetries) { + for (int t = 0; t < OpTimeout; t += delta_time) { + def result = sql """show data from ${database}.${table_name};""" + if (result.size() > 0) { + logger.info(table_name + " show data, detail: " + result[0].toString()) + size = result[0][2].replace(" KB", "").toDouble() + } + useTime += delta_time + Thread.sleep(delta_time) + + // If size changes, break the for loop to check in the next while iteration + if (size != origin_size && size != last_size) { + break + } + } + + if (size != last_size) { + last_size = size + } else { + // If size didn't change during the last OpTimeout period, return size + if (size != origin_size) { + return size + } + } + + retries++ + } + return "wait_timeout" + } + + + try { + + def run_compaction_and_wait = { tableName -> + //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + // trigger compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = 
+                backend_id = tablet.BackendId
+                (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactJson = parseJson(out.trim())
+                if (compactJson.status.toLowerCase() == "fail") {
+                    logger.info("Compaction was done automatically!")
+                } else {
+                    assertEquals("success", compactJson.status.toLowerCase())
+                }
+            }
+
+            // wait for all compactions done
+            for (def tablet in tablets) {
+                boolean running = true
+                do {
+                    Thread.sleep(1000)
+                    String tablet_id = tablet.TabletId
+                    backend_id = tablet.BackendId
+                    (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                    logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                    assertEquals(code, 0)
+                    def compactionStatus = parseJson(out.trim())
+                    assertEquals("success", compactionStatus.status.toLowerCase())
+                    running = compactionStatus.run_status
+                } while (running)
+            }
+        }
+
+        set_be_config.call("inverted_index_compaction_enable", "true")
+        sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
+        create_table_with_index.call(tableWithIndexCompaction)
+
+        load_httplogs_data.call(tableWithIndexCompaction, '1', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '2', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '3', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '4', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '5', 'true', 'json', 'documents-1000.json')
+
+        sql "sync"
+
+        run_compaction_and_wait(tableWithIndexCompaction)
+        def with_index_size = wait_for_show_data_finish(tableWithIndexCompaction, 60000, 0)
+        assertTrue(with_index_size != "wait_timeout")
+
+        set_be_config.call("inverted_index_compaction_enable", "false")
+
+        sql "DROP TABLE IF EXISTS ${tableWithOutIndexCompaction}"
+        create_table_with_index.call(tableWithOutIndexCompaction)
+        load_httplogs_data.call(tableWithOutIndexCompaction, '6', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithOutIndexCompaction, '7', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithOutIndexCompaction, '8', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithOutIndexCompaction, '9', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithOutIndexCompaction, '10', 'true', 'json', 'documents-1000.json')
+
+        run_compaction_and_wait(tableWithOutIndexCompaction)
+        def another_with_index_size = wait_for_show_data_finish(tableWithOutIndexCompaction, 60000, 0)
+        assertTrue(another_with_index_size != "wait_timeout")
+
+        if (!isCloudMode()) {
+            assertEquals(another_with_index_size, with_index_size)
+        }
+    } finally {
+        // sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
+        // sql "DROP TABLE IF EXISTS ${tableWithOutIndexCompaction}"
+        set_be_config.call("inverted_index_compaction_enable", invertedIndexCompactionEnable.toString())
+    }
+}
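The new suite loads identical data into two tables, runs full compaction once with inverted_index_compaction_enable set to true and once with it set to false, and asserts that SHOW DATA reports the same size for both; before the backend fix above, the index-compaction path dropped the merged index bytes from the rowset meta, so the two numbers diverged. Its wait_for_show_data_finish helper polls SHOW DATA until the reported size has both moved off its starting value and stopped changing between rounds. The same stabilization loop, reduced to a self-contained sketch (wait_until_stable and its parameters are hypothetical; the groovy helper returns the string "wait_timeout" where this returns -1):

    #include <chrono>
    #include <thread>

    // Poll `poll()` every `step` until the value both differs from `origin`
    // (SHOW DATA has caught up with the compaction) and is unchanged since
    // the previous round (the number has stabilized). Returns -1 on timeout.
    template <typename PollFn>
    double wait_until_stable(PollFn poll, double origin,
                             std::chrono::milliseconds step, int max_retries) {
        double last = origin;
        for (int retry = 0; retry < max_retries; ++retry) {
            std::this_thread::sleep_for(step);
            double size = poll();
            if (size != origin && size == last) {
                return size;
            }
            last = size;
        }
        return -1;
    }

The equality assertion is skipped in cloud mode, and the finally block restores inverted_index_compaction_enable to whatever value the BE reported before the test ran.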