Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,16 @@ class BeExecVersionManager {
* b. clear old version of version 3->4
* c. change FunctionIsIPAddressInRange from AlwaysNotNullable to DependOnArguments
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 5;
constexpr inline int BeExecVersionManager::max_be_exec_version = 6;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;

/// functional
// Named be_exec_version thresholds: each constant is the version number associated
// with one behavior change. Callers gate version-dependent logic by comparing a
// be_exec_version value against the matching constant
// (for example `be_exec_version >= VARIANT_SERDE`).
// Note that BITMAP_SERDE and OLD_WAL_SERDE intentionally share the value 3.
constexpr inline int BITMAP_SERDE = 3;
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility issues, see pr #32299
constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable property: PR #37215
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413

} // namespace doris
5 changes: 5 additions & 0 deletions be/src/exprs/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,9 @@ void JsonFunctions::merge_objects(rapidjson::Value& dst_object, rapidjson::Value
}
}

// Tells whether a parsed json path is exactly the root path "$.":
// two steps, the first keyed "$" and the second with an empty key.
bool JsonFunctions::is_root_path(const std::vector<JsonPath>& json_path) {
    if (json_path.size() != 2) {
        return false;
    }
    const auto& root_step = json_path[0];
    const auto& trailing_step = json_path[1];
    return root_step.key == "$" && trailing_step.key.empty();
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/exprs/json_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class JsonFunctions {

static std::string print_json_value(const rapidjson::Value& value);

static bool is_root_path(const std::vector<JsonPath>& json_path);

private:
static rapidjson::Value* match_value(const std::vector<JsonPath>& parsed_paths,
rapidjson::Value* document,
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,11 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
RowRanges dict_row_ranges = RowRanges::create_single(num_rows());
for (auto cid : cids) {
if (!_segment->can_apply_predicate_safely(cid,
_opts.col_id_to_predicates.at(cid).get(),
*_schema, _opts.io_ctx.reader_type)) {
continue;
}
RowRanges tmp_row_ranges = RowRanges::create_single(num_rows());
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_dict(
Expand Down
61 changes: 59 additions & 2 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,24 @@ MutableColumnPtr ColumnObject::apply_for_subcolumns(Func&& func) const {
res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(),
subcolumn->data.get_least_common_type());
}
check_consistency();
return res;
}

void ColumnObject::resize(size_t n) {
if (n == num_rows) {
return;
}
if (n > num_rows) {
insert_many_defaults(n - num_rows);
} else {
for (auto& subcolumn : subcolumns) {
subcolumn->data.pop_back(num_rows - n);
}
}
num_rows = n;
}

bool ColumnObject::Subcolumn::check_if_sparse_column(size_t num_rows) {
if (num_rows < config::variant_threshold_rows_to_estimate_sparse_column) {
return false;
Expand Down Expand Up @@ -697,8 +712,16 @@ MutableColumnPtr ColumnObject::clone_resized(size_t new_size) const {
if (new_size == 0) {
return ColumnObject::create(is_nullable);
}
return apply_for_subcolumns(
// If subcolumns are empty, then res will be empty but new_size > 0
if (subcolumns.empty()) {
// Add an empty column with new_size rows
auto res = ColumnObject::create(true, false);
res->set_num_rows(new_size);
return res;
}
auto res = apply_for_subcolumns(
[&](const auto& subcolumn) { return subcolumn.clone_resized(new_size); });
return res;
}

size_t ColumnObject::byte_size() const {
Expand Down Expand Up @@ -838,7 +861,10 @@ Field ColumnObject::operator[](size_t n) const {
}

void ColumnObject::get(size_t n, Field& res) const {
assert(n < size());
if (UNLIKELY(n >= size())) {
throw doris::Exception(ErrorCode::OUT_OF_BOUND,
"Index ({}) for getting field is out of range", n);
}
res = VariantMap();
auto& object = res.get<VariantMap&>();

Expand Down Expand Up @@ -886,11 +912,32 @@ void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t le
}

// Replicates rows according to `offsets` and returns the resulting column.
//
// When this object has no subcolumns there is no data to replicate, but the
// result must still report the replicated row count (the last offset). That
// placeholder column has to be returned right away: falling through to
// apply_for_subcolumns() would build a fresh result from zero subcolumns and
// drop the row count that was just set.
ColumnPtr ColumnObject::replicate(const Offsets& offsets) const {
    if (subcolumns.empty()) {
        // Add an empty column with offsets.back() rows (0 rows if offsets is empty,
        // since back() must not be called on an empty container).
        auto res = ColumnObject::create(true, false);
        res->set_num_rows(offsets.empty() ? 0 : offsets.back());
        return res;
    }
    return apply_for_subcolumns(
            [&](const auto& subcolumn) { return subcolumn.replicate(offsets); });
}

// Applies permutation `perm` and returns the permuted column. `limit == 0` means
// "no limit"; any other value is capped at the current row count.
//
// With subcolumns present, the work is delegated to each subcolumn. Without
// subcolumns, only the row count is meaningful, so a column without subcolumns
// carrying the effective row count is returned instead.
//
// Throws doris::Exception(INTERNAL_ERROR) on the no-subcolumn path when `perm`
// has fewer entries than the effective row count.
ColumnPtr ColumnObject::permute(const Permutation& perm, size_t limit) const {
    if (!subcolumns.empty()) {
        return apply_for_subcolumns(
                [&](const auto& sub) { return sub.permute(perm, limit); });
    }

    // Effective number of rows the permuted result must contain.
    const size_t row_count = (limit == 0) ? num_rows : std::min(num_rows, limit);
    if (perm.size() < row_count) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                               "Size of permutation is less than required.");
    }

    // Add an empty column with row_count rows
    auto placeholder = ColumnObject::create(true, false);
    placeholder->set_num_rows(row_count);
    return placeholder;
}
Expand Down Expand Up @@ -1420,6 +1467,12 @@ ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const {
return finalized_object.apply_for_subcolumns(
[&](const auto& subcolumn) { return subcolumn.filter(filter, count); });
}
if (subcolumns.empty()) {
// Add an empty column with filtered rows
auto res = ColumnObject::create(true, false);
res->set_num_rows(count_bytes_in_filter(filter));
return res;
}
auto new_column = ColumnObject::create(true, false);
for (auto& entry : subcolumns) {
auto subcolumn = entry->data.get_finalized_column().filter(filter, count);
Expand All @@ -1433,6 +1486,10 @@ Status ColumnObject::filter_by_selector(const uint16_t* sel, size_t sel_size, IC
if (!is_finalized()) {
finalize();
}
if (subcolumns.empty()) {
assert_cast<ColumnObject*>(col_ptr)->insert_many_defaults(sel_size);
return Status::OK();
}
auto* res = assert_cast<ColumnObject*>(col_ptr);
for (const auto& subcolumn : subcolumns) {
auto new_subcolumn = subcolumn->data.get_least_common_type()->create_column();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

void clear() override;

void resize(size_t n) override;

void clear_subcolumns_data();

std::string get_name() const override {
Expand Down
18 changes: 17 additions & 1 deletion be/src/vec/data_types/data_type_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <utility>
#include <vector>

#include "agent/be_exec_version_manager.h"
#include "vec/columns/column_object.h"
#include "vec/common/assert_cast.h"
#include "vec/common/typeid_cast.h"
Expand Down Expand Up @@ -66,7 +67,6 @@ int64_t DataTypeObject::get_uncompressed_serialized_bytes(const IColumn& column,
if (is_nothing(type)) {
continue;
}

PColumnMeta column_meta_pb;
column_meta_pb.set_name(entry->path.get_path());
type->to_pb_column_meta(&column_meta_pb);
Expand All @@ -78,6 +78,10 @@ int64_t DataTypeObject::get_uncompressed_serialized_bytes(const IColumn& column,
size += type->get_uncompressed_serialized_bytes(entry->data.get_finalized_column(),
be_exec_version);
}
// serialize num of rows, only take effect when subcolumns empty
if (be_exec_version >= VARIANT_SERDE) {
size += sizeof(uint32_t);
}

return size;
}
Expand Down Expand Up @@ -121,6 +125,11 @@ char* DataTypeObject::serialize(const IColumn& column, char* buf, int be_exec_ve
}
// serialize num of subcolumns
*reinterpret_cast<uint32_t*>(size_pos) = num_of_columns;
// serialize num of rows, only take effect when subcolumns empty
if (be_exec_version >= VARIANT_SERDE) {
*reinterpret_cast<uint32_t*>(buf) = column_object.rows();
buf += sizeof(uint32_t);
}

return buf;
}
Expand Down Expand Up @@ -155,6 +164,13 @@ const char* DataTypeObject::deserialize(const char* buf, IColumn* column,
}
column_object->add_sub_column(key, std::move(sub_column), type);
}
size_t num_rows = 0;
// deserialize num of rows, only takes effect when subcolumns are empty
if (be_exec_version >= VARIANT_SERDE) {
num_rows = *reinterpret_cast<const uint32_t*>(buf);
column_object->set_num_rows(num_rows);
buf += sizeof(uint32_t);
}

column_object->finalize();
#ifndef NDEBUG
Expand Down
14 changes: 13 additions & 1 deletion be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,19 @@ Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
return st;
}
}
if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) {
if (i < _parsed_jsonpaths.size() && JsonFunctions::is_root_path(_parsed_jsonpaths[i])) {
// Indicate that the jsonpath is "$.", read the full root json object, insert the original doc directly
ColumnNullable* nullable_column = nullptr;
IColumn* target_column_ptr = nullptr;
if (slot_desc->is_nullable()) {
nullable_column = assert_cast<ColumnNullable*>(column_ptr);
target_column_ptr = &nullable_column->get_nested_column();
}
auto* column_string = assert_cast<ColumnString*>(target_column_ptr);
column_string->insert_data(_simdjson_ondemand_padding_buffer.data(),
_original_doc_size);
has_valid_value = true;
} else if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) {
// not match in jsondata, filling with default value
RETURN_IF_ERROR(_fill_missing_column(slot_desc, column_ptr, valid));
if (!(*valid)) {
Expand Down
17 changes: 14 additions & 3 deletions be/src/vec/functions/function_cast.h
Original file line number Diff line number Diff line change
Expand Up @@ -719,14 +719,25 @@ struct ConvertImplGenericFromJsonb {
}
// add string to string column
if (context->jsonb_string_as_string() && is_dst_string && value->isString()) {
auto blob = static_cast<const JsonbBlobVal*>(value);
const auto* blob = static_cast<const JsonbBlobVal*>(value);
assert_cast<ColumnString&>(*col_to).insert_data(blob->getBlob(),
blob->getBlobLen());
(*vec_null_map_to)[i] = 0;
continue;
}
std::string json_str = JsonbToJson::jsonb_to_json_string(val.data, val.size);
ReadBuffer read_buffer((char*)(json_str.data()), json_str.size());
std::string input_str;
if (context->jsonb_string_as_string() && value->isString()) {
const auto* blob = static_cast<const JsonbBlobVal*>(value);
input_str = std::string(blob->getBlob(), blob->getBlobLen());
} else {
input_str = JsonbToJson::jsonb_to_json_string(val.data, val.size);
}
if (input_str.empty()) {
col_to->insert_default();
(*vec_null_map_to)[i] = 1;
continue;
}
ReadBuffer read_buffer((char*)(input_str.data()), input_str.size());
Status st = data_type_to->from_string(read_buffer, col_to);
// if parsing failed, will return null
(*vec_null_map_to)[i] = !st.ok();
Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/functions/function_variant_element.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "common/status.h"
#include "exprs/json_functions.h"
#include "simdjson.h"
#include "util/defer_op.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
Expand Down Expand Up @@ -102,8 +103,11 @@ class FunctionVariantElement : public IFunction {
static Status get_element_column(const ColumnObject& src, const ColumnPtr& index_column,
ColumnPtr* result) {
std::string field_name = index_column->get_data_at(0).to_string();
Defer finalize([&]() { (*result)->assume_mutable()->finalize(); });
if (src.empty()) {
*result = ColumnObject::create(true);
// src subcolumns empty but src row count may not be 0
(*result)->assume_mutable()->insert_many_defaults(src.size());
return Status::OK();
}
if (src.is_scalar_variant() &&
Expand Down Expand Up @@ -135,8 +139,10 @@ class FunctionVariantElement : public IFunction {
PathInData path(field_name);
ColumnObject::Subcolumns subcolumns = mutable_ptr->get_subcolumns();
const auto* node = subcolumns.find_exact(path);
auto result_col = ColumnObject::create(true, false /*should not create root*/);
MutableColumnPtr result_col;
if (node != nullptr) {
// Create without root, since root will be added
result_col = ColumnObject::create(true, false /*should not create root*/);
std::vector<decltype(node)> nodes;
PathsInData paths;
ColumnObject::Subcolumns::get_leaves_of_node(node, nodes, paths);
Expand All @@ -162,6 +168,8 @@ class FunctionVariantElement : public IFunction {
auto container = ColumnObject::create(std::move(new_subcolumns), true);
result_col->insert_range_from(*container, 0, container->size());
} else {
// Create with root, otherwise the root type maybe type Nothing
result_col = ColumnObject::create(true);
result_col->insert_many_defaults(src.size());
}
*result = result_col->get_ptr();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,7 @@ public class Config extends ConfigBase {
* Max data version of backends serialize block.
*/
@ConfField(mutable = false)
public static int max_be_exec_version = 5;
public static int max_be_exec_version = 6;

/**
* Min data version of backends serialize block.
Expand Down
6 changes: 6 additions & 0 deletions regression-test/data/load_p0/stream_load/test_json_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,9 @@ test k2_value

-- !select29 --
10 \N

-- !select30 --
12345 {"k1":12345,"k2":"11111","k3":111111,"k4":[11111]} {"k1":12345,"k2":"11111","k3":111111,"k4":[11111]} 111111
12346 {"k1":12346,"k2":"22222","k4":[22222]} {"k1":12346,"k2":"22222","k4":[22222]} \N
12347 {"k1":12347,"k3":"33333","k4":[22222]} {"k1":12347,"k3":"33333","k4":[22222]} 33333
12348 {"k1":12348,"k3":"33333","k5":{"k51":1024,"xxxx":[11111]}} {"k1":12348,"k3":"33333","k5":{"k51":1024,"xxxx":[11111]}} 33333
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"k1" : 12345, "k2" : "11111", "k3" : 111111, "k4" : [11111]}
{"k1" : 12346, "k2" : "22222", "k4" : [22222]}
{"k1" : 12347, "k3" : "33333", "k4" : [22222]}
{"k1" : 12348, "k3" : "33333", "k5" : {"k51" : 1024, "xxxx" : [11111]}}
48 changes: 48 additions & 0 deletions regression-test/data/variant_p0/rqg/rqg2.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !rqg2 --
0

-- !rqg2_2 --
500

-- !rqg2_3 --
500

-- !rqg2_4 --
0

-- !rqg2_5 --
0

-- !rqg2_6 --
500

-- !rqg2_7 --
500

-- !rqg2_8 --
2023-12-09
2023-12-10
2023-12-11
2023-12-12
2023-12-13
2023-12-14
2023-12-15
2023-12-16
2023-12-17
2023-12-18
2023-12-19
2023-12-20
2024-01-08
2024-01-09
2024-01-17
2024-01-19
2024-01-31
2024-02-18
2025-02-18
2025-06-18
2026-01-18
2026-02-18
2027-01-09
2027-01-16

Loading