Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,20 @@ Status OlapScanner::_init_return_columns() {
_return_columns.push_back(index);
_query_slots.push_back(slot);
}
// expand the sequence column
if (_tablet->tablet_schema().has_sequence_col()) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet->tablet_schema().column(col).aggregation() == FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
has_replace_col = true;
break;
}
}
if (has_replace_col) {
_return_columns.push_back(_tablet->tablet_schema().sequence_col_idx());
}
}

if (_return_columns.empty()) {
return Status::InternalError("failed to build storage scanner, no materialized slot!");
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
} else {
request.__set_strip_outer_array(false);
}
if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
request.__set_sequence_col(http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL));
}

if (ctx->timeout_second != -1) {
request.__set_timeout(ctx->timeout_second);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ static const std::string HTTP_JSONROOT = "json_root";
static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array";
static const std::string HTTP_MERGE_TYPE = "merge_type";
static const std::string HTTP_DELETE_CONDITION = "delete";
// Stream load looks up the sequence column in the HTTP header named
// HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL, i.e. "function_column.sequence_col".
static const std::string HTTP_FUNCTION_COLUMN = "function_column";
static const std::string HTTP_SEQUENCE_COL = "sequence_col";

static const std::string HTTP_100_CONTINUE = "100-continue";

Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me

// Merges `src_row` into the row that is already stored in the skip list
// (`row_in_skiplist`) for the same key.
//
// When the tablet schema has a sequence column the merge is delegated to
// agg_update_row_with_sequence, which compares the sequence cells of both rows
// before deciding whether the stored row is updated. Otherwise the plain
// agg_update_row is used.
//
// Exactly one of the two merge functions must run: an unconditional
// agg_update_row call ahead of the branch would fold `src_row` into the stored
// row before the sequence comparison is made, and would then aggregate the
// same source row a second time.
void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
    ContiguousRow dst_row(_schema, row_in_skiplist);
    if (_tablet_schema->has_sequence_col()) {
        agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(), _table_mem_pool.get());
    } else {
        agg_update_row(&dst_row, src_row, _table_mem_pool.get());
    }
}

OLAPStatus MemTable::flush() {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ enum OLAPStatus {
OLAP_ERR_READER_GET_ITERATOR_ERROR = -701,
OLAP_ERR_CAPTURE_ROWSET_READER_ERROR = -702,
OLAP_ERR_READER_READING_ERROR = -703,
OLAP_ERR_READER_INITIALIZE_ERROR = -704,

// BaseCompaction
// [-800, -900)
Expand Down
20 changes: 19 additions & 1 deletion be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,6 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool
// in UNIQUE_KEY highest version is the final result, there is no need to
// merge the lower versions
direct_copy_row(row_cursor, *_next_key);
agg_finalize_row(_value_cids, row_cursor, mem_pool);
// skip the lower version rows;
while (nullptr != _next_key) {
auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
Expand All @@ -431,9 +430,15 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool
}
// stop merging once the next row has a different key: no further aggregation is possible
if (!equal_row(_key_cids, *row_cursor, *_next_key)) {
agg_finalize_row(_value_cids, row_cursor, mem_pool);
break;
}
++merged_count;
cur_delete_flag = _next_delete_flag;
// if the query reads the sequence column, the higher version must be merged with the lower versions
if (_has_sequence_col) {
agg_update_row_with_sequence(_value_cids, row_cursor, *_next_key, _sequence_col_idx);
}
}

// if reader needs to filter delete row and current delete_flag is true,
Expand Down Expand Up @@ -592,6 +597,19 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
_collect_iter = new CollectIterator();
_collect_iter->init(this);

if (_tablet->tablet_schema().has_sequence_col()) {
_sequence_col_idx = _tablet->tablet_schema().sequence_col_idx();
if (_sequence_col_idx != -1) {
for (auto col : _return_columns) {
// query has sequence col
if (col == _sequence_col_idx) {
_has_sequence_col = true;
break;
}
}
}
}

return res;
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ class Reader {
ReaderType _reader_type = READER_QUERY;
bool _next_delete_flag = false;
bool _filter_delete = false;
bool _has_sequence_col = false;
int32_t _sequence_col_idx = -1;
const RowCursor* _next_key = nullptr;
CollectIterator* _collect_iter = nullptr;
std::vector<uint32_t> _key_cids;
Expand Down
33 changes: 33 additions & 0 deletions be/src/olap/row.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,22 @@ void agg_update_row(DstRowType* dst, const SrcRowType& src, MemPool* mem_pool) {
}
}

// Sequence-aware counterpart of agg_update_row that covers every non-key column.
//
// The cell at `sequence_idx` is read from both rows and compared with the
// column's compare_cell. When dst's sequence value compares strictly greater
// than src's, dst is left untouched. In every other case (src greater, or a
// tie) each column from num_key_columns() up to num_columns() of dst's schema
// is updated from src through that column's agg_update; `mem_pool` is
// forwarded to each agg_update call.
template<typename DstRowType, typename SrcRowType>
void agg_update_row_with_sequence(DstRowType* dst, const SrcRowType& src, uint32_t sequence_idx, MemPool* mem_pool) {
    auto dst_seq = dst->cell(sequence_idx);
    auto src_seq = src.cell(sequence_idx);
    const bool dst_is_newer = src.schema()->column(sequence_idx)->compare_cell(dst_seq, src_seq) > 0;
    if (!dst_is_newer) {
        uint32_t cid = dst->schema()->num_key_columns();
        while (cid < dst->schema()->num_columns()) {
            auto target = dst->cell(cid);
            auto source = src.cell(cid);
            dst->schema()->column(cid)->agg_update(&target, source, mem_pool);
            ++cid;
        }
    }
}

// Do aggregate update source row to destination row.
// This function will operate on given cids.
// TODO(zc): unify two versions of agg_update_row
Expand All @@ -168,6 +184,23 @@ void agg_update_row(const std::vector<uint32_t>& cids, DstRowType* dst, const Sr
}
}

// Sequence-aware counterpart of agg_update_row restricted to the columns in `cids`.
//
// The cell at `sequence_idx` is read from both rows and compared with the
// column's compare_cell. When dst's sequence value compares strictly greater
// than src's, nothing is changed. Otherwise (src greater, or a tie) every
// column listed in `cids` is updated from src through that column's
// agg_update, in the order the ids appear in `cids`.
template<typename DstRowType, typename SrcRowType>
void agg_update_row_with_sequence(const std::vector<uint32_t>& cids, DstRowType* dst, const SrcRowType& src,
                                  const uint32_t sequence_idx) {
    auto dst_seq = dst->cell(sequence_idx);
    auto src_seq = src.cell(sequence_idx);
    const bool dst_is_newer = src.schema()->column(sequence_idx)->compare_cell(dst_seq, src_seq) > 0;
    if (dst_is_newer) {
        // dst already carries the larger sequence value: keep it as is
        return;
    }
    for (auto it = cids.begin(); it != cids.end(); ++it) {
        uint32_t cid = *it;
        auto target = dst->cell(cid);
        auto source = src.cell(cid);
        dst->schema()->column(cid)->agg_update(&target, source);
    }
}

template<typename RowType>
void agg_finalize_row(RowType* row, MemPool* mem_pool) {
for (uint32_t cid = row->schema()->num_key_columns(); cid < row->schema()->num_columns(); ++cid) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class Schema {
columns.push_back(column);
}
_delete_sign_idx = tablet_schema.delete_sign_idx();
if (tablet_schema.has_sequence_col()) {
_has_sequence_col = true;
}
_init(columns, col_ids, num_key_columns);
}

Expand Down Expand Up @@ -132,6 +135,7 @@ class Schema {
size_t num_column_ids() const { return _col_ids.size(); }
const std::vector<ColumnId>& column_ids() const { return _col_ids; }
int32_t delete_sign_idx() const { return _delete_sign_idx; }
bool has_sequence_col() const { return _has_sequence_col; }

private:
void _init(const std::vector<TabletColumn>& cols,
Expand All @@ -156,6 +160,7 @@ class Schema {
size_t _num_key_columns;
size_t _schema_size;
int32_t _delete_sign_idx = -1;
bool _has_sequence_col = false;
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id,
TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
schema->set_sequence_col_idx(tablet_schema.sequence_col_idx);
switch(tablet_schema.keys_type) {
case TKeysType::DUP_KEYS:
schema->set_keys_type(KeysType::DUP_KEYS);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ class TabletMeta {
RWMutex _meta_lock;
};

// Column name used for the sequence column (delta_writer_test uses it as the
// column_name of the sequence column it adds to the tablet schema).
// NOTE(review): a namespace-scope `static const std::string` in a header gives
// every including translation unit its own copy - confirm this is intended.
static const std::string SEQUENCE_COL = "__DORIS_SEQUENCE_COL__";

inline TabletUid TabletMeta::tablet_uid() const {
return _tablet_uid;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <vector>

#include "olap/tablet_schema.h"
#include "tablet_meta.h"

namespace doris {

Expand Down Expand Up @@ -372,6 +373,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
}
_is_in_memory = schema.is_in_memory();
_delete_sign_idx = schema.delete_sign_idx();
_sequence_col_idx = schema.sequence_col_idx();
}

void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
Expand All @@ -389,6 +391,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
tablet_meta_pb->set_next_column_unique_id(_next_column_unique_id);
tablet_meta_pb->set_is_in_memory(_is_in_memory);
tablet_meta_pb->set_delete_sign_idx(_delete_sign_idx);
tablet_meta_pb->set_sequence_col_idx(_sequence_col_idx);
}

size_t TabletSchema::row_size() const {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class TabletSchema {
inline void set_is_in_memory(bool is_in_memory) { _is_in_memory = is_in_memory; }
inline int32_t delete_sign_idx() const { return _delete_sign_idx; }
inline void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; }
inline bool has_sequence_col() const { return _sequence_col_idx != -1; }
inline int32_t sequence_col_idx() const { return _sequence_col_idx; }

private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand All @@ -135,6 +137,7 @@ class TabletSchema {
double _bf_fpp = 0;
bool _is_in_memory = false;
int32_t _delete_sign_idx = -1;
int32_t _sequence_col_idx = -1;
};

bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand Down
131 changes: 131 additions & 0 deletions be/test/olap/delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,43 @@ void create_tablet_request(int64_t tablet_id, int32_t schema_hash, TCreateTablet
request->tablet_schema.columns.push_back(v10);
}

void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t schema_hash, TCreateTabletReq* request) {
request->tablet_id = tablet_id;
request->__set_version(1);
request->__set_version_hash(0);
request->tablet_schema.schema_hash = schema_hash;
request->tablet_schema.short_key_column_count = 2;
request->tablet_schema.keys_type = TKeysType::UNIQUE_KEYS;
request->tablet_schema.storage_type = TStorageType::COLUMN;
request->tablet_schema.__set_sequence_col_idx(2);

TColumn k1;
k1.column_name = "k1";
k1.__set_is_key(true);
k1.column_type.type = TPrimitiveType::TINYINT;
request->tablet_schema.columns.push_back(k1);

TColumn k2;
k2.column_name = "k2";
k2.__set_is_key(true);
k2.column_type.type = TPrimitiveType::SMALLINT;
request->tablet_schema.columns.push_back(k2);

TColumn sequence_col;
sequence_col.column_name = SEQUENCE_COL;
sequence_col.__set_is_key(false);
sequence_col.column_type.type = TPrimitiveType::INT;
sequence_col.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(sequence_col);

TColumn v1;
v1.column_name = "v1";
v1.__set_is_key(false);
v1.column_type.type = TPrimitiveType::DATETIME;
v1.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(v1);
}

TDescriptorTable create_descriptor_tablet() {
TDescriptorTableBuilder dtb;
TTupleDescriptorBuilder tuple_builder;
Expand Down Expand Up @@ -274,6 +311,23 @@ TDescriptorTable create_descriptor_tablet() {
return dtb.desc_tbl();
}

// Builds a descriptor table with a single tuple of four slots, in column
// position order: k1 (TINYINT, pos 0), k2 (SMALLINT, pos 1),
// SEQUENCE_COL (INT, pos 2) and v1 (DATETIME, pos 3).
TDescriptorTable create_descriptor_tablet_with_sequence_col() {
    TDescriptorTableBuilder table_builder;
    TTupleDescriptorBuilder tuple;

    // key slots
    tuple.add_slot(TSlotDescriptorBuilder()
                           .type(TYPE_TINYINT)
                           .column_name("k1")
                           .column_pos(0)
                           .build());
    tuple.add_slot(TSlotDescriptorBuilder()
                           .type(TYPE_SMALLINT)
                           .column_name("k2")
                           .column_pos(1)
                           .build());

    // sequence slot followed by the value slot
    tuple.add_slot(TSlotDescriptorBuilder()
                           .type(TYPE_INT)
                           .column_name(SEQUENCE_COL)
                           .column_pos(2)
                           .build());
    tuple.add_slot(TSlotDescriptorBuilder()
                           .type(TYPE_DATETIME)
                           .column_name("v1")
                           .column_pos(3)
                           .build());

    tuple.build(&table_builder);
    return table_builder.desc_tbl();
}

class TestDeltaWriter : public ::testing::Test {
public:
TestDeltaWriter() { }
Expand Down Expand Up @@ -444,6 +498,83 @@ TEST_F(TestDeltaWriter, write) {
delete delta_writer;
}

// Creates a tablet whose schema has a sequence column, writes one row through
// a DeltaWriter, publishes the transaction, and checks that the tablet then
// reports exactly one row before dropping the tablet again.
// NOTE(review): only a single tuple is written, so the sequence-based
// overwrite between two rows with the same key is not exercised here.
TEST_F(TestDeltaWriter, sequence_col) {
// Create tablet 10005 with the sequence-column schema.
TCreateTabletReq request;
create_tablet_request_with_sequence_col(10005, 270068377, &request);
OLAPStatus res = k_engine->create_tablet(request);
ASSERT_EQ(OLAP_SUCCESS, res);

// Build the tuple descriptor that matches the tablet's four columns.
// NOTE(review): the result of DescriptorTbl::create is not checked.
TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
ObjectPool obj_pool;
DescriptorTbl* desc_tbl = nullptr;
DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();

// Open a delta writer on the new tablet; tablet id and schema hash must match
// the values passed to create_tablet_request_with_sequence_col above.
// NOTE(review): the result of DeltaWriter::open is not checked, only the
// out-pointer is.
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
WriteRequest write_req = {10005, 270068377, WriteType::LOAD,
20003, 30003, load_id, false, tuple_desc,
&(tuple_desc->slots())};
DeltaWriter* delta_writer = nullptr;
DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
ASSERT_NE(delta_writer, nullptr);

MemTracker tracker;
MemPool pool(&tracker);
// Tuple 1
// Slots are filled in descriptor order: k1 = 123, k2 = 456,
// sequence column = 1, v1 = "2020-07-16 19:39:43".
{
Tuple* tuple = reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
memset(tuple, 0, tuple_desc->byte_size());
*(int8_t*)(tuple->get_slot(slots[0]->tuple_offset())) = 123;
*(int16_t*)(tuple->get_slot(slots[1]->tuple_offset())) = 456;
*(int32_t*)(tuple->get_slot(slots[2]->tuple_offset())) = 1;
((DateTimeValue*)(tuple->get_slot(slots[3]->tuple_offset())))->from_date_str("2020-07-16 19:39:43", 19);

res = delta_writer->write(tuple);
ASSERT_EQ(OLAP_SUCCESS, res);
}

// Close the writer and wait for the close to complete.
res = delta_writer->close();
ASSERT_EQ(OLAP_SUCCESS, res);
res = delta_writer->close_wait(nullptr);
ASSERT_EQ(OLAP_SUCCESS, res);

// publish version success
TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl;
OlapMeta* meta = tablet->data_dir()->get_meta();
// The published version is the single version right after the tablet's
// current max end version.
Version version;
version.first = tablet->rowset_with_max_version()->end_version() + 1;
version.second = tablet->rowset_with_max_version()->end_version() + 1;
std::cout << "start to add rowset version:" << version.first << "-" << version.second << std::endl;
VersionHash version_hash = 2;
// Publish the transaction for every tablet it touched and attach the
// resulting rowset to `tablet`.
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(write_req.txn_id, write_req.partition_id, &tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_rs.second;
res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id, write_req.schema_hash, tablet_rs.first.tablet_uid,
version, version_hash);
ASSERT_EQ(OLAP_SUCCESS, res);
std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows()
<< ", version:" << rowset->version().first << "-" << rowset->version().second
<< ", version_hash:" << rowset->version_hash()
<< std::endl;
res = tablet->add_inc_rowset(rowset);
ASSERT_EQ(OLAP_SUCCESS, res);
}
// Exactly the one written row must be visible after publishing.
ASSERT_EQ(1, tablet->num_rows());

// Clean up: drop the tablet and release the writer.
// NOTE(review): if any ASSERT_* above fails the test returns early, so the
// tablet is not dropped and delta_writer is not deleted - confirm acceptable.
auto tablet_id = 10005;
auto schema_hash = 270068377;
res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
ASSERT_EQ(OLAP_SUCCESS, res);
delete delta_writer;
}

} // namespace doris

int main(int argc, char** argv) {
Expand Down
Loading