Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ CONF_mInt32(snapshot_expire_time_sec, "172800");
// It is only a recommended value. When the disk space is insufficient,
// the file storage period under trash does not have to comply with this parameter.
CONF_mInt32(trash_file_expire_time_sec, "259200");
// check row nums for BE/CE and schema change. true is open, false is closed.
CONF_mBool(row_nums_check, "true");
// minimum file descriptor number
// modify them upon necessity
CONF_Int32(min_file_descriptor_number, "60000");
Expand All @@ -267,19 +265,13 @@ CONF_Bool(disable_storage_page_cache, "false");
// whether to disable row cache feature in storage
CONF_Bool(disable_storage_row_cache, "true");

CONF_Bool(enable_storage_vectorization, "true");

CONF_Bool(enable_low_cardinality_optimize, "true");

// be policy
// whether to check compaction checksum
CONF_mBool(enable_compaction_checksum, "false");
// whether to disable automatic compaction task
CONF_mBool(disable_auto_compaction, "false");
// whether enable vectorized compaction
CONF_Bool(enable_vectorized_compaction, "true");
// whether enable vectorized schema change/material-view/rollup task.
CONF_Bool(enable_vectorized_alter_table, "true");
// whether to enable vertical compaction
CONF_mBool(enable_vertical_compaction, "true");
// whether to enable ordered data compaction
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/column_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ struct ColumnMapping {
int32_t ref_column;
// normally for default value. stores values for filters
WrapperField* default_value;
// materialize view transform function used in schema change
std::string materialized_function;
std::shared_ptr<TExpr> expr;
const TabletColumn* new_column;
};
Expand Down
30 changes: 11 additions & 19 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,6 @@ bool Compaction::handle_ordered_data_compaction() {
Status Compaction::do_compaction_impl(int64_t permits) {
OlapStopWatch watch;

auto use_vectorized_compaction = config::enable_vectorized_compaction;
string merge_type = use_vectorized_compaction ? "v" : "";

if (handle_ordered_data_compaction()) {
RETURN_NOT_OK(modify_rowsets());
TRACE("modify rowsets finished");
Expand All @@ -255,7 +252,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
_tablet->set_last_base_compaction_success_time(now);
}
auto cumu_policy = _tablet->cumulative_compaction_policy();
LOG(INFO) << "succeed to do ordered data " << merge_type << compaction_name()
LOG(INFO) << "succeed to do ordered data " << compaction_name()
<< ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version
<< ", disk=" << _tablet->data_dir()->path()
<< ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num
Expand All @@ -267,7 +264,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}
build_basic_info();

LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
bool vertical_compaction = should_vertical_compaction();
RETURN_NOT_OK(construct_input_rowset_readers());
Expand All @@ -286,21 +283,17 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}

Status res;
if (use_vectorized_compaction) {
if (vertical_compaction) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &stats);
} else {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}
if (vertical_compaction) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &stats);
} else {
LOG(FATAL) << "Only support vectorized compaction";
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}

if (!res.ok()) {
LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version;
return res;
Expand Down Expand Up @@ -351,9 +344,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {

auto cumu_policy = _tablet->cumulative_compaction_policy();
DCHECK(cumu_policy);
LOG(INFO) << "succeed to do " << merge_type << compaction_name()
<< " is_vertical=" << vertical_compaction << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version
LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" << vertical_compaction
<< ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version
<< ", current_max_version=" << current_max_version
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" << _input_num_segments
<< ", input_row_num=" << _input_row_num
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Status StorageEngine::start_bg_threads() {
.set_min_threads(config::max_cumu_compaction_threads)
.set_max_threads(config::max_cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool);
if (config::enable_segcompaction && config::enable_storage_vectorization) {
if (config::enable_segcompaction) {
ThreadPoolBuilder("SegCompactionTaskThreadPool")
.set_min_threads(config::seg_compaction_max_threads)
.set_max_threads(config::seg_compaction_max_threads)
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ class RowCursor;

class PushHandler {
public:
using SchemaMapping = std::vector<ColumnMapping>;

PushHandler() = default;
~PushHandler() = default;

Expand All @@ -62,7 +60,6 @@ class PushHandler {
Status _do_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request,
PushType push_type, std::vector<TTabletInfo>* tablet_info_vec);

private:
// mainly tablet_id, version and delta file path
TPushReq _request;

Expand Down
20 changes: 8 additions & 12 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,19 +250,15 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) {

Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
if (config::enable_storage_vectorization && _context->is_vec) {
do {
auto s = _iterator->next_block_view(block_view);
if (!s.ok()) {
if (!s.is<END_OF_FILE>()) {
LOG(WARNING) << "failed to read next block view: " << s.to_string();
}
return s;
do {
auto s = _iterator->next_block_view(block_view);
if (!s.ok()) {
if (!s.is<END_OF_FILE>()) {
LOG(WARNING) << "failed to read next block view: " << s.to_string();
}
} while (block_view->empty());
} else {
return Status::NotSupported("block view only support enable_storage_vectorization");
}
return s;
}
} while (block_view->empty());

return Status::OK();
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {

Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status status = Status::OK();
if (!config::enable_segcompaction || !config::enable_storage_vectorization ||
_context.tablet_schema->is_dynamic_schema() || !_check_and_set_is_doing_segcompaction()) {
if (!config::enable_segcompaction || _context.tablet_schema->is_dynamic_schema() ||
!_check_and_set_is_doing_segcompaction()) {
return status;
}
if (_segcompaction_status.load() != OK) {
Expand Down Expand Up @@ -326,7 +326,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
Status status = Status::OK();
DCHECK_EQ(_is_doing_segcompaction, false);
if (!config::enable_segcompaction || !config::enable_storage_vectorization) {
if (!config::enable_segcompaction) {
return Status::OK();
}
if (_segcompaction_status.load() != OK) {
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ struct RowsetReaderContext {
bool use_page_cache = false;
int sequence_id_idx = -1;
int batch_size = 1024;
bool is_vec = false;
bool is_unique = false;
// records the number of rows merged in the generic iterator
uint64_t* merged_rows = nullptr;
Expand Down
41 changes: 11 additions & 30 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,9 @@ class MultiBlockMerger {
RowRefComparator _cmp;
};

RowBlockChanger::RowBlockChanger(TabletSchemaSPtr tablet_schema,
const DeleteHandler* delete_handler, DescriptorTbl desc_tbl)
RowBlockChanger::RowBlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl)
: _desc_tbl(desc_tbl) {
_schema_mapping.resize(tablet_schema->num_columns());
_delete_handler = delete_handler;
}

RowBlockChanger::~RowBlockChanger() {
Expand Down Expand Up @@ -259,7 +257,7 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,

int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
DCHECK(ref_block->get_by_position(result_column_id).column->size() == row_size)
CHECK(ref_block->get_by_position(result_column_id).column->size() == row_size)
<< new_block->get_by_position(idx).name << " size invalid"
<< ", expect=" << row_size
<< ", real=" << ref_block->get_by_position(result_column_id).column->size();
Expand Down Expand Up @@ -288,7 +286,7 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,
// not nullable to nullable
if (new_col_nullable) {
auto* new_nullable_col = assert_cast<vectorized::ColumnNullable*>(
std::move(*new_col.column).mutate().get());
new_col.column->assume_mutable().get());

new_nullable_col->swap_nested_column(ref_col.column);
new_nullable_col->get_null_map_data().resize_fill(new_nullable_col->size());
Expand All @@ -299,7 +297,7 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,
// the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`,
// so need to handle nullable to not nullable here
auto* ref_nullable_col = assert_cast<vectorized::ColumnNullable*>(
std::move(*ref_col.column).mutate().get());
ref_col.column->assume_mutable().get());

ref_nullable_col->swap_nested_column(new_col.column);
}
Expand Down Expand Up @@ -582,9 +580,7 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
const std::vector<TOlapTableIndex>& alter_inverted_indexs,
const TabletSchemaSPtr& tablet_schema)
: SchemaChange(),
_alter_inverted_indexs(alter_inverted_indexs),
_tablet_schema(tablet_schema) {
: _alter_inverted_indexs(alter_inverted_indexs), _tablet_schema(tablet_schema) {
_olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();
}

Expand Down Expand Up @@ -676,7 +672,7 @@ Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr rowset_reader

std::shared_ptr<vectorized::Block> block =
std::make_shared<vectorized::Block>(_tablet_schema->create_block(return_columns));
do {
while (true) {
res = iter->next_batch(block.get());
if (!res.ok()) {
if (res.is<END_OF_FILE>()) {
Expand All @@ -693,7 +689,7 @@ Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr rowset_reader
return res;
}
block->clear_column_data();
} while (true);
}

// finish write inverted index, flush data to compound file
for (auto& writer_sign : inverted_index_writer_signs) {
Expand Down Expand Up @@ -724,10 +720,11 @@ Status SchemaChangeForInvertedIndex::_add_nullable(
auto next_run_step = [&]() {
size_t step = 1;
for (auto i = offset + 1; i < num_rows; ++i) {
if (null_map[offset] == null_map[i])
if (null_map[offset] == null_map[i]) {
step++;
else
} else {
break;
}
}
return step;
};
Expand Down Expand Up @@ -877,8 +874,6 @@ Status SchemaChangeHandler::process_alter_inverted_index(const TAlterInvertedInd

std::shared_mutex SchemaChangeHandler::_mutex;
std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
std::set<std::string> SchemaChangeHandler::_supported_functions = {"hll_hash", "to_bitmap",
"to_bitmap_with_check"};

// In the past, schema change and rollup would create a new tablet and wait for txns that started before the task to finish.
// Waiting cost a lot of time, and the task was very difficult to understand.
Expand Down Expand Up @@ -1055,7 +1050,6 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.is_vec = config::enable_vectorized_alter_table;
reader_context.delete_bitmap = &base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, end_version);
for (auto& rs_reader : rs_readers) {
Expand Down Expand Up @@ -1105,16 +1099,6 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}

if (item.__isset.mv_expr) {
if (item.mv_expr.nodes[0].node_type == TExprNodeType::FUNCTION_CALL) {
mv_param.mv_expr = item.mv_expr.nodes[0].fn.name.function_name;
if (!config::enable_vectorized_alter_table &&
!_supported_functions.count(mv_param.mv_expr)) {
return Status::NotSupported("Unknow materialized view expr " +
mv_param.mv_expr);
}
} else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
mv_param.mv_expr = "count_field";
}
mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
}
sc_params.materialized_params_map.insert(
Expand Down Expand Up @@ -1362,7 +1346,6 @@ Status SchemaChangeHandler::_get_rowset_readers(TabletSharedPtr tablet,
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.is_vec = config::enable_vectorized_alter_table;
reader_context.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, end_version);

Expand Down Expand Up @@ -1532,8 +1515,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams

// Add filter information in change, and filter column information will be set in _parse_request
// And filter some data every time the row block changes
RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler,
*sc_params.desc_tbl);
RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), *sc_params.desc_tbl);

bool sc_sorting = false;
bool sc_directly = false;
Expand Down Expand Up @@ -1671,7 +1653,6 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,

if (materialized_function_map.find(column_name) != materialized_function_map.end()) {
auto mvParam = materialized_function_map.find(column_name)->second;
column_mapping->materialized_function = mvParam.mv_expr;
column_mapping->expr = mvParam.expr;
int32_t column_index = base_tablet_schema->field_index(mvParam.origin_column_name);
if (column_index >= 0) {
Expand Down
20 changes: 6 additions & 14 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class InvertedIndexColumnWriter;

class RowBlockChanger {
public:
RowBlockChanger(TabletSchemaSPtr tablet_schema, const DeleteHandler* delete_handler,
DescriptorTbl desc_tbl);
RowBlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl);

~RowBlockChanger();

Expand All @@ -52,12 +51,7 @@ class RowBlockChanger {
// @brief column-mapping specification of new schema
SchemaMapping _schema_mapping;

// delete handler for filtering data which use specified in DELETE_DATA
const DeleteHandler* _delete_handler = nullptr;

DescriptorTbl _desc_tbl;

DISALLOW_COPY_AND_ASSIGN(RowBlockChanger);
};

class SchemaChange {
Expand Down Expand Up @@ -85,7 +79,7 @@ class SchemaChange {
_add_filtered_rows(rowset_reader->filtered_rows());

// Check row num changes
if (config::row_nums_check && !_check_row_nums(rowset_reader, *rowset_writer)) {
if (!_check_row_nums(rowset_reader, *rowset_writer)) {
return Status::Error<ErrorCode::ALTER_STATUS_ERR>();
}

Expand Down Expand Up @@ -181,11 +175,11 @@ class SchemaChangeForInvertedIndex : public SchemaChange {
public:
explicit SchemaChangeForInvertedIndex(const std::vector<TOlapTableIndex>& alter_inverted_indexs,
const TabletSchemaSPtr& tablet_schema);
virtual ~SchemaChangeForInvertedIndex();
~SchemaChangeForInvertedIndex() override;

virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
TabletSchemaSPtr base_tablet_schema) override;
Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
TabletSchemaSPtr base_tablet_schema) override;

private:
DISALLOW_COPY_AND_ASSIGN(SchemaChangeForInvertedIndex);
Expand All @@ -197,7 +191,6 @@ class SchemaChangeForInvertedIndex : public SchemaChange {
const std::pair<int64_t, int64_t>& index_writer_sign, Field* field,
const uint8_t* null_map, const uint8_t** ptr, size_t num_rows);

private:
std::vector<TOlapTableIndex> _alter_inverted_indexs;
TabletSchemaSPtr _tablet_schema;

Expand Down Expand Up @@ -240,7 +233,6 @@ class SchemaChangeHandler {
struct AlterMaterializedViewParam {
std::string column_name;
std::string origin_column_name;
std::string mv_expr;
std::shared_ptr<TExpr> expr;
};

Expand Down
Loading