Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions be/src/io/cache/block/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ namespace fs = std::filesystem;
namespace doris {
namespace io {

const std::string IFileCache::FILE_CACHE_VERSION = "2.0";
const int IFileCache::KEY_PREFIX_LENGTH = 3;

IFileCache::IFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings)
: _cache_base_path(cache_base_path),
_max_size(cache_settings.max_size),
Expand All @@ -58,13 +55,21 @@ std::string IFileCache::get_path_in_local_cache(const Key& key, size_t offset,
bool is_persistent) const {
auto key_str = key.to_string();
std::string suffix = is_persistent ? "_persistent" : "";
return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str /
(std::to_string(offset) + suffix);
if constexpr (USE_CACHE_VERSION2) {
return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str /
(std::to_string(offset) + suffix);
} else {
return fs::path(_cache_base_path) / key_str / (std::to_string(offset) + suffix);
}
}

std::string IFileCache::get_path_in_local_cache(const Key& key) const {
auto key_str = key.to_string();
return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str;
if constexpr (USE_CACHE_VERSION2) {
return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str;
} else {
return fs::path(_cache_base_path) / key_str;
}
}

std::string IFileCache::get_version_path() const {
Expand Down
7 changes: 5 additions & 2 deletions be/src/io/cache/block/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ class IFileCache {
friend struct FileBlocksHolder;

public:
static const std::string FILE_CACHE_VERSION;
static const int KEY_PREFIX_LENGTH;
/// Use the version 2.0 layout when USE_CACHE_VERSION2 is true; use the version 1.0 layout when it is false.
/// version 1.0: cache_base_path / key / offset
/// version 2.0: cache_base_path / key_prefix / key / offset
static constexpr bool USE_CACHE_VERSION2 = true;
static constexpr int KEY_PREFIX_LENGTH = 3;

struct Key {
uint128_t key;
Expand Down
55 changes: 32 additions & 23 deletions be/src/io/cache/block/block_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ void LRUFileCache::remove(const Key& key, bool is_persistent, size_t offset,
void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock) {
/// version 1.0: cache_base_path / key / offset
/// version 2.0: cache_base_path / key_prefix / key / offset
if (read_file_cache_version() != FILE_CACHE_VERSION) {
if (USE_CACHE_VERSION2 && read_file_cache_version() != "2.0") {
// migrate the directory layout to the version 2.0 format
fs::directory_iterator key_it {_cache_base_path};
for (; key_it != fs::directory_iterator(); ++key_it) {
Expand Down Expand Up @@ -675,22 +675,7 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cach
uint64_t offset = 0;
size_t size = 0;
std::vector<std::pair<LRUQueue::Iterator, bool>> queue_entries;

/// version 2.0: cache_base_path / key_prefix / key / offset
fs::directory_iterator key_prefix_it {_cache_base_path};
for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it) {
if (!key_prefix_it->is_directory()) {
// not a directory; may be the version file
continue;
}
if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) {
LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native()
<< ", try to remove it";
std::filesystem::remove(key_prefix_it->path());
continue;
}

fs::directory_iterator key_it {key_prefix_it->path()};
auto scan_file_cache = [&](fs::directory_iterator& key_it) {
for (; key_it != fs::directory_iterator(); ++key_it) {
key = Key(
vectorized::unhex_uint<uint128_t>(key_it->path().filename().native().c_str()));
Expand Down Expand Up @@ -747,6 +732,27 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cach
}
}
}
};

if constexpr (USE_CACHE_VERSION2) {
fs::directory_iterator key_prefix_it {_cache_base_path};
for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it) {
if (!key_prefix_it->is_directory()) {
// not a directory; may be the version file
continue;
}
if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) {
LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native()
<< ", try to remove it";
std::filesystem::remove(key_prefix_it->path());
continue;
}
fs::directory_iterator key_it {key_prefix_it->path()};
scan_file_cache(key_it);
}
} else {
fs::directory_iterator key_it {_cache_base_path};
scan_file_cache(key_it);
}

/// Shuffle cells to have random order in LRUQueue as at startup all cells have the same priority.
Expand All @@ -760,12 +766,15 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cach
}

Status LRUFileCache::write_file_cache_version() const {
std::string version_path = get_version_path();
Slice version(FILE_CACHE_VERSION);
FileWriterPtr version_writer;
RETURN_IF_ERROR(global_local_filesystem()->create_file(version_path, &version_writer));
RETURN_IF_ERROR(version_writer->append(version));
return version_writer->close();
if constexpr (USE_CACHE_VERSION2) {
std::string version_path = get_version_path();
Slice version("2.0");
FileWriterPtr version_writer;
RETURN_IF_ERROR(global_local_filesystem()->create_file(version_path, &version_writer));
RETURN_IF_ERROR(version_writer->append(version));
return version_writer->close();
}
return Status::OK();
}

std::string LRUFileCache::read_file_cache_version() const {
Expand Down
10 changes: 8 additions & 2 deletions be/test/io/cache/file_block_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,21 @@ std::vector<io::FileBlockSPtr> fromHolder(const io::FileBlocksHolder& holder) {
std::string getFileBlockPath(const std::string& base_path, const io::IFileCache::Key& key,
size_t offset) {
auto key_str = key.to_string();
return fs::path(base_path) / key_str.substr(0, 3) / key_str / std::to_string(offset);
if constexpr (IFileCache::USE_CACHE_VERSION2) {
return fs::path(base_path) / key_str.substr(0, 3) / key_str / std::to_string(offset);
} else {
return fs::path(base_path) / key_str / std::to_string(offset);
}
}

void download(io::FileBlockSPtr file_segment) {
const auto& key = file_segment->key();
size_t size = file_segment->range().size();

auto key_str = key.to_string();
auto subdir = fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
auto subdir = IFileCache::USE_CACHE_VERSION2
? fs::path(cache_base_path) / key_str.substr(0, 3) / key_str
: fs::path(cache_base_path) / key_str;
ASSERT_TRUE(fs::exists(subdir));

std::string data(size, '0');
Expand Down