Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
#include <atomic>
#include <memory>
#include <string>
#include <vector>

#include "common/config.h"
#include "common/status.h"
#include "gutil/strings/split.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "olap/olap_common.h"
#include "olap/options.h"
#include "olap/tablet_schema.h"

namespace doris {
class CollectionValue;
Expand Down Expand Up @@ -70,6 +73,25 @@ class InvertedIndexColumnWriter {

virtual void close_on_error() = 0;

// check if the column is valid for inverted index, some columns
// are generated from variant, but not all of them are supported
static bool check_column_valid(const TabletColumn& column) {
// bellow types are not supported in inverted index for extracted columns
static std::set<FieldType> invalid_types = {
FieldType::OLAP_FIELD_TYPE_DOUBLE,
FieldType::OLAP_FIELD_TYPE_JSONB,
FieldType::OLAP_FIELD_TYPE_ARRAY,
FieldType::OLAP_FIELD_TYPE_FLOAT,
};
if (column.is_extracted_column() && (invalid_types.contains(column.type()))) {
return false;
}
if (column.is_variant_type()) {
return false;
}
return true;
}

private:
DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter);
};
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/segment_loader.h"
Expand Down Expand Up @@ -227,9 +228,8 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
}
// indexes for this column
opts.indexes = std::move(_tablet_schema->get_indexes_for_column(column));
if (column.is_variant_type() || (column.is_extracted_column() && column.is_jsonb_type()) ||
(column.is_extracted_column() && column.is_array_type())) {
// variant and jsonb type skip write index
if (!InvertedIndexColumnWriter::check_column_valid(column)) {
// skip inverted index if invalid
opts.indexes.clear();
opts.need_zone_map = false;
opts.need_bloom_filter = false;
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,8 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
}
// indexes for this column
opts.indexes = _tablet_schema->get_indexes_for_column(column);
if (column.is_variant_type() || (column.is_extracted_column() && column.is_jsonb_type()) ||
(column.is_extracted_column() && column.is_array_type())) {
// variant and jsonb type skip write index
if (!InvertedIndexColumnWriter::check_column_valid(column)) {
// skip inverted index if invalid
opts.indexes.clear();
opts.need_zone_map = false;
opts.need_bloom_filter = false;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
continue;
}
auto column = output_rowset_schema->column(column_idx);
if (!InvertedIndexColumnWriter::check_column_valid(column)) {
continue;
}
DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id, ""));
_olap_data_convertor->add_column_data_convertor(column);
return_columns.emplace_back(column_idx);
Expand Down
12 changes: 12 additions & 0 deletions regression-test/data/variant_p0/with_index/var_index.out
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,15 @@
2 {"a":18811,"b":"hello world","c":1181111}
4 {"a":1234,"b":"hello xxx world","c":8181111}

-- !sql --
1 {"a":123,"b":"xxxyyy","c":111999111}
2 {"a":18811,"b":"hello world","c":1181111}
3 {"a":18811,"b":"hello wworld","c":11111}
4 {"a":1234,"b":"hello xxx world","c":8181111}
5 {"a":123456789,"b":123456,"c":8181111}
6 {"timestamp":1713283200.060359}
7 {"timestamp":17.0}
8 {"timestamp":[123]}
9 {"timestamp":17.0}
10 {"timestamp":"17.0"}

108 changes: 106 additions & 2 deletions regression-test/suites/variant_github_events_p0/load.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,97 @@
import org.codehaus.groovy.runtime.IOGroovyMethods

suite("regression_test_variant_github_events_p0", "nonConcurrent"){
// prepare test table
def timeout = 300000
def delta_time = 1000
def alter_res = "null"
def useTime = 0

// Poll SHOW ALTER TABLE COLUMN until the latest schema-change job on
// table_name reports FINISHED, failing the test after OpTimeout ms.
def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
    def op_finished = false
    for(int t = delta_time; t <= OpTimeout; t += delta_time){
        alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
        alter_res = alter_res.toString()
        if(alter_res.contains("FINISHED")) {
            sleep(10000) // wait change table state to normal
            logger.info(table_name + " latest alter job finished, detail: " + alter_res)
            op_finished = true
            break
        }
        useTime = t
        sleep(delta_time)
    }
    // NOTE: asserting on a flag, not on useTime — with the old
    // `useTime <= OpTimeout` check the last loop iteration set
    // useTime == OpTimeout, so a timeout could never fail the test.
    assertTrue(op_finished, "wait_for_latest_op_on_table_finish timeout")
}

// Poll SHOW BUILD INDEX until every build-index job for table_name is
// FINISHED (a table with zero jobs counts as finished), failing the
// test after OpTimeout ms.
def wait_for_build_index_on_partition_finish = { table_name, OpTimeout ->
    def all_jobs_finished = false
    for(int t = delta_time; t <= OpTimeout; t += delta_time){
        alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}";"""
        def expected_finished_num = alter_res.size();
        def finished_num = 0;
        for (int i = 0; i < expected_finished_num; i++) {
            logger.info(table_name + " build index job state: " + alter_res[i][7] + i)
            if (alter_res[i][7] == "FINISHED") {
                ++finished_num;
            }
        }
        if (finished_num == expected_finished_num) {
            sleep(10000) // wait change table state to normal
            logger.info(table_name + " all build index jobs finished, detail: " + alter_res)
            all_jobs_finished = true
            break
        }
        useTime = t
        sleep(delta_time)
    }
    // Assert on a flag: the previous `useTime <= OpTimeout` check could
    // never fail, and its message named the wrong closure.
    assertTrue(all_jobs_finished, "wait_for_build_index_on_partition_finish timeout")
}

// Wait until the most recent build-index job on table_name reaches a
// terminal state. Returns "SKIPPED" when no job exists, otherwise the
// terminal state ("FINISHED" or "CANCELLED"); fails the test on timeout.
def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
    for(int t = delta_time; t <= OpTimeout; t += delta_time){
        alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """

        if (alter_res.size() == 0) {
            logger.info(table_name + " last index job finished")
            return "SKIPPED"
        }
        if (alter_res.size() > 0) {
            def last_job_state = alter_res[alter_res.size()-1][7];
            if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
                sleep(10000) // wait change table state to normal
                logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
                return last_job_state;
            }
        }
        useTime = t
        sleep(delta_time)
    }
    logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res)
    // Reaching this point means the job never reached a terminal state
    // within OpTimeout. The old `useTime <= OpTimeout` assertion always
    // held (useTime ends equal to OpTimeout), silently returning
    // "wait_timeout" instead of failing — assert unconditionally.
    assertTrue(false, "wait_for_last_build_index_on_table_finish timeout")
    return "wait_timeout"
}

// Wait until the most recent build-index job on table_name reports
// RUNNING. Returns "SKIPPED" when no job exists, "RUNNING" on success;
// fails the test on timeout.
def wait_for_last_build_index_on_table_running = { table_name, OpTimeout ->
    for(int t = delta_time; t <= OpTimeout; t += delta_time){
        alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """

        if (alter_res.size() == 0) {
            logger.info(table_name + " last index job finished")
            return "SKIPPED"
        }
        if (alter_res.size() > 0) {
            def last_job_state = alter_res[alter_res.size()-1][7];
            if (last_job_state == "RUNNING") {
                logger.info(table_name + " last index job running, state: " + last_job_state + ", detail: " + alter_res)
                return last_job_state;
            }
        }
        useTime = t
        sleep(delta_time)
    }
    logger.info("wait_for_last_build_index_on_table_running debug: " + alter_res)
    // Falling out of the loop means the job never entered RUNNING within
    // OpTimeout; the old `useTime <= OpTimeout` assertion could never
    // fire, so assert unconditionally here.
    assertTrue(false, "wait_for_last_build_index_on_table_running timeout")
    return "wait_timeout"
}


def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
Expand Down Expand Up @@ -61,8 +152,8 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
v variant not null,
INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
v variant not null
-- INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
)
DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
Expand All @@ -74,12 +165,25 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""")

// build inverted index in the middle of loading the data
// ADD INDEX
sql """ ALTER TABLE github_events ADD INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" = "chinese", "parser_mode" = "fine_grained", "support_phrase" = "true") """
wait_for_latest_op_on_table_finish("github_events", timeout)

// 2022
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-16.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-10.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-22.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-23.json'}""")

// BUILD INDEX and expect state is FINISHED
sql """ BUILD INDEX idx_var ON github_events"""
state = wait_for_last_build_index_on_table_finish("github_events", timeout)
assertEquals("FINISHED", state)

// add bloom filter at the end of loading data

def tablets = sql_return_maparray """ show tablets from github_events; """
// trigger compactions for all tablets in github_events
for (def tablet in tablets) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("regression_test_variant_var_index", "variant_type"){
suite("regression_test_variant_var_index", "p0"){
def table_name = "var_index"
sql "DROP TABLE IF EXISTS var_index"
sql """
Expand All @@ -36,4 +36,10 @@ suite("regression_test_variant_var_index", "variant_type"){
qt_sql """select * from var_index where cast(v["a"] as smallint) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 1024 order by k"""
sql """insert into var_index values(5, '{"a" : 123456789, "b" : 123456, "c" : 8181111}')"""
qt_sql """select * from var_index where cast(v["a"] as int) > 123 and cast(v["b"] as string) match 'hello' and cast(v["c"] as int) > 11111 order by k"""
// insert double/float/array/json
sql """insert into var_index values(6, '{"timestamp": 1713283200.060359}')"""
sql """insert into var_index values(7, '{"timestamp": 17.0}')"""
sql """insert into var_index values(8, '{"timestamp": [123]}')"""
sql """insert into var_index values(9, '{"timestamp": 17.0}'),(10, '{"timestamp": "17.0"}')"""
qt_sql "select * from var_index order by k limit 10"
}