Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ fe_plugins/output
fe/mocked
fe/*/target
dependency-reduced-pom.xml
fe_plugins/**/.classpath
fe_plugins/**/.factorypath
samples/**/.classpath


#ignore eclipse project file & idea project file
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
<< Tuple::to_string(row->get_tuple(0), *_tuple_desc);
}
}

return Status::OK();
}

Expand All @@ -235,7 +234,7 @@ Status BrokerScanNode::close(RuntimeState* state) {
_scanner_threads[i].join();
}

// Open partition
// Close partition
if (_partition_expr_ctxs.size() > 0) {
Expr::close(_partition_expr_ctxs, state);
for (auto iter : _partition_infos) {
Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ void NodeChannel::open() {
request.set_load_mem_limit(_parent->_load_mem_limit);
request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s);

// set load info
std::map<TMergeType::type, PTabletWriterOpenRequest::PMergeType>::const_iterator it = _type_map.find(_parent->_merge_type);
if (it == _type_map.end() ) {
LOG(WARNING) << "merge type [" << _parent->_merge_type << "] is invalid, set to default APPEND";
request.set_merge_type(PTabletWriterOpenRequest::APPEND);
} else {
request.set_merge_type(it->second);
}
request.set_delete_slot_id(_parent->_delete_slot_id);
_open_closure = new RefCountClosure<PTabletWriterOpenResult>();
_open_closure->ref();

Expand Down Expand Up @@ -544,6 +553,9 @@ Status OlapTableSink::prepare(RuntimeState* state) {
_serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");
_load_mem_limit = state->get_load_mem_limit();

_merge_type = state->get_merge_type();
_delete_slot_id = state->get_delete_slot_id();

// open all channels
auto& partitions = _partition->get_partitions();
for (int i = 0; i < _schema->indexes().size(); ++i) {
Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ class NodeChannel {
int64_t _mem_exceeded_block_ns = 0;
int64_t _queue_push_lock_ns = 0;
int64_t _actual_consume_ns = 0;
// Translation table from the Thrift merge type to the merge type enum of
// PTabletWriterOpenRequest. open() looks the sink's merge type up in this map and
// falls back to APPEND when no entry is found.
std::map<TMergeType::type, PTabletWriterOpenRequest::PMergeType> _type_map = {
{TMergeType::APPEND, PTabletWriterOpenRequest::APPEND},
{TMergeType::MERGE, PTabletWriterOpenRequest::MERGE},
{TMergeType::DELETE, PTabletWriterOpenRequest::DELETE}
};
};

class IndexChannel {
Expand Down Expand Up @@ -393,6 +398,9 @@ class OlapTableSink : public DataSink {

// the timeout of load channels opened by this tablet sink. in second
int64_t _load_channel_timeout_s = 0;
// merge type of this load; read from the RuntimeState in prepare() and forwarded
// by each node channel in its open request
TMergeType::type _merge_type = TMergeType::APPEND;
// delete slot id of this load; read from the RuntimeState in prepare() and
// forwarded in the open request.
// NOTE(review): -1 presumably means "no delete slot" -- confirm against the planner.
int32_t _delete_slot_id = -1;
};

} // namespace stream_load
Expand Down
22 changes: 22 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "util/doris_metrics.h"
#include "util/time.h"
#include "util/uid_util.h"
#include "util/string_util.h"

namespace doris {

Expand Down Expand Up @@ -390,6 +391,27 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_timeout(ctx->timeout_second);
}
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
request.__set_merge_type(TMergeType::APPEND);
StringCaseMap<TMergeType::type> merge_type_map = {
{ "APPEND", TMergeType::APPEND },
{ "DELETE", TMergeType::DELETE },
{ "MERGE", TMergeType::MERGE }
};
if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
std::string merge_type = http_req->header(HTTP_MERGE_TYPE);
if (merge_type_map.find(merge_type) != merge_type_map.end() ) {
request.__set_merge_type(merge_type_map.find(merge_type)->second);
} else {
return Status::InvalidArgument("Invalid merge type " + merge_type);
}
}
if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
if (request.merge_type == TMergeType::MERGE) {
request.__set_delete_condition(http_req->header(HTTP_DELETE_CONDITION));
} else {
return Status::InvalidArgument("not support delete when merge type is not merge.");
}
}
// plan this load
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
#ifndef BE_TEST
Expand Down
2 changes: 2 additions & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
static const std::string HTTP_JSONPATHS = "jsonpaths";
static const std::string HTTP_JSONROOT = "json_root";
static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array";
static const std::string HTTP_MERGE_TYPE = "merge_type";
static const std::string HTTP_DELETE_CONDITION = "delete";

static const std::string HTTP_100_CONTINUE = "100-continue";

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ add_library(Olap STATIC
serialize.cpp
storage_engine.cpp
data_dir.cpp
row.cpp
short_key_index.cpp
snapshot_manager.cpp
stream_index_common.cpp
Expand Down
23 changes: 18 additions & 5 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "runtime/tuple.h"

namespace doris {

Expand Down Expand Up @@ -133,7 +134,8 @@ OLAPStatus DeltaWriter::init() {
writer_context.partition_id = _req.partition_id;
writer_context.tablet_schema_hash = _req.schema_hash;
writer_context.rowset_type = _storage_engine->default_rowset_type();
if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET
|| _req.merge_type != PTabletWriterOpenRequest::APPEND) {
writer_context.rowset_type = BETA_ROWSET;
}
writer_context.rowset_path_prefix = _tablet->tablet_path();
Expand All @@ -154,13 +156,24 @@ OLAPStatus DeltaWriter::init() {
_is_init = true;
return OLAP_SUCCESS;
}

// Single-argument overload: writes `tuple` without a tuple descriptor by forwarding
// to write(Tuple*, TupleDescriptor*) with a null descriptor.
// NOTE(review): the two-argument overload reads tuple_desc->slots() whenever
// _req.delete_slot_id >= 0 -- confirm this overload is never used for such requests.
OLAPStatus DeltaWriter::write(Tuple* tuple) {
    TupleDescriptor* const no_descriptor = nullptr;
    return write(tuple, no_descriptor);
}
OLAPStatus DeltaWriter::write(Tuple* tuple, TupleDescriptor* tuple_desc) {
if (!_is_init) {
RETURN_NOT_OK(init());
}
bool is_delete = false;
if (tuple_desc != nullptr) {

_mem_table->insert(tuple);
}
if (_req.delete_slot_id >= 0) {
const SlotDescriptor* slot = tuple_desc->slots()[_req.delete_slot_id];
if (slot->type() == TYPE_BOOLEAN) {
is_delete = *reinterpret_cast<const bool*>(tuple->get_slot(slot->tuple_offset()));
}
}
_mem_table->insert(tuple, is_delete);

// if memtable is full, push it to the flush executor,
// and create a new memtable for incoming data
Expand Down Expand Up @@ -194,8 +207,8 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {

// Replaces the current memtable with a newly constructed one that is bound to this
// writer's tablet id, schema, tablet schema, slot/tuple descriptors, keys type,
// rowset writer and memory tracker.
void DeltaWriter::_reset_mem_table() {
    // The memtable carries a delete flag only for UNIQUE_KEYS tablets whose request
    // names a delete slot. write() consults the delete slot for every id >= 0, so the
    // same bound is used here; the former "> 0" test left slot id 0 without a flag.
    const bool with_delete_flag =
            _tablet->keys_type() == UNIQUE_KEYS && _req.delete_slot_id >= 0;
    _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots,
                                  _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
                                  _mem_tracker.get(), with_delete_flag));
}

OLAPStatus DeltaWriter::close() {
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ struct WriteRequest {
TupleDescriptor* tuple_desc;
// slots are in order of tablet's schema
const std::vector<SlotDescriptor*>* slots;
PTabletWriterOpenRequest::PMergeType merge_type;
int delete_slot_id;
};

// Writer for a particular (load, index, tablet).
Expand All @@ -63,6 +65,8 @@ class DeltaWriter {
OLAPStatus init();

OLAPStatus write(Tuple* tuple);
OLAPStatus write(Tuple* tuple, TupleDescriptor* tuple_desc);

// flush the last memtable to flush queue, must call it before close_wait()
OLAPStatus close();
// wait for all memtables to be flushed.
Expand Down
56 changes: 37 additions & 19 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#include "olap/memtable.h"

#include "common/logging.h"
#include "olap/row.h"
#include "olap/row_cursor.h"
#include "olap/rowset/column_data_writer.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/row_cursor.h"
#include "olap/row.h"
#include "olap/schema.h"
#include "runtime/tuple.h"
#include "util/debug_util.h"
Expand All @@ -32,20 +32,31 @@ namespace doris {
// Constructs a memtable whose rows carry no delete flag. Delegates to the
// nine-argument constructor with with_delete_flag = false.
MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
                   const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
                   KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker)
        : MemTable(tablet_id, schema, tablet_schema, slot_descs, tuple_desc, keys_type,
                   rowset_writer, mem_tracker, false) {}

// Constructs a memtable for one tablet.
//
// The pointer arguments are stored as given, except mem_tracker, which becomes the
// parent of a tracker named "memtable" created here. with_delete_flag is remembered
// in _with_delete and passed to the ContiguousRow views this class builds.
MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
                   const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
                   KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker,
                   bool with_delete_flag)
        : _tablet_id(tablet_id),
          _schema(schema),
          _tablet_schema(tablet_schema),
          _tuple_desc(tuple_desc),
          _slot_descs(slot_descs),
          _keys_type(keys_type),
          _row_comparator(_schema),
          _rowset_writer(rowset_writer),
          _with_delete(with_delete_flag) {
    // Row buffers are allocated with _schema_size; it is the schema size plus one
    // when the delete flag is enabled.
    _schema_size = _schema->schema_size() + (_with_delete ? 1 : 0);

    // Both pools draw from the memtable's own tracker.
    _mem_tracker.reset(new MemTracker(-1, "memtable", mem_tracker));
    _buffer_mem_pool.reset(new MemPool(_mem_tracker.get()));
    _table_mem_pool.reset(new MemPool(_mem_tracker.get()));

    // The skip list is allocated exactly once; its last argument is true only for
    // the DUP_KEYS model.
    const bool is_dup_keys = (_keys_type == KeysType::DUP_KEYS);
    _skip_list = new Table(_row_comparator, _table_mem_pool.get(), is_dup_keys);
}

MemTable::~MemTable() {
Expand All @@ -61,6 +72,10 @@ int MemTable::RowCursorComparator::operator()(const char* left, const char* righ
}

// Inserts `tuple` as a regular row: forwards to insert(tuple, is_delete) with the
// delete marker switched off.
void MemTable::insert(const Tuple* tuple) {
    const bool is_delete = false;
    insert(tuple, is_delete);
}

void MemTable::insert(const Tuple* tuple, bool is_delete) {
bool overwritten = false;
uint8_t* _tuple_buf = nullptr;
if (_keys_type == KeysType::DUP_KEYS) {
Expand All @@ -69,6 +84,8 @@ void MemTable::insert(const Tuple* tuple) {
ContiguousRow row(_schema, _tuple_buf);
_tuple_to_row(tuple, &row, _table_mem_pool.get());
_skip_list->Insert((TableKey)_tuple_buf, &overwritten);
LOG(INFO) << "is_delete: " << row.is_delete();

DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
return;
}
Expand All @@ -78,15 +95,16 @@ void MemTable::insert(const Tuple* tuple) {
// _skiplist. If it exists, we aggregate the new row into the row in skiplist.
// otherwise, we need to copy it into _table_mem_pool before we can insert it.
_tuple_buf = _buffer_mem_pool->allocate(_schema_size);
ContiguousRow src_row(_schema, _tuple_buf);
ContiguousRow src_row(_schema, _tuple_buf, _with_delete);
src_row.set_delete(is_delete);
_tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());

bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint);
if (is_exist) {
_aggregate_two_row(src_row, _hint.curr->key);
} else {
_tuple_buf = _table_mem_pool->allocate(_schema_size);
ContiguousRow dst_row(_schema, _tuple_buf);
ContiguousRow dst_row(_schema, _tuple_buf, _with_delete);
copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
_skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint);
}
Expand All @@ -102,13 +120,13 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me

bool is_null = tuple->is_null(slot->null_indicator_offset());
const void* value = tuple->get_slot(slot->tuple_offset());
_schema->column(i)->consume(
&cell, (const char*)value, is_null, mem_pool, &_agg_object_pool);
_schema->column(i)->consume(&cell, (const char*)value, is_null, mem_pool,
&_agg_object_pool);
}
}

// Merges src_row into the row stored in the skip list under the same key.
//
// src_row:         the incoming row built from the inserted tuple.
// row_in_skiplist: key of the matching skip-list entry; a ContiguousRow is built
//                  over it and handed to agg_update_row() as the destination.
void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
    // Single destination view, built with the memtable's delete-flag setting like the
    // other ContiguousRow objects this class creates for UNIQUE/AGG rows.
    ContiguousRow dst_row(_schema, row_in_skiplist, _with_delete);
    agg_update_row(&dst_row, src_row, _table_mem_pool.get());
}

Expand All @@ -119,7 +137,7 @@ OLAPStatus MemTable::flush() {
Table::Iterator it(_skip_list);
for (it.SeekToFirst(); it.Valid(); it.Next()) {
char* row = (char*)it.key();
ContiguousRow dst_row(_schema, row);
ContiguousRow dst_row(_schema, row, _with_delete);
agg_finalize_row(&dst_row, _table_mem_pool.get());
RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,16 @@ class MemTable {
// Creates a memtable whose rows carry no delete flag.
MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
         const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
         KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker);
// Creates a memtable; with_delete_flag selects whether rows carry a delete flag.
// The flag is intentionally not defaulted: a default value would make every
// eight-argument construction ambiguous with the overload above.
MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
         const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
         KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker,
         bool with_delete_flag);
~MemTable();

int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const { return _mem_tracker->consumption(); }
void insert(const Tuple* tuple);
void insert(const Tuple* tuple, bool is_delete);
OLAPStatus flush();
OLAPStatus close();

Expand Down Expand Up @@ -88,6 +93,9 @@ class MemTable {

RowsetWriter* _rowset_writer;

bool _with_delete;


}; // class MemTable

inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ enum OLAPStatus {
OLAP_ERR_ROWSET_LOAD_FAILED = -3109,
OLAP_ERR_ROWSET_READER_INIT = -3110,
OLAP_ERR_ROWSET_READ_FAILED = -3111,
OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION = -3112
OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION = -3112,
OLAP_ERR_ROWSET_VERSION_NOT_MATCH = -3113
};

enum ColumnFamilyIndex {
Expand Down
Loading