Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ CONF_Bool(enable_low_cardinality_optimize, "false");
// be policy
// whether disable automatic compaction task
CONF_mBool(disable_auto_compaction, "false");
// whether enable vectorized compaction
CONF_Bool(enable_vectorized_compaction, "false");
// check the configuration of auto compaction in seconds when auto compaction disabled
CONF_mInt32(check_auto_compaction_interval_seconds, "5");

Expand Down
17 changes: 12 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,17 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// 2. write merged rows to output rowset
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
Merger::Statistics stats;
auto res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
Status res;
if (config::enable_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
} else {
res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
}
string merge_type = config::enable_vectorized_compaction ? "v" : "";
if (!res.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version;
return res;
Expand Down Expand Up @@ -132,8 +139,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
current_max_version = _tablet->rowset_with_max_version()->end_version();
}

LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version
LOG(INFO) << "succeed to do " << merge_type << compaction_name()
<< ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version
<< ", current_max_version=" << current_max_version
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
<< ". elapsed time=" << watch.get_elapse_second()
Expand Down
7 changes: 2 additions & 5 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,12 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,

vectorized::Block block = schema.create_block(reader_params.return_columns);
size_t output_rows = 0;
while (true) {
bool eof = false;
bool eof = false;
while (!eof) {
// Read one block from block reader
RETURN_NOT_OK_LOG(
reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof),
"failed to read next block when merging rowsets of tablet " + tablet->full_name());
if (eof) {
break;
}
RETURN_NOT_OK_LOG(
dst_rowset_writer->add_block(&block),
"failed to write block when merging rowsets of tablet " + tablet->full_name());
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ Status BetaRowsetWriter::_create_segment_writer(
}

Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
if ((*writer)->num_rows_written() == 0) {
return Status::OK();
}
uint64_t segment_size;
uint64_t index_size;
Status s = (*writer)->finalize(&segment_size, &index_size);
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
}

auto target_block_row = 0;
auto merged_row = 0;
auto target_columns = block->mutate_columns();

_insert_data_normal(target_columns);
Expand All @@ -218,6 +219,8 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP

_insert_data_normal(target_columns);
target_block_row++;
} else {
merged_row++;
}

_append_agg_data(target_columns);
Expand All @@ -227,7 +230,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
_last_agg_data_counter = 0;
_update_agg_data(target_columns);

_merged_rows += target_block_row;
_merged_rows += merged_row;
return Status::OK();
}

Expand Down Expand Up @@ -260,7 +263,6 @@ Status BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, Obje
}
} while (target_block_row < _batch_size);

_merged_rows += target_block_row;
return Status::OK();
}

Expand Down