Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 3 additions & 29 deletions be/src/olap/column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,15 +417,13 @@ OLAPStatus ColumnData::prepare_block_read(
// ColumnData向上返回的列至少由几部分组成:
// 1. return_columns中要求返回的列,即Fetch命令中指定要查询的列.
// 2. condition中涉及的列, 绝大多数情况下这些列都已经在return_columns中.
// 3. start_keys和end_keys需要读取的列.
// 在这个函数里,合并上述几种情况
void ColumnData::set_read_params(
const std::vector<uint32_t>& return_columns,
const std::vector<uint32_t>& seek_solumns,
const std::set<uint32_t>& load_bf_columns,
const Conditions& conditions,
const std::vector<ColumnPredicate*>& col_predicates,
const std::vector<RowCursor*>& start_keys,
const std::vector<RowCursor*>& end_keys,
bool is_using_cache,
RuntimeState* runtime_state) {
_conditions = &conditions;
Expand All @@ -434,34 +432,10 @@ void ColumnData::set_read_params(
_is_using_cache = is_using_cache;
_runtime_state = runtime_state;
_return_columns = return_columns;
_seek_columns = seek_solumns;
_load_bf_columns = load_bf_columns;

std::unordered_set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());

for (auto& it : conditions.columns()) {
column_set.insert(it.first);
}

uint32_t max_key_column_count = 0;
for (auto key : start_keys) {
if (key->field_count() > max_key_column_count) {
max_key_column_count = key->field_count();
}
}

for (auto key : end_keys) {
if (key->field_count() > max_key_column_count) {
max_key_column_count = key->field_count();
}
}

for (uint32_t i = 0; i < _table->tablet_schema().size(); i++) {
if (i < max_key_column_count || column_set.find(i) != column_set.end()) {
_seek_columns.push_back(i);
}
}

auto res = _cursor.init(_table->tablet_schema());
auto res = _cursor.init(_table->tablet_schema(), _seek_columns);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init row_cursor");
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/column_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ class ColumnData {

void set_read_params(
const std::vector<uint32_t>& return_columns,
const std::vector<uint32_t>& seek_columns,
const std::set<uint32_t>& load_bf_columns,
const Conditions& conditions,
const std::vector<ColumnPredicate*>& col_predicates,
const std::vector<RowCursor*>& start_keys,
const std::vector<RowCursor*>& end_keys,
bool is_using_cache,
RuntimeState* runtime_state);

Expand Down Expand Up @@ -119,6 +118,7 @@ class ColumnData {
void set_segment_group(SegmentGroup* segment_group) { _segment_group = segment_group; }
int64_t num_rows() const { return _segment_group->num_rows(); }

const std::vector<uint32_t>& seek_columns() const { return _seek_columns; }
private:
DISALLOW_COPY_AND_ASSIGN(ColumnData);

Expand Down
35 changes: 32 additions & 3 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class CollectIterator {
}

OLAPStatus init() {
auto res = _row_cursor.init(_data->segment_group()->table()->tablet_schema());
auto res = _row_cursor.init(_data->segment_group()->table()->tablet_schema(),
_data->seek_columns());
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to init row cursor, res=" << res;
return res;
Expand Down Expand Up @@ -510,11 +511,10 @@ OLAPStatus Reader::_acquire_data_sources(const ReaderParams& read_params) {
}
i_data->set_delete_handler(_delete_handler);
i_data->set_read_params(_return_columns,
_seek_columns,
_load_bf_columns,
_conditions,
_col_predicates,
_keys_param.start_keys,
_keys_param.end_keys,
is_using_cache,
read_params.runtime_state);
if (i_data->delta_pruning_filter()) {
Expand Down Expand Up @@ -574,6 +574,11 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
OLAP_LOG_WARNING("fail to init return columns. [res=%d]", res);
return res;
}
res = _init_seek_columns();
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init seek columns. [res=%d]", res);
return res;
}

res = _init_keys_param(read_params);
if (res != OLAP_SUCCESS) {
Expand Down Expand Up @@ -639,6 +644,30 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) {
return OLAP_SUCCESS;
}

OLAPStatus Reader::_init_seek_columns() {
std::unordered_set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());
for (auto& it : _conditions.columns()) {
column_set.insert(it.first);
}
uint32_t max_key_column_count = 0;
for (auto key : _keys_param.start_keys) {
if (key->field_count() > max_key_column_count) {
max_key_column_count = key->field_count();
}
}
for (auto key : _keys_param.end_keys) {
if (key->field_count() > max_key_column_count) {
max_key_column_count = key->field_count();
}
}
for (uint32_t i = 0; i < _olap_table->tablet_schema().size(); i++) {
if (i < max_key_column_count || column_set.find(i) != column_set.end()) {
_seek_columns.push_back(i);
}
}
return OLAP_SUCCESS;
}

OLAPStatus Reader::_attach_data_to_merge_set(bool first, bool *eof) {
OLAPStatus res = OLAP_SUCCESS;
*eof = false;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class Reader {
OLAPStatus _init_delete_condition(const ReaderParams& read_params);

OLAPStatus _init_return_columns(const ReaderParams& read_params);
OLAPStatus _init_seek_columns();

OLAPStatus _init_load_bf_columns(const ReaderParams& read_params);

Expand All @@ -196,6 +197,7 @@ class Reader {
std::unique_ptr<MemPool> _predicate_mem_pool;
std::set<uint32_t> _load_bf_columns;
std::vector<uint32_t> _return_columns;
std::vector<uint32_t> _seek_columns;

Version _version;

Expand Down
13 changes: 9 additions & 4 deletions be/src/olap/row_cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/row_cursor.h"

#include <algorithm>
#include <unordered_set>

using std::min;
using std::nothrow;
Expand Down Expand Up @@ -102,12 +103,16 @@ OLAPStatus RowCursor::_init(const std::vector<FieldInfo>& tablet_schema,
_owned_fixed_buf = _fixed_buf;
memset(_fixed_buf, 0, _fixed_len);

// we must make sure that the offset is the same with RowBlock's
std::unordered_set<uint32_t> column_set(_columns.begin(), _columns.end());
_field_offsets.resize(tablet_schema.size(), -1);
size_t offset = 0;
for (auto cid : _columns) {
_field_offsets[cid] = offset;
_field_array[cid]->set_offset(offset);
offset += field_buf_lens[cid] + 1;
for (int cid = 0; cid < tablet_schema.size(); ++cid) {
if (column_set.find(cid) != std::end(column_set)) {
_field_offsets[cid] = offset;
_field_array[cid]->set_offset(offset);
offset += field_buf_lens[cid] + 1;
}
}

return OLAP_SUCCESS;
Expand Down