From a434614071fdf7fa9ecbacea2a8269b7d5d61c26 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 20 Feb 2024 19:23:12 +0800 Subject: [PATCH] [fix](file-writer) avoid empty file for segment writer --- be/src/io/fs/broker_file_system.cpp | 2 +- be/src/io/fs/broker_file_writer.cpp | 9 ++++++--- be/src/io/fs/broker_file_writer.h | 2 +- be/src/io/fs/file_writer.h | 3 +++ be/src/io/fs/hdfs_file_system.cpp | 4 ++-- be/src/io/fs/hdfs_file_writer.cpp | 6 ++++-- be/src/io/fs/hdfs_file_writer.h | 2 +- be/src/io/fs/s3_file_writer.cpp | 12 ++++++++---- be/src/olap/rowset/beta_rowset_writer.cpp | 2 +- 9 files changed, 27 insertions(+), 15 deletions(-) diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index 576a7dd8959ee9..44582a0ff8dd29 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() { Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer, const FileWriterOptions* opts) { *writer = std::make_unique(ExecEnv::GetInstance(), _broker_addr, _broker_prop, - path, 0 /* offset */, getSPtr()); + path, 0 /* offset */, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp index 0d305bf269b53a..75acf40084c9fd 100644 --- a/be/src/io/fs/broker_file_writer.cpp +++ b/be/src/io/fs/broker_file_writer.cpp @@ -37,12 +37,15 @@ namespace io { BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map& properties, - const std::string& path, int64_t start_offset, FileSystemSPtr fs) + const std::string& path, int64_t start_offset, FileSystemSPtr fs, + const FileWriterOptions* opts) : FileWriter(path, fs), _env(env), _address(broker_address), _properties(properties), - _cur_offset(start_offset) {} + _cur_offset(start_offset) { + _create_empty_file = opts ? opts->create_empty_file : true; +} BrokerFileWriter::~BrokerFileWriter() { if (_opened) { @@ -154,7 +157,7 @@ Status BrokerFileWriter::finalize() { } Status BrokerFileWriter::open() { - if (!_opened) { + if (_create_empty_file && !_opened) { RETURN_IF_ERROR(_open()); _opened = true; } diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h index f132545f0a8bc8..05b62846e6c773 100644 --- a/be/src/io/fs/broker_file_writer.h +++ b/be/src/io/fs/broker_file_writer.h @@ -42,7 +42,7 @@ class BrokerFileWriter : public FileWriter { public: BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map& properties, const std::string& path, - int64_t start_offset, FileSystemSPtr fs); + int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts); virtual ~BrokerFileWriter(); Status open() override; diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 256c67a9838ff7..bb3235e7d27e4d 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -35,6 +35,8 @@ struct FileWriterOptions { bool is_cold_data = false; bool sync_file_data = true; // Whether flush data into storage system int64_t file_cache_expiration = 0; // Absolute time + // Whether to create empty file if no content + bool create_empty_file = true; }; class FileWriter { @@ -77,6 +79,7 @@ class FileWriter { std::shared_ptr _fs; bool _closed = false; bool _opened = false; + bool _create_empty_file = true; }; using FileWriterPtr = std::unique_ptr; diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 8ada4b92acca57..a65784226f305c 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -167,8 +167,8 @@ Status HdfsFileSystem::connect_impl() { } Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, - const FileWriterOptions*) { - *writer = std::make_unique(file, getSPtr()); + const FileWriterOptions* opts) { + *writer = std::make_unique(file, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index 1f262e1abcd4f3..40c3c59dcd7ad7 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -34,7 +34,9 @@ namespace doris { namespace io { -HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) { +HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts) + : FileWriter(std::move(file), fs) { + _create_empty_file = opts ? opts->create_empty_file : true; _hdfs_fs = (HdfsFileSystem*)_fs.get(); } @@ -104,7 +106,7 @@ Status HdfsFileWriter::finalize() { } Status HdfsFileWriter::open() { - if (!_opened) { + if (_create_empty_file && !_opened) { RETURN_IF_ERROR(_open()); _opened = true; } diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index bffd0efdca9c72..21dcaff1cd6ac1 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -33,7 +33,7 @@ class HdfsFileSystem; class HdfsFileWriter : public FileWriter { public: - HdfsFileWriter(Path file, FileSystemSPtr fs); + HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts); ~HdfsFileWriter(); Status open() override; diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 7711529b6f5989..dbe5ce8e70cdab 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -98,6 +98,8 @@ S3FileWriter::S3FileWriter(std::string key, std::shared_ptr fs, _cache_key = IFileCache::hash(_path.filename().native()); _cache = FileCacheFactory::instance()->get_by_path(_cache_key); } + + _create_empty_file = opts ? opts->create_empty_file : true; } S3FileWriter::~S3FileWriter() { @@ -205,7 +207,7 @@ Status S3FileWriter::close() { auto* buf = dynamic_cast(_pending_buf.get()); DCHECK(buf != nullptr); buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); - } else { + } else if (_create_empty_file) { // if there is no pending buffer, we need to create an empty file auto builder = FileBufferBuilder(); builder.set_type(BufferType::UPLOAD) @@ -232,9 +234,11 @@ Status S3FileWriter::close() { DCHECK(buf != nullptr); } } - _countdown_event.add_count(); - RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); - _pending_buf = nullptr; + if (_pending_buf != nullptr) { + _countdown_event.add_count(); + RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); + _pending_buf = nullptr; + } DBUG_EXECUTE_IF("s3_file_writer::close", { RETURN_IF_ERROR(_complete()); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index b11a236ebf243f..8556e19483fc8a 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -701,7 +701,7 @@ Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWrite _context.file_cache_ttl_sec > 0 && _context.newest_write_timestamp > 0 ? _context.newest_write_timestamp + _context.file_cache_ttl_sec : 0, - }; + .create_empty_file = false}; Status st = fs->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;