Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ namespace config {
CONF_Int32(cumulative_compaction_num_threads_per_disk, "1");
CONF_mInt64(cumulative_compaction_budgeted_bytes, "104857600");
// CONF_Int32(cumulative_compaction_write_mbytes_per_sec, "100");
// cumulative compaction skips recently published deltas in order to prevent
// compacting a version that might be queried (in case the query planning phase took some time).
// the following config set the window size
CONF_mInt32(cumulative_compaction_skip_window_seconds, "30");

// if compaction of a tablet failed, this tablet should not be chosen to
// compaction until this interval passes.
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ OLAPStatus CumulativeCompaction::compact() {

OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
std::vector<RowsetSharedPtr> candidate_rowsets;
_tablet->pick_candicate_rowsets_to_cumulative_compaction(&candidate_rowsets);
_tablet->pick_candicate_rowsets_to_cumulative_compaction(
config::cumulative_compaction_skip_window_seconds, &candidate_rowsets);

if (candidate_rowsets.empty()) {
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "olap/rowset/rowset.h"

#include "util/time.h"

namespace doris {

Rowset::Rowset(const TabletSchema *schema,
Expand Down Expand Up @@ -65,6 +67,9 @@ void Rowset::make_visible(Version version, VersionHash version_hash) {
_rowset_meta->set_version(version);
_rowset_meta->set_version_hash(version_hash);
_rowset_meta->set_rowset_state(VISIBLE);
// update create time to the visible time,
// it's used to skip recently published version during compaction
_rowset_meta->set_creation_time(UnixSeconds());

if (_rowset_meta->has_delete_predicate()) {
_rowset_meta->mutable_delete_predicate()->set_version(version.first);
Expand Down
9 changes: 5 additions & 4 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ void Tablet::_delete_inc_rowset_by_version(const Version& version,
}

void Tablet::delete_expired_inc_rowsets() {
time_t now = time(nullptr);
int64_t now = UnixSeconds();
vector<pair<Version, VersionHash>> expired_versions;
WriteLock wrlock(&_meta_lock);
for (auto& rs_meta : _tablet_meta->all_inc_rs_metas()) {
Expand Down Expand Up @@ -933,11 +933,12 @@ TabletInfo Tablet::get_tablet_info() const {
return TabletInfo(tablet_id(), schema_hash(), tablet_uid());
}

void Tablet::pick_candicate_rowsets_to_cumulative_compaction(
vector<RowsetSharedPtr>* candidate_rowsets) {
void Tablet::pick_candicate_rowsets_to_cumulative_compaction(int64_t skip_window_sec,
std::vector<RowsetSharedPtr>* candidate_rowsets) {
int64_t now = UnixSeconds();
ReadLock rdlock(&_meta_lock);
for (auto& it : _rs_version_map) {
if (it.first.first >= _cumulative_point) {
if (it.first.first >= _cumulative_point && (it.second->creation_time() + skip_window_sec < now)) {
candidate_rowsets->push_back(it.second);
}
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ class Tablet : public std::enable_shared_from_this<Tablet> {

TabletInfo get_tablet_info() const;

void pick_candicate_rowsets_to_cumulative_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);
void pick_candicate_rowsets_to_cumulative_compaction(int64_t skip_window_sec,
std::vector<RowsetSharedPtr>* candidate_rowsets);
void pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);

OLAPStatus calculate_cumulative_point();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i
}

void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) {
time_t now = time(nullptr);
int64_t now = UnixSeconds();
// traverse the txn map, and get all expired txns
ReadLock txn_rdlock(&_txn_map_lock);
for (auto& it : _txn_tablet_map) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#ifndef DORIS_BE_SRC_OLAP_TXN_MANAGER_H
#define DORIS_BE_SRC_OLAP_TXN_MANAGER_H

#include <ctime>
#include <list>
#include <map>
#include <mutex>
Expand Down Expand Up @@ -46,6 +45,7 @@
#include "olap/options.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "util/time.h"

namespace doris {

Expand All @@ -59,7 +59,7 @@ struct TabletTxnInfo {
RowsetSharedPtr rowset) :
load_id(load_id),
rowset(rowset),
creation_time(time(nullptr)) {}
creation_time(UnixSeconds()) {}

TabletTxnInfo() {}
};
Expand Down
8 changes: 8 additions & 0 deletions be/src/util/time.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ inline int64_t UnixMillis() {
return GetCurrentTimeMicros() / MICROS_PER_MILLI;
}

/// Returns the number of seconds that have passed since the Unix epoch. This is
/// affected by manual changes to the system clock but is more suitable for use across
/// a cluster. For more accurate timings on the local host use the monotonic functions
/// above.
inline int64_t UnixSeconds() {
return GetCurrentTimeMicros() / MICROS_PER_SEC;
}

/// Returns the number of microseconds that have passed since the Unix epoch. This is
/// affected by manual changes to the system clock but is more suitable for use across
/// a cluster. For more accurate timings on the local host use the monotonic functions
Expand Down