7 changes: 5 additions & 2 deletions be/src/common/config.h
@@ -303,11 +303,14 @@ namespace config {
CONF_Int64(load_data_reserve_hours, "4");
// the load error log will be removed after this time
CONF_mInt64(load_error_log_reserve_hours, "48");
// Deprecated, use streaming_load_max_mb instead
// CONF_Int64(mini_load_max_mb, "2048");
CONF_Int32(number_tablet_writer_threads, "16");

// The maximum amount of data that can be processed by a stream load
Member:

a stream load can process 10G by default?

Contributor Author:

Actually, this default value is too big for a stream load.
But I am not going to change it, to avoid causing problems for existing users.

CONF_mInt64(streaming_load_max_mb, "10240");
// Some data formats, such as JSON, cannot be streamed.
// Therefore, it is necessary to limit the maximum size of
// such data when using stream load, to prevent excessive memory consumption.
CONF_mInt64(streaming_load_max_batch_size_mb, "100");
// the alive time of a TabletsChannel.
// If the channel does not receive any data till this time,
// the channel will be removed.
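To make the relationship between the two limits above concrete, here is a purely illustrative sketch (not part of this change; the enum and helper names are hypothetical) of which limit bounds an incoming load body, using the default values shown above:

```cpp
// Illustrative only: which of the two limits above bounds a load body.
// Format and body_within_limit are hypothetical names, not Doris APIs.
#include <cstdint>

enum class Format { CSV, JSON };

constexpr int64_t kMiB = 1024 * 1024;

bool body_within_limit(Format format, int64_t body_bytes) {
    // Streamable formats (e.g. CSV) are bounded by streaming_load_max_mb (10240 MB).
    // Non-streamable formats (e.g. JSON) must be buffered in memory in full,
    // so they are bounded by the much smaller streaming_load_max_batch_size_mb (100 MB).
    const int64_t limit_mb = (format == Format::JSON) ? 100 : 10240;
    return body_bytes <= limit_mb * kMiB;
}
```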
2 changes: 1 addition & 1 deletion be/src/exec/base_scanner.cpp
@@ -134,7 +134,7 @@ Status BaseScanner::init_expr_ctxes() {
return Status::OK();
}

bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool) {
bool BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
int ctx_idx = 0;
for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
2 changes: 1 addition & 1 deletion be/src/exec/base_scanner.h
@@ -59,7 +59,7 @@ class BaseScanner {

// Close this scanner
virtual void close() = 0;
bool fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool);
bool fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);

void fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path);

2 changes: 1 addition & 1 deletion be/src/exec/broker_scanner.cpp
@@ -395,7 +395,7 @@ bool BrokerScanner::convert_one_row(
if (!line_to_src_tuple(line)) {
return false;
}
return fill_dest_tuple(line, tuple, tuple_pool);
return fill_dest_tuple(tuple, tuple_pool);
}

// Convert one row to this tuple
30 changes: 24 additions & 6 deletions be/src/exec/json_scanner.cpp
@@ -73,8 +73,8 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
if (fill_dest_tuple(Slice(), tuple, tuple_pool)) {
break;// break if true
if (fill_dest_tuple(tuple, tuple_pool)) {
break; // break if true
}
}
if (_scanner_eof) {
@@ -399,6 +399,15 @@ void JsonReader::_write_data_to_tuple(rapidjson::Value::ConstValueIterator value

// for simple format json
void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool *valid) {
if (!objectValue.IsObject()) {
// Here we expect the incoming `objectValue` to be a JSON object, such as {"key" : "value"},
// not some other kind of JSON value.
_state->append_error_msg_to_file(_print_json_value(objectValue), "Expect json object value");
_counter->num_rows_filtered++;
*valid = false; // current row is invalid
return;
}

int nullcount = 0;
for (auto v : slot_descs) {
if (objectValue.HasMember(v->col_name().c_str())) {
@@ -443,20 +452,29 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, c
Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs, MemPool* tuple_pool, bool* eof) {
do {
bool valid = false;
if (_next_line >= _total_lines) {//parse json and generic document
if (_next_line >= _total_lines) { // parse json and generate the document
Status st = _parse_json_doc(eof);
if (st.is_data_quality_error()) {
continue; // continue to read next
}
RETURN_IF_ERROR(st); // terminate if encounter other errors
if (*eof) {// read all data, then return
if (*eof) { // read all data, then return
return Status::OK();
}
if (_json_doc->IsArray()) {
_total_lines = _json_doc->Size();
if (_total_lines == 0) {
// the input may be an empty json array, such as "[]"
std::stringstream str_error;
str_error << "Empty json line";
_state->append_error_msg_to_file(_print_json_value(*_json_doc), str_error.str());
_counter->num_rows_filtered++;
continue;
}
} else {
_total_lines = 1; // only one row
}

_next_line = 0;
}

@@ -534,7 +552,7 @@ Status JsonReader::_handle_nested_complex_json(Tuple* tuple, const std::vector<S
if (*eof) {
return Status::OK(); // read over, then return
}
break; //read a valid row
break; // read a valid row
}
_write_values_by_jsonpath(*_json_doc, tuple_pool, tuple, slot_descs);
return Status::OK();
@@ -558,7 +576,7 @@ Status JsonReader::_handle_flat_array_complex_json(Tuple* tuple, const std::vect
continue; // continue to read next
}
RETURN_IF_ERROR(st); // terminate if encounter other errors
if (*eof) {// read all data, then return
if (*eof) { // read all data, then return
return Status::OK();
}
_total_lines = _json_doc->Size();
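For reference, a small self-contained sketch (not part of this patch) of what the two new simple-format checks reject, using rapidjson directly: a non-object row and an empty array are filtered, while object rows pass.

```cpp
#include <cstdio>
#include <rapidjson/document.h>

// Classify a simple-format input the way the new checks do.
static void classify(const char* json) {
    rapidjson::Document doc;
    doc.Parse(json);
    if (doc.HasParseError()) {
        std::printf("%-16s -> filtered: parse error\n", json);
        return;
    }
    if (doc.IsArray() && doc.Size() == 0) {
        // corresponds to the new "Empty json line" branch for inputs like "[]"
        std::printf("%-16s -> filtered: empty json line\n", json);
        return;
    }
    const rapidjson::Value& row =
            doc.IsArray() ? doc[rapidjson::SizeType(0)]
                          : static_cast<rapidjson::Value&>(doc);
    if (!row.IsObject()) {
        // corresponds to the new "Expect json object value" branch
        std::printf("%-16s -> filtered: expect json object value\n", json);
        return;
    }
    std::printf("%-16s -> accepted\n", json);
}

int main() {
    classify("{\"k1\": 1}");    // accepted: one object row
    classify("[{\"k1\": 1}]");  // accepted: an array of object rows
    classify("[]");             // filtered by the empty-array check
    classify("[1, 2, 3]");      // filtered by the IsObject() check
    return 0;
}
```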
2 changes: 1 addition & 1 deletion be/src/exec/orc_scanner.cpp
@@ -326,7 +326,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
if (fill_dest_tuple(Slice(), tuple, tuple_pool)) {
if (fill_dest_tuple(tuple, tuple_pool)) {
break; // get one line, break from while
} // else skip this line and continue get_next to return
}
2 changes: 1 addition & 1 deletion be/src/exec/parquet_scanner.cpp
@@ -80,7 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {

COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
if (fill_dest_tuple(Slice(), tuple, tuple_pool)) {
if (fill_dest_tuple(tuple, tuple_pool)) {
break;// break if true
}
}
18 changes: 15 additions & 3 deletions be/src/http/action/stream_load.cpp
@@ -218,7 +218,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
LOG(WARNING) << "body exceed max size." << ctx->brief();

std::stringstream ss;
ss << "body exceed max size, max_body_bytes=" << max_body_bytes;
ss << "body exceed max size: " << max_body_bytes << ", limit: " << max_body_bytes;
return Status::InternalError(ss.str());
}
} else {
@@ -234,11 +234,20 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
} else {
ctx->format = parse_format(http_req->header(HTTP_FORMAT_KEY));
if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
LOG(WARNING) << "unknown data format." << ctx->brief();
std::stringstream ss;
ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY);
return Status::InternalError(ss.str());
}

if (ctx->format == TFileFormatType::FORMAT_JSON) {
size_t max_body_bytes = config::streaming_load_max_batch_size_mb * 1024 * 1024;
if (ctx->body_bytes > max_body_bytes) {
std::stringstream ss;
ss << "The size of this batch exceed the max size [" << max_body_bytes
<< "] of json type data " << " data [ " << ctx->body_bytes << " ]";
return Status::InternalError(ss.str());
}
}
}

if (!http_req->header(HTTP_TIMEOUT).empty()) {
@@ -312,7 +321,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.formatType = ctx->format;
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
auto pipe = std::make_shared<StreamLoadPipe>();
auto pipe = std::make_shared<StreamLoadPipe>(
1024 * 1024 /* max_buffered_bytes */,
64 * 1024 /* min_chunk_size */,
ctx->body_bytes /* total_length */);
RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
request.fileType = TFileType::FILE_STREAM;
ctx->body_sink = pipe;
82 changes: 61 additions & 21 deletions be/src/runtime/stream_load/stream_load_pipe.h
@@ -33,10 +33,12 @@ namespace doris {
class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024,
size_t min_chunk_size = 64 * 1024)
size_t min_chunk_size = 64 * 1024,
int64_t total_length = -1)
: _buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size),
_total_length(total_length),
_finished(false), _cancelled(false) {
}
virtual ~StreamLoadPipe() { }
@@ -84,31 +86,33 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
return _append(buf);
}

// If _total_length == -1, this should be a Kafka routine load task,
// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
// Otherwise, this should be a stream load task that needs to read the specified amount of data.
Status read_one_message(uint8_t** data, size_t* length) override {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
}
// cancelled
if (_cancelled) {
return Status::InternalError("cancelled");
}
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
*data = nullptr;
if (_total_length < -1) {
std::stringstream ss;
ss << "invalid, _total_length is: " << _total_length;
return Status::InternalError(ss.str());
} else if (_total_length == 0) {
// no data
*length = 0;
return Status::OK();
}
auto buf = _buf_queue.front();
*length = buf->remaining();
*data = new uint8_t[*length];
buf->get_bytes((char*)(*data) , *length);

_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
_put_cond.notify_one();
return Status::OK();
if (_total_length == -1) {
return _read_next_buffer(data, length);
}

// _total_length > 0, read the entire data
*data = new uint8_t[_total_length];
*length = _total_length;
bool eof = false;
Status st = read(*data, length, &eof);
if (eof) {
*length = 0;
}
return st;
}

Status read(uint8_t* data, size_t* data_size, bool* eof) override {
@@ -196,6 +200,34 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
}

private:
// read the next buffer from _buf_queue
Status _read_next_buffer(uint8_t** data, size_t* length) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
}
// cancelled
if (_cancelled) {
return Status::InternalError("cancelled");
}
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
*data = nullptr;
*length = 0;
return Status::OK();
}
auto buf = _buf_queue.front();
*length = buf->remaining();
*data = new uint8_t[*length];
buf->get_bytes((char*)(*data), *length);

_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
_put_cond.notify_one();
return Status::OK();
}

Status _append(const ByteBufferPtr& buf) {
{
std::unique_lock<std::mutex> l(_lock);
@@ -221,6 +253,14 @@
size_t _buffered_bytes;
size_t _max_buffered_bytes;
size_t _min_chunk_size;
// The total amount of data expected to be read.
// In some scenarios, such as loading json format data through stream load,
// the data needs to be completely read before it can be parsed,
// so the total size of the data needs to be known.
// The default is -1, which means that the data arrives in a stream
// and the length is unknown.
// int64_t is used instead of size_t because size_t is unsigned and cannot represent -1
int64_t _total_length = -1;
std::deque<ByteBufferPtr> _buf_queue;
std::condition_variable _put_cond;
std::condition_variable _get_cond;
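A single-threaded usage sketch (hypothetical, not from this patch) of the new total_length mode. It assumes the pipe's existing append(const char*, size_t) and finish() members and the header path used in this repo, and collapses producer (the HTTP handler) and consumer (the scanner) into one function for brevity:

```cpp
#include <cstdint>
#include <cstring>
#include <memory>

#include "runtime/stream_load/stream_load_pipe.h"

// Sketch: feed a small JSON body into a pipe whose total length is known,
// then read it back as a single message.
void pipe_round_trip_example() {
    const char* body = "[{\"k1\": 1}, {\"k1\": 2}]";
    int64_t total_length = static_cast<int64_t>(std::strlen(body));

    auto pipe = std::make_shared<doris::StreamLoadPipe>(
            1024 * 1024 /* max_buffered_bytes */,
            64 * 1024   /* min_chunk_size */,
            total_length /* total_length */);

    // Producer side: append the whole body, then mark the sink finished.
    pipe->append(body, total_length);
    pipe->finish();

    // Consumer side: because total_length > 0, read_one_message() returns the
    // complete body in one allocation instead of one buffered chunk at a time.
    uint8_t* data = nullptr;
    size_t length = 0;
    doris::Status st = pipe->read_one_message(&data, &length);
    if (st.ok()) {
        // data[0..length) now holds the whole JSON document; the caller owns it.
        delete[] data;
    }
}
```

With the default total_length of -1 (the routine load case), each read_one_message() call instead pops the next complete buffer from the queue, as described in the comment above the method.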
16 changes: 16 additions & 0 deletions docs/en/administrator-guide/config/be_config.md
@@ -437,6 +437,22 @@ Indicates how many tablets in this data directory failed to load. At the same ti

### `streaming_load_max_mb`

* Type: int64
* Description: Used to limit the maximum amount of data allowed in one Stream load. The unit is MB.
* Default value: 10240
* Dynamically modify: yes

Stream Load is generally suitable for loading no more than a few GB of data; it is not suitable for loading very large amounts of data.

### `streaming_load_max_batch_size_mb`

* Type: int64
* Description: For some data formats, such as JSON, it is used to limit the maximum amount of data allowed in one Stream load. The unit is MB.
* Default value: 100
* Dynamically modify: yes

Some data formats, such as JSON, cannot be split. Doris must read all the data into memory before parsing can begin. Therefore, this value is used to limit the maximum amount of data that can be loaded in a single Stream load of such formats.
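For example, with the default value of 100, the body of a single JSON Stream load may be at most 100 × 1024 × 1024 = 104,857,600 bytes; a larger body is rejected when the request header is processed, before any data is read.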

### `streaming_load_rpc_max_alive_time_sec`

### `sync_tablet_meta`