From ec40c1caf7c71661f3869ff179d93146dffe160a Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 9 Nov 2023 12:17:36 +0800 Subject: [PATCH 1/3] update --- .../olap/rowset/segment_v2/segment_writer.cpp | 4 +- be/src/olap/tablet.cpp | 14 +- be/src/olap/tablet.h | 3 +- ...partial_update_schema_change_row_store.out | 76 + ...tial_update_schema_change_row_store.groovy | 1218 +++++++++++++++++ 5 files changed, 1305 insertions(+), 10 deletions(-) create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 6490b49f8b60cf..8903902692f636 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -554,8 +554,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f read_index[id_and_pos.pos] = read_idx++; } if (has_row_column) { - auto st = tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); + auto st = tablet->fetch_value_through_row_column( + rowset, *_tablet_schema, seg_it.first, rids, cids_missing, old_value_block); if (!st.ok()) { LOG(WARNING) << "failed to fetch value through row column"; return st; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index db1c2eb4ea2e6e..d58183a006de0c 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2742,7 +2742,8 @@ Status Tablet::_get_segment_column_iterator( } // fetch value by row column -Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint32_t segid, +Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, + const TabletSchema& tablet_schema, uint32_t segid, const std::vector& rowids, const std::vector& cids, 
vectorized::Block& block) { @@ -2755,13 +2756,12 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint BetaRowsetSharedPtr rowset = std::static_pointer_cast(input_rowset); CHECK(rowset); - const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); - CHECK(tablet_schema->store_row_column()); + CHECK(tablet_schema.store_row_column()); SegmentCacheHandle segment_cache_handle; std::unique_ptr column_iterator; OlapReaderStatistics stats; RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, - tablet_schema->column(BeConsts::ROW_STORE_COL), + tablet_schema.column(BeConsts::ROW_STORE_COL), &segment_cache_handle, &column_iterator, &stats)); // get and parse tuple row vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); @@ -2774,7 +2774,7 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint std::vector default_values; default_values.resize(cids.size()); for (int i = 0; i < cids.size(); ++i) { - const TabletColumn& column = tablet_schema->column(cids[i]); + const TabletColumn& column = tablet_schema.column(cids[i]); vectorized::DataTypePtr type = vectorized::DataTypeFactory::instance().create_data_type(column); col_uid_to_idx[column.unique_id()] = i; @@ -3255,8 +3255,8 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema, (*read_index)[id_and_pos.pos] = read_idx++; } if (has_row_column) { - auto st = fetch_value_through_row_column(rowset_iter->second, seg_it.first, rids, - cids_to_read, block); + auto st = fetch_value_through_row_column(rowset_iter->second, *tablet_schema, + seg_it.first, rids, cids_to_read, block); if (!st.ok()) { LOG(WARNING) << "failed to fetch value through row column"; return st; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 3c30b3805fa5af..54554ffbd5f17a 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -438,7 +438,8 @@ class Tablet final : public BaseTablet { const TabletColumn& tablet_column, 
vectorized::MutableColumnPtr& dst); - Status fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint32_t segid, + Status fetch_value_through_row_column(RowsetSharedPtr input_rowset, + const TabletSchema& tablet_schema, uint32_t segid, const std::vector& rowids, const std::vector& cids, vectorized::Block& block); diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out new file mode 100644 index 00000000000000..86df3374712e9f --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out @@ -0,0 +1,76 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql1 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql2 -- +1 1 1 0 0 0 0 0 0 0 0 + +-- !sql3 -- +1 1 1 0 0 0 0 0 0 0 10 + +-- !sql4 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql5 -- +1 1 1 0 0 0 0 0 0 + +-- !sql6 -- +1 2 1 0 0 0 0 1 0 + +-- !sql7 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql8 -- +1 1 1.0 0 0 0 0 0 0 0 + +-- !sql9 -- +1 + +-- !sql10 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql11 -- +1 1 1 0 0 0 0 0 0 0 + +-- !sql12 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql13 -- +1 1 1 0 0 0 0 0 0 0 + +-- !sql14 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql15 -- +1 1 1 0 0 0 0 0 0 0 0 + +-- !sql16 -- +1 1 1 0 0 0 0 0 0 0 10 + +-- !sql17 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql18 -- +1 1 1 0 0 0 0 0 0 + +-- !sql19 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql20 -- +1 1 1.0 0 0 0 0 0 0 0 + +-- !sql21 -- +1 + +-- !sql23 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql24 -- +1 1 1 0 0 0 0 0 0 0 + +-- !sql25 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql26 -- +1 1 1 0 0 0 0 0 0 0 + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy new file mode 100644 index 
00000000000000..92828891b07a01 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy @@ -0,0 +1,1218 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_partial_update_schema_change_row_store", "p0") { + // test add value column + def tableName = "test_partial_update_light_schema_change_add_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = 
parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql1 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' " + def try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data without new column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_without_new_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + // check data, new column is filled by default value. 
+ qt_sql2 " select * from ${tableName} order by c0 " + + // test load data with new column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2, c10' + + file 'schema_change/load_with_new_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + // check data, new column is filled by given value. + qt_sql3 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test delete value column + tableName = "test_partial_update_light_schema_change_delete_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, 
json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql4 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} DROP COLUMN c8 " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data without delete column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_delete_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql5 " select * from ${tableName} order by c0 " + + // test load data with delete column, stream load will ignore the + // non-existing column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c7, c8' + + file 'schema_change/load_without_delete_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + // check result, which is fail for loading delete column. 
+ log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql6 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test update value column + tableName = "test_partial_update_light_schema_change_update_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql7 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} MODIFY COLUMN c2 double " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data with update column + 
streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_update_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql8 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test add key column + tableName = "test_partial_update_light_schema_change_add_key_column" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0' + + file 'schema_change/load1.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql9 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} ADD COLUMN c1 int key null " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + 
if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + sql " ALTER table ${tableName} ADD COLUMN c2 int null " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data with all key column, should fail because + // it don't have any value columns + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1' + + file 'schema_change/load_with_key_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertEquals(0, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test create index + tableName = "test_partial_update_light_schema_change_create_index" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception 
!= null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql10 " select * from ${tableName} order by c0 " + + sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + //test load data with create index + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_create_index.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql11 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + // test change properties + tableName = "test_partial_update_light_schema_change_properties" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + 
"enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql12 " select * from ${tableName} order by c0 " + + sql " ALTER TABLE ${tableName} set ('in_memory' = 'false') " + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_change_properties.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql13 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + // test add value column + tableName = "test_partial_update_schema_change_add_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" 
= "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql14 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data without new column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_without_new_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + // check data, new column is filled by default value. 
+ qt_sql15 " select * from ${tableName} order by c0 " + + // test load data with new column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2, c10' + + file 'schema_change/load_with_new_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + // check data, new column is filled by given value. + qt_sql16 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test delete value column + tableName = "test_partial_update_schema_change_delete_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + 
qt_sql17 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} DROP COLUMN c8 " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data without delete column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_delete_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql18 " select * from ${tableName} order by c0 " + + // test load data with delete column + // todo bug + // streamLoad { + // table "${tableName}" + + // set 'column_separator', ',' + // set 'partial_columns', 'true' + // set 'columns', 'c0, c1, c8' + + // file 'schema_change/load_without_delete_column.csv' + // time 10000 // limit inflight 10s + + // check { result, exception, startTime, endTime -> + // if (exception != null) { + // throw exception + // } + // // check result, which is fail for loading delete column. 
+ // log.info("Stream load result: ${result}".toString()) + // def json = parseJson(result) + // assertEquals("fail", json.Status.toLowerCase()) + // assertEquals(1, json.NumberTotalRows) + // assertEquals(1, json.NumberFilteredRows) + // assertEquals(0, json.NumberUnselectedRows) + // } + // } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test update value column + tableName = "test_partial_update_schema_change_update_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql19 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} MODIFY COLUMN c2 double " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data with update column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 
'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_update_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql20 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test add key column + tableName = "test_partial_update_schema_change_add_key_column" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0' + + file 'schema_change/load1.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql21 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} ADD COLUMN c1 int key null " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + sql " ALTER table 
${tableName} ADD COLUMN c2 int null " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data with all key column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1' + + file 'schema_change/load_with_key_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertEquals(0, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test create index + tableName = "test_partial_update_schema_change_create_index" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, 
json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql23 " select * from ${tableName} order by c0 " + + sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + //test load data with create index + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_create_index.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql24 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + // test change properties + tableName = "test_partial_update_schema_change_properties" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, 
endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql25 " select * from ${tableName} order by c0 " + + sql " ALTER TABLE ${tableName} set ('in_memory' = 'false') " + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_change_properties.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql26 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ +} From dda7c1bbac0e27d657c1c2f50a1b6fa9fb10515f Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 9 Nov 2023 13:22:54 +0800 Subject: [PATCH 2/3] update --- .../test_partial_update_schema_change.groovy | 7 +++- ...tial_update_schema_change_row_store.groovy | 33 +++++++++++-------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy index b4bb054a9a894e..eee92b2b39ed8e 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy @@ -17,7 
+17,10 @@ // under the License. suite("test_partial_update_schema_change", "p0") { - // test add value column + + /* ============================================== light schema change cases: ============================================== */ + + // test add value column def tableName = "test_partial_update_light_schema_change_add_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ @@ -618,6 +621,8 @@ suite("test_partial_update_schema_change", "p0") { sql """ DROP TABLE IF EXISTS ${tableName} """ + /* ============================================== schema change cases: ============================================== */ + // test add value column tableName = "test_partial_update_schema_change_add_column" sql """ DROP TABLE IF EXISTS ${tableName} """ diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy index 92828891b07a01..1fa4ec39d36d04 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy @@ -16,9 +16,12 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_partial_update_schema_change_row_store", "p0") { - // test add value column - def tableName = "test_partial_update_light_schema_change_add_column" +suite("test_partial_update_row_store_schema_change", "p0") { + + /* ============================================== light schema change cases: ============================================== */ + + // test add value column + def tableName = "test_partial_update_row_store_light_schema_change_add_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( @@ -141,7 +144,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { // test delete value column - tableName = "test_partial_update_light_schema_change_delete_column" + tableName = "test_partial_update_row_store_light_schema_change_delete_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( @@ -264,7 +267,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { // test update value column - tableName = "test_partial_update_light_schema_change_update_column" + tableName = "test_partial_update_row_store_light_schema_change_update_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( @@ -357,7 +360,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { // test add key column - tableName = "test_partial_update_light_schema_change_add_key_column" + tableName = "test_partial_update_row_store_light_schema_change_add_key_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ @@ -453,7 +456,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { // test create index - tableName = "test_partial_update_light_schema_change_create_index" + tableName = "test_partial_update_row_store_light_schema_change_create_index" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( @@ -545,7 +548,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { sql """ DROP TABLE IF EXISTS ${tableName} """ // 
test change properties - tableName = "test_partial_update_light_schema_change_properties" + tableName = "test_partial_update_row_store_light_schema_change_properties" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( @@ -624,8 +627,10 @@ suite("test_partial_update_schema_change_row_store", "p0") { sql """ DROP TABLE IF EXISTS ${tableName} """ + /* ============================================== schema change cases: ============================================== */ + // test add value column - tableName = "test_partial_update_schema_change_add_column" + tableName = "test_partial_update_row_store_schema_change_add_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( @@ -747,7 +752,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { // test delete value column - tableName = "test_partial_update_schema_change_delete_column" + tableName = "test_partial_update_row_store_schema_change_delete_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( @@ -865,7 +870,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { // test update value column - tableName = "test_partial_update_schema_change_update_column" + tableName = "test_partial_update_row_store_schema_change_update_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( @@ -957,7 +962,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { // test add key column - tableName = "test_partial_update_schema_change_add_key_column" + tableName = "test_partial_update_row_store_schema_change_add_key_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ @@ -1048,7 +1053,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { // test create index - tableName = "test_partial_update_schema_change_create_index" + tableName = "test_partial_update_row_store_schema_change_create_index" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE 
TABLE ${tableName} ( @@ -1138,7 +1143,7 @@ suite("test_partial_update_schema_change_row_store", "p0") { sql """ DROP TABLE IF EXISTS ${tableName} """ // test change properties - tableName = "test_partial_update_schema_change_properties" + tableName = "test_partial_update_row_store_schema_change_properties" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE ${tableName} ( From 2fce28d3db4a5b85e8055c564a2e4c71e3cae79b Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 9 Nov 2023 15:58:31 +0800 Subject: [PATCH 3/3] add comment --- be/src/olap/tablet.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 54554ffbd5f17a..7a2aff4edf2e42 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -438,6 +438,9 @@ class Tablet final : public BaseTablet { const TabletColumn& tablet_column, vectorized::MutableColumnPtr& dst); + // We use the TabletSchema from the caller because the TabletSchema in the rowset's meta + // may be outdated due to schema change. Also note that the cids should indicate the indexes + // of the columns in the TabletSchema passed in. Status fetch_value_through_row_column(RowsetSharedPtr input_rowset, const TabletSchema& tablet_schema, uint32_t segid, const std::vector& rowids,