Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 45 additions & 26 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1042,9 +1042,10 @@ Status NewJsonReader::_simdjson_init_reader() {
}
_ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
for (int i = 0; i < _file_slot_descs.size(); ++i) {
_slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
_slot_desc_index[_file_slot_descs[i]->col_name()] = i;
}
_simdjson_ondemand_padding_buffer.resize(_padded_size);
_prev_positions.resize(_file_slot_descs.size());
return Status::OK();
}

Expand Down Expand Up @@ -1303,27 +1304,50 @@ Status NewJsonReader::_simdjson_handle_nested_complex_json(
return Status::OK();
}

/// Resolve a JSON field name to its index in `_slot_desc_index`.
///
/// `key_index` is the ordinal of the field inside the JSON object currently
/// being parsed. Field order is almost always the same from row to row, so the
/// lookup result stored in `_prev_positions[key_index]` is checked first and
/// the hash table is only searched when that hint is missing or does not match.
///
/// Returns the mapped index, or `size_t(-1)` when `name` is not present in
/// `_slot_desc_index`.
size_t NewJsonReader::_column_index(const StringRef& name, size_t key_index) {
    // A hint slot exists only for ordinals covered by `_prev_positions`.
    const bool has_hint_slot = key_index < _prev_positions.size();

    // Fast path: the field at this ordinal is the one remembered earlier.
    if (has_hint_slot) {
        const auto& cached = _prev_positions[key_index];
        if (cached && name == cached->get_first()) {
            return cached->get_second();
        }
    }

    // Slow path: fall back to searching the hash table.
    auto* found = _slot_desc_index.find(name);
    if (!found) {
        return size_t(-1);
    }

    // Remember the hit so the next lookup at this ordinal can take the fast path.
    if (has_hint_slot) {
        _prev_positions[key_index] = found;
    }
    return found->get_second();
}

Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* value,
std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs,
bool* valid) {
// set
_seen_columns.assign(columns.size(), false);
size_t cur_row_count = columns[0]->size();
bool has_valid_value = false;
// iterate through the object; simdjson::ondemand parses it on the fly
size_t key_index = 0;
for (auto field : *value) {
std::string_view key = field.unescaped_key();
auto iter = _slot_desc_index.find(std::string(key));
if (iter == _slot_desc_index.end()) {
StringRef name_ref(key.data(), key.size());
const size_t column_index = _column_index(name_ref, key_index++);
if (UNLIKELY(ssize_t(column_index) < 0)) {
// This key does not exist in the slot descriptors, so just ignore it
continue;
}
simdjson::ondemand::value val = field.value();
RETURN_IF_ERROR(_simdjson_write_data_to_column(val, slot_descs[iter->second],
columns[iter->second].get(), valid));
RETURN_IF_ERROR(_simdjson_write_data_to_column(val, slot_descs[column_index],
columns[column_index].get(), valid));
if (!(*valid)) {
return Status::OK();
}
_seen_columns[column_index] = true;
has_valid_value = true;
}
if (!has_valid_value) {
Expand All @@ -1334,22 +1358,31 @@ Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* val

// fill missing slot
int nullcount = 0;
int ctx_idx = 0;
for (auto slot_desc : slot_descs) {
for (size_t i = 0; i < slot_descs.size(); ++i) {
if (_seen_columns[i]) {
continue;
}
auto slot_desc = slot_descs[i];
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx++;
auto* column_ptr = columns[dest_index].get();
auto* column_ptr = columns[i].get();
if (column_ptr->size() < cur_row_count + 1) {
DCHECK(column_ptr->size() == cur_row_count);
column_ptr->assume_mutable()->insert_default();
++nullcount;
}
DCHECK(column_ptr->size() == cur_row_count + 1);
}

#ifndef NDEBUG
// Check all columns rows matched
for (size_t i = 0; i < columns.size(); ++i) {
DCHECK_EQ(columns[i]->size(), cur_row_count + 1);
}
// There is at least one valid value here
DCHECK(nullcount < columns.size());
#endif
*valid = true;
return Status::OK();
}
Expand Down Expand Up @@ -1389,27 +1422,13 @@ Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value&
}
break;
}
case simdjson::ondemand::json_type::object:
case simdjson::ondemand::json_type::array: {
auto str_view = simdjson::to_json_string(value).value();
std::string value_str(str_view.data(), str_view.size());
// compact json value
value_str.erase(std::remove_if(value_str.begin(), value_str.end(),
[](const char& c) {
// white space
return c == ' ' || c == '\t' || c == '\n' || c == '\r' ||
c == '\f' || c == '\v';
}),
value_str.end());
nullable_column->get_null_map_data().push_back(0);
column_string->insert_data(value_str.data(), value_str.length());
break;
}
default: {
auto str_view = simdjson::to_json_string(value).value();
if (value.type() == simdjson::ondemand::json_type::string) {
nullable_column->get_null_map_data().push_back(0);
// trim
str_view = str_view.substr(1, str_view.length() - 2);
column_string->insert_data(str_view.data() + 1, str_view.length() - 2);
break;
}
nullable_column->get_null_map_data().push_back(0);
column_string->insert_data(str_view.data(), str_view.length());
Expand Down
13 changes: 12 additions & 1 deletion be/src/vec/exec/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "io/fs/file_reader.h"
#include "olap/iterators.h"
#include "vec/common/hash_table/hash_map.h"
#include "vec/exec/format/generic_reader.h"

namespace doris {
Expand Down Expand Up @@ -138,6 +139,8 @@ class NewJsonReader : public GenericReader {
Status _append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
std::string col_name, bool* valid);

size_t _column_index(const StringRef& name, size_t key_index);

Status (NewJsonReader::*_vhandle_json_callback)(
std::vector<vectorized::MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof);
Expand Down Expand Up @@ -194,8 +197,16 @@ class NewJsonReader : public GenericReader {
RuntimeProfile::Counter* _file_read_timer;

bool _is_dynamic_schema = false;

// ======SIMD JSON======
// name mapping
phmap::flat_hash_map<String, size_t> _slot_desc_index;
/// Hash table mapping `field name -> position in the block`. NOTE: a perfect hash map could be used here.
using NameMap = HashMap<StringRef, size_t, StringRefHash>;
NameMap _slot_desc_index;
/// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
std::vector<NameMap::LookupResult> _prev_positions;
/// Set of columns which have already been met in the current row. An exception is thrown if there is more than one column with the same name.
std::vector<UInt8> _seen_columns;
// simdjson
static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
Expand Down
26 changes: 13 additions & 13 deletions regression-test/data/load_p0/stream_load/invalid_nest_json2.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}}
{"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}}
{"no": 3, "item": ["id": 3, "city"], "hangzhou", "code": 2345673}}
{"no": 3, "xxxxx": ["id": 3, "city"], "hangzhou", "code": 2345673}}
{"no": 3, xxxxx": ["id": 3, "city"], "hangzhou", "code": 2345673}}
{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}}
{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}}
{"no": 5, "item": {"id": 5, "city": ["guangzhou"], "code": 2345675}}
{"no": 5, "item": {"id": 5, "city": {"guangzhou": 1}, "code": 2345675}}
{"no": 5, "item": {"id": 5, "city": ["guangzhou", "code": 2345675]]}
{"no": 5, "item": {"id": 5, "city": ["guangzhou", "code": 2345675}}
{"no": 5, "item": {"id": 5, "city": {"guangzhou", "code": 2345675}}
{"no": 5, "item": {"id": 5, "city": {"guangzhou":1, "code": 2345675}}}
{"no": 2, "item": {"id": 2, "city": "shanghai","code": 2345672}}
{"no": 3, "item": ["id": 3, "city"], "hangzhou","code": 2345673}}
{"no": 3, "xxxxx": ["id": 3, "city"], "hangzhou","code": 2345673}}
{"no": 3, xxxxx": ["id": 3, "city"], "hangzhou","code":2345673}}
{"no": 4, "item": {"id": 4, "city": "shenzhen","code":2345674}}
{"no": 5, "item": {"id": 5, "city": "guangzhou","code":2345675}}
{"no": 5, "item": {"id": 5, "city": ["guangzhou"],"code":2345675}}
{"no": 5, "item": {"id": 5, "city": {"guangzhou":1},"code":2345675}}
{"no": 5, "item": {"id": 5, "city": ["guangzhou","code":2345675]]}
{"no": 5, "item": {"id": 5, "city": ["guangzhou","code":2345675}}
{"no": 5, "item": {"id": 5, "city": {"guangzhou","code":2345675}}
{"no": 5, "item": {"id": 5, "city": {"guangzhou":1,"code":2345675}}}
{"no": 5, "item": {"id": 5, "city": "1}}}
{"no": 5, "item": {"id": 5, "city": "1]}}}
{"no": 5, ["item": {"id": 5, "city": {"guangzhou": 1, "code": 2345675}]}
{"no": 5, ["item": {"id": 5, "city": {"guangzhou":1,"code":2345675}]}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}}
{"no": , "item": {"id": 2, "city": "shanghai", "code": 2345672}}
{"no": 3, "item": ["id": 3, "city": "hangzhou", "code": 2345673}}
{"no": 4, item": {"id": 4, "city": "shenzhen", "code": 2345674}}
{"no": 5, "item": {"id": 5, "city": guangzhou", "code": 2345675}}
{"no": 4, item": {"id": 4, "city":"shenzhen","code":2345674}}
{"no": 5, "item": {"id": 5, "city":guangzhou","code":2345675}}