Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ OLAPStatus DeleteConditionHandler::check_condition_valid(
const TabletSchema& schema,
const TCondition& cond) {
// 检查指定列名的列是否存在
int field_index = _get_field_index(schema, cond.column_name);

int32_t field_index = schema.field_index(cond.column_name);
if (field_index < 0) {
OLAP_LOG_WARNING("field is not existent. [field_index=%d]", field_index);
return OLAP_ERR_DELETE_INVALID_CONDITION;
Expand Down Expand Up @@ -229,19 +228,10 @@ bool DeleteHandler::_parse_condition(const std::string& condition_str, TConditio

OLAPStatus DeleteHandler::init(const TabletSchema& schema,
const DelPredicateArray& delete_conditions, int32_t version) {
if (_is_inited) {
OLAP_LOG_WARNING("reinitialize delete handler.");
return OLAP_ERR_INIT_FAILED;

}

if (version < 0) {
OLAP_LOG_WARNING("invalid parameters. [version=%d]", version);
return OLAP_ERR_DELETE_INVALID_PARAMETERS;
}
DCHECK(!_is_inited) << "reinitialize delete handler.";
DCHECK(version >= 0) << "invalid parameters. version=" << version;

DelPredicateArray::const_iterator it = delete_conditions.begin();

for (; it != delete_conditions.end(); ++it) {
// 跳过版本号大于version的过滤条件
if (it->version() > version) {
Expand All @@ -250,7 +240,6 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema,

DeleteConditions temp;
temp.filter_version = it->version();

temp.del_cond = new(std::nothrow) Conditions();

if (temp.del_cond == nullptr) {
Expand Down
18 changes: 2 additions & 16 deletions be/src/olap/delete_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,13 @@ class DeleteConditionHandler {
std::string construct_sub_predicates(const TCondition& condition);

private:

int32_t _get_field_index(const TabletSchema& schema, const std::string& field_name) const {
for (int i = 0; i < schema.num_columns(); i++) {
if (schema.column(i).name() == field_name) {
return i;
}
}
LOG(WARNING) << "invalid field name. name='" << field_name;
return -1;
}

bool is_condition_value_valid(const TabletColumn& column, const TCondition& cond, const string& value);
};

// 表示一个删除条件
struct DeleteConditions {
DeleteConditions() : filter_version(0), del_cond(NULL) {}
~DeleteConditions() {}

int32_t filter_version; // 删除条件版本号
Conditions* del_cond; // 删除条件
int32_t filter_version = 0; // 删除条件版本号
Conditions* del_cond = nullptr; // 删除条件
};

// 这个类主要用于判定一条数据(RowCursor)是否符合删除条件。这个类的使用流程如下:
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/olap_cond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,10 @@ bool CondColumn::eval(const segment_v2::BloomFilter* bf) const {
}

OLAPStatus Conditions::append_condition(const TCondition& tcond) {
int32_t index = _get_field_index(tcond.column_name);
DCHECK(_schema != nullptr);
int32_t index = _schema->field_index(tcond.column_name);
if (index < 0) {
LOG(WARNING) << "fail to get field index, name is invalid. index=" << index
<< ", field_name=" << tcond.column_name;
LOG(WARNING) << "fail to get field index, field name=" << tcond.column_name;
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}

Expand Down
13 changes: 2 additions & 11 deletions be/src/olap/olap_cond.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class Conditions {
_columns.clear();
}

// TODO(yingchun): should do it in constructor
void set_tablet_schema(const TabletSchema* schema) {
_schema = schema;
}
Expand All @@ -187,22 +188,12 @@ class Conditions {
CondColumn* get_column(int32_t cid) const;

private:
int32_t _get_field_index(const std::string& field_name) const {
for (int i = 0; i < _schema->num_columns(); i++) {
if (_schema->column(i).name() == field_name) {
return i;
}
}
LOG(WARNING) << "invalid field name. [name='" << field_name << "']";
return -1;
}

bool _cond_column_is_key_or_duplicate(const CondColumn* cc) const {
return cc->is_key() || _schema->keys_type() == KeysType::DUP_KEYS;
}

private:
const TabletSchema* _schema;
const TabletSchema* _schema = nullptr;
CondColumns _columns; // list of condition column
};

Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
void Reader::_init_conditions_param(const ReaderParams& read_params) {
_conditions.set_tablet_schema(&_tablet->tablet_schema());
for (const auto& condition : read_params.conditions) {
_conditions.append_condition(condition);
DCHECK_EQ(OLAP_SUCCESS, _conditions.append_condition(condition));
ColumnPredicate* predicate = _parse_to_predicate(condition);
if (predicate != nullptr) {
_col_predicates.push_back(predicate);
Expand Down Expand Up @@ -854,7 +854,10 @@ COMPARISON_PREDICATE_CONDITION_VALUE(ge, GreaterEqualPredicate)

ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) {
// TODO: not equal and not in predicate is not pushed down
int index = _tablet->field_index(condition.column_name);
int32_t index = _tablet->field_index(condition.column_name);
if (index < 0) {
return nullptr;
}
const TabletColumn& column = _tablet->tablet_schema().column(index);
if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
return nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Tablet : public BaseTablet {
inline double bloom_filter_fpp() const;
inline size_t next_unique_id() const;
inline size_t row_size() const;
inline size_t field_index(const string& field_name) const;
inline int32_t field_index(const string& field_name) const;

// operation in rowsets
OLAPStatus add_rowset(RowsetSharedPtr rowset, bool need_persist = true);
Expand Down Expand Up @@ -396,7 +396,7 @@ inline size_t Tablet::next_unique_id() const {
return _schema.next_column_unique_id();
}

inline size_t Tablet::field_index(const string& field_name) const {
inline int32_t Tablet::field_index(const string& field_name) const {
return _schema.field_index(field_name);
}

Expand Down
19 changes: 6 additions & 13 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,6 @@ OLAPStatus TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& r
next_unique_id = request.tablet_schema.columns.size();
} else {
next_unique_id = base_tablet->next_unique_id();
size_t old_num_columns = base_tablet->num_columns();
auto& new_columns = request.tablet_schema.columns;
for (uint32_t new_col_idx = 0; new_col_idx < new_columns.size(); ++new_col_idx) {
const TColumn& column = new_columns[new_col_idx];
Expand All @@ -1342,18 +1341,12 @@ OLAPStatus TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& r
// unique_id in old_tablet to be the column's ordinal number in new_tablet
// 2. if column exists only in new_tablet, assign next_unique_id of old_tablet
// to the new column
size_t old_col_idx = 0;
for (old_col_idx = 0 ; old_col_idx < old_num_columns; ++old_col_idx) {
const string& old_name = base_tablet->tablet_schema().column(old_col_idx).name();
if (old_name == column.column_name) {
uint32_t old_unique_id
= base_tablet->tablet_schema().column(old_col_idx).unique_id();
col_idx_to_unique_id[new_col_idx] = old_unique_id;
break;
}
}
// Not exist in old tablet, it is a new added column
if (old_col_idx == old_num_columns) {
int32_t old_col_idx = base_tablet->field_index(column.column_name);
if (old_col_idx != -1) {
uint32_t old_unique_id = base_tablet->tablet_schema().column(old_col_idx).unique_id();
col_idx_to_unique_id[new_col_idx] = old_unique_id;
} else {
// Not exist in old tablet, it is a new added column
col_idx_to_unique_id[new_col_idx] = next_unique_id++;
}
}
Expand Down
27 changes: 14 additions & 13 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,17 +348,19 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
_num_key_columns = 0;
_num_null_columns = 0;
_cols.clear();
_field_name_to_index.clear();
for (auto& column_pb : schema.column()) {
TabletColumn column;
column.init_from_pb(column_pb);
_cols.push_back(column);
_num_columns++;
if (column.is_key()) {
_num_key_columns++;
}
if (column.is_nullable()) {
_num_null_columns++;
}
_field_name_to_index[column.name()] = _num_columns;
_cols.emplace_back(std::move(column));
_num_columns++;
}
_num_short_key_columns = schema.num_short_key_columns();
_num_rows_per_row_block = schema.num_rows_per_row_block();
Expand Down Expand Up @@ -404,17 +406,9 @@ size_t TabletSchema::row_size() const {
return size;
}

size_t TabletSchema::field_index(const std::string& field_name) const {
bool field_exist = false;
int ordinal = -1;
for (auto& column : _cols) {
ordinal++;
if (column.name() == field_name) {
field_exist = true;
break;
}
}
return field_exist ? ordinal : -1;
int32_t TabletSchema::field_index(const std::string& field_name) const {
const auto& found = _field_name_to_index.find(field_name);
return (found == _field_name_to_index.end()) ? -1 : found->second;
}

const std::vector<TabletColumn>& TabletSchema::columns() const {
Expand All @@ -427,6 +421,13 @@ const TabletColumn& TabletSchema::column(size_t ordinal) const {
return _cols[ordinal];
}

void TabletSchema::init_field_index_for_test() {
_field_name_to_index.clear();
for (int i = 0; i < _cols.size(); ++i) {
_field_name_to_index[_cols[i].name()] = i;
}
}

bool operator==(const TabletColumn& a, const TabletColumn& b) {
if (a._unique_id != b._unique_id) return false;
if (a._col_name != b._col_name) return false;
Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,14 @@ bool operator!=(const TabletColumn& a, const TabletColumn& b);

class TabletSchema {
public:
// TODO(yingchun): better to make constructor as private to avoid
// manually init members incorrectly, and define a new function like
// void create_from_pb(const TabletSchemaPB& schema, TabletSchema* tablet_schema)
TabletSchema() = default;
void init_from_pb(const TabletSchemaPB& schema);
void to_schema_pb(TabletSchemaPB* tablet_meta_pb);
size_t row_size() const;
size_t field_index(const std::string& field_name) const;
int32_t field_index(const std::string& field_name) const;
const TabletColumn& column(size_t ordinal) const;
const std::vector<TabletColumn>& columns() const;
inline size_t num_columns() const { return _num_columns; }
Expand All @@ -119,12 +122,16 @@ class TabletSchema {
inline int32_t sequence_col_idx() const { return _sequence_col_idx; }

private:
// Only for unit test
void init_field_index_for_test();

friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);

private:
KeysType _keys_type = DUP_KEYS;
std::vector<TabletColumn> _cols;
std::unordered_map<std::string, int32_t> _field_name_to_index;
size_t _num_columns = 0;
size_t _num_key_columns = 0;
size_t _num_null_columns = 0;
Expand Down
6 changes: 3 additions & 3 deletions be/test/olap/cumulative_compaction_policy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1052,20 +1052,20 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _pick_missing_version_cumulative
rowsets.push_back(_tablet->get_rowset_by_version({4, 4}));
std::shared_ptr<MemTracker> mem_tracker(new MemTracker());
CumulativeCompaction compaction(_tablet, "label", mem_tracker);
compaction.find_longest_consecutive_version(&rowsets);
compaction.find_longest_consecutive_version(&rowsets, nullptr);
ASSERT_EQ(3, rowsets.size());
ASSERT_EQ(2, rowsets[2]->end_version());

// no miss version
std::vector<RowsetSharedPtr> rowsets2;
rowsets2.push_back(_tablet->get_rowset_by_version({0, 0}));
compaction.find_longest_consecutive_version(&rowsets2);
compaction.find_longest_consecutive_version(&rowsets2, nullptr);
ASSERT_EQ(1, rowsets2.size());
ASSERT_EQ(0, rowsets[0]->end_version());

// no version
std::vector<RowsetSharedPtr> rowsets3;
compaction.find_longest_consecutive_version(&rowsets3);
compaction.find_longest_consecutive_version(&rowsets3, nullptr);
ASSERT_EQ(0, rowsets3.size());
}
}
Expand Down
16 changes: 9 additions & 7 deletions be/test/olap/rowset/segment_v2/segment_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class SegmentReaderWriterTest : public ::testing::Test {
res._num_columns = columns.size();
res._num_key_columns = num_key_columns;
res._num_short_key_columns = num_short_key_columns != -1 ? num_short_key_columns : num_key_columns;
res.init_field_index_for_test();
return res;
}

Expand Down Expand Up @@ -442,7 +443,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
condition.__set_condition_values(vals);
std::shared_ptr<Conditions> conditions(new Conditions());
conditions->set_tablet_schema(&tablet_schema);
conditions->append_condition(condition);
ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

StorageReadOptions read_opts;
read_opts.stats = &stats;
Expand All @@ -465,7 +466,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
condition.__set_condition_values(vals);
std::shared_ptr<Conditions> conditions(new Conditions());
conditions->set_tablet_schema(&tablet_schema);
conditions->append_condition(condition);
ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

StorageReadOptions read_opts;
read_opts.stats = &stats;
Expand Down Expand Up @@ -513,7 +514,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
condition.__set_condition_values(vals);
std::shared_ptr<Conditions> conditions(new Conditions());
conditions->set_tablet_schema(&tablet_schema);
conditions->append_condition(condition);
ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

// the second page read will be pruned by the following delete predicate
TCondition delete_condition;
Expand All @@ -523,7 +524,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
delete_condition.__set_condition_values(vals2);
std::shared_ptr<Conditions> delete_conditions(new Conditions());
delete_conditions->set_tablet_schema(&tablet_schema);
delete_conditions->append_condition(delete_condition);
ASSERT_EQ(OLAP_SUCCESS, delete_conditions->append_condition(delete_condition));

StorageReadOptions read_opts;
read_opts.stats = &stats;
Expand Down Expand Up @@ -574,7 +575,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
condition.__set_condition_values(vals);
std::shared_ptr<Conditions> conditions(new Conditions());
conditions->set_tablet_schema(&tablet_schema);
conditions->append_condition(condition);
ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));
read_opts.conditions = conditions.get();
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
Expand Down Expand Up @@ -764,6 +765,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
tablet_schema->_cols.push_back(create_char_key(2));
tablet_schema->_cols.push_back(create_varchar_key(3));
tablet_schema->_cols.push_back(create_varchar_key(4));
tablet_schema->init_field_index_for_test();

// segment write
std::string dname = "./ut_dir/segment_test";
Expand Down Expand Up @@ -912,7 +914,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
condition.__set_condition_values(vals);
std::shared_ptr<Conditions> conditions(new Conditions());
conditions->set_tablet_schema(tablet_schema.get());
conditions->append_condition(condition);
ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

StorageReadOptions read_opts;
read_opts.stats = &stats;
Expand Down Expand Up @@ -964,7 +966,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
condition.__set_condition_values(vals);
std::shared_ptr<Conditions> conditions(new Conditions());
conditions->set_tablet_schema(tablet_schema.get());
conditions->append_condition(condition);
ASSERT_EQ(OLAP_SUCCESS, conditions->append_condition(condition));

StorageReadOptions read_opts;
read_opts.stats = &stats;
Expand Down