Skip to content

Conversation

@weizuo93
Copy link
Contributor

Proposed changes

Currently, there are M threads to do base compaction and N threads to do cumulative compaction for each disk. Too many compaction tasks may run out of memory, so the max concurrency of running compaction tasks is limited by semaphore. If the running threads cost too much memory, we can't defense it. In addition, reducing concurrency to avoid OOM will lead to some compaction tasks can't be executed in time and we may encounter more heavy compaction. Therefore, concurrency limitation is not enough.

The strategy proposed in #3624 may be effective to solve the OOM.

A CompactionPermitLimiter is used for compaction limitation, and use single-producer/multi-consumer model. Producer will try to generate compaction tasks and acquire permits for each task. The compaction task which can hold permits will be executed in thread pool and each finished task will release its permits.

permits should be applied for before a compaction task can execute. When the sum of permits held by executing compaction tasks reaches a threshold, subsequent compaction task will be no longer allowed, until some permits are released. Tablet compaction score is used as permits of compaction task here.

To some extent, memory consumption can be limited by setting appropriate permits threshold.

Types of changes

What types of changes does your code introduce to Doris?
Put an x in the boxes that apply

  • [] Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • [] Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • [] Documentation Update (if none of the other choices apply)
  • Code refactor (Modify the code structure, format the code, etc...)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.

  • Issue (Fix [Proposal] The execution model of compaction needs to improve #3624)have described the bug/feature there in detail
  • Compiling and unit tests pass locally with my changes
  • [] I have added tests that prove my fix is effective or that my feature works
  • [] If this change need a document change, I have updated the document
  • Any dependent changes have been merged

@weizuo93 weizuo93 changed the title [Optimize] optimize the execution model of compaction to limit memory consumption [Optimize] Optimize the execution model of compaction to limit memory consumption Sep 25, 2020
@morningman morningman added area/compact Issues or PRs related to the compact kind/improvement labels Sep 26, 2020
@morningman morningman self-assigned this Sep 26, 2020
CONF_mInt64(total_permits_for_compaction_score, "10000")

// Whether compaction task is allowed to start when compaction score of current tablet is out of upper limit.
CONF_mBool(enable_over_sold, "true");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CONF_mBool(enable_over_sold, "true");
CONF_mBool(enable_compaction_permit_over_sold, "true");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK !

CompactionType compaction_type;
do {
if (!config::disable_auto_compaction) {
if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default cumulative_compaction_rounds_for_each_base_compaction_round is 9, and default generate_compaction_tasks_interval_seconds is 2. So generally, it will create a base compaction task for every 18 seconds?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I also test this with following case:

  1. Only 1 BE with 1 data dir.
  2. Create one table with 100 buckets.
  3. insert data into this table for every 5 seconds.

The compaction is triggered every 2 seconds. And each compaction task cost just 0.x seconds. But the average version count of tablets is about 50, and can not be lower.

So I think the way to generate compaction tasks through polling may not be appropriate. One possible way is to generate compaction tasks through triggering.

Based on polling, currently only one task can be done in 2 seconds, and based on triggering, in my case, it can be done 500 times per second (because the amount of data in each batch is very small in the case of high-frequency load)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestions! I optimized the implementation logic of my producer. If all the compaction tasks produced can hold permits, the producer will continue to produce compaction tasks without sleep. In this way, the production speed can far meet consumer demand.

CompactionType compaction_type, std::vector<DataDir*> data_dirs) {
vector<TabletSharedPtr> tablets_compaction;
std::random_shuffle(data_dirs.begin(), data_dirs.end());
for (auto data_dir : data_dirs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can find more than one tablet for each data dir at this time.
the number of tablet found here can be compaction_task_num_per_disk

@morningman
Copy link
Contributor

morningman commented Sep 29, 2020

Hi @weizuo93 , I tested in my env with high frequency load, And I recommend to add following patch.

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index f81a029cc..aa0f0b452 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -39,13 +39,13 @@ OLAPStatus Compaction::do_compaction(int64_t permits) {
     TRACE("start to do compaction");
     _tablet->data_dir()->disks_compaction_score_increment(permits);
     _tablet->data_dir()->disks_compaction_num_increment(1);
-    OLAPStatus st = do_compaction_impl();
+    OLAPStatus st = do_compaction_impl(permits);
     _tablet->data_dir()->disks_compaction_score_increment(-permits);
     _tablet->data_dir()->disks_compaction_num_increment(-1);
     return st;
 }

-OLAPStatus Compaction::do_compaction_impl() {
+OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
     OlapStopWatch watch;

     // 1. prepare input and output parameters
@@ -63,7 +63,8 @@ OLAPStatus Compaction::do_compaction_impl() {
     _tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash);

     LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
-            << ", output version is=" << _output_version.first << "-" << _output_version.second;
+            << ", output version is=" << _output_version.first << "-" << _output_version.second
+            << ", score: " << permits;

     RETURN_NOT_OK(construct_output_rowset_writer());
     RETURN_NOT_OK(construct_input_rowset_readers());
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 20a567cdc..f43bc6f1d 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -55,7 +55,7 @@ protected:
     virtual ReaderType compaction_type() const = 0;

     OLAPStatus do_compaction(int64_t permits);
-    OLAPStatus do_compaction_impl();
+    OLAPStatus do_compaction_impl(int64_t permits);

     void modify_rowsets();
     OLAPStatus gc_unused_rowsets();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 020b40fca..c2cfcc162 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -598,8 +598,8 @@ void StorageEngine::_perform_cumulative_compaction(TabletSharedPtr best_tablet)

     OLAPStatus res = cumulative_compaction.compact();
     if (res != OLAP_SUCCESS) {
-        best_tablet->set_last_cumu_compaction_failure_time(UnixMillis());
         if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) {
+            best_tablet->set_last_cumu_compaction_failure_time(UnixMillis());
             DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
             LOG(WARNING) << "failed to do cumulative compaction. res=" << res
                         << ", table=" << best_tablet->full_name();
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index fdfa99631..bf7d09890 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -763,7 +763,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
     }

     if (best_tablet != nullptr) {
-        LOG(INFO) << "Found the best tablet for compaction. "
+        VLOG(1) << "Found the best tablet for compaction. "
                   << "compaction_type=" << compaction_type_str
                   << ", tablet_id=" << best_tablet->tablet_id()
                   << ", highest_score=" << highest_score;

For 3 things:

  1. Avoid too many "Found the best tablet for compaction. " log.
  2. Still show the highest score int log, but along with the compaction start log.
  3. Do not set last_cumu_compaction_failure_time of tablet if error is OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS. This can make compaction process continue running.

@weizuo93
Copy link
Contributor Author

Hi @weizuo93 , I tested in my env with high frequency load, And I recommend to add following patch.

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index f81a029cc..aa0f0b452 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -39,13 +39,13 @@ OLAPStatus Compaction::do_compaction(int64_t permits) {
     TRACE("start to do compaction");
     _tablet->data_dir()->disks_compaction_score_increment(permits);
     _tablet->data_dir()->disks_compaction_num_increment(1);
-    OLAPStatus st = do_compaction_impl();
+    OLAPStatus st = do_compaction_impl(permits);
     _tablet->data_dir()->disks_compaction_score_increment(-permits);
     _tablet->data_dir()->disks_compaction_num_increment(-1);
     return st;
 }

-OLAPStatus Compaction::do_compaction_impl() {
+OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
     OlapStopWatch watch;

     // 1. prepare input and output parameters
@@ -63,7 +63,8 @@ OLAPStatus Compaction::do_compaction_impl() {
     _tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash);

     LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
-            << ", output version is=" << _output_version.first << "-" << _output_version.second;
+            << ", output version is=" << _output_version.first << "-" << _output_version.second
+            << ", score: " << permits;

     RETURN_NOT_OK(construct_output_rowset_writer());
     RETURN_NOT_OK(construct_input_rowset_readers());
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 20a567cdc..f43bc6f1d 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -55,7 +55,7 @@ protected:
     virtual ReaderType compaction_type() const = 0;

     OLAPStatus do_compaction(int64_t permits);
-    OLAPStatus do_compaction_impl();
+    OLAPStatus do_compaction_impl(int64_t permits);

     void modify_rowsets();
     OLAPStatus gc_unused_rowsets();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 020b40fca..c2cfcc162 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -598,8 +598,8 @@ void StorageEngine::_perform_cumulative_compaction(TabletSharedPtr best_tablet)

     OLAPStatus res = cumulative_compaction.compact();
     if (res != OLAP_SUCCESS) {
-        best_tablet->set_last_cumu_compaction_failure_time(UnixMillis());
         if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) {
+            best_tablet->set_last_cumu_compaction_failure_time(UnixMillis());
             DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
             LOG(WARNING) << "failed to do cumulative compaction. res=" << res
                         << ", table=" << best_tablet->full_name();
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index fdfa99631..bf7d09890 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -763,7 +763,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
     }

     if (best_tablet != nullptr) {
-        LOG(INFO) << "Found the best tablet for compaction. "
+        VLOG(1) << "Found the best tablet for compaction. "
                   << "compaction_type=" << compaction_type_str
                   << ", tablet_id=" << best_tablet->tablet_id()
                   << ", highest_score=" << highest_score;

For 3 things:

  1. Avoid too many "Found the best tablet for compaction. " log.
  2. Still show the highest score int log, but along with the compaction start log.
  3. Do not set last_cumu_compaction_failure_time of tablet if error is OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS. This can make compaction process continue running.

OK. It seems more reasonable.

Copy link
Contributor

@morningman morningman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@morningman morningman added the approved Indicates a PR has been approved by one committer. label Oct 10, 2020
@morningman morningman merged commit eba5955 into apache:master Oct 11, 2020
acelyc111 pushed a commit to acelyc111/incubator-doris that referenced this pull request Jan 20, 2021
… consumption (apache#4670)

Currently, there are M threads to do base compaction and N threads to do cumulative compaction for each disk.
Too many compaction tasks may run out of memory, so the max concurrency of running compaction tasks
is limited by semaphore.
If the running threads cost too much memory, we can't defense it. In addition, reducing concurrency to avoid OOM
will lead to some compaction tasks can't be executed in time and we may encounter more heavy compaction.
Therefore, concurrency limitation is not enough.

The strategy proposed in apache#3624  may be effective to solve the OOM.

A CompactionPermitLimiter is used for compaction limitation, and use single-producer/multi-consumer model.
Producer will try to generate compaction tasks and acquire `permits` for each task.
The compaction task which can hold `permits` will be executed in thread pool and each finished task will
release its `permits`.

`permits` should be applied for before a compaction task can execute. When the sum of `permits`
held by executing compaction tasks reaches a threshold, subsequent compaction task will be no longer allowed,
until some `permits` are released. Tablet compaction score is used as `permits` of compaction task here.

To some extent, memory consumption can be limited by setting appropriate `permits` threshold.
@yangzhg yangzhg mentioned this pull request Feb 9, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved Indicates a PR has been approved by one committer. area/compact Issues or PRs related to the compact kind/improvement

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Proposal] The execution model of compaction needs to improve

2 participants