From 177f71da82ae55769533f8a8c04fa3c9fc79f0b1 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Tue, 7 Mar 2023 08:26:45 +0000 Subject: [PATCH] [fix](insert) fix memory leak for insert transaction --- be/src/exec/plain_binary_line_reader.cpp | 7 ++++--- be/src/exec/plain_binary_line_reader.h | 4 ++++ be/src/runtime/stream_load/stream_load_pipe.h | 18 ++++++++++++++++-- be/src/service/internal_service.cpp | 10 +++++++--- be/src/vec/exec/format/csv/csv_reader.cpp | 13 ++++--------- 5 files changed, 35 insertions(+), 17 deletions(-) diff --git a/be/src/exec/plain_binary_line_reader.cpp b/be/src/exec/plain_binary_line_reader.cpp index f63671c622c4bc..138ab79f99d6c3 100644 --- a/be/src/exec/plain_binary_line_reader.cpp +++ b/be/src/exec/plain_binary_line_reader.cpp @@ -31,13 +31,14 @@ PlainBinaryLineReader::~PlainBinaryLineReader() { void PlainBinaryLineReader::close() {} Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) { - std::unique_ptr file_buf; int64_t read_size = 0; - RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size)); - *ptr = file_buf.release(); + RETURN_IF_ERROR(_file_reader->read_one_message(&_file_buf, &read_size)); + *ptr = _file_buf.get(); *size = read_size; if (read_size == 0) { *eof = true; + } else { + _cur_row.reset(*reinterpret_cast(_file_buf.get())); } return Status::OK(); } diff --git a/be/src/exec/plain_binary_line_reader.h b/be/src/exec/plain_binary_line_reader.h index 9e1143b60c48c1..b3dd9215c3b8eb 100644 --- a/be/src/exec/plain_binary_line_reader.h +++ b/be/src/exec/plain_binary_line_reader.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "exec/line_reader.h" namespace doris { @@ -35,6 +37,8 @@ class PlainBinaryLineReader : public LineReader { private: FileReader* _file_reader; + std::unique_ptr _file_buf; + std::unique_ptr _cur_row; }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index fac16b81ff7ab5..abb0945f360eb5 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -60,6 +60,16 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { return _append(buf, proto_byte_size); } + Status append(std::unique_ptr&& row) { + PDataRow* row_ptr = row.get(); + { + std::unique_lock l(_lock); + _data_row_ptrs.emplace_back(std::move(row)); + } + return append_and_flush(reinterpret_cast(&row_ptr), sizeof(row_ptr), + sizeof(PDataRow*) + row_ptr->ByteSizeLong()); + } + Status append(const char* data, size_t size) override { size_t pos = 0; if (_write_buf != nullptr) { @@ -219,8 +229,11 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { _buf_queue.pop_front(); _buffered_bytes -= buf->limit; if (_use_proto) { - PDataRow** ptr = reinterpret_cast(data->get()); - _proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize()); + auto row_ptr = std::move(_data_row_ptrs.front()); + _proto_buffered_bytes -= (sizeof(PDataRow*) + row_ptr->GetCachedSize()); + _data_row_ptrs.pop_front(); + // PlainBinaryLineReader will hold the PDataRow + row_ptr.release(); } _put_cond.notify_one(); return Status::OK(); @@ -271,6 +284,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { int64_t _total_length = -1; bool _use_proto = false; std::deque _buf_queue; + std::deque> _data_row_ptrs; std::condition_variable _put_cond; std::condition_variable _get_cond; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 24b2a098e418e9..1f658da09cf304 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -619,10 +619,14 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller response->mutable_status()->add_error_msgs("pipe is null"); } else { for (int i = 0; i < request->data_size(); ++i) { - PDataRow* row = new PDataRow(); + std::unique_ptr row(new PDataRow()); row->CopyFrom(request->data(i)); - pipe->append_and_flush(reinterpret_cast(&row), sizeof(row), - sizeof(row) + row->ByteSizeLong()); + Status s = pipe->append(std::move(row)); + if (!s.ok()) { + response->mutable_status()->set_status_code(1); + response->mutable_status()->add_error_msgs(s.to_string()); + return; + } } response->mutable_status()->set_status_code(0); } diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index ce537d93cf4383..29be96f4d1661b 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -402,16 +402,11 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { void CsvReader::_split_line(const Slice& line) { _split_values.clear(); if (_file_format_type == TFileFormatType::FORMAT_PROTO) { - PDataRow** ptr = reinterpret_cast(line.data); - PDataRow* row = *ptr; - for (const PDataColumn& col : (row)->col()) { - int len = col.value().size(); - uint8_t* buf = new uint8_t[len]; - memcpy(buf, col.value().c_str(), len); - _split_values.emplace_back(buf, len); + PDataRow** row_ptr = reinterpret_cast(line.data); + PDataRow* row = *row_ptr; + for (const PDataColumn& col : row->col()) { + _split_values.emplace_back(col.value()); } - delete row; - delete[] ptr; } else { const char* value = line.data; size_t start = 0; // point to the start pos of next col value.