Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,12 +562,18 @@ Status OlapTableSink::prepare(RuntimeState* state) {
_output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
_filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT);
_send_data_timer = ADD_TIMER(_profile, "SendDataTime");
_wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime");
_convert_batch_timer = ADD_TIMER(_profile, "ConvertBatchTime");
_validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
_open_timer = ADD_TIMER(_profile, "OpenTime");
_close_timer = ADD_TIMER(_profile, "CloseWaitTime");
_non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime");
_serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");
_non_blocking_send_work_timer = ADD_CHILD_TIMER(_profile, "NonBlockingSendWorkTime", "NonBlockingSendTime");
_serialize_batch_timer = ADD_CHILD_TIMER(_profile, "SerializeBatchTime", "NonBlockingSendWorkTime");
_total_add_batch_exec_timer = ADD_TIMER(_profile, "TotalAddBatchExecTime");
_max_add_batch_exec_timer = ADD_TIMER(_profile, "MaxAddBatchExecTime");
_add_batch_number = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT);
_num_node_channels = ADD_COUNTER(_profile, "NumberNodeChannels", TUnit::UNIT);
_load_mem_limit = state->get_load_mem_limit();

// open all channels
Expand Down Expand Up @@ -697,18 +703,23 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0,
actual_consume_ns = 0;
actual_consume_ns = 0, total_add_batch_exec_time_ns = 0,
max_add_batch_exec_time_ns = 0,
total_add_batch_num = 0, num_node_channels = 0;
{
SCOPED_TIMER(_close_timer);
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); });
num_node_channels += index_channel->num_node_channels();
}

for (auto index_channel : _channels) {
int64_t add_batch_exec_time = 0;
index_channel->for_each_node_channel([&status, &state, &node_add_batch_counter_map,
&serialize_batch_ns, &mem_exceeded_block_ns,
&queue_push_lock_ns,
&actual_consume_ns](NodeChannel* ch) {
&queue_push_lock_ns, &actual_consume_ns,
&total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_add_batch_num](NodeChannel* ch) {
auto s = ch->close_wait(state);
if (!s.ok()) {
// 'status' will store the last non-ok status of all channels
Expand All @@ -719,8 +730,13 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
}
ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
&mem_exceeded_block_ns, &queue_push_lock_ns,
&actual_consume_ns);
&actual_consume_ns, &total_add_batch_exec_time_ns,
&add_batch_exec_time, &total_add_batch_num);
});

if (add_batch_exec_time > max_add_batch_exec_time_ns) {
max_add_batch_exec_time_ns = add_batch_exec_time;
}
}
}
// TODO need to be improved
Expand All @@ -732,9 +748,15 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
COUNTER_SET(_output_rows_counter, _number_output_rows);
COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_wait_mem_limit_timer, mem_exceeded_block_ns);
COUNTER_SET(_convert_batch_timer, _convert_batch_ns);
COUNTER_SET(_validate_data_timer, _validate_data_ns);
COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
COUNTER_SET(_add_batch_number, total_add_batch_num);
COUNTER_SET(_num_node_channels, num_node_channels);
// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
state->num_rows_load_unselected();
Expand All @@ -744,11 +766,10 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
// print log of add batch time of all node, for tracing load performance easily
std::stringstream ss;
ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: ";
<< ", txn_id=" << _txn_id << ", node add batch time(ms)/num: ";
for (auto const& pair : node_add_batch_counter_map) {
ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000)
<< ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) << ")("
<< pair.second.add_batch_num << ")} ";
<< ")(" << pair.second.add_batch_num << ")} ";
}
LOG(INFO) << ss.str();
} else {
Expand Down
26 changes: 20 additions & 6 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ struct AddBatchCounter {
template <typename T>
class ReusableClosure : public google::protobuf::Closure {
public:
ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
ReusableClosure() : cid(INVALID_BTHREAD_ID) {
}
~ReusableClosure() {
// shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
join();
Expand Down Expand Up @@ -173,12 +174,17 @@ class NodeChannel {

void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns,
int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) {
int64_t* queue_push_lock_ns, int64_t* actual_consume_ns,
int64_t* total_add_batch_exec_time_ns, int64_t* add_batch_exec_time_ns,
int64_t* total_add_batch_num) {
(*add_batch_counter_map)[_node_id] += _add_batch_counter;
*serialize_batch_ns += _serialize_batch_ns;
*mem_exceeded_block_ns += _mem_exceeded_block_ns;
*queue_push_lock_ns += _queue_push_lock_ns;
*actual_consume_ns += _actual_consume_ns;
*add_batch_exec_time_ns = (_add_batch_counter.add_batch_execution_time_us * 1000);
*total_add_batch_exec_time_ns += *add_batch_exec_time_ns;
*total_add_batch_num += _add_batch_counter.add_batch_num;
}

int64_t node_id() const { return _node_id; }
Expand Down Expand Up @@ -237,10 +243,10 @@ class NodeChannel {
std::vector<TTabletCommitInfo> _tablet_commit_infos;

AddBatchCounter _add_batch_counter;
std::atomic<int64_t> _serialize_batch_ns;
std::atomic<int64_t> _mem_exceeded_block_ns;
std::atomic<int64_t> _queue_push_lock_ns;
std::atomic<int64_t> _actual_consume_ns;
std::atomic<int64_t> _serialize_batch_ns{0};
std::atomic<int64_t> _mem_exceeded_block_ns{0};
std::atomic<int64_t> _queue_push_lock_ns{0};
std::atomic<int64_t> _actual_consume_ns{0};
};

class IndexChannel {
Expand All @@ -262,6 +268,8 @@ class IndexChannel {
void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); }
bool has_intolerable_failure();

size_t num_node_channels() const { return _node_channels.size(); }

private:
OlapTableSink* _parent;
int64_t _index_id;
Expand Down Expand Up @@ -382,12 +390,18 @@ class OlapTableSink : public DataSink {
RuntimeProfile::Counter* _output_rows_counter = nullptr;
RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
RuntimeProfile::Counter* _send_data_timer = nullptr;
RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
RuntimeProfile::Counter* _convert_batch_timer = nullptr;
RuntimeProfile::Counter* _validate_data_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _non_blocking_send_timer = nullptr;
RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr;
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr;
RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr;
RuntimeProfile::Counter* _add_batch_number = nullptr;
RuntimeProfile::Counter* _num_node_channels = nullptr;

// load mem limit is for remote load channel
int64_t _load_mem_limit = -1;
Expand Down
60 changes: 55 additions & 5 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,17 @@ OLAPStatus DeltaWriter::init() {
}

OLAPStatus DeltaWriter::write(Tuple* tuple) {
if (!_is_init) {
std::lock_guard<SpinLock> l(_lock);
if (!_is_init && !_is_cancelled) {
RETURN_NOT_OK(init());
}

if (_is_cancelled) {
// The writer may be cancelled at any time by other thread.
// just return ERROR if writer is cancelled.
return OLAP_ERR_ALREADY_CANCELLED;
}

_mem_table->insert(tuple);

// if memtable is full, push it to the flush executor,
Expand All @@ -196,7 +203,20 @@ OLAPStatus DeltaWriter::_flush_memtable_async() {
return _flush_token->submit(_mem_table);
}

OLAPStatus DeltaWriter::flush_memtable_and_wait() {
OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
std::lock_guard<SpinLock> l(_lock);
if (!_is_init) {
// This writer is not initialized before flushing. Do nothing
// But we return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED,
// Because this method maybe called when trying to reduce mem consumption,
// and at that time, the writer may not be initialized yet and that is a normal case.
return OLAP_SUCCESS;
}

if (_is_cancelled) {
return OLAP_ERR_ALREADY_CANCELLED;
}

if (mem_consumption() == _mem_table->memory_usage()) {
// equal means there is no memtable in flush queue, just flush this memtable
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
Expand All @@ -208,7 +228,24 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
DCHECK(mem_consumption() > _mem_table->memory_usage());
// this means there should be at least one memtable in flush queue.
}
// wait all memtables in flush queue to be flushed.

if (need_wait) {
// wait all memtables in flush queue to be flushed.
RETURN_NOT_OK(_flush_token->wait());
}
return OLAP_SUCCESS;
}

OLAPStatus DeltaWriter::wait_flush() {
std::lock_guard<SpinLock> l(_lock);
if (!_is_init) {
// return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED for same reason
// as described in flush_memtable_and_wait()
return OLAP_SUCCESS;
}
if (_is_cancelled) {
return OLAP_ERR_ALREADY_CANCELLED;
}
RETURN_NOT_OK(_flush_token->wait());
return OLAP_SUCCESS;
}
Expand All @@ -220,7 +257,8 @@ void DeltaWriter::_reset_mem_table() {
}

OLAPStatus DeltaWriter::close() {
if (!_is_init) {
std::lock_guard<SpinLock> l(_lock);
if (!_is_init && !_is_cancelled) {
// if this delta writer is not initialized, but close() is called.
// which means this tablet has no data loaded, but at least one tablet
// in same partition has data loaded.
Expand All @@ -229,14 +267,24 @@ OLAPStatus DeltaWriter::close() {
RETURN_NOT_OK(init());
}

if (_is_cancelled) {
return OLAP_ERR_ALREADY_CANCELLED;
}

RETURN_NOT_OK(_flush_memtable_async());
_mem_table.reset();
return OLAP_SUCCESS;
}

OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
std::lock_guard<SpinLock> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait() being called";

if (_is_cancelled) {
return OLAP_ERR_ALREADY_CANCELLED;
}

// return error if previous flush failed
RETURN_NOT_OK(_flush_token->wait());
DCHECK_EQ(_mem_tracker->consumption(), 0);
Expand Down Expand Up @@ -295,7 +343,8 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
}

OLAPStatus DeltaWriter::cancel() {
if (!_is_init) {
std::lock_guard<SpinLock> l(_lock);
if (!_is_init || _is_cancelled) {
return OLAP_SUCCESS;
}
_mem_table.reset();
Expand All @@ -304,6 +353,7 @@ OLAPStatus DeltaWriter::cancel() {
_flush_token->cancel();
}
DCHECK_EQ(_mem_tracker->consumption(), 0);
_is_cancelled = true;
return OLAP_SUCCESS;
}

Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/tablet.h"
#include "util/spinlock.h"

namespace doris {

Expand Down Expand Up @@ -73,12 +74,17 @@ class DeltaWriter {
// submit current memtable to flush queue, and wait all memtables in flush queue
// to be flushed.
// This is currently for reducing mem consumption of this delta writer.
OLAPStatus flush_memtable_and_wait();
// If need_wait is true, it will wait for all memtable in flush queue to be flushed.
// Otherwise, it will just put memtables to the flush queue and return.
OLAPStatus flush_memtable_and_wait(bool need_wait);

int64_t partition_id() const;

int64_t mem_consumption() const;

// Wait all memtable in flush queue to be flushed
OLAPStatus wait_flush();

private:
DeltaWriter(WriteRequest* req, const std::shared_ptr<MemTracker>& parent,
StorageEngine* storage_engine);
Expand All @@ -92,6 +98,7 @@ class DeltaWriter {

private:
bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
TabletSharedPtr _tablet;
RowsetSharedPtr _cur_rowset;
Expand All @@ -106,6 +113,8 @@ class DeltaWriter {
StorageEngine* _storage_engine;
std::unique_ptr<FlushToken> _flush_token;
std::shared_ptr<MemTracker> _mem_tracker;

SpinLock _lock;
};

} // namespace doris
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ enum OLAPStatus {
OLAP_ERR_TOO_MANY_TRANSACTIONS = -233,
OLAP_ERR_INVALID_SNAPSHOT_VERSION = -234,
OLAP_ERR_TOO_MANY_VERSION = -235,
OLAP_ERR_NOT_INITIALIZED = -236,
OLAP_ERR_ALREADY_CANCELLED = -237,

// CommandExecutor
// [-300, -400)
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/unique_rowset_id_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "util/doris_metrics.h"
#include "util/spinlock.h"
#include "util/stack_util.h"
#include "util/uid_util.h"

namespace doris {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema
bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path,
TTabletId* tablet_id,
TSchemaHash* schema_hash) {
// the path like: /data/14/10080/964828783/
static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)");
// match tablet schema hash data path, for example, the path is /data/1/16791/29998
// 1 is shard id , 16791 is tablet id, 29998 is schema hash
Expand All @@ -651,6 +652,7 @@ bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path,
}

bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) {
// the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat
static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*");
string id_str;
bool ret = RE2::PartialMatch(path, re, &id_str);
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
static void dummy_deleter(const CacheKey& key, void* value) {}

Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
int64_t* wait_lock_time_ns) {
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
UniqueId load_id(request.id());
// 1. get load channel
std::shared_ptr<LoadChannel> channel;
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/load_channel_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class LoadChannelMgr {
Status open(const PTabletWriterOpenRequest& request);

Status add_batch(const PTabletWriterAddBatchRequest& request,
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
int64_t* wait_lock_time_ns);
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);

// cancel all tablet stream for 'load_id' load
Status cancel(const PTabletWriterCancelRequest& request);
Expand Down
Loading