diff --git a/be/src/common/config.h b/be/src/common/config.h index d0858aa279e95f..1299ac6b8b2629 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -303,11 +303,14 @@ namespace config { CONF_Int64(load_data_reserve_hours, "4"); // log error log will be removed after this time CONF_mInt64(load_error_log_reserve_hours, "48"); - // Deprecated, use streaming_load_max_mb instead - // CONF_Int64(mini_load_max_mb, "2048"); CONF_Int32(number_tablet_writer_threads, "16"); + // The maximum amount of data that can be processed by a stream load CONF_mInt64(streaming_load_max_mb, "10240"); + // Some data formats, such as JSON, cannot be streamed. + // Therefore, it is necessary to limit the maximum number of + // such data when using stream load to prevent excessive memory consumption. + CONF_mInt64(streaming_load_max_batch_size_mb, "100"); // the alive time of a TabletsChannel. // If the channel does not receive any data till this time, // the channel will be removed. diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index d72c301462922c..a6b28fcd6bffe3 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -134,7 +134,7 @@ Status BaseScanner::init_expr_ctxes() { return Status::OK(); } -bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool) { +bool BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { int ctx_idx = 0; for (auto slot_desc : _dest_tuple_desc->slots()) { if (!slot_desc->is_materialized()) { diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 43461c2e4f85b2..060311384f3cbb 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -59,7 +59,7 @@ class BaseScanner { // Close this scanner virtual void close() = 0; - bool fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool); + bool fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); void fill_slots_of_columns_from_path(int start, const std::vector& columns_from_path); diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index a854bdee4b83b2..6a2b23bc97e23c 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -395,7 +395,7 @@ bool BrokerScanner::convert_one_row( if (!line_to_src_tuple(line)) { return false; } - return fill_dest_tuple(line, tuple, tuple_pool); + return fill_dest_tuple(tuple, tuple_pool); } // Convert one row to this tuple diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index ca6cd5622c602e..0c0c2ddfd3b8f5 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -73,8 +73,8 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { - break;// break if true + if (fill_dest_tuple(tuple, tuple_pool)) { + break; // break if true } } if (_scanner_eof) { @@ -399,6 +399,15 @@ void JsonReader::_write_data_to_tuple(rapidjson::Value::ConstValueIterator value // for simple format json void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool *valid) { + if (!objectValue.IsObject()) { + // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, + // not other type of Json format. + _state->append_error_msg_to_file(_print_json_value(objectValue), "Expect json object value"); + _counter->num_rows_filtered++; + *valid = false; // current row is invalid + return; + } + int nullcount = 0; for (auto v : slot_descs) { if (objectValue.HasMember(v->col_name().c_str())) { @@ -443,20 +452,29 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, c Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof) { do { bool valid = false; - if (_next_line >= _total_lines) {//parse json and generic document + if (_next_line >= _total_lines) { // parse json and generic document Status st = _parse_json_doc(eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); // terminate if encounter other errors - if (*eof) {// read all data, then return + if (*eof) { // read all data, then return return Status::OK(); } if (_json_doc->IsArray()) { _total_lines = _json_doc->Size(); + if (_total_lines == 0) { + // may be passing an empty json, such as "[]" + std::stringstream str_error; + str_error << "Empty json line"; + _state->append_error_msg_to_file(_print_json_value(*_json_doc), str_error.str()); + _counter->num_rows_filtered++; + continue; + } } else { _total_lines = 1; // only one row } + _next_line = 0; } @@ -534,7 +552,7 @@ Status JsonReader::_handle_nested_complex_json(Tuple* tuple, const std::vectorSize(); diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 7edf204a2de4cb..847b465343ca2f 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -326,7 +326,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + if (fill_dest_tuple(tuple, tuple_pool)) { break; // get one line, break from while } // else skip this line and continue get_next to return } diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index cb2268760e6728..d2e69e948526f9 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -80,7 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { + if (fill_dest_tuple(tuple, tuple_pool)) { break;// break if true } } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 52e16a5ebcfb9b..294ea16b8e3179 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -218,7 +218,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct LOG(WARNING) << "body exceed max size." << ctx->brief(); std::stringstream ss; - ss << "body exceed max size, max_body_bytes=" << max_body_bytes; + ss << "body exceed max size: " << max_body_bytes << ", limit: " << max_body_bytes; return Status::InternalError(ss.str()); } } else { @@ -234,11 +234,20 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct } else { ctx->format = parse_format(http_req->header(HTTP_FORMAT_KEY)); if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { - LOG(WARNING) << "unknown data format." << ctx->brief(); std::stringstream ss; ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY); return Status::InternalError(ss.str()); } + + if (ctx->format == TFileFormatType::FORMAT_JSON) { + size_t max_body_bytes = config::streaming_load_max_batch_size_mb * 1024 * 1024; + if (ctx->body_bytes > max_body_bytes) { + std::stringstream ss; + ss << "The size of this batch exceed the max size [" << max_body_bytes + << "] of json type data " << " data [ " << ctx->body_bytes << " ]"; + return Status::InternalError(ss.str()); + } + } } if (!http_req->header(HTTP_TIMEOUT).empty()) { @@ -312,7 +321,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.formatType = ctx->format; request.__set_loadId(ctx->id.to_thrift()); if (ctx->use_streaming) { - auto pipe = std::make_shared(); + auto pipe = std::make_shared( + 1024 * 1024 /* max_buffered_bytes */, + 64 * 1024 /* min_chunk_size */, + ctx->body_bytes /* total_length */); RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe)); request.fileType = TFileType::FILE_STREAM; ctx->body_sink = pipe; diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 2fc4a9b1a10af1..3deb4043f8aedc 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -33,10 +33,12 @@ namespace doris { class StreamLoadPipe : public MessageBodySink, public FileReader { public: StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, - size_t min_chunk_size = 64 * 1024) + size_t min_chunk_size = 64 * 1024, + int64_t total_length = -1) : _buffered_bytes(0), _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), + _total_length(total_length), _finished(false), _cancelled(false) { } virtual ~StreamLoadPipe() { } @@ -84,31 +86,33 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { return _append(buf); } + // If _total_length == -1, this should be a Kafka routine load task, + // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. + // Otherwise, this should be a stream load task that needs to read the specified amount of data. Status read_one_message(uint8_t** data, size_t* length) override { - std::unique_lock l(_lock); - while (!_cancelled && !_finished && _buf_queue.empty()) { - _get_cond.wait(l); - } - // cancelled - if (_cancelled) { - return Status::InternalError("cancelled"); - } - // finished - if (_buf_queue.empty()) { - DCHECK(_finished); - *data = nullptr; + if (_total_length < -1) { + std::stringstream ss; + ss << "invalid, _total_length is: " << _total_length; + return Status::InternalError(ss.str()); + } else if (_total_length == 0) { + // no data *length = 0; return Status::OK(); } - auto buf = _buf_queue.front(); - *length = buf->remaining(); - *data = new uint8_t[*length]; - buf->get_bytes((char*)(*data) , *length); - _buf_queue.pop_front(); - _buffered_bytes -= buf->limit; - _put_cond.notify_one(); - return Status::OK(); + if (_total_length == -1) { + return _read_next_buffer(data, length); + } + + // _total_length > 0, read the entire data + *data = new uint8_t[_total_length]; + *length = _total_length; + bool eof = false; + Status st = read(*data, length, &eof); + if (eof) { + *length = 0; + } + return st; } Status read(uint8_t* data, size_t* data_size, bool* eof) override { @@ -196,6 +200,34 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { } private: + // read the next buffer from _buf_queue + Status _read_next_buffer(uint8_t** data, size_t* length) { + std::unique_lock l(_lock); + while (!_cancelled && !_finished && _buf_queue.empty()) { + _get_cond.wait(l); + } + // cancelled + if (_cancelled) { + return Status::InternalError("cancelled"); + } + // finished + if (_buf_queue.empty()) { + DCHECK(_finished); + *data = nullptr; + *length = 0; + return Status::OK(); + } + auto buf = _buf_queue.front(); + *length = buf->remaining(); + *data = new uint8_t[*length]; + buf->get_bytes((char*)(*data) , *length); + + _buf_queue.pop_front(); + _buffered_bytes -= buf->limit; + _put_cond.notify_one(); + return Status::OK(); + } + Status _append(const ByteBufferPtr& buf) { { std::unique_lock l(_lock); @@ -221,6 +253,14 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { size_t _buffered_bytes; size_t _max_buffered_bytes; size_t _min_chunk_size; + // The total amount of data expected to be read. + // In some scenarios, such as loading json format data through stream load, + // the data needs to be completely read before it can be parsed, + // so the total size of the data needs to be known. + // The default is -1, which means that the data arrives in a stream + // and the length is unknown. + // size_t is unsigned, so use int64_t + int64_t _total_length = -1; std::deque _buf_queue; std::condition_variable _put_cond; std::condition_variable _get_cond; diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md index 196cd5ed5a08fc..bf6c0bf2e0df06 100644 --- a/docs/en/administrator-guide/config/be_config.md +++ b/docs/en/administrator-guide/config/be_config.md @@ -437,6 +437,22 @@ Indicates how many tablets in this data directory failed to load. At the same ti ### `streaming_load_max_mb` +* Type: int64 +* Description: Used to limit the maximum amount of data allowed in one Stream load. The unit is MB. +* Default value: 10240 +* Dynamically modify: yes + +Stream Load is generally suitable for loading data less than a few GB, not suitable for loading` too large data. + +### `streaming_load_max_batch_size_mb` + +* Type: int64 +* Description: For some data formats, such as JSON, it is used to limit the maximum amount of data allowed in one Stream load. The unit is MB. +* Default value: 100 +* Dynamically modify: yes + +Some data formats, such as JSON, cannot be split. Doris must read all the data into the memory before parsing can begin. Therefore, this value is used to limit the maximum amount of data that can be loaded in a single Stream load. + ### `streaming_load_rpc_max_alive_time_sec` ### `sync_tablet_meta` diff --git a/docs/en/administrator-guide/load-data/load-json-format.md b/docs/en/administrator-guide/load-data/load-json-format.md index 14caff63bb6a1d..c57ec99db2dc2d 100644 --- a/docs/en/administrator-guide/load-data/load-json-format.md +++ b/docs/en/administrator-guide/load-data/load-json-format.md @@ -183,6 +183,124 @@ Doris supports extracting the data specified in Json through Json Path. Will result in a complete match failure, the line will be marked as an error row, instead of producing `null, null`. +## Json Path and Columns + +Json Path is used to specify how to extract data in JSON format, and Columns specify the mapping and conversion relationship of columns. The two can be used together, for example as follows. + +Data content: + +``` +{"k1": 1, "k2": 2} +``` + +Table schema: + +`k2 int, k1 int` + +Load statement 1 (take Stream Load as an example): + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -T example.json http:/ /127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +In Load statement 1, only Json Path is specified, and Columns are not specified. The role of Json Path is to extract the Json data in the order of the fields in the Json Path, and then write it in the order of the table schema. The final loaded data results are as follows: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 2 | 1 | ++------+------+ +``` + +You will see that the actual k1 column has loaded the value of the "k2" column in the Json data. This is because the field name in Json is not equivalent to the field name in the table schema. We need to explicitly specify the mapping relationship between the two. + +Load statement 2: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, k1 "-T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +Compared to load statement 1, here is the Columns field, which is used to describe the mapping relationship of columns, in the order of `k2, k1`. That is, after extracting in the order of the fields in the Json Path, specify the first column as the value of the k2 column in the table, and the second column as the value of the k1 column in the table. The final loaded data results are as follows: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | 2 | ++------+------+ +``` + +Of course, like other load methods, you can perform column conversion operations in Columns. Examples are as follows: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, tmp_k1 , k1 = tmp_k1 * 100" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +The above example will multiply the value of k1 by 100 and import it. The final imported data results are as follows: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 100 | 2 | ++------+------+ +``` + +## NULL and Default value + +The sample data is as follows: + +``` +[ + {"k1": 1, "k2": "a"}, + {"k1": 2}, + {"k1": 3, "k2": "c"}, +] +``` + +The table schema is: `k1 int null, k2 varchar(32) null default "x"` + +The load statement is as follows: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +The import results that users may expect are as follows, that is, for missing columns, fill in default values. + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | a | ++------+------+ +| 2 | x | ++------+------+ +| 3 | c | ++------+------+ +``` + +But the actual load result is as follows, that is, for missing columns, NULL is added. + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | a | ++------+------+ +| 2 | NULL | ++------+------+ +| 3 | c | ++------+------+ +``` + +This is because through the information in the load statement, Doris does not know that "the missing column is the k2 column in the table". +If you want to load the above data as expected, the load statement is as follows: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.k1\", \"$.k2\"]"- H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2,'x')" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` ## Examples @@ -305,4 +423,4 @@ code INT NULL Routine Load processes Json data the same as Stream Load. I will not repeat them here. -For the Kafka data source, the content of each Massage is treated as a complete Json data. If multiple rows of data expressed in Array format in a Massage are loaded, multiple rows will be loaded, and Kafka's offset will only increase by 1. If an Array format Json represents multiple rows of data, but because the Json format error causes the parsing Json to fail, the error row will only increase by 1 (because the parsing fails, in fact, Doris cannot determine how many rows of data it contains, and can only add one row of errors rows record). \ No newline at end of file +For the Kafka data source, the content of each Massage is treated as a complete Json data. If multiple rows of data expressed in Array format in a Massage are loaded, multiple rows will be loaded, and Kafka's offset will only increase by 1. If an Array format Json represents multiple rows of data, but because the Json format error causes the parsing Json to fail, the error row will only increase by 1 (because the parsing fails, in fact, Doris cannot determine how many rows of data it contains, and can only add one row of errors rows record). diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md index 658ce21a88c9f8..79712ed37a5b39 100644 --- a/docs/zh-CN/administrator-guide/config/be_config.md +++ b/docs/zh-CN/administrator-guide/config/be_config.md @@ -436,6 +436,22 @@ load tablets from header failed, failed tablets size: xxx, path=xxx ### `streaming_load_max_mb` +* 类型:int64 +* 描述:用于限制一次 Stream load 导入中,允许的最大数据量。单位 MB。 +* 默认值: 10240 +* 可动态修改:是 + +Stream Load 一般适用于导入几个GB以内的数据,不适合导入过大的数据。 + +### `streaming_load_max_batch_size_mb` + +* 类型:int64 +* 描述:对于某些数据格式,如 JSON,用于限制一次 Stream load 导入中,允许的最大数据量。单位 MB。 +* 默认值: 100 +* 可动态修改:是 + +一些数据格式,如 JSON,无法进行拆分处理,必须读取全部数据到内存后才能开始解析,因此,这个值用于限制此类格式数据单次导入最大数据量。 + ### `streaming_load_rpc_max_alive_time_sec` ### `sync_tablet_meta` diff --git a/docs/zh-CN/administrator-guide/load-data/load-json-format.md b/docs/zh-CN/administrator-guide/load-data/load-json-format.md index 8780f77b30b55c..5acc418c8f9a9a 100644 --- a/docs/zh-CN/administrator-guide/load-data/load-json-format.md +++ b/docs/zh-CN/administrator-guide/load-data/load-json-format.md @@ -185,6 +185,124 @@ Doris 支持通过 Json Path 抽取 Json 中指定的数据。 则会导致完全匹配失败,则该行会标记为错误行,而不是产出 `null, null`。 +## Json Path 和 Columns + +Json Path 用于指定如何对 JSON 格式中的数据进行抽取,而 Columns 指定列的映射和转换关系。两者可以配合使用,举例如下。 + +数据内容: + +``` +{"k1" : 1, "k2": 2} +``` + +表结构: + +`k2 int, k1 int` + +导入语句1(以 Stream Load 为例): + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +导入语句1中,仅指定了 Json Path,没有指定 Columns。其中 Json Path 的作用是将 Json 数据按照 Json Path 中字段的顺序进行抽取,之后会按照表结构的顺序进行写入。最终导入的数据结果如下: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 2 | 1 | ++------+------+ +``` + +会看到,实际的 k1 列导入了 Json 数据中的 "k2" 列的值。这是因为,Json 中字段名称并不等同于表结构中字段的名称。我们需要显式的指定这两者之间的映射关系。 + +导入语句2: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, k1" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +相比如导入语句1,这里增加了 Columns 字段,用于描述列的映射关系,按 `k2, k1` 的顺序。即按Json Path 中字段的顺序抽取后,指定第一列为表中 k2 列的值,而第二列为表中 k1 列的值。最终导入的数据结果如下: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | 2 | ++------+------+ +``` + +当然,如其他导入一样,可以在 Columns 中进行列的转换操作。示例如下: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, tmp_k1, k1 = tmp_k1 * 100" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +上述示例会将 k1 的值乘以 100 后导入。最终导入的数据结果如下: + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 100 | 2 | ++------+------+ +``` + +## NULL 和 Default 值 + +示例数据如下: + +``` +[ + {"k1": 1, "k2": "a"}, + {"k1": 2}, + {"k1": 3, "k2": "c"}, +] +``` + +表结构为:`k1 int null, k2 varchar(32) null default "x"` + +导入语句如下: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` + +用户可能期望的导入结果如下,即对于缺失的列,填写默认值。 + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | a | ++------+------+ +| 2 | x | ++------+------+ +| 3 | c | ++------+------+ +``` + +但实际的导入结果如下,即对于缺失的列,补上了 NULL。 + +``` ++------+------+ +| k1 | k2 | ++------+------+ +| 1 | a | ++------+------+ +| 2 | NULL | ++------+------+ +| 3 | c | ++------+------+ +``` + +这是因为通过导入语句中的信息,Doris 并不知道 “缺失的列是表中的 k2 列”。 +如果要对以上数据按照期望结果导入,则导入语句如下: + +``` +curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.k1\", \"$.k2\"]" -H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2, 'x')" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load +``` ## 应用示例