Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_BATCH")
.tag("job_id", request.job_id)
.tag("batch_id", request.batch_id)
.tag("jobs size", request.job_metas.size());
.tag("jobs size", request.job_metas.size())
.tag("tablet num of first meta",
request.job_metas.empty() ? 0 : request.job_metas[0].tablet_ids.size());
bool retry = false;
st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry);
if (!retry && st) {
Expand Down
12 changes: 10 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,11 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
},
.download_done {},
.download_done {[](Status st) {
if (!st) {
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
});

auto download_idx_file = [&](const io::Path& idx_path) {
Expand All @@ -299,7 +303,11 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
},
.download_done {},
.download_done {[](Status st) {
if (!st) {
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
Expand Down
138 changes: 85 additions & 53 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

#include "cloud/cloud_warm_up_manager.h"

#include <bthread/countdown_event.h>
#include <bvar/bvar.h>
#include <bvar/reducer.h>

#include <algorithm>
#include <cstddef>
Expand All @@ -34,6 +35,8 @@

namespace doris {

bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");

CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
_download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
}
Expand All @@ -59,6 +62,52 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTable
return id_to_rowset_meta_map;
}

void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
io::FileSystemSPtr file_system,
int64_t expiration_time,
std::shared_ptr<bthread::CountdownEvent> wait) {
if (file_size < 0) {
auto st = file_system->file_size(path, &file_size);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << "get file size failed: " << path;
file_cache_warm_up_failed_task_num << 1;
return;
}
}

const int64_t chunk_size = 10 * 1024 * 1024; // 10MB
int64_t offset = 0;
int64_t remaining_size = file_size;

while (remaining_size > 0) {
int64_t current_chunk_size = std::min(chunk_size, remaining_size);
wait->add_count();

_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
.path = path,
.file_size = file_size,
.offset = offset,
.download_size = current_chunk_size,
.file_system = file_system,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
},
.download_done =
[wait](Status st) {
if (!st) {
LOG_WARNING("Warm up error ").error(st);
}
wait->signal();
},
});

offset += current_chunk_size;
remaining_size -= current_chunk_size;
}
}

void CloudWarmUpManager::handle_jobs() {
#ifndef BE_TEST
constexpr int WAIT_TIME_SECONDS = 600;
Expand All @@ -77,6 +126,10 @@ void CloudWarmUpManager::handle_jobs() {
LOG_WARNING("Warm up job is null");
continue;
}

std::shared_ptr<bthread::CountdownEvent> wait =
std::make_shared<bthread::CountdownEvent>(0);

for (int64_t tablet_id : cur_job->tablet_ids) {
if (_cur_job_id == 0) { // The job is canceled
break;
Expand All @@ -92,8 +145,7 @@ void CloudWarmUpManager::handle_jobs() {
LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(st);
continue;
}
std::shared_ptr<bthread::CountdownEvent> wait =
std::make_shared<bthread::CountdownEvent>(0);

auto tablet_meta = tablet->tablet_meta();
auto rs_metas = snapshot_rs_metas(tablet.get());
for (auto& [_, rs] : rs_metas) {
Expand All @@ -112,71 +164,51 @@ void CloudWarmUpManager::handle_jobs() {
expiration_time = 0;
}

wait->add_count();
// clang-format off
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(*rs, seg_id),
.file_size = rs->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
},
.download_done =
[wait](Status st) {
if (!st) {
LOG_WARNING("Warm up error ").error(st);
}
wait->signal();
},
});

auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
},
.download_done =
[wait](Status st) {
if (!st) {
LOG_WARNING("Warm up error ").error(st);
}
wait->signal();
},
};
// clang-format on
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
// 1st. download segment files
submit_download_tasks(
storage_resource.value()->remote_segment_path(*rs, seg_id),
rs->segment_file_size(seg_id), storage_resource.value()->fs,
expiration_time, wait);

// 2nd. download inverted index files
int64_t file_size = -1;
auto schema_ptr = rs->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
const auto& idx_file_info = rs->inverted_index_file_info(seg_id);
if (idx_version == InvertedIndexStorageFormatPB::V1) {
for (const auto& index : schema_ptr->inverted_indexes()) {
wait->add_count();
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rs, seg_id, index->index_id(), index->get_index_suffix());
download_idx_file(idx_path);
if (idx_file_info.index_info_size() > 0) {
for (const auto& idx_info : idx_file_info.index_info()) {
if (index->index_id() == idx_info.index_id() &&
index->get_index_suffix() == idx_info.index_suffix()) {
file_size = idx_info.index_file_size();
break;
}
}
}
submit_download_tasks(idx_path, file_size, storage_resource.value()->fs,
expiration_time, wait);
}
} else {
if (schema_ptr->has_inverted_index()) {
wait->add_count();
auto idx_path =
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
download_idx_file(idx_path);
file_size = idx_file_info.has_index_size() ? idx_file_info.index_size()
: -1;
submit_download_tasks(idx_path, file_size, storage_resource.value()->fs,
expiration_time, wait);
}
}
}
}
timespec time;
time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
if (!wait->timed_wait(time)) {
LOG_WARNING("Warm up tablet {} take a long time", tablet_meta->tablet_id());
}
}

timespec time;
time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
if (wait->timed_wait(time)) {
LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size());
}
{
std::unique_lock lock(_mtx);
Expand Down
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <bthread/countdown_event.h>

#include <condition_variable>
#include <deque>
#include <mutex>
Expand Down Expand Up @@ -69,7 +71,9 @@ class CloudWarmUpManager {

private:
void handle_jobs();

void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
int64_t expiration_time,
std::shared_ptr<bthread::CountdownEvent> wait);
std::mutex _mtx;
std::condition_variable _cond;
int64_t _cur_job_id {0};
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");

DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
DEFINE_Int32(file_cache_downloader_thread_num_max, "32");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
DEFINE_mBool(enable_write_index_searcher_cache, "false");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,9 @@ DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_batch);

DECLARE_Int32(file_cache_downloader_thread_num_min);
DECLARE_Int32(file_cache_downloader_thread_num_max);
// inverted index searcher cache
// cache entry stay time after lookup
DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ namespace doris::io {
FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) {
_poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this);
auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
.set_min_threads(4)
.set_max_threads(16)
.set_min_threads(config::file_cache_downloader_thread_num_min)
.set_max_threads(config::file_cache_downloader_thread_num_max)
.build(&_workers);
CHECK(st.ok()) << "failed to create FileCacheBlockDownloader";
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/block_file_cache_downloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class FileCacheBlockDownloader {
// tablet id -> inflight block num of tablet
std::unordered_map<int64_t, int64_t> _inflight_tablets;

static inline constexpr size_t _max_size {10240};
static inline constexpr size_t _max_size {102400};
};

} // namespace doris::io
Original file line number Diff line number Diff line change
Expand Up @@ -3338,6 +3338,9 @@ public static int metaServiceRpcRetryTimes() {
@ConfField(mutable = true, masterOnly = true)
public static int cloud_warm_up_job_scheduler_interval_millisecond = 1000; // 1 seconds

@ConfField(mutable = true, masterOnly = true)
public static long cloud_warm_up_job_max_bytes_per_batch = 21474836480L; // 20GB

@ConfField(mutable = true, masterOnly = true)
public static boolean enable_fetch_cluster_cache_hotspot = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ Long getFileCacheCapacity(String clusterName) throws RuntimeException {
}

private Map<Long, List<List<Long>>> splitBatch(Map<Long, List<Tablet>> beToWarmUpTablets) {
final Long maxSizePerBatch = 10737418240L; // 10G
final Long maxSizePerBatch = Config.cloud_warm_up_job_max_bytes_per_batch;
Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
for (Map.Entry<Long, List<Tablet>> entry : beToWarmUpTablets.entrySet()) {
List<List<Long>> batches = new ArrayList<>();
Expand Down
Loading