Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include "stream_load_pipe.h"

#include <gen_cpp/internal_service.pb.h>

#include "olap/iterators.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
Expand Down Expand Up @@ -112,6 +110,16 @@ Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t pr
return _append(buf, proto_byte_size);
}

Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) {
PDataRow* row_ptr = row.get();
{
std::unique_lock<std::mutex> l(_lock);
_data_row_ptrs.emplace_back(std::move(row));
}
return append_and_flush(reinterpret_cast<char*>(&row_ptr), sizeof(row_ptr),
sizeof(PDataRow*) + row_ptr->ByteSizeLong());
}

Status StreamLoadPipe::append(const char* data, size_t size) {
size_t pos = 0;
if (_write_buf != nullptr) {
Expand Down Expand Up @@ -168,8 +176,11 @@ Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_
_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
if (_use_proto) {
PDataRow** ptr = reinterpret_cast<PDataRow**>(data->get());
_proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize());
auto row_ptr = std::move(_data_row_ptrs.front());
_proto_buffered_bytes -= (sizeof(PDataRow*) + row_ptr->GetCachedSize());
_data_row_ptrs.pop_front();
// PlainBinaryLineReader will hold the PDataRow
row_ptr.release();
}
_put_cond.notify_one();
return Status::OK();
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/fs/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <gen_cpp/internal_service.pb.h>

#include <condition_variable>
#include <deque>

Expand All @@ -39,6 +41,8 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0);

Status append(std::unique_ptr<PDataRow>&& row);

Status append(const char* data, size_t size) override;

Status append(const ByteBufferPtr& buf) override;
Expand Down Expand Up @@ -90,6 +94,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
int64_t _total_length = -1;
bool _use_proto = false;
std::deque<ByteBufferPtr> _buf_queue;
std::deque<std::unique_ptr<PDataRow>> _data_row_ptrs;
std::condition_variable _put_cond;
std::condition_variable _get_cond;

Expand Down
10 changes: 7 additions & 3 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,10 +728,14 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller
} else {
auto pipe = stream_load_ctx->pipe;
for (int i = 0; i < request->data_size(); ++i) {
PDataRow* row = new PDataRow();
std::unique_ptr<PDataRow> row(new PDataRow());
row->CopyFrom(request->data(i));
pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
sizeof(row) + row->ByteSizeLong());
Status s = pipe->append(std::move(row));
if (!s.ok()) {
response->mutable_status()->set_status_code(1);
response->mutable_status()->add_error_msgs(s.to_string());
return;
}
}
response->mutable_status()->set_status_code(0);
}
Expand Down
15 changes: 5 additions & 10 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ Status CsvReader::init_reader(bool is_load) {

break;
case TFileFormatType::FORMAT_PROTO:
_line_reader.reset(new NewPlainBinaryLineReader(_file_reader, _params.file_type));
_line_reader.reset(new NewPlainBinaryLineReader(_file_reader));
break;
default:
return Status::InternalError(
Expand Down Expand Up @@ -441,16 +441,11 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
}

void CsvReader::_split_line_for_proto_format(const Slice& line) {
PDataRow** ptr = reinterpret_cast<PDataRow**>(line.data);
PDataRow* row = *ptr;
for (const PDataColumn& col : (row)->col()) {
int len = col.value().size();
uint8_t* buf = new uint8_t[len];
memcpy(buf, col.value().c_str(), len);
_split_values.emplace_back(buf, len);
PDataRow** row_ptr = reinterpret_cast<PDataRow**>(line.data);
PDataRow* row = *row_ptr;
for (const PDataColumn& col : row->col()) {
_split_values.emplace_back(col.value());
}
delete row;
delete[] ptr;
}

void CsvReader::_split_line_for_single_char_delimiter(const Slice& line) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@

#include "io/fs/file_reader.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/iterators.h"

namespace doris {

NewPlainBinaryLineReader::NewPlainBinaryLineReader(io::FileReaderSPtr file_reader,
TFileType::type file_type)
: _file_reader(file_reader), _file_type(file_type) {}
NewPlainBinaryLineReader::NewPlainBinaryLineReader(io::FileReaderSPtr file_reader)
: _file_reader(file_reader) {}

NewPlainBinaryLineReader::~NewPlainBinaryLineReader() {
close();
Expand All @@ -36,35 +34,15 @@ NewPlainBinaryLineReader::~NewPlainBinaryLineReader() {
void NewPlainBinaryLineReader::close() {}

Status NewPlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) {
std::unique_ptr<uint8_t[]> file_buf;
size_t read_size = 0;
switch (_file_type) {
case TFileType::FILE_LOCAL:
[[fallthrough]];
case TFileType::FILE_HDFS:
[[fallthrough]];
case TFileType::FILE_S3: {
size_t file_size = _file_reader->size();
file_buf.reset(new uint8_t[file_size]);
Slice result(file_buf.get(), file_size);
IOContext io_ctx;
RETURN_IF_ERROR(_file_reader->read_at(0, result, io_ctx, &read_size));
break;
}
case TFileType::FILE_STREAM: {
RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
->read_one_message(&file_buf, &read_size));

break;
}
default: {
return Status::NotSupported("no supported file reader type: {}", _file_type);
}
}
*ptr = file_buf.release();
RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
->read_one_message(&_file_buf, &read_size));
*ptr = _file_buf.get();
*size = read_size;
if (read_size == 0) {
*eof = true;
} else {
_cur_row.reset(*reinterpret_cast<PDataRow**>(_file_buf.get()));
}
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
#pragma once

#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>

#include "exec/line_reader.h"
#include "io/fs/file_reader.h"

namespace doris {

// only used for FORMAT_PROTO type, which used for insert
// transaction(begin/insert into/commit)
class NewPlainBinaryLineReader : public LineReader {
public:
NewPlainBinaryLineReader(io::FileReaderSPtr file_reader, TFileType::type file_type);
NewPlainBinaryLineReader(io::FileReaderSPtr file_reader);

~NewPlainBinaryLineReader() override;

Expand All @@ -37,7 +40,8 @@ class NewPlainBinaryLineReader : public LineReader {
private:
io::FileReaderSPtr _file_reader;

TFileType::type _file_type;
std::unique_ptr<uint8_t[]> _file_buf;
std::unique_ptr<PDataRow> _cur_row;
};

} // namespace doris