From 75b421aa6136376be9519508afb4bcf484cf82cb Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Tue, 14 May 2024 10:06:04 +0800 Subject: [PATCH 1/5] refactor to support partial update --- be/src/cloud/cloud_rowset_writer.cpp | 8 +- be/src/olap/base_tablet.cpp | 20 ++- be/src/olap/rowset/beta_rowset_writer.cpp | 8 +- be/src/olap/rowset/rowset.h | 1 + be/src/olap/rowset/rowset_meta.cpp | 14 ++ be/src/olap/rowset/rowset_writer_context.h | 3 +- be/src/olap/rowset/segment_creator.cpp | 87 ++-------- be/src/olap/rowset/segment_creator.h | 8 +- .../segment_v2/hierarchical_data_reader.cpp | 5 +- .../segment_v2/hierarchical_data_reader.h | 25 +-- be/src/olap/rowset/segment_v2/segment.cpp | 27 ++-- .../segment_v2/vertical_segment_writer.cpp | 151 ++++++++++++++++-- .../segment_v2/vertical_segment_writer.h | 11 +- be/src/olap/rowset_builder.cpp | 8 +- be/src/olap/tablet.cpp | 3 +- be/src/olap/tablet_schema.cpp | 3 +- be/src/vec/columns/column_object.cpp | 9 +- be/src/vec/common/schema_util.cpp | 109 ++++++------- be/src/vec/common/schema_util.h | 13 +- .../serde/data_type_object_serde.cpp | 19 ++- .../data_types/serde/data_type_object_serde.h | 9 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 2 +- .../functions/function_variant_element.cpp | 43 ++++- be/src/vec/olap/olap_data_convertor.cpp | 31 ++-- be/src/vec/olap/olap_data_convertor.h | 15 +- .../org/apache/doris/analysis/UpdateStmt.java | 11 +- .../trees/plans/commands/UpdateCommand.java | 2 +- .../data/variant_p0/delete_update.out | 7 + .../variant_p0/partial_update_parallel1.csv | 5 + .../variant_p0/partial_update_parallel2.csv | 5 + .../variant_p0/partial_update_parallel3.csv | 5 + .../variant_p0/partial_update_parallel4.csv | 3 + .../data/variant_p0/variant_with_rowstore.out | 9 ++ .../suites/variant_p0/delete_update.groovy | 118 +++++++++++--- .../test_compaction_extract_root.groovy | 12 +- .../variant_p0/variant_with_rowstore.groovy | 47 +++++- 36 files changed, 585 insertions(+), 271 deletions(-) create mode 100644 regression-test/data/variant_p0/partial_update_parallel1.csv create mode 100644 regression-test/data/variant_p0/partial_update_parallel2.csv create mode 100644 regression-test/data/variant_p0/partial_update_parallel3.csv create mode 100644 regression-test/data/variant_p0/partial_update_parallel4.csv diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index df88829b91d04b..7f59cecce592fd 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -100,9 +100,9 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) { } // update rowset meta tablet schema if tablet schema updated - if (_context.tablet_schema->num_variant_columns() > 0) { - _rowset_meta->set_tablet_schema(_context.tablet_schema); - } + auto rowset_schema = _context.merged_tablet_schema != nullptr ? 
_context.merged_tablet_schema + : _context.tablet_schema; + _rowset_meta->set_tablet_schema(rowset_schema); if (_rowset_meta->newest_write_timestamp() == -1) { _rowset_meta->set_newest_write_timestamp(UnixSeconds()); @@ -116,7 +116,7 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) { } RETURN_NOT_OK_STATUS_WITH_WARN( - RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta, + RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, _rowset_meta, &rowset), "rowset init failed when build new rowset"); _already_built = true; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 772feb6f4508ac..f3461ae6a18466 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -188,7 +188,7 @@ TabletSchemaSPtr BaseTablet::tablet_schema_with_merged_max_schema_version( [](const RowsetMetaSharedPtr& rs_meta) { return rs_meta->tablet_schema(); }); static_cast( vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema)); - VLOG_DEBUG << "dump schema: " << target_schema->dump_structure(); + VLOG_DEBUG << "dump schema: " << target_schema->dump_full_schema(); } return target_schema; } @@ -452,8 +452,8 @@ Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation& CHECK(tablet_schema->store_row_column()); SegmentCacheHandle segment_cache_handle; std::unique_ptr column_iterator; - const auto& column = *DORIS_TRY(tablet_schema->column(BeConsts::ROW_STORE_COL)); - RETURN_IF_ERROR(_get_segment_column_iterator(rowset, row_location.segment_id, column, + RETURN_IF_ERROR(_get_segment_column_iterator(rowset, row_location.segment_id, + tablet_schema->column(BeConsts::ROW_STORE_COL), &segment_cache_handle, &column_iterator, &stats)); // get and parse tuple row vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); @@ -625,6 +625,10 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, (std::find(including_cids.cbegin(), including_cids.cend(), rowset_schema->sequence_col_idx()) != including_cids.cend()); } + bool has_variants = rowset_schema->num_variant_columns() > 0; + if (has_variants) { + rowset_schema = rowset_schema->copy_without_extracted_columns(); + } // use for partial update PartialUpdateReadPlan read_plan_ori; PartialUpdateReadPlan read_plan_update; @@ -871,9 +875,9 @@ Status BaseTablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, SegmentCacheHandle segment_cache_handle; std::unique_ptr column_iterator; OlapReaderStatistics stats; - const auto& column = *DORIS_TRY(tablet_schema.column(BeConsts::ROW_STORE_COL)); - RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, column, &segment_cache_handle, - &column_iterator, &stats)); + RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, + tablet_schema.column(BeConsts::ROW_STORE_COL), + &segment_cache_handle, &column_iterator, &stats)); // get and parse tuple row vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr)); @@ -1241,6 +1245,10 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf // update the shared_ptr to new bitmap, which is consistent with current rowset. 
txn_info->delete_bitmap = delete_bitmap;
+    // rowset->rowset_meta()->tablet_schema() may have been updated, so refresh the rowset's tablet schema
+    if (rowset->rowset_meta()->tablet_schema() != rowset->tablet_schema()) {
+        rowset->set_schema(rowset->rowset_meta()->tablet_schema());
+    }
     // erase segment cache cause we will add a segment to rowset
     SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index ae08da0efb853b..b568b6761a03c1 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -695,12 +695,12 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
     }
 
     // update rowset meta tablet schema if tablet schema updated
-    if (_context.tablet_schema->num_variant_columns() > 0) {
-        _rowset_meta->set_tablet_schema(_context.tablet_schema);
-    }
+    auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
+                                                                  : _context.tablet_schema;
+    _rowset_meta->set_tablet_schema(rowset_schema);
 
     RETURN_NOT_OK_STATUS_WITH_WARN(
-            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
+            RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, _rowset_meta,
                                          &rowset),
             "rowset init failed when build new rowset");
     _already_built = true;
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 496e712667645f..d72293a50921c1 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -141,6 +141,7 @@ class Rowset : public std::enable_shared_from_this {
     // publish rowset to make it visible to read
     void make_visible(Version version);
     void set_version(Version version);
+    void set_schema(TabletSchemaSPtr new_schema) { _schema = new_schema; }
     const TabletSchemaSPtr& tablet_schema() const { return _schema; }
 
     // helper class to access RowsetMeta
diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp
index 98cefe20b2b62e..6cc0c6dffe8186 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -19,6 +19,8 @@
 #include
+#include
+
 #include "common/logging.h"
 #include "google/protobuf/util/message_differencer.h"
 #include "io/fs/file_writer.h"
@@ -31,6 +33,7 @@
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_schema.h"
 #include "olap/tablet_schema_cache.h"
+#include "vec/common/schema_util.h"
 
 namespace doris {
 
@@ -233,6 +236,17 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
     if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
         set_rowset_state(RowsetStatePB::COMMITTED);
     }
+    // In partial update, the rowset schema may be updated when the table contains a variant type,
+    // so we need to keep the newest merged schema here. Otherwise the schema is stale and leads
+    // to wrong data being read.
+    if (tablet_schema()->num_variant_columns() > 0) {
+        // merge extracted columns
+        TabletSchemaSPtr merged_schema;
+        static_cast<void>(vectorized::schema_util::get_least_common_schema(
+                {tablet_schema(), other.tablet_schema()}, nullptr, merged_schema));
+        if (*_schema != *merged_schema) {
+            set_tablet_schema(merged_schema);
+        }
+    }
 }
 
 bool operator==(const RowsetMeta& a, const RowsetMeta& b) {
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index b88c2494b7bd2b..488030993e116c 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -51,7 +51,8 @@ struct RowsetWriterContext {
     RowsetTypePB rowset_type
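// Note: a minimal sketch of what get_least_common_schema() does in merge_rowset_meta() above,
// assuming two hypothetical schemas s1/s2 that disagree on the extracted column v.a (the
// concrete promotion rules live in schema_util::update_least_schema_internal):
//
//     TabletSchemaSPtr merged;
//     // s1: v.a -> INT, s2: v.a -> TEXT  =>  merged: v.a promoted to their least common type
//     RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema({s1, s2}, nullptr, merged));
//     // subcolumns present in only one input schema are carried over unchanged
//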
{BETA_ROWSET}; TabletSchemaSPtr tablet_schema; - TabletSchemaSPtr original_tablet_schema; + // for variant schema update + TabletSchemaSPtr merged_tablet_schema; // PREPARED/COMMITTED for pending rowset // VISIBLE for non-pending rowset RowsetStatePB rowset_state {PREPARED}; diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 368393d4814b8a..ca7b3a766c3d30 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -44,6 +44,7 @@ #include "vec/common/schema_util.h" // variant column #include "vec/core/block.h" #include "vec/core/columns_with_type_and_name.h" +#include "vec/core/types.h" namespace doris { using namespace ErrorCode; @@ -58,60 +59,38 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ if (block->rows() == 0) { return Status::OK(); } - // Expand variant columns vectorized::Block flush_block(*block); - TabletSchemaSPtr flush_schema; if (_context.write_type != DataWriteType::TYPE_COMPACTION && _context.tablet_schema->num_variant_columns() > 0) { - RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema)); + RETURN_IF_ERROR(_parse_variant_columns(flush_block)); } bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024; if (config::enable_vertical_segment_writer && _context.tablet_schema->cluster_key_idxes().empty()) { std::unique_ptr writer; - RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); + RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); - RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size)); + RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), flush_size)); } else { std::unique_ptr writer; - RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); + RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); - RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size)); + RETURN_IF_ERROR(_flush_segment_writer(writer, nullptr /*TODO*/, flush_size)); } return Status::OK(); } -Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, - TabletSchemaSPtr& flush_schema) { +Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) { size_t num_rows = block.rows(); if (num_rows == 0) { return Status::OK(); } - { - std::lock_guard lock(*(_context.schema_lock)); - // save original tablet schema, _context->tablet_schema maybe modified - if (_context.original_tablet_schema == nullptr) { - _context.original_tablet_schema = _context.tablet_schema; - } - } - std::vector variant_column_pos; - if (_context.partial_update_info && _context.partial_update_info->is_partial_update) { - // check columns that used to do partial updates should not include variant - for (int i : _context.partial_update_info->update_cids) { - const auto& col = *_context.original_tablet_schema->columns()[i]; - if (!col.is_key() && col.name() != DELETE_SIGN) { - return Status::InvalidArgument( - "Not implement partial update for variant only support delete currently"); - } - } - } else { - // find positions of variant columns - for (int i = 0; i < _context.original_tablet_schema->columns().size(); ++i) { - if (_context.original_tablet_schema->columns()[i]->is_variant_type()) { - 
variant_column_pos.push_back(i); - } + for (int i = 0; i < block.columns(); ++i) { + const auto& entry = block.get_by_position(i); + if (remove_nullable(entry.type)->get_type_id() == vectorized::TypeIndex::VARIANT) { + variant_column_pos.push_back(i); } } @@ -120,37 +99,8 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, } vectorized::schema_util::ParseContext ctx; - ctx.record_raw_json_column = _context.original_tablet_schema->store_row_column(); - RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns( - block, variant_column_pos, ctx)); - - flush_schema = std::make_shared(); - flush_schema->copy_from(*_context.original_tablet_schema); - vectorized::Block flush_block(std::move(block)); - vectorized::schema_util::rebuild_schema_and_block( - _context.original_tablet_schema, variant_column_pos, flush_block, flush_schema); - - { - // Update rowset schema, tablet's tablet schema will be updated when build Rowset - // Eg. flush schema: A(int), B(float), C(int), D(int) - // ctx.tablet_schema: A(bigint), B(double) - // => update_schema: A(bigint), B(double), C(int), D(int) - std::lock_guard lock(*(_context.schema_lock)); - TabletSchemaSPtr update_schema; - RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema( - {_context.tablet_schema, flush_schema}, nullptr, update_schema)); - CHECK_GE(update_schema->num_columns(), flush_schema->num_columns()) - << "Rowset merge schema columns count is " << update_schema->num_columns() - << ", but flush_schema is larger " << flush_schema->num_columns() - << " update_schema: " << update_schema->dump_structure() - << " flush_schema: " << flush_schema->dump_structure(); - _context.tablet_schema.swap(update_schema); - VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure(); - } - - block.swap(flush_block); // NOLINT(bugprone-use-after-move) - VLOG_DEBUG << "dump block: " << block.dump_data(); - VLOG_DEBUG << "dump flush schema: " << flush_schema->dump_structure(); + ctx.record_raw_json_column = _context.tablet_schema->store_row_column(); + RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_column_pos, ctx)); return Status::OK(); } @@ -182,8 +132,7 @@ Status SegmentFlusher::_add_rows(std::unique_ptr& writer, - int32_t segment_id, bool no_compression, - TabletSchemaSPtr flush_schema) { + int32_t segment_id, bool no_compression) { io::FileWriterPtr file_writer; RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer)); @@ -195,9 +144,8 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr( - file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir, + file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context); RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer))); auto s = writer->init(); @@ -211,7 +159,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr& writer, int32_t segment_id, - bool no_compression, TabletSchemaSPtr flush_schema) { + bool no_compression) { io::FileWriterPtr file_writer; RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer)); @@ -223,9 +171,8 @@ Status SegmentFlusher::_create_segment_writer( writer_options.compression_type = NO_COMPRESSION; } - const auto& tablet_schema = flush_schema ? 
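// Note: with this refactor the flush schema is no longer passed into _create_segment_writer();
// the vertical writer derives it while appending variant subcolumns and exposes it through
// flush_schema(). Sketch of the resulting call order in flush_single_block() (vertical branch,
// as written above; the non-vertical branch still passes nullptr and is marked TODO):
//
//     std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
//     RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
//     RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));  // fills flush schema
//     RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), flush_size));
//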
flush_schema : _context.tablet_schema; writer = std::make_unique( - file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir, + file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context); RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer))); auto s = writer->init(); diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index f6379cd047b2b3..3226ab0adf87cb 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -135,17 +135,15 @@ class SegmentFlusher { bool need_buffering(); private: - Status _expand_variant_to_subcolumns(vectorized::Block& block, TabletSchemaSPtr& flush_schema); + Status _parse_variant_columns(vectorized::Block& block); Status _add_rows(std::unique_ptr& segment_writer, const vectorized::Block* block, size_t row_offset, size_t row_num); Status _add_rows(std::unique_ptr& segment_writer, const vectorized::Block* block, size_t row_offset, size_t row_num); Status _create_segment_writer(std::unique_ptr& writer, - int32_t segment_id, bool no_compression = false, - TabletSchemaSPtr flush_schema = nullptr); + int32_t segment_id, bool no_compression = false); Status _create_segment_writer(std::unique_ptr& writer, - int32_t segment_id, bool no_compression = false, - TabletSchemaSPtr flush_schema = nullptr); + int32_t segment_id, bool no_compression = false); Status _flush_segment_writer(std::unique_ptr& writer, TabletSchemaSPtr flush_schema = nullptr, int64_t* flush_size = nullptr); diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index 66ad0eb92a996d..dcc082c22ae00c 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -34,10 +34,9 @@ namespace segment_v2 { Status HierarchicalDataReader::create(std::unique_ptr* reader, vectorized::PathInData path, const SubcolumnColumnReaders::Node* node, - const SubcolumnColumnReaders::Node* root, - bool output_as_raw_json) { + const SubcolumnColumnReaders::Node* root) { // None leave node need merge with root - auto* stream_iter = new HierarchicalDataReader(path, output_as_raw_json); + auto* stream_iter = new HierarchicalDataReader(path); std::vector leaves; vectorized::PathsInData leaves_paths; SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths); diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index a3ac277586c15c..1d02685e445dfd 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -64,12 +64,11 @@ using SubcolumnColumnReaders = vectorized::SubcolumnsTree; // Reader for hierarchical data for variant, merge with root(sparse encoded columns) class HierarchicalDataReader : public ColumnIterator { public: - HierarchicalDataReader(const vectorized::PathInData& path, bool output_as_raw_json = false) - : _path(path), _output_as_raw_json(output_as_raw_json) {} + HierarchicalDataReader(const vectorized::PathInData& path) : _path(path) {} static Status create(std::unique_ptr* reader, vectorized::PathInData path, const SubcolumnColumnReaders::Node* target_node, - const SubcolumnColumnReaders::Node* root, bool output_as_raw_json = false); + const SubcolumnColumnReaders::Node* root); Status init(const 
ColumnIteratorOptions& opts) override; @@ -93,7 +92,6 @@ class HierarchicalDataReader : public ColumnIterator { std::unique_ptr _root_reader; size_t _rows_read = 0; vectorized::PathInData _path; - bool _output_as_raw_json = false; template Status tranverse(NodeFunction&& node_func) { @@ -154,22 +152,9 @@ class HierarchicalDataReader : public ColumnIterator { return Status::OK(); })); - if (_output_as_raw_json) { - auto col_to = vectorized::ColumnString::create(); - col_to->reserve(nrows * 2); - vectorized::VectorBufferWriter write_buffer(*col_to.get()); - auto type = std::make_shared(); - for (size_t i = 0; i < nrows; ++i) { - type->to_string(container_variant, i, write_buffer); - write_buffer.commit(); - } - CHECK(variant.empty()); - variant.create_root(std::make_shared(), std::move(col_to)); - } else { - // TODO select v:b -> v.b / v.b.c but v.d maybe in v - // copy container variant to dst variant, todo avoid copy - variant.insert_range_from(container_variant, 0, nrows); - } + // TODO select v:b -> v.b / v.b.c but v.d maybe in v + // copy container variant to dst variant, todo avoid copy + variant.insert_range_from(container_variant, 0, nrows); // variant.set_num_rows(nrows); _rows_read += nrows; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 841c4403e9cb13..d1e4283bbdd9cc 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -539,24 +539,17 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, auto sparse_node = tablet_column.has_path_info() ? _sparse_column_tree.find_exact(*tablet_column.path_info_ptr()) : nullptr; - if (opt != nullptr && opt->io_ctx.reader_type == ReaderType::READER_ALTER_TABLE) { - CHECK(tablet_column.is_variant_type()); - if (root == nullptr) { - // No such variant column in this segment, get a default one - RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); - return Status::OK(); - } - bool output_as_raw_json = true; - // Alter table operation should read the whole variant column, since it does not aware of - // subcolumns of variant during processing rewriting rowsets. - // This is slow, since it needs to read all sub columns and merge them into a single column - RETURN_IF_ERROR( - HierarchicalDataReader::create(iter, root_path, root, root, output_as_raw_json)); - return Status::OK(); - } - if (opt == nullptr || opt->io_ctx.reader_type != ReaderType::READER_QUERY) { - // Could be compaction ..etc and read flat leaves nodes data + auto is_compaction = [](ReaderType type) { + return type == ReaderType::READER_BASE_COMPACTION || + type == ReaderType::READER_CUMULATIVE_COMPACTION || + type == ReaderType::READER_COLD_DATA_COMPACTION || + type == ReaderType::READER_SEGMENT_COMPACTION || + type == ReaderType::READER_FULL_COMPACTION; + }; + + if (opt != nullptr && is_compaction(opt->io_ctx.reader_type)) { + // compaction need to read flat leaves nodes data to prevent from amplification const auto* node = tablet_column.has_path_info() ? 
_sub_column_tree.find_leaf(*tablet_column.path_info_ptr()) : nullptr; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index f1709226d01586..0af69e3e295e94 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -140,13 +140,10 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i)); } - // add sparse column to footer - for (uint32_t i = 0; i < column.num_sparse_columns(); i++) { - _init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i)); - } } -Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column) { +Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column, + const TabletSchemaSPtr& tablet_schema) { ColumnWriterOptions opts; opts.meta = _footer.add_columns(); @@ -154,9 +151,9 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS // except for columns whose type don't support zone map. - opts.need_zone_map = column.is_key() || _tablet_schema->keys_type() != KeysType::AGG_KEYS; + opts.need_zone_map = column.is_key() || tablet_schema->keys_type() != KeysType::AGG_KEYS; opts.need_bloom_filter = column.is_bf_column(); - auto* tablet_index = _tablet_schema->get_ngram_bf_index(column.unique_id()); + auto* tablet_index = tablet_schema->get_ngram_bf_index(column.unique_id()); if (tablet_index) { opts.need_bloom_filter = true; opts.is_ngram_bf_index = true; @@ -172,11 +169,11 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo } // skip write inverted index on load if skip_write_index_on_load is true if (_opts.write_type == DataWriteType::TYPE_DIRECT && - _tablet_schema->skip_write_index_on_load()) { + tablet_schema->skip_write_index_on_load()) { skip_inverted_index = true; } // indexes for this column - opts.indexes = _tablet_schema->get_indexes_for_column(column); + opts.indexes = tablet_schema->get_indexes_for_column(column); if (!InvertedIndexColumnWriter::check_column_valid(column)) { // skip inverted index if invalid opts.indexes.clear(); @@ -310,13 +307,14 @@ void VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl // 2.2 build read plan to read by batch // 2.3 fill block // 3. 
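// Note: new_column_iterator_with_path() now dispatches on reader type roughly as follows
// (a summary of the branches around this point, not new behavior):
//
//     if (is_compaction(reader_type))    // read flat leaf nodes directly, avoiding the
//                                        // hierarchical merge and its read amplification
//     else if (an exact leaf matches)    // plain leaf column iterator
//     else                               // HierarchicalDataReader merges leaves with the root
//
// The READER_ALTER_TABLE branch that rendered the whole variant as raw JSON is removed,
// together with output_as_raw_json in HierarchicalDataReader.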
set columns to data convertor and then write all columns
-Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data) {
+Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data,
+                                                                 vectorized::Block& full_block) {
     DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);
     DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
     auto tablet = static_cast(_tablet.get());
     // create full block and fill with input columns
-    auto full_block = _tablet_schema->create_block();
+    full_block = _tablet_schema->create_block();
     const auto& including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
     size_t input_id = 0;
     for (auto i : including_cids) {
@@ -722,16 +720,127 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// For variant types, fill the block content in the following steps:
+// 1. set the block data to the data convertor and get all flattened columns from the variant subcolumns
+// 2. get the sparse columns previously stripped off in OlapColumnDataConvertorVariant
+// 3. merge the current column info (which contains the extracted columns) with the previous
+//    merged_tablet_schema; the result is used to construct the new schema for the rowset
+Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) {
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info from the entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "."
+ entry->path.get_path(); + const vectorized::DataTypePtr& final_data_type_from_object = + entry->data.get_least_common_type(); + vectorized::PathInDataBuilder full_path_builder; + auto full_path = full_path_builder.append(parent_column->name_lower_case(), false) + .append(entry->path.get_parts(), false) + .build(); + return vectorized::schema_util::get_column_by_type( + final_data_type_from_object, column_name, + vectorized::schema_util::ExtraInfo { + .unique_id = -1, + .parent_unique_id = parent_column->unique_id(), + .path_info = full_path}); + }; + + CHECK(object_column.is_finalized()); + // common extracted columns + for (const auto& entry : + vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) { + if (entry->path.empty()) { + // already handled by parent column + continue; + } + CHECK(entry->data.is_finalized()); + int current_column_id = column_id++; + TabletColumn tablet_column = generate_column_info(entry); + vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column, + _flush_schema); + RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column, + _flush_schema)); + _olap_data_convertor->set_source_content_with_specifid_column( + {entry->data.get_finalized_column_ptr()->get_ptr(), + entry->data.get_least_common_type(), tablet_column.name()}, + data.row_pos, data.num_rows, current_column_id); + // convert column data from engine format to storage layer format + auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id); + if (!status.ok()) { + return status; + } + RETURN_IF_ERROR(_column_writers[current_column_id]->append( + column->get_nullmap(), column->get_data(), data.num_rows)); + _flush_schema->append_column(tablet_column); + _olap_data_convertor->clear_source_content(); + } + // sparse_columns + for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns( + object_column.get_sparse_subcolumns())) { + TabletColumn sparse_tablet_column = generate_column_info(entry); + _flush_schema->mutable_column_by_uid(parent_column->unique_id()) + .append_sparse_column(sparse_tablet_column); + + // add sparse column to footer + auto* column_pb = _footer.mutable_columns(i); + _init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column); + } + } + + // Update rowset schema, tablet's tablet schema will be updated when build Rowset + // Eg. 
flush schema: A(int), B(float), C(int), D(int) + // ctx.tablet_schema: A(bigint), B(double) + // => update_schema: A(bigint), B(double), C(int), D(int) + std::lock_guard lock(*(_opts.rowset_ctx->schema_lock)); + if (_opts.rowset_ctx->merged_tablet_schema == nullptr) { + _opts.rowset_ctx->merged_tablet_schema = _opts.rowset_ctx->tablet_schema; + } + TabletSchemaSPtr update_schema; + RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema( + {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema)); + CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns()) + << "Rowset merge schema columns count is " << update_schema->num_columns() + << ", but flush_schema is larger " << _flush_schema->num_columns() + << " update_schema: " << update_schema->dump_structure() + << " flush_schema: " << _flush_schema->dump_structure(); + _opts.rowset_ctx->merged_tablet_schema.swap(update_schema); + VLOG_DEBUG << "dump block " << data.block->dump_data(); + VLOG_DEBUG << "dump rs schema: " << _opts.rowset_ctx->merged_tablet_schema->dump_full_schema(); + VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " << _segment_id; + return Status::OK(); +} + Status VerticalSegmentWriter::write_batch() { if (_opts.rowset_ctx->partial_update_info && _opts.rowset_ctx->partial_update_info->is_partial_update && _opts.write_type == DataWriteType::TYPE_DIRECT && !_opts.rowset_ctx->is_transient_rowset_writer) { for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { - RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid))); + RETURN_IF_ERROR( + _create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); + } + vectorized::Block full_block; + for (auto& data : _batched_blocks) { + RETURN_IF_ERROR(_append_block_with_partial_content(data, full_block)); } for (auto& data : _batched_blocks) { - RETURN_IF_ERROR(_append_block_with_partial_content(data)); + RowsInBlock full_rows_block {&full_block, data.row_pos, data.num_rows}; + RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block)); } for (auto& column_writer : _column_writers) { RETURN_IF_ERROR(column_writer->finish()); @@ -753,7 +862,7 @@ Status VerticalSegmentWriter::write_batch() { std::vector key_columns; vectorized::IOlapColumnDataAccessor* seq_column = nullptr; for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { - RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid))); + RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); for (auto& data : _batched_blocks) { RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( data.block, data.row_pos, data.num_rows, std::vector {cid})); @@ -778,8 +887,6 @@ Status VerticalSegmentWriter::write_batch() { return Status::Error("disk {} exceed capacity limit.", _data_dir->path_hash()); } - RETURN_IF_ERROR(_column_writers[cid]->finish()); - RETURN_IF_ERROR(_column_writers[cid]->write_data()); } for (auto& data : _batched_blocks) { @@ -826,6 +933,18 @@ Status VerticalSegmentWriter::write_batch() { _num_rows_written += data.num_rows; } + if (_opts.write_type == DataWriteType::TYPE_DIRECT || + _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) { + for (auto& data : _batched_blocks) { + RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data)); + } + } + + for (auto& column_writer : _column_writers) { + RETURN_IF_ERROR(column_writer->finish()); + RETURN_IF_ERROR(column_writer->write_data()); + } + _batched_blocks.clear(); 
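// Note: finish()/write_data() moved out of the per-column loop to the end of write_batch()
// because _append_block_with_variant_subcolumns() registers additional column writers after
// the schema columns are written. The resulting sequence is:
//
//     for each cid:    create writer, convert and append column data   // schema columns
//     for each block:  _append_block_with_variant_subcolumns(block)    // extracted subcolumns
//     for each writer: finish(); write_data();                         // covers both groups
//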
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index 080530f833adf1..3809a8301d5de7 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -118,11 +118,14 @@ class VerticalSegmentWriter {
     Slice min_encoded_key();
     Slice max_encoded_key();
 
+    TabletSchemaSPtr flush_schema() const { return _flush_schema; }
+
     void clear();
 
 private:
     void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column);
-    Status _create_column_writer(uint32_t cid, const TabletColumn& column);
+    Status _create_column_writer(uint32_t cid, const TabletColumn& column,
+                                 const TabletSchemaSPtr& schema);
     size_t _calculate_inverted_index_file_size();
     uint64_t _estimated_remaining_size();
     Status _write_ordinal_index();
@@ -147,7 +150,8 @@ class VerticalSegmentWriter {
     void _set_min_key(const Slice& key);
     void _set_max_key(const Slice& key);
     void _serialize_block_to_row_column(vectorized::Block& block);
-    Status _append_block_with_partial_content(RowsInBlock& data);
+    Status _append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block);
+    Status _append_block_with_variant_subcolumns(RowsInBlock& data);
     Status _fill_missing_columns(vectorized::MutableColumns& mutable_full_columns,
                                  const std::vector& use_default_or_null_flag,
                                  bool has_default_or_nullable, const size_t& segment_start_pos,
@@ -205,6 +209,9 @@ class VerticalSegmentWriter {
     std::map _rsid_to_rowset;
     std::vector _batched_blocks;
+
+    // contains the auto-generated extracted columns; stays nullptr if there are no variant subcolumns
+    TabletSchemaSPtr _flush_schema = nullptr;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 1236414adf9a3a..2767b975a53316 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -311,14 +311,18 @@ Status RowsetBuilder::commit_txn() {
     }
     std::lock_guard l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
-    if (tablet()->tablet_schema()->num_variant_columns() > 0) {
+
+    const RowsetWriterContext& rw_ctx = _rowset_writer->context();
+    if (rw_ctx.tablet_schema->num_variant_columns() > 0) {
         // update tablet schema when meet variant columns, before commit_txn
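// Note: _flush_schema (declared above) stays nullptr until the first variant column is seen in
// _append_block_with_variant_subcolumns(); it then starts as a copy of *_tablet_schema and grows
// one TabletColumn per extracted subcolumn, e.g.:
//
//     if (_flush_schema == nullptr) {
//         _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
//     }
//     _flush_schema->append_column(tablet_column);  // once per extracted subcolumn
//
// SegmentFlusher later hands it to _flush_segment_writer() via the flush_schema() accessor.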
        // Eg. rowset schema: A(int), B(float), C(int), D(int)
         // _tablet->tablet_schema: A(bigint), B(double)
         // => update_schema: A(bigint), B(double), C(int), D(int)
-        const RowsetWriterContext& rw_ctx = _rowset_writer->context();
         RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
+    if (rw_ctx.merged_tablet_schema != nullptr) {
+        RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema));
+    }
     // Transfer ownership of `PendingRowsetGuard` to `TxnManager`
     Status res = _engine.txn_manager()->commit_txn(_req.partition_id, *tablet(), _req.txn_id,
                                                    _req.load_id, _rowset,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 08ff77fdfa3e28..af944b2a6971ef 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1825,7 +1825,8 @@ Result> Tablet::create_transient_rowset_writer(
     context.rowset_state = PREPARED;
     context.segments_overlap = OVERLAPPING;
     context.tablet_schema = std::make_shared();
-    context.tablet_schema->copy_from(*(rowset.tablet_schema()));
+    context.tablet_schema = rowset.tablet_schema()->copy_without_extracted_columns();
     context.newest_write_timestamp = UnixSeconds();
     context.tablet_id = table_id();
     context.enable_segcompaction = false;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 89d154df9e332c..f34043adffdf41 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -856,7 +856,8 @@ void TabletSchema::append_column(TabletColumn column, ColumnType col_type) {
     _cols.push_back(std::make_shared(std::move(column)));
     // The dropped column may have same name with existing column, so that
     // not add to name to index map, only for uid to index map
-    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type()) {
+    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type() ||
+        _cols.back()->is_extracted_column()) {
         _field_name_to_index.emplace(StringRef(_cols.back()->name()), _num_columns);
         _field_path_to_index[_cols.back()->path_info_ptr().get()] = _num_columns;
     } else if (col_type == ColumnType::NORMAL) {
diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp
index e18c988e348871..6faf6f12f90a4a 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -745,8 +745,13 @@ void ColumnObject::insert_from(const IColumn& src, size_t n) {
 void ColumnObject::try_insert(const Field& field) {
     if (field.get_type() != Field::Types::VariantMap) {
         auto* root = get_subcolumn({});
-        if (!root) {
-            doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, "Failed to find root column_path");
+        if (root == nullptr) {
+            bool succ = add_sub_column({}, num_rows);
+            if (!succ) {
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Failed to add root sub column");
+            }
+            root = get_subcolumn({});
         }
         root->insert(field);
         ++num_rows;
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index 49e901e08463c4..a60d513135132c 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -374,7 +374,32 @@ void update_least_sparse_column(const std::vector& schemas,
     update_least_schema_internal(subcolumns_types, common_schema, true, variant_col_unique_id);
 }
 
-void inherit_root_attributes(TabletSchemaSPtr& schema) {
+void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
+                               TabletSchemaSPtr& target_schema) {
+    if (target.type() !=
FieldType::OLAP_FIELD_TYPE_TINYINT && + target.type() != FieldType::OLAP_FIELD_TYPE_ARRAY && + target.type() != FieldType::OLAP_FIELD_TYPE_DOUBLE && + target.type() != FieldType::OLAP_FIELD_TYPE_FLOAT) { + // above types are not supported in bf + target.set_is_bf_column(source.is_bf_column()); + } + target.set_aggregation_method(source.aggregation()); + const auto* source_index_meta = target_schema->get_inverted_index(source.unique_id(), ""); + if (source_index_meta != nullptr) { + // add index meta + TabletIndex index_info = *source_index_meta; + index_info.set_escaped_escaped_index_suffix_path(target.path_info_ptr()->get_path()); + const auto* target_index_meta = target_schema->get_inverted_index(target); + if (target_index_meta != nullptr) { + // already exist + target_schema->update_index(target, index_info); + } else { + target_schema->append_index(index_info); + } + } +} + +void inherit_column_attributes(TabletSchemaSPtr& schema) { std::unordered_map variants_index_meta; // Get all variants tablet index metas if exist for (const auto& col : schema->columns()) { @@ -390,29 +415,11 @@ void inherit_root_attributes(TabletSchemaSPtr& schema) { if (!col.is_extracted_column()) { continue; } - if (col.type() != FieldType::OLAP_FIELD_TYPE_TINYINT && - col.type() != FieldType::OLAP_FIELD_TYPE_ARRAY && - col.type() != FieldType::OLAP_FIELD_TYPE_DOUBLE && - col.type() != FieldType::OLAP_FIELD_TYPE_FLOAT) { - // above types are not supported in bf - col.set_is_bf_column(schema->column_by_uid(col.parent_unique_id()).is_bf_column()); - } - col.set_aggregation_method(schema->column_by_uid(col.parent_unique_id()).aggregation()); - auto it = variants_index_meta.find(col.parent_unique_id()); - // variant has no index meta, ignore - if (it == variants_index_meta.end()) { + if (schema->field_index(col.parent_unique_id()) == -1) { + // parent column is missing, maybe dropped continue; } - auto index_meta = schema->get_inverted_index(col); - // add index meta - TabletIndex index_info = it->second; - index_info.set_escaped_escaped_index_suffix_path(col.path_info_ptr()->get_path()); - if (index_meta != nullptr) { - // already exist - schema->update_index(col, index_info); - } else { - schema->append_index(index_info); - } + inherit_column_attributes(schema->column(col.parent_unique_id()), col, schema); } } @@ -473,7 +480,7 @@ Status get_least_common_schema(const std::vector& schemas, update_least_sparse_column(schemas, output_schema, unique_id, path_set); } - inherit_root_attributes(output_schema); + inherit_column_attributes(output_schema); if (check_schema_size && output_schema->columns().size() > config::variant_max_merged_tablet_schema_size) { return Status::DataQualityError("Reached max column size limit {}", @@ -483,25 +490,8 @@ Status get_least_common_schema(const std::vector& schemas, return Status::OK(); } -Status parse_and_encode_variant_columns(Block& block, const std::vector& variant_pos, - const ParseContext& ctx) { - try { - // Parse each variant column from raw string column - RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_pos, ctx)); - vectorized::schema_util::finalize_variant_columns(block, variant_pos, - false /*not ingore sparse*/); - RETURN_IF_ERROR( - vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_pos)); - } catch (const doris::Exception& e) { - // TODO more graceful, max_filter_ratio - LOG(WARNING) << "encounter execption " << e.to_string(); - return Status::InternalError(e.to_string()); - } - return Status::OK(); -} - -Status 
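// Usage sketch for the per-column overload above, mirroring its call site in
// VerticalSegmentWriter::_append_block_with_variant_subcolumns() (parent_column is the variant
// column, tablet_column the extracted subcolumn):
//
//     TabletColumn tablet_column = generate_column_info(entry);
//     vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column, _flush_schema);
//     // tablet_column now carries the parent's bloom filter flag and aggregation method and,
//     // if the parent has an inverted index, an index meta suffixed with the subcolumn's path
//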
parse_variant_columns(Block& block, const std::vector& variant_pos, - const ParseContext& ctx) { +Status _parse_variant_columns(Block& block, const std::vector& variant_pos, + const ParseContext& ctx) { for (int i = 0; i < variant_pos.size(); ++i) { auto column_ref = block.get_by_position(variant_pos[i]).column; bool is_nullable = column_ref->is_nullable(); @@ -582,6 +572,19 @@ Status parse_variant_columns(Block& block, const std::vector& variant_pos, return Status::OK(); } +Status parse_variant_columns(Block& block, const std::vector& variant_pos, + const ParseContext& ctx) { + try { + // Parse each variant column from raw string column + RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx)); + } catch (const doris::Exception& e) { + // TODO more graceful, max_filter_ratio + LOG(WARNING) << "encounter execption " << e.to_string(); + return Status::InternalError(e.to_string()); + } + return Status::OK(); +} + void finalize_variant_columns(Block& block, const std::vector& variant_pos, bool ignore_sparse) { for (int i = 0; i < variant_pos.size(); ++i) { @@ -597,19 +600,11 @@ void finalize_variant_columns(Block& block, const std::vector& variant_pos, } } -Status encode_variant_sparse_subcolumns(Block& block, const std::vector& variant_pos) { - for (int i = 0; i < variant_pos.size(); ++i) { - auto& column_ref = block.get_by_position(variant_pos[i]).column->assume_mutable_ref(); - auto& column = - column_ref.is_nullable() - ? assert_cast( - assert_cast(column_ref).get_nested_column()) - : assert_cast(column_ref); - // Make sure the root node is jsonb storage type - auto expected_root_type = make_nullable(std::make_shared()); - column.ensure_root_node_type(expected_root_type); - RETURN_IF_ERROR(column.merge_sparse_to_root_column()); - } +Status encode_variant_sparse_subcolumns(ColumnObject& column) { + // Make sure the root node is jsonb storage type + auto expected_root_type = make_nullable(std::make_shared()); + column.ensure_root_node_type(expected_root_type); + RETURN_IF_ERROR(column.merge_sparse_to_root_column()); return Status::OK(); } @@ -643,7 +638,7 @@ static void _append_column(const TabletColumn& parent_variant, } // sort by paths in lexicographical order -static vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( +vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( const vectorized::ColumnObject::Subcolumns& subcolumns) { // sort by paths in lexicographical order vectorized::ColumnObject::Subcolumns sorted = subcolumns; @@ -716,7 +711,7 @@ void rebuild_schema_and_block(const TabletSchemaSPtr& original, VLOG_DEBUG << "set root_path : " << full_root_path.get_path(); } - vectorized::schema_util::inherit_root_attributes(flush_schema); + vectorized::schema_util::inherit_column_attributes(flush_schema); } // --------------------------- diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index 078081593c549b..f519e4dacae376 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -90,13 +90,11 @@ struct ParseContext { // 1. parse variant from raw json string // 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns // 3. 
encode sparse sub columns -Status parse_and_encode_variant_columns(Block& block, const std::vector& variant_pos, - const ParseContext& ctx); Status parse_variant_columns(Block& block, const std::vector& variant_pos, const ParseContext& ctx); void finalize_variant_columns(Block& block, const std::vector& variant_pos, bool ignore_sparse = true); -Status encode_variant_sparse_subcolumns(Block& block, const std::vector& variant_pos); +Status encode_variant_sparse_subcolumns(ColumnObject& column); // Pick the tablet schema with the highest schema version as the reference. // Then update all variant columns to there least common types. @@ -117,7 +115,14 @@ void update_least_sparse_column(const std::vector& schemas, const std::unordered_set& path_set); // inherit attributes like index/agg info from it's parent column -void inherit_root_attributes(TabletSchemaSPtr& schema); +void inherit_column_attributes(TabletSchemaSPtr& schema); + +void inherit_column_attributes(const TabletColumn& source, TabletColumn& target, + TabletSchemaSPtr& target_schema); + +// get sorted subcolumns of variant +vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( + const vectorized::ColumnObject::Subcolumns& subcolumns); // Rebuild schema from original schema by extend dynamic columns generated from ColumnObject. // Block consists of two parts, dynamic part of columns and static part of columns. diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index 99a301294dc727..77fef42df55da7 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -37,9 +37,10 @@ namespace doris { namespace vectorized { -Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, - MysqlRowBuffer& row_buffer, int row_idx, - bool col_const) const { +template +Status DataTypeObjectSerDe::_write_column_to_mysql(const IColumn& column, + MysqlRowBuffer& row_buffer, + int row_idx, bool col_const) const { const auto& variant = assert_cast(column); if (!variant.is_finalized()) { const_cast(variant).finalize(); @@ -66,6 +67,18 @@ Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, return Status::OK(); } +Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, + MysqlRowBuffer& row_buffer, int row_idx, + bool col_const) const { + return _write_column_to_mysql(column, row_buffer, row_idx, col_const); +} + +Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, + MysqlRowBuffer& row_buffer, int row_idx, + bool col_const) const { + return _write_column_to_mysql(column, row_buffer, row_idx, col_const); +} + void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, int32_t col_id, int row_num) const { diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h index c589a1af92a0aa..7bf8da438c50ec 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.h +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -81,9 +81,7 @@ class DataTypeObjectSerDe : public DataTypeSerDe { } Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer& row_buffer, - int row_idx, bool col_const) const override { - return Status::NotSupported("write_column_to_mysql with type " + column.get_name()); - } + int row_idx, bool col_const) const override; Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer& row_buffer, 
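// Note: both MysqlRowBuffer overloads of write_column_to_mysql() now share one templated body;
// MysqlRowBuffer<true> is used for the binary protocol and MysqlRowBuffer<false> for the text
// protocol. The dispatch pattern is simply:
//
//     template <bool is_binary_format>
//     Status _write_column_to_mysql(const IColumn& column,
//                                   MysqlRowBuffer<is_binary_format>& result,
//                                   int row_idx, bool col_const) const;
//
//     // each public override just instantiates the template with its buffer type
//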
int row_idx, bool col_const) const override; @@ -94,6 +92,11 @@ class DataTypeObjectSerDe : public DataTypeSerDe { std::vector& buffer_list) const override { return Status::NotSupported("write_column_to_orc with type " + column.get_name()); } + +private: + template + Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer& result, + int row_idx, bool col_const) const; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index b394b30af5784a..d539c0d7864f19 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -431,7 +431,7 @@ Status NewOlapScanner::_init_variant_columns() { } } } - schema_util::inherit_root_attributes(tablet_schema); + schema_util::inherit_column_attributes(tablet_schema); return Status::OK(); } diff --git a/be/src/vec/functions/function_variant_element.cpp b/be/src/vec/functions/function_variant_element.cpp index 89256635279f4b..84ddc3b8046f56 100644 --- a/be/src/vec/functions/function_variant_element.cpp +++ b/be/src/vec/functions/function_variant_element.cpp @@ -32,6 +32,7 @@ #include "vec/columns/column_nullable.h" #include "vec/columns/column_object.h" #include "vec/columns/column_string.h" +#include "vec/columns/subcolumn_tree.h" #include "vec/common/assert_cast.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" @@ -43,6 +44,7 @@ #include "vec/functions/function.h" #include "vec/functions/function_helpers.h" #include "vec/functions/simple_function_factory.h" +#include "vec/json/path_in_data.h" namespace doris::vectorized { @@ -128,8 +130,45 @@ class FunctionVariantElement : public IFunction { *result = ColumnObject::create(true, type, std::move(result_column)); return Status::OK(); } else { - return Status::NotSupported("Not support element_at with none scalar variant {}", - src.debug_string()); + auto mutable_src = src.clone_finalized(); + auto* mutable_ptr = assert_cast(mutable_src.get()); + PathInData path(field_name); + ColumnObject::Subcolumns subcolumns = mutable_ptr->get_subcolumns(); + const auto* node = subcolumns.find_exact(path); + auto result_col = ColumnObject::create(true, false /*should not create root*/); + if (node != nullptr) { + std::vector nodes; + PathsInData paths; + ColumnObject::Subcolumns::get_leaves_of_node(node, nodes, paths); + ColumnObject::Subcolumns new_subcolumns; + for (const auto* n : nodes) { + PathInData new_path = n->path.copy_pop_front(); + VLOG_DEBUG << "add node " << new_path.get_path() + << ", data size: " << n->data.size() + << ", finalized size: " << n->data.get_finalized_column().size() + << ", common type: " << n->data.get_least_common_type()->get_name(); + // if new_path is empty, indicate it's the root column, but adding a root will return false when calling add + if (!new_subcolumns.add(new_path, n->data)) { + VLOG_DEBUG << "failed to add node " << new_path.get_path(); + } + } + // handle the root node + if (new_subcolumns.empty() && !nodes.empty()) { + CHECK_EQ(nodes.size(), 1); + new_subcolumns.create_root(ColumnObject::Subcolumn { + nodes[0]->data.get_finalized_column_ptr()->assume_mutable(), + nodes[0]->data.get_least_common_type(), true, true}); + } + auto container = ColumnObject::create(std::move(new_subcolumns), true); + result_col->insert_range_from(*container, 0, container->size()); + } else { + result_col->insert_many_defaults(src.size()); + } + *result = result_col->get_ptr(); + VLOG_DEBUG << "dump new object " + << 
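// Behavior note: element_at(v, 'b') on a non-scalar variant now rebuilds a sub-object instead of
// returning NotSupported. It finds the subcolumn node for path 'b', collects its leaves, and
// re-roots them with the leading path segment popped. A hypothetical example:
//
//     v = {"b": {"c": 1, "d": "x"}, "e": 2}
//     element_at(v, "b")  ->  variant with subcolumns {"c": 1, "d": "x"}   // 'b.c' -> 'c', 'b.d' -> 'd'
//
// A single remaining leaf whose popped path is empty becomes the root of the result column;
// rows with no match are filled with defaults.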
static_cast(result_col.get())->debug_string()
+                   << ", path " << path.get_path();
+        return Status::OK();
     }
 }
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 7c1010f743e199..bc15ef169710dd 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -17,6 +17,7 @@
 #include "vec/olap/olap_data_convertor.h"
 
+#include
 #include
 
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -42,6 +43,7 @@
 #include "vec/columns/column_struct.h"
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_agg_state.h"
 #include "vec/data_types/data_type_array.h"
@@ -232,6 +234,16 @@ void OlapBlockDataConvertor::set_source_content(const vectorized::Block* block,
     }
 }
 
+Status OlapBlockDataConvertor::set_source_content_with_specifid_column(
+        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows, uint32_t cid) {
+    DCHECK(num_rows > 0);
+    DCHECK(row_pos + num_rows <= typed_column.column->size());
+    DCHECK(cid < _convertors.size());
+    RETURN_IF_CATCH_EXCEPTION(
+            { _convertors[cid]->set_source_column(typed_column, row_pos, num_rows); });
+    return Status::OK();
+}
+
 Status OlapBlockDataConvertor::set_source_content_with_specifid_columns(
         const vectorized::Block* block, size_t row_pos, size_t num_rows,
         std::vector cids) {
@@ -1090,8 +1102,6 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
                           ? assert_cast(*typed_column.column)
                           : assert_cast(
                                     nullable_column->get_nested_column());
-
-    const_cast(variant).finalize_if_not();
     if (variant.is_null_root()) {
         auto root_type = make_nullable(std::make_shared());
         auto root_col = root_type->create_column();
         const_cast(variant).create_root(root_type, std::move(root_col));
         variant.check_consistency();
     }
-    auto root_of_variant = variant.get_root();
-    auto nullable = assert_cast(root_of_variant.get());
-    CHECK(nullable);
-    _root_data_column = assert_cast(&nullable->get_nested_column());
-    _root_data_convertor->set_source_column({root_of_variant->get_ptr(), nullptr, ""}, row_pos,
-                                            num_rows);
+    // ensure the data is finalized
+    _source_column_ptr = &const_cast<vectorized::ColumnObject&>(variant);
+    _source_column_ptr->finalize(false);
+    _root_data_convertor = std::make_unique(true);
     OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
                                                                            num_rows);
 }
 
 // convert root data
 Status OlapBlockDataConvertor::OlapColumnDataConvertorVariant::convert_to_olap() {
-    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, _root_data_column));
+    RETURN_IF_ERROR(vectorized::schema_util::encode_variant_sparse_subcolumns(*_source_column_ptr));
+    _root_data_convertor->set_source_column(
+            {_source_column_ptr->get_root()->get_ptr(), nullptr, ""}, _row_pos, _num_rows);
+    const auto* nullable = assert_cast<const ColumnNullable*>(_source_column_ptr->get_root().get());
+    const auto* root_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
+    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, root_column));
     return Status::OK();
 }
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index d05485e2bc535f..fd368616a6ba40 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -34,6 +34,7 @@
 #include "olap/uint24.h"
 #include "runtime/collection_value.h"
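// Usage of the new single-column setter (set_source_content_with_specifid_column above), taken
// from VerticalSegmentWriter::_append_block_with_variant_subcolumns(): extracted variant
// subcolumns have no slot in the original block, so each one is fed to its convertor slot
// individually and cleared afterwards:
//
//     _olap_data_convertor->set_source_content_with_specifid_column(
//             {entry->data.get_finalized_column_ptr()->get_ptr(),
//              entry->data.get_least_common_type(), tablet_column.name()},
//             data.row_pos, data.num_rows, current_column_id);
//     auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id);
//     // ... append to the column writer ...
//     _olap_data_convertor->clear_source_content();
//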
"util/slice.h" +#include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_object.h" #include "vec/columns/column_string.h" @@ -79,6 +80,9 @@ class OlapBlockDataConvertor { void set_source_content(const vectorized::Block* block, size_t row_pos, size_t num_rows); Status set_source_content_with_specifid_columns(const vectorized::Block* block, size_t row_pos, size_t num_rows, std::vector cids); + Status set_source_content_with_specifid_column(const ColumnWithTypeAndName& typed_column, + size_t row_pos, size_t num_rows, uint32_t cid); + void clear_source_content(); std::pair convert_column_data(size_t cid); void add_column_data_convertor(const TabletColumn& column); @@ -495,8 +499,8 @@ class OlapBlockDataConvertor { class OlapColumnDataConvertorVariant : public OlapColumnDataConvertorBase { public: - OlapColumnDataConvertorVariant() - : _root_data_convertor(std::make_unique(true)) {} + OlapColumnDataConvertorVariant() = default; + void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) override; Status convert_to_olap() override; @@ -505,10 +509,11 @@ class OlapBlockDataConvertor { const void* get_data_at(size_t offset) const override; private: - // encodes sparsed columns - const ColumnString* _root_data_column; - // _nullmap contains null info for this variant + // // encodes sparsed columns + // const ColumnString* _root_data_column; + // // _nullmap contains null info for this variant std::unique_ptr _root_data_convertor; + ColumnObject* _source_column_ptr; }; private: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java index 9ee6f9c1f7c355..38c63579d3e534 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java @@ -190,12 +190,8 @@ private void analyzeSetExprs(Analyzer analyzer) throws AnalysisException { // step3: generate select list and insert column name list in insert stmt boolean isMow = ((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite(); - boolean hasVariant = false; int setExprCnt = 0; for (Column column : targetTable.getColumns()) { - if (column.getType().isVariantType()) { - hasVariant = true; - } for (BinaryPredicate setExpr : setExprs) { Expr lhs = setExpr.getChild(0); if (((SlotRef) lhs).getColumn().equals(column)) { @@ -203,13 +199,10 @@ private void analyzeSetExprs(Analyzer analyzer) throws AnalysisException { } } } - // 1.table with sequence col cannot use partial update cause in MOW, we encode pk + // table with sequence col cannot use partial update cause in MOW, we encode pk // with seq column but we don't know which column is sequence in update - // 2. 
variant column update schema during load, so implement partial update is complicated, - // just ignore it at present if (isMow && ((OlapTable) targetTable).getSequenceCol() == null - && setExprCnt <= targetTable.getColumns().size() * 3 / 10 - && !hasVariant) { + && setExprCnt <= targetTable.getColumns().size() * 3 / 10) { isPartialUpdate = true; } Optional sequenceMapCol = Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index f105c7173a42f5..3e656f82541274 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -159,7 +159,7 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite() && selectItems.size() < targetTable.getColumns().size() - && !targetTable.hasVariantColumns() && targetTable.getSequenceCol() == null + && targetTable.getSequenceCol() == null && partialUpdateColNameToExpression.size() <= targetTable.getFullSchema().size() * 3 / 10; List partialUpdateColNames = new ArrayList<>(); diff --git a/regression-test/data/variant_p0/delete_update.out b/regression-test/data/variant_p0/delete_update.out index 0e478bdeb0df5c..0cf801f1c004e9 100644 --- a/regression-test/data/variant_p0/delete_update.out +++ b/regression-test/data/variant_p0/delete_update.out @@ -14,3 +14,10 @@ 2 {"updated_value":123} {"updated_value":123} 6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123} +-- !sql -- +1 "ddddddddddd" 1111 199 10 {"new_data1":1} +2 "eeeeee" 2222 299 20 {"new_data2":2} +3 "aaaaa" 3333 399 30 {"new_data3":3} +4 "bbbbbbbb" 4444 499 40 {"new_data4":4} +5 "cccccccccccc" 5555 599 50 {"new_data5":5} + diff --git a/regression-test/data/variant_p0/partial_update_parallel1.csv b/regression-test/data/variant_p0/partial_update_parallel1.csv new file mode 100644 index 00000000000000..4ba84bb7785ff2 --- /dev/null +++ b/regression-test/data/variant_p0/partial_update_parallel1.csv @@ -0,0 +1,5 @@ +1,"ddddddddddd" +2,"eeeeee" +3,"aaaaa" +4,"bbbbbbbb" +5,"cccccccccccc" diff --git a/regression-test/data/variant_p0/partial_update_parallel2.csv b/regression-test/data/variant_p0/partial_update_parallel2.csv new file mode 100644 index 00000000000000..1560d6d3261218 --- /dev/null +++ b/regression-test/data/variant_p0/partial_update_parallel2.csv @@ -0,0 +1,5 @@ +1,1111,199 +2,2222,299 +3,3333,399 +4,4444,499 +5,5555,599 diff --git a/regression-test/data/variant_p0/partial_update_parallel3.csv b/regression-test/data/variant_p0/partial_update_parallel3.csv new file mode 100644 index 00000000000000..17abeef1a9cf9c --- /dev/null +++ b/regression-test/data/variant_p0/partial_update_parallel3.csv @@ -0,0 +1,5 @@ +1,10,{"new_data1" : 1} +2,20,{"new_data2" : 2} +3,30,{"new_data3" : 3} +4,40,{"new_data4" : 4} +5,50,{"new_data5" : 5} diff --git a/regression-test/data/variant_p0/partial_update_parallel4.csv b/regression-test/data/variant_p0/partial_update_parallel4.csv new file mode 100644 index 00000000000000..0a7cbd412faab3 --- /dev/null +++ b/regression-test/data/variant_p0/partial_update_parallel4.csv @@ -0,0 +1,3 @@ +1,1 +3,1 +5,1 diff --git a/regression-test/data/variant_p0/variant_with_rowstore.out b/regression-test/data/variant_p0/variant_with_rowstore.out index d7d759baad3cc4..6c34622bec85f2 100644 --- 
a/regression-test/data/variant_p0/variant_with_rowstore.out +++ b/regression-test/data/variant_p0/variant_with_rowstore.out @@ -23,3 +23,12 @@ 5 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} 6 {"a":1234,"xxxx":"kaana"} {"a":1234,"xxxx":"kaana"} +-- !point_select -- +-3 {"a":1,"b":1.5,"c":[1,2,3]} {"a":1,"b":1.5,"c":[1,2,3]} + +-- !point_select -- +-2 {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}} {"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}} + +-- !point_select -- +-1 {"a":1123} {"a":1123} + diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy index bbd999559b42ee..a50ba57f7b444a 100644 --- a/regression-test/suites/variant_p0/delete_update.groovy +++ b/regression-test/suites/variant_p0/delete_update.groovy @@ -30,14 +30,14 @@ suite("regression_test_variant_delete_and_update", "variant_type"){ """ // test mor table - sql """insert into ${table_name} values (1, '{"a" : 1, "b" : [1], "c": 1.0}')""" - sql """insert into ${table_name} values (2, '{"a" : 2, "b" : [1], "c": 2.0}')""" - sql """insert into ${table_name} values (3, '{"a" : 3, "b" : [3], "c": 3.0}')""" - sql """insert into ${table_name} values (4, '{"a" : 4, "b" : [4], "c": 4.0}')""" - sql """insert into ${table_name} values (5, '{"a" : 5, "b" : [5], "c": 5.0}')""" + sql """insert into ${table_name} values (1, '{"a":1,"b":[1],"c":1.0}')""" + sql """insert into ${table_name} values (2, '{"a":2,"b":[1],"c":2.0}')""" + sql """insert into ${table_name} values (3, '{"a":3,"b":[3],"c":3.0}')""" + sql """insert into ${table_name} values (4, '{"a":4,"b":[4],"c":4.0}')""" + sql """insert into ${table_name} values (5, '{"a":5,"b":[5],"c":5.0}')""" sql "delete from ${table_name} where k = 1" - sql """update ${table_name} set v = '{"updated_value" : 123}' where k = 2""" + sql """update ${table_name} set v = '{"updated_value":123}' where k = 2""" qt_sql "select * from ${table_name} order by k" // MOW @@ -46,41 +46,123 @@ suite("regression_test_variant_delete_and_update", "variant_type"){ sql """ CREATE TABLE IF NOT EXISTS ${table_name} ( k bigint, - v variant, + v variant, vs string ) UNIQUE KEY(`k`) - DISTRIBUTED BY HASH(k) BUCKETS 3 + DISTRIBUTED BY HASH(k) BUCKETS 4 properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """ sql "insert into var_delete_update_mow select k, cast(v as string), cast(v as string) from var_delete_update" sql "delete from ${table_name} where k = 1" sql "delete from ${table_name} where k in (select k from var_delete_update_mow where k in (3, 4, 5))" - sql """insert into ${table_name} values (6, '{"a" : 4, "b" : [4], "c": 4.0}', 'xxx')""" - sql """insert into ${table_name} values (7, '{"a" : 4, "b" : [4], "c": 4.0}', 'yyy')""" - sql """update ${table_name} set vs = '{"updated_value" : 123}' where k = 6""" - sql """update ${table_name} set v = '{"updated_value" : 1111}' where k = 7""" - qt_sql "select * from ${table_name} order by k" + sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.0}', 'xxx')""" + sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.0}', 'yyy')""" + sql """update var_delete_update_mow set vs = '{"updated_value" : 123}' where k = 6""" + sql """update var_delete_update_mow set v = '{"updated_value":1111}' where k = 7""" + qt_sql "select * from var_delete_update_mow order by k" sql """delete from ${table_name} where v = 'xxx' or vs = 'yyy'""" sql """delete from ${table_name} where vs = 'xxx' or vs = 'yyy'""" qt_sql 
"select * from ${table_name} order by k" // delete & insert concurrently - + sql "set enable_unique_key_partial_update=true;" + sql "sync" t1 = Thread.startDaemon { for (int k = 1; k <= 60; k++) { - int x = k % 10; - sql """insert into ${table_name} values(${x}, '${x}', '{"k${x}" : ${x}}')""" + int x = new Random().nextInt(61) % 10; + sql """insert into ${table_name}(k,vs) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')""" } } t2 = Thread.startDaemon { for (int k = 1; k <= 60; k++) { - int x = k % 10; - sql """delete from ${table_name} where k = ${x} """ + int x = new Random().nextInt(61) % 10; + sql """insert into ${table_name}(k,v) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')""" + } + } + t3 = Thread.startDaemon { + for (int k = 1; k <= 60; k++) { + int x = new Random().nextInt(61) % 10; + sql """insert into ${table_name}(k,v) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')""" + } + } + t1.join() + t2.join() + t3.join() + sql "sync" + + sql "set enable_unique_key_partial_update=false;" + // case 1: concurrent partial update + def tableName = "test_primary_key_partial_update_parallel" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321", + `var` variant NULL) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true") + """ + + sql """insert into ${tableName} values + (2, "doris2", 2000, 223, 2, '{"id":2, "name":"doris2","score":2000,"test":223,"dft":2}'), + (1, "doris", 1000, 123, 1, '{"id":1, "name":"doris","score":1000,"test":123,"dft":1}'), + (5, "doris5", 5000, 523, 5, '{"id":5, "name":"doris5","score":5000,"test":523,"dft":5}'), + (4, "doris4", 4000, 423, 4, '{"id":4, "name":"doris4","score":4000,"test":423,"dft":4}'), + (3, "doris3", 3000, 323, 3, '{"id":3, "name":"doris3","score":3000,"test":323,"dft":3}');""" + + t1 = Thread.startDaemon { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,name' + + file 'partial_update_parallel1.csv' + time 10000 // limit inflight 10s + } + } + + t2 = Thread.startDaemon { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,score,test' + + file 'partial_update_parallel2.csv' + time 10000 // limit inflight 10s } } + + t3 = Thread.startDaemon { + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,dft,var' + + file 'partial_update_parallel3.csv' + time 10000 // limit inflight 10s + } + } + t1.join() t2.join() + t3.join() + + sql "sync" + + qt_sql """ select * from ${tableName} order by id;""" } \ No newline at end of file diff --git a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy index 69c330fa98a280..43f1048f1510c3 100644 --- 
a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy +++ b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy @@ -85,8 +85,10 @@ suite("test_compaction_extract_root", "nonConcurrent") { union all select 5, '{"a": 1123}' as json_str union all select 5, '{"a": 11245, "b" : 42005}' as json_str from numbers("number" = "4096") limit 4096 ;""" // // fix cast to string tobe {} - qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};""" - qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};""" + qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM test_t""" + qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM test_t""" + // TODO: sparse columns with v['b'] will not be merged in hierarchical_data_reader with sparse columns + // qt_select_b_2 """ select v['b'] from test_t where cast(v['b'] as string) != '42005' and cast(v['b'] as string) != '42004' and cast(v['b'] as string) != '42003' order by cast(v['b'] as string); """ qt_select_1_bfcompact """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;""" @@ -140,8 +142,10 @@ } assert (rowCount <= 8) // fix cast to string tobe {} - qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};""" - qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};""" + qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM test_t""" + qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM test_t""" + // TODO: sparse columns with v['b'] will not be merged in hierarchical_data_reader with sparse columns + // qt_select_b_5 """ select v['b'] from test_t where cast(v['b'] as string) != '42005' and cast(v['b'] as string) != '42004' and cast(v['b'] as string) != '42003' order by cast(v['b'] as string); """ qt_select_1 """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;""" set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1") diff --git a/regression-test/suites/variant_p0/variant_with_rowstore.groovy b/regression-test/suites/variant_p0/variant_with_rowstore.groovy index 58c245ee831685..771f776b3e77e4 100644 --- a/regression-test/suites/variant_p0/variant_with_rowstore.groovy +++ b/regression-test/suites/variant_p0/variant_with_rowstore.groovy @@ -29,7 +29,6 @@ suite("regression_test_variant_rowstore", "variant_type"){ def table_name = "var_rowstore" sql "DROP TABLE IF EXISTS ${table_name}" - set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") sql """ CREATE TABLE IF NOT EXISTS ${table_name} ( @@ -63,4 +62,50 @@ suite("regression_test_variant_rowstore", "variant_type"){ """ sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rowstore""" qt_sql "select * from ${table_name} order by k limit 10" + + // Parse url + def user = context.config.jdbcUser + def password = context.config.jdbcPassword + def realDb = "regression_test_variant_p0" + String jdbcUrl = context.config.jdbcUrl + String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port + if (urlWithoutSchema.indexOf("/") >= 0) { + // e.g: jdbc:mysql://localhost:8080/?a=b + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) + } else { + // e.g: jdbc:mysql://localhost:8080 + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + } + // set server side prepared statement
url + def prepare_url = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb + "?&useServerPrepStmts=true" + table_name = "var_rs_pq" + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant, + v1 variant + ) + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 1 + properties("replication_num" = "1", "disable_auto_compaction" = "false", "store_row_column" = "true", "enable_unique_key_merge_on_write" = "true"); + """ + sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rowstore""" + def result1 = connect(user=user, password=password, url=prepare_url) { + def stmt = prepareStatement "select * from var_rs_pq where k = ?" + assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement); + stmt.setInt(1, -3) + qe_point_select stmt + stmt.setInt(1, -2) + qe_point_select stmt + stmt.setInt(1, -1) + qe_point_select stmt + + // def stmt1 = prepareStatement "select var['a'] from var_rs_pq where k = ?" + // assertEquals(stmt1.class, com.mysql.cj.jdbc.ServerPreparedStatement); + // stmt.setInt(1, -3) + // qe_point_select stmt + } } \ No newline at end of file From 6254b070065ac3926921f2bffac760b3eb11e5b3 Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Thu, 16 May 2024 16:53:40 +0800 Subject: [PATCH 2/5] fix --- be/src/vec/common/schema_util.cpp | 11 +---------- .../suites/variant_p0/delete_update.groovy | 6 +++++- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index a60d513135132c..eb33dcd1654d0f 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -400,15 +400,6 @@ void inherit_column_attributes(const TabletColumn& source, TabletColumn& target, } void inherit_column_attributes(TabletSchemaSPtr& schema) { - std::unordered_map variants_index_meta; - // Get all variants tablet index metas if exist - for (const auto& col : schema->columns()) { - auto index_meta = schema->get_inverted_index(col->unique_id(), ""); - if (col->is_variant_type() && index_meta != nullptr) { - variants_index_meta.emplace(col->unique_id(), *index_meta); - } - } - // Add index meta if extracted column is missing index meta for (size_t i = 0; i < schema->num_columns(); ++i) { TabletColumn& col = schema->mutable_column(i); @@ -419,7 +410,7 @@ void inherit_column_attributes(TabletSchemaSPtr& schema) { // parent column is missing, maybe dropped continue; } - inherit_column_attributes(schema->column(col.parent_unique_id()), col, schema); + inherit_column_attributes(schema->column_by_uid(col.parent_unique_id()), col, schema); } } diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy index a50ba57f7b444a..ed9667e7f5b324 100644 --- a/regression-test/suites/variant_p0/delete_update.groovy +++ b/regression-test/suites/variant_p0/delete_update.groovy @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
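To see the shape of the `inherit_column_attributes` change above in isolation, here is a minimal, compilable sketch. `MockColumn`, `MockSchema`, and the `column_by_uid` helper are invented stand-ins, not the real `TabletColumn`/`TabletSchema` API; the point is only the flow: every extracted variant subcolumn looks up its parent by `parent_unique_id` and copies the parent's attributes, with no pre-collected map of variant index metas.

```cpp
// Illustrative sketch only -- simplified stand-ins, not the real Doris types.
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

struct MockColumn {
    int32_t unique_id = -1;
    int32_t parent_unique_id = -1; // >= 0 marks an extracted variant subcolumn
    bool has_path_info = false;
    bool is_bf_column = false;     // one attribute to inherit, as a demo
};

struct MockSchema {
    std::vector<MockColumn> columns;
    std::unordered_map<int32_t, size_t> by_uid; // unique_id -> index into columns

    const MockColumn* column_by_uid(int32_t uid) const {
        auto it = by_uid.find(uid);
        return it == by_uid.end() ? nullptr : &columns[it->second];
    }
};

// Mirrors the post-patch shape: a direct parent lookup by unique id
// for every extracted subcolumn.
void inherit_column_attributes(MockSchema& schema) {
    for (auto& col : schema.columns) {
        if (col.parent_unique_id < 0 || !col.has_path_info) {
            continue; // not an extracted variant subcolumn
        }
        const MockColumn* parent = schema.column_by_uid(col.parent_unique_id);
        if (parent == nullptr) {
            continue; // parent column is missing, maybe dropped
        }
        col.is_bf_column = parent->is_bf_column;
    }
}

int main() {
    MockSchema s;
    s.columns = {MockColumn {1, -1, false, true},  // variant root with bf enabled
                 MockColumn {10, 1, true, false}}; // extracted subcolumn, e.g. v.a
    s.by_uid = {{1, 0}, {10, 1}};
    inherit_column_attributes(s);
    assert(s.columns[1].is_bf_column); // subcolumn inherited the parent's flag
    return 0;
}
```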
+import org.codehaus.groovy.runtime.IOGroovyMethods + suite("regression_test_variant_delete_and_update", "variant_type"){ // MOR def table_name = "var_delete_update" @@ -164,5 +166,7 @@ suite("regression_test_variant_delete_and_update", "variant_type"){ sql "sync" - qt_sql """ select * from ${tableName} order by id;""" + if (!isCloudMode()) { + qt_sql """ select * from ${tableName} order by id;""" + } } \ No newline at end of file From dad23437e4f7b6d7d016e0a4f09bc5e24c7c70ba Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Wed, 29 May 2024 23:11:03 +0800 Subject: [PATCH 3/5] support segment writer and add comment --- be/src/olap/rowset/segment_creator.cpp | 2 +- .../olap/rowset/segment_v2/segment_writer.cpp | 275 ++++++++++++------ .../olap/rowset/segment_v2/segment_writer.h | 12 +- be/src/olap/rowset_builder.cpp | 12 +- be/src/olap/tablet.cpp | 5 +- be/src/vec/columns/column_object.cpp | 11 +- be/src/vec/olap/olap_data_convertor.cpp | 7 +- 7 files changed, 230 insertions(+), 94 deletions(-) diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index ca7b3a766c3d30..5572ac334f5698 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -75,7 +75,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ std::unique_ptr writer; RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); - RETURN_IF_ERROR(_flush_segment_writer(writer, nullptr /*TODO*/, flush_size)); + RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), flush_size)); } return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 78b0a9d2134b22..ae858b521bf770 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -186,64 +186,53 @@ Status SegmentWriter::init() { return init(column_ids, true); } -Status SegmentWriter::init(const std::vector& col_ids, bool has_key) { - DCHECK(_column_writers.empty()); - DCHECK(_column_ids.empty()); - _has_key = has_key; - _column_writers.reserve(_tablet_schema->columns().size()); - _column_ids.insert(_column_ids.end(), col_ids.begin(), col_ids.end()); - _olap_data_convertor = std::make_unique(); - if (_opts.compression_type == UNKNOWN_COMPRESSION) { - _opts.compression_type = _tablet_schema->compression_type(); - } - auto create_column_writer = [&](uint32_t cid, const auto& column) -> auto { - ColumnWriterOptions opts; - opts.meta = _footer.add_columns(); - - init_column_meta(opts.meta, cid, column, _tablet_schema); - - // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS - // except for columns whose type don't support zone map. 
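The `flush_single_block` change above replaces the `nullptr /*TODO*/` argument with `writer->flush_schema()`. Below is a minimal sketch of that flow with invented mock types, not the real `SegmentFlusher`/`SegmentWriter` interfaces: the flushed segment is described by the schema the writer actually produced, which may contain extracted variant subcolumns the caller never saw.

```cpp
// Illustrative sketch only -- mock types, not the real segment writer API.
#include <iostream>
#include <memory>

struct MockSchema {
    int num_columns = 0;
};

struct MockWriter {
    std::shared_ptr<MockSchema> schema; // grows while variant subcolumns are appended
    void add_rows() { schema = std::make_shared<MockSchema>(MockSchema {5}); }
    std::shared_ptr<MockSchema> flush_schema() const { return schema; }
};

void flush_segment_writer(MockWriter& writer, const std::shared_ptr<MockSchema>& flush_schema) {
    // the rowset meta records the schema that actually describes this segment
    std::cout << "flush with " << (flush_schema ? flush_schema->num_columns : 0) << " columns\n";
}

int main() {
    MockWriter writer;
    writer.add_rows();
    flush_segment_writer(writer, writer.flush_schema()); // was: nullptr /*TODO*/
    return 0;
}
```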
- opts.need_zone_map = column.is_key() || _tablet_schema->keys_type() != KeysType::AGG_KEYS; - opts.need_bloom_filter = column.is_bf_column(); - auto* tablet_index = _tablet_schema->get_ngram_bf_index(column.unique_id()); - if (tablet_index) { - opts.need_bloom_filter = true; - opts.is_ngram_bf_index = true; - opts.gram_size = tablet_index->get_gram_size(); - opts.gram_bf_size = tablet_index->get_gram_bf_size(); - } - - opts.need_bitmap_index = column.has_bitmap_index(); - bool skip_inverted_index = false; - if (_opts.rowset_ctx != nullptr) { - // skip write inverted index for index compaction - skip_inverted_index = - _opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0; - } - // skip write inverted index on load if skip_write_index_on_load is true - if (_opts.write_type == DataWriteType::TYPE_DIRECT && - _tablet_schema->skip_write_index_on_load()) { - skip_inverted_index = true; - } - // indexes for this column - opts.indexes = std::move(_tablet_schema->get_indexes_for_column(column)); - if (!InvertedIndexColumnWriter::check_column_valid(column)) { - // skip inverted index if invalid - opts.indexes.clear(); - opts.need_zone_map = false; - opts.need_bloom_filter = false; - opts.need_bitmap_index = false; - } - opts.inverted_index_file_writer = _inverted_index_file_writer.get(); - for (const auto* index : opts.indexes) { - if (!skip_inverted_index && index->index_type() == IndexType::INVERTED) { - opts.inverted_index = index; - opts.need_inverted_index = true; - // TODO support multiple inverted index - break; - } +Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column, + const TabletSchemaSPtr& schema) { + ColumnWriterOptions opts; + opts.meta = _footer.add_columns(); + + init_column_meta(opts.meta, cid, column, schema); + + // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS + // except for columns whose type don't support zone map. 
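The zone-map policy restated by the comment above can be summarized as a small predicate. The helper below is an invented illustration, not code from this patch, and the enums are simplified stand-ins for the real `KeysType`/`FieldType`:

```cpp
// Illustrative sketch only: zone-map eligibility as a standalone predicate.
#include <cassert>

enum class KeysType { AGG_KEYS, UNIQUE_KEYS, DUP_KEYS };
enum class FieldType { INT, STRING, ARRAY, MAP, STRUCT, JSONB, HLL };

bool type_supports_zone_map(FieldType t) {
    switch (t) {
    case FieldType::ARRAY:
    case FieldType::MAP:
    case FieldType::STRUCT:
    case FieldType::JSONB:
    case FieldType::HLL:
        return false; // complex/aggregate types carry no ordered min/max
    default:
        return true;
    }
}

bool need_zone_map(bool is_key, KeysType keys_type, FieldType t) {
    if (!type_supports_zone_map(t)) {
        return false;
    }
    // key columns always; value columns for every keys type except AGG_KEYS
    return is_key || keys_type != KeysType::AGG_KEYS;
}

int main() {
    assert(need_zone_map(true, KeysType::AGG_KEYS, FieldType::INT));
    assert(need_zone_map(false, KeysType::DUP_KEYS, FieldType::INT));
    assert(!need_zone_map(false, KeysType::AGG_KEYS, FieldType::INT));
    assert(!need_zone_map(true, KeysType::DUP_KEYS, FieldType::ARRAY));
    return 0;
}
```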
+ opts.need_zone_map = column.is_key() || schema->keys_type() != KeysType::AGG_KEYS; + opts.need_bloom_filter = column.is_bf_column(); + auto* tablet_index = schema->get_ngram_bf_index(column.unique_id()); + if (tablet_index) { + opts.need_bloom_filter = true; + opts.is_ngram_bf_index = true; + opts.gram_size = tablet_index->get_gram_size(); + opts.gram_bf_size = tablet_index->get_gram_bf_size(); + } + + opts.need_bitmap_index = column.has_bitmap_index(); + bool skip_inverted_index = false; + if (_opts.rowset_ctx != nullptr) { + // skip write inverted index for index compaction + skip_inverted_index = _opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0; + } + // skip write inverted index on load if skip_write_index_on_load is true + if (_opts.write_type == DataWriteType::TYPE_DIRECT && schema->skip_write_index_on_load()) { + skip_inverted_index = true; + } + // indexes for this column + opts.indexes = schema->get_indexes_for_column(column); + if (!InvertedIndexColumnWriter::check_column_valid(column)) { + // skip inverted index if invalid + opts.indexes.clear(); + opts.need_zone_map = false; + opts.need_bloom_filter = false; + opts.need_bitmap_index = false; + } + opts.inverted_index_file_writer = _inverted_index_file_writer.get(); + for (const auto* index : opts.indexes) { + if (!skip_inverted_index && index->index_type() == IndexType::INVERTED) { + opts.inverted_index = index; + opts.need_inverted_index = true; + // TODO support multiple inverted index + break; } + } #define CHECK_FIELD_TYPE(TYPE, type_name) \ if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \ opts.need_zone_map = false; \ @@ -255,31 +244,42 @@ Status SegmentWriter::init(const std::vector& col_ids, bool has_key) { } \ } - CHECK_FIELD_TYPE(STRUCT, "struct") - CHECK_FIELD_TYPE(ARRAY, "array") - CHECK_FIELD_TYPE(JSONB, "jsonb") - CHECK_FIELD_TYPE(AGG_STATE, "agg_state") - CHECK_FIELD_TYPE(MAP, "map") - CHECK_FIELD_TYPE(OBJECT, "object") - CHECK_FIELD_TYPE(HLL, "hll") - CHECK_FIELD_TYPE(QUANTILE_STATE, "quantile_state") + CHECK_FIELD_TYPE(STRUCT, "struct") + CHECK_FIELD_TYPE(ARRAY, "array") + CHECK_FIELD_TYPE(JSONB, "jsonb") + CHECK_FIELD_TYPE(AGG_STATE, "agg_state") + CHECK_FIELD_TYPE(MAP, "map") + CHECK_FIELD_TYPE(OBJECT, "object") + CHECK_FIELD_TYPE(HLL, "hll") + CHECK_FIELD_TYPE(QUANTILE_STATE, "quantile_state") #undef CHECK_FIELD_TYPE - if (column.is_row_store_column()) { - // smaller page size for row store column - opts.data_page_size = config::row_column_page_size; - } - std::unique_ptr writer; - RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer)); - RETURN_IF_ERROR(writer->init()); - _column_writers.push_back(std::move(writer)); + if (column.is_row_store_column()) { + // smaller page size for row store column + opts.data_page_size = config::row_column_page_size; + } + std::unique_ptr writer; + RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer)); + RETURN_IF_ERROR(writer->init()); + _column_writers.push_back(std::move(writer)); - _olap_data_convertor->add_column_data_convertor(column); - return Status::OK(); - }; + _olap_data_convertor->add_column_data_convertor(column); + return Status::OK(); +} + +Status SegmentWriter::init(const std::vector& col_ids, bool has_key) { + DCHECK(_column_writers.empty()); + DCHECK(_column_ids.empty()); + _has_key = has_key; + _column_writers.reserve(_tablet_schema->columns().size()); + _column_ids.insert(_column_ids.end(), col_ids.begin(), col_ids.end()); + _olap_data_convertor = std::make_unique(); + if 
(_opts.compression_type == UNKNOWN_COMPRESSION) { + _opts.compression_type = _tablet_schema->compression_type(); + } - RETURN_IF_ERROR(_create_writers(*_tablet_schema, col_ids, create_column_writer)); + RETURN_IF_ERROR(_create_writers(_tablet_schema, col_ids)); // we don't need the short key index for unique key merge on write table. if (_has_key) { @@ -306,12 +306,11 @@ Status SegmentWriter::init(const std::vector& col_ids, bool has_key) { return Status::OK(); } -Status SegmentWriter::_create_writers( - const TabletSchema& tablet_schema, const std::vector& col_ids, - std::function create_column_writer) { +Status SegmentWriter::_create_writers(const TabletSchemaSPtr& tablet_schema, + const std::vector& col_ids) { _olap_data_convertor->reserve(col_ids.size()); for (auto& cid : col_ids) { - RETURN_IF_ERROR(create_column_writer(cid, tablet_schema.column(cid))); + RETURN_IF_ERROR(_create_column_writer(cid, tablet_schema->column(cid), tablet_schema)); } return Status::OK(); } @@ -327,6 +326,112 @@ void SegmentWriter::_maybe_invalid_row_cache(const std::string& key) { } } +// for variant type, we should do the following steps to fill the content of the block: +// 1. set block data to the data convertor, and get all flattened columns from variant subcolumns +// 2. get sparse columns from the previous sparse columns stripped in OlapColumnDataConvertorVariant +// 3. merge the current column info (containing extracted columns) with the previous merged_tablet_schema, +// which will be used to construct the new schema for the rowset +Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& data) { + if (_tablet_schema->num_variant_columns() == 0) { + return Status::OK(); + } + size_t column_id = _tablet_schema->num_columns(); + for (int i = 0; i < _tablet_schema->columns().size(); ++i) { + if (!_tablet_schema->columns()[i]->is_variant_type()) { + continue; + } + if (_flush_schema == nullptr) { + _flush_schema = std::make_shared(*_tablet_schema); + } + auto column_ref = data.get_by_position(i).column; + const vectorized::ColumnObject& object_column = assert_cast( + remove_nullable(column_ref)->assume_mutable_ref()); + const TabletColumnPtr& parent_column = _tablet_schema->columns()[i]; + + // generate column info by entry info + auto generate_column_info = [&](const auto& entry) { + const std::string& column_name = + parent_column->name_lower_case() + "."
+ entry->path.get_path(); + const vectorized::DataTypePtr& final_data_type_from_object = + entry->data.get_least_common_type(); + vectorized::PathInDataBuilder full_path_builder; + auto full_path = full_path_builder.append(parent_column->name_lower_case(), false) + .append(entry->path.get_parts(), false) + .build(); + return vectorized::schema_util::get_column_by_type( + final_data_type_from_object, column_name, + vectorized::schema_util::ExtraInfo { + .unique_id = -1, + .parent_unique_id = parent_column->unique_id(), + .path_info = full_path}); + }; + + CHECK(object_column.is_finalized()); + // common extracted columns + for (const auto& entry : + vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) { + if (entry->path.empty()) { + // already handled by parent column + continue; + } + CHECK(entry->data.is_finalized()); + int current_column_id = column_id++; + TabletColumn tablet_column = generate_column_info(entry); + vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column, + _flush_schema); + RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column, + _flush_schema)); + _olap_data_convertor->set_source_content_with_specifid_column( + {entry->data.get_finalized_column_ptr()->get_ptr(), + entry->data.get_least_common_type(), tablet_column.name()}, + 0, data.rows(), current_column_id); + // convert column data from engine format to storage layer format + auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id); + if (!status.ok()) { + return status; + } + RETURN_IF_ERROR(_column_writers[current_column_id]->append( + column->get_nullmap(), column->get_data(), data.rows())); + _flush_schema->append_column(tablet_column); + _olap_data_convertor->clear_source_content(); + } + // sparse_columns + for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns( + object_column.get_sparse_subcolumns())) { + TabletColumn sparse_tablet_column = generate_column_info(entry); + _flush_schema->mutable_column_by_uid(parent_column->unique_id()) + .append_sparse_column(sparse_tablet_column); + + // add sparse column to footer + auto* column_pb = _footer.mutable_columns(i); + init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column, + _flush_schema); + } + } + + // Update rowset schema, tablet's tablet schema will be updated when build Rowset + // Eg. 
flush schema: A(int), B(float), C(int), D(int) + // ctx.tablet_schema: A(bigint), B(double) + // => update_schema: A(bigint), B(double), C(int), D(int) + std::lock_guard lock(*(_opts.rowset_ctx->schema_lock)); + if (_opts.rowset_ctx->merged_tablet_schema == nullptr) { + _opts.rowset_ctx->merged_tablet_schema = _opts.rowset_ctx->tablet_schema; + } + TabletSchemaSPtr update_schema; + RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema( + {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema)); + CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns()) + << "Rowset merge schema columns count is " << update_schema->num_columns() + << ", but flush_schema is larger " << _flush_schema->num_columns() + << " update_schema: " << update_schema->dump_structure() + << " flush_schema: " << _flush_schema->dump_structure(); + _opts.rowset_ctx->merged_tablet_schema.swap(update_schema); + VLOG_DEBUG << "dump block " << data.dump_data(); + VLOG_DEBUG << "dump rs schema: " << _opts.rowset_ctx->merged_tablet_schema->dump_full_schema(); + VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " << _segment_id; + return Status::OK(); +} + void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) { if (block.rows() == 0) { return; @@ -620,6 +725,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* << "primary key index builder num rows(" << _primary_key_index_builder->num_rows() << ") not equal to segment writer's num rows written(" << _num_rows_written << ")"; _olap_data_convertor->clear_source_content(); + + RETURN_IF_ERROR(append_block_with_variant_subcolumns(full_block)); return Status::OK(); } @@ -866,6 +973,12 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po } } + if (_opts.write_type == DataWriteType::TYPE_DIRECT || + _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) { + RETURN_IF_ERROR( + append_block_with_variant_subcolumns(*const_cast(block))); + } + _num_rows_written += num_rows; _olap_data_convertor->clear_source_content(); return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 0d095133b8fba4..92af12d4da6f75 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -99,6 +99,7 @@ class SegmentWriter { Status append_block(const vectorized::Block* block, size_t row_pos, size_t num_rows); Status append_block_with_partial_content(const vectorized::Block* block, size_t row_pos, size_t num_rows); + Status append_block_with_variant_subcolumns(vectorized::Block& data); int64_t max_row_to_add(size_t row_avg_size_in_bytes); @@ -133,6 +134,8 @@ class SegmentWriter { void clear(); + TabletSchemaSPtr flush_schema() const { return _flush_schema; }; + void set_mow_context(std::shared_ptr mow_context); Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, @@ -141,8 +144,10 @@ class SegmentWriter { private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); - Status _create_writers(const TabletSchema& tablet_schema, const std::vector& col_ids, - std::function writer_creator); + Status _create_column_writer(uint32_t cid, const TabletColumn& column, + const TabletSchemaSPtr& schema); + Status _create_writers(const TabletSchemaSPtr& tablet_schema, + const std::vector& col_ids); Status _write_data(); Status _write_ordinal_index(); Status _write_zone_map(); @@ -173,7 
+178,6 @@ class SegmentWriter { void set_min_max_key(const Slice& key); void set_min_key(const Slice& key); void set_max_key(const Slice& key); - bool _should_create_writers_with_dynamic_block(size_t num_columns_in_block); void _serialize_block_to_row_column(vectorized::Block& block); Status _generate_primary_key_index( const std::vector& primary_key_coders, @@ -238,6 +242,8 @@ class SegmentWriter { // group every rowset-segment row id to speed up reader PartialUpdateReadPlan _rssid_to_rid; std::map _rsid_to_rowset; + // contains auto-generated columns; should be nullptr if there are no variant subcolumns + TabletSchemaSPtr _flush_schema = nullptr; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 2767b975a53316..2fc3d58b49141c 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -314,15 +314,21 @@ Status RowsetBuilder::commit_txn() { const RowsetWriterContext& rw_ctx = _rowset_writer->context(); if (rw_ctx.tablet_schema->num_variant_columns() > 0) { + // Need to merge the schema with `rw_ctx.merged_tablet_schema` first; the merged schema + // keeps the newest schema for the rowset, which is updated and merged + // while flushing segments. + if (rw_ctx.merged_tablet_schema != nullptr) { + RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema)); + } + // We should merge the rowset schema further, since the merged_tablet_schema may be null: + // when enable_memtable_on_sink_node is true, the merged_tablet_schema is not passed to + // the destination backend. // update tablet schema when meet variant columns, before commit_txn // Eg. rowset schema: A(int), B(float), C(int), D(int) // _tabelt->tablet_schema: A(bigint), B(double) // => update_schema: A(bigint), B(double), C(int), D(int) RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema)); } - if (rw_ctx.merged_tablet_schema != nullptr) { - RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema)); - } // Transfer ownership of `PendingRowsetGuard` to `TxnManager` Status res = _engine.txn_manager()->commit_txn(_req.partition_id, *tablet(), _req.txn_id, _req.load_id, _rowset, diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index af944b2a6971ef..60a29d4d81d8ee 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1825,7 +1825,10 @@ Result> Tablet::create_transient_rowset_writer( context.rowset_state = PREPARED; context.segments_overlap = OVERLAPPING; context.tablet_schema = std::make_shared(); - // context.tablet_schema->copy_from(*(rowset.tablet_schema())); + // During a partial update, the extracted columns of a variant should not be included in the tablet schema. + // This is because the partial update for a variant needs to ignore the extracted columns. + // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update, + // the complete variant is constructed by reading all the sub-columns of the variant.
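A minimal sketch of what `copy_without_extracted_columns` amounts to, using invented stand-in types rather than the real `TabletSchema`: drop every column whose `parent_unique_id` marks it as extracted from a variant, so that rowsets written at different times cannot disagree about subcolumn types.

```cpp
// Illustrative sketch only -- simplified types, not the real Doris schema API.
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

struct MockColumn {
    int32_t unique_id = -1;
    int32_t parent_unique_id = -1; // >= 0 means extracted from a variant
};

using MockSchema = std::vector<MockColumn>;

std::shared_ptr<MockSchema> copy_without_extracted_columns(const MockSchema& in) {
    auto out = std::make_shared<MockSchema>();
    for (const auto& col : in) {
        if (col.parent_unique_id >= 0) {
            continue; // skip variant-extracted subcolumns
        }
        out->push_back(col);
    }
    return out;
}

int main() {
    // k, variant root v, and an extracted subcolumn v.a
    MockSchema schema = {{0, -1}, {1, -1}, {10, 1}};
    auto stripped = copy_without_extracted_columns(schema);
    assert(stripped->size() == 2); // only k and the variant root survive
    return 0;
}
```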
context.tablet_schema = rowset.tablet_schema()->copy_without_extracted_columns(); context.newest_write_timestamp = UnixSeconds(); context.tablet_id = table_id(); diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 6faf6f12f90a4a..c3315330d75910 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -745,6 +745,8 @@ void ColumnObject::insert_from(const IColumn& src, size_t n) { void ColumnObject::try_insert(const Field& field) { if (field.get_type() != Field::Types::VariantMap) { auto* root = get_subcolumn({}); + // Inserting into an empty ColumnObject may result in a null root, + // so creating a root column for the variant is expected. if (root == nullptr) { bool succ = add_sub_column({}, num_rows); if (!succ) { @@ -1291,9 +1293,12 @@ Status ColumnObject::merge_sparse_to_root_column() { parser.getWriter().getOutput()->getSize()); result_column_nullable->get_null_map_data().push_back(0); } - - // assign merged column - subcolumns.get_mutable_root()->data.get_finalized_column_ptr() = mresult->get_ptr(); + subcolumns.get_mutable_root()->data.get_finalized_column().clear(); + // assign the merged column; do insert_range_from to make a copy instead of replacing the ptr itself, + // to make sure the root column ptr is not changed + subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from( + *mresult->get_ptr(), 0, num_rows); + // subcolumns.get_mutable_root()->data.get_finalized_column_ptr() = mresult->get_ptr(); return Status::OK(); } diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index bc15ef169710dd..55f0d39ab5f86f 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -1113,6 +1113,8 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column( _source_column_ptr = &const_cast(variant); _source_column_ptr->finalize(false); _root_data_convertor = std::make_unique(true); + _root_data_convertor->set_source_column( + {_source_column_ptr->get_root()->get_ptr(), nullptr, ""}, row_pos, num_rows); OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos, num_rows); } @@ -1120,8 +1122,9 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column( // convert root data Status OlapBlockDataConvertor::OlapColumnDataConvertorVariant::convert_to_olap() { RETURN_IF_ERROR(vectorized::schema_util::encode_variant_sparse_subcolumns(*_source_column_ptr)); - _root_data_convertor->set_source_column( - {_source_column_ptr->get_root()->get_ptr(), nullptr, ""}, _row_pos, _num_rows); +#ifndef NDEBUG + _source_column_ptr->check_consistency(); +#endif const auto* nullable = assert_cast(_source_column_ptr->get_root().get()); const auto* root_column = assert_cast(&nullable->get_nested_column()); RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, root_column)); From c887190ef46abf85e2c986be43e874ac6a054595 Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Thu, 30 May 2024 11:49:21 +0800 Subject: [PATCH 4/5] add more case --- .../variant_github_events_p0_new/load.groovy | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy b/regression-test/suites/variant_github_events_p0_new/load.groovy index 0be0f205b69b44..43f1cd5a8c78d1 100644 --- a/regression-test/suites/variant_github_events_p0_new/load.groovy +++
b/regression-test/suites/variant_github_events_p0_new/load.groovy @@ -95,6 +95,36 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){ sql """ insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events; """ + // insert batches of nulls + for(int t = 0; t <= 10; t += 1){ + long k = 9223372036854775107 + t + sql """INSERT INTO github_events VALUES (${k}, NULL)""" + } + sql """ALTER TABLE github_events SET("bloom_filter_columns" = "v")""" + // wait for add bloom filter finished + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='github_events' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + int max_try_time = 100 + while (max_try_time--){ + String result = getJobState("github_events") + if (result == "FINISHED") { + break + } else { + sleep(2000) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + sql """ALTER TABLE github_events ADD COLUMN v2 variant DEFAULT NULL""" + for(int t = 0; t <= 10; t += 1){ + long k = 9223372036854775107 + t + sql """INSERT INTO github_events VALUES (${k}, '{"aaaa" : 1234, "bbbb" : "11ssss"}', '{"xxxx" : 1234, "yyyy" : [1.111]}')""" + } + sql """ALTER TABLE github_events DROP COLUMN v2""" + sql """DELETE FROM github_events where k >= 9223372036854775107""" qt_sql_select_count """ select count(*) from github_events_2; """ } From 4ee22526610ce2da836e51b2dda51babfd4c5a01 Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Tue, 11 Jun 2024 13:41:33 +0800 Subject: [PATCH 5/5] rebase and fix comment --- be/src/cloud/cloud_rowset_writer.cpp | 7 +++-- be/src/olap/base_tablet.cpp | 26 +++++++++---------- be/src/olap/rowset/beta_rowset_writer.cpp | 7 +++-- be/src/olap/rowset/rowset.cpp | 7 +++++ be/src/olap/rowset/rowset.h | 3 ++- be/src/olap/rowset/rowset_meta.cpp | 6 ++--- be/src/olap/rowset/segment_creator.cpp | 11 ++++---- .../olap/rowset/segment_v2/segment_writer.cpp | 9 +++---- .../segment_v2/vertical_segment_writer.cpp | 21 ++++++++++----- be/src/olap/schema_change.cpp | 2 +- be/src/olap/tablet.cpp | 2 +- be/src/olap/tablet_schema.cpp | 2 +- be/src/olap/tablet_schema.h | 2 +- be/src/vec/columns/column_object.cpp | 1 - be/src/vec/data_types/data_type.h | 4 +++ be/src/vec/olap/olap_data_convertor.cpp | 6 ++--- be/src/vec/olap/olap_data_convertor.h | 2 +- 17 files changed, 65 insertions(+), 53 deletions(-) diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index 7f59cecce592fd..ad5c57fd21ee49 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -115,10 +115,9 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) { _rowset_meta->add_segments_file_size(seg_file_size.value()); } - RETURN_NOT_OK_STATUS_WITH_WARN( - RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, _rowset_meta, - &rowset), - "rowset init failed when build new rowset"); + RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, + _rowset_meta, &rowset), + "rowset init failed when build new rowset"); _already_built = true; return Status::OK(); } diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index f3461ae6a18466..89365e0f4e4644 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -452,8 +452,8 @@ Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation& CHECK(tablet_schema->store_row_column()); SegmentCacheHandle segment_cache_handle; 
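The rowset-build hunks in this patch pick the merged schema when one exists. Here is a self-contained sketch of that selection; `MockSchema`, `MockContext`, and `rowset_schema_for_build` are invented names for illustration, and the widened column lists mirror the `Eg.` comments used elsewhere in this patch.

```cpp
// Illustrative sketch only -- not the real RowsetWriterContext.
#include <cassert>
#include <memory>
#include <string>

struct MockSchema {
    std::string desc;
};

struct MockContext {
    std::shared_ptr<MockSchema> tablet_schema;
    std::shared_ptr<MockSchema> merged_tablet_schema; // may stay null
};

// Prefer the schema merged while flushing segments; fall back to the
// original tablet schema when no variant subcolumns were produced.
std::shared_ptr<MockSchema> rowset_schema_for_build(const MockContext& ctx) {
    return ctx.merged_tablet_schema != nullptr ? ctx.merged_tablet_schema
                                               : ctx.tablet_schema;
}

int main() {
    MockContext ctx;
    ctx.tablet_schema = std::make_shared<MockSchema>(MockSchema {"A(int), B(float)"});
    assert(rowset_schema_for_build(ctx)->desc == "A(int), B(float)");

    // after flushing variant subcolumns, e.g. A widened to bigint plus new C, D
    ctx.merged_tablet_schema =
            std::make_shared<MockSchema>(MockSchema {"A(bigint), B(double), C(int), D(int)"});
    assert(rowset_schema_for_build(ctx)->desc == "A(bigint), B(double), C(int), D(int)");
    return 0;
}
```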
std::unique_ptr column_iterator; - RETURN_IF_ERROR(_get_segment_column_iterator(rowset, row_location.segment_id, - tablet_schema->column(BeConsts::ROW_STORE_COL), + const auto& column = *DORIS_TRY(tablet_schema->column(BeConsts::ROW_STORE_COL)); + RETURN_IF_ERROR(_get_segment_column_iterator(rowset, row_location.segment_id, column, &segment_cache_handle, &column_iterator, &stats)); // get and parse tuple row vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); @@ -625,9 +625,12 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, (std::find(including_cids.cbegin(), including_cids.cend(), rowset_schema->sequence_col_idx()) != including_cids.cend()); } - bool has_variants = rowset_schema->num_variant_columns() > 0; - if (has_variants) { - rowset_schema = rowset_schema->copy_without_extracted_columns(); + if (rowset_schema->num_variant_columns() > 0) { + // During partial updates, the extracted columns of a variant should not be included in the rowset schema. + // This is because the partial update for a variant needs to ignore the extracted columns. + // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update, + // the complete variant is constructed by reading all the sub-columns of the variant. + rowset_schema = rowset_schema->copy_without_variant_extracted_columns(); } // use for partial update PartialUpdateReadPlan read_plan_ori; @@ -875,9 +878,9 @@ Status BaseTablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, SegmentCacheHandle segment_cache_handle; std::unique_ptr column_iterator; OlapReaderStatistics stats; - RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, - tablet_schema.column(BeConsts::ROW_STORE_COL), - &segment_cache_handle, &column_iterator, &stats)); + const auto& column = *DORIS_TRY(tablet_schema.column(BeConsts::ROW_STORE_COL)); + RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, column, &segment_cache_handle, + &column_iterator, &stats)); // get and parse tuple row vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr)); @@ -1237,7 +1240,7 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf RowsetSharedPtr transient_rowset; RETURN_IF_ERROR(transient_rs_writer->build(transient_rowset)); auto old_segments = rowset->num_segments(); - rowset->rowset_meta()->merge_rowset_meta(*transient_rowset->rowset_meta()); + rowset->merge_rowset_meta(*transient_rowset->rowset_meta()); auto new_segments = rowset->num_segments(); ss << ", partial update flush rowset (old segment num: " << old_segments << ", new segment num: " << new_segments << ")" @@ -1245,11 +1248,6 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf // update the shared_ptr to new bitmap, which is consistent with current rowset. 
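The `rowset->merge_rowset_meta(...)` call above routes through the new `Rowset::merge_rowset_meta` wrapper so the rowset's cached schema cannot go stale after a partial-update flush. A compilable sketch of that design, with invented mock types rather than the real `Rowset`/`RowsetMeta` classes:

```cpp
// Illustrative sketch only -- mock types, not the real Doris rowset classes.
#include <cassert>
#include <memory>
#include <string>

struct MockRowsetMeta {
    std::shared_ptr<std::string> tablet_schema;
    void merge(const MockRowsetMeta& other) {
        // a partial update with variants may have widened the schema
        tablet_schema = other.tablet_schema;
    }
};

class MockRowset {
public:
    explicit MockRowset(std::shared_ptr<MockRowsetMeta> meta)
            : _rowset_meta(std::move(meta)), _schema(_rowset_meta->tablet_schema) {}

    void merge_rowset_meta(const MockRowsetMeta& other) {
        _rowset_meta->merge(other);
        // keep the cached schema consistent with the merged rowset meta
        _schema = _rowset_meta->tablet_schema;
    }

    const std::shared_ptr<std::string>& schema() const { return _schema; }

private:
    std::shared_ptr<MockRowsetMeta> _rowset_meta;
    std::shared_ptr<std::string> _schema;
};

int main() {
    auto meta = std::make_shared<MockRowsetMeta>();
    meta->tablet_schema = std::make_shared<std::string>("A(int)");
    MockRowset rowset(meta);

    MockRowsetMeta transient_meta;
    transient_meta.tablet_schema = std::make_shared<std::string>("A(bigint), C(int)");
    rowset.merge_rowset_meta(transient_meta);
    assert(*rowset.schema() == "A(bigint), C(int)"); // no stale cached schema
    return 0;
}
```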
txn_info->delete_bitmap = delete_bitmap; - // rowset->meta_meta()->tablet_schema() maybe updated so update rowset tablet schema - if (rowset->rowset_meta()->tablet_schema() != rowset->tablet_schema()) { - rowset->set_schema(rowset->rowset_meta()->tablet_schema()); - } - // erase segment cache cause we will add a segment to rowset SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments()); } diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index b568b6761a03c1..4cbb83d54628cf 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -699,10 +699,9 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { : _context.tablet_schema; _rowset_meta->set_tablet_schema(rowset_schema); - RETURN_NOT_OK_STATUS_WITH_WARN( - RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, _rowset_meta, - &rowset), - "rowset init failed when build new rowset"); + RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, + _rowset_meta, &rowset), + "rowset init failed when build new rowset"); _already_built = true; return Status::OK(); } diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp index 545453786e1f43..8b840d806edc8e 100644 --- a/be/src/olap/rowset/rowset.cpp +++ b/be/src/olap/rowset/rowset.cpp @@ -140,4 +140,11 @@ Status check_version_continuity(const std::vector& rowsets) { return Status::OK(); } +void Rowset::merge_rowset_meta(const RowsetMeta& other) { + _rowset_meta->merge_rowset_meta(other); + // rowset_meta()->tablet_schema() may be updated, so make sure _schema is + // consistent with the rowset meta + _schema = _rowset_meta->tablet_schema(); +} + } // namespace doris diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index d72293a50921c1..53fb61aad44760 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -132,6 +132,8 @@ class Rowset : public std::enable_shared_from_this { const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; } + void merge_rowset_meta(const RowsetMeta& other); + bool is_pending() const { return _is_pending; } bool is_local() const { return _rowset_meta->is_local(); } @@ -141,7 +143,6 @@ class Rowset : public std::enable_shared_from_this { // publish rowset to make it visible to read void make_visible(Version version); void set_version(Version version); - void set_schema(TabletSchemaSPtr new_schema) { _schema = new_schema; } const TabletSchemaSPtr& tablet_schema() const { return _schema; } // helper class to access RowsetMeta diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index 6cc0c6dffe8186..b969db7a2a229e 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -233,9 +233,6 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { _rowset_meta_pb.add_segments_file_size(fsize); } } - if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) { - set_rowset_state(RowsetStatePB::COMMITTED); - } // In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated // Otherwise the schema is stale and lead to wrong data read if (tablet_schema()->num_variant_columns() > 0) { @@ -247,6 +244,9 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { set_tablet_schema(merged_schema); } } + if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) { +
set_rowset_state(RowsetStatePB::COMMITTED); + } } bool operator==(const RowsetMeta& a, const RowsetMeta& b) { diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 5572ac334f5698..df7f09c351ac9c 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -45,6 +45,7 @@ #include "vec/core/block.h" #include "vec/core/columns_with_type_and_name.h" #include "vec/core/types.h" +#include "vec/data_types/data_type.h" namespace doris { using namespace ErrorCode; @@ -89,7 +90,7 @@ Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) { std::vector variant_column_pos; for (int i = 0; i < block.columns(); ++i) { const auto& entry = block.get_by_position(i); - if (remove_nullable(entry.type)->get_type_id() == vectorized::TypeIndex::VARIANT) { + if (vectorized::is_variant_type(remove_nullable(entry.type))) { variant_column_pos.push_back(i); } } @@ -145,8 +146,8 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr( - file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, _context.data_dir, - _context.max_rows_per_segment, writer_options, _context.mow_context); + file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, + _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context); RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer))); auto s = writer->init(); if (!s.ok()) { @@ -172,8 +173,8 @@ Status SegmentFlusher::_create_segment_writer( } writer = std::make_unique( - file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, _context.data_dir, - _context.max_rows_per_segment, writer_options, _context.mow_context); + file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, + _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context); RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer))); auto s = writer->init(); if (!s.ok()) { diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index ae858b521bf770..b2fb585a86c3ca 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -172,7 +172,6 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id, for (uint32_t i = 0; i < column.num_sparse_columns(); i++) { init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i), tablet_schema); } - meta->set_result_is_nullable(column.get_result_is_nullable()); meta->set_function_name(column.get_aggregation_name()); } @@ -381,10 +380,10 @@ Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& da _flush_schema); RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column, _flush_schema)); - _olap_data_convertor->set_source_content_with_specifid_column( + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( {entry->data.get_finalized_column_ptr()->get_ptr(), entry->data.get_least_common_type(), tablet_column.name()}, - 0, data.rows(), current_column_id); + 0, data.rows(), current_column_id)); // convert column data from engine format to storage layer format auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id); if (!status.ok()) { @@ -558,12 +557,12 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } } std::vector> segment_caches(specified_rowsets.size()); - // locate rows in 
     // locate rows in base data
     int64_t num_rows_updated = 0;
     int64_t num_rows_new_added = 0;
     int64_t num_rows_deleted = 0;
     int64_t num_rows_filtered = 0;
+
     for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
         // block   segment
         //   2  ->    0
@@ -670,7 +669,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
                                          has_default_or_nullable, segment_start_pos, block));
     full_block.set_columns(std::move(mutable_full_columns));
-    // row column should be filled here
     if (_tablet_schema->store_row_column()) {
         // convert block to row store format
@@ -694,7 +692,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
                                        converted_result.second->get_data(), num_rows));
     }
-
     _num_rows_updated += num_rows_updated;
     _num_rows_deleted += num_rows_deleted;
     _num_rows_new_added += num_rows_new_added;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 0af69e3e295e94..124cfaf9b3925f 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -140,6 +140,10 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum
     for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
         _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i));
     }
+    // add sparse column to footer
+    for (uint32_t i = 0; i < column.num_sparse_columns(); i++) {
+        _init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i));
+    }
 }
 
 Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column,
@@ -775,10 +779,10 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
                                                            _flush_schema);
         RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column,
                                               _flush_schema));
-        _olap_data_convertor->set_source_content_with_specifid_column(
+        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
                 {entry->data.get_finalized_column_ptr()->get_ptr(),
                  entry->data.get_least_common_type(), tablet_column.name()},
-                data.row_pos, data.num_rows, current_column_id);
+                data.row_pos, data.num_rows, current_column_id));
         // convert column data from engine format to storage layer format
         auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id);
         if (!status.ok()) {
@@ -887,6 +891,8 @@ Status VerticalSegmentWriter::write_batch() {
             return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
                                                             _data_dir->path_hash());
         }
+        RETURN_IF_ERROR(_column_writers[cid]->finish());
+        RETURN_IF_ERROR(_column_writers[cid]->write_data());
     }
 
     for (auto& data : _batched_blocks) {
@@ -935,14 +941,15 @@ Status VerticalSegmentWriter::write_batch() {
 
     if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
         _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
+        size_t original_writers_cnt = _column_writers.size();
+        // handle variant dynamic sub columns
         for (auto& data : _batched_blocks) {
             RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data));
         }
-    }
-
-    for (auto& column_writer : _column_writers) {
-        RETURN_IF_ERROR(column_writer->finish());
-        RETURN_IF_ERROR(column_writer->write_data());
+        for (size_t i = original_writers_cnt; i < _column_writers.size(); ++i) {
+            RETURN_IF_ERROR(_column_writers[i]->finish());
+            RETURN_IF_ERROR(_column_writers[i]->write_data());
+        }
     }
 
     _batched_blocks.clear();
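NOTE: the write_batch() change above splits column flushing into two phases. Base columns are finished and flushed inside the vertical per-cid loop, while writers appended afterwards for variant subcolumns are flushed by index starting at original_writers_cnt, so nothing is flushed twice. A minimal standalone sketch of that ordering, using hypothetical mock types rather than the real VerticalSegmentWriter API:

#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

struct MockColumnWriter {
    void finish() {}      // seal the encoded column data
    void write_data() {}  // flush column pages to the segment file
};

// Flush base columns first, then let variant extraction append extra
// writers and flush only that new tail.
void write_batch(std::vector<std::unique_ptr<MockColumnWriter>>& writers,
                 const std::function<void()>& append_variant_subcolumns) {
    // Phase 1: every writer that exists now belongs to the base schema.
    for (auto& writer : writers) {
        writer->finish();
        writer->write_data();
    }
    // Phase 2: variant subcolumn extraction may push_back new writers,
    // so remember the size first and flush only indices past it.
    size_t original_writers_cnt = writers.size();
    append_variant_subcolumns();
    for (size_t i = original_writers_cnt; i < writers.size(); ++i) {
        writers[i]->finish();
        writers[i]->write_data();
    }
}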
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 08bc150067eacd..66930c77408ddf 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -737,7 +737,7 @@ SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
         // This is because the schema change for a variant needs to ignore the extracted columns.
         // Otherwise, the schema types in different rowsets might be inconsistent. When performing a schema change,
         // the complete variant is constructed by reading all the sub-columns of the variant.
-        _new_tablet_schema = _new_tablet->tablet_schema()->copy_without_extracted_columns();
+        _new_tablet_schema = _new_tablet->tablet_schema()->copy_without_variant_extracted_columns();
     }
     _job_id = job_id;
 }
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 60a29d4d81d8ee..d49d56ef2d3842 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1829,7 +1829,7 @@ Result<std::unique_ptr<RowsetWriter>> Tablet::create_transient_rowset_writer(
     // This is because the partial update for a variant needs to ignore the extracted columns.
     // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
     // the complete variant is constructed by reading all the sub-columns of the variant.
-    context.tablet_schema = rowset.tablet_schema()->copy_without_extracted_columns();
+    context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
     context.newest_write_timestamp = UnixSeconds();
     context.tablet_id = table_id();
     context.enable_segcompaction = false;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index f34043adffdf41..be96f395724c6c 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -1115,7 +1115,7 @@ void TabletSchema::merge_dropped_columns(const TabletSchema& src_schema) {
     }
 }
 
-TabletSchemaSPtr TabletSchema::copy_without_extracted_columns() {
+TabletSchemaSPtr TabletSchema::copy_without_variant_extracted_columns() {
     TabletSchemaSPtr copy = std::make_shared<TabletSchema>();
     TabletSchemaPB tablet_schema_pb;
     this->to_schema_pb(&tablet_schema_pb);
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 4dc5c5e521b37a..240618229b16e1 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -466,7 +466,7 @@ class TabletSchema {
     vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
 
-    std::shared_ptr<TabletSchema> copy_without_extracted_columns();
+    std::shared_ptr<TabletSchema> copy_without_variant_extracted_columns();
     InvertedIndexStorageFormatPB get_inverted_index_storage_format() const {
         return _inverted_index_storage_format;
     }
diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp
index c3315330d75910..3a16db8226303a 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -1298,7 +1298,6 @@ Status ColumnObject::merge_sparse_to_root_column() {
     // to make sure the root column ptr is not changed
     subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from(
             *mresult->get_ptr(), 0, num_rows);
-    // subcolumns.get_mutable_root()->data.get_finalized_column_ptr() = mresult->get_ptr();
     return Status::OK();
 }
diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h
index e708cda164ef1e..986f957d72b42d 100644
--- a/be/src/vec/data_types/data_type.h
+++ b/be/src/vec/data_types/data_type.h
@@ -412,5 +412,9 @@ inline bool is_complex_type(const DataTypePtr& data_type) {
     return which.is_array() || which.is_map() || which.is_struct();
 }
 
+inline bool is_variant_type(const DataTypePtr& data_type) {
+    return WhichDataType(data_type).is_variant_type();
+}
+
 } // namespace vectorized
 } // namespace doris
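NOTE: is_variant_type() above centralizes the TypeIndex::VARIANT comparison that _parse_variant_columns previously spelled out at the call site. A self-contained mock (not Doris code; TypeIndex and IDataType here are illustrative stand-ins) showing the intended call-site usage:

#include <memory>
#include <vector>

enum class TypeIndex { Int32, String, Variant };

struct IDataType {
    virtual ~IDataType() = default;
    virtual TypeIndex get_type_id() const = 0;
};
using DataTypePtr = std::shared_ptr<const IDataType>;

// One helper instead of repeating the enum comparison everywhere.
inline bool is_variant_type(const DataTypePtr& type) {
    return type->get_type_id() == TypeIndex::Variant;
}

// Callers collect variant column positions without naming the enum value,
// mirroring how SegmentFlusher::_parse_variant_columns uses the helper.
std::vector<int> variant_positions(const std::vector<DataTypePtr>& columns) {
    std::vector<int> pos;
    for (int i = 0; i < static_cast<int>(columns.size()); ++i) {
        if (is_variant_type(columns[i])) {
            pos.push_back(i);
        }
    }
    return pos;
}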
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 55f0d39ab5f86f..ecb5488a050836 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -239,11 +239,11 @@ Status OlapBlockDataConvertor::set_source_content_with_specifid_column(
     DCHECK(num_rows > 0);
     DCHECK(row_pos + num_rows <= typed_column.column->size());
     DCHECK(cid < _convertors.size());
-    RETURN_IF_CATCH_EXCEPTION({_convertors[cid]->set_source_column(typed_column, row_pos, num_rows);});
-    return Status::OK();
+    RETURN_IF_CATCH_EXCEPTION(
+            { _convertors[cid]->set_source_column(typed_column, row_pos, num_rows); });
+    return Status::OK();
 }
-
 Status OlapBlockDataConvertor::set_source_content_with_specifid_columns(
         const vectorized::Block* block, size_t row_pos, size_t num_rows,
         std::vector<uint32_t> cids) {
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index fd368616a6ba40..500fc7dfc4ab4d 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -81,7 +81,7 @@ class OlapBlockDataConvertor {
     Status set_source_content_with_specifid_columns(const vectorized::Block* block, size_t row_pos,
                                                     size_t num_rows, std::vector<uint32_t> cids);
     Status set_source_content_with_specifid_column(const ColumnWithTypeAndName& typed_column,
-                                                 size_t row_pos, size_t num_rows, uint32_t cid);
+                                                   size_t row_pos, size_t num_rows, uint32_t cid);
     void clear_source_content();
     std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t cid);
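NOTE: set_source_content_with_specifid_column returns Status because set_source_column can throw inside the conversion path (for example on a bad cast while converting variant data), and RETURN_IF_CATCH_EXCEPTION turns that exception into a Status the callers above now propagate with RETURN_IF_ERROR. A sketch of the pattern with an illustrative macro expansion; the real Doris macro differs in detail:

#include <exception>
#include <stdexcept>
#include <string>
#include <utility>

struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
};

// Illustrative expansion only: run the statements, and convert any thrown
// std::exception into an error Status returned to the caller.
#define RETURN_IF_CATCH_EXCEPTION(stmts)            \
    do {                                            \
        try {                                       \
            stmts;                                  \
        } catch (const std::exception& e) {         \
            return Status::InternalError(e.what()); \
        }                                           \
    } while (0)

// A previously void setter can now surface engine exceptions as a Status
// that the call site must check, which is why the writers wrap the call
// in RETURN_IF_ERROR.
Status set_source_column_checked(int cid) {
    RETURN_IF_CATCH_EXCEPTION({
        if (cid < 0) {
            throw std::runtime_error("invalid column id");
        }
        // ... set the source column for cid here ...
    });
    return Status::OK();
}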