Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0d8ce83
[feature-dynamic-table](variant) add variant type
eldenmoon Sep 5, 2022
61fd41d
[feature-dynamic-table](column-object) Add a new ColumnObject and it'…
eldenmoon Sep 5, 2022
f2ee7cc
[feature-dynamic-table](column-object) support parse json documents t…
eldenmoon Sep 5, 2022
aed6429
[feature-dynamic-table](storage-engine) adapt storage engine to suppo…
eldenmoon Sep 5, 2022
56190cc
[feature-dynamic-table](column) support get_indices_of_non_default_ro…
eldenmoon Sep 5, 2022
903150a
[feature-dynamic-table](syntax) support dynamic table syntax
xiaokang May 23, 2022
eb01724
[feature-dynamic-table](syntax) support column which contains '.' lik…
eldenmoon Sep 5, 2022
5239393
[feature-dynamic-table](schemachange) support addColumns rpc and adap…
eldenmoon Sep 6, 2022
d3e5e29
[feature-dynamic-table](vtablet_sink) support sink block with extende…
eldenmoon Sep 6, 2022
766a4e5
[feature-dynamic-table](vec/block) add names to MutableBlock
eldenmoon Sep 6, 2022
a61ba90
[feature-dynamic-table](load) support dynamic table load
eldenmoon Sep 6, 2022
6a362cc
[feature-dynamic-table](regression-test) add regression-test
eldenmoon Sep 7, 2022
f7037c8
[Fix](dynamic-table) update tablet schema during load and add more te…
eldenmoon Oct 13, 2022
98de1ba
[fix] fix regression case (#894)
Tanya-W Oct 19, 2022
956eb17
[fix](dynamic-table) fix dynamic table in cloud mode
eldenmoon Nov 3, 2022
9bff039
[feature-opt](dynamic-table) support `show table` displays `dynamic_s…
eldenmoon Nov 3, 2022
cf042b3
[feature-dynamic-table](load) support dynamic table s3load
eldenmoon Nov 10, 2022
5cd5c28
[regression-test](dynamic-table) fix dynamic table cases
eldenmoon Nov 30, 2022
50bc05a
(improvement)[dynamic-table] support load in new_load_scan_node (#1331)
eldenmoon Jan 11, 2023
aa011d7
[compile](dynamic-table) fix compile in clang (#1333)
eldenmoon Jan 11, 2023
c24163a
[improvement](dynamic-table) refine some logic and add more cases (#1…
eldenmoon Jan 18, 2023
4bcabc4
fix compile and format
eldenmoon Feb 1, 2023
f1b35a1
rename object_util to schema_util
eldenmoon Feb 1, 2023
d11e928
format
eldenmoon Feb 1, 2023
9a4c789
remove regression case
eldenmoon Feb 1, 2023
22d17bb
fix license
eldenmoon Feb 1, 2023
156cf7d
format
eldenmoon Feb 1, 2023
f9a4bb7
fix FeNameFormatTest
eldenmoon Feb 1, 2023
5c07d34
modified by comments
eldenmoon Feb 7, 2023
37d7134
rebase
eldenmoon Feb 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,8 @@ CONF_String(inverted_index_dict_path, "${DORIS_HOME}/dict");
CONF_Int32(max_depth_in_bkd_tree, "32");
// use num_broadcast_buffer blocks as buffer to do broadcast
CONF_Int32(num_broadcast_buffer, "32");
// semi-structure configs
CONF_Bool(enable_parse_multi_dimession_array, "true");
#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__";
const std::string ROWID_COL = "__DORIS_ROWID_COL__";
const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";

constexpr int MAX_DECIMAL32_PRECISION = 9;
constexpr int MAX_DECIMAL64_PRECISION = 18;
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class Status {
ErrorCode::TEST_FILE_ERROR == _code || ErrorCode::ROWBLOCK_READ_INFO_ERROR == _code;
}

// True when this status carries the INVALID_ARGUMENT error code.
bool is_invalid_argument() const {
    return _code == ErrorCode::INVALID_ARGUMENT;
}

// True when this status carries the NOT_FOUND error code.
bool is_not_found() const {
    return ErrorCode::NOT_FOUND == _code;
}

// Convert into TStatus. Call this if 'status_container' contains an optional
Expand Down
37 changes: 37 additions & 0 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <fmt/format.h>

#include "common/consts.h"
#include "common/utils.h"
#include "exec/exec_node.h"
#include "runtime/descriptors.h"
Expand Down Expand Up @@ -51,6 +52,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
_scanner_eof(false) {}

Status BaseScanner::open() {
_full_base_schema_view.reset(new vectorized::schema_util::FullBaseSchemaView);
RETURN_IF_ERROR(init_expr_ctxes());
if (_params.__isset.strict_mode) {
_strict_mode = _params.strict_mode;
Expand Down Expand Up @@ -102,6 +104,11 @@ Status BaseScanner::init_expr_ctxes() {
return Status::InternalError("Unknown source slot descriptor, slot_id={}", slot_id);
}
_src_slot_descs.emplace_back(it->second);

if (it->second->type().is_variant_type() &&
it->second->col_name() == BeConsts::DYNAMIC_COLUMN_NAME) {
_is_dynamic_schema = true;
}
}
_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
std::vector<TupleId>({_params.src_tuple_id}),
Expand Down Expand Up @@ -156,6 +163,11 @@ Status BaseScanner::init_expr_ctxes() {
}
}
}
if (_dest_tuple_desc->table_desc()) {
_full_base_schema_view->db_name = _dest_tuple_desc->table_desc()->database();
_full_base_schema_view->table_name = _dest_tuple_desc->table_desc()->name();
_full_base_schema_view->table_id = _dest_tuple_desc->table_desc()->table_id();
}
return Status::OK();
}

Expand All @@ -181,6 +193,9 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
if (!slot_desc->is_materialized()) {
continue;
}
if (slot_desc->type().is_variant_type()) {
continue;
}
int dest_index = ctx_idx++;

auto* ctx = _dest_vexpr_ctx[dest_index];
Expand Down Expand Up @@ -250,6 +265,28 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}

// handle dynamic generated columns
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the new scanners still use BaseScanner?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the new load scan will use vfile_scanner.

if (!_full_base_schema_view->empty()) {
assert(_is_dynamic_schema);
for (size_t x = dest_block->columns(); x < _src_block.columns(); ++x) {
auto& column_type_name = _src_block.get_by_position(x);
const TColumn& tcolumn =
_full_base_schema_view->column_name_to_column[column_type_name.name];
auto original_type = vectorized::DataTypeFactory::instance().create_data_type(tcolumn);
// type conflict free path, always cast to original type
if (!column_type_name.type->equals(*original_type)) {
vectorized::ColumnPtr column_ptr;
RETURN_IF_ERROR(vectorized::schema_util::cast_column(column_type_name,
original_type, &column_ptr));
column_type_name.column = column_ptr;
column_type_name.type = original_type;
}
dest_block->insert(vectorized::ColumnWithTypeAndName(std::move(column_type_name.column),
std::move(column_type_name.type),
column_type_name.name));
}
}

// after the dest block insert operation, clear _src_block to drop its references to the original columns
if (_src_block_mem_reuse) {
_src_block.clear_column_data(origin_column_num);
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "common/status.h"
#include "util/runtime_profile.h"
#include "vec/common/schema_util.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

Expand Down Expand Up @@ -65,6 +66,8 @@ class BaseScanner {
// Close this scanner
virtual void close() = 0;

// Reports the scanner's current _is_dynamic_schema flag.
bool is_dynamic_schema() const {
    return _is_dynamic_schema;
}

protected:
Status _fill_dest_block(vectorized::Block* dest_block, bool* eof);
virtual Status _init_src_block();
Expand Down Expand Up @@ -124,6 +127,10 @@ class BaseScanner {
// slot_ids for parquet predicate push down are in tuple desc
TupleId _tupleId = -1;

bool _is_dynamic_schema = false;
// for tracing dynamic schema
std::unique_ptr<vectorized::schema_util::FullBaseSchemaView> _full_base_schema_view;

private:
Status _filter_src_block();
void _fill_columns_from_path();
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_db_id = tschema.db_id;
_table_id = tschema.table_id;
_version = tschema.version;
_is_dynamic_schema = tschema.is_dynamic_schema;
std::map<std::string, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
for (auto& t_slot_desc : tschema.slot_descs) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class OlapTableSchemaParam {
return _proto_schema;
}

// Reports the stored _is_dynamic_schema flag of this schema param.
bool is_dynamic_schema() const {
    return _is_dynamic_schema;
}

std::string debug_string() const;

private:
Expand All @@ -81,6 +83,7 @@ class OlapTableSchemaParam {
mutable POlapTableSchemaParam* _proto_schema = nullptr;
std::vector<OlapTableIndexSchema*> _indexes;
mutable ObjectPool _obj_pool;
bool _is_dynamic_schema = false;
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand All @@ -90,6 +93,7 @@ using OlapTableIndexTablets = TOlapTableIndexTablets;
// }

using BlockRow = std::pair<vectorized::Block*, int32_t>;
using VecBlock = vectorized::Block;

struct VOlapTablePartition {
int64_t id = 0;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Status DeltaWriter::init() {
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = _tablet->table_id();
context.is_direct_write = true;
context.tablet = _tablet;
RETURN_NOT_OK(_tablet->create_rowset_writer(context, &_rowset_writer));
_schema.reset(new Schema(_tablet_schema));
_reset_mem_table();
Expand Down Expand Up @@ -457,6 +458,8 @@ void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) {
_tablet->update_max_version_schema(_tablet_schema);
}

_tablet_schema->set_table_id(table_schema_param->table_id());
}

void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
Expand Down
15 changes: 14 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,13 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,

void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) {
SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
auto target_block = input_block->copy_block(_column_offset);
vectorized::Block target_block = *input_block;
if (!_tablet_schema->is_dynamic_schema()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment for why not copy_block for dynamic schema

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// This insert may belong to a rollup tablet, whose columns are a subset of the base table's.
// A dynamic table, however, needs the full set of columns, so input_block is used as-is:
// _column_offset is ignored and copy_block is skipped.
target_block = input_block->copy_block(_column_offset);
}
if (_is_first_insertion) {
_is_first_insertion = false;
auto cloneBlock = target_block.clone_without_columns();
Expand All @@ -159,6 +165,13 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
if (_keys_type != KeysType::DUP_KEYS) {
_init_agg_functions(&target_block);
}
if (_tablet_schema->is_dynamic_schema()) {
// Set _input_mutable_block to dynamic since
// input blocks may be structure-variable (dynamic)
// this will align _input_mutable_block with
// input_block and auto extends columns
_input_mutable_block.set_block_type(vectorized::BlockType::DYNAMIC);
}
}

auto num_rows = row_idxs.size();
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ enum FieldType {
OLAP_FIELD_TYPE_DECIMAL64 = 32,
OLAP_FIELD_TYPE_DECIMAL128I = 33,
OLAP_FIELD_TYPE_JSONB = 34,
OLAP_FIELD_TYPE_VARIANT = 35
};

// Define all aggregation methods supported by Field
Expand Down
39 changes: 30 additions & 9 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
#include "vec/jsonb/serialize.h"

namespace doris {
using namespace ErrorCode;
Expand Down Expand Up @@ -98,6 +100,8 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
}
_rowset_meta->set_tablet_uid(_context.tablet_uid);
_rowset_meta->set_tablet_schema(_context.tablet_schema);
_context.schema_change_recorder =
std::make_shared<vectorized::schema_util::LocalSchemaChangeRecorder>();

return Status::OK();
}
Expand All @@ -107,7 +111,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
return Status::OK();
}
if (UNLIKELY(_segment_writer == nullptr)) {
RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
RETURN_NOT_OK(_create_segment_writer(&_segment_writer, block));
}
return _add_block(block, &_segment_writer);
}
Expand Down Expand Up @@ -489,7 +493,7 @@ bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status status = Status::OK();
if (!config::enable_segcompaction || !config::enable_storage_vectorization ||
!_check_and_set_is_doing_segcompaction()) {
_context.tablet_schema->is_dynamic_schema() || !_check_and_set_is_doing_segcompaction()) {
return status;
}
if (_segcompaction_status.load() != OK) {
Expand Down Expand Up @@ -557,7 +561,7 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
if (UNLIKELY(max_row_add < 1)) {
// no space for another single row, need flush now
RETURN_NOT_OK(_flush_segment_writer(segment_writer));
RETURN_NOT_OK(_create_segment_writer(segment_writer));
RETURN_NOT_OK(_create_segment_writer(segment_writer, block));
max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
DCHECK(max_row_add > 0);
}
Expand Down Expand Up @@ -621,7 +625,7 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, i
}
RETURN_NOT_OK(_segcompaction_if_necessary());
std::unique_ptr<segment_v2::SegmentWriter> writer;
RETURN_NOT_OK(_create_segment_writer(&writer));
RETURN_NOT_OK(_create_segment_writer(&writer, block));
RETURN_NOT_OK(_add_block(block, &writer));
RETURN_NOT_OK(_flush_segment_writer(&writer, flush_size));
return Status::OK();
Expand Down Expand Up @@ -700,6 +704,23 @@ RowsetSharedPtr BetaRowsetWriter::build() {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
}

// schema changed during this load
if (_context.schema_change_recorder->has_extended_columns()) {
DCHECK(_context.tablet_schema->is_dynamic_schema())
<< "Load can change local schema only in dynamic table";
TabletSchemaSPtr new_schema = std::make_shared<TabletSchema>();
new_schema->copy_from(*_context.tablet_schema);
for (auto const& [_, col] : _context.schema_change_recorder->copy_extended_columns()) {
new_schema->append_column(col);
}
new_schema->set_schema_version(_context.schema_change_recorder->schema_version());
if (_context.schema_change_recorder->schema_version() >
_context.tablet_schema->schema_version()) {
_context.tablet->update_max_version_schema(new_schema);
}
_rowset_meta->set_tablet_schema(new_schema);
}

RowsetSharedPtr rowset;
status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
&rowset);
Expand Down Expand Up @@ -801,7 +822,7 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() {

Status BetaRowsetWriter::_do_create_segment_writer(
std::unique_ptr<segment_v2::SegmentWriter>* writer, bool is_segcompaction, int64_t begin,
int64_t end) {
int64_t end, const vectorized::Block* block) {
std::string path;
int32_t segment_id = 0;
if (is_segcompaction) {
Expand Down Expand Up @@ -847,7 +868,7 @@ Status BetaRowsetWriter::_do_create_segment_writer(
}
}

auto s = (*writer)->init();
auto s = (*writer)->init(block);
if (!s.ok()) {
LOG(WARNING) << "failed to init segment writer: " << s.to_string();
writer->reset(nullptr);
Expand All @@ -856,8 +877,8 @@ Status BetaRowsetWriter::_do_create_segment_writer(
return Status::OK();
}

Status BetaRowsetWriter::_create_segment_writer(
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
const vectorized::Block* block) {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
LOG(WARNING) << "too many segments in rowset."
Expand All @@ -868,7 +889,7 @@ Status BetaRowsetWriter::_create_segment_writer(
<< " _num_segcompacted:" << _num_segcompacted;
return Status::Error<TOO_MANY_SEGMENTS>();
} else {
return _do_create_segment_writer(writer, false, -1, -1);
return _do_create_segment_writer(writer, false, -1, -1, block);
}
}

Expand Down
14 changes: 12 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class FileWriter;

using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
namespace vectorized::schema_util {
class LocalSchemaChangeRecorder;
}

class BetaRowsetWriter : public RowsetWriter {
public:
Expand Down Expand Up @@ -80,15 +83,22 @@ class BetaRowsetWriter : public RowsetWriter {

int32_t get_atomic_num_segment() const override { return _num_segment.load(); }

// Returns a non-owning pointer to the schema change recorder held in the
// writer context; the recorder may be modified by a local schema change.
vectorized::schema_util::LocalSchemaChangeRecorder* mutable_schema_change_recorder() {
    auto* recorder = _context.schema_change_recorder.get();
    return recorder;
}

private:
Status _add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* writer);
Status _add_block_for_segcompaction(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* writer);

Status _do_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
bool is_segcompaction, int64_t begin, int64_t end);
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);
bool is_segcompaction, int64_t begin, int64_t end,
const vectorized::Block* block = nullptr);
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
const vectorized::Block* block = nullptr);
Status _create_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t begin, uint64_t end);

Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ class RowsetMeta {
int64_t newest_write_timestamp() const { return _rowset_meta_pb.newest_write_timestamp(); }

void set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
DCHECK(_schema == nullptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete DCHECK?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_tablet_schema could override original _schema

_schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
}

Expand Down
Loading