Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions be/src/exprs/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <re2/re2.h>
#include <simdjson/error.h>
#include <simdjson/simdjson.h> // IWYU pragma: keep
#include <stdlib.h>

Expand Down Expand Up @@ -254,13 +255,17 @@ Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
const std::vector<JsonPath>& jsonpath,
simdjson::ondemand::value* value) noexcept {
// Return DataQualityError when it's a malformed json.
// Otherwise the path was not found, due to array out of bound or not exist
// Otherwise the path was not found, due to
// 1. array out of bound
// 2. no such field exists in the object
// 3. the input is not an object (it may be null or another type), which leads to simdjson::INCORRECT_TYPE
#define HANDLE_SIMDJSON_ERROR(err, msg) \
do { \
const simdjson::error_code& _err = err; \
const std::string& _msg = msg; \
if (UNLIKELY(_err)) { \
if (_err == simdjson::NO_SUCH_FIELD || _err == simdjson::INDEX_OUT_OF_BOUNDS) { \
if (_err == simdjson::NO_SUCH_FIELD || _err == simdjson::INDEX_OUT_OF_BOUNDS || \
_err == simdjson::INCORRECT_TYPE) { \
return Status::NotFound<false>( \
fmt::format("Not found target filed, err: {}, msg: {}", \
simdjson::error_message(_err), _msg)); \
Expand Down Expand Up @@ -348,4 +353,9 @@ void JsonFunctions::merge_objects(rapidjson::Value& dst_object, rapidjson::Value
}
}

// Reports whether the parsed json path is the root path "$.", i.e. it has
// exactly two entries: one whose key is "$" followed by one whose key is empty.
bool JsonFunctions::is_root_path(const std::vector<JsonPath>& json_path) {
    if (json_path.size() != 2) {
        return false;
    }
    const auto& head = json_path[0];
    const auto& tail = json_path[1];
    return head.key == "$" && tail.key.empty();
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/exprs/json_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class JsonFunctions {

static std::string print_json_value(const rapidjson::Value& value);

static bool is_root_path(const std::vector<JsonPath>& json_path);

private:
static rapidjson::Value* match_value(const std::vector<JsonPath>& parsed_paths,
rapidjson::Value* document,
Expand Down
14 changes: 13 additions & 1 deletion be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,19 @@ Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
return st;
}
}
if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) {
if (i < _parsed_jsonpaths.size() && JsonFunctions::is_root_path(_parsed_jsonpaths[i])) {
// The jsonpath is "$.": read the whole root json object by inserting the original doc directly
ColumnNullable* nullable_column = nullptr;
IColumn* target_column_ptr = nullptr;
if (slot_desc->is_nullable()) {
nullable_column = assert_cast<ColumnNullable*>(column_ptr);
target_column_ptr = &nullable_column->get_nested_column();
}
auto* column_string = assert_cast<ColumnString*>(target_column_ptr);
column_string->insert_data(_simdjson_ondemand_padding_buffer.data(),
_original_doc_size);
has_valid_value = true;
} else if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) {
// not match in jsondata, filling with default value
RETURN_IF_ERROR(_fill_missing_column(slot_desc, column_ptr, valid));
if (!(*valid)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[
{
"id": 789,
"city": {
"name": "beijing",
"region": "haidian"
}
},
{
"id": 1111,
"city": null
}
]
10 changes: 10 additions & 0 deletions regression-test/data/load_p0/stream_load/test_json_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,13 @@ test k2_value

-- !select29 --
10 \N

-- !select30 --
12345 {"k1":12345,"k2":"11111","k3":111111,"k4":[11111]} {"k1":12345,"k2":"11111","k3":111111,"k4":[11111]} 111111
12346 {"k1":12346,"k2":"22222","k4":[22222]} {"k1":12346,"k2":"22222","k4":[22222]} \N
12347 {"k1":12347,"k3":"33333","k4":[22222]} {"k1":12347,"k3":"33333","k4":[22222]} 33333
12348 {"k1":12348,"k3":"33333","k5":{"k51":1024,"xxxx":[11111]}} {"k1":12348,"k3":"33333","k5":{"k51":1024,"xxxx":[11111]}} 33333

-- !select31 --
789 beijing haidian
1111 \N \N
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"k1" : 12345, "k2" : "11111", "k3" : 111111, "k4" : [11111]}
{"k1" : 12346, "k2" : "22222", "k4" : [22222]}
{"k1" : 12347, "k3" : "33333", "k4" : [22222]}
{"k1" : 12348, "k3" : "33333", "k5" : {"k51" : 1024, "xxxx" : [11111]}}
Original file line number Diff line number Diff line change
Expand Up @@ -878,4 +878,58 @@ suite("test_json_load", "p0") {
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}

// support read "$." as root
// Case 30: the jsonpaths given to the load below use "$." for the second and
// third entries and "$.k1" / "$.k3" for the first and last. The intent is that
// a "$." path yields the whole root JSON object of each input document.
try {
// Start from a clean table so a leftover from an earlier run cannot interfere.
sql "DROP TABLE IF EXISTS ${testTable}"
sql """CREATE TABLE IF NOT EXISTS ${testTable}
(
`k1` varchar(1024) NULL,
`k2` variant NULL,
`k3` variant NULL,
`k4` variant NULL
)
DUPLICATE KEY(`k1`)
COMMENT ''
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""

// NOTE(review): the positional 'false', 'true' flags presumably mean
// strip_outer_array=false and read_json_by_line=true — confirm against the
// load_json_data closure defined elsewhere in this suite.
load_json_data.call("${testTable}", "${testTable}_case30", 'false', 'true', 'json', '', '[\"$.k1\",\"$.\", \"$.\", \"$.k3\"]',
'', '', '', 'test_read_root_path.json')

// Issue "sync" before querying, then compare the rows (ordered by k1)
// against the recorded result for select30.
sql "sync"
qt_select30 "select * from ${testTable} order by k1"

} finally {
// Cleanup is commented out, so the table is left in place after this case;
// the DROP at the top of the block resets it on the next run.
// try_sql("DROP TABLE IF EXISTS ${testTable}")
}

// test extracting a json path through a value of invalid type (non-object types such as null)
// Case 31: the jsonpaths "$.city.name" and "$.city.region" descend into
// "city"; the case exercises what happens when "city" is not an object.
try {
// Start from a clean table so a leftover from an earlier run cannot interfere.
sql "DROP TABLE IF EXISTS ${testTable}"
sql """
CREATE TABLE ${testTable} (
`id` int NOT NULL,
`name` varchar(24) NULL,
`region` varchar(30) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT ''
DISTRIBUTED BY RANDOM BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// NOTE(review): the positional 'true', 'false' flags presumably mean
// strip_outer_array=true and read_json_by_line=false, and the trailing
// `false, 2` presumably an ignore-failure flag and the expected number of
// loaded rows — confirm against the load_json_data closure.
load_json_data.call("${testTable}", "${testTable}_case31", 'true', 'false', 'json', '', '[\"$.id\", \"$.city.name\", \"$.city.region\"]',
'', '', '', 'test_json_extract_path_invalid_type.json', false, 2)

// Issue "sync" before querying, then compare the rows (ordered by id)
// against the recorded result for select31.
sql "sync"
qt_select31 "select * from ${testTable} order by id"

} finally {
// Cleanup is commented out, so the table is left in place after this case;
// the DROP at the top of the block resets it on the next run.
// try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}