From d11ece195e6b139e07a912231f9ec5fe2a5c3ae6 Mon Sep 17 00:00:00 2001
From: lihangyu <15605149486@163.com>
Date: Tue, 11 Jun 2024 18:23:06 +0800
Subject: [PATCH 1/4] [Refactor](Variant) refactor flush logic to support partial update (#34925)

1. Moved variant flatten and flush logic to `_append_block_with_variant_subcolumns` in the segment writer to simplify handling of partial updates.
2. Ensured the rowset schema is refreshed during partial updates for tables with variant types, preventing stale schemas and incorrect data reads.
3. Removed `_output_as_raw_json`, which was used for schema change; variant flatten and flush are now handled by the segment writer itself.
4. Implemented the `element_at` function in the BE for non-scalar variants.
---
 be/src/olap/base_tablet.cpp                   |   2 +-
 be/src/olap/rowset/beta_rowset_writer.cpp     |   9 +-
 be/src/olap/rowset/rowset.cpp                 |  16 ++
 be/src/olap/rowset/rowset.h                   |   2 +
 be/src/olap/rowset/rowset_meta.cpp            |   5 +
 be/src/olap/rowset/rowset_writer_context.h    |   3 +-
 be/src/olap/rowset/segment_creator.cpp        |  96 +++---
 be/src/olap/rowset/segment_creator.h          |   8 +-
 .../segment_v2/hierarchical_data_reader.cpp   |   5 +-
 .../segment_v2/hierarchical_data_reader.h     |  29 +---
 be/src/olap/rowset/segment_v2/segment.cpp     |  27 ++--
 .../segment_v2/vertical_segment_writer.cpp    | 148 ++++++++++++++++--
 .../segment_v2/vertical_segment_writer.h      |  11 +-
 be/src/olap/rowset_builder.cpp                |  14 +-
 be/src/olap/schema_change.cpp                 |   3 +-
 be/src/olap/tablet.cpp                        |  13 +-
 be/src/olap/tablet_schema.cpp                 |   9 +-
 be/src/olap/tablet_schema.h                   |   2 +-
 be/src/vec/columns/column_object.cpp          |  19 ++-
 be/src/vec/common/schema_util.cpp             | 113 ++++++-------
 be/src/vec/common/schema_util.h               |  13 +-
 be/src/vec/data_types/data_type.h             |   4 +
 .../serde/data_type_object_serde.cpp          |  23 ++-
 .../data_types/serde/data_type_object_serde.h |   9 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp     |   2 +-
 .../functions/function_variant_element.cpp    |  43 ++++-
 be/src/vec/olap/olap_data_convertor.cpp       |  34 ++--
 be/src/vec/olap/olap_data_convertor.h         |  15 +-
 .../org/apache/doris/analysis/UpdateStmt.java |  11 +-
 .../trees/plans/commands/UpdateCommand.java   |   2 +-
 .../data/variant_p0/delete_update.out         |   7 +
 .../variant_p0/partial_update_parallel1.csv   |   5 +
 .../variant_p0/partial_update_parallel2.csv   |   5 +
 .../variant_p0/partial_update_parallel3.csv   |   5 +
 .../variant_p0/partial_update_parallel4.csv   |   3 +
 .../data/variant_p0/variant_with_rowstore.out |   9 ++
 .../variant_github_events_p0_new/load.groovy  |  30 ++++
 .../suites/variant_p0/delete_update.groovy    | 122 ++++++++---
 .../test_compaction_extract_root.groovy       |  12 +-
 .../variant_p0/variant_with_rowstore.groovy   |  47 +++++-
 40 files changed, 658 insertions(+), 277 deletions(-)
 create mode 100644 regression-test/data/variant_p0/partial_update_parallel1.csv
 create mode 100644 regression-test/data/variant_p0/partial_update_parallel2.csv
 create mode 100644 regression-test/data/variant_p0/partial_update_parallel3.csv
 create mode 100644 regression-test/data/variant_p0/partial_update_parallel4.csv

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 8901aee0e5a252..c67c83bbec8fd8 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -79,7 +79,7 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_
             {_max_version_schema, update_schema}, _max_version_schema, final_schema,
             check_column_size));
     _max_version_schema = final_schema;
-    VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_structure();
+    VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_full_schema();
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index de051eea45e349..1a350bb1664afd 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -600,13 +600,12 @@ Status BaseBetaRowsetWriter::build(RowsetSharedPtr& rowset) {
     }
 
     // update rowset meta tablet schema if tablet schema updated
-    if (_context.tablet_schema->num_variant_columns() > 0) {
-        _rowset_meta->set_tablet_schema(_context.tablet_schema);
-    }
+    auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
+                                                                  : _context.tablet_schema;
+    _rowset_meta->set_tablet_schema(rowset_schema);
 
     RETURN_NOT_OK_STATUS_WITH_WARN(
-            RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
-                                         &rowset),
+            RowsetFactory::create_rowset(rowset_schema, _context.rowset_dir, _rowset_meta, &rowset),
             "rowset init failed when build new rowset");
     _already_built = true;
     return Status::OK();
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index f4667d3fb63ccb..42246fcff6c5f8 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -23,6 +23,7 @@
 #include "olap/segment_loader.h"
 #include "olap/tablet_schema.h"
 #include "util/time.h"
+#include "vec/common/schema_util.h"
 
 namespace doris {
 
@@ -98,6 +99,21 @@ void Rowset::merge_rowset_meta(const RowsetMetaSharedPtr& other) {
     for (auto key_bound : key_bounds) {
         _rowset_meta->add_segment_key_bounds(key_bound);
     }
+
+    // In a partial update the rowset schema may be updated when the table contains a variant type,
+    // so we need the newest schema here; otherwise the schema is stale and leads to wrong data reads
+    if (tablet_schema()->num_variant_columns() > 0) {
+        // merge extracted columns
+        TabletSchemaSPtr merged_schema;
+        static_cast<void>(vectorized::schema_util::get_least_common_schema(
+                {tablet_schema(), other->tablet_schema()}, nullptr, merged_schema));
+        if (*_schema != *merged_schema) {
+            _rowset_meta->set_tablet_schema(merged_schema);
+        }
+        // _rowset_meta->tablet_schema() may have been updated, so make sure _schema is
+        // consistent with the rowset meta
+        _schema = _rowset_meta->tablet_schema();
+    }
 }
 
 void Rowset::clear_cache() {
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 6194703176fdb4..5cd0efe0938235 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -132,6 +132,8 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
 
     const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }
 
+    void merge_rowset_meta(const RowsetMeta& other);
+
     bool is_pending() const { return _is_pending; }
 
     bool is_local() const { return _rowset_meta->is_local(); }
diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp
index d8ef2e7b5ddc94..d37f575706431d 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -17,6 +17,10 @@
 
 #include "olap/rowset/rowset_meta.h"
 
+#include 
+
+#include 
+
 #include "common/logging.h"
 #include "google/protobuf/util/message_differencer.h"
 #include "io/fs/local_file_system.h"
@@ -28,6 +32,7 @@
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_schema.h"
 #include "olap/tablet_schema_cache.h"
+#include "vec/common/schema_util.h"
 
 namespace doris {
 
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h index 54be9f9597010e..ad82f6c491e7af 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -63,7 +63,8 @@ struct RowsetWriterContext { io::FileSystemSPtr fs; std::string rowset_dir; TabletSchemaSPtr tablet_schema; - TabletSchemaSPtr original_tablet_schema; + // for variant schema update + TabletSchemaSPtr merged_tablet_schema; // PREPARED/COMMITTED for pending rowset // VISIBLE for non-pending rowset RowsetStatePB rowset_state; diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index b968f684855125..e42a80170a8670 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -43,6 +43,8 @@ #include "vec/common/schema_util.h" // variant column #include "vec/core/block.h" #include "vec/core/columns_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" namespace doris { using namespace ErrorCode; @@ -61,60 +63,38 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ if (block->rows() == 0) { return Status::OK(); } - // Expand variant columns vectorized::Block flush_block(*block); - TabletSchemaSPtr flush_schema; if (_context->write_type != DataWriteType::TYPE_COMPACTION && _context->tablet_schema->num_variant_columns() > 0) { - RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema)); + RETURN_IF_ERROR(_parse_variant_columns(flush_block)); } bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024; if (config::enable_vertical_segment_writer && _context->tablet_schema->cluster_key_idxes().empty()) { std::unique_ptr writer; - RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); + RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); - RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size)); + RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), flush_size)); } else { std::unique_ptr writer; - RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); + RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); - RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size)); + RETURN_IF_ERROR(_flush_segment_writer(writer, nullptr, flush_size)); } return Status::OK(); } -Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, - TabletSchemaSPtr& flush_schema) { +Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) { size_t num_rows = block.rows(); if (num_rows == 0) { return Status::OK(); } - { - std::lock_guard lock(*(_context->schema_lock)); - // save original tablet schema, _context->tablet_schema maybe modified - if (_context->original_tablet_schema == nullptr) { - _context->original_tablet_schema = _context->tablet_schema; - } - } - std::vector variant_column_pos; - if (_context->partial_update_info && _context->partial_update_info->is_partial_update) { - // check columns that used to do partial updates should not include variant - for (int i : _context->partial_update_info->update_cids) { - const auto& col = *_context->original_tablet_schema->columns()[i]; - if (!col.is_key() && col.name() != DELETE_SIGN) { - return Status::InvalidArgument( - "Not implement 
partial update for variant only support delete currently"); - } - } - } else { - // find positions of variant columns - for (int i = 0; i < _context->original_tablet_schema->columns().size(); ++i) { - if (_context->original_tablet_schema->columns()[i]->is_variant_type()) { - variant_column_pos.push_back(i); - } + for (int i = 0; i < block.columns(); ++i) { + const auto& entry = block.get_by_position(i); + if (vectorized::is_variant_type(remove_nullable(entry.type))) { + variant_column_pos.push_back(i); } } @@ -123,37 +103,8 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, } vectorized::schema_util::ParseContext ctx; - ctx.record_raw_json_column = _context->original_tablet_schema->store_row_column(); - RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns( - block, variant_column_pos, ctx)); - - flush_schema = std::make_shared(); - flush_schema->copy_from(*_context->original_tablet_schema); - vectorized::Block flush_block(std::move(block)); - vectorized::schema_util::rebuild_schema_and_block( - _context->original_tablet_schema, variant_column_pos, flush_block, flush_schema); - - { - // Update rowset schema, tablet's tablet schema will be updated when build Rowset - // Eg. flush schema: A(int), B(float), C(int), D(int) - // ctx.tablet_schema: A(bigint), B(double) - // => update_schema: A(bigint), B(double), C(int), D(int) - std::lock_guard lock(*(_context->schema_lock)); - TabletSchemaSPtr update_schema; - RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema( - {_context->tablet_schema, flush_schema}, nullptr, update_schema)); - CHECK_GE(update_schema->num_columns(), flush_schema->num_columns()) - << "Rowset merge schema columns count is " << update_schema->num_columns() - << ", but flush_schema is larger " << flush_schema->num_columns() - << " update_schema: " << update_schema->dump_structure() - << " flush_schema: " << flush_schema->dump_structure(); - _context->tablet_schema.swap(update_schema); - VLOG_DEBUG << "dump rs schema: " << _context->tablet_schema->dump_structure(); - } - - block.swap(flush_block); // NOLINT(bugprone-use-after-move) - VLOG_DEBUG << "dump block: " << block.dump_data(); - VLOG_DEBUG << "dump flush schema: " << flush_schema->dump_structure(); + ctx.record_raw_json_column = _context->tablet_schema->store_row_column(); + RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_column_pos, ctx)); return Status::OK(); } @@ -194,8 +145,7 @@ Status SegmentFlusher::_add_rows(std::unique_ptr& writer, - int32_t segment_id, bool no_compression, - TabletSchemaSPtr flush_schema) { + int32_t segment_id, bool no_compression) { io::FileWriterPtr file_writer; RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, file_writer)); @@ -207,10 +157,10 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptrtablet_schema; - writer.reset(new segment_v2::SegmentWriter( - file_writer.get(), segment_id, tablet_schema, _context->tablet, _context->data_dir, - _context->max_rows_per_segment, writer_options, _context->mow_context)); + writer.reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id, + _context->tablet_schema, _context->tablet, + _context->data_dir, _context->max_rows_per_segment, + writer_options, _context->mow_context)); { std::lock_guard l(_lock); _file_writers.emplace(segment_id, std::move(file_writer)); @@ -226,7 +176,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr& writer, int32_t segment_id, - bool no_compression, TabletSchemaSPtr 
flush_schema) { + bool no_compression) { io::FileWriterPtr file_writer; RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, file_writer)); @@ -238,10 +188,10 @@ Status SegmentFlusher::_create_segment_writer( writer_options.compression_type = NO_COMPRESSION; } - const auto& tablet_schema = flush_schema ? flush_schema : _context->tablet_schema; writer.reset(new segment_v2::VerticalSegmentWriter( - file_writer.get(), segment_id, tablet_schema, _context->tablet, _context->data_dir, - _context->max_rows_per_segment, writer_options, _context->mow_context)); + file_writer.get(), segment_id, _context->tablet_schema, _context->tablet, + _context->data_dir, _context->max_rows_per_segment, writer_options, + _context->mow_context)); { std::lock_guard l(_lock); _file_writers.emplace(segment_id, std::move(file_writer)); diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index 214322ed8d5b21..93508e9629ddbb 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -138,17 +138,15 @@ class SegmentFlusher { bool need_buffering(); private: - Status _expand_variant_to_subcolumns(vectorized::Block& block, TabletSchemaSPtr& flush_schema); + Status _parse_variant_columns(vectorized::Block& block); Status _add_rows(std::unique_ptr& segment_writer, const vectorized::Block* block, size_t row_offset, size_t row_num); Status _add_rows(std::unique_ptr& segment_writer, const vectorized::Block* block, size_t row_offset, size_t row_num); Status _create_segment_writer(std::unique_ptr& writer, - int32_t segment_id, bool no_compression = false, - TabletSchemaSPtr flush_schema = nullptr); + int32_t segment_id, bool no_compression = false); Status _create_segment_writer(std::unique_ptr& writer, - int32_t segment_id, bool no_compression = false, - TabletSchemaSPtr flush_schema = nullptr); + int32_t segment_id, bool no_compression = false); Status _flush_segment_writer(std::unique_ptr& writer, TabletSchemaSPtr flush_schema = nullptr, int64_t* flush_size = nullptr); diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index 66ad0eb92a996d..dcc082c22ae00c 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -34,10 +34,9 @@ namespace segment_v2 { Status HierarchicalDataReader::create(std::unique_ptr* reader, vectorized::PathInData path, const SubcolumnColumnReaders::Node* node, - const SubcolumnColumnReaders::Node* root, - bool output_as_raw_json) { + const SubcolumnColumnReaders::Node* root) { // None leave node need merge with root - auto* stream_iter = new HierarchicalDataReader(path, output_as_raw_json); + auto* stream_iter = new HierarchicalDataReader(path); std::vector leaves; vectorized::PathsInData leaves_paths; SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths); diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index 67f78651416a90..1d02685e445dfd 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -64,12 +64,11 @@ using SubcolumnColumnReaders = vectorized::SubcolumnsTree; // Reader for hierarchical data for variant, merge with root(sparse encoded columns) class HierarchicalDataReader : public ColumnIterator { public: - HierarchicalDataReader(const vectorized::PathInData& path, bool 
output_as_raw_json = false) - : _path(path), _output_as_raw_json(output_as_raw_json) {} + HierarchicalDataReader(const vectorized::PathInData& path) : _path(path) {} static Status create(std::unique_ptr* reader, vectorized::PathInData path, const SubcolumnColumnReaders::Node* target_node, - const SubcolumnColumnReaders::Node* root, bool output_as_raw_json = false); + const SubcolumnColumnReaders::Node* root); Status init(const ColumnIteratorOptions& opts) override; @@ -93,7 +92,6 @@ class HierarchicalDataReader : public ColumnIterator { std::unique_ptr _root_reader; size_t _rows_read = 0; vectorized::PathInData _path; - bool _output_as_raw_json = false; template Status tranverse(NodeFunction&& node_func) { @@ -154,26 +152,9 @@ class HierarchicalDataReader : public ColumnIterator { return Status::OK(); })); - if (_output_as_raw_json) { - auto col_to = vectorized::ColumnString::create(); - col_to->reserve(nrows * 2); - vectorized::VectorBufferWriter write_buffer(*col_to.get()); - auto type = std::make_shared(); - for (size_t i = 0; i < nrows; ++i) { - type->to_string(container_variant, i, write_buffer); - write_buffer.commit(); - } - if (variant.empty()) { - variant.create_root(std::make_shared(), - std::move(col_to)); - } else { - variant.get_root()->insert_range_from(*col_to, 0, col_to->size()); - } - } else { - // TODO select v:b -> v.b / v.b.c but v.d maybe in v - // copy container variant to dst variant, todo avoid copy - variant.insert_range_from(container_variant, 0, nrows); - } + // TODO select v:b -> v.b / v.b.c but v.d maybe in v + // copy container variant to dst variant, todo avoid copy + variant.insert_range_from(container_variant, 0, nrows); // variant.set_num_rows(nrows); _rows_read += nrows; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index f2ec504c90fe61..58e903c04d55af 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -536,24 +536,17 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, auto sparse_node = tablet_column.has_path_info() ? _sparse_column_tree.find_exact(*tablet_column.path_info_ptr()) : nullptr; - if (opt != nullptr && opt->io_ctx.reader_type == ReaderType::READER_ALTER_TABLE) { - CHECK(tablet_column.is_variant_type()); - if (root == nullptr) { - // No such variant column in this segment, get a default one - RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); - return Status::OK(); - } - bool output_as_raw_json = true; - // Alter table operation should read the whole variant column, since it does not aware of - // subcolumns of variant during processing rewriting rowsets. 
-        // This is slow, since it needs to read all sub columns and merge them into a single column
-        RETURN_IF_ERROR(
-                HierarchicalDataReader::create(iter, root_path, root, root, output_as_raw_json));
-        return Status::OK();
-    }
-    if (opt == nullptr || opt->io_ctx.reader_type != ReaderType::READER_QUERY) {
-        // Could be compaction ..etc and read flat leaves nodes data
+    auto is_compaction = [](ReaderType type) {
+        return type == ReaderType::READER_BASE_COMPACTION ||
+               type == ReaderType::READER_CUMULATIVE_COMPACTION ||
+               type == ReaderType::READER_COLD_DATA_COMPACTION ||
+               type == ReaderType::READER_SEGMENT_COMPACTION ||
+               type == ReaderType::READER_FULL_COMPACTION;
+    };
+
+    if (opt != nullptr && is_compaction(opt->io_ctx.reader_type)) {
+        // compaction needs to read flat leaf node data to prevent read amplification
         const auto* node = tablet_column.has_path_info()
                                    ? _sub_column_tree.find_leaf(*tablet_column.path_info_ptr())
                                    : nullptr;
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5eadac2abde41a..143e1b3632983a 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -140,7 +140,8 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum
     }
 }
 
-Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column) {
+Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column,
+                                                    const TabletSchemaSPtr& tablet_schema) {
     ColumnWriterOptions opts;
     opts.meta = _footer.add_columns();
 
@@ -148,9 +149,9 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
     // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
     // except for columns whose type don't support zone map.
-    opts.need_zone_map = column.is_key() || _tablet_schema->keys_type() != KeysType::AGG_KEYS;
+    opts.need_zone_map = column.is_key() || tablet_schema->keys_type() != KeysType::AGG_KEYS;
     opts.need_bloom_filter = column.is_bf_column();
-    auto* tablet_index = _tablet_schema->get_ngram_bf_index(column.unique_id());
+    auto* tablet_index = tablet_schema->get_ngram_bf_index(column.unique_id());
     if (tablet_index) {
         opts.need_bloom_filter = true;
         opts.is_ngram_bf_index = true;
@@ -166,12 +167,14 @@
     }
     // skip write inverted index on load if skip_write_index_on_load is true
     if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
-        _tablet_schema->skip_write_index_on_load()) {
+        tablet_schema->skip_write_index_on_load()) {
         skip_inverted_index = true;
     }
     // indexes for this column
-    opts.indexes = _tablet_schema->get_indexes_for_column(column);
+    opts.indexes = tablet_schema->get_indexes_for_column(column);
     if (!InvertedIndexColumnWriter::check_support_inverted_index(column)) {
+        // skip inverted index if invalid
+        opts.indexes.clear();
         opts.need_zone_map = false;
         opts.need_bloom_filter = false;
         opts.need_bitmap_index = false;
@@ -302,7 +305,8 @@ void VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl
 // 2.2 build read plan to read by batch
 // 2.3 fill block
 // 3. set columns to data convertor and then write all columns
-Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data) {
+Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data,
+                                                                 vectorized::Block& full_block) {
     if constexpr (!std::is_same_v) {
         // TODO(plat1ko): CloudStorageEngine
         return Status::NotSupported("append_block_with_partial_content");
     }
@@ -313,7 +317,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
     auto tablet = static_cast<Tablet*>(_tablet.get());
 
     // create full block and fill with input columns
-    auto full_block = _tablet_schema->create_block();
+    full_block = _tablet_schema->create_block();
     const auto& including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
     size_t input_id = 0;
     for (auto i : including_cids) {
@@ -702,16 +706,127 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do the following steps to fill the content of the block:
+// 1. set block data to the data convertor, and get all flattened columns from variant subcolumns
+// 2. get sparse columns from the previous sparse columns stripped in OlapColumnDataConvertorVariant
+// 3. merge the current column info (containing extracted columns) with the previous merged_tablet_schema,
+//    which will be used to construct the new schema for the rowset
+Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) {
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." 
+ entry->path.get_path(); + const vectorized::DataTypePtr& final_data_type_from_object = + entry->data.get_least_common_type(); + vectorized::PathInDataBuilder full_path_builder; + auto full_path = full_path_builder.append(parent_column->name_lower_case(), false) + .append(entry->path.get_parts(), false) + .build(); + return vectorized::schema_util::get_column_by_type( + final_data_type_from_object, column_name, + vectorized::schema_util::ExtraInfo { + .unique_id = -1, + .parent_unique_id = parent_column->unique_id(), + .path_info = full_path}); + }; + + CHECK(object_column.is_finalized()); + // common extracted columns + for (const auto& entry : + vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) { + if (entry->path.empty()) { + // already handled by parent column + continue; + } + CHECK(entry->data.is_finalized()); + int current_column_id = column_id++; + TabletColumn tablet_column = generate_column_info(entry); + vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column, + _flush_schema); + RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column, + _flush_schema)); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( + {entry->data.get_finalized_column_ptr()->get_ptr(), + entry->data.get_least_common_type(), tablet_column.name()}, + data.row_pos, data.num_rows, current_column_id)); + // convert column data from engine format to storage layer format + auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id); + if (!status.ok()) { + return status; + } + RETURN_IF_ERROR(_column_writers[current_column_id]->append( + column->get_nullmap(), column->get_data(), data.num_rows)); + _flush_schema->append_column(tablet_column); + _olap_data_convertor->clear_source_content(); + } + // sparse_columns + for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns( + object_column.get_sparse_subcolumns())) { + TabletColumn sparse_tablet_column = generate_column_info(entry); + _flush_schema->mutable_column_by_uid(parent_column->unique_id()) + .append_sparse_column(sparse_tablet_column); + + // add sparse column to footer + auto* column_pb = _footer.mutable_columns(i); + _init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column); + } + } + + // Update rowset schema, tablet's tablet schema will be updated when build Rowset + // Eg. 
flush schema: A(int), B(float), C(int), D(int) + // ctx.tablet_schema: A(bigint), B(double) + // => update_schema: A(bigint), B(double), C(int), D(int) + std::lock_guard lock(*(_opts.rowset_ctx->schema_lock)); + if (_opts.rowset_ctx->merged_tablet_schema == nullptr) { + _opts.rowset_ctx->merged_tablet_schema = _opts.rowset_ctx->tablet_schema; + } + TabletSchemaSPtr update_schema; + RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema( + {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema)); + CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns()) + << "Rowset merge schema columns count is " << update_schema->num_columns() + << ", but flush_schema is larger " << _flush_schema->num_columns() + << " update_schema: " << update_schema->dump_structure() + << " flush_schema: " << _flush_schema->dump_structure(); + _opts.rowset_ctx->merged_tablet_schema.swap(update_schema); + VLOG_DEBUG << "dump block " << data.block->dump_data(); + VLOG_DEBUG << "dump rs schema: " << _opts.rowset_ctx->merged_tablet_schema->dump_full_schema(); + VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " << _segment_id; + return Status::OK(); +} + Status VerticalSegmentWriter::write_batch() { if (_opts.rowset_ctx->partial_update_info && _opts.rowset_ctx->partial_update_info->is_partial_update && _opts.write_type == DataWriteType::TYPE_DIRECT && !_opts.rowset_ctx->is_transient_rowset_writer) { for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { - RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid))); + RETURN_IF_ERROR( + _create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); } + vectorized::Block full_block; for (auto& data : _batched_blocks) { - RETURN_IF_ERROR(_append_block_with_partial_content(data)); + RETURN_IF_ERROR(_append_block_with_partial_content(data, full_block)); + } + for (auto& data : _batched_blocks) { + RowsInBlock full_rows_block {&full_block, data.row_pos, data.num_rows}; + RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block)); } for (auto& column_writer : _column_writers) { RETURN_IF_ERROR(column_writer->finish()); @@ -733,7 +848,7 @@ Status VerticalSegmentWriter::write_batch() { std::vector key_columns; vectorized::IOlapColumnDataAccessor* seq_column = nullptr; for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { - RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid))); + RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); for (auto& data : _batched_blocks) { RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( data.block, data.row_pos, data.num_rows, std::vector {cid})); @@ -806,6 +921,19 @@ Status VerticalSegmentWriter::write_batch() { _num_rows_written += data.num_rows; } + if (_opts.write_type == DataWriteType::TYPE_DIRECT || + _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) { + size_t original_writers_cnt = _column_writers.size(); + // handle variant dynamic sub columns + for (auto& data : _batched_blocks) { + RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data)); + } + for (size_t i = original_writers_cnt; i < _column_writers.size(); ++i) { + RETURN_IF_ERROR(_column_writers[i]->finish()); + RETURN_IF_ERROR(_column_writers[i]->write_data()); + } + } + _batched_blocks.clear(); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 
ffa5f3807aecca..8fd854c3e95f4a 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -117,11 +117,14 @@ class VerticalSegmentWriter {
     Slice min_encoded_key();
     Slice max_encoded_key();
 
+    TabletSchemaSPtr flush_schema() const { return _flush_schema; }
+
     void clear();
 
 private:
     void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column);
-    Status _create_column_writer(uint32_t cid, const TabletColumn& column);
+    Status _create_column_writer(uint32_t cid, const TabletColumn& column,
+                                 const TabletSchemaSPtr& schema);
     size_t _calculate_inverted_index_file_size();
     uint64_t _estimated_remaining_size();
     Status _write_ordinal_index();
@@ -146,7 +149,8 @@ class VerticalSegmentWriter {
     void _set_min_key(const Slice& key);
     void _set_max_key(const Slice& key);
     void _serialize_block_to_row_column(vectorized::Block& block);
-    Status _append_block_with_partial_content(RowsInBlock& data);
+    Status _append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block);
+    Status _append_block_with_variant_subcolumns(RowsInBlock& data);
     Status _fill_missing_columns(vectorized::MutableColumns& mutable_full_columns,
                                  const std::vector& use_default_or_null_flag,
                                  bool has_default_or_nullable, const size_t& segment_start_pos,
@@ -204,6 +208,9 @@ class VerticalSegmentWriter {
     std::map _rsid_to_rowset;
 
     std::vector _batched_blocks;
+
+    // contains auto-generated columns; should be nullptr if there are no variant subcolumns
+    TabletSchemaSPtr _flush_schema = nullptr;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 23232c4d0a58be..32a6ba88ce784f 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -305,12 +305,22 @@ Status RowsetBuilder::commit_txn() {
     }
     std::lock_guard l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
-    if (tablet()->tablet_schema()->num_variant_columns() > 0) {
+
+    const RowsetWriterContext& rw_ctx = _rowset_writer->context();
+    if (rw_ctx.tablet_schema->num_variant_columns() > 0) {
+        // Merge the schema with `rw_ctx.merged_tablet_schema` first; the merged
+        // schema keeps the newest merged schema for the rowset, which is updated and merged
+        // during segment flushing.
+        if (rw_ctx.merged_tablet_schema != nullptr) {
+            RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema));
+        }
+        // We should merge the rowset schema further, since the merged_tablet_schema may be null:
+        // when enable_memtable_on_sink_node is true, the merged_tablet_schema will not be passed to
+        // the destination backend.
         // update tablet schema when meet variant columns, before commit_txn
         // Eg. rowset schema: A(int), B(float), C(int), D(int)
         //     _tabelt->tablet_schema: A(bigint), B(double)
         //     => update_schema: A(bigint), B(double), C(int), D(int)
-        const RowsetWriterContext& rw_ctx = _rowset_writer->context();
         RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
     // Transfer ownership of `PendingRowsetGuard` to `TxnManager`
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index a4ed6a527bfbc5..a0483ad5d8ec37 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -927,7 +927,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     // This is because the schema change for a variant needs to ignore the extracted columns.
// Otherwise, the schema types in different rowsets might be inconsistent. When performing a schema change, // the complete variant is constructed by reading all the sub-columns of the variant. - sc_params.new_tablet_schema = new_tablet->tablet_schema()->copy_without_extracted_columns(); + sc_params.new_tablet_schema = + new_tablet->tablet_schema()->copy_without_variant_extracted_columns(); sc_params.ref_rowset_readers.reserve(rs_splits.size()); for (RowSetSplits& split : rs_splits) { sc_params.ref_rowset_readers.emplace_back(split.rs_reader); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index b48262250edb2b..0a791614b87a8d 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2120,7 +2120,11 @@ Status Tablet::create_transient_rowset_writer( context.rowset_state = PREPARED; context.segments_overlap = OVERLAPPING; context.tablet_schema = std::make_shared(); - context.tablet_schema->copy_from(*(rowset_ptr->tablet_schema())); + // During a partial update, the extracted columns of a variant should not be included in the tablet schema. + // This is because the partial update for a variant needs to ignore the extracted columns. + // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update, + // the complete variant is constructed by reading all the sub-columns of the variant. + context.tablet_schema = rowset_ptr->tablet_schema()->copy_without_variant_extracted_columns(); context.newest_write_timestamp = UnixSeconds(); context.tablet_id = table_id(); context.enable_segcompaction = false; @@ -2945,6 +2949,13 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, (std::find(including_cids.cbegin(), including_cids.cend(), rowset_schema->sequence_col_idx()) != including_cids.cend()); } + if (rowset_schema->num_variant_columns() > 0) { + // During partial updates, the extracted columns of a variant should not be included in the rowset schema. + // This is because the partial update for a variant needs to ignore the extracted columns. + // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update, + // the complete variant is constructed by reading all the sub-columns of the variant. 
+        rowset_schema = rowset_schema->copy_without_variant_extracted_columns();
+    }
     // use for partial update
     PartialUpdateReadPlan read_plan_ori;
     PartialUpdateReadPlan read_plan_update;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 0900c6d8d40783..26d9d913f2f4e3 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -563,6 +563,10 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
         _column_path->from_protobuf(column.column_path_info());
         _parent_col_unique_id = column.column_path_info().parrent_column_unique_id();
     }
+    if (is_variant_type() && !column.has_column_path_info()) {
+        // set path info for variant root column, to prevent it from missing
+        _column_path = std::make_shared<vectorized::PathInData>(_col_name_lower_case);
+    }
     for (auto& column_pb : column.sparse_columns()) {
         TabletColumn column;
         column.init_from_pb(column_pb);
@@ -854,7 +858,8 @@ void TabletSchema::append_column(TabletColumn column, ColumnType col_type) {
     _cols.push_back(std::make_shared<TabletColumn>(std::move(column)));
     // The dropped column may have same name with exsiting column, so that
     // not add to name to index map, only for uid to index map
-    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type()) {
+    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type() ||
+        _cols.back()->is_extracted_column()) {
         _field_name_to_index.emplace(StringRef(_cols.back()->name()), _num_columns);
         _field_path_to_index[_cols.back()->path_info_ptr().get()] = _num_columns;
     } else if (col_type == ColumnType::NORMAL) {
@@ -1112,7 +1117,7 @@ void TabletSchema::merge_dropped_columns(const TabletSchema& src_schema) {
         }
     }
 }
 
-TabletSchemaSPtr TabletSchema::copy_without_extracted_columns() {
+TabletSchemaSPtr TabletSchema::copy_without_variant_extracted_columns() {
     TabletSchemaSPtr copy = std::make_shared<TabletSchema>();
     TabletSchemaPB tablet_schema_pb;
     this->to_schema_pb(&tablet_schema_pb);
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index b8f26a1f60117d..bd3b1f6ca4efad 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -454,7 +454,7 @@ class TabletSchema {
 
     vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
 
-    std::shared_ptr<TabletSchema> copy_without_extracted_columns();
+    std::shared_ptr<TabletSchema> copy_without_variant_extracted_columns();
     InvertedIndexStorageFormatPB get_inverted_index_storage_format() const {
         return _inverted_index_storage_format;
     }
diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp
index 3bae978f4d3f6d..1f59e000d32e11 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -749,8 +749,15 @@ void ColumnObject::insert_from(const IColumn& src, size_t n) {
 void ColumnObject::try_insert(const Field& field) {
     if (field.get_type() != Field::Types::VariantMap) {
         auto* root = get_subcolumn({});
-        if (!root) {
-            doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, "Failed to find root column_path");
+        // Inserting into an empty ColumnObject may leave the root null,
+        // so creating a root column for the Variant is expected.
+        if (root == nullptr) {
+            bool succ = add_sub_column({}, num_rows);
+            if (!succ) {
+                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
+                                       "Failed to add root sub column");
+            }
+            root = get_subcolumn({});
         }
         root->insert(field);
         ++num_rows;
@@ -1290,9 +1297,11 @@ Status ColumnObject::merge_sparse_to_root_column() {
                                              parser.getWriter().getOutput()->getSize());
         result_column_nullable->get_null_map_data().push_back(0);
     }
-
-    // assign merged column
-    subcolumns.get_mutable_root()->data.get_finalized_column_ptr() = mresult->get_ptr();
+    subcolumns.get_mutable_root()->data.get_finalized_column().clear();
+    // assign merged column; use insert_range_from to make a copy instead of replacing the ptr
+    // itself, to make sure the root column ptr is not changed
+    subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from(
+            *mresult->get_ptr(), 0, num_rows);
     return Status::OK();
 }
 
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index e8fd23f75694a6..626a78f25659b1 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -374,45 +374,44 @@ void update_least_sparse_column(const std::vector& schemas,
     update_least_schema_internal(subcolumns_types, common_schema, true, variant_col_unique_id);
 }
 
-void inherit_root_attributes(TabletSchemaSPtr& schema) {
-    std::unordered_map variants_index_meta;
-    // Get all variants tablet index metas if exist
-    for (const auto& col : schema->columns()) {
-        auto index_meta = schema->get_inverted_index(col->unique_id(), "");
-        if (col->is_variant_type() && index_meta != nullptr) {
-            variants_index_meta.emplace(col->unique_id(), *index_meta);
+void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
+                               TabletSchemaSPtr& target_schema) {
+    if (target.type() != FieldType::OLAP_FIELD_TYPE_TINYINT &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_DOUBLE &&
+        target.type() != FieldType::OLAP_FIELD_TYPE_FLOAT) {
+        // above types are not supported in bf
+        target.set_is_bf_column(source.is_bf_column());
+    }
+    target.set_aggregation_method(source.aggregation());
+    const auto* source_index_meta = target_schema->get_inverted_index(source.unique_id(), "");
+    if (source_index_meta != nullptr) {
+        // add index meta
+        TabletIndex index_info = *source_index_meta;
+        index_info.set_escaped_escaped_index_suffix_path(target.path_info_ptr()->get_path());
+        // get_inverted_index: No need to check, just inherit directly
+        const auto* target_index_meta = target_schema->get_inverted_index(target, false);
+        if (target_index_meta != nullptr) {
+            // already exist
+            target_schema->update_index(target, index_info);
+        } else {
+            target_schema->append_index(index_info);
         }
     }
+}
 
+void inherit_column_attributes(TabletSchemaSPtr& schema) {
     // Add index meta if extracted column is missing index meta
     for (size_t i = 0; i < schema->num_columns(); ++i) {
         TabletColumn& col = schema->mutable_column(i);
         if (!col.is_extracted_column()) {
             continue;
         }
-        if (col.type() != FieldType::OLAP_FIELD_TYPE_TINYINT &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_DOUBLE &&
-            col.type() != FieldType::OLAP_FIELD_TYPE_FLOAT) {
-            // above types are not supported in bf
-            col.set_is_bf_column(schema->column_by_uid(col.parent_unique_id()).is_bf_column());
-        }
-        col.set_aggregation_method(schema->column_by_uid(col.parent_unique_id()).aggregation());
-        auto it = variants_index_meta.find(col.parent_unique_id());
-        // variant has no index meta, ignore
-        if (it == variants_index_meta.end()) {
+        if (schema->field_index(col.parent_unique_id()) == -1) {
+            // parent column is missing, maybe dropped
             continue;
         }
-        auto index_meta = schema->get_inverted_index(col, false);
-        // add index meta
-        TabletIndex index_info = it->second;
-        index_info.set_escaped_escaped_index_suffix_path(col.path_info_ptr()->get_path());
-        if (index_meta != nullptr) {
-            // already exist
-            schema->update_index(col, index_info);
-        } else {
-            schema->append_index(index_info);
-        }
+        inherit_column_attributes(schema->column_by_uid(col.parent_unique_id()), col, schema);
     }
 }
 
@@ -473,7 +472,7 @@ Status get_least_common_schema(const std::vector& schemas,
         update_least_sparse_column(schemas, output_schema, unique_id, path_set);
     }
 
-    inherit_root_attributes(output_schema);
+    inherit_column_attributes(output_schema);
     if (check_schema_size &&
         output_schema->columns().size() > config::variant_max_merged_tablet_schema_size) {
         return Status::DataQualityError("Reached max column size limit {}",
@@ -483,25 +482,8 @@ Status get_least_common_schema(const std::vector& schemas,
     return Status::OK();
 }
 
-Status parse_and_encode_variant_columns(Block& block, const std::vector& variant_pos,
-                                        const ParseContext& ctx) {
-    try {
-        // Parse each variant column from raw string column
-        RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_pos, ctx));
-        vectorized::schema_util::finalize_variant_columns(block, variant_pos,
-                                                          false /*not ingore sparse*/);
-        RETURN_IF_ERROR(
-                vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_pos));
-    } catch (const doris::Exception& e) {
-        // TODO more graceful, max_filter_ratio
-        LOG(WARNING) << "encounter execption " << e.to_string();
-        return Status::InternalError(e.to_string());
-    }
-    return Status::OK();
-}
-
-Status parse_variant_columns(Block& block, const std::vector& variant_pos,
-                             const ParseContext& ctx) {
+Status _parse_variant_columns(Block& block, const std::vector& variant_pos,
+                              const ParseContext& ctx) {
     for (int i = 0; i < variant_pos.size(); ++i) {
         auto column_ref = block.get_by_position(variant_pos[i]).column;
         bool is_nullable = column_ref->is_nullable();
@@ -582,6 +564,19 @@ Status parse_variant_columns(Block& block, const std::vector& variant_pos,
     return Status::OK();
 }
 
+Status parse_variant_columns(Block& block, const std::vector& variant_pos,
+                             const ParseContext& ctx) {
+    try {
+        // Parse each variant column from raw string column
+        RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx));
+    } catch (const doris::Exception& e) {
+        // TODO more graceful, max_filter_ratio
+        LOG(WARNING) << "encountered exception " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
+    return Status::OK();
+}
+
 void finalize_variant_columns(Block& block, const std::vector& variant_pos,
                               bool ignore_sparse) {
     for (int i = 0; i < variant_pos.size(); ++i) {
@@ -597,19 +592,11 @@ void finalize_variant_columns(Block& block, const std::vector& variant_pos,
     }
 }
 
-Status encode_variant_sparse_subcolumns(Block& block, const std::vector& variant_pos) {
-    for (int i = 0; i < variant_pos.size(); ++i) {
-        auto& column_ref = block.get_by_position(variant_pos[i]).column->assume_mutable_ref();
-        auto& column =
-                column_ref.is_nullable()
-                        ? 
assert_cast( - assert_cast(column_ref).get_nested_column()) - : assert_cast(column_ref); - // Make sure the root node is jsonb storage type - auto expected_root_type = make_nullable(std::make_shared()); - column.ensure_root_node_type(expected_root_type); - RETURN_IF_ERROR(column.merge_sparse_to_root_column()); - } +Status encode_variant_sparse_subcolumns(ColumnObject& column) { + // Make sure the root node is jsonb storage type + auto expected_root_type = make_nullable(std::make_shared()); + column.ensure_root_node_type(expected_root_type); + RETURN_IF_ERROR(column.merge_sparse_to_root_column()); return Status::OK(); } @@ -643,7 +630,7 @@ static void _append_column(const TabletColumn& parent_variant, } // sort by paths in lexicographical order -static vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( +vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( const vectorized::ColumnObject::Subcolumns& subcolumns) { // sort by paths in lexicographical order vectorized::ColumnObject::Subcolumns sorted = subcolumns; @@ -716,7 +703,7 @@ void rebuild_schema_and_block(const TabletSchemaSPtr& original, VLOG_DEBUG << "set root_path : " << full_root_path.get_path(); } - vectorized::schema_util::inherit_root_attributes(flush_schema); + vectorized::schema_util::inherit_column_attributes(flush_schema); } // --------------------------- diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index 078081593c549b..f519e4dacae376 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -90,13 +90,11 @@ struct ParseContext { // 1. parse variant from raw json string // 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns // 3. encode sparse sub columns -Status parse_and_encode_variant_columns(Block& block, const std::vector& variant_pos, - const ParseContext& ctx); Status parse_variant_columns(Block& block, const std::vector& variant_pos, const ParseContext& ctx); void finalize_variant_columns(Block& block, const std::vector& variant_pos, bool ignore_sparse = true); -Status encode_variant_sparse_subcolumns(Block& block, const std::vector& variant_pos); +Status encode_variant_sparse_subcolumns(ColumnObject& column); // Pick the tablet schema with the highest schema version as the reference. // Then update all variant columns to there least common types. @@ -117,7 +115,14 @@ void update_least_sparse_column(const std::vector& schemas, const std::unordered_set& path_set); // inherit attributes like index/agg info from it's parent column -void inherit_root_attributes(TabletSchemaSPtr& schema); +void inherit_column_attributes(TabletSchemaSPtr& schema); + +void inherit_column_attributes(const TabletColumn& source, TabletColumn& target, + TabletSchemaSPtr& target_schema); + +// get sorted subcolumns of variant +vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( + const vectorized::ColumnObject::Subcolumns& subcolumns); // Rebuild schema from original schema by extend dynamic columns generated from ColumnObject. // Block consists of two parts, dynamic part of columns and static part of columns. 
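For readers following the schema flow: the least-common-schema merge that `merge_rowset_meta`, `_append_block_with_variant_subcolumns`, and `commit_txn` above all rely on can be sketched as follows. This is an illustrative sketch only — the standalone wrapper name `merge_variant_rowset_schemas` is hypothetical — while the `get_least_common_schema` call shape matches the declaration in schema_util.h above.

    // Sketch: merging two rowset schemas that contain variant extracted columns.
    // Subcolumn sets are unioned and each extracted subcolumn is promoted to the
    // least common type of its occurrences, e.g.
    //   schema A: v.a(int),    v.b(float)
    //   schema B: v.a(bigint), v.c(int)
    //   => merged: v.a(bigint), v.b(float), v.c(int)
    #include "olap/tablet_schema.h"
    #include "vec/common/schema_util.h"

    doris::Status merge_variant_rowset_schemas(const doris::TabletSchemaSPtr& a,
                                               const doris::TabletSchemaSPtr& b,
                                               doris::TabletSchemaSPtr& merged) {
        return doris::vectorized::schema_util::get_least_common_schema({a, b}, nullptr, merged);
    }
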
diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h index e708cda164ef1e..986f957d72b42d 100644 --- a/be/src/vec/data_types/data_type.h +++ b/be/src/vec/data_types/data_type.h @@ -412,5 +412,9 @@ inline bool is_complex_type(const DataTypePtr& data_type) { return which.is_array() || which.is_map() || which.is_struct(); } +inline bool is_variant_type(const DataTypePtr& data_type) { + return WhichDataType(data_type).is_variant_type(); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index 4d8a3020375403..6a5c7afc0b22a9 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -37,10 +37,11 @@ namespace doris { namespace vectorized { -Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, - MysqlRowBuffer& row_buffer, int row_idx, - bool col_const, - const FormatOptions& options) const { +template +Status DataTypeObjectSerDe::_write_column_to_mysql(const IColumn& column, + MysqlRowBuffer& row_buffer, + int row_idx, bool col_const, + const FormatOptions& options) const { const auto& variant = assert_cast(column); if (!variant.is_finalized()) { const_cast(variant).finalize(); @@ -67,6 +68,20 @@ Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, return Status::OK(); } +Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, + MysqlRowBuffer& row_buffer, int row_idx, + bool col_const, + const FormatOptions& options) const { + return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options); +} + +Status DataTypeObjectSerDe::write_column_to_mysql(const IColumn& column, + MysqlRowBuffer& row_buffer, int row_idx, + bool col_const, + const FormatOptions& options) const { + return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options); +} + void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, int32_t col_id, int row_num) const { diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h index 80554d3dbefe74..66178f0ecb38fb 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.h +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -82,9 +82,7 @@ class DataTypeObjectSerDe : public DataTypeSerDe { Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer& row_buffer, int row_idx, bool col_const, - const FormatOptions& options) const override { - return Status::NotSupported("write_column_to_mysql with type " + column.get_name()); - } + const FormatOptions& options) const override; Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer& row_buffer, int row_idx, bool col_const, @@ -96,6 +94,11 @@ class DataTypeObjectSerDe : public DataTypeSerDe { std::vector& buffer_list) const override { return Status::NotSupported("write_column_to_orc with type " + column.get_name()); } + +private: + template + Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer& result, + int row_idx, bool col_const, const FormatOptions& options) const; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 8f3163c36c4306..9a10ba8cf35a25 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -445,7 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 8f3163c36c4306..9a10ba8cf35a25 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -445,7 +445,7 @@ Status NewOlapScanner::_init_variant_columns() {
             }
         }
     }
-    schema_util::inherit_root_attributes(tablet_schema);
+    schema_util::inherit_column_attributes(tablet_schema);
     return Status::OK();
 }
 
diff --git a/be/src/vec/functions/function_variant_element.cpp b/be/src/vec/functions/function_variant_element.cpp
index 89256635279f4b..84ddc3b8046f56 100644
--- a/be/src/vec/functions/function_variant_element.cpp
+++ b/be/src/vec/functions/function_variant_element.cpp
@@ -32,6 +32,7 @@
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_object.h"
 #include "vec/columns/column_string.h"
+#include "vec/columns/subcolumn_tree.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
@@ -43,6 +44,7 @@
 #include "vec/functions/function.h"
 #include "vec/functions/function_helpers.h"
 #include "vec/functions/simple_function_factory.h"
+#include "vec/json/path_in_data.h"
 
 namespace doris::vectorized {
 
@@ -128,8 +130,45 @@ class FunctionVariantElement : public IFunction {
             *result = ColumnObject::create(true, type, std::move(result_column));
             return Status::OK();
         } else {
-            return Status::NotSupported("Not support element_at with none scalar variant {}",
-                                        src.debug_string());
+            auto mutable_src = src.clone_finalized();
+            auto* mutable_ptr = assert_cast<ColumnObject*>(mutable_src.get());
+            PathInData path(field_name);
+            ColumnObject::Subcolumns subcolumns = mutable_ptr->get_subcolumns();
+            const auto* node = subcolumns.find_exact(path);
+            auto result_col = ColumnObject::create(true, false /*should not create root*/);
+            if (node != nullptr) {
+                std::vector<const ColumnObject::Subcolumns::Node*> nodes;
+                PathsInData paths;
+                ColumnObject::Subcolumns::get_leaves_of_node(node, nodes, paths);
+                ColumnObject::Subcolumns new_subcolumns;
+                for (const auto* n : nodes) {
+                    PathInData new_path = n->path.copy_pop_front();
+                    VLOG_DEBUG << "add node " << new_path.get_path()
+                               << ", data size: " << n->data.size()
+                               << ", finalized size: " << n->data.get_finalized_column().size()
+                               << ", common type: " << n->data.get_least_common_type()->get_name();
+                    // if new_path is empty, indicate it's the root column, but adding a root will return false when calling add
+                    if (!new_subcolumns.add(new_path, n->data)) {
+                        VLOG_DEBUG << "failed to add node " << new_path.get_path();
+                    }
+                }
+                // handle the root node
+                if (new_subcolumns.empty() && !nodes.empty()) {
+                    CHECK_EQ(nodes.size(), 1);
+                    new_subcolumns.create_root(ColumnObject::Subcolumn {
+                            nodes[0]->data.get_finalized_column_ptr()->assume_mutable(),
+                            nodes[0]->data.get_least_common_type(), true, true});
+                }
+                auto container = ColumnObject::create(std::move(new_subcolumns), true);
+                result_col->insert_range_from(*container, 0, container->size());
+            } else {
+                result_col->insert_many_defaults(src.size());
+            }
+            *result = result_col->get_ptr();
+            VLOG_DEBUG << "dump new object "
+                       << static_cast<const ColumnObject*>(result_col.get())->debug_string()
+                       << ", path " << path.get_path();
+            return Status::OK();
         }
     }
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 86c1d2d6669956..e5f945953f6ef8 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/olap/olap_data_convertor.h"
 
+#include
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -42,6 +43,7 @@
 #include "vec/columns/column_struct.h"
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
+#include "vec/common/schema_util.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_agg_state.h"
 #include "vec/data_types/data_type_array.h"
@@ -214,6 +216,16 @@ void OlapBlockDataConvertor::set_source_content(const vectorized::Block* block,
     }
 }
 
+Status OlapBlockDataConvertor::set_source_content_with_specifid_column(
+        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows, uint32_t cid) {
+    DCHECK(num_rows > 0);
+    DCHECK(row_pos + num_rows <= typed_column.column->size());
+    DCHECK(cid < _convertors.size());
+    RETURN_IF_CATCH_EXCEPTION(
+            { _convertors[cid]->set_source_column(typed_column, row_pos, num_rows); });
+    return Status::OK();
+}
+
 Status OlapBlockDataConvertor::set_source_content_with_specifid_columns(
         const vectorized::Block* block, size_t row_pos, size_t num_rows,
         std::vector<uint32_t> cids) {
@@ -1078,8 +1090,6 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
                     ? assert_cast<const ColumnObject&>(*typed_column.column)
                     : assert_cast<const ColumnObject&>(
                               nullable_column->get_nested_column());
-
-    const_cast<ColumnObject&>(variant).finalize_if_not();
     if (variant.is_null_root()) {
         auto root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
         auto root_col = root_type->create_column();
@@ -1087,19 +1097,25 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
         const_cast<ColumnObject&>(variant).create_root(root_type, std::move(root_col));
         variant.check_consistency();
     }
-    auto root_of_variant = variant.get_root();
-    auto nullable = assert_cast<const ColumnNullable*>(root_of_variant.get());
-    CHECK(nullable);
-    _root_data_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
-    _root_data_convertor->set_source_column({root_of_variant->get_ptr(), nullptr, ""}, row_pos,
-                                            num_rows);
+    // ensure data finalized
+    _source_column_ptr = &const_cast<ColumnObject&>(variant);
+    _source_column_ptr->finalize(false);
+    _root_data_convertor = std::make_unique<OlapColumnDataConvertorVarChar>(true);
+    _root_data_convertor->set_source_column(
+            {_source_column_ptr->get_root()->get_ptr(), nullptr, ""}, row_pos, num_rows);
     OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
                                                                            num_rows);
 }
 
 // convert root data
 Status OlapBlockDataConvertor::OlapColumnDataConvertorVariant::convert_to_olap() {
-    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, _root_data_column));
+    RETURN_IF_ERROR(
+            vectorized::schema_util::encode_variant_sparse_subcolumns(*_source_column_ptr));
+#ifndef NDEBUG
+    _source_column_ptr->check_consistency();
+#endif
+    const auto* nullable =
+            assert_cast<const ColumnNullable*>(_source_column_ptr->get_root().get());
+    const auto* root_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
+    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, root_column));
     return Status::OK();
 }
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index 0ec720fcdc1265..764a7a4a7c3953 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -34,6 +34,7 @@
 #include "olap/uint24.h"
 #include "runtime/collection_value.h"
 #include "util/slice.h"
+#include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_object.h"
 #include "vec/columns/column_string.h"
@@ -77,6 +78,9 @@ class OlapBlockDataConvertor {
     void set_source_content(const vectorized::Block* block, size_t row_pos, size_t num_rows);
     Status set_source_content_with_specifid_columns(const vectorized::Block* block,
                                                     size_t row_pos, size_t num_rows,
                                                     std::vector<uint32_t> cids);
+    Status set_source_content_with_specifid_column(const ColumnWithTypeAndName& typed_column,
+                                                   size_t row_pos, size_t num_rows, uint32_t cid);
+
     void clear_source_content();
     std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t cid);
     void add_column_data_convertor(const TabletColumn& column);
 
@@ -487,8 +491,8 @@ class OlapBlockDataConvertor {
 
     class OlapColumnDataConvertorVariant : public OlapColumnDataConvertorBase {
     public:
-        OlapColumnDataConvertorVariant()
-                : _root_data_convertor(std::make_unique<OlapColumnDataConvertorVarChar>(true)) {}
+        OlapColumnDataConvertorVariant() = default;
+
         void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
                                size_t num_rows) override;
         Status convert_to_olap() override;
@@ -497,10 +501,11 @@ class OlapBlockDataConvertor {
         const void* get_data_at(size_t offset) const override;
 
     private:
-        // encodes sparsed columns
-        const ColumnString* _root_data_column;
-        // _nullmap contains null info for this variant
+        // // encodes sparsed columns
+        // const ColumnString* _root_data_column;
+        // // _nullmap contains null info for this variant
         std::unique_ptr<OlapColumnDataConvertorVarChar> _root_data_convertor;
+        ColumnObject* _source_column_ptr;
     };
 
 private:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
index 0a8dbf5bad9454..fda46d13fffc72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
@@ -191,12 +191,8 @@ private void analyzeSetExprs(Analyzer analyzer) throws AnalysisException {
         // step3: generate select list and insert column name list in insert stmt
         boolean isMow = ((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite();
-        boolean hasVariant = false;
         int setExprCnt = 0;
         for (Column column : targetTable.getColumns()) {
-            if (column.getType().isVariantType()) {
-                hasVariant = true;
-            }
             for (BinaryPredicate setExpr : setExprs) {
                 Expr lhs = setExpr.getChild(0);
                 if (((SlotRef) lhs).getColumn().equals(column)) {
@@ -204,13 +200,10 @@ private void analyzeSetExprs(Analyzer analyzer) throws AnalysisException {
                 }
             }
         }
-        // 1.table with sequence col cannot use partial update cause in MOW, we encode pk
+        // table with sequence col cannot use partial update cause in MOW, we encode pk
         // with seq column but we don't know which column is sequence in update
-        // 2. variant column update schema during load, so implement partial update is complicated,
-        // just ignore it at present
         if (isMow && ((OlapTable) targetTable).getSequenceCol() == null
-                && setExprCnt <= targetTable.getColumns().size() * 3 / 10
-                && !hasVariant) {
+                && setExprCnt <= targetTable.getColumns().size() * 3 / 10) {
             isPartialUpdate = true;
         }
         Optional sequenceMapCol = Optional.empty();
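The FE hunk above (and the Nereids one that follows) simply drops the variant veto, leaving the existing eligibility rule in place. A small sketch of that rule — written in C++ for uniformity with the other examples here; `partial_update_eligible` is an invented name, not an FE method:

```
#include <iostream>

// Sketch of the eligibility rule kept by the hunk above: with variant flush
// now handled inside the segment writer, partial update is no longer vetoed
// for tables containing variant columns. The remaining conditions are
// merge-on-write, no sequence column, and a small fraction of touched columns.
bool partial_update_eligible(bool is_merge_on_write, bool has_sequence_col,
                             int set_expr_count, int total_columns) {
    return is_merge_on_write && !has_sequence_col &&
           set_expr_count <= total_columns * 3 / 10;
}

int main() {
    std::cout << partial_update_eligible(true, false, 2, 10) << '\n'; // 1: 2 of 10 columns
    std::cout << partial_update_eligible(true, false, 4, 10) << '\n'; // 0: too many columns
}
```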
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
index 48766caa5ce301..542dab31a01e90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
@@ -159,7 +159,7 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer
         boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite()
                 && selectItems.size() < targetTable.getColumns().size()
-                && !targetTable.hasVariantColumns() && targetTable.getSequenceCol() == null
+                && targetTable.getSequenceCol() == null
                 && partialUpdateColNameToExpression.size() <= targetTable.getFullSchema().size() * 3 / 10;
 
         List partialUpdateColNames = new ArrayList<>();
diff --git a/regression-test/data/variant_p0/delete_update.out b/regression-test/data/variant_p0/delete_update.out
index 0e478bdeb0df5c..0cf801f1c004e9 100644
--- a/regression-test/data/variant_p0/delete_update.out
+++ b/regression-test/data/variant_p0/delete_update.out
@@ -14,3 +14,10 @@
 2	{"updated_value":123}	{"updated_value":123}
 6	{"a":4,"b":[4],"c":4.0}	{"updated_value" : 123}
 
+-- !sql --
+1	"ddddddddddd"	1111	199	10	{"new_data1":1}
+2	"eeeeee"	2222	299	20	{"new_data2":2}
+3	"aaaaa"	3333	399	30	{"new_data3":3}
+4	"bbbbbbbb"	4444	499	40	{"new_data4":4}
+5	"cccccccccccc"	5555	599	50	{"new_data5":5}
+
diff --git a/regression-test/data/variant_p0/partial_update_parallel1.csv b/regression-test/data/variant_p0/partial_update_parallel1.csv
new file mode 100644
index 00000000000000..4ba84bb7785ff2
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel1.csv
@@ -0,0 +1,5 @@
+1,"ddddddddddd"
+2,"eeeeee"
+3,"aaaaa"
+4,"bbbbbbbb"
+5,"cccccccccccc"
diff --git a/regression-test/data/variant_p0/partial_update_parallel2.csv b/regression-test/data/variant_p0/partial_update_parallel2.csv
new file mode 100644
index 00000000000000..1560d6d3261218
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel2.csv
@@ -0,0 +1,5 @@
+1,1111,199
+2,2222,299
+3,3333,399
+4,4444,499
+5,5555,599
diff --git a/regression-test/data/variant_p0/partial_update_parallel3.csv b/regression-test/data/variant_p0/partial_update_parallel3.csv
new file mode 100644
index 00000000000000..17abeef1a9cf9c
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel3.csv
@@ -0,0 +1,5 @@
+1,10,{"new_data1" : 1}
+2,20,{"new_data2" : 2}
+3,30,{"new_data3" : 3}
+4,40,{"new_data4" : 4}
+5,50,{"new_data5" : 5}
diff --git a/regression-test/data/variant_p0/partial_update_parallel4.csv b/regression-test/data/variant_p0/partial_update_parallel4.csv
new file mode 100644
index 00000000000000..0a7cbd412faab3
--- /dev/null
+++ b/regression-test/data/variant_p0/partial_update_parallel4.csv
@@ -0,0 +1,3 @@
+1,1
+3,1
+5,1
diff --git a/regression-test/data/variant_p0/variant_with_rowstore.out b/regression-test/data/variant_p0/variant_with_rowstore.out
index d7d759baad3cc4..6c34622bec85f2 100644
--- a/regression-test/data/variant_p0/variant_with_rowstore.out
+++ b/regression-test/data/variant_p0/variant_with_rowstore.out
@@ -23,3 +23,12 @@
 5	{"a":1234,"xxxx":"kaana"}	{"a":1234,"xxxx":"kaana"}
 6	{"a":1234,"xxxx":"kaana"}	{"a":1234,"xxxx":"kaana"}
 
+-- !point_select --
+-3	{"a":1,"b":1.5,"c":[1,2,3]}	{"a":1,"b":1.5,"c":[1,2,3]}
+
+-- !point_select --
+-2	{"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}	{"a":11245,"b":[123,{"xx":1}],"c":{"c":456,"d":"null","e":7.111}}
+
+-- !point_select --
+-1	{"a":1123}	{"a":1123}
+
diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy b/regression-test/suites/variant_github_events_p0_new/load.groovy
index 0be0f205b69b44..c063ebecf26274 100644
--- a/regression-test/suites/variant_github_events_p0_new/load.groovy
+++ b/regression-test/suites/variant_github_events_p0_new/load.groovy
@@ -95,6 +95,36 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
     sql """
         insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events;
     """
+    // insert batches of nulls
+    for(int t = 0; t <= 10; t += 1){
+        long k = 9223372036854775107 + t
+        sql """INSERT INTO github_events VALUES (${k}, NULL)"""
+    }
+    sql """ALTER TABLE github_events SET("bloom_filter_columns" = "v")"""
+    // wait for add bloom filter finished
+    def getJobState = { tableName ->
+        def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='github_events' ORDER BY createtime DESC LIMIT 1 """
+        return jobStateResult[0][9]
+    }
+    int max_try_time = 200
+    while (max_try_time--){
+        String result = getJobState("github_events")
+        if (result == "FINISHED") {
+            break
+        } else {
+            sleep(2000)
+            if (max_try_time < 1){
+                assertEquals(1,2)
+            }
+        }
+    }
+    sql """ALTER TABLE github_events ADD COLUMN v2 variant DEFAULT NULL"""
+    for(int t = 0; t <= 10; t += 1){
+        long k = 9223372036854775107 + t
+        sql """INSERT INTO github_events VALUES (${k}, '{"aaaa" : 1234, "bbbb" : "11ssss"}', '{"xxxx" : 1234, "yyyy" : [1.111]}')"""
+    }
+    sql """ALTER TABLE github_events DROP COLUMN v2"""
+    sql """DELETE FROM github_events where k >= 9223372036854775107"""
 
     qt_sql_select_count """ select count(*) from github_events_2; """
 }
diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy
index bbd999559b42ee..ed9667e7f5b324 100644
--- a/regression-test/suites/variant_p0/delete_update.groovy
+++ b/regression-test/suites/variant_p0/delete_update.groovy
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
 suite("regression_test_variant_delete_and_update", "variant_type"){
     // MOR
     def table_name = "var_delete_update"
@@ -30,14 +32,14 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
     """
 
     // test mor table
-    sql """insert into ${table_name} values (1, '{"a" : 1, "b" : [1], "c": 1.0}')"""
-    sql """insert into ${table_name} values (2, '{"a" : 2, "b" : [1], "c": 2.0}')"""
-    sql """insert into ${table_name} values (3, '{"a" : 3, "b" : [3], "c": 3.0}')"""
-    sql """insert into ${table_name} values (4, '{"a" : 4, "b" : [4], "c": 4.0}')"""
-    sql """insert into ${table_name} values (5, '{"a" : 5, "b" : [5], "c": 5.0}')"""
+    sql """insert into ${table_name} values (1, '{"a":1,"b":[1],"c":1.0}')"""
+    sql """insert into ${table_name} values (2, '{"a":2,"b":[1],"c":2.0}')"""
+    sql """insert into ${table_name} values (3, '{"a":3,"b":[3],"c":3.0}')"""
+    sql """insert into ${table_name} values (4, '{"a":4,"b":[4],"c":4.0}')"""
+    sql """insert into ${table_name} values (5, '{"a":5,"b":[5],"c":5.0}')"""
 
     sql "delete from ${table_name} where k = 1"
-    sql """update ${table_name} set v = '{"updated_value" : 123}' where k = 2"""
+    sql """update ${table_name} set v = '{"updated_value":123}' where k = 2"""
     qt_sql "select * from ${table_name} order by k"
 
     // MOW
@@ -46,41 +48,125 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
     sql """
         CREATE TABLE IF NOT EXISTS ${table_name} (
             k bigint,
-            v variant,
+            v variant,
             vs string
         )
         UNIQUE KEY(`k`)
-        DISTRIBUTED BY HASH(k) BUCKETS 3
+        DISTRIBUTED BY HASH(k) BUCKETS 4
         properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");
     """
 
     sql "insert into var_delete_update_mow select k, cast(v as string), cast(v as string) from var_delete_update"
     sql "delete from ${table_name} where k = 1"
     sql "delete from ${table_name} where k in (select k from var_delete_update_mow where k in (3, 4, 5))"
-    sql """insert into ${table_name} values (6, '{"a" : 4, "b" : [4], "c": 4.0}', 'xxx')"""
-    sql """insert into ${table_name} values (7, '{"a" : 4, "b" : [4], "c": 4.0}', 'yyy')"""
-    sql """update ${table_name} set vs = '{"updated_value" : 123}' where k = 6"""
-    sql """update ${table_name} set v = '{"updated_value" : 1111}' where k = 7"""
-    qt_sql "select * from ${table_name} order by k"
+    sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.0}', 'xxx')"""
+    sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.0}', 'yyy')"""
+    sql """update var_delete_update_mow set vs = '{"updated_value" : 123}' where k = 6"""
+    sql """update var_delete_update_mow set v = '{"updated_value":1111}' where k = 7"""
+    qt_sql "select * from var_delete_update_mow order by k"
 
     sql """delete from ${table_name} where v = 'xxx' or vs = 'yyy'"""
     sql """delete from ${table_name} where vs = 'xxx' or vs = 'yyy'"""
     qt_sql "select * from ${table_name} order by k"
 
     // delete & insert concurrently
-
+    sql "set enable_unique_key_partial_update=true;"
+    sql "sync"
     t1 = Thread.startDaemon {
         for (int k = 1; k <= 60; k++) {
-            int x = k % 10;
-            sql """insert into ${table_name} values(${x}, '${x}', '{"k${x}" : ${x}}')"""
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,vs) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
         }
     }
     t2 = Thread.startDaemon {
        for (int k = 1; k <= 60; k++) {
-            int x = k % 10;
-            sql """delete from ${table_name} where k = ${x} """
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,v) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
+        }
+    }
+    t3 = Thread.startDaemon {
+        for (int k = 1; k <= 60; k++) {
+            int x = new Random().nextInt(61) % 10;
+            sql """insert into ${table_name}(k,v) values(${x}, '{"k${x}" : ${x}}'),(${x+1}, '{"k${x+1}" : ${x+1}}'),(${x+2}, '{"k${x+2}" : ${x+2}}'),(${x+3}, '{"k${x+3}" : ${x+3}}')"""
         }
     }
+    t1.join()
+    t2.join()
+    t3.join()
+    sql "sync"
+
+    sql "set enable_unique_key_partial_update=false;"
+    // case 1: concurrent partial update
+    def tableName = "test_primary_key_partial_update_parallel"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `id` int(11) NOT NULL COMMENT "user id",
+                `name` varchar(65533) NOT NULL COMMENT "user name",
+                `score` int(11) NOT NULL COMMENT "user score",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321",
+                `var` variant NULL)
+                UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2, '{"id":2, "name":"doris2","score":2000,"test":223,"dft":2}'),
+        (1, "doris", 1000, 123, 1, '{"id":1, "name":"doris","score":1000,"test":123,"dft":1}'),
+        (5, "doris5", 5000, 523, 5, '{"id":5, "name":"doris5","score":5000,"test":523,"dft":5}'),
+        (4, "doris4", 4000, 423, 4, '{"id":4, "name":"doris4","score":4000,"test":423,"dft":4}'),
+        (3, "doris3", 3000, 323, 3, '{"id":3, "name":"doris3","score":3000,"test":323,"dft":3}');"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft,var'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
     t1.join()
     t2.join()
+    t3.join()
+
+    sql "sync"
+
+    if (!isCloudMode()) {
+        qt_sql """ select * from ${tableName} order by id;"""
+    }
 }
\ No newline at end of file
diff --git a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
index 69c330fa98a280..43f1048f1510c3 100644
--- a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
+++ b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy
@@ -85,8 +85,10 @@ suite("test_compaction_extract_root", "nonConcurrent") {
         union all select 5, '{"a": 1123}' as json_str union all select 5, '{"a": 11245, "b" : 42005}' as json_str from numbers("number" = "4096") limit 4096 ;"""
     // // fix cast to string tobe {}
-    qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};"""
-    qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};"""
+    qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM test_t"""
+    qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM test_t"""
+    // TODO, sparse columns with v['b'] will not be merged in hierarchical_data_reader with sparse columns
+    // qt_select_b_2 """ select v['b'] from test_t where cast(v['b'] as string) != '42005' and cast(v['b'] as string) != '42004' and cast(v['b'] as string) != '42003' order by cast(v['b'] as string); """
 
     qt_select_1_bfcompact """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;"""
@@ -140,8 +142,10 @@ suite("test_compaction_extract_root", "nonConcurrent") {
     }
     assert (rowCount <= 8)
     // fix cast to string tobe {}
-    qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};"""
-    qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};"""
+    qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM test_t"""
+    qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM test_t"""
+    // TODO, sparse columns with v['b'] will not be merged in hierarchical_data_reader with sparse columns
+    // qt_select_b_5 """ select v['b'] from test_t where cast(v['b'] as string) != '42005' and cast(v['b'] as string) != '42004' and cast(v['b'] as string) != '42003' order by cast(v['b'] as string); """
 
     qt_select_1 """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;"""
     set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1")
diff --git a/regression-test/suites/variant_p0/variant_with_rowstore.groovy b/regression-test/suites/variant_p0/variant_with_rowstore.groovy
index 58c245ee831685..771f776b3e77e4 100644
--- a/regression-test/suites/variant_p0/variant_with_rowstore.groovy
+++ b/regression-test/suites/variant_p0/variant_with_rowstore.groovy
@@ -29,7 +29,6 @@ suite("regression_test_variant_rowstore", "variant_type"){
 
     def table_name = "var_rowstore"
     sql "DROP TABLE IF EXISTS ${table_name}"
-    set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
     sql """
         CREATE TABLE IF NOT EXISTS ${table_name} (
@@ -63,4 +62,50 @@ suite("regression_test_variant_rowstore", "variant_type"){
     """
     sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rowstore"""
     qt_sql "select * from ${table_name} order by k limit 10"
+
+    // Parse url
+    def user = context.config.jdbcUser
+    def password = context.config.jdbcPassword
+    def realDb = "regression_test_variant_p0"
+    String jdbcUrl = context.config.jdbcUrl
+    String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+    def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+    def sql_port
+    if (urlWithoutSchema.indexOf("/") >= 0) {
+        // e.g: jdbc:mysql://locahost:8080/?a=b
+        sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
+    } else {
+        // e.g: jdbc:mysql://locahost:8080
+        sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
+    }
+    // set server side prepared statement url
+    def prepare_url = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb + "?&useServerPrepStmts=true"
+    table_name = "var_rs_pq"
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (
+            k bigint,
+            v variant,
+            v1 variant
+        )
+        UNIQUE KEY(`k`)
+        DISTRIBUTED BY HASH(k) BUCKETS 1
+        properties("replication_num" = "1", "disable_auto_compaction" = "false", "store_row_column" = "true", "enable_unique_key_merge_on_write" = "true");
+    """
+    sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rowstore"""
+    def result1 = connect(user=user, password=password, url=prepare_url) {
+        def stmt = prepareStatement "select * from var_rs_pq where k = ?"
+        assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+        stmt.setInt(1, -3)
+        qe_point_select stmt
+        stmt.setInt(1, -2)
+        qe_point_select stmt
+        stmt.setInt(1, -1)
+        qe_point_select stmt
+
+        // def stmt1 = prepareStatement "select var['a'] from var_rs_pq where k = ?"
+        // assertEquals(stmt1.class, com.mysql.cj.jdbc.ServerPreparedStatement);
+        // stmt.setInt(1, -3)
+        // qe_point_select stmt
+    }
 }
\ No newline at end of file

From 29cd748295e241f9de593f3ce8ddbb8d35eabb36 Mon Sep 17 00:00:00 2001
From: lihangyu <15605149486@163.com>
Date: Thu, 13 Jun 2024 11:03:37 +0800
Subject: [PATCH 2/4] [Fix](Variant) fix checksum task read with unmatched column type (#36201)

```
Bad cast from type:doris::vectorized::ColumnVector to doris::vectorized::ColumnObject
Check failure stack trace: ***
    @     0x562dcebca976  google::LogMessage::SendToLog()
    @     0x562dcebc73c0  google::LogMessage::Flush()
    @     0x562dcebcb1b9  google::LogMessageFatal::~LogMessageFatal()
    @     0x562d9ded39e6  assert_cast<>()
    @     0x562d9df1e599  doris::segment_v2::HierarchicalDataReader::process_read<>()
    @     0x562d9df1e106  doris::segment_v2::HierarchicalDataReader::next_batch()
    @     0x562d9df3373e  doris::segment_v2::ColumnIterator::next_batch()
    ....
    @     0x562d9ea56d31  doris::EngineChecksumTask::_compute_checksum()
    @     0x562d9ea55cc2  doris::EngineChecksumTask::execute()
    @     0x562d9b558355  doris::check_consistency_callback()
```

introduced by #34925
---
 be/src/olap/rowset/segment_v2/segment.cpp | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 58e903c04d55af..f429cb4afb9209 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -537,15 +537,17 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
                               ? _sparse_column_tree.find_exact(*tablet_column.path_info_ptr())
                               : nullptr;
 
-    auto is_compaction = [](ReaderType type) {
+    // Currently only compaction and checksum need to read flat leaves
+    // They both use tablet_schema_with_merged_max_schema_version as read schema
+    auto type_to_read_flat_leaves = [](ReaderType type) {
         return type == ReaderType::READER_BASE_COMPACTION ||
                type == ReaderType::READER_CUMULATIVE_COMPACTION ||
                type == ReaderType::READER_COLD_DATA_COMPACTION ||
                type == ReaderType::READER_SEGMENT_COMPACTION ||
-               type == ReaderType::READER_FULL_COMPACTION;
+               type == ReaderType::READER_FULL_COMPACTION || type == ReaderType::READER_CHECKSUM;
     };
 
-    if (opt != nullptr && is_compaction(opt->io_ctx.reader_type)) {
+    if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
         // compaction need to read flat leaves nodes data to prevent from amplification
         const auto* node = tablet_column.has_path_info()
                                    ? _sub_column_tree.find_leaf(*tablet_column.path_info_ptr())

From df69877894ef3f52928ee888b502545c019f319b Mon Sep 17 00:00:00 2001
From: lihangyu <15605149486@163.com>
Date: Wed, 19 Jun 2024 11:48:19 +0800
Subject: [PATCH 3/4] [Fix](Variant) fix variant schema change may cause invalid block schema and write missing blocks (#36317)

1. update_rowset_schema should not update _context.tablet_schema, since it is
   used as the source schema for schema change; mutating it may produce a
   schema that no longer matches the block.
2. Blocks buffered during schema change should be merged first and then
   flushed; otherwise the current block may be lost.
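A toy of the fix described above — accumulate merged schemas in a separate `merged_tablet_schema` rather than mutating the writer's source schema. Schemas are reduced to a name-to-type map here; this is an illustration of the idea, not the `TabletSchema` API:

```
#include <iostream>
#include <map>
#include <memory>
#include <string>

using Schema = std::map<std::string, std::string>; // column name -> type

// Least-common-schema merge, boiled down to a union of columns.
std::shared_ptr<Schema> merge_schemas(const Schema& base, const Schema& flush) {
    auto merged = std::make_shared<Schema>(base);
    for (const auto& [col, type] : flush) (*merged)[col] = type;
    return merged;
}

int main() {
    auto source = std::make_shared<Schema>(Schema {{"k", "bigint"}, {"v.a", "int"}});
    std::shared_ptr<Schema> merged;                 // dedicated accumulator, starts from source
    Schema flush1 {{"v.b", "string"}};              // a flushed segment introduces a new subcolumn
    merged = merge_schemas(merged ? *merged : *source, flush1);
    std::cout << "source cols: " << source->size()  // still 2: the source schema is untouched
              << ", merged cols: " << merged->size() << '\n'; // 3: union lives in the accumulator
}
```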
---
 be/src/olap/rowset/beta_rowset_writer.cpp | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 1a350bb1664afd..fd09f8b0a7e830 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -626,14 +626,17 @@ int64_t BetaRowsetWriter::_num_seg() const {
 void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
     std::lock_guard<std::mutex> lock(*(_context.schema_lock));
     TabletSchemaSPtr update_schema;
+    if (_context.merged_tablet_schema == nullptr) {
+        _context.merged_tablet_schema = _context.tablet_schema;
+    }
     static_cast<void>(vectorized::schema_util::get_least_common_schema(
-            {_context.tablet_schema, flush_schema}, nullptr, update_schema));
+            {_context.merged_tablet_schema, flush_schema}, nullptr, update_schema));
     CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
             << "Rowset merge schema columns count is " << update_schema->num_columns()
             << ", but flush_schema is larger " << flush_schema->num_columns()
             << " update_schema: " << update_schema->dump_structure()
             << " flush_schema: " << flush_schema->dump_structure();
-    _context.tablet_schema.swap(update_schema);
+    _context.merged_tablet_schema.swap(update_schema);
     VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure();
 }

From 8dc4cd87117e8a319c414cd970a53e0f7b3179c9 Mon Sep 17 00:00:00 2001
From: lihangyu <15605149486@163.com>
Date: Fri, 28 Jun 2024 16:30:58 +0800
Subject: [PATCH 4/4] [Fix](Variant) fix variant partial update with row store enabled (#36793)

1. Variant uses serialize_one_row_to_string to produce a string and then
   parses it into jsonb as the row store value, since the original input
   string is no longer available once the variant has been parsed into
   subcolumns.
2. Remove redundant code.
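A standalone sketch of the row-store encoding this patch switches to — re-serialize each row from the variant's subcolumns and parse that string into jsonb, instead of caching the original input column. Toy types only; the real code path uses `ColumnObject::serialize_one_row_to_string` followed by `JsonbParser`:

```
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Toy of the row-store path: once a variant is parsed into subcolumns the
// original input string is gone, so each row is re-serialized from its
// subcolumns; the real code then feeds this string to JsonbParser before
// writing the binary jsonb value.
std::string serialize_one_row_to_string(
        const std::map<std::string, std::vector<std::string>>& subcolumns, size_t row) {
    std::string json = "{";
    for (const auto& [path, values] : subcolumns) {
        if (json.size() > 1) json += ",";
        json += "\"" + path + "\":" + values[row];
    }
    return json + "}";
}

int main() {
    std::map<std::string, std::vector<std::string>> subcolumns = {
            {"a", {"1", "2"}}, {"b", {"\"x\"", "\"y\""}}};
    for (size_t row = 0; row < 2; ++row) {
        std::cout << serialize_one_row_to_string(subcolumns, row) << '\n';
    }
}
```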
---
 be/src/vec/columns/column_object.h         |   8 --
 be/src/vec/common/schema_util.cpp          | 135 +-----------------
 be/src/vec/common/schema_util.h            |   8 --
 .../serde/data_type_object_serde.cpp       |  10 +-
 .../data/variant_p0/delete_update.out      |   4 +-
 .../suites/variant_p0/delete_update.groovy |   8 +-
 6 files changed, 17 insertions(+), 156 deletions(-)

diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h
index 55abd534dd145b..53516877b6d39a 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -230,10 +230,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
     // this structure and fill with Subcolumns sub items
     mutable std::shared_ptr<rapidjson::Document> doc_structure;
 
-    // column with raw json strings
-    // used for quickly row store encoding
-    ColumnPtr rowstore_column;
-
     using SubColumnWithName = std::pair<PathInData, const Subcolumn*>;
     // Cached search results for previous row (keyed as index in JSON object) - used as a hint.
     mutable std::vector<SubColumnWithName> _prev_positions;
@@ -259,10 +255,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
         return subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable();
     }
 
-    void set_rowstore_column(ColumnPtr col) { rowstore_column = col; }
-
-    ColumnPtr get_rowstore_column() const { return rowstore_column; }
-
     Status serialize_one_row_to_string(int row, std::string* output) const;
 
     Status serialize_one_row_to_string(int row, BufferWritable& output) const;
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index 626a78f25659b1..016336d4098d1c 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -492,36 +492,8 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
         var.assume_mutable_ref().finalize();
 
         MutableColumnPtr variant_column;
-        bool record_raw_string_with_serialization = false;
-        // set
-        auto encode_rowstore = [&]() {
-            if (!ctx.record_raw_json_column) {
-                return Status::OK();
-            }
-            auto* var = static_cast<ColumnObject*>(variant_column.get());
-            if (record_raw_string_with_serialization) {
-                // encode to raw json column
-                auto raw_column = vectorized::ColumnString::create();
-                for (size_t i = 0; i < var->rows(); ++i) {
-                    std::string raw_str;
-                    RETURN_IF_ERROR(var->serialize_one_row_to_string(i, &raw_str));
-                    raw_column->insert_data(raw_str.c_str(), raw_str.size());
-                }
-                var->set_rowstore_column(raw_column->get_ptr());
-            } else {
-                // use original input json column
-                auto original_var_root =
-                        vectorized::check_and_get_column<vectorized::ColumnObject>(
-                                remove_nullable(column_ref).get())
-                                ->get_root();
-                var->set_rowstore_column(original_var_root);
-            }
-            return Status::OK();
-        };
         if (!var.is_scalar_variant()) {
             variant_column = var.assume_mutable();
-            record_raw_string_with_serialization = true;
-            RETURN_IF_ERROR(encode_rowstore());
             // already parsed
             continue;
         }
@@ -558,8 +530,6 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
             result = ColumnNullable::create(result, null_map);
         }
         block.get_by_position(variant_pos[i]).column = result;
-        RETURN_IF_ERROR(encode_rowstore());
-        // block.get_by_position(variant_pos[i]).type = std::make_shared<DataTypeObject>("json", true);
     }
     return Status::OK();
 }
@@ -600,35 +570,6 @@ Status encode_variant_sparse_subcolumns(ColumnObject& column) {
     return Status::OK();
 }
 
-static void _append_column(const TabletColumn& parent_variant,
-                           const ColumnObject::Subcolumns::NodePtr& subcolumn,
-                           TabletSchemaSPtr& to_append, bool is_sparse) {
-    // If column already exist in original tablet schema, then we pick common type
-    // and cast column to common type, and modify tablet column to common type,
-    // otherwise it's a new column
-    CHECK(to_append.use_count() == 1);
-    const std::string& column_name =
-            parent_variant.name_lower_case() + "." + subcolumn->path.get_path();
-    const vectorized::DataTypePtr& final_data_type_from_object =
-            subcolumn->data.get_least_common_type();
-    vectorized::PathInDataBuilder full_path_builder;
-    auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false)
-                             .append(subcolumn->path.get_parts(), false)
-                             .build();
-    TabletColumn tablet_column = vectorized::schema_util::get_column_by_type(
-            final_data_type_from_object, column_name,
-            vectorized::schema_util::ExtraInfo {.unique_id = -1,
-                                                .parent_unique_id = parent_variant.unique_id(),
-                                                .path_info = full_path});
-
-    if (!is_sparse) {
-        to_append->append_column(std::move(tablet_column));
-    } else {
-        to_append->mutable_column_by_uid(parent_variant.unique_id())
-                .append_sparse_column(std::move(tablet_column));
-    }
-}
-
 // sort by paths in lexicographical order
 vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
         const vectorized::ColumnObject::Subcolumns& subcolumns) {
@@ -640,70 +581,12 @@ vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
     return sorted;
 }
 
-void rebuild_schema_and_block(const TabletSchemaSPtr& original,
-                              const std::vector<size_t>& variant_positions, Block& flush_block,
-                              TabletSchemaSPtr& flush_schema) {
-    // rebuild schema and block with variant extracted columns
-
-    // 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema
-    // those columns are extracted columns, leave none extracted columns remain in original variant column, which is
-    // JSONB format at present.
-    // 2. Collect columns that need to be added or modified when data type changes or new columns encountered
-    for (size_t variant_pos : variant_positions) {
-        auto column_ref = flush_block.get_by_position(variant_pos).column;
-        bool is_nullable = column_ref->is_nullable();
-        const vectorized::ColumnObject& object_column = assert_cast<const vectorized::ColumnObject&>(
-                remove_nullable(column_ref)->assume_mutable_ref());
-        const TabletColumn& parent_column = *original->columns()[variant_pos];
-        CHECK(object_column.is_finalized());
-        std::shared_ptr<ColumnObject::Subcolumns::Node> root;
-        // common extracted columns
-        for (const auto& entry : get_sorted_subcolumns(object_column.get_subcolumns())) {
-            if (entry->path.empty()) {
-                // root
-                root = entry;
-                continue;
-            }
-            _append_column(parent_column, entry, flush_schema, false);
-            const std::string& column_name =
-                    parent_column.name_lower_case() + "." + entry->path.get_path();
-            flush_block.insert({entry->data.get_finalized_column_ptr()->get_ptr(),
-                                entry->data.get_least_common_type(), column_name});
-        }
-
-        // add sparse columns to flush_schema
-        for (const auto& entry : get_sorted_subcolumns(object_column.get_sparse_subcolumns())) {
-            _append_column(parent_column, entry, flush_schema, true);
-        }
-
-        // Create new variant column and set root column
-        auto obj = vectorized::ColumnObject::create(true, false);
-        // '{}' indicates a root path
-        static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
-                {}, root->data.get_finalized_column_ptr()->assume_mutable(),
-                root->data.get_least_common_type());
-        // // set for rowstore
-        if (original->store_row_column()) {
-            static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column(
-                    object_column.get_rowstore_column());
-        }
-        vectorized::ColumnPtr result = obj->get_ptr();
-        if (is_nullable) {
-            const auto& null_map = assert_cast<const vectorized::ColumnNullable&>(*column_ref)
-                                           .get_null_map_column_ptr();
-            result = vectorized::ColumnNullable::create(result, null_map);
-        }
-        flush_block.get_by_position(variant_pos).column = result;
-        vectorized::PathInDataBuilder full_root_path_builder;
-        auto full_root_path =
-                full_root_path_builder.append(parent_column.name_lower_case(), false).build();
-        TabletColumn new_col = flush_schema->column(variant_pos);
-        new_col.set_path_info(full_root_path);
-        flush_schema->replace_column(variant_pos, new_col);
-        VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
-    }
+// ---------------------------
 
-    vectorized::schema_util::inherit_column_attributes(flush_schema);
+std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
+    Block tmp;
+    tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
+    return tmp.dump_data(0, tmp.rows());
 }
 
 // ---------------------------
@@ -734,13 +617,5 @@ Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst)
                    ->assume_mutable();
     return Status::OK();
 }
-// ---------------------------
-
-std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
-    Block tmp;
-    tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
-    return tmp.dump_data(0, tmp.rows());
-}
-// ---------------------------
 
 } // namespace doris::vectorized::schema_util
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index f519e4dacae376..162885414159e0 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -124,14 +124,6 @@ void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
 vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
         const vectorized::ColumnObject::Subcolumns& subcolumns);
 
-// Rebuild schema from original schema by extend dynamic columns generated from ColumnObject.
-// Block consists of two parts, dynamic part of columns and static part of columns.
-//     static     extracted
-// | --------- | ----------- |
-// The static ones are original tablet_schame columns
-void rebuild_schema_and_block(const TabletSchemaSPtr& original, const std::vector<size_t>& variant_pos,
-                              Block& flush_block, TabletSchemaSPtr& flush_schema);
-
 // Extract json data from source with path
 Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst);
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index 6a5c7afc0b22a9..e9015db653ab9f 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -90,12 +90,14 @@ void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr
         const_cast<ColumnObject&>(variant).finalize();
     }
     result.writeKey(col_id);
+    std::string value_str;
+    if (!variant.serialize_one_row_to_string(row_num, &value_str)) {
+        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}",
+                               variant.dump_structure());
+    }
     JsonbParser json_parser;
-    CHECK(variant.get_rowstore_column() != nullptr);
-    // use original document
-    const auto& data_ref = variant.get_rowstore_column()->get_data_at(row_num);
     // encode as jsonb
-    bool succ = json_parser.parse(data_ref.data, data_ref.size);
+    bool succ = json_parser.parse(value_str.data(), value_str.size());
     // maybe more graceful, it is ok to check here since data could be parsed
     CHECK(succ);
     result.writeStartBinary();
diff --git a/regression-test/data/variant_p0/delete_update.out b/regression-test/data/variant_p0/delete_update.out
index 0cf801f1c004e9..4390610c21df33 100644
--- a/regression-test/data/variant_p0/delete_update.out
+++ b/regression-test/data/variant_p0/delete_update.out
@@ -7,12 +7,12 @@
 
 -- !sql --
 2	{"updated_value":123}	{"updated_value":123}
-6	{"a":4,"b":[4],"c":4.0}	{"updated_value" : 123}
+6	{"a":4,"b":[4],"c":4.1}	{"updated_value" : 123}
 7	{"updated_value":1111}	yyy
 
 -- !sql --
 2	{"updated_value":123}	{"updated_value":123}
-6	{"a":4,"b":[4],"c":4.0}	{"updated_value" : 123}
+6	{"a":4,"b":[4],"c":4.1}	{"updated_value" : 123}
 
 -- !sql --
 1	"ddddddddddd"	1111	199	10	{"new_data1":1}
diff --git a/regression-test/suites/variant_p0/delete_update.groovy b/regression-test/suites/variant_p0/delete_update.groovy
index ed9667e7f5b324..2b126b4c3a6616 100644
--- a/regression-test/suites/variant_p0/delete_update.groovy
+++ b/regression-test/suites/variant_p0/delete_update.groovy
@@ -53,14 +53,14 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
         )
         UNIQUE KEY(`k`)
         DISTRIBUTED BY HASH(k) BUCKETS 4
-        properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");
+        properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true");
     """
 
     sql "insert into var_delete_update_mow select k, cast(v as string), cast(v as string) from var_delete_update"
     sql "delete from ${table_name} where k = 1"
     sql "delete from ${table_name} where k in (select k from var_delete_update_mow where k in (3, 4, 5))"
-    sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.0}', 'xxx')"""
-    sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.0}', 'yyy')"""
+    sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.1}', 'xxx')"""
+    sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.1}', 'yyy')"""
     sql """update var_delete_update_mow set vs = '{"updated_value" : 123}' where k = 6"""
    sql """update var_delete_update_mow set v = '{"updated_value":1111}' where k = 7"""
     qt_sql "select * from var_delete_update_mow order by k"
@@ -108,7 +108,7 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
         `dft` int(11) DEFAULT "4321",
         `var` variant NULL)
         UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
-        PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true")
+        PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true", "store_row_column" = "true")
     """
 
     sql """insert into ${tableName} values