Flush each vectorized memtable block through its own segment writer.

MemTable::_do_flush() now hands the collected block to
RowsetWriter::flush_single_memtable(const vectorized::Block*) instead of
calling add_block() followed by flush(). BetaRowsetWriter implements the new
overload by creating a local segment writer, appending the block through the
new private helper _add_block(), and flushing that writer. add_block() keeps
using the member _segment_writer and delegates to the same helper. The
RowsetWriter base class default returns OLAP_ERR_FUNC_NOT_IMPLEMENTED.

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 3a98d2bfec63d0..769169a8ac6338 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -302,9 +302,11 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
         }
     } else {
         vectorized::Block block = _collect_vskiplist_results();
-        RETURN_NOT_OK(_rowset_writer->add_block(&block));
+        // Beta rowset memtables are flushed in parallel and the segment
+        // writer's add block path is not thread safe, so flush through a
+        // temporary segment writer instead of the member variable.
+        RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
         _flush_size = block.allocated_bytes();
-        RETURN_NOT_OK(_rowset_writer->flush());
     }
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index d1dda3e89dd02a..f0891234d23eb1 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -104,23 +104,28 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     if (UNLIKELY(_segment_writer == nullptr)) {
         RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
     }
+    return _add_block(block, &_segment_writer);
+}
+
+Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
+                                    std::unique_ptr<segment_v2::SegmentWriter>* segment_writer) {
     size_t block_size_in_bytes = block->bytes();
     size_t block_row_num = block->rows();
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
     size_t row_offset = 0;
 
     do {
-        auto max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
+        auto max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
         if (UNLIKELY(max_row_add < 1)) {
             // no space for another signle row, need flush now
-            RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
-            RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
-            max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes);
+            RETURN_NOT_OK(_flush_segment_writer(segment_writer));
+            RETURN_NOT_OK(_create_segment_writer(segment_writer));
+            max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
             DCHECK(max_row_add > 0);
         }
 
         size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
-        auto s = _segment_writer->append_block(block, row_offset, input_row_num);
+        auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
         if (UNLIKELY(!s.ok())) {
             LOG(WARNING) << "failed to append block: " << s.to_string();
             return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR);
@@ -250,6 +255,17 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
     return Status::OK();
 }
 
+Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
+    if (block->rows() == 0) {
+        return Status::OK();
+    }
+    std::unique_ptr<segment_v2::SegmentWriter> writer;
+    RETURN_NOT_OK(_create_segment_writer(&writer));
+    RETURN_NOT_OK(_add_block(block, &writer));
+    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    return Status::OK();
+}
+
 RowsetSharedPtr BetaRowsetWriter::build() {
     // TODO(lingbin): move to more better place, or in a CreateBlockBatch?
     for (auto& wblock : _wblocks) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 53c3496e05ffac..570a8f765076bf 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -57,6 +57,7 @@ class BetaRowsetWriter : public RowsetWriter {
 
     // Return the file size flushed to disk in "flush_size"
     Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) override;
+    Status flush_single_memtable(const vectorized::Block* block) override;
 
     RowsetSharedPtr build() override;
 
@@ -71,6 +72,8 @@ class BetaRowsetWriter : public RowsetWriter {
 private:
     template <typename RowType>
     Status _add_row(const RowType& row);
+    Status _add_block(const vectorized::Block* block,
+                      std::unique_ptr<segment_v2::SegmentWriter>* writer);
 
     Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);
 
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index b349743a442c09..22239f4eafd702 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -64,6 +64,9 @@ class RowsetWriter {
     virtual Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
         return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED);
     }
+    virtual Status flush_single_memtable(const vectorized::Block* block) {
+        return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED);
+    }
 
     // finish building and return pointer to the built rowset (guaranteed to be inited).
     // return nullptr when failed