diff --git a/be/src/common/config.h b/be/src/common/config.h index 4d59fd94d4e9d0..ac8ddd8ff286a3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -240,6 +240,8 @@ CONF_Bool(enable_low_cardinality_optimize, "false"); // be policy // whether disable automatic compaction task CONF_mBool(disable_auto_compaction, "false"); +// whether enable vectorized compaction +CONF_Bool(enable_vectorized_compaction, "false"); // check the configuration of auto compaction in seconds when auto compaction disabled CONF_mInt32(check_auto_compaction_interval_seconds, "5"); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 156977a9baddc2..02b41d0aef4c5c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -86,10 +86,17 @@ Status Compaction::do_compaction_impl(int64_t permits) { // 2. write merged rows to output rowset // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool Merger::Statistics stats; - auto res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers, - _output_rs_writer.get(), &stats); + Status res; + if (config::enable_vectorized_compaction) { + res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers, + _output_rs_writer.get(), &stats); + } else { + res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers, + _output_rs_writer.get(), &stats); + } + string merge_type = config::enable_vectorized_compaction ? "v" : ""; if (!res.ok()) { - LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res + LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res << ", tablet=" << _tablet->full_name() << ", output_version=" << _output_version; return res; @@ -132,8 +139,8 @@ Status Compaction::do_compaction_impl(int64_t permits) { current_max_version = _tablet->rowset_with_max_version()->end_version(); } - LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name() - << ", output_version=" << _output_version + LOG(INFO) << "succeed to do " << merge_type << compaction_name() + << ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version << ", current_max_version=" << current_max_version << ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num << ". elapsed time=" << watch.get_elapse_second() diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 42759f6b6fa417..f55b08f43718e3 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -107,15 +107,12 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, vectorized::Block block = schema.create_block(reader_params.return_columns); size_t output_rows = 0; - while (true) { - bool eof = false; + bool eof = false; + while (!eof) { // Read one block from block reader RETURN_NOT_OK_LOG( reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof), "failed to read next block when merging rowsets of tablet " + tablet->full_name()); - if (eof) { - break; - } RETURN_NOT_OK_LOG( dst_rowset_writer->add_block(&block), "failed to write block when merging rowsets of tablet " + tablet->full_name()); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 88eef92b2f417a..d1dda3e89dd02a 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -323,6 +323,9 @@ Status BetaRowsetWriter::_create_segment_writer( } Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr* writer) { + if ((*writer)->num_rows_written() == 0) { + return Status::OK(); + } uint64_t segment_size; uint64_t index_size; Status s = (*writer)->finalize(&segment_size, &index_size); diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 30dd2f3279dc69..78056d8fb89866 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -192,6 +192,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP } auto target_block_row = 0; + auto merged_row = 0; auto target_columns = block->mutate_columns(); _insert_data_normal(target_columns); @@ -218,6 +219,8 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP _insert_data_normal(target_columns); target_block_row++; + } else { + merged_row++; } _append_agg_data(target_columns); @@ -227,7 +230,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP _last_agg_data_counter = 0; _update_agg_data(target_columns); - _merged_rows += target_block_row; + _merged_rows += merged_row; return Status::OK(); } @@ -260,7 +263,6 @@ Status BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, Obje } } while (target_block_row < _batch_size); - _merged_rows += target_block_row; return Status::OK(); }