diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index d5f2446b5245bd..17026a9b5f5cee 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -390,6 +390,9 @@ Status DeltaWriter::cancel_with_status(const Status& st) { if (_is_cancelled) { return Status::OK(); } + if (_rowset_writer && _rowset_writer->is_doing_segcompaction()) { + _rowset_writer->wait_flying_segcompaction(); /* already cancel, ignore the return status */ + } _mem_table.reset(); if (_flush_token != nullptr) { // cancel and wait all memtables in flush queue to be finished diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 3f7985ed30e368..71e863192f3b23 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -66,7 +66,7 @@ BetaRowsetWriter::~BetaRowsetWriter() { * when the job is cancelled. Although it is meaningless to continue segcompaction when the job * is cancelled, the objects involved in the job should be preserved during segcompaction to * avoid crashs for memory issues. */ - _wait_flying_segcompaction(); + wait_flying_segcompaction(); // TODO(lingbin): Should wrapper exception logic, no need to know file ops directly. if (!_already_built) { // abnormal exit, remove all files generated @@ -423,7 +423,7 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, i return Status::OK(); } -Status BetaRowsetWriter::_wait_flying_segcompaction() { +Status BetaRowsetWriter::wait_flying_segcompaction() { std::unique_lock l(_is_doing_segcompaction_lock); uint64_t begin_wait = GetCurrentTimeMicros(); while (_is_doing_segcompaction) { @@ -468,7 +468,7 @@ RowsetSharedPtr BetaRowsetWriter::build() { } } Status status; - status = _wait_flying_segcompaction(); + status = wait_flying_segcompaction(); if (!status.ok()) { LOG(WARNING) << "segcompaction failed when build new rowset 1st wait, res=" << status; return nullptr; @@ -478,7 +478,7 @@ RowsetSharedPtr BetaRowsetWriter::build() { LOG(WARNING) << "segcompaction failed when build new rowset, res=" << status; return nullptr; } - status = _wait_flying_segcompaction(); + status = wait_flying_segcompaction(); if (!status.ok()) { LOG(WARNING) << "segcompaction failed when build new rowset 2nd wait, res=" << status; return nullptr; diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 005ec998172261..9afeaddb49da93 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -99,6 +99,10 @@ class BetaRowsetWriter : public RowsetWriter { std::unique_ptr* writer, uint64_t index_size, KeyBoundsPB& key_bounds); + bool is_doing_segcompaction() const override { return _is_doing_segcompaction; } + + Status wait_flying_segcompaction() override; + private: Status _add_block(const vectorized::Block* block, std::unique_ptr* writer); @@ -117,7 +121,6 @@ class BetaRowsetWriter : public RowsetWriter { size_t num); Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments); Status _get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, bool is_last); - Status _wait_flying_segcompaction(); bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; } bool _check_and_set_is_doing_segcompaction(); diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index d49d027e32e18e..f4d55ac0511035 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -88,6 +88,10 @@ class RowsetWriter { virtual int32_t get_atomic_num_segment() const = 0; + virtual bool is_doing_segcompaction() const = 0; + + virtual Status wait_flying_segcompaction() = 0; + private: DISALLOW_COPY_AND_ASSIGN(RowsetWriter); };