Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ Status DeltaWriter::close() {
}

Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
bool is_broken) {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
Expand All @@ -321,7 +322,13 @@ Status DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
}

// return error if previous flush failed
RETURN_NOT_OK(_flush_token->wait());
Status s = _flush_token->wait();
if (!s.ok()) {
PTabletError* tablet_error = tablet_errors->Add();
tablet_error->set_tablet_id(_tablet->tablet_id());
tablet_error->set_msg(s.get_error_msg());
return s;
}

// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ class DeltaWriter {
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
Status close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, bool is_broken);
Status close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
bool is_broken);

// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class LoadChannel {
bool finished = false;
auto index_id = request.index_id();
RETURN_IF_ERROR(channel->close(request.sender_id(), request.backend_id(), &finished,
request.partition_ids(), response->mutable_tablet_vec()));
request.partition_ids(), response->mutable_tablet_vec(),
response->mutable_tablet_errors()));
if (finished) {
std::lock_guard<std::mutex> l(_lock);
_tablets_channels.erase(index_id);
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {

Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
const google::protobuf::RepeatedField<int64_t>& partition_ids,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
return _close_status;
Expand Down Expand Up @@ -128,8 +129,9 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
for (auto writer : need_wait_writers) {
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE judge it.
writer->close_wait(tablet_vec, (_broken_tablets.find(writer->tablet_id()) !=
_broken_tablets.end()));
writer->close_wait(
tablet_vec, tablet_errors,
(_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
}
}
return Status::OK();
Expand Down Expand Up @@ -259,4 +261,4 @@ std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) {
return os;
}

} // namespace doris
} // namespace doris
5 changes: 3 additions & 2 deletions be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class TabletsChannel {
// no-op when this channel has been closed or cancelled
Status close(int sender_id, int64_t backend_id, bool* finished,
const google::protobuf::RepeatedField<int64_t>& partition_ids,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
google::protobuf::RepeatedPtrField<PTabletError>* tablet_error);

// no-op when this channel has been closed or cancelled
Status cancel();
Expand Down Expand Up @@ -235,4 +236,4 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
}
return Status::OK();
}
} // namespace doris
} // namespace doris