Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,24 +602,44 @@ vectorized::DataTypePtr Segment::get_data_type_of(const TabletColumn& column,
// Find the specific node within the variant structure using the relative path.
const auto* node = variant_reader->get_reader_by_path(relative_path);

if (relative_path.get_path() == SPARSE_COLUMN_PATH) {
return vectorized::DataTypeFactory::instance().create_data_type(column);
}

// Case 1: Node not found for the given path within the variant reader.
// If relative_path is empty, it means the original path pointed to the root
// of the variant column itself. We should return the Variant type.
if (node == nullptr || relative_path.empty()) {
return vectorized::DataTypeFactory::instance().create_data_type(column);
if (column.is_nested_subcolumn()) {
return vectorized::DataTypeFactory::instance().create_data_type(column);
}
return column.is_nullable()
? vectorized::make_nullable(std::make_shared<vectorized::DataTypeVariant>(
column.variant_max_subcolumns_count()))
: std::make_shared<vectorized::DataTypeVariant>(
column.variant_max_subcolumns_count());
}

bool exist_in_sparse = variant_reader->exist_in_sparse_column(relative_path);
bool is_physical_leaf = node->children.empty();

if (is_physical_leaf && column.is_nested_subcolumn()) {
return node->data.file_column_type;
}

// Condition to return the specific underlying type of the node:
// 1. We are reading flat leaves (ignoring hierarchy).
// 2. OR It's a leaf in the physical column structure AND it doesn't *also* exist
// in the sparse column (meaning it's purely a materialized leaf).
if (read_flat_leaves || (is_physical_leaf && !exist_in_sparse)) {
if (read_flat_leaves || (is_physical_leaf && !exist_in_sparse &&
!variant_reader->is_exceeded_sparse_column_limit())) {
return node->data.file_column_type;
}
return vectorized::DataTypeFactory::instance().create_data_type(column);
return column.is_nullable()
? vectorized::make_nullable(std::make_shared<vectorized::DataTypeVariant>(
column.variant_max_subcolumns_count()))
: std::make_shared<vectorized::DataTypeVariant>(
column.variant_max_subcolumns_count());
}

Status Segment::_create_column_readers_once(OlapReaderStatistics* stats) {
Expand Down
10 changes: 9 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,17 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
}
return Status::OK();
}
if (block->columns() < _column_writers.size()) {
return Status::InternalError(
"block->columns() < _column_writers.size(), block->columns()=" +
std::to_string(block->columns()) +
", _column_writers.size()=" + std::to_string(_column_writers.size()) +
", _tablet_schema->dump_structure()=" + _tablet_schema->dump_structure());
}
CHECK(block->columns() >= _column_writers.size())
<< ", block->columns()=" << block->columns()
<< ", _column_writers.size()=" << _column_writers.size();
<< ", _column_writers.size()=" << _column_writers.size()
<< ", _tablet_schema->dump_structure()=" << _tablet_schema->dump_structure();
// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should reubild)
if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
Expand Down
36 changes: 19 additions & 17 deletions be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ bool VariantColumnReader::exist_in_sparse_column(
return existed_in_sparse_column || prefix_existed_in_sparse_column;
}

bool VariantColumnReader::is_exceeded_sparse_column_limit() const {
return !_statistics->sparse_column_non_null_size.empty() &&
_statistics->sparse_column_non_null_size.size() >=
config::variant_max_sparse_column_statistics_size;
}

int64_t VariantColumnReader::get_metadata_size() const {
int64_t size = ColumnReader::get_metadata_size();
if (_statistics) {
Expand Down Expand Up @@ -205,10 +211,16 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iter
RETURN_IF_ERROR(_create_sparse_merge_reader(iterator, opts, target_col, inner_iter));
return Status::OK();
}

if (target_col.is_nested_subcolumn()) {
// using the sibling of the nested column to fill the target nested column
RETURN_IF_ERROR(_new_default_iter_with_same_nested(iterator, target_col));
return Status::OK();
}

// If the path is typed, it means the path is not a sparse column, so we can't read the sparse column
// even if the sparse column size is reached limit
if (existed_in_sparse_column ||
(exceeded_sparse_column_limit && !relative_path.get_is_typed())) {
if (existed_in_sparse_column || exceeded_sparse_column_limit) {
// Sparse column exists or reached sparse size limit, read sparse column
ColumnIterator* inner_iter;
RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
Expand All @@ -219,11 +231,7 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iter
const_cast<StorageReadOptions*>(opts), target_col);
return Status::OK();
}
if (target_col.is_nested_subcolumn()) {
// using the sibling of the nested column to fill the target nested column
RETURN_IF_ERROR(_new_default_iter_with_same_nested(iterator, target_col));
return Status::OK();
}

VLOG_DEBUG << "new_default_iter: " << target_col.path_info_ptr()->get_path();
std::unique_ptr<ColumnIterator> it;
RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &it));
Expand Down Expand Up @@ -264,7 +272,7 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet
// Otherwise the prefix is not exist and the sparse column size is reached limit
// which means the path maybe exist in sparse_column
bool exceeded_sparse_column_limit = !_statistics->sparse_column_non_null_size.empty() &&
_statistics->sparse_column_non_null_size.size() ==
_statistics->sparse_column_non_null_size.size() >=
config::variant_max_sparse_column_statistics_size;

// For compaction operations, read flat leaves, otherwise read hierarchical data
Expand All @@ -284,7 +292,7 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet
_statistics->sparse_column_non_null_size.end()) &&
_statistics->sparse_column_non_null_size.lower_bound(prefix)->first.starts_with(prefix);
// if prefix exists in sparse column, read sparse column with hierarchical reader
if (prefix_existed_in_sparse_column) {
if (prefix_existed_in_sparse_column || exceeded_sparse_column_limit) {
// Example {"b" : {"c":456,"e":7.111}}
// b.c is sparse column, b.e is subcolumn, so b is both the prefix of sparse column and subcolumn
return _create_hierarchical_reader(iterator, relative_path, node, root);
Expand Down Expand Up @@ -317,9 +325,6 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet
RETURN_IF_ERROR(_create_hierarchical_reader(iterator, relative_path, node, root));
}
} else {
if (exceeded_sparse_column_limit) {
return _create_hierarchical_reader(iterator, relative_path, nullptr, root);
}
// Sparse column not exists and not reached stats limit, then the target path is not exist, get a default iterator
std::unique_ptr<ColumnIterator> iter;
RETURN_IF_ERROR(Segment::new_default_iterator(*target_col, &iter));
Expand Down Expand Up @@ -444,11 +449,8 @@ void VariantColumnReader::get_subcolumns_types(
std::unordered_map<vectorized::PathInData, vectorized::DataTypes,
vectorized::PathInData::Hash>* subcolumns_types) const {
for (const auto& subcolumn_reader : *_subcolumn_readers) {
// no need typed path and root path
if (!subcolumn_reader->path.get_is_typed() && !subcolumn_reader->path.empty()) {
auto& path_types = (*subcolumns_types)[subcolumn_reader->path];
path_types.push_back(subcolumn_reader->data.file_column_type);
}
auto& path_types = (*subcolumns_types)[subcolumn_reader->path];
path_types.push_back(subcolumn_reader->data.file_column_type);
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class VariantColumnReader : public ColumnReader {

bool exist_in_sparse_column(const vectorized::PathInData& path) const;

bool is_exceeded_sparse_column_limit() const;

const SubcolumnColumnReaders* get_subcolumn_readers() const { return _subcolumn_readers.get(); }

void get_subcolumns_types(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ Status _create_column_writer(uint32_t cid, const TabletColumn& column,
IndexFileWriter* inverted_index_file_writer,
std::unique_ptr<ColumnWriter>* writer,
TabletIndexes& subcolumn_indexes, ColumnWriterOptions* opt,
int64_t none_null_value_size) {
int64_t none_null_value_size, bool need_record_none_null_value_size) {
_init_column_meta(opt->meta, cid, column, opt->compression_type);
// no need to record none null value size for typed column or nested column, since it's compaction stage
// will directly pick it as sub column
if (!column.path_info_ptr()->get_is_typed() && !column.path_info_ptr()->has_nested_part()) {
if (need_record_none_null_value_size) {
// record none null value size for statistics
opt->meta->set_none_null_size(none_null_value_size);
}
Expand Down Expand Up @@ -265,32 +265,22 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnVariant* p
// create subcolumn writer
int current_column_id = column_id++;
TabletColumn tablet_column;
int64_t none_null_value_size = -1;
vectorized::ColumnPtr current_column;
vectorized::DataTypePtr current_type;
int64_t none_null_value_size = entry->data.get_non_null_value_size();
vectorized::ColumnPtr current_column = entry->data.get_finalized_column_ptr()->get_ptr();
vectorized::DataTypePtr current_type = entry->data.get_least_common_type();
if (auto current_path = entry->path.get_path();
_subcolumns_info.find(current_path) != _subcolumns_info.end()) {
tablet_column = std::move(_subcolumns_info[current_path].column);
vectorized::DataTypePtr storage_type =
vectorized::DataTypeFactory::instance().create_data_type(tablet_column);
vectorized::DataTypePtr finalized_type = entry->data.get_least_common_type();
current_column = entry->data.get_finalized_column_ptr()->get_ptr();
if (!storage_type->equals(*finalized_type)) {
RETURN_IF_ERROR(vectorized::schema_util::cast_column(
{current_column, finalized_type, ""}, storage_type, &current_column));
}
_subcolumns_indexes[current_column_id] =
std::move(_subcolumns_info[current_path].indexes);
const auto& null_data = assert_cast<const vectorized::ColumnNullable&>(*current_column)
.get_null_map_data();
none_null_value_size =
simd::count_zero_num((int8_t*)null_data.data(), null_data.size());
current_type = storage_type;
if (auto storage_type =
vectorized::DataTypeFactory::instance().create_data_type(tablet_column);
!storage_type->equals(*current_type)) {
return Status::InvalidArgument("Storage type {} is not equal to current type {}",
storage_type->get_name(), current_type->get_name());
}
} else {
tablet_column = generate_column_info(entry);
none_null_value_size = entry->data.get_non_null_value_size();
current_column = entry->data.get_finalized_column_ptr()->get_ptr();
current_type = entry->data.get_least_common_type();
}
ColumnWriterOptions opts;
opts.meta = _opts.footer->add_columns();
Expand All @@ -300,10 +290,16 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnVariant* p
opts.file_writer = _opts.file_writer;
std::unique_ptr<ColumnWriter> writer;
vectorized::schema_util::inherit_column_attributes(*_tablet_column, tablet_column);

bool need_record_none_null_value_size =
(!tablet_column.path_info_ptr()->get_is_typed() ||
_tablet_column->variant_enable_typed_paths_to_sparse()) &&
!tablet_column.path_info_ptr()->has_nested_part();

RETURN_IF_ERROR(_create_column_writer(
current_column_id, tablet_column, _opts.rowset_ctx->tablet_schema,
_opts.index_file_writer, &writer, _subcolumns_indexes[current_column_id], &opts,
none_null_value_size));
none_null_value_size, need_record_none_null_value_size));
_subcolumn_writers.push_back(std::move(writer));
_subcolumn_opts.push_back(opts);
_subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows);
Expand Down Expand Up @@ -394,7 +390,10 @@ Status VariantColumnWriterImpl::finalize() {
}
}

RETURN_IF_ERROR(ptr->pick_subcolumns_to_sparse_column(_subcolumns_info));
RETURN_IF_ERROR(ptr->convert_typed_path_to_storage_type(_subcolumns_info));

RETURN_IF_ERROR(ptr->pick_subcolumns_to_sparse_column(
_subcolumns_info, _tablet_column->variant_enable_typed_paths_to_sparse()));

#ifndef NDEBUG
ptr->check_consistency();
Expand Down Expand Up @@ -542,7 +541,7 @@ VariantSubcolumnWriter::VariantSubcolumnWriter(const ColumnWriterOptions& opts,
//
_tablet_column = column;
_opts = opts;
_column = vectorized::ColumnVariant::create(true);
_column = vectorized::ColumnVariant::create(0);
}

Status VariantSubcolumnWriter::init() {
Expand Down Expand Up @@ -575,34 +574,41 @@ Status VariantSubcolumnWriter::finalize() {
const auto& parent_column =
_opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id());

TabletColumn flush_column;

auto path = _tablet_column->path_info_ptr()->copy_pop_front().get_path();

TabletSchema::SubColumnInfo sub_column_info;
if (ptr->get_subcolumns().get_root()->data.get_least_common_base_type_id() ==
PrimitiveType::INVALID_TYPE) {
auto flush_type = vectorized::DataTypeFactory::instance().create_data_type(
PrimitiveType::TYPE_TINYINT, true /* is_nullable */);
ptr->ensure_root_node_type(flush_type);
}

TabletColumn flush_column = vectorized::schema_util::get_column_by_type(
flush_column = vectorized::schema_util::get_column_by_type(
ptr->get_root_type(), _tablet_column->name(),
vectorized::schema_util::ExtraInfo {
.unique_id = -1,
.parent_unique_id = _tablet_column->parent_unique_id(),
.path_info = *_tablet_column->path_info_ptr()});

int64_t none_null_value_size = ptr->get_subcolumns().get_root()->data.get_non_null_value_size();
bool need_record_none_null_value_size = (!flush_column.path_info_ptr()->get_is_typed()) &&
!flush_column.path_info_ptr()->has_nested_part();
ColumnWriterOptions opts = _opts;

// refresh opts and get writer with flush column
vectorized::schema_util::inherit_column_attributes(parent_column, flush_column);
RETURN_IF_ERROR(_create_column_writer(0, flush_column, _opts.rowset_ctx->tablet_schema,
_opts.index_file_writer, &_writer, _indexes, &opts,
none_null_value_size));
none_null_value_size, need_record_none_null_value_size));

_opts = opts;
auto olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();
int column_id = 0;
RETURN_IF_ERROR(convert_and_write_column(olap_data_convertor.get(), flush_column,
ptr->get_root_type(), _writer.get(),
ptr->get_root()->get_ptr(), ptr->rows(), column_id));

_is_finalized = true;
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco
column->set_pattern_type(PatternTypePB::MATCH_NAME_GLOB);
}
}
if (tcolumn.__isset.variant_enable_typed_paths_to_sparse) {
column->set_variant_enable_typed_paths_to_sparse(
tcolumn.variant_enable_typed_paths_to_sparse);
}
}

void TabletMeta::remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version& version) {
Expand Down
Loading
Loading