Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
}

// update rowset meta tablet schema if tablet schema updated
if (_context.tablet_schema->num_variant_columns() > 0) {
_rowset_meta->set_tablet_schema(_context.tablet_schema);
}
auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
: _context.tablet_schema;
_rowset_meta->set_tablet_schema(rowset_schema);

if (_rowset_meta->newest_write_timestamp() == -1) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
Expand All @@ -115,10 +115,9 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}

RETURN_NOT_OK_STATUS_WITH_WARN(
RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
&rowset),
"rowset init failed when build new rowset");
RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path,
_rowset_meta, &rowset),
"rowset init failed when build new rowset");
_already_built = true;
return Status::OK();
}
Expand Down
12 changes: 9 additions & 3 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ TabletSchemaSPtr BaseTablet::tablet_schema_with_merged_max_schema_version(
[](const RowsetMetaSharedPtr& rs_meta) { return rs_meta->tablet_schema(); });
static_cast<void>(
vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema));
VLOG_DEBUG << "dump schema: " << target_schema->dump_structure();
VLOG_DEBUG << "dump schema: " << target_schema->dump_full_schema();
}
return target_schema;
}
Expand Down Expand Up @@ -625,6 +625,13 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
(std::find(including_cids.cbegin(), including_cids.cend(),
rowset_schema->sequence_col_idx()) != including_cids.cend());
}
if (rowset_schema->num_variant_columns() > 0) {
// During partial updates, the extracted columns of a variant should not be included in the rowset schema.
// This is because the partial update for a variant needs to ignore the extracted columns.
// Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
// the complete variant is constructed by reading all the sub-columns of the variant.
rowset_schema = rowset_schema->copy_without_variant_extracted_columns();
}
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
Expand Down Expand Up @@ -1233,15 +1240,14 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf
RowsetSharedPtr transient_rowset;
RETURN_IF_ERROR(transient_rs_writer->build(transient_rowset));
auto old_segments = rowset->num_segments();
rowset->rowset_meta()->merge_rowset_meta(*transient_rowset->rowset_meta());
rowset->merge_rowset_meta(*transient_rowset->rowset_meta());
auto new_segments = rowset->num_segments();
ss << ", partial update flush rowset (old segment num: " << old_segments
<< ", new segment num: " << new_segments << ")"
<< ", cost:" << watch.get_elapse_time_us() - t4 << "(us)";

// update the shared_ptr to new bitmap, which is consistent with current rowset.
txn_info->delete_bitmap = delete_bitmap;

// erase segment cache cause we will add a segment to rowset
SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
}
Expand Down
13 changes: 6 additions & 7 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,14 +695,13 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
}

// update rowset meta tablet schema if tablet schema updated
if (_context.tablet_schema->num_variant_columns() > 0) {
_rowset_meta->set_tablet_schema(_context.tablet_schema);
}
auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
: _context.tablet_schema;
_rowset_meta->set_tablet_schema(rowset_schema);

RETURN_NOT_OK_STATUS_WITH_WARN(
RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
&rowset),
"rowset init failed when build new rowset");
RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path,
_rowset_meta, &rowset),
"rowset init failed when build new rowset");
_already_built = true;
return Status::OK();
}
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,11 @@ Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) {
return Status::OK();
}

// Merges `other` into this rowset's meta and then re-syncs the cached schema.
// The merge may replace the tablet schema stored in the rowset meta, so
// `_schema` is refreshed afterwards to stay consistent with
// `_rowset_meta->tablet_schema()`.
void Rowset::merge_rowset_meta(const RowsetMeta& other) {
    auto& meta = *_rowset_meta;
    meta.merge_rowset_meta(other);
    // Must run after the merge: picks up whatever schema the meta now holds.
    _schema = meta.tablet_schema();
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class Rowset : public std::enable_shared_from_this<Rowset> {

const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }

void merge_rowset_meta(const RowsetMeta& other);

bool is_pending() const { return _is_pending; }

bool is_local() const { return _rowset_meta->is_local(); }
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <gen_cpp/olap_file.pb.h>

#include <memory>

#include "common/logging.h"
#include "google/protobuf/util/message_differencer.h"
#include "io/fs/file_writer.h"
Expand All @@ -31,6 +33,7 @@
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "olap/tablet_schema_cache.h"
#include "vec/common/schema_util.h"

namespace doris {

Expand Down Expand Up @@ -230,6 +233,17 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
_rowset_meta_pb.add_segments_file_size(fsize);
}
}
// In a partial update the rowset schema may be updated when the table contains variant columns, so the newest schema must be picked up here.
// Otherwise the schema is stale and leads to wrong data being read.
if (tablet_schema()->num_variant_columns() > 0) {
// merge extracted columns
TabletSchemaSPtr merged_schema;
static_cast<void>(vectorized::schema_util::get_least_common_schema(
{tablet_schema(), other.tablet_schema()}, nullptr, merged_schema));
if (*_schema != *merged_schema) {
set_tablet_schema(merged_schema);
}
}
if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
set_rowset_state(RowsetStatePB::COMMITTED);
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ struct RowsetWriterContext {
RowsetTypePB rowset_type {BETA_ROWSET};

TabletSchemaSPtr tablet_schema;
TabletSchemaSPtr original_tablet_schema;
// for variant schema update
TabletSchemaSPtr merged_tablet_schema;
// PREPARED/COMMITTED for pending rowset
// VISIBLE for non-pending rowset
RowsetStatePB rowset_state {PREPARED};
Expand Down
92 changes: 20 additions & 72 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include "vec/common/schema_util.h" // variant column
#include "vec/core/block.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"

namespace doris {
using namespace ErrorCode;
Expand All @@ -58,60 +60,38 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
if (block->rows() == 0) {
return Status::OK();
}
// Expand variant columns
vectorized::Block flush_block(*block);
TabletSchemaSPtr flush_schema;
if (_context.write_type != DataWriteType::TYPE_COMPACTION &&
_context.tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema));
RETURN_IF_ERROR(_parse_variant_columns(flush_block));
}
bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
if (config::enable_vertical_segment_writer &&
_context.tablet_schema->cluster_key_idxes().empty()) {
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size));
RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), flush_size));
} else {
std::unique_ptr<segment_v2::SegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size));
RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), flush_size));
}
return Status::OK();
}

Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
TabletSchemaSPtr& flush_schema) {
Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
}

{
std::lock_guard<std::mutex> lock(*(_context.schema_lock));
// save original tablet schema, _context->tablet_schema maybe modified
if (_context.original_tablet_schema == nullptr) {
_context.original_tablet_schema = _context.tablet_schema;
}
}

std::vector<int> variant_column_pos;
if (_context.partial_update_info && _context.partial_update_info->is_partial_update) {
// check columns that used to do partial updates should not include variant
for (int i : _context.partial_update_info->update_cids) {
const auto& col = *_context.original_tablet_schema->columns()[i];
if (!col.is_key() && col.name() != DELETE_SIGN) {
return Status::InvalidArgument(
"Not implement partial update for variant only support delete currently");
}
}
} else {
// find positions of variant columns
for (int i = 0; i < _context.original_tablet_schema->columns().size(); ++i) {
if (_context.original_tablet_schema->columns()[i]->is_variant_type()) {
variant_column_pos.push_back(i);
}
for (int i = 0; i < block.columns(); ++i) {
const auto& entry = block.get_by_position(i);
if (vectorized::is_variant_type(remove_nullable(entry.type))) {
variant_column_pos.push_back(i);
}
}

Expand All @@ -120,37 +100,8 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
}

vectorized::schema_util::ParseContext ctx;
ctx.record_raw_json_column = _context.original_tablet_schema->store_row_column();
RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns(
block, variant_column_pos, ctx));

flush_schema = std::make_shared<TabletSchema>();
flush_schema->copy_from(*_context.original_tablet_schema);
vectorized::Block flush_block(std::move(block));
vectorized::schema_util::rebuild_schema_and_block(
_context.original_tablet_schema, variant_column_pos, flush_block, flush_schema);

{
// Update rowset schema, tablet's tablet schema will be updated when build Rowset
// Eg. flush schema: A(int), B(float), C(int), D(int)
// ctx.tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
std::lock_guard<std::mutex> lock(*(_context.schema_lock));
TabletSchemaSPtr update_schema;
RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
{_context.tablet_schema, flush_schema}, nullptr, update_schema));
CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
<< "Rowset merge schema columns count is " << update_schema->num_columns()
<< ", but flush_schema is larger " << flush_schema->num_columns()
<< " update_schema: " << update_schema->dump_structure()
<< " flush_schema: " << flush_schema->dump_structure();
_context.tablet_schema.swap(update_schema);
VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure();
}

block.swap(flush_block); // NOLINT(bugprone-use-after-move)
VLOG_DEBUG << "dump block: " << block.dump_data();
VLOG_DEBUG << "dump flush schema: " << flush_schema->dump_structure();
ctx.record_raw_json_column = _context.tablet_schema->store_row_column();
RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_column_pos, ctx));
return Status::OK();
}

Expand Down Expand Up @@ -182,8 +133,7 @@ Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWrit
}

Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int32_t segment_id, bool no_compression,
TabletSchemaSPtr flush_schema) {
int32_t segment_id, bool no_compression) {
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer));

Expand All @@ -195,10 +145,9 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
writer_options.compression_type = NO_COMPRESSION;
}

const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema;
writer = std::make_unique<segment_v2::SegmentWriter>(
file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir,
_context.max_rows_per_segment, writer_options, _context.mow_context);
file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
_context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context);
RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer)));
auto s = writer->init();
if (!s.ok()) {
Expand All @@ -211,7 +160,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen

Status SegmentFlusher::_create_segment_writer(
std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
bool no_compression, TabletSchemaSPtr flush_schema) {
bool no_compression) {
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer));

Expand All @@ -223,10 +172,9 @@ Status SegmentFlusher::_create_segment_writer(
writer_options.compression_type = NO_COMPRESSION;
}

const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema;
writer = std::make_unique<segment_v2::VerticalSegmentWriter>(
file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir,
_context.max_rows_per_segment, writer_options, _context.mow_context);
file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
_context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context);
RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer)));
auto s = writer->init();
if (!s.ok()) {
Expand Down
8 changes: 3 additions & 5 deletions be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,15 @@ class SegmentFlusher {
bool need_buffering();

private:
Status _expand_variant_to_subcolumns(vectorized::Block& block, TabletSchemaSPtr& flush_schema);
Status _parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int32_t segment_id, bool no_compression = false,
TabletSchemaSPtr flush_schema = nullptr);
int32_t segment_id, bool no_compression = false);
Status _create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
int32_t segment_id, bool no_compression = false,
TabletSchemaSPtr flush_schema = nullptr);
int32_t segment_id, bool no_compression = false);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
TabletSchemaSPtr flush_schema = nullptr,
int64_t* flush_size = nullptr);
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ namespace segment_v2 {
Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
vectorized::PathInData path,
const SubcolumnColumnReaders::Node* node,
const SubcolumnColumnReaders::Node* root,
bool output_as_raw_json) {
const SubcolumnColumnReaders::Node* root) {
// Non-leaf nodes need to be merged with root
auto* stream_iter = new HierarchicalDataReader(path, output_as_raw_json);
auto* stream_iter = new HierarchicalDataReader(path);
std::vector<const SubcolumnColumnReaders::Node*> leaves;
vectorized::PathsInData leaves_paths;
SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths);
Expand Down
Loading