Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
// this structure and fill with Subcolumns sub items
mutable std::shared_ptr<rapidjson::Document> doc_structure;

// column with raw json strings
// used for quickly row store encoding
ColumnPtr rowstore_column;

using SubColumnWithName = std::pair<PathInData, const Subcolumn*>;
// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
mutable std::vector<SubColumnWithName> _prev_positions;
Expand All @@ -259,10 +255,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
return subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable();
}

void set_rowstore_column(ColumnPtr col) { rowstore_column = col; }

ColumnPtr get_rowstore_column() const { return rowstore_column; }

Status serialize_one_row_to_string(int row, std::string* output) const;

Status serialize_one_row_to_string(int row, BufferWritable& output) const;
Expand Down
135 changes: 5 additions & 130 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,36 +492,8 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
var.assume_mutable_ref().finalize();

MutableColumnPtr variant_column;
bool record_raw_string_with_serialization = false;
// set
auto encode_rowstore = [&]() {
if (!ctx.record_raw_json_column) {
return Status::OK();
}
auto* var = static_cast<vectorized::ColumnObject*>(variant_column.get());
if (record_raw_string_with_serialization) {
// encode to raw json column
auto raw_column = vectorized::ColumnString::create();
for (size_t i = 0; i < var->rows(); ++i) {
std::string raw_str;
RETURN_IF_ERROR(var->serialize_one_row_to_string(i, &raw_str));
raw_column->insert_data(raw_str.c_str(), raw_str.size());
}
var->set_rowstore_column(raw_column->get_ptr());
} else {
// use original input json column
auto original_var_root = vectorized::check_and_get_column<vectorized::ColumnObject>(
remove_nullable(column_ref).get())
->get_root();
var->set_rowstore_column(original_var_root);
}
return Status::OK();
};

if (!var.is_scalar_variant()) {
variant_column = var.assume_mutable();
record_raw_string_with_serialization = true;
RETURN_IF_ERROR(encode_rowstore());
// already parsed
continue;
}
Expand Down Expand Up @@ -558,8 +530,6 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
result = ColumnNullable::create(result, null_map);
}
block.get_by_position(variant_pos[i]).column = result;
RETURN_IF_ERROR(encode_rowstore());
// block.get_by_position(variant_pos[i]).type = std::make_shared<DataTypeObject>("json", true);
}
return Status::OK();
}
Expand Down Expand Up @@ -600,35 +570,6 @@ Status encode_variant_sparse_subcolumns(ColumnObject& column) {
return Status::OK();
}

static void _append_column(const TabletColumn& parent_variant,
const ColumnObject::Subcolumns::NodePtr& subcolumn,
TabletSchemaSPtr& to_append, bool is_sparse) {
// If column already exist in original tablet schema, then we pick common type
// and cast column to common type, and modify tablet column to common type,
// otherwise it's a new column
CHECK(to_append.use_count() == 1);
const std::string& column_name =
parent_variant.name_lower_case() + "." + subcolumn->path.get_path();
const vectorized::DataTypePtr& final_data_type_from_object =
subcolumn->data.get_least_common_type();
vectorized::PathInDataBuilder full_path_builder;
auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false)
.append(subcolumn->path.get_parts(), false)
.build();
TabletColumn tablet_column = vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
vectorized::schema_util::ExtraInfo {.unique_id = -1,
.parent_unique_id = parent_variant.unique_id(),
.path_info = full_path});

if (!is_sparse) {
to_append->append_column(std::move(tablet_column));
} else {
to_append->mutable_column_by_uid(parent_variant.unique_id())
.append_sparse_column(std::move(tablet_column));
}
}

// sort by paths in lexicographical order
vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
const vectorized::ColumnObject::Subcolumns& subcolumns) {
Expand All @@ -640,70 +581,12 @@ vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
return sorted;
}

void rebuild_schema_and_block(const TabletSchemaSPtr& original,
const std::vector<int>& variant_positions, Block& flush_block,
TabletSchemaSPtr& flush_schema) {
// rebuild schema and block with variant extracted columns

// 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema
// those columns are extracted columns, leave none extracted columns remain in original variant column, which is
// JSONB format at present.
// 2. Collect columns that need to be added or modified when data type changes or new columns encountered
for (size_t variant_pos : variant_positions) {
auto column_ref = flush_block.get_by_position(variant_pos).column;
bool is_nullable = column_ref->is_nullable();
const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
remove_nullable(column_ref)->assume_mutable_ref());
const TabletColumn& parent_column = *original->columns()[variant_pos];
CHECK(object_column.is_finalized());
std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
// common extracted columns
for (const auto& entry : get_sorted_subcolumns(object_column.get_subcolumns())) {
if (entry->path.empty()) {
// root
root = entry;
continue;
}
_append_column(parent_column, entry, flush_schema, false);
const std::string& column_name =
parent_column.name_lower_case() + "." + entry->path.get_path();
flush_block.insert({entry->data.get_finalized_column_ptr()->get_ptr(),
entry->data.get_least_common_type(), column_name});
}

// add sparse columns to flush_schema
for (const auto& entry : get_sorted_subcolumns(object_column.get_sparse_subcolumns())) {
_append_column(parent_column, entry, flush_schema, true);
}

// Create new variant column and set root column
auto obj = vectorized::ColumnObject::create(true, false);
// '{}' indicates a root path
static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
{}, root->data.get_finalized_column_ptr()->assume_mutable(),
root->data.get_least_common_type());
// // set for rowstore
if (original->has_row_store_for_all_columns()) {
static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column(
object_column.get_rowstore_column());
}
vectorized::ColumnPtr result = obj->get_ptr();
if (is_nullable) {
const auto& null_map = assert_cast<const vectorized::ColumnNullable&>(*column_ref)
.get_null_map_column_ptr();
result = vectorized::ColumnNullable::create(result, null_map);
}
flush_block.get_by_position(variant_pos).column = result;
vectorized::PathInDataBuilder full_root_path_builder;
auto full_root_path =
full_root_path_builder.append(parent_column.name_lower_case(), false).build();
TabletColumn new_col = flush_schema->column(variant_pos);
new_col.set_path_info(full_root_path);
flush_schema->replace_column(variant_pos, new_col);
VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
}
// ---------------------------

vectorized::schema_util::inherit_column_attributes(flush_schema);
std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
Block tmp;
tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
return tmp.dump_data(0, tmp.rows());
}

// ---------------------------
Expand Down Expand Up @@ -734,13 +617,5 @@ Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst)
->assume_mutable();
return Status::OK();
}
// ---------------------------

std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
Block tmp;
tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
return tmp.dump_data(0, tmp.rows());
}
// ---------------------------

} // namespace doris::vectorized::schema_util
8 changes: 0 additions & 8 deletions be/src/vec/common/schema_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,6 @@ void inherit_column_attributes(const TabletColumn& source, TabletColumn& target,
vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
const vectorized::ColumnObject::Subcolumns& subcolumns);

// Rebuild schema from original schema by extend dynamic columns generated from ColumnObject.
// Block consists of two parts, dynamic part of columns and static part of columns.
// static extracted
// | --------- | ----------- |
// The static ones are original tablet_schame columns
void rebuild_schema_and_block(const TabletSchemaSPtr& original, const std::vector<int>& variant_pos,
Block& flush_block, TabletSchemaSPtr& flush_schema);

// Extract json data from source with path
Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst);

Expand Down
10 changes: 6 additions & 4 deletions be/src/vec/data_types/serde/data_type_object_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,14 @@ void DataTypeObjectSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr
const_cast<ColumnObject&>(variant).finalize();
}
result.writeKey(col_id);
std::string value_str;
if (!variant.serialize_one_row_to_string(row_num, &value_str)) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}",
variant.dump_structure());
}
JsonbParser json_parser;
CHECK(variant.get_rowstore_column() != nullptr);
// use original document
const auto& data_ref = variant.get_rowstore_column()->get_data_at(row_num);
// encode as jsonb
bool succ = json_parser.parse(data_ref.data, data_ref.size);
bool succ = json_parser.parse(value_str.data(), value_str.size());
// maybe more graceful, it is ok to check here since data could be parsed
CHECK(succ);
result.writeStartBinary();
Expand Down
4 changes: 2 additions & 2 deletions regression-test/data/variant_p0/delete_update.out
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

-- !sql --
2 {"updated_value":123} {"updated_value":123}
6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123}
6 {"a":4,"b":[4],"c":4.1} {"updated_value" : 123}
7 {"updated_value":1111} yyy

-- !sql --
2 {"updated_value":123} {"updated_value":123}
6 {"a":4,"b":[4],"c":4.0} {"updated_value" : 123}
6 {"a":4,"b":[4],"c":4.1} {"updated_value" : 123}

-- !sql --
1 "ddddddddddd" 1111 199 10 {"new_data1":1}
Expand Down
8 changes: 4 additions & 4 deletions regression-test/suites/variant_p0/delete_update.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
)
UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");
properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true");
"""
sql "insert into var_delete_update_mow select k, cast(v as string), cast(v as string) from var_delete_update"
sql "delete from ${table_name} where k = 1"
sql "delete from ${table_name} where k in (select k from var_delete_update_mow where k in (3, 4, 5))"

sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.0}', 'xxx')"""
sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.0}', 'yyy')"""
sql """insert into var_delete_update_mow values (6, '{"a":4,"b":[4],"c":4.1}', 'xxx')"""
sql """insert into var_delete_update_mow values (7, '{"a":4,"b":[4],"c":4.1}', 'yyy')"""
sql """update var_delete_update_mow set vs = '{"updated_value" : 123}' where k = 6"""
sql """update var_delete_update_mow set v = '{"updated_value":1111}' where k = 7"""
qt_sql "select * from var_delete_update_mow order by k"
Expand Down Expand Up @@ -108,7 +108,7 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
`dft` int(11) DEFAULT "4321",
`var` variant NULL)
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true")
PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "disable_auto_compaction" = "true", "store_row_column" = "true")
"""

sql """insert into ${tableName} values
Expand Down