Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ Status IndexBuilder::update_inverted_index_info() {
TabletIndex index;
index.init_from_thrift(t_inverted_index, *input_rs_tablet_schema);
auto column_uid = index.col_unique_ids()[0];
if (column_uid < 0) {
LOG(WARNING) << "referenced column was missing. "
<< "[column=" << t_inverted_index.columns[0]
<< " referenced_column=" << column_uid << "]";
output_rs_tablet_schema->append_index(index);
continue;
}
const TabletColumn& col = output_rs_tablet_schema->column_by_uid(column_uid);
const TabletIndex* exist_index = output_rs_tablet_schema->get_inverted_index(col);
if (exist_index && exist_index->index_id() != index.index_id()) {
Expand Down Expand Up @@ -360,6 +367,12 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
inverted_index_writer_signs.push_back(writer_sign);
}
}

if (return_columns.empty()) {
// no columns to read
break;
}

_inverted_index_file_writers.emplace(seg_ptr->id(),
std::move(inverted_index_file_writer));

Expand All @@ -378,7 +391,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
return Status::Error<ErrorCode::ROWSET_READER_INIT>(res.to_string());
}

std::shared_ptr<vectorized::Block> block = std::make_shared<vectorized::Block>(
auto block = vectorized::Block::create_unique(
output_rowset_schema->create_block(return_columns));
while (true) {
auto status = iter->next_batch(block.get());
Expand Down Expand Up @@ -447,12 +460,6 @@ Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema,
for (auto i = 0; i < _alter_inverted_indexes.size(); ++i) {
auto inverted_index = _alter_inverted_indexes[i];
auto index_id = inverted_index.index_id;
auto converted_result = _olap_data_convertor->convert_column_data(i);
if (!converted_result.first.ok()) {
LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first;
return converted_result.first;
}

auto column_name = inverted_index.columns[0];
auto column_idx = tablet_schema->field_index(column_name);
if (column_idx < 0) {
Expand All @@ -463,6 +470,11 @@ Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema,
auto column = tablet_schema->column(column_idx);
auto writer_sign = std::make_pair(segment_idx, index_id);
std::unique_ptr<Field> field(FieldFactory::create(column));
auto converted_result = _olap_data_convertor->convert_column_data(i);
if (converted_result.first != Status::OK()) {
LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first;
return converted_result.first;
}
const auto* ptr = (const uint8_t*)converted_result.second->get_data();
if (converted_result.second->get_nullmap()) {
RETURN_IF_ERROR(_add_nullable(column_name, writer_sign, field.get(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --
1 hello world \N

-- !select2 --
1 hello world \N

Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_index_change_on_new_column") {
def timeout = 60000
def delta_time = 1000
def alter_res = "null"
def useTime = 0

def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
for(int t = delta_time; t <= OpTimeout; t += delta_time){
alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
alter_res = alter_res.toString()
if(alter_res.contains("FINISHED")) {
sleep(3000) // wait change table state to normal
logger.info(table_name + " latest alter job finished, detail: " + alter_res)
break
}
useTime = t
sleep(delta_time)
}
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
}

def wait_for_build_index_on_partition_finish = { table_name, OpTimeout ->
for(int t = delta_time; t <= OpTimeout; t += delta_time){
alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}";"""
def expected_finished_num = alter_res.size();
def finished_num = 0;
for (int i = 0; i < expected_finished_num; i++) {
logger.info(table_name + " build index job state: " + alter_res[i][7] + i)
if (alter_res[i][7] == "FINISHED") {
++finished_num;
}
}
if (finished_num == expected_finished_num) {
logger.info(table_name + " all build index jobs finished, detail: " + alter_res)
break
}
useTime = t
sleep(delta_time)
}
assertTrue(useTime <= OpTimeout, "wait_for_latest_build_index_on_partition_finish timeout")
}

def tableName = "test_index_change_on_new_column"

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`id` INT COMMENT "",
`s` STRING COMMENT ""
)
DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`)
PROPERTIES ( "replication_num" = "1" );
"""

sql """ INSERT INTO ${tableName} VALUES
(1, 'hello world')
"""

// add new column
sql """ alter table ${tableName} add column s1 varchar(50) default null after s; """

qt_select1 """ SELECT * FROM ${tableName}; """

// create inverted index on new column
sql """ alter table ${tableName} add index idx_s1(s1) USING INVERTED """
wait_for_latest_op_on_table_finish(tableName, timeout)

// build inverted index on new column
sql """ build index idx_s1 on ${tableName} """
wait_for_build_index_on_partition_finish(tableName, timeout)

def show_result = sql "show index from ${tableName}"
logger.info("show index from " + tableName + " result: " + show_result)
assertEquals(show_result.size(), 1)
assertEquals(show_result[0][2], "idx_s1")

qt_select2 """ SELECT * FROM ${tableName}; """
}