Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "util/uuid_generator.h"

namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
Expand Down Expand Up @@ -490,8 +491,10 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
double process_memory_usage =
cast_set<double>(doris::GlobalMemoryArbitrator::process_memory_usage());
bool memory_usage_high =
process_memory_usage > cast_set<double>(MemInfo::soft_mem_limit()) * 0.8;
if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
Expand Down Expand Up @@ -621,4 +624,5 @@ void CloudCumulativeCompaction::do_lease() {
}
}

#include "common/compile_check_end.h"
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/compaction.h"

namespace doris {
#include "common/compile_check_begin.h"

class CloudCumulativeCompaction : public CloudCompactionMixin {
public:
Expand Down Expand Up @@ -60,4 +61,5 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {
Version _last_delete_version {-1, -1};
};

#include "common/compile_check_end.h"
} // namespace doris
12 changes: 7 additions & 5 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "olap/tablet_meta.h"

namespace doris {
#include "common/compile_check_begin.h"

CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
Expand All @@ -48,7 +49,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size
return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
}

int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
Expand Down Expand Up @@ -114,8 +115,8 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
size_t new_compaction_score = *compaction_score;
while (rs_begin != input_rowsets->end()) {
auto& rs_meta = (*rs_begin)->rowset_meta();
int current_level = _level_size(rs_meta->total_disk_size());
int remain_level = _level_size(total_size - rs_meta->total_disk_size());
int64_t current_level = _level_size(rs_meta->total_disk_size());
int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
// if current level less then remain level, input rowsets contain current rowset
// and process return; otherwise, input rowsets do not contain current rowset.
if (current_level <= remain_level) {
Expand Down Expand Up @@ -185,7 +186,7 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
}

int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const {
int64_t promotion_size = int64_t(t->base_size() * _promotion_ratio);
int64_t promotion_size = int64_t(cast_set<double>(t->base_size()) * _promotion_ratio);
// promotion_size is between _size_based_promotion_size and _size_based_promotion_min_size
return promotion_size > _promotion_size ? _promotion_size
: promotion_size < _promotion_min_size ? _promotion_min_size
Expand Down Expand Up @@ -215,7 +216,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
: last_cumulative_point;
}

int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
Expand Down Expand Up @@ -377,4 +378,5 @@ int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
return output_rowset->end_version() + 1;
}

#include "common/compile_check_end.h"
} // namespace doris
8 changes: 5 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "olap/rowset/rowset_meta.h"

namespace doris {
#include "common/compile_check_begin.h"

class Tablet;
struct Version;
Expand All @@ -44,7 +45,7 @@ class CloudCumulativeCompactionPolicy {

virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0;

virtual int32_t pick_input_rowsets(CloudTablet* tablet,
virtual int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand All @@ -71,7 +72,7 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
return 0;
}

int32_t pick_input_rowsets(CloudTablet* tablet,
int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand Down Expand Up @@ -106,7 +107,7 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override;

int32_t pick_input_rowsets(CloudTablet* tablet,
int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
const int64_t max_compaction_score,
const int64_t min_compaction_score,
Expand All @@ -115,4 +116,5 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
bool allow_delete = false) override;
};

#include "common/compile_check_end.h"
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_delete_bitmap_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "util/stopwatch.hpp"

namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

namespace {
Expand Down Expand Up @@ -177,4 +178,5 @@ void CloudDeleteBitmapAction::handle(HttpRequest* req) {
}
}

#include "common/compile_check_end.h"
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_delete_bitmap_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "olap/tablet.h"

namespace doris {
#include "common/compile_check_begin.h"
class HttpRequest;

class ExecEnv;
Expand All @@ -52,4 +53,5 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
CloudStorageEngine& _engine;
DeleteBitmapActionType _delete_bitmap_action_type;
};
#include "common/compile_check_end.h"
} // namespace doris
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "runtime/memory/mem_tracker_limiter.h"

namespace doris {
#include "common/compile_check_begin.h"

CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask(
CloudStorageEngine& engine, const TCalcDeleteBitmapRequest& cal_delete_bitmap_req,
Expand Down Expand Up @@ -325,4 +326,5 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
return status;
}

#include "common/compile_check_end.h"
} // namespace doris
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "util/thrift_rpc_helper.h"

namespace doris::cloud {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) {
Expand Down Expand Up @@ -717,7 +718,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
"rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size());
}
for (size_t i = 0; i < rowset_ids.size(); i++) {
for (int i = 0; i < rowset_ids.size(); i++) {
RowsetId rst_id;
rst_id.init(rowset_ids[i]);
delete_bitmap->merge(
Expand Down Expand Up @@ -1286,4 +1287,5 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
return total_inverted_index_size;
}

#include "common/compile_check_end.h"
} // namespace doris::cloud
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "util/s3_util.h"

namespace doris {
#include "common/compile_check_begin.h"

class DeleteBitmap;
class StreamLoadContext;
Expand Down Expand Up @@ -124,4 +125,5 @@ class CloudMetaMgr {
};

} // namespace cloud
#include "common/compile_check_end.h"
} // namespace doris
14 changes: 9 additions & 5 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "util/parse_util.h"

namespace doris {
#include "common/compile_check_begin.h"

using namespace std::literals;

Expand Down Expand Up @@ -166,7 +167,8 @@ Status CloudStorageEngine::open() {

_memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
// Use file cache disks number
_memtable_flush_executor->init(io::FileCacheFactory::instance()->get_cache_instance_size());
_memtable_flush_executor->init(
cast_set<int32_t>(io::FileCacheFactory::instance()->get_cache_instance_size()));

_calc_delete_bitmap_executor = std::make_unique<CalcDeleteBitmapExecutor>();
_calc_delete_bitmap_executor->init();
Expand Down Expand Up @@ -321,7 +323,7 @@ void CloudStorageEngine::_check_file_cache_ttl_block_valid() {
for (const auto& rowset : rowsets) {
int64_t ttl_seconds = tablet->tablet_meta()->ttl_seconds();
if (rowset->newest_write_timestamp() + ttl_seconds <= UnixSeconds()) continue;
for (int64_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) {
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); seg_id++) {
auto hash = Segment::file_cache_key(rowset->rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(hash);
file_cache->update_ttl_atime(hash);
Expand Down Expand Up @@ -544,7 +546,8 @@ std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_task
int num_cumu =
std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0,
[](int a, auto& b) { return a + b.second.size(); });
int num_base = submitted_base_compactions.size() + submitted_full_compactions.size();
int num_base =
cast_set<int>(submitted_base_compactions.size() + submitted_full_compactions.size());
int n = thread_per_disk - num_cumu - num_base;
if (compaction_type == CompactionType::BASE_COMPACTION) {
// We need to reserve at least one thread for cumulative compaction,
Expand Down Expand Up @@ -822,7 +825,7 @@ Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
// cumu
std::string_view cumu = "CumulativeCompaction";
rapidjson::Value cumu_key;
cumu_key.SetString(cumu.data(), cumu.length(), root.GetAllocator());
cumu_key.SetString(cumu.data(), cast_set<uint32_t>(cumu.length()), root.GetAllocator());
rapidjson::Document cumu_arr;
cumu_arr.SetArray();
for (auto& [tablet_id, v] : _submitted_cumu_compactions) {
Expand All @@ -834,7 +837,7 @@ Status CloudStorageEngine::get_compaction_status_json(std::string* result) {
// base
std::string_view base = "BaseCompaction";
rapidjson::Value base_key;
base_key.SetString(base.data(), base.length(), root.GetAllocator());
base_key.SetString(base.data(), cast_set<uint32_t>(base.length()), root.GetAllocator());
rapidjson::Document base_arr;
base_arr.SetArray();
for (auto& [tablet_id, _] : _submitted_base_compactions) {
Expand All @@ -857,4 +860,5 @@ std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compac
return _cumulative_compaction_policies.at(compaction_policy);
}

#include "common/compile_check_end.h"
} // namespace doris
11 changes: 7 additions & 4 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "vec/common/schema_util.h"

namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
Expand Down Expand Up @@ -380,7 +381,7 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
_tablet_meta->modify_rs_metas({}, rs_metas, false);
}

int CloudTablet::delete_expired_stale_rowsets() {
uint64_t CloudTablet::delete_expired_stale_rowsets() {
std::vector<RowsetSharedPtr> expired_rowsets;
int64_t expired_stale_sweep_endtime =
::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
Expand Down Expand Up @@ -539,7 +540,7 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_write

return RowsetFactory::create_rowset_writer(_engine, context, false)
.transform([&](auto&& writer) {
writer->set_segment_start_id(rowset.num_segments());
writer->set_segment_start_id(cast_set<int32_t>(rowset.num_segments()));
return writer;
});
}
Expand Down Expand Up @@ -617,7 +618,8 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
}
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(), versions_arr.GetAllocator());
value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
versions_arr.GetAllocator());
versions_arr.PushBack(value, versions_arr.GetAllocator());
last_version = ver.second;
}
Expand All @@ -630,7 +632,7 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
for (auto& rowset : stale_rowsets) {
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
stale_versions_arr.GetAllocator());
stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
}
Expand Down Expand Up @@ -924,4 +926,5 @@ void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
// but it may be used in the future.
}

#include "common/compile_check_end.h"
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class CloudTablet final : public BaseTablet {
void clear_cache() override;

// Return number of deleted stale rowsets
int delete_expired_stale_rowsets();
uint64_t delete_expired_stale_rowsets();

bool has_stale_rowsets() const { return !_stale_rs_version_map.empty(); }

Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "common/status.h"

namespace doris::config {
#include "common/compile_check_begin.h"

DEFINE_String(deploy_mode, "");
DEFINE_mString(cloud_unique_id, "");
Expand Down Expand Up @@ -76,4 +77,5 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");

DEFINE_mBool(enable_cloud_tablet_report, "true");
#include "common/compile_check_end.h"
} // namespace doris::config
2 changes: 2 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "common/config.h"

namespace doris::config {
#include "common/compile_check_begin.h"

DECLARE_String(deploy_mode);
// deprecated do not configure directly
Expand Down Expand Up @@ -110,4 +111,5 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe);

DECLARE_Bool(enable_cloud_tablet_report);

#include "common/compile_check_end.h"
} // namespace doris::config
2 changes: 1 addition & 1 deletion be/src/common/compile_check_begin.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@
#pragma clang diagnostic ignored "-Wfloat-conversion"
#endif

//#include "common/compile_check_begin.h"
//#include "common/compile_check_begin.h"
2 changes: 1 addition & 1 deletion be/src/common/compile_check_end.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
#endif
#undef COMPILE_CHECK

// #include "common/compile_check_end.h"
// #include "common/compile_check_end.h"
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,7 @@ std::vector<RowsetSharedPtr> BaseTablet::get_snapshot_rowset(bool include_stale_

void BaseTablet::calc_consecutive_empty_rowsets(
std::vector<RowsetSharedPtr>* empty_rowsets,
const std::vector<RowsetSharedPtr>& candidate_rowsets, int limit) {
const std::vector<RowsetSharedPtr>& candidate_rowsets, int64_t limit) {
int len = cast_set<int>(candidate_rowsets.size());
for (int i = 0; i < len - 1; ++i) {
auto rowset = candidate_rowsets[i];
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class BaseTablet {
// Find the first consecutive empty rowsets. output->size() >= limit
void calc_consecutive_empty_rowsets(std::vector<RowsetSharedPtr>* empty_rowsets,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
int limit);
int64_t limit);

// Return the merged schema of all rowsets
virtual TabletSchemaSPtr merged_tablet_schema() const { return _max_version_schema; }
Expand Down
Loading
Loading