diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 3191bf16c847e5..a03164529c7323 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -138,6 +138,8 @@ Status JsonScanner::open_next_reader() { std::string json_root = ""; std::string jsonpath = ""; bool strip_outer_array = false; + bool num_as_string = false; + if (range.__isset.jsonpaths) { jsonpath = range.jsonpaths; } @@ -147,7 +149,10 @@ Status JsonScanner::open_next_reader() { if (range.__isset.strip_outer_array) { strip_outer_array = range.strip_outer_array; } - _cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array); + if (range.__isset.num_as_string) { + num_as_string = range.num_as_string; + } + _cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array, num_as_string); RETURN_IF_ERROR(_cur_file_reader->init(jsonpath, json_root)); return Status::OK(); @@ -178,18 +183,23 @@ rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { } ////// class JsonReader -JsonReader::JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, - FileReader* file_reader, bool strip_outer_array) - : _handle_json_callback(nullptr), - _next_line(0), - _total_lines(0), - _state(state), - _counter(counter), - _profile(profile), - _file_reader(file_reader), - _closed(false), - _strip_outer_array(strip_outer_array), - _json_doc(nullptr) { +JsonReader::JsonReader( + RuntimeState* state, ScannerCounter* counter, + RuntimeProfile* profile, + FileReader* file_reader, + bool strip_outer_array, + bool num_as_string) : + _handle_json_callback(nullptr), + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _file_reader(file_reader), + _closed(false), + _strip_outer_array(strip_outer_array), + _num_as_string(num_as_string), + _json_doc(nullptr) { _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); _read_timer = ADD_TIMER(_profile, "FileReadTime"); } @@ -270,8 +280,18 @@ Status JsonReader::_parse_json_doc(bool* eof) { *eof = true; return Status::OK(); } + + bool has_parse_error = false; // parse jsondata to JsonDoc - if (_origin_json_doc.Parse((char*)json_str, length).HasParseError()) { + // As the issue: https://github.com/Tencent/rapidjson/issues/1458 + // Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag. + if (_num_as_string) { + has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError(); + } else { + has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError(); + } + + if (has_parse_error) { std::stringstream str_error; str_error << "Parse json data for JsonDoc failed. code = " << _origin_json_doc.GetParseError() << ", error-info:" diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index 16c274b0733e36..0ce2805daece36 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -103,8 +103,9 @@ struct JsonPath; // return other error Status if encounter other errors. class JsonReader { public: - JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, - FileReader* file_reader, bool strip_outer_array); + JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, FileReader* file_reader, + bool strip_outer_array, bool num_as_string); + ~JsonReader(); Status init(const std::string& jsonpath, const std::string& json_root); // must call before use @@ -150,6 +151,7 @@ class JsonReader { FileReader* _file_reader; bool _closed; bool _strip_outer_array; + bool _num_as_string; RuntimeProfile::Counter* _bytes_read_counter; RuntimeProfile::Counter* _read_timer; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 3243019c9a9643..c9abc73b8ba6a8 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -399,6 +399,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* } else { request.__set_strip_outer_array(false); } + if (!http_req->header(HTTP_NUM_AS_STRING).empty()) { + if (boost::iequals(http_req->header(HTTP_NUM_AS_STRING), "true")) { + request.__set_num_as_string(true); + } else { + request.__set_num_as_string(false); + } + } else { + request.__set_num_as_string(false); + } if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { request.__set_sequence_col( http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL)); diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 3356f7728ad9bb..f6a89e20067234 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -39,6 +39,7 @@ static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit"; static const std::string HTTP_JSONPATHS = "jsonpaths"; static const std::string HTTP_JSONROOT = "json_root"; static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array"; +static const std::string HTTP_NUM_AS_STRING = "num_as_string"; static const std::string HTTP_MERGE_TYPE = "merge_type"; static const std::string HTTP_DELETE_CONDITION = "delete"; static const std::string HTTP_FUNCTION_COLUMN = "function_column"; diff --git a/be/src/runtime/raw_value.cpp b/be/src/runtime/raw_value.cpp index 01cf2a1b11f2f8..2fc1a8a532702e 100644 --- a/be/src/runtime/raw_value.cpp +++ b/be/src/runtime/raw_value.cpp @@ -163,11 +163,11 @@ void RawValue::print_value(const void* value, const TypeDescriptor& type, int sc break; case TYPE_DECIMAL: - *stream << *reinterpret_cast(value); + *stream << reinterpret_cast(value)->to_string(); break; case TYPE_DECIMALV2: - *stream << reinterpret_cast(value)->value; + *stream << DecimalV2Value(reinterpret_cast(value)->value).to_string(); break; case TYPE_LARGEINT: diff --git a/be/test/exec/json_scanner_test.cpp b/be/test/exec/json_scanner_test.cpp index 1fe3430cafeb23..b244e97efd2da6 100644 --- a/be/test/exec/json_scanner_test.cpp +++ b/be/test/exec/json_scanner_test.cpp @@ -28,6 +28,7 @@ #include "exprs/cast_functions.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "exprs/decimalv2_operators.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/row_batch.h" @@ -49,6 +50,7 @@ class JsonScannerTest : public testing::Test { UserFunctionCache::instance()->init( "./be/test/runtime/test_data/user_function_cache/normal"); CastFunctions::init(); + DecimalV2Operators::init(); } protected: @@ -70,11 +72,11 @@ class JsonScannerTest : public testing::Test { #define TUPLE_ID_DST 0 #define TUPLE_ID_SRC 1 -#define COLUMN_NUMBERS 4 +#define COLUMN_NUMBERS 6 #define DST_TUPLE_SLOT_ID_START 1 -#define SRC_TUPLE_SLOT_ID_START 5 +#define SRC_TUPLE_SLOT_ID_START 7 int JsonScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { - const char* columnNames[] = {"category", "author", "title", "price"}; + const char *columnNames[] = {"category","author","title","price", "largeint", "decimal"}; for (int i = 0; i < COLUMN_NUMBERS; i++) { TSlotDescriptor slot_desc; @@ -223,6 +225,62 @@ int JsonScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_s t_desc_table.slotDescriptors.push_back(slot_desc); } byteOffset += 8; + {// lagreint + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::LARGEINT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 4; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 4; + slot_desc.colName = "lagreint"; + slot_desc.slotIdx = 5; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + {// decimal + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__isset.precision = true; + scalar_type.__isset.scale = true; + scalar_type.__set_precision(-1); + scalar_type.__set_scale(-1); + scalar_type.__set_type(TPrimitiveType::DECIMALV2); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 5; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 5; + slot_desc.colName = "decimal"; + slot_desc.slotIdx = 6; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + t_desc_table.__isset.slotDescriptors = true; { // TTupleDescriptor dest @@ -363,6 +421,94 @@ void JsonScannerTest::create_expr_info() { _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 3, expr); _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 3); } + // largeint VARCHAR --> LargeInt + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::LARGEINT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttolargeint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttolargeint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_large_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 4; // price id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 4, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 4); + } + // decimal VARCHAR --> Decimal + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__isset.precision = true; + scalar_type.__isset.scale = true; + scalar_type.__set_precision(-1); + scalar_type.__set_scale(-1); + scalar_type.__set_type(TPrimitiveType::DECIMALV2); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodecimalv2"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttodecimalv2(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 5; // price id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 5, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 5); + } // _params.__isset.expr_of_dest_slot = true; _params.__set_dest_tuple_id(TUPLE_ID_DST); _params.__set_src_tuple_id(TUPLE_ID_SRC); @@ -420,6 +566,10 @@ TEST_F(JsonScannerTest, normal_simple_arrayjson) { status = scan_node.get_next(&_runtime_state, &batch, &eof); ASSERT_TRUE(status.ok()); ASSERT_EQ(2, batch.num_rows()); + // Do not use num_as_string, so largeint is too big is null and decimal value loss precision + auto tuple_str = batch.get_row(1)->get_tuple(0)->to_string(*scan_node.row_desc().tuple_descriptors()[0]); + ASSERT_TRUE(tuple_str.find("1180591620717411303424") == tuple_str.npos); + ASSERT_TRUE(tuple_str.find("9999999999999.999999") == tuple_str.npos); ASSERT_FALSE(eof); batch.reset(); @@ -428,7 +578,45 @@ TEST_F(JsonScannerTest, normal_simple_arrayjson) { ASSERT_EQ(0, batch.num_rows()); ASSERT_TRUE(eof); + // Use num_as_string load data again + BrokerScanNode scan_node2(&_obj_pool, _tnode, *_desc_tbl); + status = scan_node2.prepare(&_runtime_state); + ASSERT_TRUE(status.ok()); + scan_ranges.clear(); + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.strip_outer_array = true; + range.num_as_string = true; + range.__isset.strip_outer_array = true; + range.__isset.num_as_string = true; + range.splittable = true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = TFileType::FILE_LOCAL; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + scan_node2.set_scan_ranges(scan_ranges); + status = scan_node2.open(&_runtime_state); + ASSERT_TRUE(status.ok()); + + status = scan_node2.get_next(&_runtime_state, &batch, &eof); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(2, batch.num_rows()); + // Use num as string, load largeint, decimal successfully + tuple_str = batch.get_row(1)->get_tuple(0)->to_string(*scan_node2.row_desc().tuple_descriptors()[0]); + ASSERT_FALSE(tuple_str.find("1180591620717411303424") == tuple_str.npos); + ASSERT_FALSE(tuple_str.find("9999999999999.999999") == tuple_str.npos); + scan_node.close(&_runtime_state); + scan_node2.close(&_runtime_state); { std::stringstream ss; scan_node.runtime_profile()->pretty_print(&ss); diff --git a/be/test/exec/test_data/json_scanner/test_simple2.json b/be/test/exec/test_data/json_scanner/test_simple2.json index e33be8f8a7a7b5..3bd193f45e908e 100644 --- a/be/test/exec/test_data/json_scanner/test_simple2.json +++ b/be/test/exec/test_data/json_scanner/test_simple2.json @@ -1,5 +1,5 @@ [ - {"category":"reference","author":"NigelRees","title":"SayingsoftheCentury","price":8.95}, - {"category":"fiction","author":"EvelynWaugh","title":"SwordofHonour","price":12.99} + {"category":"reference","author":"NigelRees","title":"SayingsoftheCentury","price":8.95, "largeint":1234, "decimal":1234.1234}, + {"category":"fiction","author":"EvelynWaugh","title":"SwordofHonour","price":12.99, "largeint":1180591620717411303424, "decimal":9999999999999.999999} ] diff --git a/docs/en/administrator-guide/load-data/load-json-format.md b/docs/en/administrator-guide/load-data/load-json-format.md index 200979eb5ca482..57a1038899292b 100644 --- a/docs/en/administrator-guide/load-data/load-json-format.md +++ b/docs/en/administrator-guide/load-data/load-json-format.md @@ -119,11 +119,11 @@ Doris supports extracting the data specified in Json through Json Path. ``` Doris will use the specified Json Path for data matching and extraction. - + * Match non-primitive types The values that the previous example finally matched are all primitive types, such as Integer, String, and so on. Doris currently does not support complex types, such as Array, Map, etc. So when a non-primitive type is matched, Doris will convert the type to a Json format string and load it as a string type. Examples are as follows: - + ``` { "id": 123, "city" : { "name" : "beijing", "region" : "haidian" }} ``` @@ -304,6 +304,45 @@ If you want to load the above data as expected, the load statement is as follows curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.k1\", \"$.k2\"]"- H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2,'x')" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load ``` +## LargetInt与Decimal + +Doris supports data types such as largeint and decimal with larger data range and higher data precision. However, due to the fact that the maximum range of the rapid JSON library used by Doris for the resolution of digital types is Int64 and double, there may be some problems when importing largeint or decimal by JSON format, such as loss of precision, data conversion error, etc. + +For example: + +``` +[ + {"k1": 1, "k2":9999999999999.999999 } +] +``` + + +The imported K2 column type is `Decimal (16,9)`the import data is: ` 9999999999.999999`. During the JSON load which cause the precision loss of double conversion, the imported data convert to: ` 10000000000.0002 `. It is a import error. + +To solve this problem, Doris provides a param `num_as_string `. Doris converts the numeric type to a string when parsing JSON data and JSON load without losing precision. + +``` +curl -v --location-trusted -u root: -H "format: json" -H "num_as_string: true" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +But using the param will cause unexpected side effects. Doris currently does not support composite types, such as Array, Map, etc. So when a non basic type is matched, Doris will convert the type to a string in JSON format.` num_as_string`will also convert compound type numbers into strings, for example: + +JSON Data: + + { "id": 123, "city" : { "name" : "beijing", "city_id" : 1 }} + +Not use `num_as_string `, the data of the city column is: + +`{ "name" : "beijing", "city_id" : 1 }` + +Use `num_as_string `, the data of the city column is: + +`{ "name" : "beijing", "city_id" : "1" }` + +Warning, the param leads to the city_id of the numeric type in the compound type is treated as a string column and quoted, which is different from the original data. + +Therefore, when using JSON load. we should try to avoid importing largeint, decimal and composite types at the same time. If you can't avoid it, you need to fully understand the **side effects**. + ## Examples ### Stream Load @@ -347,7 +386,7 @@ code INT NULL ``` 100 beijing 1 ``` - + 2. Load sigle-line data 2 ``` @@ -401,7 +440,7 @@ code INT NULL 104 ["zhejiang","guangzhou"] 5 105 {"order1":["guangzhou"]} 6 ``` - + 4. Convert load data The data is still the multi-row data in Example 3. Now you need to add 1 to the `code` column in the loaded data and load it. diff --git a/docs/zh-CN/administrator-guide/load-data/load-json-format.md b/docs/zh-CN/administrator-guide/load-data/load-json-format.md index 9e7d71d9c6d6bb..ae7a841fa798da 100644 --- a/docs/zh-CN/administrator-guide/load-data/load-json-format.md +++ b/docs/zh-CN/administrator-guide/load-data/load-json-format.md @@ -306,6 +306,46 @@ curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: tru curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.k1\", \"$.k2\"]" -H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2, 'x')" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load ``` +## LargetInt与Decimal + +Doris支持LargeInt与Decimal等数据范围更大,数据精度更高的数据类型。但是由于Doris使用的Rapid Json库对于数字类型能够解析的最大范围为Int64与Double,这导致了通过Json导入LargeInt或Decimal时可能会出现:精度丢失,数据转换出错等问题。 + +示例数据如下: + +``` +[ + {"k1": 1, "k2":9999999999999.999999 } +] +``` + + +导入k2列类型为`Decimal(16, 9)`,数据为:`9999999999999.999999`。在进行Json导入时,由于Double转换的精度丢失导致了导入的数据为:`10000000000000.0002`,引发了导入出错。 + +为了解决这个问题,Doris在导入时提供了 `num_as_string`的开关。Doris在解析Json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。 + +``` +curl -v --location-trusted -u root: -H "format: json" -H "num_as_string: true" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +但是开启这个开关会引起一些意想不到的副作用。Doris 当前暂不支持复合类型,如 Array、Map 等。所以当匹配到一个非基本类型时,Doris 会将该类型转换为 Json 格式的字符串,而`num_as_string`会同样将复合类型的数字转换为字符串,举个例子: + +Json 数据为: + + { "id": 123, "city" : { "name" : "beijing", "city_id" : 1 }} + +不开启`num_as_string`时,导入的city列的数据为: + +`{ "name" : "beijing", "city_id" : 1 }` + +而开启了`num_as_string`时,导入的city列的数据为: + +`{ "name" : "beijing", "city_id" : "1" }` + +注意,这里导致了复合类型原先为1的数字类型的city_id被作为字符串列处理并添加上了引号,与原始数据相比,产生了变化。 + +所以用在使用Json导入时,要尽量避免LargeInt与Decimal与复合类型的同时导入。如果无法避免,则需要充分了解开启`num_as_string`后对复合类型导入的**副作用**。 + + ## 应用示例 ### Stream Load @@ -349,7 +389,7 @@ code INT NULL ``` 100 beijing 1 ``` - + 2. 导入单行数据2 ``` @@ -403,7 +443,7 @@ code INT NULL 104 ["zhejiang","guangzhou"] 5 105 {"order1":["guangzhou"]} 6 ``` - + 4. 对导入数据进行转换 数据依然是示例3中的多行数据,现需要对导入数据中的 `code` 列加1后导入。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java index 47bfc659efa6a9..4f165728193ce5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -51,6 +51,7 @@ public class AlterRoutineLoadStmt extends DdlStmt { .add(CreateRoutineLoadStmt.JSONPATHS) .add(CreateRoutineLoadStmt.JSONROOT) .add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY) + .add(CreateRoutineLoadStmt.NUM_AS_STRING) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) .build(); @@ -181,6 +182,12 @@ private void checkJobProperties() throws UserException { analyzedJobProperties.put(jobProperties.get(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY), String.valueOf(stripOuterArray)); } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.NUM_AS_STRING)) { + boolean numAsString = Boolean.valueOf(jobProperties.get(CreateRoutineLoadStmt.NUM_AS_STRING)); + analyzedJobProperties.put(jobProperties.get(CreateRoutineLoadStmt.NUM_AS_STRING), + String.valueOf(numAsString)); + } } private void checkDataSourceProperties() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 438a48eed841ae..7b1c0db277c2d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -101,6 +101,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String STRIP_OUTER_ARRAY = "strip_outer_array"; public static final String JSONPATHS = "jsonpaths"; public static final String JSONROOT = "json_root"; + public static final String NUM_AS_STRING = "num_as_string"; // kafka type properties public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list"; @@ -122,6 +123,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(FORMAT) .add(JSONPATHS) .add(STRIP_OUTER_ARRAY) + .add(NUM_AS_STRING) .add(JSONROOT) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) @@ -165,6 +167,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private String jsonPaths = ""; private String jsonRoot = ""; // MUST be a jsonpath string private boolean stripOuterArray = false; + private boolean numAsString = false; // kafka related properties private String kafkaBrokerList; @@ -255,6 +258,10 @@ public boolean isStripOuterArray() { return stripOuterArray; } + public boolean isNumAsString() { + return numAsString; + } + public String getJsonPaths() { return jsonPaths; } @@ -431,6 +438,7 @@ private void checkJobProperties() throws UserException { jsonPaths = jobProperties.get(JSONPATHS); jsonRoot = jobProperties.get(JSONROOT); stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false")); + numAsString = Boolean.valueOf(jobProperties.getOrDefault(NUM_AS_STRING, "false")); } else { throw new UserException("Format type is invalid. format=`" + format + "`"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java index d8373989bfd45d..0a09c5bba65ce7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java @@ -270,6 +270,7 @@ public static class LoadContext { public String jsonPaths; public String stripOuterArray; public String jsonRoot; + public String numAsString; public LoadContext(HttpServletRequest request, String db, String tbl, String user, String passwd, TmpFileMgr.TmpFile file) { @@ -299,6 +300,7 @@ private void parseHeader(HttpServletRequest request) { this.format = request.getHeader("format"); this.jsonPaths = request.getHeader("jsonpaths"); this.stripOuterArray = request.getHeader("strip_outer_array"); + this.numAsString = request.getHeader("num_as_string"); this.jsonRoot = request.getHeader("json_root"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index dd1f51341ff3fe..e82d234833b4c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -193,6 +193,7 @@ public boolean isFinalState() { */ private static final String PROPS_FORMAT = "format"; private static final String PROPS_STRIP_OUTER_ARRAY = "strip_outer_array"; + private static final String PROPS_NUM_AS_STRING = "num_as_string"; private static final String PROPS_JSONPATHS = "jsonpaths"; private static final String PROPS_JSONROOT = "json_root"; @@ -302,6 +303,7 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) { jobProperties.put(PROPS_FORMAT, "csv"); jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false"); + jobProperties.put(PROPS_NUM_AS_STRING, "false"); jobProperties.put(PROPS_JSONPATHS, ""); jobProperties.put(PROPS_JSONROOT, ""); } else if (stmt.getFormat().equals("json")) { @@ -321,6 +323,12 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { } else { jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false"); } + if (stmt.isNumAsString()) { + jobProperties.put(PROPS_NUM_AS_STRING, "true"); + } else { + jobProperties.put(PROPS_NUM_AS_STRING, "false"); + } + } else { throw new UserException("Invalid format type."); } @@ -546,6 +554,10 @@ public boolean isStripOuterArray() { return Boolean.valueOf(jobProperties.get(PROPS_STRIP_OUTER_ARRAY)); } + public boolean isNumAsString() { + return Boolean.valueOf(jobProperties.get(PROPS_NUM_AS_STRING)); + } + @Override public String getPath() { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 801c181305f2d4..2c4065a558b15a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -93,6 +93,7 @@ public void init(Analyzer analyzer) throws UserException { rangeDesc.setJsonRoot(taskInfo.getJsonRoot()); } rangeDesc.setStripOuterArray(taskInfo.isStripOuterArray()); + rangeDesc.setNumAsString(taskInfo.isNumAsString()); } rangeDesc.splittable = false; switch (taskInfo.getFileType()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index 1f86825e9cbbe8..7692d317544128 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -42,6 +42,7 @@ public interface LoadTaskInfo { public String getJsonPaths(); public String getJsonRoot(); public boolean isStripOuterArray(); + public boolean isNumAsString(); public String getPath(); public List getColumnExprDescs(); public boolean isStrictMode(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 90ba6e1b2cf188..512b21e6f174d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -57,6 +57,7 @@ public class StreamLoadTask implements LoadTaskInfo { private TFileType fileType; private TFileFormatType formatType; private boolean stripOuterArray; + private boolean numAsString; private String jsonPaths; private String jsonRoot; @@ -83,6 +84,7 @@ public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatT this.jsonPaths = ""; this.jsonRoot = ""; this.stripOuterArray = false; + this.numAsString = false; } public TUniqueId getId() { @@ -141,10 +143,19 @@ public boolean isStripOuterArray() { return stripOuterArray; } + @Override + public boolean isNumAsString() { + return numAsString; + } + public void setStripOuterArray(boolean stripOuterArray) { this.stripOuterArray = stripOuterArray; } + public void setNumAsString(boolean numAsString) { + this.numAsString = numAsString; + } + public String getJsonPaths() { return jsonPaths; } @@ -227,6 +238,7 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request, Databas jsonRoot = request.getJsonRoot(); } stripOuterArray = request.isStripOuterArray(); + numAsString = request.isNumAsString(); } if (request.isSetMergeType()) { try { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 4964e3fa670023..a721e70b50a2cc 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -572,6 +572,7 @@ struct TStreamLoadPutRequest { 27: optional Types.TMergeType merge_type 28: optional string delete_condition 29: optional string sequence_col + 30: optional bool num_as_string } struct TStreamLoadPutResult { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 61036447b3ac0b..250ac95500139f 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -131,6 +131,8 @@ struct TBrokerRangeDesc { 11: optional bool strip_outer_array; 12: optional string jsonpaths; 13: optional string json_root; + // it's usefull when format_type == FORMAT_JSON + 14: optional bool num_as_string; } struct TBrokerScanRangeParams {