Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
if (_is_cancelled) {
return Status::OK();
}
if (_rowset_writer && _rowset_writer->is_doing_segcompaction()) {
_rowset_writer->wait_flying_segcompaction(); /* already cancel, ignore the return status */
}
_mem_table.reset();
if (_flush_token != nullptr) {
// cancel and wait all memtables in flush queue to be finished
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ BetaRowsetWriter::~BetaRowsetWriter() {
* when the job is cancelled. Although it is meaningless to continue segcompaction when the job
* is cancelled, the objects involved in the job should be preserved during segcompaction to
* avoid crashs for memory issues. */
_wait_flying_segcompaction();
wait_flying_segcompaction();

// TODO(lingbin): Should wrapper exception logic, no need to know file ops directly.
if (!_already_built) { // abnormal exit, remove all files generated
Expand Down Expand Up @@ -423,7 +423,7 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, i
return Status::OK();
}

Status BetaRowsetWriter::_wait_flying_segcompaction() {
Status BetaRowsetWriter::wait_flying_segcompaction() {
std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
uint64_t begin_wait = GetCurrentTimeMicros();
while (_is_doing_segcompaction) {
Expand Down Expand Up @@ -468,7 +468,7 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}
}
Status status;
status = _wait_flying_segcompaction();
status = wait_flying_segcompaction();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset 1st wait, res=" << status;
return nullptr;
Expand All @@ -478,7 +478,7 @@ RowsetSharedPtr BetaRowsetWriter::build() {
LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status;
return nullptr;
}
status = _wait_flying_segcompaction();
status = wait_flying_segcompaction();
if (!status.ok()) {
LOG(WARNING) << "segcompaction failed when build new rowset 2nd wait, res=" << status;
return nullptr;
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ class BetaRowsetWriter : public RowsetWriter {
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
KeyBoundsPB& key_bounds);

bool is_doing_segcompaction() const override { return _is_doing_segcompaction; }

Status wait_flying_segcompaction() override;

private:
Status _add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* writer);
Expand All @@ -117,7 +121,6 @@ class BetaRowsetWriter : public RowsetWriter {
size_t num);
Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments);
Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, bool is_last);
Status _wait_flying_segcompaction();
bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }

bool _check_and_set_is_doing_segcompaction();
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class RowsetWriter {

virtual int32_t get_atomic_num_segment() const = 0;

virtual bool is_doing_segcompaction() const = 0;

virtual Status wait_flying_segcompaction() = 0;

private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
Expand Down