Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,20 @@ CONF_Bool(enable_storage_vectorization, "true");
CONF_Bool(enable_low_cardinality_optimize, "true");

// be policy
// whether check compaction checksum
CONF_mBool(enable_compaction_checksum, "false");
// whether disable automatic compaction task
CONF_mBool(disable_auto_compaction, "false");
// whether enable vectorized compaction
CONF_Bool(enable_vectorized_compaction, "true");
// whether enable vectorized schema change/material-view/rollup task.
CONF_Bool(enable_vectorized_alter_table, "true");
// whether enable vertical compaction
CONF_mBool(enable_vertical_compaction, "false");
// In vertical compaction, column number for every group
CONF_Int32(vertical_compaction_num_columns_per_group, "5");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This config can be changed online and it works according code in merger.cpp

// In vertical compaction, max memory usage for row_source_buffer
CONF_Int32(vertical_compaction_max_row_source_memory_mb, "200");

// check the configuration of auto compaction in seconds when auto compaction disabled
CONF_mInt32(check_auto_compaction_interval_seconds, "5");
Expand Down
61 changes: 55 additions & 6 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet.h"
#include "olap/task/engine_checksum_task.h"
#include "util/time.h"
#include "util/trace.h"

Expand Down Expand Up @@ -112,14 +113,50 @@ Status Compaction::quick_rowsets_compact() {

Status Compaction::do_compaction(int64_t permits) {
TRACE("start to do compaction");
uint32_t checksum_before;
uint32_t checksum_after;
if (config::enable_compaction_checksum) {
EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),
_input_rowsets.back()->end_version(), &checksum_before);
checksum_task.execute();
}

_tablet->data_dir()->disks_compaction_score_increment(permits);
_tablet->data_dir()->disks_compaction_num_increment(1);
Status st = do_compaction_impl(permits);
_tablet->data_dir()->disks_compaction_score_increment(-permits);
_tablet->data_dir()->disks_compaction_num_increment(-1);

if (config::enable_compaction_checksum) {
EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EngineChecksumTask does not do merge for unique key, so it does not work for now?

_input_rowsets.back()->end_version(), &checksum_after);
checksum_task.execute();
if (checksum_before != checksum_after) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use DCHECK here

LOG(WARNING) << "Compaction tablet=" << _tablet->tablet_id()
<< " checksum not consistent"
<< ", before=" << checksum_before << ", checksum_after=" << checksum_after;
}
}
return st;
}

bool Compaction::should_vertical_compaction() {
// some conditions that not use vertical compaction
if (!config::enable_vertical_compaction) {
return false;
}
if (_tablet->enable_unique_key_merge_on_write()) {
return false;
}
return true;
}

int64_t Compaction::get_avg_segment_rows() {
// take care of empty rowset
// todo(yixiu): add a new conf of segment size in compaction
return config::write_buffer_size / (_input_rowsets_size / (_input_row_num + 1) + 1);
}

Status Compaction::do_compaction_impl(int64_t permits) {
OlapStopWatch watch;

Expand All @@ -142,17 +179,19 @@ Status Compaction::do_compaction_impl(int64_t permits) {

auto use_vectorized_compaction = config::enable_vectorized_compaction;
string merge_type = use_vectorized_compaction ? "v" : "";
bool vertical_compaction = should_vertical_compaction();

LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
<< ", output_version=" << _output_version << ", permits: " << permits
<< ", is_vertical_compaction=" << vertical_compaction;
// get cur schema if rowset schema exist, rowset schema must be newer than tablet schema
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
TabletSchemaSPtr cur_tablet_schema =
_tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();

RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema));
RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema, vertical_compaction));
RETURN_NOT_OK(construct_input_rowset_readers());
TRACE("prepare finished");

Expand All @@ -166,8 +205,14 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}

if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
if (vertical_compaction) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &stats);
} else {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}
} else {
res = Merger::merge_rowsets(_tablet, compaction_type(), cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
Expand Down Expand Up @@ -233,11 +278,15 @@ Status Compaction::do_compaction_impl(int64_t permits) {
<< "s. cumulative_compaction_policy="
<< (cumu_policy == nullptr ? "quick" : cumu_policy->name())
<< ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second());

return Status::OK();
}

Status Compaction::construct_output_rowset_writer(TabletSchemaSPtr schema) {
Status Compaction::construct_output_rowset_writer(TabletSchemaSPtr schema, bool is_vertical) {
if (is_vertical) {
return _tablet->create_vertical_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING,
schema, _oldest_write_timestamp,
_newest_write_timestamp, &_output_rs_writer);
}
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, schema,
_oldest_write_timestamp, _newest_write_timestamp,
&_output_rs_writer);
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Compaction {
Status modify_rowsets();
void gc_output_rowset();

Status construct_output_rowset_writer(TabletSchemaSPtr schema);
Status construct_output_rowset_writer(TabletSchemaSPtr schema, bool is_vertical);
Status construct_input_rowset_readers();

Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
Expand All @@ -74,6 +74,9 @@ class Compaction {
std::vector<Version>* missing_version);
int64_t get_compaction_permits();

bool should_vertical_compaction();
int64_t get_avg_segment_rows();

protected:
// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class RowBlockV2;
class Schema;
class ColumnPredicate;

namespace vectorized {
struct IteratorRowRef;
};

class StorageReadOptions {
public:
struct KeyRange {
Expand Down Expand Up @@ -121,6 +125,13 @@ class RowwiseIterator {
return Status::NotSupported("to be implemented");
}

virtual Status next_row(vectorized::IteratorRowRef* ref) {
return Status::NotSupported("to be implemented");
}
virtual Status unique_key_next_row(vectorized::IteratorRowRef* ref) {
return Status::NotSupported("to be implemented");
}

virtual bool support_return_data_by_ref() { return false; }

virtual Status current_block_row_locations(std::vector<RowLocation>* block_row_locations) {
Expand All @@ -136,6 +147,9 @@ class RowwiseIterator {
// Return the data id such as segment id, used for keep the insert order when do
// merge sort in priority queue
virtual uint64_t data_id() const { return 0; }

// return rows merged count by iterator
virtual uint64_t merged_rows() const { return 0; }
};

} // namespace doris
135 changes: 135 additions & 0 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "olap/tuple_reader.h"
#include "util/trace.h"
#include "vec/olap/block_reader.h"
#include "vec/olap/vertical_block_reader.h"
#include "vec/olap/vertical_merge_iterator.h"

namespace doris {

Expand Down Expand Up @@ -188,4 +190,137 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
return Status::OK();
}

// split columns into several groups, make sure all keys in one group
// unique_key should consider sequence&delete column
void Merger::vertical_split_columns(TabletSchemaSPtr tablet_schema,
std::vector<std::vector<uint32_t>>* column_groups) {
uint32_t num_columns_per_group = config::vertical_compaction_num_columns_per_group;
uint32_t num_key_cols = tablet_schema->num_key_columns();
uint32_t total_cols = tablet_schema->num_columns();
std::vector<uint32_t> key_columns;
for (auto i = 0; i < num_key_cols; ++i) {
key_columns.emplace_back(i);
}
// in unique key, sequence & delete sign column should merge with key columns
int32_t sequence_col_idx = -1;
int32_t delete_sign_idx = -1;
// in key column compaction, seq_col real index is _block->columns() -2
// and delete_sign column is _block->columns() - 1
if (tablet_schema->keys_type() == KeysType::UNIQUE_KEYS) {
if (tablet_schema->has_sequence_col()) {
sequence_col_idx = tablet_schema->sequence_col_idx();
key_columns.emplace_back(sequence_col_idx);
}
delete_sign_idx = tablet_schema->field_index(DELETE_SIGN);
key_columns.emplace_back(delete_sign_idx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is not delete sign idx in some table, e.g. created in an older doris.

}
VLOG_NOTICE << "sequence_col_idx=" << sequence_col_idx
<< ", delete_sign_idx=" << delete_sign_idx;
column_groups->emplace_back(std::move(key_columns));
std::vector<uint32_t> value_columns;
for (auto i = num_key_cols; i < total_cols; ++i) {
if (i == sequence_col_idx || i == delete_sign_idx) {
continue;
}
if ((i - num_key_cols) % num_columns_per_group == 0) {
column_groups->emplace_back();
}
column_groups->back().emplace_back(i);
}
}

Status Merger::vertical_compact_one_group(
TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, bool is_key,
const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
vectorized::VerticalBlockReader reader(row_source_buf);
TabletReader::ReaderParams reader_params;
reader_params.is_key_column_group = is_key;
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
reader_params.rs_readers = src_rowset_readers;
reader_params.version = dst_rowset_writer->version();
{
std::shared_lock rdlock(tablet->get_header_lock());
auto delete_preds = tablet->delete_predicates();
std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*tablet_schema);
// Merge the columns in delete predicate that not in latest schema in to current tablet schema
for (auto& del_pred_rs : reader_params.delete_predicates) {
merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_rs->version()));
}
reader_params.tablet_schema = merge_tablet_schema;

reader_params.return_columns = column_group;
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_NOT_OK(reader.init(reader_params));

vectorized::Block block = tablet_schema->create_block(reader_params.return_columns);
size_t output_rows = 0;
bool eof = false;
while (!eof) {
// Read one block from block reader
RETURN_NOT_OK_LOG(
reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof),
"failed to read next block when merging rowsets of tablet " + tablet->full_name());
RETURN_NOT_OK_LOG(
dst_rowset_writer->add_columns(&block, column_group, is_key, max_rows_per_segment),
"failed to write block when merging rowsets of tablet " + tablet->full_name());

output_rows += block.rows();
block.clear_column_data();
}

if (is_key && stats_output != nullptr) {
stats_output->output_rows = output_rows;
stats_output->merged_rows = reader.merged_rows();
stats_output->filtered_rows = reader.filtered_rows();
}
RETURN_IF_ERROR(dst_rowset_writer->flush_columns());

return Status::OK();
}

// steps to do vertical merge:
// 1. split columns into column groups
// 2. compact groups one by one, generate a row_source_buf when compact key group
// and use this row_source_buf to compact value column groups
// 3. build output rowset
Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
TabletSchemaSPtr tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output) {
LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
vertical_split_columns(tablet_schema, &column_groups);

vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
reader_type);
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output));
if (is_key) {
row_sources_buf.flush();
}
row_sources_buf.seek_to_begin();
}
// finish compact, build output rowset
VLOG_NOTICE << "finish compact groups";
RETURN_IF_ERROR(dst_rowset_writer->final_flush());

return Status::OK();
}

} // namespace doris
21 changes: 21 additions & 0 deletions be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

namespace doris {

namespace vectorized {
class RowSourcesBuffer;
};

class Merger {
public:
struct Statistics {
Expand All @@ -46,6 +50,23 @@ class Merger {
TabletSchemaSPtr cur_tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, Statistics* stats_output);
static Status vertical_merge_rowsets(
TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output);

public:
// for vertical compaction
static void vertical_split_columns(TabletSchemaSPtr tablet_schema,
std::vector<std::vector<uint32_t>>* column_groups);
static Status vertical_compact_one_group(
TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema,
bool is_key, const std::vector<uint32_t>& column_group,
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output);
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
_reader_context.delete_bitmap = read_params.delete_bitmap;
_reader_context.enable_unique_key_merge_on_write = tablet()->enable_unique_key_merge_on_write();
_reader_context.record_rowids = read_params.record_rowids;
_reader_context.is_key_column_group = read_params.is_key_column_group;

*valid_rs_readers = *rs_readers;

Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class TabletReader {
// num of columns for orderby key
size_t read_orderby_key_num_prefix_columns = 0;

// for vertical compaction
bool is_key_column_group = false;

void check_validation() const;

std::string to_string() const;
Expand Down Expand Up @@ -134,7 +137,7 @@ class TabletReader {
return Status::OLAPInternalError(OLAP_ERR_READER_INITIALIZE_ERROR);
}

uint64_t merged_rows() const { return _merged_rows; }
virtual uint64_t merged_rows() const { return _merged_rows; }

uint64_t filtered_rows() const {
return _stats.rows_del_filtered + _stats.rows_del_by_bitmap +
Expand Down
Loading