Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ namespace config {
CONF_Int32(storage_flood_stage_usage_percent, "95"); // 95%
// The min bytes that should be left of a data dir
CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824") // 1GB
// number of thread for flushing memtable per store
CONF_Int32(flush_thread_num_per_store, "2");
} // namespace config

} // namespace doris
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ add_library(Olap STATIC
key_coder.cpp
lru_cache.cpp
memtable.cpp
memtable_flush_executor.cpp
merger.cpp
new_status.cpp
null_predicate.cpp
Expand Down
95 changes: 57 additions & 38 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,38 @@

#include "olap/delta_writer.h"

#include "olap/schema.h"
#include "olap/memtable.h"
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/schema.h"
#include "olap/storage_engine.h"

namespace doris {

OLAPStatus DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) {
*writer = new DeltaWriter(req);
return OLAP_SUCCESS;
*writer = new DeltaWriter(req, StorageEngine::instance());
return (*writer)->init();
}

DeltaWriter::DeltaWriter(WriteRequest* req)
DeltaWriter::DeltaWriter(
WriteRequest* req,
StorageEngine* storage_engine)
: _req(*req), _tablet(nullptr),
_cur_rowset(nullptr), _new_rowset(nullptr), _new_tablet(nullptr),
_rowset_writer(nullptr), _mem_table(nullptr),
_schema(nullptr), _tablet_schema(nullptr),
_delta_written_success(false) {}
_rowset_writer(nullptr), _schema(nullptr), _tablet_schema(nullptr),
_delta_written_success(false),
_storage_engine(storage_engine) {
}

DeltaWriter::~DeltaWriter() {
if (!_delta_written_success) {
_garbage_collection();
}

SAFE_DELETE(_mem_table);
_mem_table.reset();
SAFE_DELETE(_schema);
if (_rowset_writer != nullptr) {
_rowset_writer->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + _rowset_writer->rowset_id().to_string());
Expand All @@ -53,26 +58,26 @@ DeltaWriter::~DeltaWriter() {
void DeltaWriter::_garbage_collection() {
OLAPStatus rollback_status = OLAP_SUCCESS;
if (_tablet != nullptr) {
rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn(_req.partition_id,
rollback_status = _storage_engine->txn_manager()->rollback_txn(_req.partition_id,
_req.txn_id,_req.tablet_id, _req.schema_hash, _tablet->tablet_uid());
}
// has to check rollback status, because the rowset maybe committed in this thread and
// published in another thread, then rollback will failed
// when rollback failed should not delete rowset
if (rollback_status == OLAP_SUCCESS) {
StorageEngine::instance()->add_unused_rowset(_cur_rowset);
_storage_engine->add_unused_rowset(_cur_rowset);
}
if (_new_tablet != nullptr) {
rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn(_req.partition_id, _req.txn_id,
rollback_status = _storage_engine->txn_manager()->rollback_txn(_req.partition_id, _req.txn_id,
_new_tablet->tablet_id(), _new_tablet->schema_hash(), _new_tablet->tablet_uid());
if (rollback_status == OLAP_SUCCESS) {
StorageEngine::instance()->add_unused_rowset(_new_rowset);
_storage_engine->add_unused_rowset(_new_rowset);
}
}
}

OLAPStatus DeltaWriter::init() {
_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(_req.tablet_id, _req.schema_hash);
_tablet = _storage_engine->tablet_manager()->get_tablet(_req.tablet_id, _req.schema_hash);
if (_tablet == nullptr) {
LOG(WARNING) << "tablet_id: " << _req.tablet_id << ", "
<< "schema_hash: " << _req.schema_hash << " not found";
Expand All @@ -85,7 +90,7 @@ OLAPStatus DeltaWriter::init() {
return OLAP_ERR_RWLOCK_ERROR;
}
MutexLock push_lock(_tablet->get_push_lock());
RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn(
RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn(
_req.partition_id, _req.txn_id,
_req.tablet_id, _req.schema_hash, _tablet->tablet_uid(), _req.load_id));
if (_req.need_gen_rollup) {
Expand All @@ -98,7 +103,7 @@ OLAPStatus DeltaWriter::init() {
<< "new_tablet_id: " << new_tablet_id << ", "
<< "new_schema_hash: " << new_schema_hash << ", "
<< "transaction_id: " << _req.txn_id;
_new_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(new_tablet_id, new_schema_hash);
_new_tablet = _storage_engine->tablet_manager()->get_tablet(new_tablet_id, new_schema_hash);
if (_new_tablet == nullptr) {
LOG(WARNING) << "find alter task, but could not find new tablet tablet_id: " << new_tablet_id
<< ", schema_hash: " << new_schema_hash;
Expand All @@ -108,15 +113,15 @@ OLAPStatus DeltaWriter::init() {
if (!new_migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
StorageEngine::instance()->txn_manager()->prepare_txn(
_storage_engine->txn_manager()->prepare_txn(
_req.partition_id, _req.txn_id,
new_tablet_id, new_schema_hash, _new_tablet->tablet_uid(), _req.load_id);
}
}
}

RowsetWriterContext writer_context;
writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
writer_context.rowset_id = _storage_engine->next_rowset_id();
writer_context.tablet_uid = _tablet->tablet_uid();
writer_context.tablet_id = _req.tablet_id;
writer_context.partition_id = _req.partition_id;
Expand All @@ -132,48 +137,58 @@ OLAPStatus DeltaWriter::init() {

_tablet_schema = &(_tablet->tablet_schema());
_schema = new Schema(*_tablet_schema);
_mem_table = new MemTable(_schema, _tablet_schema, _req.slots,
_req.tuple_desc, _tablet->keys_type());
_mem_table = std::make_shared<MemTable>(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
_req.tuple_desc, _tablet->keys_type(), _rowset_writer.get());

// create flush handler
RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_handler(_tablet->data_dir()->path_hash(), &_flush_handler));

_is_init = true;
return OLAP_SUCCESS;
}

OLAPStatus DeltaWriter::write(Tuple* tuple) {
if (!_is_init) {
auto st = init();
if (st != OLAP_SUCCESS) {
return st;
}
RETURN_NOT_OK(init());
}

_mem_table->insert(tuple);
if (_mem_table->memory_usage() >= config::write_buffer_size) {
RETURN_NOT_OK(_mem_table->flush(_rowset_writer.get()));

SAFE_DELETE(_mem_table);
_mem_table = new MemTable(_schema, _tablet_schema, _req.slots,
_req.tuple_desc, _tablet->keys_type());
// if memtable is full, push it to the flush executor,
// and create a new memtable for incoming data
if (_mem_table->memory_usage() >= config::write_buffer_size) {
RETURN_NOT_OK(_flush_memtable_async());
// create a new memtable for new incoming data
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots,
_req.tuple_desc, _tablet->keys_type(), _rowset_writer.get()));
}
return OLAP_SUCCESS;
}

OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
OLAPStatus DeltaWriter::_flush_memtable_async() {
return _flush_handler->submit(_mem_table);
}

OLAPStatus DeltaWriter::close() {
if (!_is_init) {
auto st = init();
if (st != OLAP_SUCCESS) {
return st;
}
RETURN_NOT_OK(init());
}
RETURN_NOT_OK(_mem_table->close(_rowset_writer.get()));

OLAPStatus res = OLAP_SUCCESS;
RETURN_NOT_OK(_flush_memtable_async());
return OLAP_SUCCESS;
}

OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
// return error if previous flush failed
RETURN_NOT_OK(_flush_handler->wait());

// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
if (_cur_rowset == nullptr) {
LOG(WARNING) << "fail to build rowset";
return OLAP_ERR_MALLOC_ERROR;
}
res = StorageEngine::instance()->txn_manager()->commit_txn(_tablet->data_dir()->get_meta(),
OLAPStatus res = _storage_engine->txn_manager()->commit_txn(_tablet->data_dir()->get_meta(),
_req.partition_id, _req.txn_id,_req.tablet_id, _req.schema_hash, _tablet->tablet_uid(),
_req.load_id, _cur_rowset, false);
if (res != OLAP_SUCCESS && res != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
Expand All @@ -194,7 +209,7 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField<PTabletInfo>* t
return res;
}

res = StorageEngine::instance()->txn_manager()->commit_txn(_new_tablet->data_dir()->get_meta(),
res = _storage_engine->txn_manager()->commit_txn(_new_tablet->data_dir()->get_meta(),
_req.partition_id, _req.txn_id, _new_tablet->tablet_id(),
_new_tablet->schema_hash(), _new_tablet->tablet_uid(),
_req.load_id, _new_rowset, false);
Expand All @@ -218,6 +233,10 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField<PTabletInfo>* t
#endif

_delta_written_success = true;

const FlushStatistic& stat = _flush_handler->get_stats();
LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this INFO log? this would create many log in our info log files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log is already online and not write too much log.
It's better to use INFO in this version for checking. I will remove it at next version if it a problem.

<< ", stats: " << stat;
return OLAP_SUCCESS;
}

Expand Down
25 changes: 20 additions & 5 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@
#ifndef DORIS_BE_SRC_DELTA_WRITER_H
#define DORIS_BE_SRC_DELTA_WRITER_H

#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/schema_change.h"
#include "runtime/descriptors.h"
#include "runtime/tuple.h"
#include "gen_cpp/internal_service.pb.h"
#include "olap/rowset/rowset_writer.h"
#include "util/blocking_queue.hpp"

namespace doris {

class SegmentGroup;
class FlushHandler;
class MemTable;
class Schema;
class SegmentGroup;
class StorageEngine;

enum WriteType {
LOAD = 1,
Expand All @@ -54,17 +56,27 @@ struct WriteRequest {
class DeltaWriter {
public:
static OLAPStatus open(WriteRequest* req, DeltaWriter** writer);

DeltaWriter(WriteRequest* req, StorageEngine* storage_engine);

OLAPStatus init();
DeltaWriter(WriteRequest* req);

~DeltaWriter();

OLAPStatus write(Tuple* tuple);
OLAPStatus close(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
// flush the last memtable to flush queue, must call it before close_wait()
OLAPStatus close();
// wait for all memtables being flushed
OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);

OLAPStatus cancel();

int64_t partition_id() const { return _req.partition_id; }

private:
// push a full memtable to flush executor
OLAPStatus _flush_memtable_async();

void _garbage_collection();

private:
Expand All @@ -75,10 +87,13 @@ class DeltaWriter {
RowsetSharedPtr _new_rowset;
TabletSharedPtr _new_tablet;
std::unique_ptr<RowsetWriter> _rowset_writer;
MemTable* _mem_table;
std::shared_ptr<MemTable> _mem_table;
Schema* _schema;
const TabletSchema* _tablet_schema;
bool _delta_written_success;

StorageEngine* _storage_engine;
std::shared_ptr<FlushHandler> _flush_handler;
};

} // namespace doris
Expand Down
21 changes: 12 additions & 9 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@

namespace doris {

MemTable::MemTable(Schema* schema, const TabletSchema* tablet_schema,
MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
KeysType keys_type)
: _schema(schema),
KeysType keys_type, RowsetWriter* rowset_writer)
: _tablet_id(tablet_id),
_schema(schema),
_tablet_schema(tablet_schema),
_tuple_desc(tuple_desc),
_slot_descs(slot_descs),
_keys_type(keys_type),
_row_comparator(_schema) {
_row_comparator(_schema),
_rowset_writer(rowset_writer) {
_schema_size = _schema->schema_size();
_tuple_buf = _arena.Allocate(_schema_size);
_skip_list = new Table(_row_comparator, &_arena);
Expand All @@ -59,6 +61,7 @@ size_t MemTable::memory_usage() {

void MemTable::insert(Tuple* tuple) {
ContiguousRow row(_schema, _tuple_buf);

for (size_t i = 0; i < _slot_descs->size(); ++i) {
auto cell = row.cell(i);
const SlotDescriptor* slot = (*_slot_descs)[i];
Expand All @@ -75,7 +78,7 @@ void MemTable::insert(Tuple* tuple) {
}
}

OLAPStatus MemTable::flush(RowsetWriter* rowset_writer) {
OLAPStatus MemTable::flush() {
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
Expand All @@ -84,17 +87,17 @@ OLAPStatus MemTable::flush(RowsetWriter* rowset_writer) {
char* row = (char*)it.key();
ContiguousRow dst_row(_schema, row);
agg_finalize_row(&dst_row, _skip_list->arena());
RETURN_NOT_OK(rowset_writer->add_row(dst_row));
RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
}
RETURN_NOT_OK(rowset_writer->flush());
RETURN_NOT_OK(_rowset_writer->flush());
}
DorisMetrics::memtable_flush_total.increment(1);
DorisMetrics::memtable_flush_duration_us.increment(duration_ns / 1000);
return OLAP_SUCCESS;
}

OLAPStatus MemTable::close(RowsetWriter* rowset_writer) {
return flush(rowset_writer);
OLAPStatus MemTable::close() {
return flush();
}

} // namespace doris
Loading