Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ Status JsonScanner::open_next_reader() {
std::string json_root = "";
std::string jsonpath = "";
bool strip_outer_array = false;
bool num_as_string = false;

if (range.__isset.jsonpaths) {
jsonpath = range.jsonpaths;
}
Expand All @@ -147,7 +149,10 @@ Status JsonScanner::open_next_reader() {
if (range.__isset.strip_outer_array) {
strip_outer_array = range.strip_outer_array;
}
_cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array);
if (range.__isset.num_as_string) {
num_as_string = range.num_as_string;
}
_cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array, num_as_string);
RETURN_IF_ERROR(_cur_file_reader->init(jsonpath, json_root));

return Status::OK();
Expand Down Expand Up @@ -178,18 +183,23 @@ rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() {
}

////// class JsonReader
JsonReader::JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
FileReader* file_reader, bool strip_outer_array)
: _handle_json_callback(nullptr),
_next_line(0),
_total_lines(0),
_state(state),
_counter(counter),
_profile(profile),
_file_reader(file_reader),
_closed(false),
_strip_outer_array(strip_outer_array),
_json_doc(nullptr) {
JsonReader::JsonReader(
RuntimeState* state, ScannerCounter* counter,
RuntimeProfile* profile,
FileReader* file_reader,
bool strip_outer_array,
bool num_as_string) :
_handle_json_callback(nullptr),
_next_line(0),
_total_lines(0),
_state(state),
_counter(counter),
_profile(profile),
_file_reader(file_reader),
_closed(false),
_strip_outer_array(strip_outer_array),
_num_as_string(num_as_string),
_json_doc(nullptr) {
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "FileReadTime");
}
Expand Down Expand Up @@ -270,8 +280,18 @@ Status JsonReader::_parse_json_doc(bool* eof) {
*eof = true;
return Status::OK();
}

bool has_parse_error = false;
// parse jsondata to JsonDoc
if (_origin_json_doc.Parse((char*)json_str, length).HasParseError()) {
// As the issue: https://github.com/Tencent/rapidjson/issues/1458
// Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag.
if (_num_as_string) {
has_parse_error = _origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, length).HasParseError();
} else {
has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError();
}

if (has_parse_error) {
std::stringstream str_error;
str_error << "Parse json data for JsonDoc failed. code = "
<< _origin_json_doc.GetParseError() << ", error-info:"
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/json_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ struct JsonPath;
// return other error Status if encounter other errors.
class JsonReader {
public:
JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile,
FileReader* file_reader, bool strip_outer_array);
JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, FileReader* file_reader,
bool strip_outer_array, bool num_as_string);

~JsonReader();

Status init(const std::string& jsonpath, const std::string& json_root); // must call before use
Expand Down Expand Up @@ -150,6 +151,7 @@ class JsonReader {
FileReader* _file_reader;
bool _closed;
bool _strip_outer_array;
bool _num_as_string;
RuntimeProfile::Counter* _bytes_read_counter;
RuntimeProfile::Counter* _read_timer;

Expand Down
9 changes: 9 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
} else {
request.__set_strip_outer_array(false);
}
if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
if (boost::iequals(http_req->header(HTTP_NUM_AS_STRING), "true")) {
request.__set_num_as_string(true);
} else {
request.__set_num_as_string(false);
}
} else {
request.__set_num_as_string(false);
}
if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
request.__set_sequence_col(
http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL));
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
static const std::string HTTP_JSONPATHS = "jsonpaths";
static const std::string HTTP_JSONROOT = "json_root";
static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array";
static const std::string HTTP_NUM_AS_STRING = "num_as_string";
static const std::string HTTP_MERGE_TYPE = "merge_type";
static const std::string HTTP_DELETE_CONDITION = "delete";
static const std::string HTTP_FUNCTION_COLUMN = "function_column";
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/raw_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ void RawValue::print_value(const void* value, const TypeDescriptor& type, int sc
break;

case TYPE_DECIMAL:
*stream << *reinterpret_cast<const DecimalValue*>(value);
*stream << reinterpret_cast<const DecimalValue*>(value)->to_string();
break;

case TYPE_DECIMALV2:
*stream << reinterpret_cast<const PackedInt128*>(value)->value;
*stream << DecimalV2Value(reinterpret_cast<const PackedInt128*>(value)->value).to_string();
break;

case TYPE_LARGEINT:
Expand Down
194 changes: 191 additions & 3 deletions be/test/exec/json_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "exprs/decimalv2_operators.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/row_batch.h"
Expand All @@ -49,6 +50,7 @@ class JsonScannerTest : public testing::Test {
UserFunctionCache::instance()->init(
"./be/test/runtime/test_data/user_function_cache/normal");
CastFunctions::init();
DecimalV2Operators::init();
}

protected:
Expand All @@ -70,11 +72,11 @@ class JsonScannerTest : public testing::Test {

#define TUPLE_ID_DST 0
#define TUPLE_ID_SRC 1
#define COLUMN_NUMBERS 4
#define COLUMN_NUMBERS 6
#define DST_TUPLE_SLOT_ID_START 1
#define SRC_TUPLE_SLOT_ID_START 5
#define SRC_TUPLE_SLOT_ID_START 7
int JsonScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) {
const char* columnNames[] = {"category", "author", "title", "price"};
const char *columnNames[] = {"category","author","title","price", "largeint", "decimal"};
for (int i = 0; i < COLUMN_NUMBERS; i++) {
TSlotDescriptor slot_desc;

Expand Down Expand Up @@ -223,6 +225,62 @@ int JsonScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_s
t_desc_table.slotDescriptors.push_back(slot_desc);
}
byteOffset += 8;
{// lagreint
TSlotDescriptor slot_desc;

slot_desc.id = next_slot_id++;
slot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::LARGEINT);
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
slot_desc.slotType = type;
slot_desc.columnPos = 4;
slot_desc.byteOffset = byteOffset;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = 4;
slot_desc.colName = "lagreint";
slot_desc.slotIdx = 5;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
}
byteOffset += 16;
{// decimal
TSlotDescriptor slot_desc;

slot_desc.id = next_slot_id++;
slot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__isset.precision = true;
scalar_type.__isset.scale = true;
scalar_type.__set_precision(-1);
scalar_type.__set_scale(-1);
scalar_type.__set_type(TPrimitiveType::DECIMALV2);
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
slot_desc.slotType = type;
slot_desc.columnPos = 5;
slot_desc.byteOffset = byteOffset;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = 5;
slot_desc.colName = "decimal";
slot_desc.slotIdx = 6;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
}

t_desc_table.__isset.slotDescriptors = true;
{
// TTupleDescriptor dest
Expand Down Expand Up @@ -363,6 +421,94 @@ void JsonScannerTest::create_expr_info() {
_params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 3, expr);
_params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 3);
}
// largeint VARCHAR --> LargeInt
{
TTypeDesc int_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::LARGEINT);
node.__set_scalar_type(scalar_type);
int_type.types.push_back(node);
}
TExprNode cast_expr;
cast_expr.node_type = TExprNodeType::CAST_EXPR;
cast_expr.type = int_type;
cast_expr.__set_opcode(TExprOpcode::CAST);
cast_expr.__set_num_children(1);
cast_expr.__set_output_scale(-1);
cast_expr.__isset.fn = true;
cast_expr.fn.name.function_name = "casttolargeint";
cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
cast_expr.fn.arg_types.push_back(varchar_type);
cast_expr.fn.ret_type = int_type;
cast_expr.fn.has_var_args = false;
cast_expr.fn.__set_signature("casttolargeint(VARCHAR(*))");
cast_expr.fn.__isset.scalar_fn = true;
cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_large_int_val";

TExprNode slot_ref;
slot_ref.node_type = TExprNodeType::SLOT_REF;
slot_ref.type = varchar_type;
slot_ref.num_children = 0;
slot_ref.__isset.slot_ref = true;
slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 4; // price id in src tuple
slot_ref.slot_ref.tuple_id = 1;

TExpr expr;
expr.nodes.push_back(cast_expr);
expr.nodes.push_back(slot_ref);

_params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 4, expr);
_params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 4);
}
// decimal VARCHAR --> Decimal
{
TTypeDesc int_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__isset.precision = true;
scalar_type.__isset.scale = true;
scalar_type.__set_precision(-1);
scalar_type.__set_scale(-1);
scalar_type.__set_type(TPrimitiveType::DECIMALV2);
node.__set_scalar_type(scalar_type);
int_type.types.push_back(node);
}
TExprNode cast_expr;
cast_expr.node_type = TExprNodeType::CAST_EXPR;
cast_expr.type = int_type;
cast_expr.__set_opcode(TExprOpcode::CAST);
cast_expr.__set_num_children(1);
cast_expr.__set_output_scale(-1);
cast_expr.__isset.fn = true;
cast_expr.fn.name.function_name = "casttodecimalv2";
cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
cast_expr.fn.arg_types.push_back(varchar_type);
cast_expr.fn.ret_type = int_type;
cast_expr.fn.has_var_args = false;
cast_expr.fn.__set_signature("casttodecimalv2(VARCHAR(*))");
cast_expr.fn.__isset.scalar_fn = true;
cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val";

TExprNode slot_ref;
slot_ref.node_type = TExprNodeType::SLOT_REF;
slot_ref.type = varchar_type;
slot_ref.num_children = 0;
slot_ref.__isset.slot_ref = true;
slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 5; // price id in src tuple
slot_ref.slot_ref.tuple_id = 1;

TExpr expr;
expr.nodes.push_back(cast_expr);
expr.nodes.push_back(slot_ref);

_params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 5, expr);
_params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 5);
}
// _params.__isset.expr_of_dest_slot = true;
_params.__set_dest_tuple_id(TUPLE_ID_DST);
_params.__set_src_tuple_id(TUPLE_ID_SRC);
Expand Down Expand Up @@ -420,6 +566,10 @@ TEST_F(JsonScannerTest, normal_simple_arrayjson) {
status = scan_node.get_next(&_runtime_state, &batch, &eof);
ASSERT_TRUE(status.ok());
ASSERT_EQ(2, batch.num_rows());
// Do not use num_as_string, so largeint is too big is null and decimal value loss precision
auto tuple_str = batch.get_row(1)->get_tuple(0)->to_string(*scan_node.row_desc().tuple_descriptors()[0]);
ASSERT_TRUE(tuple_str.find("1180591620717411303424") == tuple_str.npos);
ASSERT_TRUE(tuple_str.find("9999999999999.999999") == tuple_str.npos);
ASSERT_FALSE(eof);
batch.reset();

Expand All @@ -428,7 +578,45 @@ TEST_F(JsonScannerTest, normal_simple_arrayjson) {
ASSERT_EQ(0, batch.num_rows());
ASSERT_TRUE(eof);

// Use num_as_string load data again
BrokerScanNode scan_node2(&_obj_pool, _tnode, *_desc_tbl);
status = scan_node2.prepare(&_runtime_state);
ASSERT_TRUE(status.ok());
scan_ranges.clear();
{
TScanRangeParams scan_range_params;

TBrokerScanRange broker_scan_range;
broker_scan_range.params = _params;
TBrokerRangeDesc range;
range.start_offset = 0;
range.size = -1;
range.format_type = TFileFormatType::FORMAT_JSON;
range.strip_outer_array = true;
range.num_as_string = true;
range.__isset.strip_outer_array = true;
range.__isset.num_as_string = true;
range.splittable = true;
range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
range.file_type = TFileType::FILE_LOCAL;
broker_scan_range.ranges.push_back(range);
scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
scan_ranges.push_back(scan_range_params);
}
scan_node2.set_scan_ranges(scan_ranges);
status = scan_node2.open(&_runtime_state);
ASSERT_TRUE(status.ok());

status = scan_node2.get_next(&_runtime_state, &batch, &eof);
ASSERT_TRUE(status.ok());
ASSERT_EQ(2, batch.num_rows());
// Use num as string, load largeint, decimal successfully
tuple_str = batch.get_row(1)->get_tuple(0)->to_string(*scan_node2.row_desc().tuple_descriptors()[0]);
ASSERT_FALSE(tuple_str.find("1180591620717411303424") == tuple_str.npos);
ASSERT_FALSE(tuple_str.find("9999999999999.999999") == tuple_str.npos);

scan_node.close(&_runtime_state);
scan_node2.close(&_runtime_state);
{
std::stringstream ss;
scan_node.runtime_profile()->pretty_print(&ss);
Expand Down
4 changes: 2 additions & 2 deletions be/test/exec/test_data/json_scanner/test_simple2.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[
{"category":"reference","author":"NigelRees","title":"SayingsoftheCentury","price":8.95},
{"category":"fiction","author":"EvelynWaugh","title":"SwordofHonour","price":12.99}
{"category":"reference","author":"NigelRees","title":"SayingsoftheCentury","price":8.95, "largeint":1234, "decimal":1234.1234},
{"category":"fiction","author":"EvelynWaugh","title":"SwordofHonour","price":12.99, "largeint":1180591620717411303424, "decimal":9999999999999.999999}
]

Loading