2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.cpp
@@ -79,7 +79,7 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_
{_max_version_schema, update_schema}, _max_version_schema, final_schema,
check_column_size));
_max_version_schema = final_schema;
VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_structure();
VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_full_schema();
return Status::OK();
}

16 changes: 9 additions & 7 deletions be/src/olap/rowset/beta_rowset_writer.cpp
@@ -600,13 +600,12 @@ Status BaseBetaRowsetWriter::build(RowsetSharedPtr& rowset) {
}

// update rowset meta tablet schema if tablet schema updated
if (_context.tablet_schema->num_variant_columns() > 0) {
_rowset_meta->set_tablet_schema(_context.tablet_schema);
}
auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
: _context.tablet_schema;
_rowset_meta->set_tablet_schema(rowset_schema);

RETURN_NOT_OK_STATUS_WITH_WARN(
RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
&rowset),
RowsetFactory::create_rowset(rowset_schema, _context.rowset_dir, _rowset_meta, &rowset),
"rowset init failed when build new rowset");
_already_built = true;
return Status::OK();
@@ -627,14 +626,17 @@ int64_t BetaRowsetWriter::_num_seg() const {
void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
std::lock_guard<std::mutex> lock(*(_context.schema_lock));
TabletSchemaSPtr update_schema;
if (_context.merged_tablet_schema == nullptr) {
_context.merged_tablet_schema = _context.tablet_schema;
}
static_cast<void>(vectorized::schema_util::get_least_common_schema(
{_context.tablet_schema, flush_schema}, nullptr, update_schema));
{_context.merged_tablet_schema, flush_schema}, nullptr, update_schema));
CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
<< "Rowset merge schema columns count is " << update_schema->num_columns()
<< ", but flush_schema is larger " << flush_schema->num_columns()
<< " update_schema: " << update_schema->dump_structure()
<< " flush_schema: " << flush_schema->dump_structure();
_context.tablet_schema.swap(update_schema);
_context.merged_tablet_schema.swap(update_schema);
VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure();
}
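
Note: the accumulation pattern above — lazily seeding `merged_tablet_schema` from `tablet_schema` and then folding each flush schema into it under `schema_lock` — can be illustrated with a minimal standalone sketch. The `Schema` map and `least_common_schema` helper below are hypothetical stand-ins for Doris's `TabletSchema` and `vectorized::schema_util::get_least_common_schema`; the toy merge takes the union of columns and the widest type per column.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Toy schema: column name -> type "width"; a wider type subsumes a narrower one.
using Schema = std::map<std::string, int>;
using SchemaPtr = std::shared_ptr<Schema>;

// Least common schema of two schemas: union of columns, widest type per column.
SchemaPtr least_common_schema(const Schema& a, const Schema& b) {
    auto out = std::make_shared<Schema>(a);
    for (const auto& [name, width] : b) {
        auto [it, inserted] = out->try_emplace(name, width);
        if (!inserted) {
            it->second = std::max(it->second, width);
        }
    }
    return out;
}

struct Context {
    std::mutex schema_lock;
    SchemaPtr tablet_schema;         // schema the writer started with
    SchemaPtr merged_tablet_schema;  // grows as variant subcolumns are extracted
};

void update_rowset_schema(Context& ctx, const SchemaPtr& flush_schema) {
    std::lock_guard<std::mutex> lock(ctx.schema_lock);
    // Lazily seed the merged schema from the base schema, as in the patch above.
    if (ctx.merged_tablet_schema == nullptr) {
        ctx.merged_tablet_schema = ctx.tablet_schema;
    }
    auto update = least_common_schema(*ctx.merged_tablet_schema, *flush_schema);
    // Mirrors the CHECK_GE: the merge can only add columns, never drop them.
    assert(update->size() >= flush_schema->size());
    ctx.merged_tablet_schema = std::move(update);
}

int main() {
    Context ctx;
    ctx.tablet_schema = std::make_shared<Schema>(Schema{{"a", 8}, {"b", 8}});
    // Two flushes extract different variant subcolumns with narrower types.
    update_rowset_schema(ctx, std::make_shared<Schema>(Schema{{"a", 4}, {"c", 4}}));
    update_rowset_schema(ctx, std::make_shared<Schema>(Schema{{"b", 4}, {"d", 4}}));
    // merged_tablet_schema is now {a:8, b:8, c:4, d:4}.
    assert(ctx.merged_tablet_schema->size() == 4);
    return 0;
}
```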

16 changes: 16 additions & 0 deletions be/src/olap/rowset/rowset.cpp
@@ -23,6 +23,7 @@
#include "olap/segment_loader.h"
#include "olap/tablet_schema.h"
#include "util/time.h"
#include "vec/common/schema_util.h"

namespace doris {

@@ -98,6 +99,21 @@ void Rowset::merge_rowset_meta(const RowsetMetaSharedPtr& other) {
for (auto key_bound : key_bounds) {
_rowset_meta->add_segment_key_bounds(key_bound);
}

// In partial update, the rowset schema may be updated when the table contains a variant type,
// so we need the newest schema here; otherwise the schema is stale and leads to wrong data reads.
if (tablet_schema()->num_variant_columns() > 0) {
// merge extracted columns
TabletSchemaSPtr merged_schema;
static_cast<void>(vectorized::schema_util::get_least_common_schema(
{tablet_schema(), other->tablet_schema()}, nullptr, merged_schema));
if (*_schema != *merged_schema) {
_rowset_meta->set_tablet_schema(merged_schema);
}
// _rowset_meta->tablet_schema() may have been updated, so make sure _schema is
// consistent with the rowset meta
_schema = _rowset_meta->tablet_schema();
}
}
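
The merge step above can be sketched the same way. Continuing the toy `Schema`, `SchemaPtr`, and `least_common_schema` from the previous sketch (again hypothetical stand-ins, not Doris's actual `RowsetMeta` API): merge the two metas' schemas, rewrite the meta only when the result differs, then resync the cached `_schema` pointer so it cannot go stale.

```cpp
// Reuses Schema, SchemaPtr, and least_common_schema from the sketch above.
#include <memory>

struct ToyRowsetMeta {
    SchemaPtr schema;
    const SchemaPtr& tablet_schema() const { return schema; }
    void set_tablet_schema(SchemaPtr s) { schema = std::move(s); }
};

struct ToyRowset {
    std::shared_ptr<ToyRowsetMeta> _rowset_meta;
    SchemaPtr _schema;  // cached schema; must track _rowset_meta's schema

    void merge_rowset_meta(const std::shared_ptr<ToyRowsetMeta>& other) {
        auto merged = least_common_schema(*_rowset_meta->tablet_schema(),
                                          *other->tablet_schema());
        // Only rewrite the meta when the merge actually changed something.
        if (*_schema != *merged) {
            _rowset_meta->set_tablet_schema(merged);
        }
        // Resync the cache so it stays consistent with the rowset meta.
        _schema = _rowset_meta->tablet_schema();
    }
};
```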

void Rowset::clear_cache() {
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset.h
@@ -132,6 +132,8 @@ class Rowset : public std::enable_shared_from_this<Rowset> {

const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }

void merge_rowset_meta(const RowsetMetaSharedPtr& other);

bool is_pending() const { return _is_pending; }

bool is_local() const { return _rowset_meta->is_local(); }
5 changes: 5 additions & 0 deletions be/src/olap/rowset/rowset_meta.cpp
@@ -17,6 +17,10 @@

#include "olap/rowset/rowset_meta.h"

#include <gen_cpp/olap_file.pb.h>

#include <memory>

#include "common/logging.h"
#include "google/protobuf/util/message_differencer.h"
#include "io/fs/local_file_system.h"
@@ -28,6 +32,7 @@
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "olap/tablet_schema_cache.h"
#include "vec/common/schema_util.h"

namespace doris {

3 changes: 2 additions & 1 deletion be/src/olap/rowset/rowset_writer_context.h
@@ -63,7 +63,8 @@ struct RowsetWriterContext {
io::FileSystemSPtr fs;
std::string rowset_dir;
TabletSchemaSPtr tablet_schema;
TabletSchemaSPtr original_tablet_schema;
// for variant schema update
TabletSchemaSPtr merged_tablet_schema;
// PREPARED/COMMITTED for pending rowset
// VISIBLE for non-pending rowset
RowsetStatePB rowset_state;
96 changes: 23 additions & 73 deletions be/src/olap/rowset/segment_creator.cpp
@@ -43,6 +43,8 @@
#include "vec/common/schema_util.h" // variant column
#include "vec/core/block.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"

namespace doris {
using namespace ErrorCode;
@@ -61,60 +63,38 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
if (block->rows() == 0) {
return Status::OK();
}
// Expand variant columns
vectorized::Block flush_block(*block);
TabletSchemaSPtr flush_schema;
if (_context->write_type != DataWriteType::TYPE_COMPACTION &&
_context->tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema));
RETURN_IF_ERROR(_parse_variant_columns(flush_block));
}
bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
if (config::enable_vertical_segment_writer &&
_context->tablet_schema->cluster_key_idxes().empty()) {
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size));
RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), flush_size));
} else {
std::unique_ptr<segment_v2::SegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, flush_size));
RETURN_IF_ERROR(_flush_segment_writer(writer, nullptr, flush_size));
}
return Status::OK();
}

Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
TabletSchemaSPtr& flush_schema) {
Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
}

{
std::lock_guard<std::mutex> lock(*(_context->schema_lock));
// save original tablet schema, _context->tablet_schema maybe modified
if (_context->original_tablet_schema == nullptr) {
_context->original_tablet_schema = _context->tablet_schema;
}
}

std::vector<int> variant_column_pos;
if (_context->partial_update_info && _context->partial_update_info->is_partial_update) {
// check that the columns used for partial update do not include variant columns
for (int i : _context->partial_update_info->update_cids) {
const auto& col = *_context->original_tablet_schema->columns()[i];
if (!col.is_key() && col.name() != DELETE_SIGN) {
return Status::InvalidArgument(
"Not implement partial update for variant only support delete currently");
}
}
} else {
// find positions of variant columns
for (int i = 0; i < _context->original_tablet_schema->columns().size(); ++i) {
if (_context->original_tablet_schema->columns()[i]->is_variant_type()) {
variant_column_pos.push_back(i);
}
for (int i = 0; i < block.columns(); ++i) {
const auto& entry = block.get_by_position(i);
if (vectorized::is_variant_type(remove_nullable(entry.type))) {
variant_column_pos.push_back(i);
}
}

@@ -123,37 +103,8 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
}

vectorized::schema_util::ParseContext ctx;
ctx.record_raw_json_column = _context->original_tablet_schema->store_row_column();
RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns(
block, variant_column_pos, ctx));

flush_schema = std::make_shared<TabletSchema>();
flush_schema->copy_from(*_context->original_tablet_schema);
vectorized::Block flush_block(std::move(block));
vectorized::schema_util::rebuild_schema_and_block(
_context->original_tablet_schema, variant_column_pos, flush_block, flush_schema);

{
// Update rowset schema, tablet's tablet schema will be updated when build Rowset
// Eg. flush schema: A(int), B(float), C(int), D(int)
// ctx.tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
std::lock_guard<std::mutex> lock(*(_context->schema_lock));
TabletSchemaSPtr update_schema;
RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
{_context->tablet_schema, flush_schema}, nullptr, update_schema));
CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
<< "Rowset merge schema columns count is " << update_schema->num_columns()
<< ", but flush_schema is larger " << flush_schema->num_columns()
<< " update_schema: " << update_schema->dump_structure()
<< " flush_schema: " << flush_schema->dump_structure();
_context->tablet_schema.swap(update_schema);
VLOG_DEBUG << "dump rs schema: " << _context->tablet_schema->dump_structure();
}

block.swap(flush_block); // NOLINT(bugprone-use-after-move)
VLOG_DEBUG << "dump block: " << block.dump_data();
VLOG_DEBUG << "dump flush schema: " << flush_schema->dump_structure();
ctx.record_raw_json_column = _context->tablet_schema->store_row_column();
RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_column_pos, ctx));
return Status::OK();
}
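
For reference, the new `_parse_variant_columns` scan — collect the positions of variant-typed columns in the block, then run the parse step only over those positions — looks roughly like this minimal standalone sketch (`ToyBlock` and `ColumnType` are toy stand-ins, not the real `vectorized::Block` or Doris type system):

```cpp
#include <vector>

enum class ColumnType { INT, STRING, VARIANT };

struct ToyBlock {
    std::vector<ColumnType> columns;  // one entry per column position
};

// Collect positions of variant-typed columns; mirrors the
// is_variant_type(remove_nullable(entry.type)) test in the patch.
std::vector<int> variant_positions(const ToyBlock& block) {
    std::vector<int> pos;
    for (int i = 0; i < static_cast<int>(block.columns.size()); ++i) {
        if (block.columns[i] == ColumnType::VARIANT) {
            pos.push_back(i);
        }
    }
    return pos;
}

int main() {
    ToyBlock b{{ColumnType::INT, ColumnType::VARIANT, ColumnType::STRING,
                ColumnType::VARIANT}};
    auto pos = variant_positions(b);
    // pos == {1, 3}: only these columns go through variant parsing/encoding.
    (void)pos;
    return 0;
}
```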

@@ -194,8 +145,7 @@ Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWrit
}

Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int32_t segment_id, bool no_compression,
TabletSchemaSPtr flush_schema) {
int32_t segment_id, bool no_compression) {
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, file_writer));

@@ -207,10 +157,10 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
writer_options.compression_type = NO_COMPRESSION;
}

const auto& tablet_schema = flush_schema ? flush_schema : _context->tablet_schema;
writer.reset(new segment_v2::SegmentWriter(
file_writer.get(), segment_id, tablet_schema, _context->tablet, _context->data_dir,
_context->max_rows_per_segment, writer_options, _context->mow_context));
writer.reset(new segment_v2::SegmentWriter(file_writer.get(), segment_id,
_context->tablet_schema, _context->tablet,
_context->data_dir, _context->max_rows_per_segment,
writer_options, _context->mow_context));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.emplace(segment_id, std::move(file_writer));
Expand All @@ -226,7 +176,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen

Status SegmentFlusher::_create_segment_writer(
std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
bool no_compression, TabletSchemaSPtr flush_schema) {
bool no_compression) {
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, file_writer));

@@ -238,10 +188,10 @@ Status SegmentFlusher::_create_segment_writer(
writer_options.compression_type = NO_COMPRESSION;
}

const auto& tablet_schema = flush_schema ? flush_schema : _context->tablet_schema;
writer.reset(new segment_v2::VerticalSegmentWriter(
file_writer.get(), segment_id, tablet_schema, _context->tablet, _context->data_dir,
_context->max_rows_per_segment, writer_options, _context->mow_context));
file_writer.get(), segment_id, _context->tablet_schema, _context->tablet,
_context->data_dir, _context->max_rows_per_segment, writer_options,
_context->mow_context));
{
std::lock_guard<SpinLock> l(_lock);
_file_writers.emplace(segment_id, std::move(file_writer));
8 changes: 3 additions & 5 deletions be/src/olap/rowset/segment_creator.h
@@ -138,17 +138,15 @@ class SegmentFlusher {
bool need_buffering();

private:
Status _expand_variant_to_subcolumns(vectorized::Block& block, TabletSchemaSPtr& flush_schema);
Status _parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
int32_t segment_id, bool no_compression = false,
TabletSchemaSPtr flush_schema = nullptr);
int32_t segment_id, bool no_compression = false);
Status _create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
int32_t segment_id, bool no_compression = false,
TabletSchemaSPtr flush_schema = nullptr);
int32_t segment_id, bool no_compression = false);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
TabletSchemaSPtr flush_schema = nullptr,
int64_t* flush_size = nullptr);
5 changes: 2 additions & 3 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -34,10 +34,9 @@ namespace segment_v2 {
Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
vectorized::PathInData path,
const SubcolumnColumnReaders::Node* node,
const SubcolumnColumnReaders::Node* root,
bool output_as_raw_json) {
const SubcolumnColumnReaders::Node* root) {
// Non-leaf nodes need to be merged with the root
auto* stream_iter = new HierarchicalDataReader(path, output_as_raw_json);
auto* stream_iter = new HierarchicalDataReader(path);
std::vector<const SubcolumnColumnReaders::Node*> leaves;
vectorized::PathsInData leaves_paths;
SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths);
29 changes: 5 additions & 24 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -64,12 +64,11 @@ using SubcolumnColumnReaders = vectorized::SubcolumnsTree<SubcolumnReader>;
// Reader for hierarchical data for variant, merge with root(sparse encoded columns)
class HierarchicalDataReader : public ColumnIterator {
public:
HierarchicalDataReader(const vectorized::PathInData& path, bool output_as_raw_json = false)
: _path(path), _output_as_raw_json(output_as_raw_json) {}
HierarchicalDataReader(const vectorized::PathInData& path) : _path(path) {}

static Status create(std::unique_ptr<ColumnIterator>* reader, vectorized::PathInData path,
const SubcolumnColumnReaders::Node* target_node,
const SubcolumnColumnReaders::Node* root, bool output_as_raw_json = false);
const SubcolumnColumnReaders::Node* root);

Status init(const ColumnIteratorOptions& opts) override;

Expand All @@ -93,7 +92,6 @@ class HierarchicalDataReader : public ColumnIterator {
std::unique_ptr<StreamReader> _root_reader;
size_t _rows_read = 0;
vectorized::PathInData _path;
bool _output_as_raw_json = false;

template <typename NodeFunction>
Status tranverse(NodeFunction&& node_func) {
@@ -154,26 +152,9 @@
return Status::OK();
}));

if (_output_as_raw_json) {
auto col_to = vectorized::ColumnString::create();
col_to->reserve(nrows * 2);
vectorized::VectorBufferWriter write_buffer(*col_to.get());
auto type = std::make_shared<vectorized::DataTypeObject>();
for (size_t i = 0; i < nrows; ++i) {
type->to_string(container_variant, i, write_buffer);
write_buffer.commit();
}
if (variant.empty()) {
variant.create_root(std::make_shared<vectorized::DataTypeString>(),
std::move(col_to));
} else {
variant.get_root()->insert_range_from(*col_to, 0, col_to->size());
}
} else {
// TODO select v:b -> v.b / v.b.c but v.d maybe in v
// copy container variant to dst variant, todo avoid copy
variant.insert_range_from(container_variant, 0, nrows);
}
// TODO select v:b -> v.b / v.b.c but v.d maybe in v
// copy container variant to dst variant, todo avoid copy
variant.insert_range_from(container_variant, 0, nrows);

// variant.set_num_rows(nrows);
_rows_read += nrows;