Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,13 @@ Status OlapScanner::_init_tablet_reader_params(
std::copy(function_filters.cbegin(), function_filters.cend(),
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));
std::copy(_tablet->delete_predicates().cbegin(), _tablet->delete_predicates().cend(),
auto& delete_preds = _tablet->delete_predicates();
std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(_tablet_reader_params.delete_predicates,
_tablet_reader_params.delete_predicates.begin()));
// Merge the columns in delete predicate that not in latest schema in to current tablet schema
for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
_tablet_schema->merge_dropped_columns(
_tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version())));
for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) {
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version()));
}
// Range
for (auto key_range : key_ranges) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ void BaseCompaction::_filter_input_rowset() {

Status BaseCompaction::pick_rowsets_to_compact() {
_input_rowsets.clear();
_tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets);
std::shared_lock rdlock(_tablet->get_header_lock());
_tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets, rdlock);
std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator);
RETURN_NOT_OK(check_version_continuity(_input_rowsets));
RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets));
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ Status CumulativeCompaction::execute_compact_impl() {

Status CumulativeCompaction::pick_rowsets_to_compact() {
std::vector<RowsetSharedPtr> candidate_rowsets;

_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
std::shared_lock rdlock(_tablet->get_header_lock());
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);

if (candidate_rowsets.empty()) {
return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION);
Expand Down
15 changes: 7 additions & 8 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,23 +237,22 @@ bool DeleteHandler::_parse_condition(const std::string& condition_str, TConditio
return true;
}

Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema,
const std::vector<DeletePredicatePB>& delete_conditions,
int64_t version) {
Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
const std::vector<RowsetMetaSharedPtr>& delete_preds, int64_t version) {
DCHECK(!_is_inited) << "reinitialize delete handler.";
DCHECK(version >= 0) << "invalid parameters. version=" << version;
_predicate_mem_pool.reset(new MemPool());

for (const auto& delete_condition : delete_conditions) {
for (const auto& delete_pred : delete_preds) {
// Skip the delete condition with large version
if (delete_condition.version() > version) {
if (delete_pred->version().first > version) {
continue;
}
// Need the tablet schema at the delete condition to parse the accurate column unique id
TabletSchemaSPtr delete_pred_related_schema = tablet->tablet_schema(
Version(delete_condition.version(), delete_condition.version()));
TabletSchemaSPtr delete_pred_related_schema = delete_pred->tablet_schema();
auto& delete_condition = delete_pred->delete_predicate();
DeleteConditions temp;
temp.filter_version = delete_condition.version();
temp.filter_version = delete_pred->version().first;
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
TCondition condition;
if (!_parse_condition(sub_predicate, &condition)) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/delete_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet_schema.h"

namespace doris {
Expand Down Expand Up @@ -89,8 +90,8 @@ class DeleteHandler {
// return:
// * Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS): input parameters are not valid
// * Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR): alloc memory failed
Status init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema,
const std::vector<DeletePredicatePB>& delete_conditions, int64_t version);
Status init(TabletSchemaSPtr tablet_schema,
const std::vector<RowsetMetaSharedPtr>& delete_conditions, int64_t version);

bool empty() const { return _del_conds.empty(); }

Expand Down
16 changes: 8 additions & 8 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
reader_params.version = dst_rowset_writer->version();
{
std::shared_lock rdlock(tablet->get_header_lock());
std::copy(tablet->delete_predicates().cbegin(), tablet->delete_predicates().cend(),
auto delete_preds = tablet->delete_predicates();
std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*cur_tablet_schema);
// Merge the columns in delete predicate that not in latest schema in to current tablet schema
for (auto& del_pred_pb : reader_params.delete_predicates) {
merge_tablet_schema->merge_dropped_columns(
tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version())));
for (auto& del_pred_rs : reader_params.delete_predicates) {
merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_rs->version()));
}
reader_params.tablet_schema = merge_tablet_schema;
RETURN_NOT_OK(reader.init(reader_params));
Expand Down Expand Up @@ -116,16 +116,16 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
reader_params.version = dst_rowset_writer->version();
{
std::shared_lock rdlock(tablet->get_header_lock());
std::copy(tablet->delete_predicates().cbegin(), tablet->delete_predicates().cend(),
auto delete_preds = tablet->delete_predicates();
std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*cur_tablet_schema);
// Merge the columns in delete predicate that not in latest schema in to current tablet schema
for (auto& del_pred_pb : reader_params.delete_predicates) {
merge_tablet_schema->merge_dropped_columns(
tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version())));
for (auto& del_pred_rs : reader_params.delete_predicates) {
merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_rs->version()));
}
reader_params.tablet_schema = merge_tablet_schema;
if (tablet->enable_unique_key_merge_on_write()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
_filter_delete = true;
}

return _delete_handler.init(_tablet, _tablet_schema, read_params.delete_predicates,
return _delete_handler.init(_tablet_schema, read_params.delete_predicates,
read_params.version.second);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TabletReader {
std::vector<TCondition> conditions;
std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>> bloom_filters;
std::vector<FunctionFilter> function_filters;
std::vector<DeletePredicatePB> delete_predicates;
std::vector<RowsetMetaSharedPtr> delete_predicates;

// For unique key table with merge-on-write
DeleteBitmap* delete_bitmap {nullptr};
Expand Down
33 changes: 17 additions & 16 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1850,15 +1850,15 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
res = Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
break;
}
for (auto& delete_pred : base_tablet->delete_predicates()) {
if (delete_pred.version() > end_version) {
auto& all_del_preds = base_tablet->delete_predicates();
for (auto& delete_pred : all_del_preds) {
if (delete_pred->version().first > end_version) {
continue;
}
base_tablet_schema->merge_dropped_columns(base_tablet->tablet_schema(
Version(delete_pred.version(), delete_pred.version())));
base_tablet_schema->merge_dropped_columns(
base_tablet->tablet_schema(delete_pred->version()));
}
res = delete_handler.init(base_tablet, base_tablet_schema,
base_tablet->delete_predicates(), end_version);
res = delete_handler.init(base_tablet_schema, all_del_preds, end_version);
if (!res) {
LOG(WARNING) << "init delete handler failed. base_tablet="
<< base_tablet->full_name() << ", end_version=" << end_version;
Expand Down Expand Up @@ -2013,9 +2013,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
bool sc_directly = false;

// a.Parse the Alter request and convert it into an internal representation
Status res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer,
&sc_sorting, &sc_directly, sc_params.materialized_params_map,
*sc_params.desc_tbl, sc_params.base_tablet_schema);
Status res = _parse_request(sc_params, &rb_changer, &sc_sorting, &sc_directly);

auto process_alter_exit = [&]() -> Status {
{
Expand Down Expand Up @@ -2115,12 +2113,15 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams

// @static
// Analyze the mapping of the column and the mapping of the filter key
Status SchemaChangeHandler::_parse_request(
TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowBlockChanger* rb_changer,
bool* sc_sorting, bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>&
materialized_function_map,
DescriptorTbl desc_tbl, TabletSchemaSPtr base_tablet_schema) {
Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
RowBlockChanger* rb_changer, bool* sc_sorting,
bool* sc_directly) {
TabletSharedPtr base_tablet = sc_params.base_tablet;
TabletSharedPtr new_tablet = sc_params.new_tablet;
TabletSchemaSPtr base_tablet_schema = sc_params.base_tablet_schema;
const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map =
sc_params.materialized_params_map;
DescriptorTbl desc_tbl = *sc_params.desc_tbl;
// set column mapping
for (int i = 0, new_schema_size = new_tablet->tablet_schema()->num_columns();
i < new_schema_size; ++i) {
Expand Down Expand Up @@ -2232,7 +2233,7 @@ Status SchemaChangeHandler::_parse_request(
}
}

if (base_tablet->delete_predicates().size() != 0) {
if (!sc_params.delete_handler->empty()) {
// there exists delete condition in header, can't do linked schema change
*sc_directly = true;
}
Expand Down
7 changes: 2 additions & 5 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,8 @@ class SchemaChangeHandler {

static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);

static Status _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>&
materialized_function_map,
DescriptorTbl desc_tbl, TabletSchemaSPtr base_tablet_schema);
static Status _parse_request(const SchemaChangeParams& sc_params, RowBlockChanger* rb_changer,
bool* sc_sorting, bool* sc_directly);

// Initialization Settings for creating a default value
static Status _init_column_mapping(ColumnMapping* column_mapping,
Expand Down
16 changes: 5 additions & 11 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowset
// delete versions from new local tablet_meta
for (const Version& version : versions_to_delete) {
new_tablet_meta->delete_rs_meta_by_version(version, nullptr);
if (new_tablet_meta->version_for_delete_predicate(version)) {
new_tablet_meta->remove_delete_predicate_by_version(version);
}
LOG(INFO) << "delete version from new local tablet_meta when clone. [table="
<< full_name() << ", version=" << version << "]";
}
Expand Down Expand Up @@ -1137,17 +1134,18 @@ TabletInfo Tablet::get_tablet_info() const {
}

void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
std::vector<RowsetSharedPtr>* candidate_rowsets) {
std::vector<RowsetSharedPtr>* candidate_rowsets,
std::shared_lock<std::shared_mutex>& /* meta lock*/) {
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return;
}
std::shared_lock rdlock(_meta_lock);
_cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map, _cumulative_point,
candidate_rowsets);
}

void Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* candidate_rowsets) {
std::shared_lock rdlock(_meta_lock);
void Tablet::pick_candidate_rowsets_to_base_compaction(
vector<RowsetSharedPtr>* candidate_rowsets,
std::shared_lock<std::shared_mutex>& /* meta lock*/) {
for (auto& it : _rs_version_map) {
// Do compaction on local rowsets only.
if (it.first.first < _cumulative_point && it.second->is_local()) {
Expand Down Expand Up @@ -1753,10 +1751,6 @@ Status Tablet::cooldown() {
has_shutdown = tablet_state() == TABLET_SHUTDOWN;
if (!has_shutdown) {
modify_rowsets(to_add, to_delete);
if (new_rowset_meta->has_delete_predicate()) {
_tablet_meta->add_delete_predicate(new_rowset_meta->delete_predicate(),
new_rowset_meta->start_version());
}
_self_owned_remote_rowsets.insert(to_add.front());
save_meta();
}
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <functional>
#include <memory>
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -156,7 +157,7 @@ class Tablet : public BaseTablet {
Status capture_rs_readers(const std::vector<Version>& version_path,
std::vector<RowsetReaderSharedPtr>* rs_readers) const;

const std::vector<DeletePredicatePB>& delete_predicates() {
const std::vector<RowsetMetaSharedPtr> delete_predicates() {
return _tablet_meta->delete_predicates();
}
bool version_for_delete_predicate(const Version& version);
Expand Down Expand Up @@ -228,8 +229,11 @@ class Tablet : public BaseTablet {
TabletInfo get_tablet_info() const;

void pick_candidate_rowsets_to_cumulative_compaction(
std::vector<RowsetSharedPtr>* candidate_rowsets);
void pick_candidate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);
std::vector<RowsetSharedPtr>* candidate_rowsets,
std::shared_lock<std::shared_mutex>& /* meta lock*/);
void pick_candidate_rowsets_to_base_compaction(
std::vector<RowsetSharedPtr>* candidate_rowsets,
std::shared_lock<std::shared_mutex>& /* meta lock*/);

void calculate_cumulative_point();
// TODO(ygl):
Expand Down
50 changes: 8 additions & 42 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_schema(b._schema),
_rs_metas(b._rs_metas),
_stale_rs_metas(b._stale_rs_metas),
_del_predicates(b._del_predicates),
_in_restore_mode(b._in_restore_mode),
_preferred_rowset_type(b._preferred_rowset_type),
_storage_policy(b._storage_policy),
Expand Down Expand Up @@ -447,9 +446,6 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
for (auto& it : tablet_meta_pb.rs_metas()) {
RowsetMetaSharedPtr rs_meta(new RowsetMeta());
rs_meta->init_from_pb(it);
if (rs_meta->has_delete_predicate()) {
add_delete_predicate(rs_meta->delete_predicate(), rs_meta->version().first);
}
_rs_metas.push_back(std::move(rs_meta));
}

Expand Down Expand Up @@ -607,12 +603,7 @@ Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) {
}
}
}

_rs_metas.push_back(rs_meta);
if (rs_meta->has_delete_predicate()) {
add_delete_predicate(rs_meta->delete_predicate(), rs_meta->version().first);
}

return Status::OK();
}

Expand Down Expand Up @@ -640,9 +631,6 @@ void TabletMeta::modify_rs_metas(const std::vector<RowsetMetaSharedPtr>& to_add,
auto it = _rs_metas.begin();
while (it != _rs_metas.end()) {
if (rs_to_del->version() == (*it)->version()) {
if ((*it)->has_delete_predicate()) {
remove_delete_predicate_by_version((*it)->version());
}
_rs_metas.erase(it);
// there should be only one rowset match the version
break;
Expand Down Expand Up @@ -721,45 +709,23 @@ RowsetMetaSharedPtr TabletMeta::acquire_stale_rs_meta_by_version(const Version&
return nullptr;
}

void TabletMeta::add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version) {
for (auto& del_pred : _del_predicates) {
if (del_pred.version() == version) {
*del_pred.mutable_sub_predicates() = delete_predicate.sub_predicates();
return;
const std::vector<RowsetMetaSharedPtr> TabletMeta::delete_predicates() const {
std::vector<RowsetMetaSharedPtr> res;
for (auto& del_pred : _rs_metas) {
if (del_pred->has_delete_predicate()) {
res.push_back(del_pred);
}
}
DeletePredicatePB copied_pred = delete_predicate;
copied_pred.set_version(version);
_del_predicates.emplace_back(copied_pred);
}

void TabletMeta::remove_delete_predicate_by_version(const Version& version) {
DCHECK(version.first == version.second) << "version=" << version;
int pred_to_del = -1;
for (int i = 0; i < _del_predicates.size(); ++i) {
if (_del_predicates[i].version() == version.first) {
pred_to_del = i;
// one DeletePredicatePB stands for a nested predicate, such as user submit a delete predicate a=1 and b=2
// they could be saved as a one DeletePredicatePB
break;
}
}
if (pred_to_del > -1) {
_del_predicates.erase(_del_predicates.begin() + pred_to_del);
}
}

const std::vector<DeletePredicatePB>& TabletMeta::delete_predicates() const {
return _del_predicates;
return res;
}

bool TabletMeta::version_for_delete_predicate(const Version& version) {
if (version.first != version.second) {
return false;
}

for (auto& del_pred : _del_predicates) {
if (del_pred.version() == version.first) {
for (auto& del_pred : _rs_metas) {
if (del_pred->version().first == version.first && del_pred->has_delete_predicate()) {
return true;
}
}
Expand Down
Loading