diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index cc5132478cedce..9d33556bd26802 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -17,8 +17,6 @@ #include "stream_load_pipe.h" -#include - #include "olap/iterators.h" #include "runtime/thread_context.h" #include "util/bit_util.h" @@ -112,6 +110,16 @@ Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t pr return _append(buf, proto_byte_size); } +Status StreamLoadPipe::append(std::unique_ptr&& row) { + PDataRow* row_ptr = row.get(); + { + std::unique_lock l(_lock); + _data_row_ptrs.emplace_back(std::move(row)); + } + return append_and_flush(reinterpret_cast(&row_ptr), sizeof(row_ptr), + sizeof(PDataRow*) + row_ptr->ByteSizeLong()); +} + Status StreamLoadPipe::append(const char* data, size_t size) { size_t pos = 0; if (_write_buf != nullptr) { @@ -168,8 +176,11 @@ Status StreamLoadPipe::_read_next_buffer(std::unique_ptr* data, size_ _buf_queue.pop_front(); _buffered_bytes -= buf->limit; if (_use_proto) { - PDataRow** ptr = reinterpret_cast(data->get()); - _proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize()); + auto row_ptr = std::move(_data_row_ptrs.front()); + _proto_buffered_bytes -= (sizeof(PDataRow*) + row_ptr->GetCachedSize()); + _data_row_ptrs.pop_front(); + // PlainBinaryLineReader will hold the PDataRow + row_ptr.release(); } _put_cond.notify_one(); return Status::OK(); diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index 9aa42a027a3491..75401b8c20bef6 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -17,6 +17,8 @@ #pragma once +#include + #include #include @@ -39,6 +41,8 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0); + Status append(std::unique_ptr&& row); + Status append(const char* data, size_t size) override; Status append(const ByteBufferPtr& buf) override; @@ -90,6 +94,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { int64_t _total_length = -1; bool _use_proto = false; std::deque _buf_queue; + std::deque> _data_row_ptrs; std::condition_variable _put_cond; std::condition_variable _get_cond; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index b1a48d25ff3b6e..407ad5dad0806a 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -728,10 +728,14 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller } else { auto pipe = stream_load_ctx->pipe; for (int i = 0; i < request->data_size(); ++i) { - PDataRow* row = new PDataRow(); + std::unique_ptr row(new PDataRow()); row->CopyFrom(request->data(i)); - pipe->append_and_flush(reinterpret_cast(&row), sizeof(row), - sizeof(row) + row->ByteSizeLong()); + Status s = pipe->append(std::move(row)); + if (!s.ok()) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs(s.to_string()); + return; + } } response->mutable_status()->set_status_code(0); } diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 2f8ab0d84591b5..620a167ceeb093 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -179,7 +179,7 @@ Status CsvReader::init_reader(bool is_load) { break; case TFileFormatType::FORMAT_PROTO: - _line_reader.reset(new NewPlainBinaryLineReader(_file_reader, _params.file_type)); + _line_reader.reset(new NewPlainBinaryLineReader(_file_reader)); break; default: return Status::InternalError( @@ -441,16 +441,11 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { } void CsvReader::_split_line_for_proto_format(const Slice& line) { - PDataRow** ptr = reinterpret_cast(line.data); - PDataRow* row = *ptr; - for (const PDataColumn& col : (row)->col()) { - int len = col.value().size(); - uint8_t* buf = new uint8_t[len]; - memcpy(buf, col.value().c_str(), len); - _split_values.emplace_back(buf, len); + PDataRow** row_ptr = reinterpret_cast(line.data); + PDataRow* row = *row_ptr; + for (const PDataColumn& col : row->col()) { + _split_values.emplace_back(col.value()); } - delete row; - delete[] ptr; } void CsvReader::_split_line_for_single_char_delimiter(const Slice& line) { diff --git a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp index 493fad180adb11..35948eb7cafe10 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp @@ -21,13 +21,11 @@ #include "io/fs/file_reader.h" #include "io/fs/stream_load_pipe.h" -#include "olap/iterators.h" namespace doris { -NewPlainBinaryLineReader::NewPlainBinaryLineReader(io::FileReaderSPtr file_reader, - TFileType::type file_type) - : _file_reader(file_reader), _file_type(file_type) {} +NewPlainBinaryLineReader::NewPlainBinaryLineReader(io::FileReaderSPtr file_reader) + : _file_reader(file_reader) {} NewPlainBinaryLineReader::~NewPlainBinaryLineReader() { close(); @@ -36,35 +34,15 @@ NewPlainBinaryLineReader::~NewPlainBinaryLineReader() { void NewPlainBinaryLineReader::close() {} Status NewPlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) { - std::unique_ptr file_buf; size_t read_size = 0; - switch (_file_type) { - case TFileType::FILE_LOCAL: - [[fallthrough]]; - case TFileType::FILE_HDFS: - [[fallthrough]]; - case TFileType::FILE_S3: { - size_t file_size = _file_reader->size(); - file_buf.reset(new uint8_t[file_size]); - Slice result(file_buf.get(), file_size); - IOContext io_ctx; - RETURN_IF_ERROR(_file_reader->read_at(0, result, io_ctx, &read_size)); - break; - } - case TFileType::FILE_STREAM: { - RETURN_IF_ERROR((dynamic_cast(_file_reader.get())) - ->read_one_message(&file_buf, &read_size)); - - break; - } - default: { - return Status::NotSupported("no supported file reader type: {}", _file_type); - } - } - *ptr = file_buf.release(); + RETURN_IF_ERROR((dynamic_cast(_file_reader.get())) + ->read_one_message(&_file_buf, &read_size)); + *ptr = _file_buf.get(); *size = read_size; if (read_size == 0) { *eof = true; + } else { + _cur_row.reset(*reinterpret_cast(_file_buf.get())); } return Status::OK(); } diff --git a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h index b8b3f398dbe49d..784682cb569414 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h +++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h @@ -18,15 +18,18 @@ #pragma once #include +#include #include "exec/line_reader.h" #include "io/fs/file_reader.h" namespace doris { +// only used for FORMAT_PROTO type, which used for insert +// transaction(begin/insert into/commit) class NewPlainBinaryLineReader : public LineReader { public: - NewPlainBinaryLineReader(io::FileReaderSPtr file_reader, TFileType::type file_type); + NewPlainBinaryLineReader(io::FileReaderSPtr file_reader); ~NewPlainBinaryLineReader() override; @@ -37,7 +40,8 @@ class NewPlainBinaryLineReader : public LineReader { private: io::FileReaderSPtr _file_reader; - TFileType::type _file_type; + std::unique_ptr _file_buf; + std::unique_ptr _cur_row; }; } // namespace doris