Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1045,10 +1045,6 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
DEFINE_mInt64(row_column_page_size, "4096");
// it must be larger than or equal to 5MB
DEFINE_mInt32(s3_write_buffer_size, "5242880");
// the size of the whole s3 buffer pool, which indicates the s3 file writer
// can at most buffer 50MB data. And the num of multi part upload task is
// s3_write_buffer_whole_size / s3_write_buffer_size
DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
// The timeout config for S3 buffer allocation
DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
Expand Down
4 changes: 0 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1082,10 +1082,6 @@ DECLARE_mInt32(tablet_path_check_batch_size);
DECLARE_mInt64(row_column_page_size);
// it must be larger than or equal to 5MB
DECLARE_mInt32(s3_write_buffer_size);
// the size of the whole s3 buffer pool, which indicates the s3 file writer
// can at most buffer 50MB data. And the num of multi part upload task is
// s3_write_buffer_whole_size / s3_write_buffer_size
DECLARE_mInt32(s3_write_buffer_whole_size);
// The timeout config for S3 buffer allocation
DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
// the max number of cached file handle for block segemnt
Expand Down
2 changes: 0 additions & 2 deletions be/src/io/fs/benchmark/fs_benchmark_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ int main(int argc, char** argv) {
.set_min_threads(num_cores)
.set_max_threads(num_cores)
.build(&s3_file_upload_thread_pool));
doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
s3_buffer_pool->init(524288000, 5242880, s3_file_upload_thread_pool.get());

try {
doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/broker_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() {
Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer,
const FileWriterOptions* opts) {
*writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(), _broker_addr, _broker_prop,
path, 0 /* offset */, getSPtr());
path, 0 /* offset */, getSPtr(), opts);
return Status::OK();
}

Expand Down
15 changes: 13 additions & 2 deletions be/src/io/fs/broker_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ namespace io {

BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset, FileSystemSPtr fs)
const std::string& path, int64_t start_offset, FileSystemSPtr fs,
const FileWriterOptions* opts)
: FileWriter(path, fs),
_env(env),
_address(broker_address),
_properties(properties),
_cur_offset(start_offset) {}
_cur_offset(start_offset) {
_create_empty_file = opts ? opts->create_empty_file : true;
}

BrokerFileWriter::~BrokerFileWriter() {
if (_opened) {
Expand Down Expand Up @@ -153,6 +156,14 @@ Status BrokerFileWriter::finalize() {
return Status::OK();
}

Status BrokerFileWriter::open() {
if (_create_empty_file && !_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
return Status::OK();
}

Status BrokerFileWriter::_open() {
TBrokerOpenWriterRequest request;

Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/broker_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ class BrokerFileWriter : public FileWriter {
public:
BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties, const std::string& path,
int64_t start_offset, FileSystemSPtr fs);
int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts);
virtual ~BrokerFileWriter();

Status open() override;
Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
Expand Down
6 changes: 6 additions & 0 deletions be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ struct FileWriterOptions {
bool is_cold_data = false;
bool sync_file_data = true; // Whether flush data into storage system
int64_t file_cache_expiration = 0; // Absolute time
// Whether to create empty file if no content
bool create_empty_file = true;
};

class FileWriter {
Expand All @@ -46,6 +48,9 @@ class FileWriter {

DISALLOW_COPY_AND_ASSIGN(FileWriter);

// Open the file for writing.
virtual Status open() { return Status::OK(); }

// Normal close. Wait for all data to persist before returning.
virtual Status close() = 0;

Expand Down Expand Up @@ -74,6 +79,7 @@ class FileWriter {
std::shared_ptr<FileSystem> _fs;
bool _closed = false;
bool _opened = false;
bool _create_empty_file = true;
};

using FileWriterPtr = std::unique_ptr<FileWriter>;
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ Status HdfsFileSystem::connect_impl() {
}

Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions*) {
*writer = std::make_unique<HdfsFileWriter>(file, getSPtr());
const FileWriterOptions* opts) {
*writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts);
return Status::OK();
}

Expand Down
12 changes: 11 additions & 1 deletion be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
namespace doris {
namespace io {

HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) {
HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts)
: FileWriter(std::move(file), fs) {
_create_empty_file = opts ? opts->create_empty_file : true;
_hdfs_fs = (HdfsFileSystem*)_fs.get();
}

Expand Down Expand Up @@ -103,6 +105,14 @@ Status HdfsFileWriter::finalize() {
return Status::OK();
}

Status HdfsFileWriter::open() {
if (_create_empty_file && !_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
return Status::OK();
}

Status HdfsFileWriter::_open() {
_path = convert_path(_path, _hdfs_fs->_fs_name);
std::string hdfs_dir = _path.parent_path().string();
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/hdfs_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ class HdfsFileSystem;

class HdfsFileWriter : public FileWriter {
public:
HdfsFileWriter(Path file, FileSystemSPtr fs);
HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts);
~HdfsFileWriter();

Status open() override;
Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
Expand Down
Loading