diff --git a/.gitignore b/.gitignore index 74c2226b136f78..e2c576843adfe9 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,9 @@ fe_plugins/output fe/mocked fe/*/target dependency-reduced-pom.xml +fe_plugins/**/.classpath +fe_plugins/**/.factorypath +samples/**/.classpath #ignore eclipse project file & idea project file diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 53212e1da98c5c..e7ad582d4723e7 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -218,7 +218,6 @@ Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* << Tuple::to_string(row->get_tuple(0), *_tuple_desc); } } - return Status::OK(); } @@ -235,7 +234,7 @@ Status BrokerScanNode::close(RuntimeState* state) { _scanner_threads[i].join(); } - // Open partition + // Close partition if (_partition_expr_ctxs.size() > 0) { Expr::close(_partition_expr_ctxs, state); for (auto iter : _partition_infos) { diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index e6260810474813..1ef046912e1f2f 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -105,6 +105,15 @@ void NodeChannel::open() { request.set_load_mem_limit(_parent->_load_mem_limit); request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); + // set load info + std::map::const_iterator it = _type_map.find(_parent->_merge_type); + if (it == _type_map.end() ) { + LOG(WARNING) << "merge type [" << _parent->_merge_type << "] is invalid, set to default APPEND"; + request.set_merge_type(PTabletWriterOpenRequest::APPEND); + } else { + request.set_merge_type(it->second); + } + request.set_delete_slot_id(_parent->_delete_slot_id); _open_closure = new RefCountClosure(); _open_closure->ref(); @@ -544,6 +553,9 @@ Status OlapTableSink::prepare(RuntimeState* state) { _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); _load_mem_limit = state->get_load_mem_limit(); + _merge_type = state->get_merge_type(); + _delete_slot_id = state->get_delete_slot_id(); + // open all channels auto& partitions = _partition->get_partitions(); for (int i = 0; i < _schema->indexes().size(); ++i) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 4f61ed15cc0f1c..3db48a8065eba1 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -237,6 +237,11 @@ class NodeChannel { int64_t _mem_exceeded_block_ns = 0; int64_t _queue_push_lock_ns = 0; int64_t _actual_consume_ns = 0; + std::map _type_map = { + {TMergeType::APPEND, PTabletWriterOpenRequest::APPEND}, + {TMergeType::MERGE, PTabletWriterOpenRequest::MERGE}, + {TMergeType::DELETE, PTabletWriterOpenRequest::DELETE} + }; }; class IndexChannel { @@ -393,6 +398,9 @@ class OlapTableSink : public DataSink { // the timeout of load channels opened by this tablet sink. 
in second int64_t _load_channel_timeout_s = 0; + TMergeType::type _merge_type = TMergeType::APPEND; + int32_t _delete_slot_id = -1; + // }; } // namespace stream_load diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 52e16a5ebcfb9b..0a1419479c6a97 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -56,6 +56,7 @@ #include "util/doris_metrics.h" #include "util/time.h" #include "util/uid_util.h" +#include "util/string_util.h" namespace doris { @@ -390,6 +391,27 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_timeout(ctx->timeout_second); } request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + request.__set_merge_type(TMergeType::APPEND); + StringCaseMap merge_type_map = { + { "APPEND", TMergeType::APPEND }, + { "DELETE", TMergeType::DELETE }, + { "MERGE", TMergeType::MERGE } + }; + if (!http_req->header(HTTP_MERGE_TYPE).empty()) { + std::string merge_type = http_req->header(HTTP_MERGE_TYPE); + if (merge_type_map.find(merge_type) != merge_type_map.end() ) { + request.__set_merge_type(merge_type_map.find(merge_type)->second); + } else { + return Status::InvalidArgument("Invalid merge type " + merge_type); + } + } + if (!http_req->header(HTTP_DELETE_CONDITION).empty()) { + if (request.merge_type == TMergeType::MERGE) { + request.__set_delete_condition(http_req->header(HTTP_DELETE_CONDITION)); + } else { + return Status::InvalidArgument("not support delete when merge type is not merge."); + } + } // plan this load TNetworkAddress master_addr = _exec_env->master_info()->network_address; #ifndef BE_TEST diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 984aa102a337d8..911b54cf311741 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -39,6 +39,8 @@ static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit"; static const std::string HTTP_JSONPATHS = "jsonpaths"; static const std::string HTTP_JSONROOT = "json_root"; static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array"; +static const std::string HTTP_MERGE_TYPE = "merge_type"; +static const std::string HTTP_DELETE_CONDITION = "delete"; static const std::string HTTP_100_CONTINUE = "100-continue"; diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 884c0458bcb60d..3705f59f1055c2 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -69,6 +69,7 @@ add_library(Olap STATIC serialize.cpp storage_engine.cpp data_dir.cpp + row.cpp short_key_index.cpp snapshot_manager.cpp stream_index_common.cpp diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 0a89bc41a0ed5b..d2f3ee6e62de20 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -24,6 +24,7 @@ #include "olap/schema.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" +#include "runtime/tuple.h" namespace doris { @@ -133,7 +134,8 @@ OLAPStatus DeltaWriter::init() { writer_context.partition_id = _req.partition_id; writer_context.tablet_schema_hash = _req.schema_hash; writer_context.rowset_type = _storage_engine->default_rowset_type(); - if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { + if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET + || _req.merge_type != PTabletWriterOpenRequest::APPEND) { writer_context.rowset_type = BETA_ROWSET; } writer_context.rowset_path_prefix = _tablet->tablet_path(); @@ -154,13 +156,24 @@ OLAPStatus DeltaWriter::init() { 
_is_init = true; return OLAP_SUCCESS; } - OLAPStatus DeltaWriter::write(Tuple* tuple) { + return write(tuple, nullptr); +} +OLAPStatus DeltaWriter::write(Tuple* tuple, TupleDescriptor* tuple_desc) { if (!_is_init) { RETURN_NOT_OK(init()); } + bool is_delete = false; + if (tuple_desc != nullptr) { - _mem_table->insert(tuple); + } + if (_req.delete_slot_id >= 0) { + const SlotDescriptor* slot = tuple_desc->slots()[_req.delete_slot_id]; + if (slot->type() == TYPE_BOOLEAN) { + is_delete = *reinterpret_cast(tuple->get_slot(slot->tuple_offset())); + } + } + _mem_table->insert(tuple, is_delete); // if memtable is full, push it to the flush executor, // and create a new memtable for incoming data @@ -194,8 +207,8 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() { void DeltaWriter::_reset_mem_table() { _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots, - _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), - _mem_tracker.get())); + _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), _mem_tracker.get(), + _tablet->keys_type() == UNIQUE_KEYS && _req.delete_slot_id > 0)); } OLAPStatus DeltaWriter::close() { diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 64828d59c975f3..1ed53ef0951813 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -50,6 +50,8 @@ struct WriteRequest { TupleDescriptor* tuple_desc; // slots are in order of tablet's schema const std::vector* slots; + PTabletWriterOpenRequest::PMergeType merge_type; + int delete_slot_id; }; // Writer for a particular (load, index, tablet). @@ -63,6 +65,8 @@ class DeltaWriter { OLAPStatus init(); OLAPStatus write(Tuple* tuple); + OLAPStatus write(Tuple* tuple, TupleDescriptor* tuple_desc); + // flush the last memtable to flush queue, must call it before close_wait() OLAPStatus close(); // wait for all memtables to be flushed. 
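
The DeltaWriter/MemTable hunks above and the memtable.cpp changes that follow carry the per-row delete marker as a single extra byte appended after the fixed-size row image (note `_schema_size = _schema->schema_size() + 1` and `is_delete()` reading the byte at offset `schema_size()`). A minimal standalone sketch of that layout trick, with hypothetical names (RowBuf, kSchemaSize) and an assumed 16-byte row image, not Doris code:

#include <cstddef>
#include <iostream>
#include <vector>

constexpr std::size_t kSchemaSize = 16;  // assumed fixed row-image size in bytes

struct RowBuf {
    // kSchemaSize bytes of column data, plus one trailing byte for the delete flag
    std::vector<char> buf = std::vector<char>(kSchemaSize + 1, 0);

    void set_delete(bool v) { buf[kSchemaSize] = v ? 1 : 0; }
    bool is_delete() const { return buf[kSchemaSize] != 0; }
};

int main() {
    RowBuf row;
    row.set_delete(true);  // e.g. the boolean slot at delete_slot_id evaluated to true
    std::cout << std::boolalpha << row.is_delete() << "\n";  // prints: true
    return 0;
}

The point of appending the flag rather than adding a real column is that key comparison and aggregation over the first schema_size() bytes stay untouched; only rows that opt in (with_delete_flag == true) allocate the extra byte.
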
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 2dd059bd9294fc..691eccb2a85d91 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -18,10 +18,10 @@ #include "olap/memtable.h" #include "common/logging.h" +#include "olap/row.h" +#include "olap/row_cursor.h" #include "olap/rowset/column_data_writer.h" #include "olap/rowset/rowset_writer.h" -#include "olap/row_cursor.h" -#include "olap/row.h" #include "olap/schema.h" #include "runtime/tuple.h" #include "util/debug_util.h" @@ -32,20 +32,31 @@ namespace doris { MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker) - : _tablet_id(tablet_id), - _schema(schema), - _tablet_schema(tablet_schema), - _tuple_desc(tuple_desc), - _slot_descs(slot_descs), - _keys_type(keys_type), - _row_comparator(_schema), - _rowset_writer(rowset_writer) { - - _schema_size = _schema->schema_size(); + : MemTable(tablet_id, schema, tablet_schema, slot_descs, tuple_desc, keys_type, + rowset_writer, mem_tracker, false) {} +MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, + const std::vector* slot_descs, TupleDescriptor* tuple_desc, + KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker, + bool with_delete_flag) + : _tablet_id(tablet_id), + _schema(schema), + _tablet_schema(tablet_schema), + _tuple_desc(tuple_desc), + _slot_descs(slot_descs), + _keys_type(keys_type), + _row_comparator(_schema), + _rowset_writer(rowset_writer), + _with_delete(with_delete_flag) { + if (_with_delete) { + _schema_size = _schema->schema_size() + 1; + } else { + _schema_size = _schema->schema_size(); + } _mem_tracker.reset(new MemTracker(-1, "memtable", mem_tracker)); _buffer_mem_pool.reset(new MemPool(_mem_tracker.get())); _table_mem_pool.reset(new MemPool(_mem_tracker.get())); - _skip_list = new Table(_row_comparator, _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS); + _skip_list = + new Table(_row_comparator, _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS); } MemTable::~MemTable() { @@ -61,6 +72,10 @@ int MemTable::RowCursorComparator::operator()(const char* left, const char* righ } void MemTable::insert(const Tuple* tuple) { + insert(tuple, false); +} + +void MemTable::insert(const Tuple* tuple, bool is_delete) { bool overwritten = false; uint8_t* _tuple_buf = nullptr; if (_keys_type == KeysType::DUP_KEYS) { @@ -69,6 +84,8 @@ void MemTable::insert(const Tuple* tuple) { ContiguousRow row(_schema, _tuple_buf); _tuple_to_row(tuple, &row, _table_mem_pool.get()); _skip_list->Insert((TableKey)_tuple_buf, &overwritten); + LOG(INFO) << "is_delete: " << row.is_delete(); + DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList"; return; } @@ -78,7 +95,8 @@ void MemTable::insert(const Tuple* tuple) { // _skiplist. If it exists, we aggregate the new row into the row in skiplist. // otherwise, we need to copy it into _table_mem_pool before we can insert it. 
_tuple_buf = _buffer_mem_pool->allocate(_schema_size); - ContiguousRow src_row(_schema, _tuple_buf); + ContiguousRow src_row(_schema, _tuple_buf, _with_delete); + src_row.set_delete(is_delete); _tuple_to_row(tuple, &src_row, _buffer_mem_pool.get()); bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint); @@ -86,7 +104,7 @@ void MemTable::insert(const Tuple* tuple) { _aggregate_two_row(src_row, _hint.curr->key); } else { _tuple_buf = _table_mem_pool->allocate(_schema_size); - ContiguousRow dst_row(_schema, _tuple_buf); + ContiguousRow dst_row(_schema, _tuple_buf, _with_delete); copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get()); _skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint); } @@ -102,13 +120,13 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me bool is_null = tuple->is_null(slot->null_indicator_offset()); const void* value = tuple->get_slot(slot->tuple_offset()); - _schema->column(i)->consume( - &cell, (const char*)value, is_null, mem_pool, &_agg_object_pool); + _schema->column(i)->consume(&cell, (const char*)value, is_null, mem_pool, + &_agg_object_pool); } } void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) { - ContiguousRow dst_row(_schema, row_in_skiplist); + ContiguousRow dst_row(_schema, row_in_skiplist, _with_delete); agg_update_row(&dst_row, src_row, _table_mem_pool.get()); } @@ -119,7 +137,7 @@ OLAPStatus MemTable::flush() { Table::Iterator it(_skip_list); for (it.SeekToFirst(); it.Valid(); it.Next()) { char* row = (char*)it.key(); - ContiguousRow dst_row(_schema, row); + ContiguousRow dst_row(_schema, row, _with_delete); agg_finalize_row(&dst_row, _table_mem_pool.get()); RETURN_NOT_OK(_rowset_writer->add_row(dst_row)); } diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index a7142ac5127347..fca1ff3ddc60df 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -40,11 +40,16 @@ class MemTable { MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker); + MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, + const std::vector* slot_descs, TupleDescriptor* tuple_desc, + KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker, + bool with_delete_flag = false); ~MemTable(); int64_t tablet_id() const { return _tablet_id; } size_t memory_usage() const { return _mem_tracker->consumption(); } void insert(const Tuple* tuple); + void insert(const Tuple* tuple, bool is_delete); OLAPStatus flush(); OLAPStatus close(); @@ -88,6 +93,9 @@ class MemTable { RowsetWriter* _rowset_writer; + bool _with_delete; + + }; // class MemTable inline std::ostream& operator<<(std::ostream& os, const MemTable& table) { diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index b3c136b1ddf1d7..4d4b29a4d4cabb 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -362,7 +362,8 @@ enum OLAPStatus { OLAP_ERR_ROWSET_LOAD_FAILED = -3109, OLAP_ERR_ROWSET_READER_INIT = -3110, OLAP_ERR_ROWSET_READ_FAILED = -3111, - OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION = -3112 + OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION = -3112, + OLAP_ERR_ROWSET_VERSION_NOT_MATCH = -3113 }; enum ColumnFamilyIndex { diff --git a/be/src/olap/row.cpp b/be/src/olap/row.cpp new file mode 100644 index 00000000000000..5385e2226af85d --- /dev/null +++ b/be/src/olap/row.cpp @@ -0,0 +1,76 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/row.h" + +namespace doris { + +template <> +void copy_row_in_memtable(ContiguousRow* dst, const ContiguousRow& src, MemPool* pool) { + for (auto cid : dst->schema()->column_ids()) { + auto dst_cell = dst->cell(cid); + auto src_cell = src.cell(cid); + dst->schema()->column(cid)->copy_object(&dst_cell, src_cell, pool); + } + if (dst->with_delete_flag() && src.with_delete_flag()) { + dst->set_delete(src.is_delete()); + } +} + +template <> +void agg_update_row(ContiguousRow* dst, const ContiguousRow& src, MemPool* mem_pool) { + for (uint32_t cid = dst->schema()->num_key_columns(); cid < dst->schema()->num_columns(); + ++cid) { + auto dst_cell = dst->cell(cid); + auto src_cell = src.cell(cid); + dst->schema()->column(cid)->agg_update(&dst_cell, src_cell, mem_pool); + } + if (dst->with_delete_flag() && src.with_delete_flag()) { + dst->set_delete(src.is_delete()); + } +} + +template <> +void agg_update_row(const std::vector& cids, ContiguousRow* dst, const ContiguousRow& src) { + for (auto cid : cids) { + auto dst_cell = dst->cell(cid); + auto src_cell = src.cell(cid); + dst->schema()->column(cid)->agg_update(&dst_cell, src_cell); + } + if (dst->with_delete_flag() && src.with_delete_flag()) { + dst->set_delete(src.is_delete()); + } +} + +template <> +std::string print_row(const ContiguousRow& row) { + std::stringstream ss; + + size_t i = 0; + for (auto cid : row.schema()->column_ids()) { + if (i++ > 0) { + ss << "|"; + } + ss << row.schema()->column(cid)->debug_string(row.cell(cid)); + } + if (row.with_delete_flag()) { + ss << "|[ delete: " << std::boolalpha << row.is_delete() << "]"; + } + + return ss.str(); +} +} // namespace doris diff --git a/be/src/olap/row.h b/be/src/olap/row.h index f24540e13ebff1..6a8b1a6df6f93e 100644 --- a/be/src/olap/row.h +++ b/be/src/olap/row.h @@ -17,8 +17,8 @@ #pragma once -#include #include +#include #include "olap/row_cursor_cell.h" #include "olap/schema.h" @@ -30,24 +30,38 @@ class Arena; // The row has all columns layed out in memory based on the schema.column_offset() struct ContiguousRow { - ContiguousRow(const Schema* schema, const void* row) : _schema(schema), _row((void*)row) { } - ContiguousRow(const Schema* schema, void* row) : _schema(schema), _row(row) { } + ContiguousRow(const Schema* schema, const void* row, bool with_delete_flag = false) + : _schema(schema), _row((void*)row), _with_delete_flag(with_delete_flag) {} + ContiguousRow(const Schema* schema, void* row, bool with_delete_flag = false) + : _schema(schema), _row(row), _with_delete_flag(with_delete_flag) {} RowCursorCell cell(uint32_t cid) const { return RowCursorCell((char*)_row + _schema->column_offset(cid)); } - void set_is_null(uint32_t cid, bool is_null) const { - 
_schema->set_is_null(_row, cid, is_null); - } + void set_is_null(uint32_t cid, bool is_null) const { _schema->set_is_null(_row, cid, is_null); } const Schema* schema() const { return _schema; } void* row_ptr() const { return _row; } + bool is_delete() const { + if (_with_delete_flag) { + return *reinterpret_cast((char*)_row + _schema->schema_size()); + } else { + return false; + } + } + void set_delete(bool val) { + if (_with_delete_flag) { + *reinterpret_cast((char*)_row + _schema->schema_size()) = val; + } + } + bool with_delete_flag() const { return _with_delete_flag; } + private: const Schema* _schema; void* _row; + bool _with_delete_flag = false; }; -template -bool equal_row(const std::vector& ids, - const LhsRowType& lhs, const RhsRowType& rhs) { +template +bool equal_row(const std::vector& ids, const LhsRowType& lhs, const RhsRowType& rhs) { for (auto id : ids) { if (!lhs.schema()->column(id)->equal(lhs.cell(id), rhs.cell(id))) { return false; @@ -56,7 +70,7 @@ bool equal_row(const std::vector& ids, return true; } -template +template int compare_row(const LhsRowType& lhs, const RhsRowType& rhs) { for (uint32_t cid = 0; cid < lhs.schema()->num_key_columns(); ++cid) { auto res = lhs.schema()->column(cid)->compare_cell(lhs.cell(cid), rhs.cell(cid)); @@ -72,7 +86,7 @@ int compare_row(const LhsRowType& lhs, const RhsRowType& rhs) { // So we should compare the common prefix columns of lhs and rhs. // // NOTE: if you are not sure if you can use it, please don't use this function. -template +template int compare_row_key(const LhsRowType& lhs, const RhsRowType& rhs) { auto cmp_cids = std::min(lhs.schema()->num_column_ids(), rhs.schema()->num_column_ids()); for (uint32_t cid = 0; cid < cmp_cids; ++cid) { @@ -90,7 +104,7 @@ int compare_row_key(const LhsRowType& lhs, const RhsRowType& rhs) { // // NOTE: Client should assure that lhs and rhs only contain KEY column ids in its // Schema. Otherwise it may lead to an error. -template +template int index_compare_row(const LhsRowType& lhs, const RhsRowType& rhs) { auto cmp_cids = std::min(lhs.schema()->num_column_ids(), rhs.schema()->num_column_ids()); for (uint32_t cid = 0; cid < cmp_cids; ++cid) { @@ -106,8 +120,9 @@ int index_compare_row(const LhsRowType& lhs, const RhsRowType& rhs) { // will direct_copy source column to destination column, and for value columns, this // function will first initialize destination column and then update with source column // value. -template -void init_row_with_others(DstRowType* dst, const SrcRowType& src, MemPool* mem_pool, ObjectPool* agg_pool) { +template +void init_row_with_others(DstRowType* dst, const SrcRowType& src, MemPool* mem_pool, + ObjectPool* agg_pool) { for (auto cid : dst->schema()->column_ids()) { auto dst_cell = dst->cell(cid); dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid), mem_pool, agg_pool); @@ -116,7 +131,7 @@ void init_row_with_others(DstRowType* dst, const SrcRowType& src, MemPool* mem_p // Copy other row to destination directly. This function assume // that destination has enough space for source conetent. -template +template void direct_copy_row(DstRowType* dst, const SrcRowType& src) { for (auto cid : dst->schema()->column_ids()) { auto dst_cell = dst->cell(cid); @@ -125,7 +140,7 @@ void direct_copy_row(DstRowType* dst, const SrcRowType& src) { } // Deep copy other row's content into itself. 
-template +template void copy_row(DstRowType* dst, const SrcRowType& src, MemPool* pool) { for (auto cid : dst->schema()->column_ids()) { auto dst_cell = dst->cell(cid); @@ -138,7 +153,7 @@ void copy_row(DstRowType* dst, const SrcRowType& src, MemPool* pool) { // its content is in Slice->data, other than a normal Slice, so we just assign the Slice->data // pointer, instead of memcpy the data. // TODO(lingbin): remove this method -template +template void copy_row_in_memtable(DstRowType* dst, const SrcRowType& src, MemPool* pool) { for (auto cid : dst->schema()->column_ids()) { auto dst_cell = dst->cell(cid); @@ -147,19 +162,27 @@ void copy_row_in_memtable(DstRowType* dst, const SrcRowType& src, MemPool* pool) } } -template +template <> +void copy_row_in_memtable(ContiguousRow* dst, const ContiguousRow& src, + MemPool* pool); + +template void agg_update_row(DstRowType* dst, const SrcRowType& src, MemPool* mem_pool) { - for (uint32_t cid = dst->schema()->num_key_columns(); cid < dst->schema()->num_columns(); ++cid) { + for (uint32_t cid = dst->schema()->num_key_columns(); cid < dst->schema()->num_columns(); + ++cid) { auto dst_cell = dst->cell(cid); auto src_cell = src.cell(cid); dst->schema()->column(cid)->agg_update(&dst_cell, src_cell, mem_pool); } } +template <> +void agg_update_row(ContiguousRow* dst, const ContiguousRow& src, MemPool* mem_pool); + // Do aggregate update source row to destination row. // This funcion will operate on given cids. // TODO(zc): unify two versions of agg_update_row -template +template void agg_update_row(const std::vector& cids, DstRowType* dst, const SrcRowType& src) { for (auto cid : cids) { auto dst_cell = dst->cell(cid); @@ -168,15 +191,20 @@ void agg_update_row(const std::vector& cids, DstRowType* dst, const Sr } } -template +template <> +void agg_update_row(const std::vector& cids, ContiguousRow* dst, + const ContiguousRow& src); + +template void agg_finalize_row(RowType* row, MemPool* mem_pool) { - for (uint32_t cid = row->schema()->num_key_columns(); cid < row->schema()->num_columns(); ++cid) { + for (uint32_t cid = row->schema()->num_key_columns(); cid < row->schema()->num_columns(); + ++cid) { auto cell = row->cell(cid); row->schema()->column(cid)->agg_finalize(&cell, mem_pool); } } -template +template void agg_finalize_row(const std::vector& ids, RowType* row, MemPool* mem_pool) { for (uint32_t id : ids) { auto cell = row->cell(id); @@ -184,7 +212,7 @@ void agg_finalize_row(const std::vector& ids, RowType* row, MemPool* m } } -template +template uint32_t hash_row(const RowType& row, uint32_t seed) { for (uint32_t cid : row.schema()->column_ids()) { FieldType type = row.schema()->column(cid)->type(); @@ -198,7 +226,7 @@ uint32_t hash_row(const RowType& row, uint32_t seed) { return seed; } -template +template std::string print_row(const RowType& row) { std::stringstream ss; @@ -213,4 +241,6 @@ std::string print_row(const RowType& row) { return ss.str(); } -} +template <> +std::string print_row(const ContiguousRow& row); +} // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index c75f056c426e80..8e474a7dc94953 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -89,6 +89,7 @@ OLAPStatus BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte template OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) { + VLOG_ROW << "write row: " << print_row(row); if (PREDICT_FALSE(_segment_writer == nullptr)) { 
RETURN_NOT_OK(_create_segment_writer()); } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 372c80894f0bff..b0d0375dabd308 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -519,5 +519,21 @@ int64_t RuntimeState::get_load_mem_limit() { } } +TMergeType::type RuntimeState::get_merge_type() { + TMergeType::type merge_type = TMergeType::APPEND; + if (_query_options.__isset.merge_type) { + merge_type = _query_options.merge_type; + } + return merge_type; +} + +int32_t RuntimeState::get_delete_slot_id() { + int32_t slot_offset = -1; + if (_query_options.__isset.delete_slot_id) { + slot_offset = _query_options.delete_slot_id; + } + return slot_offset; +} + } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index dcd97e2b3eec3e..c05f8f81e608e1 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -495,6 +495,11 @@ class RuntimeState { // if load mem limit is not set, or is zero, using query mem limit instead. int64_t get_load_mem_limit(); + // get load type(APPEND|MERGE|DELETE) + TMergeType::type get_merge_type(); + + int32_t get_delete_slot_id(); + private: // Allow TestEnv to set block_mgr manually for testing. friend class TestEnv; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 2a125af94f7dab..d7a3be24f01da0 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -103,7 +103,9 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { return Status::InternalError(strings::Substitute( "unknown tablet to append data, tablet=$0", tablet_id)); } - auto st = it->second->write(row_batch.get_row(i)->get_tuple(0)); + Tuple* tuple = row_batch.get_row(i)->get_tuple(0); + TupleDescriptor* tuple_desc = row_batch.row_desc().tuple_descriptors()[0]; + auto st = it->second->write(tuple, tuple_desc); if (st != OLAP_SUCCESS) { const std::string& err_msg = strings::Substitute( "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2", @@ -233,6 +235,8 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& params) request.need_gen_rollup = params.need_gen_rollup(); request.tuple_desc = _tuple_desc; request.slots = index_slots; + request.merge_type = params.merge_type(); + request.delete_slot_id = params.delete_slot_id(); DeltaWriter* writer = nullptr; auto st = DeltaWriter::open(&request, _mem_tracker.get(), &writer); diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index a14c74d2f8b586..0bf9d7a7d7015c 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -344,7 +344,7 @@ TEST_F(TestDeltaWriter, write) { load_id.set_lo(0); WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002, 30002, load_id, false, tuple_desc, - &(tuple_desc->slots())}; + &(tuple_desc->slots()), PTabletWriterOpenRequest::APPEND, -1}; DeltaWriter* delta_writer = nullptr; DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); ASSERT_NE(delta_writer, nullptr); diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp index bda51c49cf1803..c31ebcc58f3e5d 100644 --- a/be/test/runtime/load_channel_mgr_test.cpp +++ b/be/test/runtime/load_channel_mgr_test.cpp @@ -65,6 +65,10 @@ OLAPStatus DeltaWriter::open(WriteRequest* req, MemTracker* mem_tracker, DeltaWr return open_status; } +OLAPStatus DeltaWriter::write(Tuple* tuple, TupleDescriptor* 
tuple_desc) { + return write(tuple); +} + OLAPStatus DeltaWriter::write(Tuple* tuple) { if (_k_tablet_recorder.find(_req.tablet_id) == std::end(_k_tablet_recorder)) { _k_tablet_recorder[_req.tablet_id] = 1; diff --git a/fe/fe-core/SchemaChangeV2Test b/fe/fe-core/SchemaChangeV2Test new file mode 100644 index 00000000000000..b233ff80a09960 Binary files /dev/null and b/fe/fe-core/SchemaChangeV2Test differ diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index d6b55bd89b8628..ccaf96df5ed4fd 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -37,6 +37,7 @@ import org.apache.doris.catalog.View; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Version; import org.apache.doris.mysql.MysqlPassword; +import org.apache.doris.load.loadv2.LoadTask; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -226,7 +227,7 @@ parser code {: :}; // Total keywords of doris -terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_AS, KW_ASC, KW_AUTHORS, +terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_APPEND, KW_AS, KW_ASC, KW_AUTHORS, KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BITMAP, KW_BITMAP_UNION, KW_BOOLEAN, KW_BOTH, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN, KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS, KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, @@ -287,7 +288,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, describe_stmt, alter_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, link_stmt, migrate_stmt, enter_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, - import_columns_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt; + import_columns_stmt, import_delete_on_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt; nonterminal ImportColumnDesc import_column_desc; nonterminal List import_column_descs; @@ -342,6 +343,7 @@ nonterminal ClusterName des_cluster_name; nonterminal TableName table_name; nonterminal FunctionName function_name; nonterminal Expr where_clause; +nonterminal Expr delete_on_clause; nonterminal Expr where_clause_without_null; nonterminal Predicate predicate, between_predicate, comparison_predicate, compound_predicate, in_predicate, like_predicate, exists_predicate; @@ -383,6 +385,8 @@ nonterminal Qualifier opt_set_qualifier; nonterminal Operation set_op; nonterminal ArrayList opt_common_hints; +nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type; + // Set type nonterminal SetType option_type, opt_var_type, var_ident_type; @@ -523,6 +527,10 @@ stmts ::= {: RESULT = Lists.newArrayList(stmt); :} + | import_delete_on_stmt:stmt + {: + RESULT = Lists.newArrayList(stmt); + :} ; import_columns_stmt ::= @@ -562,6 +570,13 @@ import_where_stmt ::= :} ; +import_delete_on_stmt ::= + KW_DELETE KW_ON expr:expr + {: + RESULT = new ImportDeleteOnStmt(expr); + :} + ; + stmt ::= alter_stmt:stmt {: RESULT = stmt; :} @@ -1246,7 +1261,56 @@ data_desc_list ::= :} ; +opt_merge_type ::= + {: + RESULT = LoadTask.MergeType.APPEND; + :} + | KW_APPEND + {: + RESULT = LoadTask.MergeType.APPEND; + :} + | KW_DELETE + {: + RESULT = LoadTask.MergeType.DELETE; + :} + | KW_MERGE + {: + 
RESULT = LoadTask.MergeType.MERGE; + :} + | KW_WITH KW_APPEND + {: + RESULT = LoadTask.MergeType.APPEND; + :} + | KW_WITH KW_DELETE + {: + RESULT = LoadTask.MergeType.DELETE; + :} + | KW_WITH KW_MERGE + {: + RESULT = LoadTask.MergeType.MERGE; + :} + ; + +opt_with_merge_type ::= + {: + RESULT = LoadTask.MergeType.APPEND; + :} + | KW_WITH KW_APPEND + {: + RESULT = LoadTask.MergeType.APPEND; + :} + | KW_WITH KW_DELETE + {: + RESULT = LoadTask.MergeType.DELETE; + :} + | KW_WITH KW_MERGE + {: + RESULT = LoadTask.MergeType.MERGE; + :} + ; + data_desc ::= + opt_merge_type:mergeType KW_DATA KW_INFILE LPAREN string_list:files RPAREN opt_negative:isNeg KW_INTO KW_TABLE ident:tableName @@ -1257,18 +1321,21 @@ data_desc ::= opt_columns_from_path:columnsFromPath opt_col_mapping_list:colMappingList where_clause:whereExpr + delete_on_clause:deleteExpr {: RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, - columnsFromPath, isNeg, colMappingList, whereExpr); + columnsFromPath, isNeg, colMappingList, whereExpr, mergeType, deleteExpr); :} - | KW_DATA KW_FROM KW_TABLE ident:srcTableName + | opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName opt_negative:isNeg KW_INTO KW_TABLE ident:tableName opt_partition_names:partitionNames opt_col_mapping_list:colMappingList where_clause:whereExpr + delete_on_clause:deleteExpr {: - RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr); + RESULT = new DataDescription(tableName, partitionNames, srcTableName, isNeg, colMappingList, whereExpr, + mergeType, deleteExpr); :} ; @@ -1411,11 +1478,13 @@ resource_desc ::= // Routine load statement create_routine_load_stmt ::= KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel KW_ON ident:tableName + opt_with_merge_type:mergeType opt_load_property_list:loadPropertyList opt_properties:properties KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN {: - RESULT = new CreateRoutineLoadStmt(jobLabel, tableName, loadPropertyList, properties, type, customProperties); + RESULT = new CreateRoutineLoadStmt(jobLabel, tableName, loadPropertyList, + properties, type, customProperties, mergeType); :} ; @@ -1447,6 +1516,10 @@ load_property ::= {: RESULT = wherePredicate; :} + | import_delete_on_stmt:deletePredicate + {: + RESULT = deletePredicate; + :} | partition_names:partitionNames {: RESULT = partitionNames; @@ -3565,6 +3638,12 @@ where_clause ::= | KW_WHERE expr:e {: RESULT = e; :} ; +delete_on_clause ::= + /* empty */ + {: RESULT = null; :} + | KW_DELETE KW_ON expr:e + {: RESULT = e; :} + ; where_clause_without_null ::= KW_WHERE expr:e diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 6861cafdc16c36..00d2ee2ec54c3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -29,6 +29,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.load.RoutineLoadDesc; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.KafkaProgress; import org.apache.doris.load.routineload.LoadDataSourceType; import org.apache.doris.load.routineload.RoutineLoadJob; @@ -88,6 +89,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY = 
"max_batch_interval"; public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows"; public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size"; + public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit"; public static final String FORMAT = "format";// the value is csv or json, default is csv public static final String STRIP_OUTER_ARRAY = "strip_outer_array"; @@ -117,6 +119,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(JSONROOT) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) + .add(EXEC_MEM_LIMIT_PROPERTY) .build(); private static final ImmutableSet KAFKA_PROPERTIES_SET = new ImmutableSet.Builder() @@ -144,6 +147,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private long maxBatchRows = -1; private long maxBatchSizeBytes = -1; private boolean strictMode = true; + private long execMemLimit = 2 * 1024 * 1024 * 1024L; private String timezone = TimeUtils.DEFAULT_TIME_ZONE; /** * RoutineLoad support json data. @@ -164,22 +168,26 @@ public class CreateRoutineLoadStmt extends DdlStmt { //custom kafka property map private Map customKafkaProperties = Maps.newHashMap(); + private LoadTask.MergeType mergeType; + private Expr deleteCondition; private static final Predicate DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; }; private static final Predicate MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; }; private static final Predicate MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; }; private static final Predicate MAX_BATCH_ROWS_PRED = (v) -> { return v >= 200000; }; private static final Predicate MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; }; + private static final Predicate EXEC_MEM_LIMIT_PRED = (v) -> { return v >= 0L; }; public CreateRoutineLoadStmt(LabelName labelName, String tableName, List loadPropertyList, - Map jobProperties, - String typeName, Map dataSourceProperties) { + Map jobProperties, String typeName, + Map dataSourceProperties, LoadTask.MergeType mergeType) { this.labelName = labelName; this.tableName = tableName; this.loadPropertyList = loadPropertyList; this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties; this.typeName = typeName.toUpperCase(); this.dataSourceProperties = dataSourceProperties; + this.mergeType = mergeType; } public String getName() { @@ -222,6 +230,10 @@ public long getMaxBatchSize() { return maxBatchSizeBytes; } + public long getExecMemLimit() { + return execMemLimit; + } + public boolean isStrictMode() { return strictMode; } @@ -294,6 +306,7 @@ public void checkLoadProperties() throws UserException { ImportColumnsStmt importColumnsStmt = null; ImportWhereStmt importWhereStmt = null; PartitionNames partitionNames = null; + ImportDeleteOnStmt importDeleteOnStmt = null; for (ParseNode parseNode : loadPropertyList) { if (parseNode instanceof ColumnSeparator) { // check column separator @@ -321,10 +334,16 @@ public void checkLoadProperties() throws UserException { } partitionNames = (PartitionNames) parseNode; partitionNames.analyze(null); + } else if (parseNode instanceof ImportDeleteOnStmt) { + // check delete expr + if (importDeleteOnStmt != null) { + throw new AnalysisException("repeat setting of delete predicate"); + } + importDeleteOnStmt = (ImportDeleteOnStmt) parseNode; } } routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt, importWhereStmt, - partitionNames); + partitionNames, importDeleteOnStmt == null ? 
null : importDeleteOnStmt.getExpr(), mergeType); } private void checkJobProperties() throws UserException { @@ -357,7 +376,8 @@ private void checkJobProperties() throws UserException { strictMode = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE), RoutineLoadJob.DEFAULT_STRICT_MODE, LoadStmt.STRICT_MODE + " should be a boolean"); - + execMemLimit = Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY), + RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED, EXEC_MEM_LIMIT_PROPERTY + "should > 0"); if (ConnectContext.get() != null) { timezone = ConnectContext.get().getSessionVariable().getTimeZone(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 0d621c05ce136e..6035f0c3380080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -27,6 +27,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Pair; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TNetworkAddress; @@ -47,6 +48,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; + // used to describe data info which is needed to import. // // data_desc: @@ -120,6 +122,9 @@ public class DataDescription { private boolean isHadoopLoad = false; + private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; + private Expr deleteCondition; + public DataDescription(String tableName, PartitionNames partitionNames, List filePaths, @@ -128,7 +133,8 @@ public DataDescription(String tableName, String fileFormat, boolean isNegative, List columnMappingList) { - this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null, isNegative, columnMappingList, null); + this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null, + isNegative, columnMappingList, null, LoadTask.MergeType.APPEND, null); } public DataDescription(String tableName, @@ -140,7 +146,9 @@ public DataDescription(String tableName, List columnsFromPath, boolean isNegative, List columnMappingList, - Expr whereExpr) { + Expr whereExpr, + LoadTask.MergeType mergeType, + Expr deleteCondition) { this.tableName = tableName; this.partitionNames = partitionNames; this.filePaths = filePaths; @@ -152,6 +160,8 @@ public DataDescription(String tableName, this.columnMappingList = columnMappingList; this.whereExpr = whereExpr; this.srcTableName = null; + this.mergeType = mergeType; + this.deleteCondition = deleteCondition; } // data from table external_hive_table @@ -160,7 +170,9 @@ public DataDescription(String tableName, String srcTableName, boolean isNegative, List columnMappingList, - Expr whereExpr) { + Expr whereExpr, + LoadTask.MergeType mergeType, + Expr deleteCondition) { this.tableName = tableName; this.partitionNames = partitionNames; this.filePaths = null; @@ -172,6 +184,9 @@ public DataDescription(String tableName, this.columnMappingList = columnMappingList; this.whereExpr = whereExpr; this.srcTableName = srcTableName; + this.mergeType = mergeType; + this.deleteCondition = deleteCondition + ; } public String getTableName() { @@ -186,6 +201,17 @@ public Expr getWhereExpr(){ return whereExpr; } + public LoadTask.MergeType getMergeType() { + if 
(mergeType == null) { + return LoadTask.MergeType.APPEND; + } + return mergeType; + } + + public Expr getDeleteCondition() { + return deleteCondition; + } + public List getFilePaths() { return filePaths; } @@ -582,8 +608,20 @@ private void checkLoadPriv(String fullDbName) throws AnalysisException { } public void analyze(String fullDbName) throws AnalysisException { + if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) { + throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE"); + } + if (mergeType == LoadTask.MergeType.MERGE && deleteCondition == null) { + throw new AnalysisException("Except DELETE ON clause where merge type is MERGE"); + } + if (mergeType != LoadTask.MergeType.APPEND && isNegative) { + throw new AnalysisException("not support MERGE or DELETE with NEGATIVE"); + } checkLoadPriv(fullDbName); analyzeWithoutCheckPriv(); + if (isNegative && mergeType != LoadTask.MergeType.APPEND) { + throw new AnalysisException("Negative is only used when merge type is append."); + } } public void analyzeWithoutCheckPriv() throws AnalysisException { @@ -653,9 +691,11 @@ public void fillColumnInfoIfNotSpecified(List baseSchema) throws DdlExce public String toSql() { StringBuilder sb = new StringBuilder(); if (isLoadFromTable()) { - sb.append("DATA FROM TABLE ").append(srcTableName); + sb.append(mergeType.toString()); + sb.append(" DATA FROM TABLE ").append(srcTableName); } else { - sb.append("DATA INFILE ("); + sb.append(mergeType.toString()); + sb.append(" DATA INFILE ("); Joiner.on(", ").appendTo(sb, Lists.transform(filePaths, new Function() { @Override public String apply(String s) { @@ -691,6 +731,12 @@ public Object apply(Expr expr) { } })).append(")"); } + if (whereExpr != null) { + sb.append(" WHERE ").append(whereExpr.toSql()); + } + if (deleteCondition != null && mergeType == LoadTask.MergeType.MERGE) { + sb.append(" DELETE ON ").append(deleteCondition.toSql()); + } return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportDeleteOnStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportDeleteOnStmt.java new file mode 100644 index 00000000000000..22db97455ab021 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportDeleteOnStmt.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +public class ImportDeleteOnStmt extends StatementBase { + private Expr expr; + + public ImportDeleteOnStmt(Expr expr) { + this.expr = expr; + } + + public Expr getExpr() { + return expr; + } + + @Override + public RedirectStatus getRedirectStatus() { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index b3636b66244835..110a928d18f27a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -18,6 +18,9 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -26,6 +29,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -272,6 +276,11 @@ public void analyze(Analyzer analyzer) throws UserException { if (dataDescription.isLoadFromTable()) { isLoadFromTable = true; } + Table table = Catalog.getCurrentCatalog().getDb(label.getDbName()).getTable(dataDescription.getTableName()); + if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND && + (!(table instanceof OlapTable) || ((OlapTable) table).getKeysType() != KeysType.UNIQUE_KEYS)) { + throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables."); + } } if (isLoadFromTable) { if (dataDescriptions.size() > 1) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 7a450a641a4b93..2e064437dcbf7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -43,6 +43,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -78,6 +79,8 @@ public class BrokerFileGroup implements Writable { private Map>> columnToHadoopFunction; // filter the data which has been conformed private Expr whereExpr; + private Expr deleteCondition; + private LoadTask.MergeType mergeType; // load from table private long srcTableId = -1; @@ -103,6 +106,8 @@ public BrokerFileGroup(DataDescription dataDescription) { this.columnExprList = dataDescription.getParsedColumnExprList(); this.columnToHadoopFunction = dataDescription.getColumnToHadoopFunction(); this.whereExpr = dataDescription.getWhereExpr(); + this.deleteCondition = dataDescription.getDeleteCondition(); + this.mergeType = dataDescription.getMergeType(); } // NOTE: DBLock will be held @@ -255,6 +260,14 @@ public boolean isLoadFromTable() { return isLoadFromTable; } + public Expr getDeleteCondition() { + return deleteCondition; + } + + public LoadTask.MergeType getMergeType() { + return mergeType; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 99218b090441ef..223d05df6e0668 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -137,6 +137,7 @@ public class Load { private static final Logger LOG = LogManager.getLogger(Load.class); public static final String VERSION = "v1"; + public static final String BATCH_DELETE_VIRTUAL_COL = "__DORIS_BATCH_DELETE_COL__"; // valid state change map private static final Map> STATE_CHANGE_MAP = Maps.newHashMap(); @@ -961,7 +962,8 @@ public static void initColumns(Table tbl, List columnExprs, // check mapping column exist in schema // !! all column mappings are in columnExprs !! for (ImportColumnDesc importColumnDesc : columnExprs) { - if (importColumnDesc.isColumn()) { + if (importColumnDesc.isColumn() + || importColumnDesc.getColumnName().equalsIgnoreCase(BATCH_DELETE_VIRTUAL_COL)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java index 4098142f9a221d..86f6786d3a11f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java @@ -18,23 +18,30 @@ package org.apache.doris.load; import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnsStmt; import org.apache.doris.analysis.ImportWhereStmt; import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.load.loadv2.LoadTask; public class RoutineLoadDesc { private final ColumnSeparator columnSeparator; private final ImportColumnsStmt columnsInfo; private final ImportWhereStmt wherePredicate; + private final Expr deleteCondition; + private LoadTask.MergeType mergeType; // nullable private final PartitionNames partitionNames; public RoutineLoadDesc(ColumnSeparator columnSeparator, ImportColumnsStmt columnsInfo, - ImportWhereStmt wherePredicate, PartitionNames partitionNames) { + ImportWhereStmt wherePredicate, PartitionNames partitionNames, + Expr deleteCondition, LoadTask.MergeType mergeType) { this.columnSeparator = columnSeparator; this.columnsInfo = columnsInfo; this.wherePredicate = wherePredicate; this.partitionNames = partitionNames; + this.deleteCondition = deleteCondition; + this.mergeType = mergeType; } public ColumnSeparator getColumnSeparator() { @@ -49,8 +56,16 @@ public ImportWhereStmt getWherePredicate() { return wherePredicate; } + public LoadTask.MergeType getMergeType() { + return mergeType; + } + // nullable public PartitionNames getPartitionNames() { return partitionNames; } + + public Expr getDeleteCondition() { + return deleteCondition; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 4d658fec83b848..8754a7bbcb9b30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -138,7 +138,14 @@ private void checkAndSetDataSourceInfo(Database db, List dataDe // check data source info db.readLock(); try { + LoadTask.MergeType mergeType = null; for (DataDescription dataDescription : dataDescriptions) { + if (mergeType == null) { + mergeType = dataDescription.getMergeType(); + } + if (mergeType != dataDescription.getMergeType()) { + throw new DdlException("merge type in all 
statement must be the same."); + } BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription); fileGroup.parse(db, dataDescription); fileGroupAggInfo.addFileGroup(fileGroup); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index caa6081e7073e7..a4701eb375217f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -116,6 +116,8 @@ private void executeOnce() throws Exception { */ curCoordinator.setLoadMemLimit(execMemLimit); curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000)); + curCoordinator.setMergeType(this.fileGroups.get(0).getMergeType()); + curCoordinator.setDelSlotId(planner.getDelSlotId()); try { QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index 0a52369362882d..9a4568a1bc10d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -30,6 +30,12 @@ public abstract class LoadTask extends MasterTask { + public enum MergeType { + MERGE, + APPEND, + DELETE + } + private static final Logger LOG = LogManager.getLogger(LoadTask.class); protected LoadTaskCallback callback; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 803e4352b16315..042ec1316ac3a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -26,12 +26,14 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Type; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.Load; import org.apache.doris.planner.BrokerScanNode; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.OlapTableSink; @@ -53,6 +55,8 @@ import java.util.List; import java.util.Set; +import static org.apache.doris.catalog.AggregateType.REPLACE; + public class LoadingTaskPlanner { private static final Logger LOG = LogManager.getLogger(LoadingTaskPlanner.class); @@ -76,6 +80,11 @@ public class LoadingTaskPlanner { private List scanNodes = Lists.newArrayList(); private int nextNodeId = 0; + private int delSlotId = -1; + + public int getDelSlotId() { + return delSlotId; + } public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, @@ -114,7 +123,13 @@ public void plan(TUniqueId loadId, List> fileStatusesLis slotDesc.setIsNullable(false); } } - + // Add a virtual column to indicate whether the data in this row is deleted + SlotDescriptor delSlotDesc = descTable.addSlotDescriptor(tupleDesc); + delSlotDesc.setIsMaterialized(true); + delSlotDesc.setColumn(new Column(Load.BATCH_DELETE_VIRTUAL_COL, Type.BOOLEAN, false, REPLACE, false, + 
String.valueOf(fileGroups.get(0).getMergeType() == LoadTask.MergeType.DELETE), + "merge load virtual column")); + delSlotId = delSlotDesc.getId().asInt(); // Generate plan trees // 1. Broker scan node BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode", diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index feaf50f171fcdc..6c6e406b61b183 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -46,7 +46,9 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.Load; import org.apache.doris.load.RoutineLoadDesc; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.planner.StreamLoadPlanner; @@ -54,8 +56,10 @@ import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.SqlModeHelper; -import org.apache.doris.task.StreamLoadTask; +import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.TransactionException; @@ -91,7 +95,7 @@ * The desireTaskConcurrentNum means that user expect the number of concurrent stream load * The routine load job support different streaming medium such as KAFKA */ -public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback implements Writable { +public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback implements Writable, LoadTaskInfo { private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class); public static final long DEFAULT_MAX_ERROR_NUM = 0; @@ -99,6 +103,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl public static final long DEFAULT_MAX_INTERVAL_SECOND = 10; public static final long DEFAULT_MAX_BATCH_ROWS = 200000; public static final long DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB + public static final long DEFAULT_EXEC_MEM_LIMIT = 2 * 1024 * 1024 * 1024L; public static final boolean DEFAULT_STRICT_MODE = false; // default is false protected static final String STAR_STRING = "*"; @@ -156,6 +161,7 @@ public boolean isFinalState() { // maxErrorNum / (maxBatchRows * 10) = max error rate of routine load job // if current error rate is more then max error rate, the job will be paused protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional + protected long execMemLimit = DEFAULT_EXEC_MEM_LIMIT; // include strict mode protected Map jobProperties = Maps.newHashMap(); @@ -229,6 +235,8 @@ public boolean isFinalState() { protected OriginStatement origStmt; protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete + protected Expr deleteCondition; // TODO(ml): error sample // save the latest 3 error log urls @@ -279,7 +287,11 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws 
UserException { if (stmt.getMaxBatchSize() != -1) { this.maxBatchSizeBytes = stmt.getMaxBatchSize(); } + if (stmt.getExecMemLimit() != -1) { + this.execMemLimit = stmt.getExecMemLimit(); + } jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode())); + jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(stmt.getExecMemLimit())); if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) { jobProperties.put(PROPS_FORMAT, "csv"); jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false"); @@ -327,6 +339,13 @@ private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { if (routineLoadDesc.getPartitionNames() != null) { partitions = routineLoadDesc.getPartitionNames(); } + mergeType = routineLoadDesc.getMergeType(); + if (routineLoadDesc.getDeleteCondition() != null) { + deleteCondition = routineLoadDesc.getDeleteCondition(); + } + if (mergeType == LoadTask.MergeType.MERGE) { + columnDescs.add(new ImportColumnDesc(Load.BATCH_DELETE_VIRTUAL_COL, deleteCondition)); + } } } @@ -413,8 +432,28 @@ public PartitionNames getPartitions() { return partitions; } - public List getColumnDescs() { - return columnDescs; + @Override + public LoadTask.MergeType getMergeType() { + return mergeType; + } + + @Override + public Expr getDeleteCondition() { + return deleteCondition; + } + + @Override + public TFileType getFileType() { + return TFileType.FILE_STREAM; + } + + @Override + public TFileFormatType getFormatType() { + TFileFormatType fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; + if (getFormat().equals("json")) { + fileFormatType = TFileFormatType.FORMAT_JSON; + } + return fileFormatType; } public Expr getWhereExpr() { @@ -433,6 +472,30 @@ public boolean isStrictMode() { return Boolean.valueOf(value); } + @Override + public boolean getNegative() { + return false; + } + + @Override + public long getTxnId() { + return -1L; + } + + @Override + public int getTimeout() { + return (int) getMaxBatchIntervalS(); + } + + @Override + public long getMemLimit() { + String value = jobProperties.get(LoadStmt.EXEC_MEM_LIMIT); + if (value == null) { + return DEFAULT_EXEC_MEM_LIMIT; + } + return Long.valueOf(value); + } + public String getTimezone() { String value = jobProperties.get(LoadStmt.TIMEZONE); if (value == null) { @@ -469,6 +532,19 @@ public boolean isStripOuterArray() { return Boolean.valueOf(jobProperties.get(PROPS_STRIP_OUTER_ARRAY)); } + @Override + public String getPath() { + return null; + } + + @Override + public List getColumnExprDescs() { + if (columnDescs == null) { + return new ArrayList<>(); + } + return columnDescs; + } + public String getJsonPaths() { String value = jobProperties.get(PROPS_JSONPATHS); if (value == null) { @@ -662,12 +738,11 @@ public void prepare() throws UserException { } private void initPlanner() throws UserException { - StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadJob(this); Database db = Catalog.getCurrentCatalog().getDb(dbId); if (db == null) { throw new MetaNotFoundException("db " + dbId + " does not exist"); } - planner = new StreamLoadPlanner(db, (OlapTable) db.getTable(this.tableId), streamLoadTask); + planner = new StreamLoadPlanner(db, (OlapTable) db.getTable(this.tableId), this); } public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId) throws UserException { @@ -1242,6 +1317,9 @@ private String jobPropertiesToJsonString() { jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows)); jobProperties.put("maxBatchSizeBytes", String.valueOf(maxBatchSizeBytes)); 
jobProperties.put("currentTaskConcurrentNum", String.valueOf(currentTaskConcurrentNum)); + jobProperties.put("execMemLimit", String.valueOf(execMemLimit)); + jobProperties.put("mergeType", mergeType.toString()); + jobProperties.put("deleteCondition", deleteCondition == null ? STAR_STRING : deleteCondition.toSql()); Gson gson = new GsonBuilder().disableHtmlEscaping().create(); return gson.toJson(jobProperties); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index a4427eddfc4e3f..29128c276c8935 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -18,43 +18,31 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.ArithmeticExpr; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.ImportColumnDesc; -import org.apache.doris.analysis.IntLiteral; -import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FsBroker; -import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TBrokerRangeDesc; -import org.apache.doris.thrift.TBrokerScanNode; import org.apache.doris.thrift.TBrokerScanRange; import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TPlanNode; -import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; @@ -99,7 +87,6 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) { // used both for load statement and select statement private long totalBytes; - private int numInstances; private long bytesPerInstance; // Parameters need to process @@ -208,6 +195,8 @@ private void initParams(ParamCreateContext context) params.setProperties(brokerDesc.getProperties()); initColumns(context); initWhereExpr(fileGroup.getWhereExpr(), analyzer); + deleteCondition = fileGroup.getDeleteCondition(); + mergeType = fileGroup.getMergeType(); } /** @@ -216,6 +205,7 @@ private void initParams(ParamCreateContext context) * The smap of slot which belongs to expr will be analyzed by src desc. * slotDescByName: the single slot from columns in load stmt * exprMap: the expr from column mapping in load stmt. 
+ * * @param context * @throws UserException */ @@ -229,6 +219,9 @@ private void initColumns(ParamCreateContext context) throws UserException { List columnExprs = Lists.newArrayList(); if (isLoad()) { columnExprs = context.fileGroup.getColumnExprList(); + if (mergeType == LoadTask.MergeType.MERGE) { + columnExprs.add(new ImportColumnDesc(Load.BATCH_DELETE_VIRTUAL_COL, deleteCondition)); + } } Load.initColumns(targetTable, columnExprs, @@ -236,76 +229,6 @@ private void initColumns(ParamCreateContext context) throws UserException { context.tupleDescriptor, context.slotDescByName, context.params); } - private void finalizeParams(ParamCreateContext context) throws UserException, AnalysisException { - Map slotDescByName = context.slotDescByName; - Map exprMap = context.exprMap; - Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); - - boolean isNegative = context.fileGroup.isNegative(); - for (SlotDescriptor destSlotDesc : desc.getSlots()) { - if (!destSlotDesc.isMaterialized()) { - continue; - } - Expr expr = null; - if (exprMap != null) { - expr = exprMap.get(destSlotDesc.getColumn().getName()); - } - if (expr == null) { - SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); - if (srcSlotDesc != null) { - destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); - // If dest is allow null, we set source to nullable - if (destSlotDesc.getColumn().isAllowNull()) { - srcSlotDesc.setIsNullable(true); - } - expr = new SlotRef(srcSlotDesc); - } else { - Column column = destSlotDesc.getColumn(); - if (column.getDefaultValue() != null) { - expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); - } else { - if (column.isAllowNull()) { - expr = NullLiteral.create(column.getType()); - } else { - throw new UserException("Unknown slot ref(" - + destSlotDesc.getColumn().getName() + ") in source file"); - } - } - } - } - - // check hll_hash - if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { - if (!(expr instanceof FunctionCallExpr)) { - throw new AnalysisException("HLL column must use hll_hash function, like " - + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); - } - FunctionCallExpr fn = (FunctionCallExpr) expr; - if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash") && !fn.getFnName().getFunction().equalsIgnoreCase("hll_empty")) { - throw new AnalysisException("HLL column must use hll_hash function, like " - + destSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + destSlotDesc.getColumn().getName() + "=hll_empty()"); - } - expr.setType(Type.HLL); - } - - checkBitmapCompatibility(analyzer, destSlotDesc, expr); - - // analyze negative - if (isNegative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { - expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); - expr.analyze(analyzer); - } - expr = castToSlot(destSlotDesc, expr); - context.params.putToExpr_of_dest_slot(destSlotDesc.getId().asInt(), expr.treeToThrift()); - } - context.params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans); - context.params.setSrc_tuple_id(context.tupleDescriptor.getId().asInt()); - context.params.setDest_tuple_id(desc.getId().asInt()); - context.params.setStrict_mode(strictMode); - // Need re compute memory layout after set some slot descriptor to nullable - context.tupleDescriptor.computeMemLayout(); - } - private TScanRangeLocations newLocations(TBrokerScanRangeParams params, String brokerName) throws UserException { Backend 
selectedBackend = backends.get(nextBe++); @@ -514,7 +437,8 @@ public void finalize(Analyzer analyzer) throws UserException { } ParamCreateContext context = paramCreateContexts.get(i); try { - finalizeParams(context); + finalizeParams(context.slotDescByName, context.exprMap, context.params, + context.tupleDescriptor, strictMode, context.fileGroup.isNegative(), analyzer); } catch (AnalysisException e) { throw new UserException(e.getMessage()); } @@ -533,23 +457,11 @@ public void finalize(Analyzer analyzer) throws UserException { } } - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.BROKER_SCAN_NODE; - TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); - msg.setBroker_scan_node(brokerScanNode); - } - @Override public List getScanRangeLocations(long maxScanRangeLength) { return locationsList; } - @Override - public int getNumInstances() { - return numInstances; - } - @Override protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java index a8478b24d7f333..1f9bb3b1b6adc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java @@ -18,15 +18,28 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.ArithmeticExpr; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.FunctionSet; +import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.thrift.TBrokerScanNode; +import org.apache.doris.thrift.TBrokerScanRangeParams; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -36,6 +49,10 @@ public abstract class LoadScanNode extends ScanNode { + protected Expr deleteCondition; + protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; + protected int numInstances; + public LoadScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { super(id, desc, planNodeName); } @@ -44,7 +61,7 @@ protected void initWhereExpr(Expr whereExpr, Analyzer analyzer) throws UserExcep if (whereExpr == null) { return; } - + Map dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); for (SlotDescriptor slotDescriptor : desc.getSlots()) { dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor); @@ -85,4 +102,84 @@ protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDe } } + protected void finalizeParams(Map slotDescByName, + Map exprMap, + TBrokerScanRangeParams params, + TupleDescriptor srcTupleDesc, + boolean strictMode, + boolean 
negative, + Analyzer analyzer) throws UserException { + Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); + for (SlotDescriptor destSlotDesc : desc.getSlots()) { + if (!destSlotDesc.isMaterialized()) { + continue; + } + Expr expr = null; + if (exprMap != null) { + expr = exprMap.get(destSlotDesc.getColumn().getName()); + } + if (expr == null) { + SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName()); + if (srcSlotDesc != null) { + destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); + // If dest is allow null, we set source to nullable + if (destSlotDesc.getColumn().isAllowNull()) { + srcSlotDesc.setIsNullable(true); + } + expr = new SlotRef(srcSlotDesc); + } else { + Column column = destSlotDesc.getColumn(); + if (column.getDefaultValue() != null) { + expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue()); + } else { + if (column.isAllowNull()) { + expr = NullLiteral.create(column.getType()); + } else { + throw new AnalysisException("column has no source field, column=" + column.getName()); + } + } + } + } + + // check hll_hash + if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { + if (!(expr instanceof FunctionCallExpr)) { + throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx)"); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) + && !fn.getFnName().getFunction().equalsIgnoreCase("hll_empty")) { + throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + + "(xxx) or " + destSlotDesc.getColumn().getName() + "=hll_empty()"); + } + expr.setType(Type.HLL); + } + + checkBitmapCompatibility(analyzer, destSlotDesc, expr); + + if (negative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { + expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); + expr.analyze(analyzer); + } + expr = castToSlot(destSlotDesc, expr); + params.putToExpr_of_dest_slot(destSlotDesc.getId().asInt(), expr.treeToThrift()); + } + params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans); + params.setDest_tuple_id(desc.getId().asInt()); + params.setStrict_mode(strictMode); + params.setSrc_tuple_id(srcTupleDesc.getId().asInt()); + // LOG.info("brokerScanRange is {}", brokerScanRange); + + // Need re compute memory layout after set some slot descriptor to nullable + srcTupleDesc.computeMemLayout(); + } + + @Override + protected void toThrift(TPlanNode planNode) { + planNode.setNode_type(TPlanNodeType.BROKER_SCAN_NODE); + TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); + planNode.setBroker_scan_node(brokerScanNode); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index bc96844e9c8bd1..df3e41c5a53178 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -26,17 +26,23 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import 
org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.load.Load; import org.apache.doris.load.LoadErrorHub; -import org.apache.doris.task.StreamLoadTask; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TLoadErrorHubInfo; +import org.apache.doris.thrift.TMergeType; import org.apache.doris.thrift.TPlanFragmentExecParams; import org.apache.doris.thrift.TQueryGlobals; import org.apache.doris.thrift.TQueryOptions; @@ -57,6 +63,8 @@ import java.util.List; import java.util.Map; +import static org.apache.doris.catalog.AggregateType.REPLACE; + // Used to generate a plan fragment for a streaming load. // we only support OlapTable now. // TODO(zc): support other type table @@ -68,15 +76,15 @@ public class StreamLoadPlanner { // Data will load to this table private Database db; private OlapTable destTable; - private StreamLoadTask streamLoadTask; + private LoadTaskInfo taskInfo; private Analyzer analyzer; private DescriptorTable descTable; - public StreamLoadPlanner(Database db, OlapTable destTable, StreamLoadTask streamLoadTask) { + public StreamLoadPlanner(Database db, OlapTable destTable, LoadTaskInfo taskInfo) { this.db = db; this.destTable = destTable; - this.streamLoadTask = streamLoadTask; + this.taskInfo = taskInfo; } private void resetAnalyzer() { @@ -94,10 +102,14 @@ public OlapTable getDestTable() { // create the plan. the plan's query id and load id are same, using the parameter 'loadId' public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { + if (destTable.getKeysType() != KeysType.UNIQUE_KEYS + && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) { + throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables."); + } resetAnalyzer(); // construct tuple descriptor, used for scanNode and dataSink TupleDescriptor tupleDesc = descTable.createTupleDescriptor("DstTableTuple"); - boolean negative = streamLoadTask.getNegative(); + boolean negative = taskInfo.getNegative(); // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc); @@ -108,9 +120,15 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { throw new DdlException("Column is not SUM AggreateType. 
column:" + col.getName()); } } + // Add a virtual column to indicate whether the data in this row is deleted + SlotDescriptor delSlotDesc = descTable.addSlotDescriptor(tupleDesc); + delSlotDesc.setIsMaterialized(true); + delSlotDesc.setColumn(new Column(Load.BATCH_DELETE_VIRTUAL_COL, Type.BOOLEAN, false, REPLACE, false, + String.valueOf(taskInfo.getMergeType() == LoadTask.MergeType.DELETE), + "merge load virtual column")); // create scan node - StreamLoadScanNode scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), tupleDesc, destTable, streamLoadTask); + StreamLoadScanNode scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), tupleDesc, destTable, taskInfo); scanNode.init(analyzer); descTable.computeMemLayout(); scanNode.finalize(analyzer); @@ -118,7 +136,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { // create dest sink List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds); - olapTableSink.init(loadId, streamLoadTask.getTxnId(), db.getId(), streamLoadTask.getTimeout()); + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), taskInfo.getTimeout()); olapTableSink.complete(); // for stream load, we only need one fragment, ScanNode -> DataSink. @@ -153,15 +171,17 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { params.setParams(execParams); TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setQuery_type(TQueryType.LOAD); - queryOptions.setQuery_timeout(streamLoadTask.getTimeout()); - queryOptions.setMem_limit(streamLoadTask.getMemLimit()); + queryOptions.setQuery_timeout(taskInfo.getTimeout()); + queryOptions.setMem_limit(taskInfo.getMemLimit()); // for stream load, we use exec_mem_limit to limit the memory usage of load channel. 
- queryOptions.setLoad_mem_limit(streamLoadTask.getMemLimit()); + queryOptions.setLoad_mem_limit(taskInfo.getMemLimit()); + queryOptions.setMerge_type(TMergeType.valueOf(taskInfo.getMergeType().name())); + queryOptions.setDelete_slot_id(delSlotDesc.getId().asInt()); params.setQuery_options(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); queryGlobals.setNow_string(DATE_FORMAT.format(new Date())); queryGlobals.setTimestamp_ms(new Date().getTime()); - queryGlobals.setTime_zone(streamLoadTask.getTimezone()); + queryGlobals.setTime_zone(taskInfo.getTimezone()); params.setQuery_globals(queryGlobals); // set load error hub if exist @@ -182,7 +202,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { private List getAllPartitionIds() throws DdlException { List partitionIds = Lists.newArrayList(); - PartitionNames partitionNames = streamLoadTask.getPartitions(); + PartitionNames partitionNames = taskInfo.getPartitions(); if (partitionNames != null) { for (String partName : partitionNames.getPartitionNames()) { Partition part = destTable.getPartition(partName, partitionNames.isTemp()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 5725af2a7bcbcf..79fb30c4ec2be8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -18,33 +18,18 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.ArithmeticExpr; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.FunctionCallExpr; -import org.apache.doris.analysis.IntLiteral; -import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.AggregateType; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.FunctionSet; -import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Type; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.load.Load; -import org.apache.doris.task.StreamLoadTask; +import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TBrokerRangeDesc; -import org.apache.doris.thrift.TBrokerScanNode; import org.apache.doris.thrift.TBrokerScanRange; import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TPlanNode; -import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TUniqueId; @@ -69,7 +54,7 @@ public class StreamLoadScanNode extends LoadScanNode { // TODO(zc): now we use scanRange // input parameter private Table dstTable; - private StreamLoadTask streamLoadTask; + private LoadTaskInfo taskInfo; // helper private Analyzer analyzer; @@ -81,11 +66,12 @@ public class StreamLoadScanNode extends LoadScanNode { // used to construct for streaming loading public StreamLoadScanNode( - TUniqueId loadId, PlanNodeId id, TupleDescriptor tupleDesc, Table dstTable, StreamLoadTask streamLoadTask) { + 
TUniqueId loadId, PlanNodeId id, TupleDescriptor tupleDesc, Table dstTable, LoadTaskInfo taskInfo) { super(id, tupleDesc, "StreamLoadScanNode"); this.loadId = loadId; this.dstTable = dstTable; - this.streamLoadTask = streamLoadTask; + this.taskInfo = taskInfo; + this.numInstances = 1; } @Override @@ -97,28 +83,28 @@ public void init(Analyzer analyzer) throws UserException { brokerScanRange = new TBrokerScanRange(); TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); - rangeDesc.file_type = streamLoadTask.getFileType(); - rangeDesc.format_type = streamLoadTask.getFormatType(); + rangeDesc.file_type = taskInfo.getFileType(); + rangeDesc.format_type = taskInfo.getFormatType(); if (rangeDesc.format_type == TFileFormatType.FORMAT_JSON) { - if (!streamLoadTask.getJsonPaths().isEmpty()) { - rangeDesc.setJsonpaths(streamLoadTask.getJsonPaths()); + if (!taskInfo.getJsonPaths().isEmpty()) { + rangeDesc.setJsonpaths(taskInfo.getJsonPaths()); } - if (!streamLoadTask.getJsonRoot().isEmpty()) { - rangeDesc.setJson_root(streamLoadTask.getJsonRoot()); + if (!taskInfo.getJsonRoot().isEmpty()) { + rangeDesc.setJson_root(taskInfo.getJsonRoot()); } - rangeDesc.setStrip_outer_array(streamLoadTask.isStripOuterArray()); + rangeDesc.setStrip_outer_array(taskInfo.isStripOuterArray()); } rangeDesc.splittable = false; - switch (streamLoadTask.getFileType()) { + switch (taskInfo.getFileType()) { case FILE_LOCAL: - rangeDesc.path = streamLoadTask.getPath(); + rangeDesc.path = taskInfo.getPath(); break; case FILE_STREAM: rangeDesc.path = "Invalid Path"; rangeDesc.load_id = loadId; break; default: - throw new UserException("unsupported file type, type=" + streamLoadTask.getFileType()); + throw new UserException("unsupported file type, type=" + taskInfo.getFileType()); } rangeDesc.start_offset = 0; rangeDesc.size = -1; @@ -127,25 +113,26 @@ public void init(Analyzer analyzer) throws UserException { srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor("StreamLoadScanNode"); TBrokerScanRangeParams params = new TBrokerScanRangeParams(); - params.setStrict_mode(streamLoadTask.isStrictMode()); - Load.initColumns(dstTable, streamLoadTask.getColumnExprDescs(), null /* no hadoop function */, + Load.initColumns(dstTable, taskInfo.getColumnExprDescs(), null /* no hadoop function */, exprsByName, analyzer, srcTupleDesc, slotDescByName, params); // analyze where statement - initWhereExpr(streamLoadTask.getWhereExpr(), analyzer); + initWhereExpr(taskInfo.getWhereExpr(), analyzer); + + deleteCondition = taskInfo.getDeleteCondition(); + mergeType = taskInfo.getMergeType(); computeStats(analyzer); createDefaultSmap(analyzer); - if (streamLoadTask.getColumnSeparator() != null) { - String sep = streamLoadTask.getColumnSeparator().getColumnSeparator(); + if (taskInfo.getColumnSeparator() != null) { + String sep = taskInfo.getColumnSeparator().getColumnSeparator(); params.setColumn_separator(sep.getBytes(Charset.forName("UTF-8"))[0]); } else { params.setColumn_separator((byte) '\t'); } params.setLine_delimiter((byte) '\n'); - params.setSrc_tuple_id(srcTupleDesc.getId().asInt()); params.setDest_tuple_id(desc.getId().asInt()); brokerScanRange.setParams(params); @@ -154,81 +141,8 @@ public void init(Analyzer analyzer) throws UserException { @Override public void finalize(Analyzer analyzer) throws UserException, UserException { - finalizeParams(); - } - - private void finalizeParams() throws UserException { - boolean negative = streamLoadTask.getNegative(); - Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); - for (SlotDescriptor 
dstSlotDesc : desc.getSlots()) { - if (!dstSlotDesc.isMaterialized()) { - continue; - } - Expr expr = null; - if (exprsByName != null) { - expr = exprsByName.get(dstSlotDesc.getColumn().getName()); - } - if (expr == null) { - SlotDescriptor srcSlotDesc = slotDescByName.get(dstSlotDesc.getColumn().getName()); - if (srcSlotDesc != null) { - destSidToSrcSidWithoutTrans.put(dstSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); - // If dest is allow null, we set source to nullable - if (dstSlotDesc.getColumn().isAllowNull()) { - srcSlotDesc.setIsNullable(true); - } - expr = new SlotRef(srcSlotDesc); - } else { - Column column = dstSlotDesc.getColumn(); - if (column.getDefaultValue() != null) { - expr = new StringLiteral(dstSlotDesc.getColumn().getDefaultValue()); - } else { - if (column.isAllowNull()) { - expr = NullLiteral.create(column.getType()); - } else { - throw new AnalysisException("column has no source field, column=" + column.getName()); - } - } - } - } - - // check hll_hash - if (dstSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { - if (!(expr instanceof FunctionCallExpr)) { - throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " - + dstSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx)"); - } - FunctionCallExpr fn = (FunctionCallExpr) expr; - if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) - && !fn.getFnName().getFunction().equalsIgnoreCase("hll_empty")) { - throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " - + dstSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH - + "(xxx) or " + dstSlotDesc.getColumn().getName() + "=hll_empty()"); - } - expr.setType(Type.HLL); - } - - checkBitmapCompatibility(analyzer, dstSlotDesc, expr); - - if (negative && dstSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { - expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); - expr.analyze(analyzer); - } - expr = castToSlot(dstSlotDesc, expr); - brokerScanRange.params.putToExpr_of_dest_slot(dstSlotDesc.getId().asInt(), expr.treeToThrift()); - } - brokerScanRange.params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans); - brokerScanRange.params.setDest_tuple_id(desc.getId().asInt()); - // LOG.info("brokerScanRange is {}", brokerScanRange); - - // Need re compute memory layout after set some slot descriptor to nullable - srcTupleDesc.computeMemLayout(); - } - - @Override - protected void toThrift(TPlanNode planNode) { - planNode.setNode_type(TPlanNodeType.BROKER_SCAN_NODE); - TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); - planNode.setBroker_scan_node(brokerScanNode); + finalizeParams(slotDescByName, exprsByName, brokerScanRange.params, srcTupleDesc, + taskInfo.isStrictMode(), taskInfo.getNegative(), analyzer); } @Override @@ -241,9 +155,6 @@ public List getScanRangeLocations(long maxScanRangeLength) return Lists.newArrayList(locations); } - @Override - public int getNumInstances() { return 1; } - @Override protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) { return "StreamLoadScanNode"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 01da5c6dd71449..f17aa448a03a10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -33,6 +33,7 @@ import 
org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadJob; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.DataStreamSink; @@ -64,6 +65,7 @@ import org.apache.doris.thrift.TEsScanRange; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TLoadErrorHubInfo; +import org.apache.doris.thrift.TMergeType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPlanFragmentDestination; @@ -290,6 +292,15 @@ public void setTimeout(int timeout) { this.queryOptions.setQuery_timeout(timeout); } + public void setMergeType(LoadTask.MergeType mergeType) { + this.queryOptions.setMerge_type(TMergeType.valueOf(mergeType.name())); + + } + + public void setDelSlotId(int id) { + this.queryOptions.setDelete_slot_id(id); + } + public void clearExportStatus() { lock.lock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java new file mode 100644 index 00000000000000..cfc8185bf88326 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.task; + +import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; + +import java.util.List; + +public interface LoadTaskInfo { + public boolean getNegative(); + public long getTxnId(); + public int getTimeout(); + public long getMemLimit(); + public String getTimezone(); + public PartitionNames getPartitions(); + public LoadTask.MergeType getMergeType(); + public Expr getDeleteCondition(); + public TFileType getFileType(); + public TFileFormatType getFormatType(); + public String getJsonPaths(); + public String getJsonRoot(); + public boolean isStripOuterArray(); + public String getPath(); + public List getColumnExprDescs(); + public boolean isStrictMode(); + public Expr getWhereExpr(); + public ColumnSeparator getColumnSeparator(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index d5ff24e1377874..607ed6fd6333e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -32,7 +32,8 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TStreamLoadPutRequest; @@ -43,7 +44,7 @@ import java.io.StringReader; import java.util.List; -public class StreamLoadTask { +public class StreamLoadTask implements LoadTaskInfo { private static final Logger LOG = LogManager.getLogger(StreamLoadTask.class); @@ -66,6 +67,8 @@ public class StreamLoadTask { private String timezone = TimeUtils.DEFAULT_TIME_ZONE; private int timeout = Config.stream_load_default_timeout_second; private long execMemLimit = 2 * 1024 * 1024 * 1024L; // default is 2GB + private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete + private Expr deleteCondition; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) { this.id = id; @@ -152,6 +155,13 @@ public String getJsonRoot() { public void setJsonRoot(String jsonRoot) { this.jsonRoot = jsonRoot; } + public LoadTask.MergeType getMergeType() { + return mergeType; + } + + public Expr getDeleteCondition() { + return deleteCondition; + } public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request, Database db) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), @@ -165,7 +175,7 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request, Databas setColumnToColumnExpr(request.getColumns()); } if (request.isSetWhere()) { - setWhereExpr(request.getWhere()); + whereExpr = parseWhereExpr(request.getWhere()); } if (request.isSetColumnSeparator()) { setColumnSeparator(request.getColumnSeparator()); @@ -209,39 +219,22 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request, Databas } stripOuterArray = request.isStrip_outer_array(); } - } - - 
public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) { - TUniqueId dummyId = new TUniqueId(); - TFileFormatType fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; - if (routineLoadJob.getFormat().equals("json")) { - fileFormatType = TFileFormatType.FORMAT_JSON; + if (request.isSetMerge_type()) { + try { + mergeType = LoadTask.MergeType.valueOf(request.getMerge_type().toString()); + } catch (IllegalArgumentException e) { + throw new UserException("unknown merge type " + request.getMerge_type().toString()); + } } - StreamLoadTask streamLoadTask = new StreamLoadTask(dummyId, -1L /* dummy txn id*/, - TFileType.FILE_STREAM, fileFormatType); - streamLoadTask.setOptionalFromRoutineLoadJob(routineLoadJob); - return streamLoadTask; - } - - private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) { - // copy the columnExprDescs, cause it may be changed when planning. - // so we keep the columnExprDescs in routine load job as origin. - if (routineLoadJob.getColumnDescs() != null) { - columnExprDescs = Lists.newArrayList(routineLoadJob.getColumnDescs()); + if (request.isSetDelete_condition()) { + deleteCondition = parseWhereExpr(request.getDelete_condition()); } - whereExpr = routineLoadJob.getWhereExpr(); - columnSeparator = routineLoadJob.getColumnSeparator(); - partitions = routineLoadJob.getPartitions(); - strictMode = routineLoadJob.isStrictMode(); - timezone = routineLoadJob.getTimezone(); - timeout = (int) routineLoadJob.getMaxBatchIntervalS() * 2; - if (!routineLoadJob.getJsonPaths().isEmpty()) { - jsonPaths = routineLoadJob.getJsonPaths(); + if (negative && mergeType != LoadTask.MergeType.APPEND) { + throw new AnalysisException("Negative is only used when merge type is append."); } - if (!routineLoadJob.getJsonRoot().isEmpty()) { - jsonRoot = routineLoadJob.getJsonRoot(); + if (mergeType == LoadTask.MergeType.MERGE) { + columnExprDescs.add(new ImportColumnDesc(Load.BATCH_DELETE_VIRTUAL_COL, deleteCondition)); } - stripOuterArray = routineLoadJob.isStripOuterArray(); } // used for stream load @@ -273,7 +266,7 @@ private void setColumnToColumnExpr(String columns) throws UserException { } } - private void setWhereExpr(String whereString) throws UserException { + private Expr parseWhereExpr(String whereString) throws UserException { String whereSQL = new String("WHERE " + whereString); SqlParser parser = new SqlParser(new SqlScanner(new StringReader(whereSQL))); ImportWhereStmt whereStmt; @@ -295,7 +288,7 @@ private void setWhereExpr(String whereString) throws UserException { LOG.warn("failed to parse where header, sql={}", whereSQL, e); throw new UserException("parse columns header failed", e); } - whereExpr = whereStmt.getExpr(); + return whereStmt.getExpr(); } private void setColumnSeparator(String oriSeparator) throws AnalysisException { diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index f0c0a9d6064798..e2693127d9cadf 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -96,6 +96,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("alter", new Integer(SqlParserSymbols.KW_ALTER)); keywordMap.put("and", new Integer(SqlParserSymbols.KW_AND)); keywordMap.put("anti", new Integer(SqlParserSymbols.KW_ANTI)); + keywordMap.put("append", new Integer(SqlParserSymbols.KW_APPEND)); keywordMap.put("as", new Integer(SqlParserSymbols.KW_AS)); keywordMap.put("asc", new Integer(SqlParserSymbols.KW_ASC)); keywordMap.put("authors", new 
Integer(SqlParserSymbols.KW_AUTHORS)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java index 85774dcd1ac158..004266e2dc7917 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java @@ -19,6 +19,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.LoadDataSourceType; import com.google.common.collect.Lists; @@ -37,6 +38,8 @@ import mockit.Mock; import mockit.MockUp; +import static org.apache.doris.load.loadv2.LoadTask.MergeType.APPEND; + public class CreateRoutineLoadStmtTest { private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmtTest.class); @@ -70,7 +73,8 @@ public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) thro CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, - typeName, customProperties); + typeName, customProperties, + APPEND); new MockUp() { @Mock @@ -119,7 +123,8 @@ public void testAnalyze(@Injectable Analyzer analyzer) throws UserException { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, - typeName, customProperties); + typeName, customProperties, + APPEND); new MockUp() { @Mock public void analyze(Analyzer analyzer1) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java index 413f17f57d6f6c..ac6c12c9dece78 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.BinaryPredicate.Operator; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.MockedAuth; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.qe.ConnectContext; @@ -54,33 +55,42 @@ public void testNormal() throws AnalysisException { DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), null, null, null, false, null); desc.analyze("testDb"); - Assert.assertEquals("DATA INFILE ('abc.txt') INTO TABLE testTable", desc.toString()); + Assert.assertEquals("APPEND DATA INFILE ('abc.txt') INTO TABLE testTable", desc.toString()); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), null, null, null, true, null); desc.analyze("testDb"); - Assert.assertEquals("DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable", desc.toString()); + Assert.assertEquals("APPEND DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable", desc.toString()); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), null, null, null, true, null); desc.analyze("testDb"); - Assert.assertEquals("DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable", desc.toString()); + Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable", desc.toString()); desc = new DataDescription("testTable", null, 
Lists.newArrayList("abc.txt"), Lists.newArrayList("col1", "col2"), null, null, true, null); desc.analyze("testDb"); - Assert.assertEquals("DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable (col1, col2)", desc.toString()); + Assert.assertEquals("APPEND DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable (col1, col2)", desc.toString()); Assert.assertEquals("testTable", desc.getTableName()); Assert.assertEquals("[col1, col2]", desc.getFileFieldNames().toString()); Assert.assertEquals("[abc.txt]", desc.getFilePaths().toString()); Assert.assertTrue(desc.isNegative()); Assert.assertNull(desc.getColumnSeparator()); + Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1)); + + desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, false, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr); + desc.analyze("testDb"); + Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE testTable COLUMNS TERMINATED BY ',' (col1, col2) WHERE 1 = 1 DELETE ON 1 = 1", desc.toString()); + Assert.assertEquals("1 = 1", desc.getWhereExpr().toSql()); + Assert.assertEquals("1 = 1", desc.getDeleteCondition().toSql()); + Assert.assertEquals(",", desc.getColumnSeparator()); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), Lists.newArrayList("col1", "col2"), new ColumnSeparator("\t"), null, true, null); desc.analyze("testDb"); - Assert.assertEquals("DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable" + Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable" + " COLUMNS TERMINATED BY '\t' (col1, col2)", desc.toString()); @@ -89,7 +99,7 @@ public void testNormal() throws AnalysisException { Lists.newArrayList("col1", "col2"), new ColumnSeparator("\\x01"), null, true, null); desc.analyze("testDb"); - Assert.assertEquals("DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable" + Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable" + " COLUMNS TERMINATED BY '\\x01' (col1, col2)", desc.toString()); @@ -98,7 +108,7 @@ public void testNormal() throws AnalysisException { Lists.newArrayList("abc.txt"), null, null, null, false, null); desc.analyze("testDb"); - Assert.assertEquals("DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2)", desc.toString()); + Assert.assertEquals("APPEND DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2)", desc.toString()); // alignment_timestamp func List params = Lists.newArrayList(); @@ -111,7 +121,7 @@ public void testNormal() throws AnalysisException { Lists.newArrayList("k2", "k3"), null, null, false, Lists .newArrayList((Expr) predicate)); desc.analyze("testDb"); - String sql = "DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2) (k2, k3)" + String sql = "APPEND DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2) (k2, k3)" + " SET (`k1` = alignment_timestamp('day', `k2`))"; Assert.assertEquals(sql, desc.toString()); @@ -126,7 +136,7 @@ public void testNormal() throws AnalysisException { Lists.newArrayList("k2", "k3"), null, null, false, Lists.newArrayList((Expr) predicate)); desc.analyze("testDb"); - sql = "DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2) (k2, k3)" + sql = "APPEND DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2) (k2, k3)" + " SET (`k1` = replace_value('-', '10'))"; Assert.assertEquals(sql, 
desc.toString()); @@ -141,7 +151,7 @@ public void testNormal() throws AnalysisException { Lists.newArrayList("k2", "k3"), null, null, false, Lists .newArrayList((Expr) predicate)); desc.analyze("testDb"); - sql = "DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2) (k2, k3)" + sql = "APPEND DATA INFILE ('abc.txt') INTO TABLE testTable PARTITIONS (p1, p2) (k2, k3)" + " SET (`k1` = replace_value('', NULL))"; Assert.assertEquals(sql, desc.toString()); @@ -151,9 +161,10 @@ public void testNormal() throws AnalysisException { predicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "k1"), new FunctionCallExpr("bitmap_dict", params)); desc = new DataDescription("testTable", new PartitionNames(false, Lists.newArrayList("p1", "p2")), - "testHiveTable", false, Lists.newArrayList(predicate), null); + "testHiveTable", false, Lists.newArrayList(predicate), + null, LoadTask.MergeType.APPEND, null); desc.analyze("testDb"); - sql = "DATA FROM TABLE testHiveTable INTO TABLE testTable PARTITIONS (p1, p2) SET (`k1` = bitmap_dict(`k2`))"; + sql = "APPEND DATA FROM TABLE testHiveTable INTO TABLE testTable PARTITIONS (p1, p2) SET (`k1` = bitmap_dict(`k2`))"; Assert.assertEquals(sql, desc.toSql()); } @@ -164,6 +175,15 @@ public void testNoTable() throws AnalysisException { desc.analyze("testDb"); } + @Test(expected = AnalysisException.class) + public void testNegMerge() throws AnalysisException { + Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1)); + + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, true, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr); + desc.analyze("testDb"); + } + @Test(expected = AnalysisException.class) public void testNoFile() throws AnalysisException { DataDescription desc = new DataDescription("testTable", null, null, null, null, null, false, null); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java index 250f022a2180a3..a8500ab697d72e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java @@ -24,6 +24,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.load.EtlJobType; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -83,6 +84,8 @@ public void testNormal(@Injectable DataDescription desc, @Mocked Catalog catalog new Expectations(){ { + desc.getMergeType(); + result = LoadTask.MergeType.APPEND; desc.toSql(); minTimes = 0; result = "XXX"; diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index ece07c24fa7fae..eafbf64a85a594 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -35,6 +35,7 @@ import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.common.util.KafkaUtil; import org.apache.doris.load.RoutineLoadDesc; +import org.apache.doris.load.loadv2.LoadTask; import 
org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; @@ -65,6 +66,8 @@ import mockit.Mocked; import mockit.Verifications; +import static org.apache.doris.load.loadv2.LoadTask.MergeType.APPEND; + public class KafkaRoutineLoadJobTest { private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJobTest.class); @@ -240,7 +243,8 @@ public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTrans public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog, @Injectable Database database) throws LoadException { CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); - RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames); + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, + partitionNames, null, APPEND); Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); new Expectations() { @@ -264,7 +268,7 @@ public void testFromCreateStmt(@Mocked Catalog catalog, @Injectable Database database, @Injectable OlapTable table) throws UserException { CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); - RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames); + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames, null, APPEND); Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); List> partitionIdToOffset = Lists.newArrayList(); List kafkaPartitionInfoList = Lists.newArrayList(); @@ -329,7 +333,8 @@ private CreateRoutineLoadStmt initCreateRoutineLoadStmt() { CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, - typeName, customProperties); + typeName, customProperties, + APPEND); Deencapsulation.setField(createRoutineLoadStmt, "name", jobName); return createRoutineLoadStmt; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 8469aca809eb2a..05eea2614dc411 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -35,6 +35,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.EditLog; @@ -93,7 +94,8 @@ public void testAddJobByStmt(@Injectable PaloAuth paloAuth, customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, - typeName, customProperties); + typeName, customProperties, + LoadTask.MergeType.APPEND); createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, @@ -162,7 +164,8 @@ public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth, customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
serverAddress); CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, - typeName, customProperties); + typeName, customProperties, + LoadTask.MergeType.APPEND); createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 80f3be9d322a9e..8cc22f23005ea0 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -70,6 +70,13 @@ message PTabletWriterOpenRequest { required bool need_gen_rollup = 7; optional int64 load_mem_limit = 8; optional int64 load_channel_timeout_s = 9; + enum PMergeType { + APPEND = 0; + MERGE = 1; + DELETE = 2; + } + optional PMergeType merge_type = 10 [default = APPEND]; + optional int32 delete_slot_id = 11 [default = -1]; }; message PTabletWriterOpenResult { diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index 584d409717a154..93c3f196ee5549 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -163,9 +163,10 @@ message SegmentFooterPB { optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns repeated MetadataPairPB file_meta_datas = 8; // meta data of file - // Short key index's page optional PagePointerPB short_key_index_page = 9; + // Use bitmap index to indicate which row is marked for deleting + optional PagePointerPB delete_index_page = 10; } message BTreeMetaPB { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index dd5b6ec8921695..6de605b2a74aab 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -552,6 +552,8 @@ struct TStreamLoadPutRequest { 24: optional string jsonpaths 25: optional i64 thrift_rpc_timeout_ms 26: optional string json_root + 27: optional Types.TMergeType merge_type + 28: optional string delete_condition } struct TStreamLoadPutResult { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 0f5c231ad2d55e..45b85d48457f36 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -137,7 +137,10 @@ struct TQueryOptions { // if set, this will overwrite the BE config. 30: optional i32 max_pushdown_conditions_per_column // whether enable spilling to disk - 31: optional bool enable_spilling = false; + 31: optional bool enable_spilling = false + // for batch load + 32: optional Types.TMergeType merge_type + 33: optional i32 delete_slot_id = -1 } diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index abdd52d7f448f0..3ea2e36fbc97d0 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -377,6 +377,12 @@ enum TLoadSourceType { KAFKA, } +enum TMergeType { + APPEND, + MERGE, + DELETE +} + // represent a user identity struct TUserIdentity { 1: optional string username
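For reference, the user-facing syntax implied by the DataDescriptionTest expectations above is sketched below. This is a minimal, hypothetical broker load statement and is not part of the patch: the label, file path, column names, predicates, and broker name are placeholders, and the LOAD LABEL / WITH BROKER wrapper follows the existing broker load grammar. Per the KeysType check added in StreamLoadPlanner, a MERGE or DELETE load is only accepted against UNIQUE KEY tables, and per the checks in DataDescription/StreamLoadTask a NEGATIVE load cannot be combined with a non-APPEND merge type.

    LOAD LABEL example_db.example_label
    (
        -- MERGE: rows matching the DELETE ON predicate are flagged through the hidden
        -- boolean column Load.BATCH_DELETE_VIRTUAL_COL (REPLACE aggregation), while all
        -- other rows are loaded as a normal upsert. APPEND (the default) keeps the old
        -- behavior; DELETE marks every loaded row as deleted.
        MERGE DATA INFILE ('hdfs://host:port/path/file.txt')
        INTO TABLE example_unique_tbl
        COLUMNS TERMINATED BY ','
        (k1, k2, v1)
        WHERE k1 > 0
        DELETE ON v1 = 1
    )
    WITH BROKER 'broker_name';

The same merge type flows through the whole pipeline shown in the patch: the FE planner fills TQueryOptions.merge_type and delete_slot_id, the tablet sink forwards them in PTabletWriterOpenRequest, and a non-APPEND merge type forces the BETA rowset format on the BE so the delete flag can be persisted.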