Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "io/cache/mem_file_cache_storage.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
#include "util/time.h"
#include "vec/common/sip_hash.h"
#include "vec/common/uint128.h"
Expand Down Expand Up @@ -344,24 +345,15 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
}
RETURN_IF_ERROR(_storage->init(this));
_cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
pthread_setname_np(_cache_background_monitor_thread.native_handle(), "run_background_monitor");
_cache_background_ttl_gc_thread = std::thread(&BlockFileCache::run_background_ttl_gc, this);
pthread_setname_np(_cache_background_ttl_gc_thread.native_handle(), "run_background_ttl_gc");
_cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
pthread_setname_np(_cache_background_gc_thread.native_handle(), "run_background_gc");
_cache_background_evict_in_advance_thread =
std::thread(&BlockFileCache::run_background_evict_in_advance, this);
pthread_setname_np(_cache_background_evict_in_advance_thread.native_handle(),
"run_background_evict_in_advance");

// Initialize LRU dump thread and restore queues
_cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
pthread_setname_np(_cache_background_lru_dump_thread.native_handle(),
"run_background_lru_dump");
_cache_background_lru_log_replay_thread =
std::thread(&BlockFileCache::run_background_lru_log_replay, this);
pthread_setname_np(_cache_background_lru_log_replay_thread.native_handle(),
"run_background_lru_log_replay");

return Status::OK();
}
Expand Down Expand Up @@ -1857,6 +1849,7 @@ void BlockFileCache::check_need_evict_cache_in_advance() {
}

void BlockFileCache::run_background_monitor() {
Thread::set_self_name("run_background_monitor");
while (!_close) {
int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
Expand Down Expand Up @@ -1915,6 +1908,7 @@ void BlockFileCache::run_background_monitor() {
}

void BlockFileCache::run_background_ttl_gc() {
Thread::set_self_name("run_background_ttl_gc");
while (!_close) {
int64_t interval_ms = config::file_cache_background_ttl_gc_interval_ms;
int64_t batch_size = config::file_cache_background_ttl_gc_batch;
Expand Down Expand Up @@ -1946,6 +1940,7 @@ void BlockFileCache::run_background_ttl_gc() {
}

void BlockFileCache::run_background_gc() {
Thread::set_self_name("run_background_gc");
FileCacheKey key;
size_t batch_count = 0;
while (!_close) {
Expand Down Expand Up @@ -1979,6 +1974,7 @@ void BlockFileCache::run_background_gc() {
}

void BlockFileCache::run_background_evict_in_advance() {
Thread::set_self_name("run_background_evict_in_advance");
LOG(INFO) << "Starting background evict in advance thread";
int64_t batch = 0;
while (!_close) {
Expand Down Expand Up @@ -2228,6 +2224,7 @@ void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
}

void BlockFileCache::run_background_lru_log_replay() {
Thread::set_self_name("run_background_lru_log_replay");
while (!_close) {
int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
{
Expand All @@ -2254,6 +2251,7 @@ void BlockFileCache::run_background_lru_log_replay() {
}

void BlockFileCache::run_background_lru_dump() {
Thread::set_self_name("run_background_lru_dump");
while (!_close) {
int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
{
Expand Down
15 changes: 7 additions & 8 deletions be/src/io/cache/cache_lru_dumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ std::string CacheLRUDumper::Footer::serialize_as_string() const {
result.reserve(sizeof(Footer));

// Serialize meta_offset (convert to little-endian)
uint64_t meta_offset_le = htole64(meta_offset);
uint64_t meta_offset_le;
LittleEndian::Store64(&meta_offset_le, meta_offset);
result.append(reinterpret_cast<const char*>(&meta_offset_le), sizeof(meta_offset_le));

// Serialize checksum (convert to little-endian)
uint32_t checksum_le = htole32(checksum);
uint32_t checksum_le;
LittleEndian::Store32(&checksum_le, checksum);
result.append(reinterpret_cast<const char*>(&checksum_le), sizeof(checksum_le));

result.append(reinterpret_cast<const char*>(&version), sizeof(version));
Expand All @@ -52,13 +54,13 @@ bool CacheLRUDumper::Footer::deserialize_from_string(const std::string& data) {
// Deserialize meta_offset (convert from little-endian)
uint64_t meta_offset_le;
std::memcpy(&meta_offset_le, ptr, sizeof(meta_offset_le));
meta_offset = le64toh(meta_offset_le);
meta_offset = LittleEndian::Load64(&meta_offset_le);
ptr += sizeof(meta_offset_le);

// Deserialize checksum (convert from little-endian)
uint32_t checksum_le;
std::memcpy(&checksum_le, ptr, sizeof(checksum_le));
checksum = le32toh(checksum_le);
checksum = LittleEndian::Load32(&checksum_le);
ptr += sizeof(checksum_le);

version = *((uint8_t*)ptr);
Expand Down Expand Up @@ -216,7 +218,7 @@ Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,

// Write footer
Footer footer;
footer.meta_offset = htole64(meta_offset); // Explicitly convert to little-endian
footer.meta_offset = meta_offset;
footer.checksum = 0;
footer.version = 1;
std::memcpy(footer.magic, "DOR", 3);
Expand Down Expand Up @@ -322,9 +324,6 @@ Status CacheLRUDumper::parse_dump_footer(std::ifstream& in, std::string& filenam
return Status::InternalError<false>(warn_msg);
}

// Convert from little-endian to host byte order
footer.meta_offset = le64toh(footer.meta_offset);

// Validate footer
if (footer.version != 1 || std::string(footer.magic, 3) != "DOR") {
std::string warn_msg = std::string(fmt::format(
Expand Down
Loading