Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/io/fs/broker_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() {
Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer,
const FileWriterOptions* opts) {
*writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(), _broker_addr, _broker_prop,
path, 0 /* offset */, getSPtr());
path, 0 /* offset */, getSPtr(), opts);
return Status::OK();
}

Expand Down
9 changes: 6 additions & 3 deletions be/src/io/fs/broker_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ namespace io {

BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset, FileSystemSPtr fs)
const std::string& path, int64_t start_offset, FileSystemSPtr fs,
const FileWriterOptions* opts)
: FileWriter(path, fs),
_env(env),
_address(broker_address),
_properties(properties),
_cur_offset(start_offset) {}
_cur_offset(start_offset) {
_create_empty_file = opts ? opts->create_empty_file : true;
}

BrokerFileWriter::~BrokerFileWriter() {
if (_opened) {
Expand Down Expand Up @@ -159,7 +162,7 @@ Status BrokerFileWriter::finalize() {
}

Status BrokerFileWriter::open() {
if (!_opened) {
if (_create_empty_file && !_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/broker_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class BrokerFileWriter : public FileWriter {
public:
BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties, const std::string& path,
int64_t start_offset, FileSystemSPtr fs);
int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts);
virtual ~BrokerFileWriter();

Status open() override;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class FileWriter {
FileSystemSPtr _fs;
bool _closed = false;
bool _opened = false;
bool _create_empty_file = true;
};

} // namespace io
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/file_writer_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ struct FileWriterOptions {
bool is_cold_data = false;
bool sync_file_data = true; // Whether flush data into storage system
int64_t file_cache_expiration = 0; // Absolute time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: unknown type name 'int64_t' [clang-diagnostic-error]

    int64_t file_cache_expiration = 0; // Absolute time
    ^

// Whether to create empty file if no content
bool create_empty_file = true;
};

} // namespace io
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ Status HdfsFileSystem::connect_impl() {

Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
*writer = std::make_unique<HdfsFileWriter>(file, getSPtr());
*writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts);
return Status::OK();
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
namespace doris {
namespace io {

HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) {
HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts)
: FileWriter(std::move(file), fs) {
_create_empty_file = opts ? opts->create_empty_file : true;
_hdfs_fs = (HdfsFileSystem*)_fs.get();
}

Expand Down Expand Up @@ -109,7 +111,7 @@ Status HdfsFileWriter::finalize() {
}

Status HdfsFileWriter::open() {
if (!_opened) {
if (_create_empty_file && !_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/hdfs_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class HdfsFileSystem;

class HdfsFileWriter : public FileWriter {
public:
HdfsFileWriter(Path file, FileSystemSPtr fs);
HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts);
~HdfsFileWriter();

Status open() override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/s3_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Status S3FileSystem::connect_impl() {
Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
GET_KEY(key, file);
*writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf, getSPtr());
*writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf, getSPtr(), opts);
return Status::OK();
}

Expand Down
13 changes: 8 additions & 5 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer", "file_created");
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer", "file_being_written");

S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3Conf& s3_conf,
FileSystemSPtr fs)
FileSystemSPtr fs, const FileWriterOptions* opts)
: FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path, std::move(fs)),
_bucket(s3_conf.bucket),
_key(std::move(path)),
Expand All @@ -87,6 +87,7 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3
s3_file_writer_total << 1;
s3_file_being_written << 1;

_create_empty_file = opts ? opts->create_empty_file : true;
Aws::Http::SetCompliantRfc3986Encoding(true);
}

Expand Down Expand Up @@ -195,7 +196,7 @@ Status S3FileWriter::close() {
// it might be one file less than 5MB, we do upload here
_pending_buf->set_upload_remote_callback(
[this, buf = _pending_buf]() { _put_object(*buf); });
} else {
} else if (_create_empty_file) {
// if there is no pending buffer, we need to create an empty file
_pending_buf = S3FileBufferPool::GetInstance()->allocate();
// if there is no upload id, we need to create a new one
Expand All @@ -211,9 +212,11 @@ Status S3FileWriter::close() {
});
}
}
_countdown_event.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
if (_pending_buf != nullptr) {
_countdown_event.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}

RETURN_IF_ERROR(_complete());

Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/s3_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct S3FileBuffer;
class S3FileWriter final : public FileWriter {
public:
S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const S3Conf& s3_conf,
FileSystemSPtr fs);
FileSystemSPtr fs, const FileWriterOptions* opts);
~S3FileWriter() override;

Status close() override;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,8 @@ Status BetaRowsetWriter::_do_create_segment_writer(
return Status::Error<INIT_FAILED>("get fs failed");
}
io::FileWriterPtr file_writer;
Status st = fs->create_file(path, &file_writer);
io::FileWriterOptions opts {.create_empty_file = false};
Status st = fs->create_file(path, &file_writer, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;
return st;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,8 @@ void DorisCompoundDirectory::touchFile(const char* name) {
snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA, name);

io::FileWriterPtr tmp_writer;
LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer), "Touch file IO error")
io::FileWriterOptions opts {.create_empty_file = false};
LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer, &opts), "Touch file IO error")
}

int64_t DorisCompoundDirectory::fileLength(const char* name) const {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/vertical_beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ Status VerticalBetaRowsetWriter::_create_segment_writer(
return Status::Error<INIT_FAILED>("get fs failed");
}
io::FileWriterPtr file_writer;
Status st = fs->create_file(path, &file_writer);
io::FileWriterOptions opts {.create_empty_file = false};
Status st = fs->create_file(path, &file_writer, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;
return st;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2267,8 +2267,9 @@ Status Tablet::write_cooldown_meta() {
std::string remote_meta_path =
remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term);
io::FileWriterPtr tablet_meta_writer;
io::FileWriterOptions opts {.create_empty_file = false};
// FIXME(plat1ko): What if object store permanently unavailable?
RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer, &opts));
auto val = tablet_meta_pb.SerializeAsString();
RETURN_IF_ERROR(tablet_meta_writer->append({val.data(), val.size()}));
return tablet_meta_writer->close();
Expand Down