From 7f1354010e4eb3a22cf6b8e8eeb82805eb2a95d0 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 23 Feb 2024 23:11:34 +0800 Subject: [PATCH 1/2] [fix](file-writer) avoid empty file for segment writer --- be/src/io/fs/broker_file_system.cpp | 2 +- be/src/io/fs/broker_file_writer.cpp | 9 ++++++--- be/src/io/fs/broker_file_writer.h | 2 +- be/src/io/fs/file_writer.h | 1 + be/src/io/fs/file_writer_options.h | 2 ++ be/src/io/fs/hdfs_file_system.cpp | 2 +- be/src/io/fs/hdfs_file_writer.cpp | 6 ++++-- be/src/io/fs/hdfs_file_writer.h | 2 +- be/src/io/fs/s3_file_writer.cpp | 11 +++++++---- be/src/olap/rowset/beta_rowset_writer.cpp | 3 ++- .../segment_v2/inverted_index_compound_directory.cpp | 3 ++- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 3 ++- be/src/olap/tablet.cpp | 3 ++- 13 files changed, 32 insertions(+), 17 deletions(-) diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index a3c93c04a7a6de..f0d6bc12380931 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() { Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer, const FileWriterOptions* opts) { *writer = std::make_unique(ExecEnv::GetInstance(), _broker_addr, _broker_prop, - path, 0 /* offset */, getSPtr()); + path, 0 /* offset */, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp index daba1af2bce832..a46d22e1505e88 100644 --- a/be/src/io/fs/broker_file_writer.cpp +++ b/be/src/io/fs/broker_file_writer.cpp @@ -37,12 +37,15 @@ namespace io { BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map& properties, - const std::string& path, int64_t start_offset, FileSystemSPtr fs) + const std::string& path, int64_t start_offset, FileSystemSPtr fs, + const FileWriterOptions* opts) : FileWriter(path, fs), _env(env), _address(broker_address), _properties(properties), - _cur_offset(start_offset) {} + _cur_offset(start_offset) { + _create_empty_file = opts ? opts->create_empty_file : true; +} BrokerFileWriter::~BrokerFileWriter() { if (_opened) { @@ -159,7 +162,7 @@ Status BrokerFileWriter::finalize() { } Status BrokerFileWriter::open() { - if (!_opened) { + if (_create_empty_file && !_opened) { RETURN_IF_ERROR(_open()); _opened = true; } diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h index e3e53525679671..3e8edab0078914 100644 --- a/be/src/io/fs/broker_file_writer.h +++ b/be/src/io/fs/broker_file_writer.h @@ -42,7 +42,7 @@ class BrokerFileWriter : public FileWriter { public: BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map& properties, const std::string& path, - int64_t start_offset, FileSystemSPtr fs); + int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts); virtual ~BrokerFileWriter(); Status open() override; diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 03f092c042402a..1fd9b8391d964d 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -67,6 +67,7 @@ class FileWriter { FileSystemSPtr _fs; bool _closed = false; bool _opened = false; + bool _create_empty_file = true; }; } // namespace io diff --git a/be/src/io/fs/file_writer_options.h b/be/src/io/fs/file_writer_options.h index 511bd81a168e73..4af380923738f8 100644 --- a/be/src/io/fs/file_writer_options.h +++ b/be/src/io/fs/file_writer_options.h @@ -26,6 +26,8 @@ struct FileWriterOptions { bool is_cold_data = false; bool sync_file_data = true; // Whether flush data into storage system int64_t file_cache_expiration = 0; // Absolute time + // Whether to create empty file if no content + bool create_empty_file = true; }; } // namespace io diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 6b44b21912804c..d3d54527836310 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -167,7 +167,7 @@ Status HdfsFileSystem::connect_impl() { Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, const FileWriterOptions* opts) { - *writer = std::make_unique(file, getSPtr()); + *writer = std::make_unique(file, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index a5f5dab9fd4e49..fe4b6cde19967f 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -34,7 +34,9 @@ namespace doris { namespace io { -HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) { +HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts) + : FileWriter(std::move(file), fs) { + _create_empty_file = opts ? opts->create_empty_file : true; _hdfs_fs = (HdfsFileSystem*)_fs.get(); } @@ -109,7 +111,7 @@ Status HdfsFileWriter::finalize() { } Status HdfsFileWriter::open() { - if (!_opened) { + if (_create_empty_file && !_opened) { RETURN_IF_ERROR(_open()); _opened = true; } diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index 812598e7a51cd4..e62c6d6367ef31 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -33,7 +33,7 @@ class HdfsFileSystem; class HdfsFileWriter : public FileWriter { public: - HdfsFileWriter(Path file, FileSystemSPtr fs); + HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts); ~HdfsFileWriter(); Status open() override; diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 78c8f9355c9875..2c8e1f06d89fb2 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -87,6 +87,7 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr client, const S3 s3_file_writer_total << 1; s3_file_being_written << 1; + _create_empty_file = opts ? opts->create_empty_file : true; Aws::Http::SetCompliantRfc3986Encoding(true); } @@ -195,7 +196,7 @@ Status S3FileWriter::close() { // it might be one file less than 5MB, we do upload here _pending_buf->set_upload_remote_callback( [this, buf = _pending_buf]() { _put_object(*buf); }); - } else { + } else if (_create_empty_file) { // if there is no pending buffer, we need to create an empty file _pending_buf = S3FileBufferPool::GetInstance()->allocate(); // if there is no upload id, we need to create a new one @@ -211,9 +212,11 @@ Status S3FileWriter::close() { }); } } - _countdown_event.add_count(); - _pending_buf->submit(); - _pending_buf = nullptr; + if (_pending_buf != nullptr) { + _countdown_event.add_count(); + _pending_buf->submit(); + _pending_buf = nullptr; + } RETURN_IF_ERROR(_complete()); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index c90cd6ba079981..f2421eaeca59f5 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -706,7 +706,8 @@ Status BetaRowsetWriter::_do_create_segment_writer( return Status::Error("get fs failed"); } io::FileWriterPtr file_writer; - Status st = fs->create_file(path, &file_writer); + FileWriterOptions opts {.create_empty_file = false}; + Status st = fs->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; return st; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp index 1af26a57674565..d5072df96f1097 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp @@ -596,7 +596,8 @@ void DorisCompoundDirectory::touchFile(const char* name) { snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA, name); io::FileWriterPtr tmp_writer; - LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer), "Touch file IO error") + FileWriterOptions opts {.create_empty_file = false}; + LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer, &opts), "Touch file IO error") } int64_t DorisCompoundDirectory::fileLength(const char* name) const { diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index 10cef8e1675313..59aedd20cce136 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -182,7 +182,8 @@ Status VerticalBetaRowsetWriter::_create_segment_writer( return Status::Error("get fs failed"); } io::FileWriterPtr file_writer; - Status st = fs->create_file(path, &file_writer); + FileWriterOptions opts {.create_empty_file = false}; + Status st = fs->create_file(path, &file_writer.& opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; return st; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 61fe618f0321fc..6eb01da8cfd661 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2267,8 +2267,9 @@ Status Tablet::write_cooldown_meta() { std::string remote_meta_path = remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term); io::FileWriterPtr tablet_meta_writer; + FileWriterOptions opts {.create_empty_file = false}; // FIXME(plat1ko): What if object store permanently unavailable? - RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer)); + RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer, &opts)); auto val = tablet_meta_pb.SerializeAsString(); RETURN_IF_ERROR(tablet_meta_writer->append({val.data(), val.size()})); return tablet_meta_writer->close(); From c3748fd7ff8c75d51657280656436fa9158cc74a Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 24 Feb 2024 00:11:55 +0800 Subject: [PATCH 2/2] 2 --- be/src/io/fs/s3_file_system.cpp | 2 +- be/src/io/fs/s3_file_writer.cpp | 2 +- be/src/io/fs/s3_file_writer.h | 2 +- be/src/olap/rowset/beta_rowset_writer.cpp | 2 +- .../rowset/segment_v2/inverted_index_compound_directory.cpp | 2 +- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 4 ++-- be/src/olap/tablet.cpp | 2 +- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index a92539713e2834..cad49b4555c1b6 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -131,7 +131,7 @@ Status S3FileSystem::connect_impl() { Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, const FileWriterOptions* opts) { GET_KEY(key, file); - *writer = std::make_unique(key, get_client(), _s3_conf, getSPtr()); + *writer = std::make_unique(key, get_client(), _s3_conf, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 2c8e1f06d89fb2..aa7dcb573ea545 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -78,7 +78,7 @@ bvar::Adder s3_file_created_total("s3_file_writer", "file_created"); bvar::Adder s3_file_being_written("s3_file_writer", "file_being_written"); S3FileWriter::S3FileWriter(Path path, std::shared_ptr client, const S3Conf& s3_conf, - FileSystemSPtr fs) + FileSystemSPtr fs, const FileWriterOptions* opts) : FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path, std::move(fs)), _bucket(s3_conf.bucket), _key(std::move(path)), diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h index 2c139242ed489d..ab4c1f9f47c902 100644 --- a/be/src/io/fs/s3_file_writer.h +++ b/be/src/io/fs/s3_file_writer.h @@ -46,7 +46,7 @@ struct S3FileBuffer; class S3FileWriter final : public FileWriter { public: S3FileWriter(Path path, std::shared_ptr client, const S3Conf& s3_conf, - FileSystemSPtr fs); + FileSystemSPtr fs, const FileWriterOptions* opts); ~S3FileWriter() override; Status close() override; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index f2421eaeca59f5..2b47b3aaed8ef5 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -706,7 +706,7 @@ Status BetaRowsetWriter::_do_create_segment_writer( return Status::Error("get fs failed"); } io::FileWriterPtr file_writer; - FileWriterOptions opts {.create_empty_file = false}; + io::FileWriterOptions opts {.create_empty_file = false}; Status st = fs->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp index d5072df96f1097..e7f8f6abe26d62 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp @@ -596,7 +596,7 @@ void DorisCompoundDirectory::touchFile(const char* name) { snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA, name); io::FileWriterPtr tmp_writer; - FileWriterOptions opts {.create_empty_file = false}; + io::FileWriterOptions opts {.create_empty_file = false}; LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer, &opts), "Touch file IO error") } diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index 59aedd20cce136..33d638b63f3b8f 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -182,8 +182,8 @@ Status VerticalBetaRowsetWriter::_create_segment_writer( return Status::Error("get fs failed"); } io::FileWriterPtr file_writer; - FileWriterOptions opts {.create_empty_file = false}; - Status st = fs->create_file(path, &file_writer.& opts); + io::FileWriterOptions opts {.create_empty_file = false}; + Status st = fs->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; return st; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 6eb01da8cfd661..47e089add4170f 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2267,7 +2267,7 @@ Status Tablet::write_cooldown_meta() { std::string remote_meta_path = remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term); io::FileWriterPtr tablet_meta_writer; - FileWriterOptions opts {.create_empty_file = false}; + io::FileWriterOptions opts {.create_empty_file = false}; // FIXME(plat1ko): What if object store permanently unavailable? RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer, &opts)); auto val = tablet_meta_pb.SerializeAsString();