From ac52dd946f2ddde0309a7583e3f127d86817b4b5 Mon Sep 17 00:00:00 2001 From: Youngwb Date: Thu, 2 Jul 2020 15:24:39 +0800 Subject: [PATCH 1/6] add sequence col --- be/src/exec/olap_scanner.cpp | 14 ++ be/src/http/action/stream_load.cpp | 4 + be/src/http/http_common.h | 2 + be/src/olap/memtable.cpp | 6 +- be/src/olap/olap_define.h | 1 + be/src/olap/reader.cpp | 20 +- be/src/olap/reader.h | 2 + be/src/olap/row.h | 33 +++ be/src/olap/schema.h | 5 + be/src/olap/tablet_meta.cpp | 1 + be/src/olap/tablet_meta.h | 2 + be/src/olap/tablet_schema.cpp | 3 + be/src/olap/tablet_schema.h | 3 + be/test/olap/delta_writer_test.cpp | 131 ++++++++++++ be/test/olap/tablet_mgr_test.cpp | 50 +++++ be/test/olap/test_data/header.txt | 3 +- .../Data Definition/CREATE TABLE.md | 11 +- .../Data Manipulation/BROKER LOAD.md | 19 +- .../Data Manipulation/ROUTINE LOAD.md | 22 ++ .../Data Manipulation/STREAM LOAD.md | 11 +- fe/fe-core/src/main/cup/sql_parser.cup | 28 ++- .../doris/alter/MaterializedViewHandler.java | 14 ++ .../doris/analysis/CreateRoutineLoadStmt.java | 10 +- .../doris/analysis/DataDescription.java | 73 ++++++- .../doris/analysis/ImportSequenceStmt.java | 35 ++++ .../org/apache/doris/catalog/Catalog.java | 11 + .../java/org/apache/doris/catalog/Column.java | 9 + .../org/apache/doris/catalog/OlapTable.java | 41 ++++ .../doris/common/proc/IndexInfoProcDir.java | 9 + .../doris/common/util/PropertyAnalyzer.java | 25 +++ .../apache/doris/load/BrokerFileGroup.java | 12 ++ .../main/java/org/apache/doris/load/Load.java | 23 ++- .../apache/doris/load/RoutineLoadDesc.java | 14 +- .../apache/doris/load/loadv2/BulkLoadJob.java | 8 +- .../load/routineload/RoutineLoadJob.java | 19 +- .../apache/doris/planner/BrokerScanNode.java | 7 + .../doris/planner/StreamLoadPlanner.java | 7 + .../apache/doris/task/CreateReplicaTask.java | 5 + .../org/apache/doris/task/LoadTaskInfo.java | 1 + .../org/apache/doris/task/StreamLoadTask.java | 13 ++ .../doris/analysis/DataDescriptionTest.java | 84 +++++++- .../apache/doris/catalog/CreateTableTest.java | 29 +++ .../doris/load/loadv2/BrokerLoadJobTest.java | 4 +- .../routineload/KafkaRoutineLoadJobTest.java | 8 +- .../doris/planner/StreamLoadPlannerTest.java | 2 +- .../doris/planner/StreamLoadScanNodeTest.java | 191 +++++++++++++++++- gensrc/proto/olap_file.proto | 2 +- gensrc/thrift/AgentService.thrift | 1 + gensrc/thrift/FrontendService.thrift | 1 + 49 files changed, 989 insertions(+), 40 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/ImportSequenceStmt.java diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 38bbf7c6b52a44..11a4f51cb5f7db 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -224,6 +224,20 @@ Status OlapScanner::_init_return_columns() { _return_columns.push_back(index); _query_slots.push_back(slot); } + // expand the sequence column + if (_tablet->tablet_schema().has_sequence_col()) { + bool has_replace_col = false; + for (auto col : _return_columns) { + if (_tablet->tablet_schema().column(col).aggregation() == FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) { + has_replace_col = true; + break; + } + } + if (has_replace_col) { + _return_columns.push_back(_tablet->tablet_schema().sequence_col_idx()); + } + } + if (_return_columns.empty()) { return Status::InternalError("failed to build storage scanner, no materialized slot!"); } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 4dc6253d5449e7..262cc4ea75b36b 100644 --- 
a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -397,6 +397,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* } else { request.__set_strip_outer_array(false); } + if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { + request.__set_sequence_col(http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL)); + } + if (ctx->timeout_second != -1) { request.__set_timeout(ctx->timeout_second); } diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 911b54cf311741..1fb5426ca794de 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -41,6 +41,8 @@ static const std::string HTTP_JSONROOT = "json_root"; static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array"; static const std::string HTTP_MERGE_TYPE = "merge_type"; static const std::string HTTP_DELETE_CONDITION = "delete"; +static const std::string HTTP_FUNCTION_COLUMN = "function_column"; +static const std::string HTTP_SEQUENCE_COL = "sequence_col"; static const std::string HTTP_100_CONTINUE = "100-continue"; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 25a6a81912f811..b326c0a9fe476e 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -108,7 +108,11 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) { ContiguousRow dst_row(_schema, row_in_skiplist); - agg_update_row(&dst_row, src_row, _table_mem_pool.get()); + if (_tablet_schema->has_sequence_col()) { + agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(), _table_mem_pool.get()); + } else { + agg_update_row(&dst_row, src_row, _table_mem_pool.get()); + } } OLAPStatus MemTable::flush() { diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 1797fe1fa4f87e..1820117fd45a4a 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -212,6 +212,7 @@ enum OLAPStatus { OLAP_ERR_READER_GET_ITERATOR_ERROR = -701, OLAP_ERR_CAPTURE_ROWSET_READER_ERROR = -702, OLAP_ERR_READER_READING_ERROR = -703, + OLAP_ERR_READER_INITIALIZE_ERROR = -704, // BaseCompaction // [-800, -900) diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index c66c80e325df18..f925b1073a01d4 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -419,7 +419,6 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool // in UNIQUE_KEY highest version is the final result, there is no need to // merge the lower versions direct_copy_row(row_cursor, *_next_key); - agg_finalize_row(_value_cids, row_cursor, mem_pool); // skip the lower version rows; while (nullptr != _next_key) { auto res = _collect_iter->next(&_next_key, &_next_delete_flag); @@ -431,9 +430,15 @@ } // break out of the while loop: can NOT do aggregation across different keys if (!equal_row(_key_cids, *row_cursor, *_next_key)) { + agg_finalize_row(_value_cids, row_cursor, mem_pool); break; } ++merged_count; + cur_delete_flag = _next_delete_flag; + // if has sequence column, the higher version needs to merge the lower versions + if (_has_sequence_col) { + agg_update_row_with_sequence(_value_cids, row_cursor, *_next_key, _sequence_col_idx); + } } // if reader needs to filter delete row and current delete_flag is true,
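For intuition, the gating rule that `agg_update_row_with_sequence` implements (its definition is in the row.h hunk further down) can be modeled with plain standalone types in place of the real RowCursor/Schema cell API; `Row`, `merge_with_sequence`, and the field names below are illustrative assumptions, not code from this patch:

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Standalone model of the sequence-gated REPLACE merge.
struct Row {
    int key;
    int64_t seq;        // plays the role of __DORIS_SEQUENCE_COL__
    std::string value;  // plays the role of the REPLACE value columns
};

void merge_with_sequence(Row* dst, const Row& src) {
    // compare_cell() in the patch returns > 0 when dst already holds the
    // larger sequence value; in that case the incoming row must not win.
    if (dst->seq > src.seq) {
        return;
    }
    // Otherwise apply the usual REPLACE aggregation to the value columns.
    dst->seq = src.seq;
    dst->value = src.value;
}

int main() {
    Row stored{1, 100, "v100"};
    merge_with_sequence(&stored, {1, 99, "v99"});   // ignored: 99 < 100
    merge_with_sequence(&stored, {1, 101, "v101"}); // applied: 101 >= 100
    std::cout << stored.value << std::endl;         // prints "v101"
    return 0;
}
```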
@@ -592,6 +597,19 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) { _collect_iter = new CollectIterator(); _collect_iter->init(this); + if (_tablet->tablet_schema().has_sequence_col()) { + _sequence_col_idx = _tablet->tablet_schema().sequence_col_idx(); + if (_sequence_col_idx != -1) { + for (auto col : _return_columns) { + // query has sequence col + if (col == _sequence_col_idx) { + _has_sequence_col = true; + break; + } + } + } + } + return res; } diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 69a3faaf6fd0ce..d5d77ea6a43be1 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -225,6 +225,8 @@ class Reader { ReaderType _reader_type = READER_QUERY; bool _next_delete_flag = false; bool _filter_delete = false; + bool _has_sequence_col = false; + int32_t _sequence_col_idx = -1; const RowCursor* _next_key = nullptr; CollectIterator* _collect_iter = nullptr; std::vector<uint32_t> _key_cids; diff --git a/be/src/olap/row.h b/be/src/olap/row.h index f24540e13ebff1..bfc11ef5c91fd2 100644 --- a/be/src/olap/row.h +++ b/be/src/olap/row.h @@ -156,6 +156,22 @@ void agg_update_row(DstRowType* dst, const SrcRowType& src, MemPool* mem_pool) { } } +template <typename DstRowType, typename SrcRowType> +void agg_update_row_with_sequence(DstRowType* dst, const SrcRowType& src, uint32_t sequence_idx, MemPool* mem_pool) { + auto seq_dst_cell = dst->cell(sequence_idx); + auto seq_src_cell = src.cell(sequence_idx); + auto res = src.schema()->column(sequence_idx)->compare_cell(seq_dst_cell, seq_src_cell); + // dst sequence column larger than src, don't need to update + if (res > 0) { + return; + } + for (uint32_t cid = dst->schema()->num_key_columns(); cid < dst->schema()->num_columns(); ++cid) { + auto dst_cell = dst->cell(cid); + auto src_cell = src.cell(cid); + dst->schema()->column(cid)->agg_update(&dst_cell, src_cell, mem_pool); + } +} + // Do aggregate update source row to destination row. // This function will operate on given cids.
// TODO(zc): unify two versions of agg_update_row @@ -168,6 +184,23 @@ void agg_update_row(const std::vector& cids, DstRowType* dst, const Sr } } +template <typename DstRowType, typename SrcRowType> +void agg_update_row_with_sequence(const std::vector<uint32_t>& cids, DstRowType* dst, const SrcRowType& src, + const uint32_t sequence_idx) { + auto seq_dst_cell = dst->cell(sequence_idx); + auto seq_src_cell = src.cell(sequence_idx); + auto res = src.schema()->column(sequence_idx)->compare_cell(seq_dst_cell, seq_src_cell); + // dst sequence column larger than src, don't need to update + if (res > 0) { + return; + } + for (auto cid : cids) { + auto dst_cell = dst->cell(cid); + auto src_cell = src.cell(cid); + dst->schema()->column(cid)->agg_update(&dst_cell, src_cell); + } +} + template <typename RowType> void agg_finalize_row(RowType* row, MemPool* mem_pool) { for (uint32_t cid = row->schema()->num_key_columns(); cid < row->schema()->num_columns(); ++cid) { diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h index bc7f98b08123c6..63b858c35cbab6 100644 --- a/be/src/olap/schema.h +++ b/be/src/olap/schema.h @@ -54,6 +54,9 @@ class Schema { columns.push_back(column); } _delete_sign_idx = tablet_schema.delete_sign_idx(); + if (tablet_schema.has_sequence_col()) { + _has_sequence_col = true; + } _init(columns, col_ids, num_key_columns); } @@ -132,6 +135,7 @@ size_t num_column_ids() const { return _col_ids.size(); } const std::vector& column_ids() const { return _col_ids; } int32_t delete_sign_idx() const { return _delete_sign_idx; } + bool has_sequence_col() const { return _has_sequence_col; } private: void _init(const std::vector& cols, @@ -156,6 +160,7 @@ size_t _num_key_columns; size_t _schema_size; int32_t _delete_sign_idx = -1; + bool _has_sequence_col = false; }; } // namespace doris diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 5fb25ce27dfcf1..7bc5c14b44c7c8 100755 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -95,6 +95,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, TabletSchemaPB* schema = tablet_meta_pb.mutable_schema(); schema->set_num_short_key_columns(tablet_schema.short_key_column_count); schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block); + schema->set_sequence_col_idx(tablet_schema.sequence_col_idx); switch(tablet_schema.keys_type) { case TKeysType::DUP_KEYS: schema->set_keys_type(KeysType::DUP_KEYS); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 8ae8ccdd144b91..410c12d3df6fe9 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -238,6 +238,8 @@ class TabletMeta { RWMutex _meta_lock; }; +static const std::string SEQUENCE_COL = "__DORIS_SEQUENCE_COL__"; + inline TabletUid TabletMeta::tablet_uid() const { return _tablet_uid; } diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index e225365198dd84..02e9c5e89cfe3b 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -18,6 +18,7 @@ #include #include "olap/tablet_schema.h" +#include "tablet_meta.h" namespace doris { @@ -372,6 +373,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { } _is_in_memory = schema.is_in_memory(); _delete_sign_idx = schema.delete_sign_idx(); + _sequence_col_idx = schema.sequence_col_idx(); } void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) { @@ -389,6 +391,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) { tablet_meta_pb->set_next_column_unique_id(_next_column_unique_id);
tablet_meta_pb->set_is_in_memory(_is_in_memory); tablet_meta_pb->set_delete_sign_idx(_delete_sign_idx); + tablet_meta_pb->set_sequence_col_idx(_sequence_col_idx); } size_t TabletSchema::row_size() const { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 2861164812bbf7..da7aedfaf1ad9d 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -115,6 +115,8 @@ class TabletSchema { inline void set_is_in_memory(bool is_in_memory) { _is_in_memory = is_in_memory; } inline int32_t delete_sign_idx() const { return _delete_sign_idx; } inline void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; } + inline bool has_sequence_col() const { return _sequence_col_idx != -1; } + inline int32_t sequence_col_idx() const { return _sequence_col_idx; } private: friend bool operator==(const TabletSchema& a, const TabletSchema& b); @@ -135,6 +137,7 @@ class TabletSchema { double _bf_fpp = 0; bool _is_in_memory = false; int32_t _delete_sign_idx = -1; + int32_t _sequence_col_idx = -1; }; bool operator==(const TabletSchema& a, const TabletSchema& b); diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index b6aa07e9e7208f..1a6418e0cb7354 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -224,6 +224,43 @@ void create_tablet_request(int64_t tablet_id, int32_t schema_hash, TCreateTablet request->tablet_schema.columns.push_back(v10); } +void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t schema_hash, TCreateTabletReq* request) { + request->tablet_id = tablet_id; + request->__set_version(1); + request->__set_version_hash(0); + request->tablet_schema.schema_hash = schema_hash; + request->tablet_schema.short_key_column_count = 2; + request->tablet_schema.keys_type = TKeysType::UNIQUE_KEYS; + request->tablet_schema.storage_type = TStorageType::COLUMN; + request->tablet_schema.__set_sequence_col_idx(2); + + TColumn k1; + k1.column_name = "k1"; + k1.__set_is_key(true); + k1.column_type.type = TPrimitiveType::TINYINT; + request->tablet_schema.columns.push_back(k1); + + TColumn k2; + k2.column_name = "k2"; + k2.__set_is_key(true); + k2.column_type.type = TPrimitiveType::SMALLINT; + request->tablet_schema.columns.push_back(k2); + + TColumn sequence_col; + sequence_col.column_name = SEQUENCE_COL; + sequence_col.__set_is_key(false); + sequence_col.column_type.type = TPrimitiveType::INT; + sequence_col.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(sequence_col); + + TColumn v1; + v1.column_name = "v1"; + v1.__set_is_key(false); + v1.column_type.type = TPrimitiveType::DATETIME; + v1.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v1); +} + TDescriptorTable create_descriptor_tablet() { TDescriptorTableBuilder dtb; TTupleDescriptorBuilder tuple_builder; @@ -274,6 +311,23 @@ TDescriptorTable create_descriptor_tablet() { return dtb.desc_tbl(); } +TDescriptorTable create_descriptor_tablet_with_sequence_col() { + TDescriptorTableBuilder dtb; + TTupleDescriptorBuilder tuple_builder; + + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_INT).column_name(SEQUENCE_COL).column_pos(2).build()); + tuple_builder.add_slot( + 
TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("v1").column_pos(3).build()); + tuple_builder.build(&dtb); + + return dtb.desc_tbl(); +} + class TestDeltaWriter : public ::testing::Test { public: TestDeltaWriter() { } @@ -444,6 +498,83 @@ TEST_F(TestDeltaWriter, write) { delete delta_writer; } +TEST_F(TestDeltaWriter, sequence_col) { + TCreateTabletReq request; + create_tablet_request_with_sequence_col(10005, 270068377, &request); + OLAPStatus res = k_engine->create_tablet(request); + ASSERT_EQ(OLAP_SUCCESS, res); + + TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + const std::vector& slots = tuple_desc->slots(); + + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + WriteRequest write_req = {10005, 270068377, WriteType::LOAD, + 20003, 30003, load_id, false, tuple_desc, + &(tuple_desc->slots())}; + DeltaWriter* delta_writer = nullptr; + DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); + ASSERT_NE(delta_writer, nullptr); + + MemTracker tracker; + MemPool pool(&tracker); + // Tuple 1 + { + Tuple* tuple = reinterpret_cast(pool.allocate(tuple_desc->byte_size())); + memset(tuple, 0, tuple_desc->byte_size()); + *(int8_t*)(tuple->get_slot(slots[0]->tuple_offset())) = 123; + *(int16_t*)(tuple->get_slot(slots[1]->tuple_offset())) = 456; + *(int32_t*)(tuple->get_slot(slots[2]->tuple_offset())) = 1; + ((DateTimeValue*)(tuple->get_slot(slots[3]->tuple_offset())))->from_date_str("2020-07-16 19:39:43", 19); + + res = delta_writer->write(tuple); + ASSERT_EQ(OLAP_SUCCESS, res); + } + + res = delta_writer->close(); + ASSERT_EQ(OLAP_SUCCESS, res); + res = delta_writer->close_wait(nullptr); + ASSERT_EQ(OLAP_SUCCESS, res); + + // publish version success + TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl; + OlapMeta* meta = tablet->data_dir()->get_meta(); + Version version; + version.first = tablet->rowset_with_max_version()->end_version() + 1; + version.second = tablet->rowset_with_max_version()->end_version() + 1; + std::cout << "start to add rowset version:" << version.first << "-" << version.second << std::endl; + VersionHash version_hash = 2; + std::map tablet_related_rs; + StorageEngine::instance()->txn_manager()->get_txn_related_tablets(write_req.txn_id, write_req.partition_id, &tablet_related_rs); + for (auto& tablet_rs : tablet_related_rs) { + std::cout << "start to publish txn" << std::endl; + RowsetSharedPtr rowset = tablet_rs.second; + res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, write_req.schema_hash, tablet_rs.first.tablet_uid, + version, version_hash); + ASSERT_EQ(OLAP_SUCCESS, res); + std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() + << ", version:" << rowset->version().first << "-" << rowset->version().second + << ", version_hash:" << rowset->version_hash() + << std::endl; + res = tablet->add_inc_rowset(rowset); + ASSERT_EQ(OLAP_SUCCESS, res); + } + ASSERT_EQ(1, tablet->num_rows()); + + auto tablet_id = 10005; + auto schema_hash = 270068377; + res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + ASSERT_EQ(OLAP_SUCCESS, res); + delete delta_writer; +} + } // namespace 
doris int main(int argc, char** argv) { diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index 297de4c3a42414..aaf18c5e06c211 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -144,6 +144,56 @@ TEST_F(TabletMgrTest, CreateTablet) { ASSERT_TRUE(create_st == OLAP_ERR_CE_TABLET_ID_EXIST); } +TEST_F(TabletMgrTest, CreateTabletWithSequence) { + std::vector<TColumn> cols; + TColumn col1; + col1.column_type.type = TPrimitiveType::SMALLINT; + col1.__set_column_name("col1"); + col1.__set_is_key(true); + cols.push_back(col1); + + TColumn col2; + col2.column_type.type = TPrimitiveType::INT; + col2.__set_column_name(SEQUENCE_COL); + col2.__set_is_key(false); + col2.__set_aggregation_type(TAggregationType::REPLACE); + cols.push_back(col2); + + TColumn col3; + col3.column_type.type = TPrimitiveType::INT; + col3.__set_column_name("v1"); + col3.__set_is_key(false); + col3.__set_aggregation_type(TAggregationType::REPLACE); + cols.push_back(col3); + + + TTabletSchema tablet_schema; + tablet_schema.__set_short_key_column_count(1); + tablet_schema.__set_schema_hash(3333); + tablet_schema.__set_keys_type(TKeysType::UNIQUE_KEYS); + tablet_schema.__set_storage_type(TStorageType::COLUMN); + tablet_schema.__set_columns(cols); + tablet_schema.__set_sequence_col_idx(1); + + TCreateTabletReq create_tablet_req; + create_tablet_req.__set_tablet_schema(tablet_schema); + create_tablet_req.__set_tablet_id(111); + create_tablet_req.__set_version(2); + create_tablet_req.__set_version_hash(3333); + vector<DataDir*> data_dirs; + data_dirs.push_back(_data_dir); + OLAPStatus create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs); + ASSERT_TRUE(create_st == OLAP_SUCCESS); + TabletSharedPtr tablet = _tablet_mgr->get_tablet(111, 3333); + ASSERT_TRUE(tablet != nullptr); + // check dir exist + bool dir_exist = FileUtils::check_exist(tablet->tablet_path()); + ASSERT_TRUE(dir_exist); + // check meta has this tablet + TabletMetaSharedPtr new_tablet_meta(new TabletMeta()); + OLAPStatus check_meta_st = TabletMetaManager::get_meta(_data_dir, 111, 3333, new_tablet_meta); + ASSERT_TRUE(check_meta_st == OLAP_SUCCESS); +} TEST_F(TabletMgrTest, DropTablet) { TColumnType col_type; diff --git a/be/test/olap/test_data/header.txt b/be/test/olap/test_data/header.txt index 5131620b8f5c8f..42edd2a168c7f1 100644 --- a/be/test/olap/test_data/header.txt +++ b/be/test/olap/test_data/header.txt @@ -50,7 +50,8 @@ "compress_kind": "COMPRESS_LZ4", "next_column_unique_id": 3, "is_in_memory": false, - "delete_sign_idx": -1 + "delete_sign_idx": -1, + "has_sequence_col": false }, "rs_metas": [ { diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index 035a5ee298a413..ca223b0de4f731 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -253,7 +253,7 @@ under the License. When the table is two-level partitioned, these properties are attached to each partition. If different partitions should have different properties, use ADD PARTITION or MODIFY PARTITION - 2 If the Engine type is olap, a column can be specified to use a bloom filter index + 2) If the Engine type is olap, a column can be specified to use a bloom filter index A bloom filter index only applies when the query condition is in or equal, and works better the more scattered the column's values are Currently only the following columns are supported: key columns other than the TINYINT FLOAT DOUBLE types, and value columns whose aggregation method is REPLACE
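The hunk below documents the new sequence column property for CREATE TABLE. For a complete picture, a UNIQUE KEY table using it could be declared as follows; only the `"function_column.sequence_type"` property comes from this patch, while the database, table, and column names are invented for illustration:

```sql
CREATE TABLE example_db.user_last_visit
(
    `user_id` BIGINT,
    `city` VARCHAR(32),
    `last_visit` DATETIME
)
UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 8
PROPERTIES
(
    "replication_num" = "3",
    "function_column.sequence_type" = 'Date'
);
```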
@@ -305,6 +305,15 @@ ) ``` When the in_memory property is true, Doris will cache the table's data and indexes in BE memory as much as possible + + 7) When creating a UNIQUE_KEYS table, a sequence column can be specified; when the KEY columns are equal, REPLACE is performed according to the sequence column (a larger value can replace a smaller one, but not the other way around) + +``` + PROPERTIES ( + "function_column.sequence_type" = 'Date' + ); +``` + sequence_type is used to specify the type of the sequence column, which can be an integer type or a date/time type ## example 1. Create an olap table, using HASH bucketing and columnar storage, aggregating records with the same key diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index 2d7cf2fe879a47..bedf1a35a0e5cb 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -71,6 +71,7 @@ under the License. [SET (k1 = func(k2))] [WHERE predicate] [DELETE ON label=true] + [ORDER BY source_sequence] Description: file_path: @@ -121,6 +122,11 @@ delete_on_predicates: Specifies the delete condition; it is only meaningful when the merge type is MERGE, and its syntax is the same as where + + ORDER BY: + + Only applicable to UNIQUE_KEYS. For rows with the same key columns, it guarantees that value columns are REPLACEd in the order of source_sequence. source_sequence can be a column from the data source, or a column in the table schema. + 3. broker_name The name of the broker to use, which can be viewed with the show broker command. @@ -460,7 +466,18 @@ "timeout" = "3600", "max_filter_ratio" = "0.1" ); - + + 14. Specify the source_sequence column at load time to guarantee the replacement order in the UNIQUE_KEYS table: + LOAD LABEL example_db.label_sequence + ( + DATA INFILE("hdfs://host:port/user/data/*/test.txt") + INTO TABLE `tbl1` + COLUMNS TERMINATED BY "," + (k1,k2,source_sequence,v1,v2) + ORDER BY source_sequence + ) + with BROKER "hdfs" ("username"="user", "password"="pass"); + ## keyword BROKER,LOAD diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index 36c74011417339..920550a4ef092c 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -54,6 +54,7 @@ under the License. [columns_mapping], [where_predicates], [delete_on_predicates], + [source_sequence], [partitions] 1. column_separator: @@ -98,11 +99,17 @@ Example: PARTITION(p1, p2, p3) + 5. merge_type The merge type of the data. Three types are supported: APPEND, DELETE and MERGE. APPEND is the default, meaning all of this batch of data needs to be appended to the existing data. DELETE means deleting all rows whose keys match this batch of data. MERGE semantics must be used together with a delete on condition: data satisfying the delete condition is handled with DELETE semantics and the rest with APPEND semantics. The syntax is [WITH MERGE|APPEND|DELETE] + 6. delete_on_predicates Specifies the delete condition; it is only meaningful when the merge type is MERGE, and its syntax is the same as where + 7. source_sequence: + + Only applicable to UNIQUE_KEYS. For rows with the same key columns, it guarantees that value columns are REPLACEd in the order of the source_sequence column. source_sequence can be a column from the data source, or a column in the table schema. + 4. job_properties Used to specify general parameters of the routine load job. @@ -437,6 +444,7 @@ {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387} ] } + 7. Create a Kafka routine load task named test1 for example_tbl of example_db, and delete rows whose key columns match rows with v3 > 100 CREATE ROUTINE LOAD example_db.test1 ON example_tbl WITH MERGE COLUMNS(k1, k2, k3, v1, v2, v3), WHERE k1 > 100 and k2 like "%doris%", DELETE ON v3 >100 PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA + + 8.
Load data into a UNIQUE_KEYS table that has a sequence column + CREATE ROUTINE LOAD example_db.test_job ON example_tbl + COLUMNS TERMINATED BY ",", + COLUMNS(k1,k2,source_sequence,v1,v2), + ORDER BY source_sequence + PROPERTIES + ( + "desired_concurrent_number"="3", + "max_batch_interval" = "30", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,0,200" ); + ## keyword CREATE,ROUTINE,LOAD diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 7893d524d5ed69..82ebd447f92294 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -90,9 +90,12 @@ under the License. When strip_outer_array is true, two rows of data are finally generated in doris. json_root: json_root is a valid jsonpath string used to specify the root node of the json document; its default value is "". + merge_type: The merge type of the data. Three types are supported: APPEND, DELETE and MERGE. APPEND is the default, meaning all of this batch of data needs to be appended to the existing data. DELETE means deleting all rows whose keys match this batch of data. MERGE semantics must be used together with a delete condition: data satisfying the delete condition is handled with DELETE semantics and the rest with APPEND semantics. Example: `-H "merge_type: MERGE" -H "delete: flag=1"` delete: Only meaningful under MERGE; specifies the delete condition of the data - + + function_column.sequence_col: Only applicable to UNIQUE_KEYS. For rows with the same key columns, it guarantees that value columns are REPLACEd in the order of the source_sequence column, + source_sequence can be a column from the data source, or a column in the table schema. RETURN VALUES After the load completes, the result of this load is returned in Json format. It currently includes the following fields @@ -149,6 +152,7 @@ 10. Simple mode, loading json data Table schema: + `category` varchar(512) NULL COMMENT "", `author` varchar(512) NULL COMMENT "", `title` varchar(512) NULL COMMENT "", @@ -188,11 +192,14 @@ } Load precisely by specifying jsonpath, for example loading only the three properties category, author and price curl --location-trusted -u root -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/testDb/testTbl/_stream_load + 13. Delete data with the same keys as this batch of loaded data curl --location-trusted -u root -H "merge_type: DELETE" -T testData http://host:port/api/testDb/testTbl/_stream_load 14. Delete the rows in this batch that match data whose flag column is true, and append the other rows normally curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T testData http://host:port/api/testDb/testTbl/_stream_load - + + 15. Load data into a UNIQUE_KEYS table that has a sequence column + curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load
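For intuition, a hypothetical second request (the file name testDataOlder and its contents are invented, not part of the original example set): re-loading rows whose source_sequence is lower than the value already stored leaves the table unchanged, because the BE merge path ignores updates whose sequence compares smaller:
curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testDataOlder http://host:port/api/testDb/testTbl/_stream_load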
## keyword STREAM,LOAD diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index fcd265a1b929f9..c7d073bf8a74bb 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -287,7 +287,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, describe_stmt, alter_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, - import_columns_stmt, import_delete_on_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt; + import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt; nonterminal ImportColumnDesc import_column_desc; nonterminal List import_column_descs; @@ -344,6 +344,7 @@ nonterminal FunctionName function_name; nonterminal Expr where_clause; nonterminal Expr delete_on_clause; nonterminal Expr where_clause_without_null; +nonterminal String sequence_col_clause; nonterminal Predicate predicate, between_predicate, comparison_predicate, compound_predicate, in_predicate, like_predicate, exists_predicate; nonterminal ArrayList opt_partition_by_clause; @@ -535,6 +536,10 @@ stmts ::= {: RESULT = Lists.newArrayList(stmt); :} + | import_sequence_stmt:stmt + {: + RESULT = Lists.newArrayList(stmt); + :} ; import_columns_stmt ::= @@ -581,6 +586,13 @@ import_delete_on_stmt ::= :} ; +import_sequence_stmt ::= + KW_ORDER KW_BY ident:s + {: + RESULT = new ImportSequenceStmt(s); + :} + ; + stmt ::= alter_stmt:stmt {: RESULT = stmt; :} @@ -1345,9 +1357,10 @@ data_desc ::= opt_col_mapping_list:colMappingList where_clause:whereExpr delete_on_clause:deleteExpr + sequence_col_clause:sequenceColName {: RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, - columnsFromPath, isNeg, colMappingList, whereExpr, mergeType, deleteExpr); + columnsFromPath, isNeg, colMappingList, whereExpr, mergeType, deleteExpr, sequenceColName); :} | opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName opt_negative:isNeg @@ -1543,6 +1556,10 @@ load_property ::= {: RESULT = deletePredicate; :} + | import_sequence_stmt:sequenceColumn + {: + RESULT = sequenceColumn; + :} | partition_names:partitionNames {: RESULT = partitionNames; :} @@ -3668,6 +3685,13 @@ delete_on_clause ::= {: RESULT = e; :} ; +sequence_col_clause ::= +/* empty */ +{: RESULT = null; :} +| KW_ORDER KW_BY ident:s +{: RESULT = s; :} +; + where_clause_without_null ::= KW_WHERE expr:e {: RESULT = e; :} diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index f839076d56abb1..aae7558b26a217 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -506,6 +506,7 @@ public List checkAndPrepareMaterializedView(AddRollupClause addRollupCla // a. all columns should exist in base rollup schema // b. value after key // c. if rollup contains REPLACE column, all keys on base index should be included. + // d.
if base index has sequence column for unique_keys, rollup should add the sequence column List<Column> rollupSchema = new ArrayList<Column>(); // check (a)(b) boolean meetValue = false; @@ -552,6 +553,19 @@ } } } + if (KeysType.UNIQUE_KEYS == keysType && olapTable.hasSequenceCol()) { + if (meetValue) { + // check whether the sequence column already exists in the rollup schema + for (Column col : rollupSchema) { + if (col.isSequenceColumn()) { + throw new DdlException("sequence column already exists in the Rollup schema"); + } + } + // add the sequence column + rollupSchema.add(keysNumOfRollup, new Column(Column.SEQUENCE_COL, olapTable.getSequenceType(), + false, AggregateType.REPLACE, null, "")); + } + } } else if (KeysType.DUP_KEYS == keysType) { // supplement the duplicate key if (addRollupClause.getDupKeys() == null || addRollupClause.getDupKeys().isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 7e98df4b5b0224..f617b4b97109ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -306,6 +306,7 @@ public void checkLoadProperties() throws UserException { ColumnSeparator columnSeparator = null; ImportColumnsStmt importColumnsStmt = null; ImportWhereStmt importWhereStmt = null; + ImportSequenceStmt importSequenceStmt = null; PartitionNames partitionNames = null; ImportDeleteOnStmt importDeleteOnStmt = null; for (ParseNode parseNode : loadPropertyList) { @@ -341,10 +342,17 @@ throw new AnalysisException("repeat setting of delete predicate"); } importDeleteOnStmt = (ImportDeleteOnStmt) parseNode; + } else if (parseNode instanceof ImportSequenceStmt) { + // check sequence column + if (importSequenceStmt != null) { + throw new AnalysisException("repeat setting of sequence column"); + } + importSequenceStmt = (ImportSequenceStmt) parseNode; } } routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt, importWhereStmt, - partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType); + partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType, + importSequenceStmt == null ?
null : importSequenceStmt.getSequenceColName()); } private void checkJobProperties() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 228481876e053e..94a490b4e6e713 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -20,7 +20,10 @@ import org.apache.doris.analysis.BinaryPredicate.Operator; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.FunctionSet; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; @@ -109,6 +112,8 @@ public class DataDescription { private TNetworkAddress beAddr; private String lineDelimiter; + private String sequenceCol; + // Merged from fileFieldNames, columnsFromPath and columnMappingList // ImportColumnDesc: column name to (expr or null) private List parsedColumnExprList = Lists.newArrayList(); @@ -133,7 +138,7 @@ public DataDescription(String tableName, boolean isNegative, List columnMappingList) { this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null, - isNegative, columnMappingList, null, LoadTask.MergeType.APPEND, null); + isNegative, columnMappingList, null, LoadTask.MergeType.APPEND, null, null); } public DataDescription(String tableName, @@ -147,7 +152,8 @@ public DataDescription(String tableName, List columnMappingList, Expr whereExpr, LoadTask.MergeType mergeType, - Expr deleteCondition) { + Expr deleteCondition, + String sequenceColName) { this.tableName = tableName; this.partitionNames = partitionNames; this.filePaths = filePaths; @@ -161,6 +167,7 @@ public DataDescription(String tableName, this.srcTableName = null; this.mergeType = mergeType; this.deleteCondition = deleteCondition; + this.sequenceCol = sequenceColName; } // data from table external_hive_table @@ -256,6 +263,14 @@ public void setLineDelimiter(String lineDelimiter) { this.lineDelimiter = lineDelimiter; } + public String getSequenceCol() { + return sequenceCol; + } + + public boolean hasSequenceCol() { + return !Strings.isNullOrEmpty(sequenceCol); + } + @Deprecated public void addColumnMapping(String functionName, Pair> pair) { if (Strings.isNullOrEmpty(functionName) || pair == null) { @@ -408,6 +423,55 @@ private void analyzeColumnToHadoopFunction(String columnName, Expr child1) throw columnToHadoopFunction.put(columnName, functionPair); } + private void analyzeSequenceCol(String fullDbName) throws AnalysisException { + Database db = Catalog.getCurrentCatalog().getDb(fullDbName); + if (db == null) { + throw new AnalysisException("Database[" + fullDbName + "] does not exist"); + } + Table table = db.getTable(tableName); + if (table == null) { + throw new AnalysisException("Unknown table " + tableName + + " in database " + db.getFullName()); + } + if (!(table instanceof OlapTable)) { + throw new AnalysisException("Table " + table.getName() + " is not OlapTable"); + } + OlapTable olapTable = (OlapTable) table; + // no sequence column in load and table schema + if (!hasSequenceCol() && !olapTable.hasSequenceCol()) { + return; + } + // check olapTable schema and sequenceCol + if (olapTable.hasSequenceCol() && !hasSequenceCol()) { + throw new 
AnalysisException("Table " + table.getName() + " has sequence column, need to specify the sequence column"); + } + if (hasSequenceCol() && !olapTable.hasSequenceCol()) { + throw new AnalysisException("There is no sequence column in the table " + table.getName()); + } + // check source sequence column is in parsedColumnExprList or Table base schema + boolean hasSourceSequenceCol = false; + if (!parsedColumnExprList.isEmpty()) { + for (ImportColumnDesc importColumnDesc : parsedColumnExprList) { + if (importColumnDesc.getColumnName().equals(sequenceCol)) { + hasSourceSequenceCol = true; + break; + } + } + } else { + List columns = olapTable.getBaseSchema(); + for (Column column : columns) { + if (column.getName().equals(sequenceCol)) { + hasSourceSequenceCol = true; + break; + } + } + } + if (!hasSourceSequenceCol) { + throw new AnalysisException("There is no sequence column " + sequenceCol + " in the " + table.getName() + + " or the COLUMNS and SET clause"); + } + } + public static void validateMappingFunction(String functionName, List args, Map columnNameMap, Column mappingColumn, boolean isHadoopLoad) throws AnalysisException { @@ -616,13 +680,13 @@ public void analyze(String fullDbName) throws AnalysisException { throw new AnalysisException("not support MERGE or DELETE with NEGATIVE"); } checkLoadPriv(fullDbName); - analyzeWithoutCheckPriv(); + analyzeWithoutCheckPriv(fullDbName); if (isNegative && mergeType != LoadTask.MergeType.APPEND) { throw new AnalysisException("Negative is only used when merge type is append."); } } - public void analyzeWithoutCheckPriv() throws AnalysisException { + public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException { if (!isLoadFromTable()) { if (filePaths == null || filePaths.isEmpty()) { throw new AnalysisException("No file path in load statement."); @@ -641,6 +705,7 @@ public void analyzeWithoutCheckPriv() throws AnalysisException { } analyzeColumns(); + analyzeSequenceCol(fullDbName); } /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportSequenceStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportSequenceStmt.java new file mode 100644 index 00000000000000..01addb9af57052 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportSequenceStmt.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +public class ImportSequenceStmt extends StatementBase { + private String sequenceColName; + + public ImportSequenceStmt(String sequenceColName) { + this.sequenceColName = sequenceColName; + } + + public String getSequenceColName() { + return sequenceColName; + } + + @Override + public RedirectStatus getRedirectStatus() { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index b9224ed930e38e..1334435c6e12dd 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3709,6 +3709,17 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws DdlExcept rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType); } + // analyse sequence column + Type sequenceColType = null; + try { + sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType()); + if (sequenceColType != null) { + olapTable.setSequenceInfo(sequenceColType); + } + } catch (Exception e) { + throw new DdlException(e.getMessage()); + } + // analyze version info Pair versionInfo = null; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index befc8ca3ac8cb7..ea8449c5c747b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -48,6 +48,7 @@ public class Column implements Writable { private static final Logger LOG = LogManager.getLogger(Column.class); public static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__"; + public static final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__"; @SerializedName(value = "name") private String name; @SerializedName(value = "type") @@ -185,10 +186,18 @@ public boolean isVisible() { return visible; } + public void setIsVisible(boolean isVisible) { + this.visible = isVisible; + } + public boolean isDeleteSignColumn() { return !visible && aggregationType == AggregateType.REPLACE && nameEquals(DELETE_SIGN, true); } + public boolean isSequenceColumn() { + return !visible && aggregationType == AggregateType.REPLACE && nameEquals(SEQUENCE_COL, true); + } + public PrimitiveType getDataType() { return type.getPrimitiveType(); } public Type getType() { return ScalarType.createType(type.getPrimitiveType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 68de384983d29d..aaad51e7320d7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -119,6 +119,9 @@ public enum OlapTableState { private String colocateGroup; + private boolean hasSequenceCol; + private Type sequenceType; + private TableIndexes indexes; // In former implementation, base index id is same as table id. 
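// Note: hasSequenceCol/sequenceType (added above) cache what setSequenceInfo() below derives; the hidden column it creates is Column.SEQUENCE_COL ("__DORIS_SEQUENCE_COL__"), a non-visible REPLACE value column that mirrors the existing __DORIS_DELETE_SIGN__ pattern.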
@@ -143,6 +146,8 @@ public OlapTable() { this.indexes = null; this.tableProperty = null; + + this.hasSequenceCol = false; } public OlapTable(long id, String tableName, List baseSchema, KeysType keysType, @@ -796,6 +801,42 @@ public void setBloomFilterInfo(Set bfColumns, double bfFpp) { this.bfFpp = bfFpp; } + public void setSequenceInfo(Type type) { + this.hasSequenceCol = true; + this.sequenceType = type; + + // sequence column is value column with REPLACE aggregate type + Column sequenceCol = new Column(Column.SEQUENCE_COL, type, false, AggregateType.REPLACE, false, null, "", false); + // add sequence column at last + fullSchema.add(sequenceCol); + nameToColumn.put(Column.SEQUENCE_COL, sequenceCol); + for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) { + List schema = indexMeta.getSchema(); + schema.add(sequenceCol); + } + } + + public Column getSequenceCol() { + for (Column column : getBaseSchema()) { + if (column.isSequenceColumn()) { + return column; + } + } + return null; + } + + public Boolean hasSequenceCol() { + return getSequenceCol() != null; + } + + public Type getSequenceType() { + if (getSequenceCol() == null) { + return null; + } else { + return getSequenceCol().getType(); + } + } + public void setIndexes(List indexes) { if (this.indexes == null) { this.indexes = new TableIndexes(null); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java index ca5c4cdf98b070..4d29a8816bb4c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java @@ -129,6 +129,15 @@ public ProcNodeInterface lookup(String idxIdStr) throws AnalysisException { throw new AnalysisException("Index " + idxId + " does not exist"); } bfColumns = olapTable.getCopiedBfColumns(); + // sequence col is the hidden column + if (olapTable.hasSequenceCol()) { + for (Column column : schema) { + if (column.isSequenceColumn()) { + schema.remove(column); + break; + } + } + } } else { schema = table.getBaseSchema(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 178d80cf9f6a8e..184c336616bc64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -21,8 +21,10 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DataProperty; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -86,6 +88,9 @@ public class PropertyAnalyzer { public static final String PROPERTIES_USE_TEMP_PARTITION_NAME = "use_temp_partition_name"; public static final String PROPERTIES_TYPE = "type"; + // This is common prefix for function column + public static final String PROPERTIES_FUNCTION_COLUMN = "function_column"; + public static final String PROPERTIES_SEQUENCE_TYPE = "sequence_type"; public static DataProperty analyzeDataProperty(Map properties, DataProperty oldDataProperty) throws AnalysisException { @@ -433,4 +438,24 @@ public static String analyzeType(Map 
properties) throws AnalysisException } return type; } + + public static Type analyzeSequenceType(Map<String, String> properties, KeysType keysType) throws AnalysisException { + String typeStr = null; + String propertyName = PROPERTIES_FUNCTION_COLUMN + "." + PROPERTIES_SEQUENCE_TYPE; + if (properties != null && properties.containsKey(propertyName)) { + typeStr = properties.get(propertyName); + properties.remove(propertyName); + } + if (typeStr == null) { + return null; + } + if (keysType != KeysType.UNIQUE_KEYS) { + throw new AnalysisException("sequence column only supports UNIQUE_KEYS"); + } + PrimitiveType type = PrimitiveType.valueOf(typeStr.toUpperCase()); + if (!type.isFixedPointType() && !type.isDateType()) { + throw new AnalysisException("sequence type only supports integer types and date types"); + } + return ScalarType.createType(type); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 2e064437dcbf7f..efd8022d40ffef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -17,6 +17,7 @@ package org.apache.doris.load; +import com.google.common.base.Strings; import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; @@ -81,6 +82,8 @@ public class BrokerFileGroup implements Writable { private Expr whereExpr; private Expr deleteCondition; private LoadTask.MergeType mergeType; + // sequence column name + private String sequenceCol; // load from table private long srcTableId = -1; @@ -108,6 +111,7 @@ public BrokerFileGroup(DataDescription dataDescription) { this.whereExpr = dataDescription.getWhereExpr(); this.deleteCondition = dataDescription.getDeleteCondition(); this.mergeType = dataDescription.getMergeType(); + this.sequenceCol = dataDescription.getSequenceCol(); } // NOTE: DBLock will be held @@ -268,6 +272,14 @@ public LoadTask.MergeType getMergeType() { return mergeType; } + public String getSequenceCol() { + return sequenceCol; + } + + public boolean hasSequenceCol() { + return !Strings.isNullOrEmpty(sequenceCol); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index bbc549adc7eca5..40361fd0744797 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -735,7 +735,7 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip // check mapping column exist in table // check function // convert mapping column and func arg columns to schema format - + // When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in // their names. These columns are invisible to user, but we need to generate data for these columns. // So we add column mappings for these column. @@ -750,7 +750,7 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip if (mappingExpr != null) { /* * eg: - * (A, C) SET (B = func(xx)) + * (A, C) SET (B = func(xx)) * -> * (A, C) SET (B = func(xx), __doris_shadow_B = func(xxx)) */ @@ -779,10 +779,10 @@ */ // do nothing } - + } } - + LOG.debug("after add shadow column.
parsedColumnExprList: {}, columnToHadoopFunction: {}", parsedColumnExprList, columnToHadoopFunction); @@ -974,6 +974,11 @@ public static void initColumns(Table tbl, List columnExprs, // We make a copy of the columnExprs so that our subsequent changes // to the columnExprs will not affect the original columnExprs. List copiedColumnExprs = Lists.newArrayList(columnExprs); + // check whether the OlapTable has sequenceCol + boolean hasSequenceCol = false; + if (tbl instanceof OlapTable && ((OlapTable)tbl).hasSequenceCol()) { + hasSequenceCol = true; + } // If user does not specify the file field names, generate it by using base schema of table. // So that the following process can be unified @@ -981,6 +986,10 @@ public static void initColumns(Table tbl, List columnExprs, if (!specifyFileFieldNames) { List columns = tbl.getBaseSchema(false); for (Column column : columns) { + // columnExprs has sequence column, don't need to generate the sequence column + if (hasSequenceCol && column.isSequenceColumn()) { + continue; + } ImportColumnDesc columnDesc = new ImportColumnDesc(column.getName()); LOG.debug("add base column {} to stream load task", column.getName()); copiedColumnExprs.add(columnDesc); @@ -1129,7 +1138,7 @@ public static void initColumns(Table tbl, List columnExprs, * The hadoop function includes: replace_value, strftime, time_format, alignment_timestamp, default_value, now. * It rewrites those function with real function name and param. * For the other function, the expr only go through this function and the origin expr is returned. - * + * * @param columnName * @param originExpr * @return @@ -1158,7 +1167,7 @@ private static Expr transformHadoopFunctionExpr(Table tbl, String columnName, Ex * We will convert this based on different cases: * case 1: k1 = replace_value(null, anyval); * to: k1 = if (k1 is not null, k1, anyval); - * + * * case 2: k1 = replace_value(anyval1, anyval2); * to: k1 = if (k1 is not null, if(k1 != anyval1, k1, anyval2), null); */ @@ -1231,7 +1240,7 @@ private static Expr transformHadoopFunctionExpr(Table tbl, String columnName, Ex /* * change to: * UNIX_TIMESTAMP(DATE_FORMAT(FROM_UNIXTIME(ts), "%Y-01-01 00:00:00")); - * + * */ // FROM_UNIXTIME diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java index 86f6786d3a11f8..fe81be0e45a768 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java @@ -17,6 +17,7 @@ package org.apache.doris.load; +import com.google.common.base.Strings; import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnsStmt; @@ -32,16 +33,19 @@ public class RoutineLoadDesc { private LoadTask.MergeType mergeType; // nullable private final PartitionNames partitionNames; + private final String sequenceColName; public RoutineLoadDesc(ColumnSeparator columnSeparator, ImportColumnsStmt columnsInfo, ImportWhereStmt wherePredicate, PartitionNames partitionNames, - Expr deleteCondition, LoadTask.MergeType mergeType) { + Expr deleteCondition, LoadTask.MergeType mergeType, + String sequenceColName) { this.columnSeparator = columnSeparator; this.columnsInfo = columnsInfo; this.wherePredicate = wherePredicate; this.partitionNames = partitionNames; this.deleteCondition = deleteCondition; this.mergeType = mergeType; + this.sequenceColName = sequenceColName; } public ColumnSeparator 
getColumnSeparator() { @@ -68,4 +72,12 @@ public PartitionNames getPartitionNames() { public Expr getDeleteCondition() { return deleteCondition; } + + public String getSequenceColName() { + return sequenceColName; + } + + public boolean hasSequenceCol() { + return !Strings.isNullOrEmpty(sequenceColName); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 8754a7bbcb9b30..04f3d938fc0d30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -255,14 +255,14 @@ public void analyze() { Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE)))); LoadStmt stmt = null; try { - stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx); - for (DataDescription dataDescription : stmt.getDataDescriptions()) { - dataDescription.analyzeWithoutCheckPriv(); - } Database db = Catalog.getCurrentCatalog().getDb(dbId); if (db == null) { throw new DdlException("Database[" + dbId + "] does not exist"); } + stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx); + for (DataDescription dataDescription : stmt.getDataDescriptions()) { + dataDescription.analyzeWithoutCheckPriv(db.getFullName()); + } checkAndSetDataSourceInfo(db, stmt.getDataDescriptions()); } catch (Exception e) { LOG.info(new LogBuilder(LogKey.LOAD_JOB, id) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 41febf662f2474..823de3c3ff74aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -26,9 +26,11 @@ import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; @@ -181,6 +183,8 @@ public boolean isFinalState() { protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS; protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE; + protected String sequenceCol; + /** * RoutineLoad support json data. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 41febf662f2474..823de3c3ff74aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -26,9 +26,11 @@
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
@@ -181,6 +183,8 @@ public boolean isFinalState() {
     protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS;
     protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;
 
+    protected String sequenceCol;
+
     /**
      * RoutineLoad support json data.
      * Require Params:
@@ -324,10 +328,10 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
 
     private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
         if (routineLoadDesc != null) {
+            columnDescs = Lists.newArrayList();
             if (routineLoadDesc.getColumnsInfo() != null) {
                 ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
                 if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
-                    columnDescs = Lists.newArrayList();
                     for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
                         columnDescs.add(columnDesc);
                     }
@@ -351,6 +355,11 @@ private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
             } else if (mergeType == LoadTask.MergeType.DELETE) {
                 columnDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
             }
+            if (routineLoadDesc.hasSequenceCol()) {
+                sequenceCol = routineLoadDesc.getSequenceColName();
+                // add an expr for the sequence column
+                columnDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
+            }
         }
     }
 
@@ -566,6 +575,14 @@ public String getJsonRoot() {
         return value;
     }
 
+    public String getSequenceCol() {
+        return sequenceCol;
+    }
+
+    public boolean hasSequenceCol() {
+        return !Strings.isNullOrEmpty(sequenceCol);
+    }
+
     public int getSizeOfRoutineLoadTaskInfoList() {
         readLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index e408f960452a25..25df304690b1b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -23,9 +23,11 @@
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
@@ -225,6 +227,11 @@ private void initColumns(ParamCreateContext context) throws UserException {
             } else if (mergeType == LoadTask.MergeType.DELETE) {
                 columnExprs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
             }
+            // add a column expr for the sequence column
+            if (context.fileGroup.hasSequenceCol()) {
+                columnExprs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
+                        new SlotRef(null, context.fileGroup.getSequenceCol())));
+            }
         }
 
         Load.initColumns(targetTable, columnExprs,
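In both the routine load and the broker load path the hidden sequence column is loaded by appending one more column mapping, equivalent to "__DORIS_SEQUENCE_COL__ = <source column>". A sketch of that idea with plain strings; the real code builds an ImportColumnDesc wrapping a SlotRef, and the hidden name is assumed:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch: a load's column mappings, extended with one entry that aliases
    // the hidden sequence column to the user-specified source column.
    class ColumnMappingSketch {
        static final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__"; // assumed hidden name

        static List<String> withSequence(List<String> mappings, String sourceCol) {
            List<String> result = new ArrayList<>(mappings);
            result.add(SEQUENCE_COL + " = " + sourceCol);
            return result;
        }

        public static void main(String[] args) {
            System.out.println(withSequence(Arrays.asList("k1", "k2", "v1"), "source_sequence"));
            // [k1, k2, v1, __DORIS_SEQUENCE_COL__ = source_sequence]
        }
    }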
UserException("There is no sequence column in the table " + destTable.getName()); + } resetAnalyzer(); // construct tuple descriptor, used for scanNode and dataSink TupleDescriptor tupleDesc = descTable.createTupleDescriptor("DstTableTuple"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 69a6f956199609..d3c885d5a709f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -166,6 +166,7 @@ public TCreateTabletReq toThrift() { tSchema.setKeysType(keysType.toThrift()); tSchema.setStorageType(storageType); int deleteSign = -1; + int sequenceCol = -1; List tColumns = new ArrayList(); for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); @@ -184,9 +185,13 @@ public TCreateTabletReq toThrift() { if (column.isDeleteSignColumn()) { deleteSign = i; } + if (column.isSequenceColumn()) { + sequenceCol = i; + } } tSchema.setColumns(tColumns); tSchema.setDeleteSignIdx(deleteSign); + tSchema.setSequence_col_idx(sequenceCol); if (CollectionUtils.isNotEmpty(indexes)) { List tIndexes = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index cfc8185bf88326..1f86825e9cbbe8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -36,6 +36,7 @@ public interface LoadTaskInfo { public PartitionNames getPartitions(); public LoadTask.MergeType getMergeType(); public Expr getDeleteCondition(); + public boolean hasSequenceCol(); public TFileType getFileType(); public TFileFormatType getFormatType(); public String getJsonPaths(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 6720cfc6f8136a..113b69226291b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -17,6 +17,7 @@ package org.apache.doris.task; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.doris.analysis.ColumnSeparator; @@ -26,8 +27,10 @@ import org.apache.doris.analysis.ImportWhereStmt; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -70,6 +73,7 @@ public class StreamLoadTask implements LoadTaskInfo { private long execMemLimit = 2 * 1024 * 1024 * 1024L; // default is 2GB private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete private Expr deleteCondition; + private String sequenceCol; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) { this.id = id; @@ -164,6 +168,10 @@ public Expr getDeleteCondition() { return deleteCondition; } + public boolean hasSequenceCol() { + return !Strings.isNullOrEmpty(sequenceCol); + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request, Database db) 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 6720cfc6f8136a..113b69226291b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.task;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
 import org.apache.doris.analysis.ColumnSeparator;
@@ -26,8 +27,10 @@
 import org.apache.doris.analysis.ImportWhereStmt;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -70,6 +73,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private long execMemLimit = 2 * 1024 * 1024 * 1024L; // default is 2GB
     private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete
     private Expr deleteCondition;
+    private String sequenceCol;
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
         this.id = id;
@@ -164,6 +168,10 @@ public Expr getDeleteCondition() {
         return deleteCondition;
     }
 
+    public boolean hasSequenceCol() {
+        return !Strings.isNullOrEmpty(sequenceCol);
+    }
+
     public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request, Database db)
             throws UserException {
         StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
                 request.getFileType(), request.getFormatType());
@@ -238,6 +246,11 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request, Database db) throws UserException {
         } else if (mergeType == LoadTask.MergeType.DELETE) {
             columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
         }
+        if (request.isSetSequence_col()) {
+            sequenceCol = request.getSequence_col();
+            // add an expr for the sequence column
+            columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
+        }
     }
 
     // used for stream load
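With the sequence column threaded through the plan, the UNIQUE_KEY merge rule changes: among rows with the same key, the row with the larger sequence value wins rather than the most recently loaded one. A toy sketch of the intended semantics; tie-breaking toward the newer row is an assumption, and the real merge happens in the BE readers:

    // Sketch: merge two versions of the same key by sequence value.
    class SequenceMergeSketch {
        static long[] merge(long[] current, long[] incoming, int seqIdx) {
            // keep the row whose sequence value is larger (ties keep the newer row)
            return incoming[seqIdx] >= current[seqIdx] ? incoming : current;
        }

        public static void main(String[] args) {
            long[] v1 = {1, 100, 10}; // key=1, seq=100, value=10 (loaded first)
            long[] v2 = {1, 50, 20};  // same key, older seq=50 (loaded later)
            long[] kept = merge(v1, v2, 1);
            System.out.println(kept[2]); // 10: the higher-sequence row survives
        }
    }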
- Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, true, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr); + Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, true, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null); desc.analyze("testDb"); } @@ -267,4 +307,42 @@ public void testAnalyzeColumnsWithDuplicatedColumnMapping(@Injectable BinaryPred } } } + + @Test + public void testAnalyzeSequenceColumnNormal() throws AnalysisException { + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + Lists.newArrayList("k1", "k2", "source_sequence","v1"), new ColumnSeparator("\t"), + null, null,false, null, null, LoadTask.MergeType.APPEND, null, "source_sequence"); + new Expectations() { + { + tbl.getName(); + minTimes = 0; + result = "testTable"; + + tbl.hasSequenceCol(); + minTimes = 0; + result =true; + } + }; + desc.analyze("testDb"); + } + + @Test(expected = AnalysisException.class) + public void testAnalyzeSequenceColumnWithoutSourceSequence() throws AnalysisException { + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + Lists.newArrayList("k1", "k2","v1"), new ColumnSeparator("\t"), + null, null,false, null, null, LoadTask.MergeType.APPEND, null, "source_sequence"); + new Expectations() { + { + tbl.getName(); + minTimes = 0; + result = "testTable"; + + tbl.hasSequenceCol(); + minTimes = 0; + result =true; + } + }; + desc.analyze("testDb"); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index df61e6af60cb92..aaf6844ba3a54c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -102,6 +102,13 @@ public void testNormal() throws DdlException { .expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n" + "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');")); + ExceptionChecker + .expectThrowsNoException(() -> createTable("create table test.tbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n" + + "unique key(k1, k2)\n" + + "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n" + + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n" + + "'function_column.sequence_type' = 'int');")); + Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test"); OlapTable tbl6 = (OlapTable) db.getTable("tbl6"); Assert.assertTrue(tbl6.getColumn("k1").isKey()); @@ -112,6 +119,12 @@ public void testNormal() throws DdlException { Assert.assertTrue(tbl7.getColumn("k1").isKey()); Assert.assertFalse(tbl7.getColumn("k2").isKey()); Assert.assertTrue(tbl7.getColumn("k2").getAggregationType() == AggregateType.NONE); + + OlapTable tbl8 = (OlapTable) db.getTable("tbl8"); + Assert.assertTrue(tbl8.getColumn("k1").isKey()); + Assert.assertTrue(tbl8.getColumn("k2").isKey()); + Assert.assertFalse(tbl8.getColumn("v1").isKey()); + Assert.assertTrue(tbl8.getColumn(Column.SEQUENCE_COL).getAggregationType() == AggregateType.REPLACE); } @Test @@ -159,5 +172,21 @@ public void testAbormal() throws DdlException { .expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium is SSD in all backends. 
need: 1", () -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n" + "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');")); + + ExceptionChecker + .expectThrowsWithMsg(DdlException.class, "sequence column only support UNIQUE_KEYS", + () -> createTable("create table test.atbl8\n" + "(k1 varchar(40), k2 int, v1 int sum)\n" + + "aggregate key(k1, k2)\n" + + "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n" + + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n" + + "'function_column.sequence_type' = 'int');")); + + ExceptionChecker + .expectThrowsWithMsg(DdlException.class, "sequence type only support integer types and date types", + () -> createTable("create table test.atbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n" + + "unique key(k1, k2)\n" + + "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n" + + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1',\n" + + "'function_column.sequence_type' = 'double');")); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 4d6b700509235d..4b0286c0f68316 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -19,8 +19,10 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; @@ -36,7 +38,6 @@ import org.apache.doris.load.Load; import org.apache.doris.load.Source; import org.apache.doris.metric.MetricRepo; -import org.apache.doris.qe.OriginStatement; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.transaction.TransactionState; @@ -45,7 +46,6 @@ import com.google.common.collect.Sets; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 5f6750c9365833..70b068632fd9e3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.ImportSequenceStmt; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.ParseNode; import org.apache.doris.analysis.PartitionNames; @@ -81,6 +82,8 @@ public class KafkaRoutineLoadJobTest { private ColumnSeparator columnSeparator = new ColumnSeparator(","); + private ImportSequenceStmt sequenceStmt = new ImportSequenceStmt("source_sequence"); + @Mocked ConnectContext connectContext; @Mocked @@ -242,7 +245,7 @@ public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog, @Injectable Database database) throws LoadException { CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 5f6750c9365833..70b068632fd9e3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.ImportSequenceStmt;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PartitionNames;
@@ -81,6 +82,8 @@ public class KafkaRoutineLoadJobTest {
 
     private ColumnSeparator columnSeparator = new ColumnSeparator(",");
 
+    private ImportSequenceStmt sequenceStmt = new ImportSequenceStmt("source_sequence");
+
     @Mocked
     ConnectContext connectContext;
     @Mocked
@@ -242,7 +245,7 @@ public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog,
                                                  @Injectable Database database) throws LoadException {
         CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
         RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null,
-                partitionNames, null, LoadTask.MergeType.APPEND);
+                partitionNames, null, LoadTask.MergeType.APPEND, null);
         Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
 
         new Expectations() {
@@ -267,7 +270,7 @@ public void testFromCreateStmt(@Mocked Catalog catalog,
                                    @Injectable OlapTable table) throws UserException {
         CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
         RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames, null,
-                LoadTask.MergeType.APPEND);
+                LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName());
         Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
         List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList();
         List<Integer> kafkaPartitionInfoList = Lists.newArrayList();
@@ -315,6 +318,7 @@ public List<Integer> getAllKafkaPartitions(String brokerList, String topic,
         Assert.assertEquals(topicName, Deencapsulation.getField(kafkaRoutineLoadJob, "topic"));
         List<Integer> kafkaPartitionResult = Deencapsulation.getField(kafkaRoutineLoadJob, "customKafkaPartitions");
         Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(kafkaPartitionResult));
+        Assert.assertEquals(sequenceStmt.getSequenceColName(), kafkaRoutineLoadJob.getSequenceCol());
     }
 
     private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
index 847a9749fdb304..35b545c5af8196 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
@@ -87,7 +87,7 @@ public void testNormalPlan() throws UserException {
         request.setLoadId(new TUniqueId(2, 3));
         request.setFileType(TFileType.FILE_STREAM);
         request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN);
-        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
        StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, streamLoadTask);
         planner.plan(streamLoadTask.getId());
     }
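The planner test now passes a real Database mock because fromTStreamLoadPutRequest consults it, and the validation added to StreamLoadPlanner earlier in this patch must hold in both directions. A compact sketch of that two-way check, using the patch's own error messages:

    // Sketch of the two-way check from StreamLoadPlanner.plan(): a table with a
    // sequence column requires the load to specify one, and vice versa.
    class SequenceColValidation {
        static void validate(boolean tableHasSequenceCol, boolean loadHasSequenceCol, String tableName) {
            if (tableHasSequenceCol && !loadHasSequenceCol) {
                throw new IllegalArgumentException(
                        "Table " + tableName + " has a sequence column, need to specify the sequence column");
            }
            if (!tableHasSequenceCol && loadHasSequenceCol) {
                throw new IllegalArgumentException("There is no sequence column in the table " + tableName);
            }
        }
    }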
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index c5e9e42e16c559..22b29190954a56 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -28,6 +28,7 @@
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.catalog.OlapTable;
@@ -71,6 +72,9 @@ public class StreamLoadScanNodeTest {
     @Injectable
     ConnectContext connectContext;
 
+    @Injectable
+    Database db;
+
     @Injectable
     OlapTable dstTable;
 
@@ -130,10 +134,47 @@ List<Column> getHllSchema() {
         return columns;
     }
 
+    List<Column> getSequenceColSchema() {
+        List<Column> columns = Lists.newArrayList();
+
+        Column k1 = new Column("k1", PrimitiveType.BIGINT);
+        k1.setIsKey(true);
+        k1.setIsAllowNull(false);
+        columns.add(k1);
+
+        Column k2 = new Column("k2", ScalarType.createVarchar(25));
+        k2.setIsKey(true);
+        k2.setIsAllowNull(true);
+        columns.add(k2);
+
+        // the sequence column itself, a hidden column
+        Column sequenceCol = new Column(Column.SEQUENCE_COL, PrimitiveType.BIGINT);
+        sequenceCol.setIsKey(false);
+        sequenceCol.setAggregationType(AggregateType.REPLACE, false);
+        sequenceCol.setIsAllowNull(false);
+        sequenceCol.setIsVisible(false);
+        columns.add(sequenceCol);
+
+        // a user-visible sequence column, equivalent to the hidden column
+        Column visibleSequenceCol = new Column("visible_sequence_col", PrimitiveType.BIGINT);
+        visibleSequenceCol.setIsKey(false);
+        visibleSequenceCol.setAggregationType(AggregateType.REPLACE, false);
+        visibleSequenceCol.setIsAllowNull(true);
+        columns.add(visibleSequenceCol);
+
+        Column v1 = new Column("v1", ScalarType.createVarchar(25));
+        v1.setIsKey(false);
+        v1.setAggregationType(AggregateType.REPLACE, false);
+        v1.setIsAllowNull(false);
+        columns.add(v1);
+
+        return columns;
+    }
 
     private StreamLoadScanNode getStreamLoadScanNode(TupleDescriptor dstDesc, TStreamLoadPutRequest request)
             throws UserException {
-        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
         StreamLoadScanNode scanNode = new StreamLoadScanNode(streamLoadTask.getId(), new PlanNodeId(1), dstDesc, dstTable, streamLoadTask);
         return scanNode;
     }
@@ -197,7 +238,7 @@ public void testLostV2() throws UserException {
 
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1, k2, v1");
-        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
 
         scanNode.init(analyzer);
@@ -273,7 +314,7 @@ public void testColumnsNormal() throws UserException, UserException {
 
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,v1, v2=k2");
-        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
@@ -321,7 +362,7 @@ public void testHllColumnsNormal() throws UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setFileType(TFileType.FILE_STREAM);
         request.setColumns("k1,k2, v1=" + FunctionSet.HLL_HASH + "(k2)");
-        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, null);
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request, db);
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
 
         scanNode.init(analyzer);
@@ -643,4 +684,146 @@ public void testLoadInitColumnsMappingColumnNotExist() throws UserException {
         columnExprs.add(new ImportColumnDesc("c3", new FunctionCallExpr("func", Lists.newArrayList())));
         Load.initColumns(table, columnExprs, null, null, null, null, null, null);
     }
+
+    @Test
+    public void testSequenceColumnWithSetColumns() throws UserException {
+        Analyzer analyzer = new Analyzer(catalog, connectContext);
+        DescriptorTable descTbl = analyzer.getDescTbl();
+
+        List<Column> columns = getSequenceColSchema();
+        TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
+        for (Column column : columns) {
+            SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
+            slot.setColumn(column);
+            slot.setIsMaterialized(true);
+            if (column.isAllowNull()) {
+                slot.setIsNullable(true);
+            } else {
+                slot.setIsNullable(false);
+            }
+        }
+
+        new Expectations() {
+            {
+                db.getTable(anyInt);
+                result = dstTable;
+                minTimes = 0;
+                dstTable.hasSequenceCol();
+                result = true;
+            }
+        };
+
+        new Expectations() {
+            {
+                dstTable.getColumn("k1");
+                result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get();
+                minTimes = 0;
+
+                dstTable.getColumn("k2");
+                result = columns.stream().filter(c -> c.getName().equals("k2")).findFirst().get();
+                minTimes = 0;
+
+                dstTable.getColumn(Column.SEQUENCE_COL);
+                result = columns.stream().filter(c -> c.getName().equals(Column.SEQUENCE_COL)).findFirst().get();
+                minTimes = 0;
+
+                dstTable.getColumn("visible_sequence_col");
+                result = columns.stream().filter(c -> c.getName().equals("visible_sequence_col")).findFirst().get();
+                minTimes = 0;
+
+                dstTable.getColumn("v1");
+                result = columns.stream().filter(c -> c.getName().equals("v1")).findFirst().get();
+                minTimes = 0;
+
+                // there is no "source_sequence" column in the table
+                dstTable.getColumn("source_sequence");
+                result = null;
+                minTimes = 0;
+            }
+        };
+
+        TStreamLoadPutRequest request = getBaseRequest();
+        request.setColumns("k1,k2,source_sequence,v1");
+        request.setFileType(TFileType.FILE_STREAM);
+        request.setSequence_col("source_sequence");
+        StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
+
+        scanNode.init(analyzer);
+        scanNode.finalize(analyzer);
+        scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
+        TPlanNode planNode = new TPlanNode();
+        scanNode.toThrift(planNode);
+    }
+
+    @Test
+    public void testSequenceColumnWithoutSetColumns() throws UserException {
+        Analyzer analyzer = new Analyzer(catalog, connectContext);
+        DescriptorTable descTbl = analyzer.getDescTbl();
+
+        List<Column> columns = getSequenceColSchema();
+        TupleDescriptor dstDesc = descTbl.createTupleDescriptor("DstTableDesc");
+        for (Column column : columns) {
+            SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
+            slot.setColumn(column);
+            slot.setIsMaterialized(true);
+            if (column.isAllowNull()) {
+                slot.setIsNullable(true);
+            } else {
+                slot.setIsNullable(false);
+            }
+        }
+
+        new Expectations() {
+            {
+                db.getTable(anyInt);
+                result = dstTable;
+                minTimes = 0;
+                dstTable.hasSequenceCol();
+                result = true;
+            }
+        };
+
+        new Expectations() {
+            {
+                dstTable.getBaseSchema(anyBoolean); result = columns;
+                dstTable.getFullSchema(); result = columns;
+
+                dstTable.getColumn("k1");
+                result = columns.stream().filter(c -> c.getName().equals("k1")).findFirst().get();
+                minTimes = 0;
+
+                dstTable.getColumn("k2");
+                result = columns.stream().filter(c -> c.getName().equals("k2")).findFirst().get();
+                minTimes = 0;
+
+                dstTable.getColumn(Column.SEQUENCE_COL);
+                result = columns.stream().filter(c -> c.getName().equals(Column.SEQUENCE_COL)).findFirst().get();
+                minTimes = 0;
+
+                dstTable.getColumn("visible_sequence_col");
+                result = columns.stream().filter(c -> c.getName().equals("visible_sequence_col")).findFirst().get();
+                minTimes = 0;
+
+                dstTable.getColumn("v1");
+                result = columns.stream().filter(c -> c.getName().equals("v1")).findFirst().get();
+                minTimes = 0;
+
+                dstTable.hasSequenceCol();
+                result = true;
+                minTimes = 0;
+            }
+        };
+
+        TStreamLoadPutRequest request = getBaseRequest();
+        request.setFileType(TFileType.FILE_STREAM);
+        request.setSequence_col("visible_sequence_col");
+        StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
+
+        scanNode.init(analyzer);
+        scanNode.finalize(analyzer);
+        scanNode.getNodeExplainString("", TExplainLevel.NORMAL);
+        TPlanNode planNode = new TPlanNode();
+        scanNode.toThrift(planNode);
+    }
 }
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index d4f9cf758523de..0884aadee0d5bb 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -270,7 +270,7 @@ message TabletSchemaPB {
     optional uint32 next_column_unique_id = 7; // OLAPHeaderMessage.next_column_unique_id
     optional bool is_in_memory = 8 [default=false];
     optional int32 delete_sign_idx = 9 [default = -1];
-
+    optional int32 sequence_col_idx = 10 [default = -1];
 }
 
 enum TabletStatePB {
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index f4184ab74dca85..bcbc57698ef89f 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -47,6 +47,7 @@ struct TTabletSchema {
     7: optional list<TOlapTableIndex> indexes
     8: optional bool is_in_memory
     9: optional i32 delete_sign_idx = -1
+    10: optional i32 sequence_col_idx = -1
 }
 
 // this enum stands for different storage format in src_backends
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 6de605b2a74aab..3aec2f70021609 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -554,6 +554,7 @@ struct TStreamLoadPutRequest {
     26: optional string json_root
     27: optional Types.TMergeType merge_type
     28: optional string delete_condition
+    29: optional string sequence_col
 }
 
 struct TStreamLoadPutResult {
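Patch 1 ends by threading the new field through all three interface layers: the persisted tablet schema (olap_file.proto), the create-tablet RPC (AgentService.thrift) and the stream load plan request (FrontendService.thrift), each defaulting to -1 or unset. A sketch of how a consumer might interpret that default; the holder class is illustrative, only the field semantics follow the definitions above:

    // Sketch: interpreting sequence_col_idx as received from the schema messages.
    class TabletSchemaView {
        final int sequenceColIdx; // -1 means the tablet has no sequence column

        TabletSchemaView(int sequenceColIdx) {
            this.sequenceColIdx = sequenceColIdx;
        }

        boolean hasSequenceCol() {
            return sequenceColIdx != -1;
        }
    }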
From 08cf336a589efeb55a78add23cc6f5bbed0965d7 Mon Sep 17 00:00:00 2001
From: yangwenbo6
Date: Fri, 28 Aug 2020 16:03:16 +0800
Subject: [PATCH 2/6] fix rollup

---
 .../java/org/apache/doris/alter/MaterializedViewHandler.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index aae7558b26a217..2a3a8386be3311 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -562,7 +562,7 @@ public List<Column> checkAndPrepareMaterializedView(AddRollupClause addRollupClause,
                 }
             }
             // add the sequence column
-            rollupSchema.add(keysNumOfRollup, new Column(Column.SEQUENCE_COL, olapTable.getSequenceType(),
+            rollupSchema.add(new Column(Column.SEQUENCE_COL, olapTable.getSequenceType(),
                     false, AggregateType.REPLACE, null, ""));
         }
     }

From 99edfdeb34c07b1af2a5a2942b14dbb263f52981 Mon Sep 17 00:00:00 2001
From: yangwenbo6
Date: Fri, 28 Aug 2020 16:37:00 +0800
Subject: [PATCH 3/6] fix for camelCase

---
 .../main/java/org/apache/doris/task/CreateReplicaTask.java | 2 +-
 .../src/main/java/org/apache/doris/task/StreamLoadTask.java | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index d3c885d5a709f9..04055d7d845b90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -191,7 +191,7 @@ public TCreateTabletReq toThrift() {
         }
         tSchema.setColumns(tColumns);
         tSchema.setDeleteSignIdx(deleteSign);
-        tSchema.setSequence_col_idx(sequenceCol);
+        tSchema.setSequenceColIdx(sequenceCol);
 
         if (CollectionUtils.isNotEmpty(indexes)) {
             List<TOlapTableIndex> tIndexes = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 113b69226291b7..90ba6e1b2cf188 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -246,8 +246,8 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request, Database db) throws UserException {
         } else if (mergeType == LoadTask.MergeType.DELETE) {
             columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
         }
-        if (request.isSetSequence_col()) {
-            sequenceCol = request.getSequence_col();
+        if (request.isSetSequenceCol()) {
+            sequenceCol = request.getSequenceCol();
             // add an expr for the sequence column
             columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
         }

From 25153c4254542260c0f6c256ba8761fec7aeeb0b Mon Sep 17 00:00:00 2001
From: yangwenbo6
Date: Fri, 28 Aug 2020 16:41:23 +0800
Subject: [PATCH 4/6] fix for camelCase

---
 .../java/org/apache/doris/planner/StreamLoadScanNodeTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index 22b29190954a56..18d5523f01a801 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -745,7 +745,7 @@ public void testSequenceColumnWithSetColumns() throws UserException {
         TStreamLoadPutRequest request = getBaseRequest();
         request.setColumns("k1,k2,source_sequence,v1");
         request.setFileType(TFileType.FILE_STREAM);
-        request.setSequence_col("source_sequence");
+        request.setSequenceCol("source_sequence");
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
 
         scanNode.init(analyzer);
@@ -817,7 +817,7 @@ public void testSequenceColumnWithoutSetColumns() throws UserException {
 
         TStreamLoadPutRequest request = getBaseRequest();
         request.setFileType(TFileType.FILE_STREAM);
-        request.setSequence_col("visible_sequence_col");
+        request.setSequenceCol("visible_sequence_col");
         StreamLoadScanNode scanNode = getStreamLoadScanNode(dstDesc, request);
 
         scanNode.init(analyzer);

From 1fb769491a678d4573f0e5666676e80a82e9307c Mon Sep 17 00:00:00 2001
From: yangwenbo6
Date: Fri, 28 Aug 2020 20:46:18 +0800
Subject: [PATCH 5/6] fix

---
 be/test/olap/test_data/header.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/test/olap/test_data/header.txt b/be/test/olap/test_data/header.txt
index 42edd2a168c7f1..851cae207dea1a 100644
--- a/be/test/olap/test_data/header.txt
+++ b/be/test/olap/test_data/header.txt
@@ -51,7 +51,7 @@
         "next_column_unique_id": 3,
         "is_in_memory": false,
         "delete_sign_idx": -1,
-        "has_sequence_col": false
+        "sequence_col_idx": -1
     },
     "rs_metas": [
         {
From ad7eab7a65578b682c25e5923c7dd3f3b5a50073 Mon Sep 17 00:00:00 2001
From: yangwenbo6
Date: Sat, 29 Aug 2020 01:47:00 +0800
Subject: [PATCH 6/6] fix for null sequence_col

---
 .../java/org/apache/doris/alter/MaterializedViewHandler.java | 2 +-
 .../src/main/java/org/apache/doris/catalog/OlapTable.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 2a3a8386be3311..d6c7c037be95b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -563,7 +563,7 @@ public List<Column> checkAndPrepareMaterializedView(AddRollupClause addRollupClause,
             }
             // add the sequence column
             rollupSchema.add(new Column(Column.SEQUENCE_COL, olapTable.getSequenceType(),
-                    false, AggregateType.REPLACE, null, ""));
+                    false, AggregateType.REPLACE, true, null, "", false));
             }
         } else if (KeysType.DUP_KEYS == keysType) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index aaad51e7320d7e..4750ff747b04d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -806,7 +806,7 @@ public void setSequenceInfo(Type type) {
         this.sequenceType = type;
 
         // sequence column is value column with REPLACE aggregate type
-        Column sequenceCol = new Column(Column.SEQUENCE_COL, type, false, AggregateType.REPLACE, false, null, "", false);
+        Column sequenceCol = new Column(Column.SEQUENCE_COL, type, false, AggregateType.REPLACE, true, null, "", false);
         // add sequence column at last
         fullSchema.add(sequenceCol);
         nameToColumn.put(Column.SEQUENCE_COL, sequenceCol);
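Patch 6 leaves both constructor calls at (name, type, false, REPLACE, true, null, "", false); reading the arguments positionally from the diff, the flipped value is the nullability flag, so the hidden sequence column becomes nullable, which matches the subject "fix for null sequence_col". A sketch of the resulting flag combination; the positional interpretation is an assumption from context, not a confirmed signature:

    // Sketch: the flag combination the final patch settles on for the hidden column.
    class HiddenSequenceColumnFlags {
        final boolean isKey = false;      // value column, not part of the key
        final String aggType = "REPLACE"; // the higher sequence value replaces the lower
        final boolean isAllowNull = true; // patch 6: loads may leave the column null
        final boolean visible = false;    // hidden from users
    }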