Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/io/fs/broker_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() {
Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer,
const FileWriterOptions* opts) {
*writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(), _broker_addr, _broker_prop,
path, 0 /* offset */, getSPtr());
path, 0 /* offset */, getSPtr(), opts);
return Status::OK();
}

Expand Down
9 changes: 6 additions & 3 deletions be/src/io/fs/broker_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ namespace io {

BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset, FileSystemSPtr fs)
const std::string& path, int64_t start_offset, FileSystemSPtr fs,
const FileWriterOptions* opts)
: FileWriter(path, fs),
_env(env),
_address(broker_address),
_properties(properties),
_cur_offset(start_offset) {}
_cur_offset(start_offset) {
_create_empty_file = opts ? opts->create_empty_file : true;
}

BrokerFileWriter::~BrokerFileWriter() {
if (_opened) {
Expand Down Expand Up @@ -154,7 +157,7 @@ Status BrokerFileWriter::finalize() {
}

Status BrokerFileWriter::open() {
if (!_opened) {
if (_create_empty_file && !_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/broker_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class BrokerFileWriter : public FileWriter {
public:
BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties, const std::string& path,
int64_t start_offset, FileSystemSPtr fs);
int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts);
virtual ~BrokerFileWriter();

Status open() override;
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ struct FileWriterOptions {
bool is_cold_data = false;
bool sync_file_data = true; // Whether flush data into storage system
int64_t file_cache_expiration = 0; // Absolute time
// Whether to create empty file if no content
bool create_empty_file = true;
};

class FileWriter {
Expand Down Expand Up @@ -77,6 +79,7 @@ class FileWriter {
std::shared_ptr<FileSystem> _fs;
bool _closed = false;
bool _opened = false;
bool _create_empty_file = true;
};

using FileWriterPtr = std::unique_ptr<FileWriter>;
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ Status HdfsFileSystem::connect_impl() {
}

Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions*) {
*writer = std::make_unique<HdfsFileWriter>(file, getSPtr());
const FileWriterOptions* opts) {
*writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts);
return Status::OK();
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
namespace doris {
namespace io {

HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) {
HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts)
: FileWriter(std::move(file), fs) {
_create_empty_file = opts ? opts->create_empty_file : true;
_hdfs_fs = (HdfsFileSystem*)_fs.get();
}

Expand Down Expand Up @@ -104,7 +106,7 @@ Status HdfsFileWriter::finalize() {
}

Status HdfsFileWriter::open() {
if (!_opened) {
if (_create_empty_file && !_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/hdfs_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class HdfsFileSystem;

class HdfsFileWriter : public FileWriter {
public:
HdfsFileWriter(Path file, FileSystemSPtr fs);
HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts);
~HdfsFileWriter();

Status open() override;
Expand Down
12 changes: 8 additions & 4 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ S3FileWriter::S3FileWriter(std::string key, std::shared_ptr<S3FileSystem> fs,
_cache_key = IFileCache::hash(_path.filename().native());
_cache = FileCacheFactory::instance()->get_by_path(_cache_key);
}

_create_empty_file = opts ? opts->create_empty_file : true;
}

S3FileWriter::~S3FileWriter() {
Expand Down Expand Up @@ -205,7 +207,7 @@ Status S3FileWriter::close() {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
} else {
} else if (_create_empty_file) {
// if there is no pending buffer, we need to create an empty file
auto builder = FileBufferBuilder();
builder.set_type(BufferType::UPLOAD)
Expand All @@ -232,9 +234,11 @@ Status S3FileWriter::close() {
DCHECK(buf != nullptr);
}
}
_countdown_event.add_count();
RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
_pending_buf = nullptr;
if (_pending_buf != nullptr) {
_countdown_event.add_count();
RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
_pending_buf = nullptr;
}

DBUG_EXECUTE_IF("s3_file_writer::close", {
RETURN_IF_ERROR(_complete());
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWrite
_context.file_cache_ttl_sec > 0 && _context.newest_write_timestamp > 0
? _context.newest_write_timestamp + _context.file_cache_ttl_sec
: 0,
};
.create_empty_file = false};
Status st = fs->create_file(path, &file_writer, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;
Expand Down