diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0be957f2379f2b..ff9233381cffc1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1045,10 +1045,6 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000"); DEFINE_mInt64(row_column_page_size, "4096"); // it must be larger than or equal to 5MB DEFINE_mInt32(s3_write_buffer_size, "5242880"); -// the size of the whole s3 buffer pool, which indicates the s3 file writer -// can at most buffer 50MB data. And the num of multi part upload task is -// s3_write_buffer_whole_size / s3_write_buffer_size -DEFINE_mInt32(s3_write_buffer_whole_size, "524288000"); // The timeout config for S3 buffer allocation DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300"); DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000"); diff --git a/be/src/common/config.h b/be/src/common/config.h index eaebffbd1ebf8c..ff703680336797 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1082,10 +1082,6 @@ DECLARE_mInt32(tablet_path_check_batch_size); DECLARE_mInt64(row_column_page_size); // it must be larger than or equal to 5MB DECLARE_mInt32(s3_write_buffer_size); -// the size of the whole s3 buffer pool, which indicates the s3 file writer -// can at most buffer 50MB data. And the num of multi part upload task is -// s3_write_buffer_whole_size / s3_write_buffer_size -DECLARE_mInt32(s3_write_buffer_whole_size); // The timeout config for S3 buffer allocation DECLARE_mInt32(s3_writer_buffer_allocation_timeout); // the max number of cached file handle for block segemnt diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp index 0ca9edc530a8f5..9b7ec178d77760 100644 --- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp +++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp @@ -119,8 +119,6 @@ int main(int argc, char** argv) { .set_min_threads(num_cores) .set_max_threads(num_cores) .build(&s3_file_upload_thread_pool)); - doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance(); - s3_buffer_pool->init(524288000, 5242880, s3_file_upload_thread_pool.get()); try { doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads), diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index 576a7dd8959ee9..44582a0ff8dd29 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() { Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer, const FileWriterOptions* opts) { *writer = std::make_unique(ExecEnv::GetInstance(), _broker_addr, _broker_prop, - path, 0 /* offset */, getSPtr()); + path, 0 /* offset */, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp index d5b2baa7a66e0c..75acf40084c9fd 100644 --- a/be/src/io/fs/broker_file_writer.cpp +++ b/be/src/io/fs/broker_file_writer.cpp @@ -37,12 +37,15 @@ namespace io { BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map& properties, - const std::string& path, int64_t start_offset, FileSystemSPtr fs) + const std::string& path, int64_t start_offset, FileSystemSPtr fs, + const FileWriterOptions* opts) : FileWriter(path, fs), _env(env), _address(broker_address), _properties(properties), - _cur_offset(start_offset) {} + _cur_offset(start_offset) { + _create_empty_file = opts ? 
opts->create_empty_file : true; +} BrokerFileWriter::~BrokerFileWriter() { if (_opened) { @@ -153,6 +156,14 @@ Status BrokerFileWriter::finalize() { return Status::OK(); } +Status BrokerFileWriter::open() { + if (_create_empty_file && !_opened) { + RETURN_IF_ERROR(_open()); + _opened = true; + } + return Status::OK(); +} + Status BrokerFileWriter::_open() { TBrokerOpenWriterRequest request; diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h index cf5b8013acb146..05b62846e6c773 100644 --- a/be/src/io/fs/broker_file_writer.h +++ b/be/src/io/fs/broker_file_writer.h @@ -42,9 +42,10 @@ class BrokerFileWriter : public FileWriter { public: BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map& properties, const std::string& path, - int64_t start_offset, FileSystemSPtr fs); + int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts); virtual ~BrokerFileWriter(); + Status open() override; Status close() override; Status appendv(const Slice* data, size_t data_cnt) override; Status finalize() override; diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 58c9c9ff060fa9..bb3235e7d27e4d 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -35,6 +35,8 @@ struct FileWriterOptions { bool is_cold_data = false; bool sync_file_data = true; // Whether flush data into storage system int64_t file_cache_expiration = 0; // Absolute time + // Whether to create empty file if no content + bool create_empty_file = true; }; class FileWriter { @@ -46,6 +48,9 @@ class FileWriter { DISALLOW_COPY_AND_ASSIGN(FileWriter); + // Open the file for writing. + virtual Status open() { return Status::OK(); } + // Normal close. Wait for all data to persist before returning. virtual Status close() = 0; @@ -74,6 +79,7 @@ class FileWriter { std::shared_ptr _fs; bool _closed = false; bool _opened = false; + bool _create_empty_file = true; }; using FileWriterPtr = std::unique_ptr; diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 8ada4b92acca57..a65784226f305c 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -167,8 +167,8 @@ Status HdfsFileSystem::connect_impl() { } Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, - const FileWriterOptions*) { - *writer = std::make_unique(file, getSPtr()); + const FileWriterOptions* opts) { + *writer = std::make_unique(file, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index 00081db310fa0f..40c3c59dcd7ad7 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -34,7 +34,9 @@ namespace doris { namespace io { -HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) { +HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts) + : FileWriter(std::move(file), fs) { + _create_empty_file = opts ? 
opts->create_empty_file : true; _hdfs_fs = (HdfsFileSystem*)_fs.get(); } @@ -103,6 +105,14 @@ Status HdfsFileWriter::finalize() { return Status::OK(); } +Status HdfsFileWriter::open() { + if (_create_empty_file && !_opened) { + RETURN_IF_ERROR(_open()); + _opened = true; + } + return Status::OK(); +} + Status HdfsFileWriter::_open() { _path = convert_path(_path, _hdfs_fs->_fs_name); std::string hdfs_dir = _path.parent_path().string(); diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index c05f7625124020..21dcaff1cd6ac1 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -33,9 +33,10 @@ class HdfsFileSystem; class HdfsFileWriter : public FileWriter { public: - HdfsFileWriter(Path file, FileSystemSPtr fs); + HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts); ~HdfsFileWriter(); + Status open() override; Status close() override; Status appendv(const Slice* data, size_t data_cnt) override; Status finalize() override; diff --git a/be/src/io/fs/s3_file_bufferpool.cpp b/be/src/io/fs/s3_file_bufferpool.cpp index a4f75319ec0939..4efbd439429ab9 100644 --- a/be/src/io/fs/s3_file_bufferpool.cpp +++ b/be/src/io/fs/s3_file_bufferpool.cpp @@ -17,50 +17,73 @@ #include "s3_file_bufferpool.h" +#include +#include + #include "common/config.h" +#include "common/exception.h" #include "common/logging.h" +#include "common/status.h" +#include "common/sync_point.h" +#include "io/cache/block/block_file_cache_fwd.h" #include "io/cache/block/block_file_segment.h" #include "io/fs/s3_common.h" #include "runtime/exec_env.h" #include "util/defer_op.h" #include "util/slice.h" +#include "vec/common/arena.h" namespace doris { namespace io { bvar::Adder s3_file_buffer_allocated("s3_file_buffer_allocated"); -bvar::Adder s3_file_buffer_allocating("s3_file_buffer_allocating"); -/** - * 0. check if the inner memory buffer is empty or not - * 1. 
relcaim the memory buffer if it's mot empty - */ -void FileBuffer::on_finish() { - if (_buffer.empty()) { - return; +template > +struct Memory : boost::noncopyable, Allocator { + Memory() = default; + explicit Memory(size_t size) : _size(size) { + alloc(size); + s3_file_buffer_allocated << 1; } - S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(), _capacity}); - _buffer.clear(); -} + ~Memory() { + dealloc(); + s3_file_buffer_allocated << -1; + } + void alloc(size_t size) { _data = static_cast(Allocator::alloc(size, 0)); } + void dealloc() { + if (_data == nullptr) { + return; + } + Allocator::free(_data, _size); + _data = nullptr; + } + size_t _size; + char* _data; +}; -/** - * take other buffer's memory space and refresh capacity - */ -void FileBuffer::swap_buffer(Slice& other) { - _buffer = other; - _capacity = _buffer.get_size(); - other.clear(); +struct FileBuffer::PartData { + Memory<> _memory; + PartData() : _memory(config::s3_write_buffer_size) {} + ~PartData() = default; + [[nodiscard]] Slice data() const { return Slice {_memory._data, _memory._size}; } + [[nodiscard]] size_t size() const { return _memory._size; } +}; + +Slice FileBuffer::get_slice() const { + return _inner_data->data(); } -FileBuffer::FileBuffer(std::function alloc_holder, size_t offset, - OperationState state, bool reserve) - : _alloc_holder(std::move(alloc_holder)), - _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)), +FileBuffer::FileBuffer(BufferType type, std::function alloc_holder, + size_t offset, OperationState state) + : _type(type), + _alloc_holder(std::move(alloc_holder)), _offset(offset), _size(0), _state(std::move(state)), - _capacity(_buffer.get_size()) {} + _inner_data(std::make_unique()), + _capacity(_inner_data->size()) {} +FileBuffer::~FileBuffer() = default; /** * 0. check if file cache holder allocated * 1. update the cache's type to index cache @@ -86,136 +109,69 @@ void UploadFileBuffer::set_index_offset(size_t offset) { * 1. write to file cache otherwise, then we'll wait for free buffer and to rob it */ Status UploadFileBuffer::append_data(const Slice& data) { - Defer defer {[&] { _size += data.get_size(); }}; - while (true) { - // if buf is not empty, it means there is memory preserved for this buf - if (!_buffer.empty()) { - std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(), data.get_size()); - break; - } - // if the buf has no memory reserved, then write to disk first - if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder != nullptr) { - _holder = _alloc_holder(); - bool cache_is_not_enough = false; - for (auto& segment : _holder->file_segments) { - DCHECK(segment->state() == FileBlock::State::SKIP_CACHE || - segment->state() == FileBlock::State::EMPTY); - if (segment->state() == FileBlock::State::SKIP_CACHE) [[unlikely]] { - cache_is_not_enough = true; - break; - } - if (_index_offset != 0) { - RETURN_IF_ERROR(segment->change_cache_type_self(CacheType::INDEX)); - } - } - // if cache_is_not_enough, cannot use it ! - _cur_file_segment = _holder->file_segments.begin(); - _append_offset = (*_cur_file_segment)->range().left; - _holder = cache_is_not_enough ? 
nullptr : std::move(_holder); - if (_holder) { - (*_cur_file_segment)->get_or_set_downloader(); - } - _is_cache_allocated = true; - } - if (_holder) [[likely]] { - size_t data_remain_size = data.get_size(); - size_t pos = 0; - while (data_remain_size != 0) { - auto range = (*_cur_file_segment)->range(); - size_t segment_remain_size = range.right - _append_offset + 1; - size_t append_size = std::min(data_remain_size, segment_remain_size); - Slice append_data(data.get_data() + pos, append_size); - // When there is no available free memory buffer, the data will be written to the cache first - // and then uploaded to S3 when there is an available free memory buffer. - // However, if an error occurs during the write process to the local cache, - // continuing to upload the dirty data from the cache to S3 will result in erroneous data(Bad segment). - // Considering that local disk write failures are rare, a simple approach is chosen here, - // which is to treat the import as a failure directly when a local write failure occurs - RETURN_IF_ERROR((*_cur_file_segment)->append(append_data)); - if (segment_remain_size == append_size) { - RETURN_IF_ERROR((*_cur_file_segment)->finalize_write()); - if (++_cur_file_segment != _holder->file_segments.end()) { - (*_cur_file_segment)->get_or_set_downloader(); - } - } - data_remain_size -= append_size; - _append_offset += append_size; - pos += append_size; - } - break; - } else { - // wait allocate buffer pool - auto tmp = S3FileBufferPool::GetInstance()->allocate(true); - if (tmp.empty()) [[unlikely]] { - return Status::InternalError("Failed to allocate S3 buffer for {} seconds", - config::s3_writer_buffer_allocation_timeout); - } - swap_buffer(tmp); - } - } + TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::append_data", Status::OK()); + std::memcpy((void*)(_inner_data->data().get_data() + _size), data.get_data(), data.get_size()); + _size += data.get_size(); + _crc_value = crc32c::Extend(_crc_value, data.get_data(), data.get_size()); return Status::OK(); } /** - * 0. allocate one memory buffer - * 1. read the content from the cache and then write - * it into memory buffer + * 0. constrcut the stream ptr if the buffer is not empty + * 1. 
submit the on_upload() callback to executor */ -void UploadFileBuffer::read_from_cache() { - auto tmp = S3FileBufferPool::GetInstance()->allocate(true); - if (tmp.empty()) [[unlikely]] { - set_val(Status::InternalError("Failed to allocate S3 buffer for {} seconds", - config::s3_writer_buffer_allocation_timeout)); - return; - } - swap_buffer(tmp); +static Status submit_upload_buffer(std::shared_ptr buffer) { + TEST_SYNC_POINT_RETURN_WITH_VALUE("UploadFileBuffer::submit", Status::OK(), buffer.get()); + return ExecEnv::GetInstance()->s3_file_upload_thread_pool()->submit_func( + [buf = std::move(buffer)]() { buf->execute_async(); }); +} - DCHECK(_holder != nullptr); - DCHECK(_capacity >= _size); - size_t pos = 0; - for (auto& segment : _holder->file_segments) { - if (pos == _size) { - break; - } - if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] { - set_val(std::move(s)); - return; - } - size_t segment_size = segment->range().size(); - Slice s(_buffer.get_data() + pos, segment_size); - if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] { - set_val(std::move(st)); - return; - } - pos += segment_size; +std::ostream& operator<<(std::ostream& os, const BufferType& value) { + switch (value) { + case BufferType::UPLOAD: + os << "upload"; + break; + case BufferType::DOWNLOAD: + os << "download"; + break; + default: + auto cast_value = static_cast(value); + os << cast_value; } + return os; +} - // the real lenght should be the buf.get_size() in this situation(consider it's the last part, - // size of it could be less than 5MB) - _stream_ptr = std::make_shared(_buffer.get_data(), _size); +Status FileBuffer::submit(std::shared_ptr buf) { + switch (buf->_type) { + case BufferType::UPLOAD: + return submit_upload_buffer(std::move(buf)); + break; + default: + CHECK(false) << "should never come here, the illegal type is " << buf->_type; + }; + return Status::InternalError("should never come here"); } -/** - * 0. constrcut the stream ptr if the buffer is not empty - * 1. 
submit the on_upload() callback to executor - */ -void UploadFileBuffer::submit() { - if (!_buffer.empty()) [[likely]] { - _stream_ptr = std::make_shared(_buffer.get_data(), _size); +void UploadFileBuffer::on_upload() { + _stream_ptr = std::make_shared(_inner_data->data().get_data(), _size); + if (_crc_value != crc32c::Value(_inner_data->data().get_data(), _size)) { + DCHECK(false); + set_status(Status::IOError("Buffer checksum not match")); + return; } - // If the data is written into file cache - if (_holder && _cur_file_segment != _holder->file_segments.end()) { - if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok()) [[unlikely]] { - set_val(std::move(s)); - return; - } + _upload_to_remote(*this); + if (config::enable_flush_file_cache_async) { + // If we call is_cancelled() after _state.set_status() then there might one situation where + // s3 file writer is already destructed + bool cancelled = is_cancelled(); + _state.set_status(); + // this control flow means the buf and the stream shares one memory + // so we can directly use buf here + upload_to_local_file_cache(cancelled); + } else { + upload_to_local_file_cache(is_cancelled()); + _state.set_status(); } - static_cast(S3FileBufferPool::GetInstance()->thread_pool()->submit_func( - [buf = this->shared_from_this(), this]() { - // to extend buf's lifetime - // (void)buf; - on_upload(); - })); } /** @@ -231,6 +187,7 @@ void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) { if (is_cancelled) { return; } + TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache"); // the data is already written to S3 in this situation // so i didn't handle the file cache write error _holder = _alloc_holder(); @@ -244,20 +201,26 @@ void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) { size_t append_size = std::min(data_remain_size, segment_size); if (segment->state() == FileBlock::State::EMPTY) { if (_index_offset != 0 && segment->range().right >= _index_offset) { - // segment->change_cache_type_self(CacheType::INDEX); + static_cast(segment->change_cache_type_self(CacheType::INDEX)); } segment->get_or_set_downloader(); // Another thread may have started downloading due to a query // Just skip putting to cache from UploadFileBuffer if (segment->is_downloader()) { - Slice s(_buffer.get_data() + pos, append_size); - if (auto st = segment->append(s); !st.ok()) [[unlikely]] { - LOG_WARNING("append data to cache segmetn failed due to {}", st); - return; + Slice s(_inner_data->data().get_data() + pos, append_size); + Status st = segment->append(s); + TEST_INJECTION_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache_inject", + &st); + if (st.ok()) { + st = segment->finalize_write(); } - if (auto st = segment->finalize_write(); !st.ok()) [[unlikely]] { - LOG_WARNING("finalize write to cache segmetn failed due to {}", st); - return; + if (!st.ok()) { + { + [[maybe_unused]] bool ret = false; + TEST_SYNC_POINT_CALLBACK("UploadFileBuffer::upload_to_local_file_cache", + &ret); + } + LOG_WARNING("failed to append data to file cache").error(st); } } } @@ -287,82 +250,17 @@ FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder( return *this; } -std::shared_ptr FileBufferBuilder::build() { +Status FileBufferBuilder::build(std::shared_ptr* buf) { OperationState state(_sync_after_complete_task, _is_cancelled); + if (_type == BufferType::UPLOAD) { - return std::make_shared(std::move(_upload_cb), std::move(state), _offset, - std::move(_alloc_holder_cb), _index_offset); + 
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared( + std::move(_upload_cb), std::move(state), _offset, + std::move(_alloc_holder_cb), _index_offset)); + return Status::OK(); } // should never come here - return nullptr; -} - -void S3FileBufferPool::reclaim(Slice buf) { - { - std::unique_lock lck {_lock}; - _free_raw_buffers.emplace_back(buf); - // only works when not set file cache - _cv.notify_all(); - } - s3_file_buffer_allocated << -1; -} - -void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size, - ThreadPool* thread_pool) { - // the nums could be one configuration - size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size; - DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) && - (s3_write_buffer_whole_size > s3_write_buffer_size)) - << "s3 write buffer size " << s3_write_buffer_size << " whole s3 write buffer size " - << s3_write_buffer_whole_size; - LOG_INFO("S3 file buffer pool with {} buffers, each with {}", buf_num, s3_write_buffer_size); - _whole_mem_buffer = std::make_unique(s3_write_buffer_whole_size); - for (size_t i = 0; i < buf_num; i++) { - Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size, - static_cast(s3_write_buffer_size)}; - _free_raw_buffers.emplace_back(s); - } - _thread_pool = thread_pool; -} - -Slice S3FileBufferPool::allocate(bool reserve) { - Slice buf; - Defer defer {[&]() { - if (!buf.empty()) { - s3_file_buffer_allocated << 1; - } - s3_file_buffer_allocating << -1; - }}; - s3_file_buffer_allocating << 1; - // if need reserve or no cache then we must ensure return buf with memory preserved - if (reserve || !config::enable_file_cache) { - { - std::unique_lock lck {_lock}; - _cv.wait_for(lck, std::chrono::seconds(config::s3_writer_buffer_allocation_timeout), - [this]() { return !_free_raw_buffers.empty(); }); - if (!_free_raw_buffers.empty()) { - buf = _free_raw_buffers.front(); - _free_raw_buffers.pop_front(); - } - } - return buf; - } - // try to get one memory reserved buffer - { - std::unique_lock lck {_lock}; - if (!_free_raw_buffers.empty()) { - buf = _free_raw_buffers.front(); - _free_raw_buffers.pop_front(); - } - } - if (!buf.empty()) { - return buf; - } - // if there is no free buffer and no need to reserve memory, we could return one empty buffer - buf = Slice(); - // if the buf has no memory reserved, it would try to write the data to file cache first - // or it would try to rob buffer from other S3FileBuffer - return buf; + return Status::InternalError("unsupport buffer type {}", _type); } } // namespace io } // namespace doris diff --git a/be/src/io/fs/s3_file_bufferpool.h b/be/src/io/fs/s3_file_bufferpool.h index 01d31928748035..c1bdf08f7ae9b0 100644 --- a/be/src/io/fs/s3_file_bufferpool.h +++ b/be/src/io/fs/s3_file_bufferpool.h @@ -27,13 +27,13 @@ #include "common/status.h" #include "io/cache/block/block_file_segment.h" -#include "runtime/exec_env.h" +#include "util/crc32c.h" #include "util/slice.h" #include "util/threadpool.h" namespace doris { namespace io { -enum class BufferType { DOWNLOAD, UPLOAD }; +enum class BufferType : uint32_t { DOWNLOAD, UPLOAD }; using FileBlocksHolderPtr = std::unique_ptr; struct OperationState { OperationState(std::function sync_after_complete_task, @@ -45,13 +45,13 @@ struct OperationState { * * @param S the execution result */ - void set_val(Status s = Status::OK()) { + void set_status(Status s = Status::OK()) { // make sure we wouldn't sync twice if (_value_set) [[unlikely]] { return; } if (nullptr != _sync_after_complete_task) { - _fail_after_sync = 
_sync_after_complete_task(s); + _fail_after_sync = _sync_after_complete_task(std::move(s)); } _value_set = true; } @@ -75,36 +75,27 @@ struct OperationState { bool _fail_after_sync = false; }; -struct FileBuffer : public std::enable_shared_from_this { - FileBuffer(std::function alloc_holder, size_t offset, - OperationState state, bool reserve = false); - virtual ~FileBuffer() { on_finish(); } +struct FileBuffer { + FileBuffer(BufferType type, std::function alloc_holder, size_t offset, + OperationState state); + virtual ~FileBuffer(); /** * submit the correspoding task to async executor */ - virtual void submit() = 0; + static Status submit(std::shared_ptr buf); /** * append data to the inner memory buffer * * @param S the content to be appended */ virtual Status append_data(const Slice& s) = 0; - /** - * call the reclaim callback when task is done - */ - void on_finish(); - /** - * swap memory buffer - * - * @param other which has memory buffer allocated - */ - void swap_buffer(Slice& other); + virtual void execute_async() = 0; /** * set the val of it's operation state * * @param S the execution result */ - void set_val(Status s) { _state.set_val(s); } + void set_status(Status s) { _state.set_status(s); } /** * get the start offset of this file buffer * @@ -117,6 +108,8 @@ struct FileBuffer : public std::enable_shared_from_this { * @return the size of the buffered data */ size_t get_size() const { return _size; } + size_t get_capacaticy() const { return _capacity; } + Slice get_slice() const; /** * detect whether the execution task is done * @@ -124,11 +117,13 @@ struct FileBuffer : public std::enable_shared_from_this { */ bool is_cancelled() const { return _state.is_cancelled(); } + BufferType _type; std::function _alloc_holder; - Slice _buffer; size_t _offset; size_t _size; OperationState _state; + struct PartData; + std::unique_ptr _inner_data; size_t _capacity; }; @@ -136,11 +131,10 @@ struct UploadFileBuffer final : public FileBuffer { UploadFileBuffer(std::function upload_cb, OperationState state, size_t offset, std::function alloc_holder, size_t index_offset) - : FileBuffer(alloc_holder, offset, state), + : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state), _upload_to_remote(std::move(upload_cb)), _index_offset(index_offset) {} ~UploadFileBuffer() override = default; - void submit() override; /** * set the index offset * @@ -158,6 +152,8 @@ struct UploadFileBuffer final : public FileBuffer { * local file cache */ void upload_to_local_file_cache(bool); + + void execute_async() override { on_upload(); } /** * do the upload work * 1. read from cache if the data is written to cache first @@ -166,25 +162,7 @@ struct UploadFileBuffer final : public FileBuffer { * 4. call the finish callback caller specified * 5. 
reclaim self */ - void on_upload() { - if (_buffer.empty()) { - read_from_cache(); - } - _upload_to_remote(*this); - if (config::enable_flush_file_cache_async) { - // If we call is_cancelled() after _state.set_val() then there might one situation where - // s3 file writer is already destructed - bool cancelled = is_cancelled(); - _state.set_val(); - // this control flow means the buf and the stream shares one memory - // so we can directly use buf here - upload_to_local_file_cache(cancelled); - } else { - upload_to_local_file_cache(is_cancelled()); - _state.set_val(); - } - on_finish(); - } + void on_upload(); /** * * @return the stream representing the inner memory buffer @@ -207,6 +185,7 @@ struct UploadFileBuffer final : public FileBuffer { decltype(_holder->file_segments.begin()) _cur_file_segment; size_t _append_offset {0}; size_t _index_offset {0}; + uint32_t _crc_value = 0; }; struct FileBufferBuilder { @@ -216,7 +195,7 @@ struct FileBufferBuilder { * build one file buffer using previously set properties * @return the file buffer's base shared pointer */ - std::shared_ptr build(); + Status build(std::shared_ptr* buf); /** * set the file buffer type * @@ -310,43 +289,5 @@ struct FileBufferBuilder { size_t _offset; size_t _index_offset; }; - -class S3FileBufferPool { -public: - S3FileBufferPool() = default; - ~S3FileBufferPool() = default; - - // should be called one and only once - // at startup - void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size, - doris::ThreadPool* thread_pool); - - /** - * - * @return singleton of the S3FileBufferPool - */ - static S3FileBufferPool* GetInstance() { - return ExecEnv::GetInstance()->get_s3_file_buffer_pool(); - } - - void reclaim(Slice buf); - - /** - * - * @param reserve must return buffer with memory allocated - * @return memory buffer - */ - Slice allocate(bool reserve = false); - - ThreadPool* thread_pool() { return _thread_pool; } - -private: - std::mutex _lock; - std::condition_variable _cv; - std::unique_ptr _whole_mem_buffer; - std::list _free_raw_buffers; - // not owned - ThreadPool* _thread_pool = nullptr; -}; } // namespace io } // namespace doris diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 6a54244ceb65db..2776696a9a067c 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -102,6 +102,8 @@ S3FileWriter::S3FileWriter(std::string key, std::shared_ptr fs, _cache_key = IFileCache::hash(_path.filename().native()); _cache = FileCacheFactory::instance()->get_by_path(_cache_key); } + + _create_empty_file = opts ? 
opts->create_empty_file : true; } S3FileWriter::~S3FileWriter() { @@ -165,7 +167,6 @@ Status S3FileWriter::_abort() { } // we need to reclaim the memory if (_pending_buf) { - _pending_buf->on_finish(); _pending_buf = nullptr; } LOG(INFO) << "S3FileWriter::abort, path: " << _path.native(); @@ -203,17 +204,46 @@ Status S3FileWriter::close() { return _st; } VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native(); - // it might be one file less than 5MB, we do upload here - if (_pending_buf != nullptr) { - if (_upload_id.empty()) { - auto buf = dynamic_cast(_pending_buf.get()); + + if (_upload_id.empty()) { + if (_pending_buf != nullptr) { + // it might be one file less than 5MB, we do upload here + auto* buf = dynamic_cast(_pending_buf.get()); DCHECK(buf != nullptr); buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); + } else if (_create_empty_file) { + // if there is no pending buffer, we need to create an empty file + auto builder = FileBufferBuilder(); + builder.set_type(BufferType::UPLOAD) + .set_upload_callback([this](UploadFileBuffer& buf) { _put_object(buf); }) + .set_sync_after_complete_task([this](Status s) { + bool ret = false; + if (!s.ok()) [[unlikely]] { + VLOG_NOTICE << "failed at key: " << _key + << ", status: " << s.to_string(); + std::unique_lock _lck {_completed_lock}; + _failed = true; + ret = true; + this->_st = std::move(s); + } + // After the signal, there is a scenario where the previous invocation of _wait_until_finish + // returns to the caller, and subsequently, the S3 file writer is destructed. + // This means that accessing _failed afterwards would result in a heap use after free vulnerability. + _countdown_event.signal(); + return ret; + }) + .set_is_cancelled([this]() { return _failed.load(); }); + RETURN_IF_ERROR(builder.build(&_pending_buf)); + auto* buf = dynamic_cast(_pending_buf.get()); + DCHECK(buf != nullptr); } + } + if (_pending_buf != nullptr) { _countdown_event.add_count(); - _pending_buf->submit(); + RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); _pending_buf = nullptr; } + DBUG_EXECUTE_IF("s3_file_writer::close", { RETURN_IF_ERROR(_complete()); return Status::InternalError("failed to close s3 file writer"); @@ -277,7 +307,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { return std::make_unique(std::move(holder)); }); } - _pending_buf = builder.build(); + RETURN_IF_ERROR(builder.build(&_pending_buf)); } // we need to make sure all parts except the last one to be 5MB or more // and shouldn't be larger than buf @@ -300,7 +330,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { } _cur_part_num++; _countdown_event.add_count(); - _pending_buf->submit(); + RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); _pending_buf = nullptr; } _bytes_appended += data_size_to_append; @@ -337,16 +367,20 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) { "injected error", _bucket, _path.native(), part_num, _upload_id); LOG_WARNING(s.to_string()); - buf.set_val(s); + buf.set_status(std::move(s)); return; } }); if (!upload_part_outcome.IsSuccess()) { - _st = s3fs_error(upload_part_outcome.GetError(), - fmt::format("failed to upload part {}, part_num={}, upload_id={}", - _path.native(), part_num, _upload_id)); - LOG(WARNING) << _st; - buf.set_val(_st); + auto s = Status::IOError( + "failed to upload part (bucket={}, key={}, part_num={}, up_load_id={}): {}, " + "exception {}, error code {}", + _bucket, _path.native(), part_num, _upload_id, + 
upload_part_outcome.GetError().GetMessage(), + upload_part_outcome.GetError().GetExceptionName(), + upload_part_outcome.GetError().GetResponseCode()); + LOG_WARNING(s.to_string()); + buf.set_status(std::move(s)); return; } s3_bytes_written_total << buf.get_size(); @@ -434,12 +468,12 @@ Status S3FileWriter::finalize() { // if we only need to upload one file less than 5MB, we can just // call PutObject to reduce the network IO if (_upload_id.empty()) { - auto buf = dynamic_cast(_pending_buf.get()); + auto* buf = dynamic_cast(_pending_buf.get()); DCHECK(buf != nullptr); buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); } _countdown_event.add_count(); - _pending_buf->submit(); + RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); _pending_buf = nullptr; } _wait_until_finish("finalize"); @@ -457,7 +491,7 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) { request.SetContentType("application/octet-stream"); DBUG_EXECUTE_IF("s3_file_writer::_put_object", { _st = Status::InternalError("failed to put object"); - buf.set_val(_st); + buf.set_status(_st); LOG(WARNING) << _st; return; }); @@ -467,7 +501,7 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) { _st = s3fs_error(response.GetError(), fmt::format("failed to put object {}, upload_id={}", _path.native(), _upload_id)); LOG(WARNING) << _st; - buf.set_val(_st); + buf.set_status(_st); return; } _bytes_written += buf.get_size(); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index ba6a8a11f97331..2aba5740773724 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -707,7 +707,7 @@ Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWrite _context.file_cache_ttl_sec > 0 && _context.newest_write_timestamp > 0 ? _context.newest_write_timestamp + _context.file_cache_ttl_sec : 0, - }; + .create_empty_file = false}; Status st = fs->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4d0e548ea22ed7..edc765ce879d3f 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -52,7 +52,6 @@ namespace taskgroup { class TaskGroupManager; } namespace io { -class S3FileBufferPool; class FileCacheFactory; } // namespace io namespace segment_v2 { @@ -252,7 +251,6 @@ class ExecEnv { TabletSchemaCache* get_tablet_schema_cache() { return _tablet_schema_cache; } StorageEngine* get_storage_engine() { return _storage_engine; } - io::S3FileBufferPool* get_s3_file_buffer_pool() { return _s3_buffer_pool; } SchemaCache* schema_cache() { return _schema_cache; } StoragePageCache* get_storage_page_cache() { return _storage_page_cache; } SegmentLoader* segment_loader() { return _segment_loader; } @@ -366,7 +364,6 @@ class ExecEnv { // these redundancy header could introduce potential bug, at least, more header means slow compile. // So we choose to use raw pointer, please remember to delete these pointer in deconstructor. 
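Taken together, the writer-side changes in this patch move the empty-file decision into FileWriterOptions: BetaRowsetWriter opts out because a rowset writer that appended nothing should leave nothing behind, while callers that must materialize an empty object keep the default. A minimal sketch of the opt-out path, assuming a filesystem handle `fs` and a placeholder path (neither taken from this patch):

    // Sketch only: `fs` and the path below are placeholders.
    doris::io::FileWriterOptions opts;
    opts.create_empty_file = false;  // skip file creation when nothing is appended
    doris::io::FileWriterPtr writer;
    RETURN_IF_ERROR(fs->create_file("placeholder/segment_0.dat", &writer, &opts));
    // appendv()/close() behave as before; writers that must produce an empty
    // file (e.g. VFileResultWriter below) call writer->open() explicitly instead.
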
TabletSchemaCache* _tablet_schema_cache = nullptr; - io::S3FileBufferPool* _s3_buffer_pool = nullptr; StorageEngine* _storage_engine = nullptr; SchemaCache* _schema_cache = nullptr; StoragePageCache* _storage_page_cache = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 384b5260aecfba..c0e3a88dd6aa31 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -253,11 +253,6 @@ Status ExecEnv::_init(const std::vector& store_paths, _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity); - // S3 buffer pool - _s3_buffer_pool = new io::S3FileBufferPool(); - _s3_buffer_pool->init(config::s3_write_buffer_whole_size, config::s3_write_buffer_size, - this->s3_file_upload_thread_pool()); - // Storage engine doris::EngineOptions options; options.store_paths = store_paths; @@ -561,7 +556,6 @@ void ExecEnv::destroy() { SAFE_SHUTDOWN(_lazy_release_obj_pool); SAFE_SHUTDOWN(_send_report_thread_pool); SAFE_SHUTDOWN(_send_batch_thread_pool); - SAFE_DELETE(_s3_buffer_pool); _deregister_metrics(); SAFE_DELETE(_load_channel_mgr); diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index f7015e17c96ddb..d7f5cb5d84c471 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -141,6 +141,7 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) { FileFactory::convert_storage_type(_storage_type), _state->exec_env(), _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0, _file_writer_impl)); + RETURN_IF_ERROR(_file_writer_impl->open()); switch (_file_opts->file_format) { case TFileFormatType::FORMAT_CSV_PLAIN: _vfile_writer.reset(new VCSVTransformer( diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp index b37d358f6579cf..5ff1cc5e48acc8 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -30,7 +30,6 @@ #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" -#include "io/fs/s3_file_bufferpool.h" #include "io/fs/s3_file_system.h" #include "io/io_common.h" #include "runtime/exec_env.h" @@ -69,10 +68,6 @@ class S3FileWriterTest : public testing::Test { .build(&_s3_file_upload_thread_pool)); ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_s3_file_upload_thread_pool); - ExecEnv::GetInstance()->_s3_buffer_pool = new io::S3FileBufferPool(); - io::S3FileBufferPool::GetInstance()->init( - config::s3_write_buffer_whole_size, config::s3_write_buffer_size, - ExecEnv::GetInstance()->_s3_file_upload_thread_pool.get()); } static void TearDownTestSuite() { @@ -81,8 +76,6 @@ class S3FileWriterTest : public testing::Test { } ExecEnv::GetInstance()->_s3_file_upload_thread_pool->shutdown(); ExecEnv::GetInstance()->_s3_file_upload_thread_pool = nullptr; - delete ExecEnv::GetInstance()->_s3_buffer_pool; - ExecEnv::GetInstance()->_s3_buffer_pool = nullptr; } void SetUp() override { diff --git a/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out new file mode 100644 index 00000000000000..260c177d310c7d --- /dev/null +++ b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_base1 -- + +-- !select_tvf1 -- + +-- !select_tvf2 -- + +-- !select_tvf3 -- + diff --git a/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy new file mode 100644 index 00000000000000..1804fff2a11450 --- /dev/null +++ b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.Paths + +suite("test_outfile_empty_data", "external,hive,tvf,external_docker") { + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + // use to outfile to hdfs + String hdfs_port = context.config.otherConfigs.get("hdfs_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + // It's okay to use random `hdfsUser`, but can not be empty. + def hdfsUserName = "doris" + def format = "csv" + def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}" + + // use to outfile to s3 + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + // broker + String broker_name = "hdfs" + + def export_table_name = "outfile_empty_data_test" + + def create_table = {table_name, column_define -> + sql """ DROP TABLE IF EXISTS ${table_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + ${column_define} + ) + DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); + """ + } + + def outfile_to_HDFS_directly = { + // select ... into outfile ... + def uuid = UUID.randomUUID().toString() + + hdfs_outfile_path = "/user/doris/tmp_data/${uuid}" + uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_" + + def res = sql """ + SELECT * FROM ${export_table_name} t ORDER BY user_id + INTO OUTFILE "${uri}" + FORMAT AS ${format} + PROPERTIES ( + "fs.defaultFS"="${defaultFS}", + "hadoop.username" = "${hdfsUserName}" + ); + """ + logger.info("outfile to hdfs direct success path: " + res[0][3]); + return res[0][3] + } + + def outfile_to_HDFS_with_broker = { + // select ... into outfile ... 
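+        // Like the direct-HDFS case above, this broker path should still produce
+        // an (empty) output file even though the table has no rows, because the
+        // outfile writer now calls open() on the broker file writer explicitly;
+        // qt_select_tvf2 below then reads that file back as an empty result set.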
+ def uuid = UUID.randomUUID().toString() + + hdfs_outfile_path = "/user/doris/tmp_data/${uuid}" + uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_" + + def res = sql """ + SELECT * FROM ${export_table_name} t ORDER BY user_id + INTO OUTFILE "${uri}" + FORMAT AS ${format} + PROPERTIES ( + "broker.fs.defaultFS"="${defaultFS}", + "broker.name"="hdfs", + "broker.username" = "${hdfsUserName}" + ); + """ + logger.info("outfile to hdfs with broker success path: " + res[0][3]); + return res[0][3] + } + + def outfile_to_S3_directly = { + // select ... into outfile ... + s3_outfile_path = "${bucket}/outfile/csv/test-outfile-empty/" + uri = "s3://${s3_outfile_path}/exp_" + + def res = sql """ + SELECT * FROM ${export_table_name} t ORDER BY user_id + INTO OUTFILE "${uri}" + FORMAT AS csv + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + logger.info("outfile to s3 success path: " + res[0][3]); + return res[0][3] + } + + try { + def doris_column_define = """ + `user_id` INT NOT NULL COMMENT "用户id", + `name` STRING NULL, + `age` INT NULL""" + // create table + create_table(export_table_name, doris_column_define); + // test outfile empty data to hdfs directly + def outfile_to_hdfs_directly_url = outfile_to_HDFS_directly() + // test outfile empty data to hdfs with broker + def outfile_to_hdfs_with_broker_url= outfile_to_HDFS_with_broker() + // test outfile empty data to s3 directly + def outfile_to_s3_directly_url = outfile_to_S3_directly() + qt_select_base1 """ SELECT * FROM ${export_table_name} ORDER BY user_id; """ + + qt_select_tvf1 """ select * from HDFS( + "uri" = "${outfile_to_hdfs_directly_url}0.csv", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}"); + """ + + qt_select_tvf2 """ select * from HDFS( + "uri" = "${outfile_to_hdfs_with_broker_url}0.csv", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}"); + """ + + qt_select_tvf3 """ SELECT * FROM S3 ( + "uri" = "http://${s3_endpoint}${outfile_to_s3_directly_url.substring(4, outfile_to_s3_directly_url.length())}0.csv", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}", + "use_path_style" = "true" + ); + """ + + } finally { + } +}
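
For reviewers, a minimal sketch of the buffer lifecycle after this patch, using only the builder setters visible in the diff; the upload callback body and the payload are placeholders, and error handling is elided:

    #include "io/fs/s3_file_bufferpool.h"

    using namespace doris;
    using namespace doris::io;

    Status upload_one_buffer_sketch() {
        FileBufferBuilder builder;
        builder.set_type(BufferType::UPLOAD)
                .set_upload_callback([](UploadFileBuffer& buf) {
                    // placeholder: PutObject / UploadPart would consume buf.get_slice() here
                })
                .set_sync_after_complete_task([](Status s) { return !s.ok(); })
                .set_is_cancelled([]() { return false; });
        std::shared_ptr<FileBuffer> buf;
        // build() now allocates config::s3_write_buffer_size bytes per buffer and
        // surfaces allocation failure as a Status instead of returning nullptr.
        RETURN_IF_ERROR(builder.build(&buf));
        std::string data = "placeholder payload";
        RETURN_IF_ERROR(buf->append_data(Slice(data.data(), data.size())));
        // submit() is static and takes the shared_ptr by value, so the buffer
        // outlives the caller while the upload runs on s3_file_upload_thread_pool.
        return FileBuffer::submit(std::move(buf));
    }

This is the design change that lets the shared S3FileBufferPool go away: each buffer now owns its own s3_write_buffer_size allocation and is freed when the last shared_ptr drops, which is why s3_write_buffer_whole_size and the ExecEnv-owned pool are removed above.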