Skip to content

Conversation

@morningman
Copy link
Contributor

ISSUE: #1797

@morningman
Copy link
Contributor Author

Separate TabletsChannel from TabletWriterMgr to avoid conflict define between gutil and brpc

// the size of queue for saving immutable memtables.
// set this size larger may reduce the time of waiting memtable flush,
// but will increase memory usage of loading.
// CONF_Int32(memtable_queue_size, "1");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove it

bool _delta_written_success;
std::atomic<OLAPStatus> _flush_status;
int64_t _flush_cost_ns;
int64_t _flush_time;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename _flush_cost_ns to _flush_time_ns?
rename _flush_time to _flush_count?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

void _flush_memtable();

private:
// id of this load channel, just for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

// the size of flush queue equals to the number of tablets.
// so that each tablet has at least one rotational memtable.
// and the over all mem usage is at most 2 times of total memtable's size
BlockingQueue<std::shared_ptr<MemTable>> _flush_queue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All tablets data are not uniform. An extreme example: there 10 tablets, tablet A occupies ninety percent data, so tablet A data flush will very slowly. So I think there maybe two even more tablet A MemTable in flush_queue at the same time.

So "the over all mem usage is at most 2 times of total memtable's size" will not always right.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, all memtables' max size are equal(default is 100MB).
Suppose there 5 tablets, and even if tablet A occupies all data, there are at most 5 memtables(all of them are tablet A's) in the queue. And plus the number of memtables for incoming data, there are 10 memtables at most.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. you are right. I forgot the " all memtables' max size are equal(default is 100MB)."

_delta_written_success(false), _flush_status(OLAP_SUCCESS),
_flush_queue(flush_queue) {

_mem_table.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

// and create a new memtable for incoming data
if (_mem_table->memory_usage() >= config::write_buffer_size) {
RETURN_NOT_OK(_mem_table->flush(_rowset_writer.get()));
if (_flush_status.load() != OLAP_SUCCESS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RETURN_NOT_OK?

int64_t _flush_count;

// queue for saving immable mem tables
BlockingQueue<std::shared_ptr<MemTable>>* _flush_queue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to have an executor, which can be system wide or a load wide. When DeltaWriter submit a flush request to the executor, the executor will return a future to it. If you do that way, you need not to modify TabletChannel, and TabletChannel don't call MemTable flush. MemTable flush operation is StorageEngine implementation related, it is a little weird to put it out of StorageEngine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think about it

@morningman morningman force-pushed the optimize_tablet_channel branch from bb0dbc7 to cd4ffd0 Compare September 16, 2019 00:56
_stream_load_executor = new StreamLoadExecutor(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_memtable_flush_executor = new MemTableFlushExecutor(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flush executor should be included in StorageEngine other than here.

_req.tuple_desc, _tablet->keys_type());
_mem_table = std::make_shared<MemTable>(_tablet->tablet_id(), _schema, _tablet_schema, _req.slots, _req.tuple_desc, _tablet->keys_type());

_flush_queue_idx = _flush_executor->get_queue_idx(_tablet->data_dir()->path_hash());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you exposing this queue to the outside?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because for now, each DeltaWriter must only push memtables to the same queue.

struct MemTableFlushContext {
std::shared_ptr<MemTable> memtable;
DeltaWriter* delta_writer;
std::atomic<OLAPStatus>* flush_status;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeltaWriter will call MemTableFlushExecutor, but you put DeltaWriter in context, which make a loop dependence

namespace doris {

// channel that process all data for this load
class TabletsChannel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not necessary to modify, please don't change these files to avoid Introducing new questions

@morningman morningman closed this Sep 18, 2019
@morningman morningman reopened this Sep 18, 2019
// The min bytes that should be left of a data dir
CONF_Int64(storage_flood_stage_left_capacity_bytes, "1073741824") // 1GB
// number of thread for flushing memtable per data dir
CONF_Int32(flush_thread_num_per_dir, "2");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CONF_Int32(flush_thread_num_per_dir, "2");
CONF_Int32(flush_thread_num_per_store, "2");

_delta_written_success(false), _flush_status(OLAP_SUCCESS),
_flush_queue(flush_queue) {

_mem_table.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

??

// if the last memtable is flushed, all previous memtables should already be flushed.
// so we only need to wait and block on the last memtable's flush future.
std::future<OLAPStatus> _flush_future;
#endif
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this?

private:
// id of this load channel
TabletsChannelKey _key;
MemTableFlushExecutor* _flush_executor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better not to store this in TabletChannel, this is storage engine internal.

int32_t get_queue_idx(size_t path_hash);

// push the memtable to specified flush queue, and return a future
std::future<OLAPStatus> push_memtable(int32_t queue_idx, const MemTableFlushContext& ctx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to assign queue in this class, not outside this class.

@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have two executor?

SAFE_DELETE(_mem_table);
_mem_table = new MemTable(_schema, _tablet_schema, _req.slots,
_req.tuple_desc, _tablet->keys_type());
OLAPStatus DeltaWriter::_check_flush_futures(bool block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not put this logic in _flush_handler. DeltaWriter can test status of _flush_handler.
Executor can update flush_handler's status through callback. In that way, there is no need to loop the status of submitted MemTable.

In your implementation code, you give a 1 microseconds interval to loop the status. When waiting last MemTable, block is true, you will loop 10^6 times one second, which is waste of CPU.

I think following interface is enough for current usage.

class FlusherClient {
public:
    // submit a memtable to flush. return error if some previous submitted MemTable has failed  
    Status submit();
    // wait for all submitted memtable finished.
    Status wait();
    // Get flush operations' statistics
    Statistics get_stats();

    // called when a memtable is finished by executor.
    void on_flush_finished();
};

@morningman morningman force-pushed the optimize_tablet_channel branch from 4348e78 to f3c4967 Compare September 23, 2019 01:05
// ...
// FlushHandler* flush_handler;
// memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler);
// std::shared_ptr<FlushHandler> shared_handler(flush_handler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If client should wrap returned pointer with a shared_ptr, why not define the argument with std::shared_ptr<>*

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

// wait for all submitted memtable finished.
OLAPStatus wait();
// get flush operations' statistics
const FlushStatistic& get_stats();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const FlushStatistic& get_stats();
const FlushStatistic& get_stats() const;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

MemTableFlushContext ctx;
ctx.memtable = memtable;
ctx.flush_handler = this->shared_from_this();
_flush_futures.push(_flush_executor->_push_memtable(_flush_queue_idx, ctx));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of future and callback is enough. No need to have both, which will make it more complex.
In this case, what you need is CountDownLatch, update in callback.

}

// if last flush of this tablet already failed, just skip
if (ctx.flush_handler->last_flush_status() != OLAP_SUCCESS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you call last_flush_status() here, this means that Executor know details of flush handler.
there are other options to do this

  1. Executor just do and callback, and Flusher will add more task in callback function
  2. make a context an interface who has an abstract function get_state() to tell if it should be scheduled.

std::lock_guard<SpinLock> l(_lock);
_flush_promises[ctx.flush_id].set_value(res.flush_status);
_flush_promises.erase(ctx.flush_id);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can remove this block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just easy for reading

@morningman morningman force-pushed the optimize_tablet_channel branch from 25aff96 to 0d97379 Compare September 23, 2019 14:55
// waiter:
// one or more waiter call xxx_wait() to wait until all or at least one tasks are finished.
class CounterCondVariable {
public:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indent

std::unique_lock<std::mutex> lock(_mutex);
++count_;
cv_.notify_one();
++_count;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent

// worker:
// one or more workers do the task and call dec_count() after finishing the task
// waiter:
// one or more waiter call xxx_wait() to wait until all or at least one tasks are finished.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to give a regular usage example

// wait until count down to zero
void block_wait() {
std::unique_lock<std::mutex> lock(_lock);
_cv.wait(lock, [=] { return _count <= 0; });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_cv.wait(lock, [=] { return _count <= 0; });
_cv.wait(lock, [this] { return _count <= 0; });

only capture what you need

// wait if count larger than 0
// and after being notified, return true if count down zo zero,
// or return false other wise.
bool check_wait() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a dec_to_zero function is better than this interface in this case.
And dec_to_zero is more general.

_delta_written_success = true;

const FlushStatistic& stat = _flush_handler->get_stats();
LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this INFO log? this would create many log in our info log files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log is already online and not write too much log.
It's better to use INFO in this version for checking. I will remove it at next version if it a problem.

#ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H
#define DORIS_BE_SRC_OLAP_MEMTABLE_H

#include <future>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no use


// if last flush of this tablet already failed, just skip
if (ctx.flush_handler->is_cancelled()) {
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

callback?


namespace doris {

OLAPStatus FlushHandler::submit(std::shared_ptr<MemTable> memtable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to limit the number of memtables submitted to executor?
I'm afraid that one big ingest will block other ingest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many memtables in flush queue will take too much memory.
And the size of a memtable is no more than 100MB, so there will no be a big ingest

Copy link
Contributor

@imay imay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@morningman morningman merged commit c643cbd into apache:master Sep 25, 2019
@imay imay mentioned this pull request Sep 26, 2019
swjtu-zhanglei pushed a commit to swjtu-zhanglei/incubator-doris that referenced this pull request Jul 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants