Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ Result<io::FileSystemSPtr> FileFactory::create_fs(const io::FSPropertiesRef& fs_

Result<io::FileWriterPtr> FileFactory::create_file_writer(
TFileType::type type, ExecEnv* env, const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties, const std::string& path) {
const std::map<std::string, std::string>& properties, const std::string& path,
const io::FileWriterOptions& options) {
io::FileWriterPtr file_writer;
switch (type) {
case TFileType::FILE_LOCAL: {
Expand All @@ -117,16 +118,15 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer(
RETURN_IF_ERROR_RESULT(
S3ClientFactory::convert_properties_to_s3_conf(properties, s3_uri, &s3_conf));
auto client = S3ClientFactory::instance().create(s3_conf.client_conf);
// TODO(plat1ko): Set opts
return std::make_unique<io::S3FileWriter>(std::move(client), std::move(s3_conf.bucket),
s3_uri.get_key(), nullptr);
s3_uri.get_key(), &options);
}
case TFileType::FILE_HDFS: {
THdfsParams hdfs_params = parse_properties(properties);
io::HdfsHandler* handler;
RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection(
hdfs_params, hdfs_params.fs_name, &handler));
auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name);
auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options);
if (!res.has_value()) {
handler->dec_ref();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class FileFactory {
/// Create a FileWriter for `path` on the storage backend selected by `type`.
/// `options` carries per-writer settings (callers in this change set
/// `write_file_cache` and `sync_file_data`).
/// NOTE(review): this excerpt is a diff rendered without +/- markers, so both the old
/// parameter-list tail (ending at `path);`) and the new one (ending at `options);`)
/// appear below -- presumably only the latter survives in the merged header; confirm.
static Result<io::FileWriterPtr> create_file_writer(
TFileType::type type, ExecEnv* env,
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties, const std::string& path);
const std::map<std::string, std::string>& properties, const std::string& path,
const io::FileWriterOptions& options);

/// Create FileReader without FS
static Result<io::FileReaderSPtr> create_file_reader(
Expand Down
3 changes: 1 addition & 2 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,7 @@ Status HdfsFileSystem::upload_impl(const Path& local_file, const Path& remote_fi
left_len -= read_len;
}

LOG(INFO) << "finished to write file: " << local_file << ", length: " << file_len;
return Status::OK();
return hdfs_writer->close();
}

Status HdfsFileSystem::batch_upload_impl(const std::vector<Path>& local_files,
Expand Down
49 changes: 32 additions & 17 deletions be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@
namespace doris::io {

// Constructs a writer around an already-opened HDFS file handle.
// `path` and `fs_name` are moved into members; `handler` and `hdfs_file` are stored as given.
// `_sync_file_data` is taken from `opts->sync_file_data`; when `opts` is null it is `true`.
// Only that one field is read here, so `opts` is not dereferenced after construction
// by this constructor.
// NOTE(review): this excerpt is a diff rendered without +/- markers, so the old and new
// versions of the parameter-list tail and of the `_fs_name` initializer both appear below --
// presumably the shorter variants are the removed lines; confirm against the merged file.
HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file,
std::string fs_name)
std::string fs_name, const FileWriterOptions* opts)
: _path(std::move(path)),
_hdfs_handler(handler),
_hdfs_file(hdfs_file),
_fs_name(std::move(fs_name)) {}
_fs_name(std::move(fs_name)),
_sync_file_data(opts ? opts->sync_file_data : true) {}

HdfsFileWriter::~HdfsFileWriter() {
if (_hdfs_file) {
hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
}

if (_hdfs_handler->from_cache) {
_hdfs_handler->dec_ref();
} else {
Expand All @@ -53,23 +58,25 @@ Status HdfsFileWriter::close() {
return Status::OK();
}
_closed = true;
int result = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
if (result == -1) {
std::stringstream ss;
ss << "failed to flush hdfs file. "
<< "fs_name:" << _fs_name << " path:" << _path << ", err: " << hdfs_error();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());

if (_sync_file_data) {
int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
if (ret != 0) {
return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
_fs_name, _path.native(), hdfs_error());
}
}

result = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
// `hdfsCloseFile` internally invokes `hdfsHFlush`, which flushes buffered data and waits for
// the HDFS response, but this does not guarantee that the data has been synced to HDFS.
int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
_hdfs_file = nullptr;
if (result != 0) {
std::string err_msg = hdfs_error();
if (ret != 0) {
return Status::InternalError(
"Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _fs_name, _path.string(), err_msg);
BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error());
}

return Status::OK();
}

Expand Down Expand Up @@ -101,12 +108,20 @@ Status HdfsFileWriter::finalize() {
// Calling finalize() on a writer that was already closed is reported as an internal error.
if (_closed) [[unlikely]] {
return Status::InternalError("finalize closed file: {}", _path.native());
}
// NOTE(review): diff markers are missing in this excerpt; the FIXME and `return close();`
// just below look like the pre-change body that the hdfsFlush-based code replaces -- confirm.
// FIXME(plat1ko): `finalize` method should not be an operation which can be blocked for a long time
return close();

// Flush buffered data to HDFS without waiting for HDFS response
int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
// Any non-zero result is returned to the caller as an InternalError carrying the
// fs name, the path and the text from hdfs_error().
if (ret != 0) {
return Status::InternalError("failed to flush hdfs file. fs_name={} path={} : {}", _fs_name,
_path.native(), hdfs_error());
}

// The file is left open here; `_closed` is not modified by this path.
return Status::OK();
}

Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handler,
const std::string& fs_name) {
const std::string& fs_name,
const FileWriterOptions* opts) {
auto path = convert_path(full_path, fs_name);
std::string hdfs_dir = path.parent_path().string();
int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str());
Expand All @@ -133,7 +148,7 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handle
return ResultError(Status::InternalError(ss.str()));
}
VLOG_NOTICE << "open file. fs_name:" << fs_name << ", path:" << path;
return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name);
return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name, opts);
}

} // namespace doris::io
8 changes: 5 additions & 3 deletions be/src/io/fs/hdfs_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ class HdfsFileWriter final : public FileWriter {
// - fs_name/path_to_file
// - /path_to_file
// TODO(plat1ko): Support relative path for cloud mode
static Result<FileWriterPtr> create(Path path, HdfsHandler* handler,
const std::string& fs_name);
static Result<FileWriterPtr> create(Path path, HdfsHandler* handler, const std::string& fs_name,
const FileWriterOptions* opts = nullptr);

HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name);
HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name,
const FileWriterOptions* opts = nullptr);
~HdfsFileWriter() override;

Status close() override;
Expand All @@ -53,6 +54,7 @@ class HdfsFileWriter final : public FileWriter {
std::string _fs_name;
size_t _bytes_appended = 0;
bool _closed = false;
bool _sync_file_data;
};

} // namespace doris::io
6 changes: 5 additions & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,11 @@ void PInternalService::outfile_write_success(google::protobuf::RpcController* co
auto&& res = FileFactory::create_file_writer(
FileFactory::convert_storage_type(result_file_sink.storage_backend_type),
ExecEnv::GetInstance(), file_options.broker_addresses,
file_options.broker_properties, file_name);
file_options.broker_properties, file_name,
{
.write_file_cache = false,
.sync_file_data = false,
});
using T = std::decay_t<decltype(res)>;
if (!res.has_value()) [[unlikely]] {
st = std::forward<T>(res).error();
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/core/block_spill_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ void BlockSpillWriter::_init_profile() {

// Opens the spill file for writing: asks FileFactory for a FILE_LOCAL writer on
// `file_path_` (empty broker addresses and properties), stores it in `file_writer_`,
// then sets `is_open_` and returns OK.
// Failure handling is delegated to DORIS_TRY -- presumably it returns the error Status
// to the caller before `is_open_` is set; confirm against the macro definition.
// NOTE(review): this excerpt is a diff rendered without +/- markers, so both the old
// argument line (ending at `file_path_));`) and the new one (followed by the options
// initializer) appear below -- presumably only the latter is in the merged file; confirm.
Status BlockSpillWriter::open() {
file_writer_ = DORIS_TRY(FileFactory::create_file_writer(
TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_));
TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_,
// Writer options for spill files: both file-cache writing and data sync are disabled.
{
.write_file_cache = false,
.sync_file_data = false,
}));
is_open_ = true;
return Status::OK();
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/sink/writer/vfile_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ Status VFileResultWriter::_create_next_file_writer() {
Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
_file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
_file_opts->broker_addresses, _file_opts->broker_properties, file_name));
_file_opts->broker_addresses, _file_opts->broker_properties, file_name,
{
.write_file_cache = false,
.sync_file_data = false,
}));
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
_vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(),
Expand Down