From c7c07971af032bf6b91256a1a9af9afd1bad379d Mon Sep 17 00:00:00 2001
From: lihangyu <15605149486@163.com>
Date: Wed, 19 Jun 2024 11:48:19 +0800
Subject: [PATCH] [Fix](Variant) fix variant schema change may cause invalid
 block schema and write missing blocks (#36317)

Buffering blocks during schema change should merge the incoming block
first and then flush, otherwise the current block may be lost.
---
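Notes:

The segment_creator.cpp hunk replaces a flush-or-merge branch with
merge-then-flush: every incoming block is merged into _buffer_block before
the size check, so the block that pushes the buffer over
config::write_buffer_size can no longer be dropped. The standalone toy
program below illustrates the difference in ordering; it is a sketch of the
control flow only, and every name in it (ToyBuffer, kLimit, add_block_old,
add_block_new) is a hypothetical stand-in, not Doris code:

    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct ToyBuffer {
        std::vector<int> rows;
        std::size_t bytes() const { return rows.size() * sizeof(int); }
        void merge(const std::vector<int>& block) {
            rows.insert(rows.end(), block.begin(), block.end());
        }
        void flush(std::size_t& flushed) {
            flushed += rows.size();
            rows.clear();
        }
    };

    // Flush once the buffer exceeds 8 rows' worth of bytes.
    constexpr std::size_t kLimit = 8 * sizeof(int);

    // Old (buggy) ordering: flush OR merge. A block that arrives while the
    // buffer is over the limit is never merged, so its rows are lost.
    void add_block_old(ToyBuffer& buf, const std::vector<int>& block,
                       std::size_t& flushed) {
        if (buf.bytes() > kLimit) {
            buf.flush(flushed); // `block` is silently dropped here
        } else {
            buf.merge(block);
        }
    }

    // Fixed ordering: merge first, then flush if the buffer grew too large.
    void add_block_new(ToyBuffer& buf, const std::vector<int>& block,
                       std::size_t& flushed) {
        buf.merge(block);
        if (buf.bytes() > kLimit) {
            buf.flush(flushed);
        }
    }

    int main() {
        const std::vector<int> block = {1, 2, 3, 4, 5};
        ToyBuffer old_buf, new_buf;
        std::size_t old_flushed = 0, new_flushed = 0;
        for (int i = 0; i < 4; ++i) add_block_old(old_buf, block, old_flushed);
        for (int i = 0; i < 4; ++i) add_block_new(new_buf, block, new_flushed);
        // Rows still sitting in a buffer count as kept; only the old ordering
        // loses rows (it prints 15 of 20, the new ordering prints 20 of 20).
        std::cout << "old ordering kept " << old_flushed + old_buf.rows.size()
                  << " of 20 rows\n"
                  << "new ordering kept " << new_flushed + new_buf.rows.size()
                  << " of 20 rows\n";
    }

The hierarchical_data_reader.h hunk is the companion read-side fix: during
double write the destination variant column may already hold a root, so the
reader now appends via insert_range_from instead of failing the
CHECK(variant.empty()) and unconditionally re-creating the root.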
 be/src/olap/rowset/segment_creator.cpp                 | 10 +++++++---
 .../olap/rowset/segment_v2/hierarchical_data_reader.h  |  8 ++++++--
 .../olap/rowset/segment_v2/vertical_segment_writer.cpp |  3 ++-
 .../test_double_write_when_schema_change.groovy        |  5 ++++-
 4 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index 126a6548be54d3..b968f684855125 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -390,14 +390,15 @@ Status SegmentCreator::add_block(const vectorized::Block* block) {
     size_t block_row_num = block->rows();
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
     size_t row_offset = 0;
-
     if (_segment_flusher.need_buffering()) {
+        RETURN_IF_ERROR(_buffer_block.merge(*block));
         if (_buffer_block.allocated_bytes() > config::write_buffer_size) {
+            LOG(INFO) << "directly flush a single block " << _buffer_block.rows() << " rows"
+                      << ", block size " << _buffer_block.bytes() << " block allocated_size "
+                      << _buffer_block.allocated_bytes();
             vectorized::Block block = _buffer_block.to_block();
             RETURN_IF_ERROR(flush_single_block(&block));
             _buffer_block.clear();
-        } else {
-            RETURN_IF_ERROR(_buffer_block.merge(*block));
         }
         return Status::OK();
     }
@@ -426,6 +427,9 @@ Status SegmentCreator::add_block(const vectorized::Block* block) {
 Status SegmentCreator::flush() {
     if (_buffer_block.rows() > 0) {
         vectorized::Block block = _buffer_block.to_block();
+        LOG(INFO) << "directly flush a single block " << block.rows() << " rows"
+                  << ", block size " << block.bytes() << " block allocated_size "
+                  << block.allocated_bytes();
         RETURN_IF_ERROR(flush_single_block(&block));
         _buffer_block.clear();
     }
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index a3ac277586c15c..67f78651416a90 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -163,8 +163,12 @@ class HierarchicalDataReader : public ColumnIterator {
                 type->to_string(container_variant, i, write_buffer);
                 write_buffer.commit();
             }
-            CHECK(variant.empty());
-            variant.create_root(std::make_shared<vectorized::ColumnObject::MostCommonType>(), std::move(col_to));
+            if (variant.empty()) {
+                variant.create_root(std::make_shared<vectorized::ColumnObject::MostCommonType>(),
+                                    std::move(col_to));
+            } else {
+                variant.get_root()->insert_range_from(*col_to, 0, col_to->size());
+            }
         } else {
             // TODO select v:b -> v.b / v.b.c but v.d maybe in v
             // copy container variant to dst variant, todo avoid copy
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 26bf6f6ca2e2b7..394f5bae184a93 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -718,8 +718,9 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_
     } else if (block->columns() != _tablet_schema->num_columns()) {
         return Status::InternalError(
                 "illegal block columns, block columns = {}, tablet_schema columns = {}",
-                block->columns(), _tablet_schema->num_columns());
+                block->dump_structure(), _tablet_schema->dump_structure());
     }
+    LOG(INFO) << "add a single block " << block->rows();
     _batched_blocks.emplace_back(block, row_pos, num_rows);
     return Status::OK();
 }
diff --git a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
index 91e94fcc40a73c..ecfd5ff98db2c7 100644
--- a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
+++ b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("double_write_schema_change_with_variant") {
+suite("double_write_schema_change_with_variant", "nonConcurrent") {
     def set_be_config = { key, value ->
         String backend_id;
         def backendId_to_backendIP = [:]
@@ -70,6 +70,7 @@
     """
 
     set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "6294967296")
+    set_be_config.call("write_buffer_size", "10240")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
     load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""")
@@ -112,5 +113,7 @@
     // createMV("create materialized view xxx as select k, sum(k) from ${table_name} group by k order by k;")
     // qt_sql "select v['type'], v['id'], v['created_at'] from ${table_name} where cast(v['id'] as bigint) != 25061216922 order by k, cast(v['id'] as bigint) limit 10"
 
+    // restore configs
     set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "2147483648")
+    set_be_config.call("write_buffer_size", "209715200")
 }
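
Note on the test changes: lowering write_buffer_size on the BE to 10240
bytes forces the buffered-write path to exceed the buffer limit after almost
every small batch, which exercises the merge-then-flush ordering during the
schema-change double write; the "nonConcurrent" group keeps other suites
from running in parallel while the BE config is altered. The final two
set_be_config calls restore what should be the default values (209715200
bytes is 200 MB).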