Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ S3FileWriter::~S3FileWriter() {
}

Status S3FileWriter::_create_multi_upload_request() {
LOG(INFO) << "create_multi_upload_request " << _obj_storage_path_opts.path.native();
const auto& client = _obj_client->get();
if (nullptr == client) {
return Status::InternalError<false>("invalid obj storage client");
Expand Down Expand Up @@ -224,11 +225,6 @@ Status S3FileWriter::_close_impl() {
_countdown_event.add_count();
RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
_pending_buf = nullptr;
} else if (_bytes_appended != 0) { // Non-empty file and has nothing to be uploaded
// NOTE: When the data size is a multiple of config::s3_write_buffer_size,
// _cur_part_num may exceed the actual number of parts that need to be uploaded.
// This is because it is incremented by 1 in advance within the S3FileWriter::appendv method.
_cur_part_num--;
}

RETURN_IF_ERROR(_complete());
Expand Down Expand Up @@ -265,12 +261,13 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
Slice {data[i].get_data() + pos, data_size_to_append}));
TEST_SYNC_POINT_CALLBACK("s3_file_writer::appenv_1", &_pending_buf, _cur_part_num);

// if it's the last part, it could be less than 5MB, or it must
// satisfy that the size is larger than or euqal to 5MB
// _complete() would handle the first situation
// If this is the last part and the data size is less than s3_write_buffer_size,
// the pending_buf will be handled by _close_impl() and _complete()
// If this is the last part and the data size is equal to s3_write_buffer_size,
// the pending_buf is handled and submitted here; it will be waited on by _complete()
if (_pending_buf->get_size() == buffer_size) {
// only create multiple upload request when the data is more
// than one memory buffer
// only create the multipart upload request when the data size is
// larger than or equal to s3_write_buffer_size
if (_cur_part_num == 1) {
RETURN_IF_ERROR(_create_multi_upload_request());
}
Expand All @@ -286,6 +283,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
}

void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
VLOG_DEBUG << "upload_one_part " << _obj_storage_path_opts.path.native()
<< " part=" << part_num;
if (buf.is_cancelled()) {
LOG_INFO("file {} skip part {} because previous failure {}",
_obj_storage_path_opts.path.native(), part_num, _st);
Expand Down Expand Up @@ -337,11 +336,20 @@ Status S3FileWriter::_complete() {
return Status::OK();
}

if (_failed || _completed_parts.size() != _cur_part_num) {
// check number of parts
int expected_num_parts1 = (_bytes_appended / config::s3_write_buffer_size) +
!!(_bytes_appended % config::s3_write_buffer_size);
int expected_num_parts2 =
(_bytes_appended % config::s3_write_buffer_size) ? _cur_part_num : _cur_part_num - 1;
DCHECK_EQ(expected_num_parts1, expected_num_parts2)
<< " bytes_appended=" << _bytes_appended << " cur_part_num=" << _cur_part_num
<< " s3_write_buffer_size=" << config::s3_write_buffer_size;
if (_failed || _completed_parts.size() != expected_num_parts1 ||
expected_num_parts1 != expected_num_parts2) {
_st = Status::InternalError(
"error status={} failed={} #complete_parts={} #expected_parts={} "
"completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
_st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(),
_st, _failed, _completed_parts.size(), expected_num_parts1, _dump_completed_part(),
_obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
LOG(WARNING) << _st;
return _st;
Expand All @@ -350,6 +358,9 @@ Status S3FileWriter::_complete() {
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
LOG(INFO) << "complete_multipart_upload " << _obj_storage_path_opts.path.native()
<< " size=" << _bytes_appended << " number_parts=" << _completed_parts.size()
<< " s3_write_buffer_size=" << config::s3_write_buffer_size;
auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
if (resp.status.code != ErrorCode::OK) {
LOG_WARNING("Compltet multi part upload failed because {}, file path {}", resp.status.msg,
Expand Down Expand Up @@ -379,6 +390,8 @@ Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
}

void S3FileWriter::_put_object(UploadFileBuffer& buf) {
LOG(INFO) << "put_object " << _obj_storage_path_opts.path.native()
<< " size=" << _bytes_appended;
if (state() == State::CLOSED) {
DCHECK(state() != State::CLOSED)
<< "state=" << (int)state() << " path=" << _obj_storage_path_opts.path.native();
Expand Down
Loading
Loading