diff --git a/be/src/common/config.h b/be/src/common/config.h index 9bbe0a9c80373c..93a0bff1ae4109 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -252,6 +252,10 @@ namespace config { CONF_Int32(cumulative_compaction_num_threads_per_disk, "1"); CONF_mInt64(cumulative_compaction_budgeted_bytes, "104857600"); // CONF_Int32(cumulative_compaction_write_mbytes_per_sec, "100"); + // cumulative compaction skips recently published deltas in order to prevent + // compacting a version that might be queried (in case the query planning phase took some time). + // the following config sets the size of that skip window, in seconds + CONF_mInt32(cumulative_compaction_skip_window_seconds, "30"); // if compaction of a tablet failed, this tablet should not be chosen to // compaction until this interval passes. diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 87c662885f19b2..4a32553b483d39 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -66,7 +66,8 @@ OLAPStatus CumulativeCompaction::compact() { OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { std::vector candidate_rowsets; - _tablet->pick_candicate_rowsets_to_cumulative_compaction(&candidate_rowsets); + _tablet->pick_candicate_rowsets_to_cumulative_compaction( + config::cumulative_compaction_skip_window_seconds, &candidate_rowsets); if (candidate_rowsets.empty()) { return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp index c30653d916bc2b..0e2727af36cf73 100644 --- a/be/src/olap/rowset/rowset.cpp +++ b/be/src/olap/rowset/rowset.cpp @@ -17,6 +17,8 @@ #include "olap/rowset/rowset.h" +#include "util/time.h" + namespace doris { Rowset::Rowset(const TabletSchema *schema, @@ -65,6 +67,9 @@ void Rowset::make_visible(Version version, VersionHash version_hash) { _rowset_meta->set_version(version); _rowset_meta->set_version_hash(version_hash); 
_rowset_meta->set_rowset_state(VISIBLE); + // reset the creation time to the moment the rowset becomes visible; + // it's used to skip recently published versions during compaction + _rowset_meta->set_creation_time(UnixSeconds()); if (_rowset_meta->has_delete_predicate()) { _rowset_meta->mutable_delete_predicate()->set_version(version.first); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 4dd3d199448552..8d96b9b64bf234 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -387,7 +387,7 @@ void Tablet::_delete_inc_rowset_by_version(const Version& version, } void Tablet::delete_expired_inc_rowsets() { - time_t now = time(nullptr); + int64_t now = UnixSeconds(); vector> expired_versions; WriteLock wrlock(&_meta_lock); for (auto& rs_meta : _tablet_meta->all_inc_rs_metas()) { @@ -933,11 +933,12 @@ TabletInfo Tablet::get_tablet_info() const { return TabletInfo(tablet_id(), schema_hash(), tablet_uid()); } -void Tablet::pick_candicate_rowsets_to_cumulative_compaction( - vector* candidate_rowsets) { +void Tablet::pick_candicate_rowsets_to_cumulative_compaction(int64_t skip_window_sec, + std::vector* candidate_rowsets) { + int64_t now = UnixSeconds(); ReadLock rdlock(&_meta_lock); for (auto& it : _rs_version_map) { - if (it.first.first >= _cumulative_point) { + if (it.first.first >= _cumulative_point && (it.second->creation_time() + skip_window_sec < now)) { candidate_rowsets->push_back(it.second); } } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 02c676bafd0536..f19218180a9298 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -224,7 +224,8 @@ class Tablet : public std::enable_shared_from_this { TabletInfo get_tablet_info() const; - void pick_candicate_rowsets_to_cumulative_compaction(std::vector* candidate_rowsets); + void pick_candicate_rowsets_to_cumulative_compaction(int64_t skip_window_sec, + std::vector* candidate_rowsets); void pick_candicate_rowsets_to_base_compaction(std::vector* candidate_rowsets); OLAPStatus 
calculate_cumulative_point(); diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index f152164391bbad..c098c5b5344b8f 100755 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -494,7 +494,7 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i } void TxnManager::build_expire_txn_map(std::map>* expire_txn_map) { - time_t now = time(nullptr); + int64_t now = UnixSeconds(); // traverse the txn map, and get all expired txns ReadLock txn_rdlock(&_txn_map_lock); for (auto& it : _txn_tablet_map) { diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index d4a3e923b4f5f5..96736d0a27958f 100755 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -18,7 +18,6 @@ #ifndef DORIS_BE_SRC_OLAP_TXN_MANAGER_H #define DORIS_BE_SRC_OLAP_TXN_MANAGER_H -#include #include #include #include @@ -46,6 +45,7 @@ #include "olap/options.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" +#include "util/time.h" namespace doris { @@ -59,7 +59,7 @@ struct TabletTxnInfo { RowsetSharedPtr rowset) : load_id(load_id), rowset(rowset), - creation_time(time(nullptr)) {} + creation_time(UnixSeconds()) {} TabletTxnInfo() {} }; diff --git a/be/src/util/time.h b/be/src/util/time.h index 1e81dce640acbd..01d56c6768ba78 100755 --- a/be/src/util/time.h +++ b/be/src/util/time.h @@ -74,6 +74,14 @@ inline int64_t UnixMillis() { return GetCurrentTimeMicros() / MICROS_PER_MILLI; } +/// Returns the number of seconds that have passed since the Unix epoch. This is +/// affected by manual changes to the system clock but is more suitable for use across +/// a cluster. For more accurate timings on the local host use the monotonic functions +/// above. +inline int64_t UnixSeconds() { + return GetCurrentTimeMicros() / MICROS_PER_SEC; +} + /// Returns the number of microseconds that have passed since the Unix epoch. 
This is /// affected by manual changes to the system clock but is more suitable for use across /// a cluster. For more accurate timings on the local host use the monotonic functions