Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/config.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/olap_define.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/segment_loader.h"
#include "olap/tablet_schema.h"
#include "util/time.h"
Expand Down Expand Up @@ -126,6 +127,14 @@ void Rowset::clear_cache() {
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->remove_if_cached_async(file_key);
}

// inverted index
auto file_names = get_index_file_names();
for (const auto& file_name : file_names) {
auto file_key = io::BlockFileCache::hash(file_name);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->remove_if_cached_async(file_key);
}
}
}

Expand Down Expand Up @@ -163,4 +172,24 @@ void Rowset::merge_rowset_meta(const RowsetMeta& other) {
_schema = _rowset_meta->tablet_schema();
}

std::vector<std::string> Rowset::get_index_file_names() {
std::vector<std::string> file_names;
auto idx_version = _schema->get_inverted_index_storage_format();
for (int64_t seg_id = 0; seg_id < num_segments(); ++seg_id) {
if (idx_version == InvertedIndexStorageFormatPB::V1) {
for (const auto& index : _schema->inverted_indexes()) {
auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v1(
rowset_id().to_string(), seg_id, index->index_id(),
index->get_index_suffix());
file_names.emplace_back(std::move(file_name));
}
} else {
auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v2(
rowset_id().to_string(), seg_id);
file_names.emplace_back(std::move(file_name));
}
}
return file_names;
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder

Result<std::string> segment_path(int64_t seg_id);

// Returns the inverted index file names for all segments of this rowset
// (bare file names such as "<rowset_id>_<seg_id>.idx", no directory part).
// The naming scheme depends on the schema's inverted index storage format.
std::vector<std::string> get_index_file_names();

protected:
friend class RowsetFactory;

Expand Down
13 changes: 13 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,17 @@ std::string InvertedIndexDescriptor::get_index_file_cache_key(std::string_view i
return fmt::format("{}_{}{}", index_path_prefix, index_id, suffix);
}

// Builds the V1-format inverted index file name for one index of one segment:
//   "{rowset_id}_{seg_id}_{index_id}[@{index_path_suffix}]{index_suffix}"
// The "@{index_path_suffix}" part is only present when index_path_suffix is
// non-empty.
std::string InvertedIndexDescriptor::get_index_file_name_v1(const std::string& rowset_id,
                                                            int64_t seg_id, int64_t index_id,
                                                            std::string_view index_path_suffix) {
    std::string suffix;
    if (!index_path_suffix.empty()) {
        // Append by length rather than through data(): a string_view is not
        // guaranteed to be NUL-terminated, so treating data() as a C string could
        // read past the end of the view or pick up trailing characters.
        suffix.reserve(index_path_suffix.size() + 1);
        suffix.push_back('@');
        suffix.append(index_path_suffix);
    }
    return fmt::format("{}_{}_{}{}{}", rowset_id, seg_id, index_id, suffix, index_suffix);
}

// Builds the V2-format inverted index file name for a segment:
//   "{rowset_id}_{seg_id}{index_suffix}"
// In this format the name carries no per-index component.
std::string InvertedIndexDescriptor::get_index_file_name_v2(const std::string& rowset_id,
                                                            int64_t seg_id) {
    std::string file_name = fmt::format("{}_{}{}", rowset_id, seg_id, index_suffix);
    return file_name;
}

} // namespace doris::segment_v2
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class InvertedIndexDescriptor {
int64_t index_id,
std::string_view index_path_suffix);

// Returns the V1-format index file name for a single inverted index of a segment.
// The name is composed from rowset_id, seg_id and index_id; when
// index_path_suffix is non-empty it is added after an '@' separator.
static std::string get_index_file_name_v1(const std::string& rowset_id, int64_t seg_id,
int64_t index_id, std::string_view index_path_suffix);

// Returns the V2-format index file name for a segment, composed from
// rowset_id and seg_id only (no per-index component).
static std::string get_index_file_name_v2(const std::string& rowset_id, int64_t seg_id);

// Returns the fixed name "null_bitmap" for the temporary null bitmap file.
static const char* get_temporary_null_bitmap_file_name() { return "null_bitmap"; }
// Returns the fixed name "bkd" for the temporary BKD index data file.
static const char* get_temporary_bkd_index_data_file_name() { return "bkd"; }
// Returns the fixed name "bkd_meta" for the temporary BKD index meta file.
static const char* get_temporary_bkd_index_meta_file_name() { return "bkd_meta"; }
Expand Down
107 changes: 107 additions & 0 deletions be/test/olap/rowset/beta_rowset_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <gen_cpp/olap_common.pb.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <stdint.h>
#include <unistd.h>

Expand All @@ -44,6 +45,7 @@
#include "io/fs/local_file_system.h"
#include "io/fs/s3_file_system.h"
#include "io/fs/s3_obj_storage_client.h"
#include "json2pb/json_to_pb.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/options.h"
Expand Down Expand Up @@ -170,6 +172,61 @@ class BetaRowsetTest : public testing::Test {
EXPECT_EQ(Status::OK(), s);
}

// Test helper: initializes `pb1` from a canned RowsetMetaPB JSON document,
// after overriding its version range with [start, end] and its creation time
// with the fixed value 10000.
void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
    // Canned rowset meta; version range and creation time are overridden below.
    const std::string canned_meta_json = R"({
"rowset_id": 540085,
"tablet_id": 15674,
"partition_id": 10000,
"txn_id": 4045,
"tablet_schema_hash": 567997588,
"rowset_type": "BETA_ROWSET",
"rowset_state": "VISIBLE",
"start_version": 2,
"end_version": 2,
"num_rows": 3929,
"total_disk_size": 84699,
"data_disk_size": 84464,
"index_disk_size": 235,
"empty": false,
"load_id": {
"hi": -5350970832824939812,
"lo": -6717994719194512122
},
"creation_time": 1553765670
})";

    RowsetMetaPB meta_pb;
    json2pb::JsonToProtoMessage(canned_meta_json, &meta_pb);

    // Caller-supplied version range and a fixed timestamp replace the canned ones.
    meta_pb.set_creation_time(10000);
    meta_pb.set_start_version(start);
    meta_pb.set_end_version(end);

    pb1->init_from_pb(meta_pb);
}

// Test helper: fills in a nullable column definition and an INVERTED index
// definition that covers that column.
//   column_pb     - column message to populate (unique id, name, type, key flag)
//   tablet_index  - index message to populate (id, name, type, covered column)
//   properties    - optional index properties copied into the index message
//   is_key        - whether the column is marked as a key column
void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id,
                      const std::string& index_name, int32_t col_unique_id,
                      const std::string& column_type, const std::string& column_name,
                      const std::map<std::string, std::string>& properties =
                              std::map<std::string, std::string>(),
                      bool is_key = false) {
    // Index definition: an inverted index over the single column below.
    tablet_index->set_index_type(IndexType::INVERTED);
    tablet_index->set_index_id(index_id);
    tablet_index->set_index_name(index_name);
    tablet_index->add_col_unique_id(col_unique_id);

    // Column definition; the column is always nullable.
    column_pb->set_is_nullable(true);
    column_pb->set_is_key(is_key);
    column_pb->set_type(column_type);
    column_pb->set_name(column_name);
    column_pb->set_unique_id(col_unique_id);

    // Nothing more to do when no index properties were supplied.
    if (properties.empty()) {
        return;
    }
    auto& index_props = *tablet_index->mutable_properties();
    for (const auto& entry : properties) {
        index_props[entry.first] = entry.second;
    }
}

private:
std::unique_ptr<DataDir> _data_dir;
};
Expand Down Expand Up @@ -304,4 +361,54 @@ TEST_F(BetaRowsetTest, AddToBinlogTest) {
ASSERT_TRUE(s.ok()) << "second add_to_binlog(): " << s;
}

// Verifies Rowset::get_index_file_names() for both inverted index storage
// formats, using a schema with two inverted indexes (ids 10000 and 10001) and
// a rowset with two segments:
//   V1 -> one file per (segment, index): "<rowset_id>_<seg>_<index_id>.idx"
//   V2 -> one file per segment:          "<rowset_id>_<seg>.idx"
TEST_F(BetaRowsetTest, GetIndexFileNames) {
    // v1
    {
        TabletSchemaPB schema_pb;
        schema_pb.set_keys_type(KeysType::DUP_KEYS);
        construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0,
                         "INT", "key");
        construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1,
                         "STRING", "v1");
        schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
        auto tablet_schema = std::make_shared<TabletSchema>();
        tablet_schema->init_from_pb(schema_pb);

        auto rowset_meta = std::make_shared<RowsetMeta>();
        init_rs_meta(rowset_meta, 1, 1);
        rowset_meta->set_num_segments(2);

        BetaRowset rowset(tablet_schema, rowset_meta, "");
        auto file_names = rowset.get_index_file_names();
        // Check the count first: guards the indexing below against out-of-range
        // access and catches unexpected extra names.
        ASSERT_EQ(file_names.size(), 4U);
        ASSERT_EQ(file_names[0], "540085_0_10000.idx");
        ASSERT_EQ(file_names[1], "540085_0_10001.idx");
        ASSERT_EQ(file_names[2], "540085_1_10000.idx");
        ASSERT_EQ(file_names[3], "540085_1_10001.idx");
    }

    // v2
    {
        TabletSchemaPB schema_pb;
        schema_pb.set_keys_type(KeysType::DUP_KEYS);
        construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0,
                         "INT", "key");
        construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1,
                         "STRING", "v1");
        schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2);
        auto tablet_schema = std::make_shared<TabletSchema>();
        tablet_schema->init_from_pb(schema_pb);

        auto rowset_meta = std::make_shared<RowsetMeta>();
        init_rs_meta(rowset_meta, 1, 1);
        rowset_meta->set_num_segments(2);

        BetaRowset rowset(tablet_schema, rowset_meta, "");
        auto file_names = rowset.get_index_file_names();
        // One file per segment regardless of how many indexes the schema has.
        ASSERT_EQ(file_names.size(), 2U);
        ASSERT_EQ(file_names[0], "540085_0.idx");
        ASSERT_EQ(file_names[1], "540085_1.idx");
    }
}

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -3040,7 +3040,7 @@ class Suite implements GroovyInterceptable {
}
}

def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version ->
def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version, fileSuffix = "dat" ->
def hashValues = []
def segmentFiles = []
getSegmentFilesFromMs(msHttpPort, tabletId, version) {
Expand All @@ -3050,7 +3050,7 @@ class Suite implements GroovyInterceptable {
// {"rowset_id":"0","partition_id":"27695","tablet_id":"27700","txn_id":"7057526525952","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"895","data_disk_size":"895","index_disk_size":"0","empty":false,"load_id":{"hi":"-1646598626735601581","lo":"-6677682539881484579"},"delete_flag":false,"creation_time":"1736153402","num_segments":"1","rowset_id_v2":"0200000000000004694889e84c76391cfd52ec7db0a483ba","resource_id":"1","newest_write_timestamp":"1736153402","segments_key_bounds":[{"min_key":"AoAAAAAAAAAC","max_key":"AoAAAAAAAAAC"}],"txn_expiration":"1736167802","segments_overlap_pb":"NONOVERLAPPING","compaction_level":"0","segments_file_size":["895"],"index_id":"27697","schema_version":0,"enable_segments_file_size":true,"has_variant_type_in_schema":false,"enable_inverted_index_file_info":false}
def segmentNum = json.num_segments as int
def rowsetId = json.rowset_id_v2 as String
segmentFiles = (0..<segmentNum).collect { i -> "${rowsetId}_${i}.dat" }
segmentFiles = (0..<segmentNum).collect { i -> "${rowsetId}_${i}.${fileSuffix}" }
}

segmentFiles.each {
Expand All @@ -3064,15 +3064,15 @@ class Suite implements GroovyInterceptable {
}

// get table's tablet file cache
def getTabletFileCacheDirFromBe = { msHttpPort, table, version ->
def getTabletFileCacheDirFromBe = { msHttpPort, table, version, fileSuffix = "dat" ->
// beHost HashFile
def beHostToHashFile = [:]

def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table)
getTabletsAndHostFromFe.each {
def beHost = it.Value[1]
def tabletId = it.Key
def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version)
def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version, fileSuffix)
hashRet.each {
def hashFile = it
if (beHostToHashFile.containsKey(beHost)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ suite('test_clean_stale_rs_file_cache', 'docker') {
sql """
insert into $table values (10, 1, 'v1'), (20, 2, 'v2'), (30, 3, 'v3')
"""
def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
// version 4
sql """
insert into $table values (100, 1, 'v1'), (200, 2, 'v2'), (300, 3, 'v3')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.Http

// Regression test: after several loads and a wait period, the file cache
// entries recorded for the inverted index (".idx") files of the rowsets at
// versions 2 and 3 must no longer exist under the BE's file_cache directory.
suite('test_clean_stale_rs_index_file_cache', 'docker') {
// Only runs in cloud mode; otherwise the suite is a no-op.
if (!isCloudMode()) {
return;
}
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
'cloud_tablet_rebalancer_interval_second=1',
'sys_log_verbose_modules=org',
'heartbeat_interval_second=1'
]
// BE settings for compaction and stale-rowset sweeping.
// NOTE(review): the 80s sleep further down is presumably sized against
// tablet_rowset_stale_sweep_time_sec / vacuum_stale_rowsets_interval_s — confirm.
options.beConfigs += [
'report_tablet_interval_seconds=1',
'cumulative_compaction_min_deltas=5',
'tablet_rowset_stale_sweep_by_size=false',
'tablet_rowset_stale_sweep_time_sec=60',
'vacuum_stale_rowsets_interval_s=10'
]
// Single FE and single BE, cloud mode cluster.
options.setFeNum(1)
options.setBeNum(1)
options.cloudMode = true


def table = "test_clean_stale_rs_index_file_cache"
sql """ drop table if exists $table; """

docker(options) {
def ms = cluster.getAllMetaservices().get(0)
def msHttpPort = ms.host + ":" + ms.httpPort
// Table with an inverted index on v1 so that each load produces index files.
sql """CREATE TABLE $table (
`k1` int(11) NULL,
`k2` int(11) NULL,
`v1` varchar(2048),
INDEX v1_idx (`v1`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
)
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
"""
// version 2
sql """
insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3')
"""
// Record the cache hashes of the ".idx" files for version 2, keyed by BE host.
def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2, "idx")
// version 3
sql """
insert into $table values (10, 1, 'v1'), (20, 2, 'v2'), (30, 3, 'v3')
"""
// Same for version 3.
def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3, "idx")
// version 4
sql """
insert into $table values (100, 1, 'v1'), (200, 2, 'v2'), (300, 3, 'v3')
"""
// version 5
sql """
insert into $table values (1000, 1, 'v1'), (2000, 2, 'v2'), (3000, 3, 'v3')
"""
// version 6
sql """
insert into $table values (10000, 1, 'v1'), (20000, 2, 'v2'), (30000, 3, 'v3')
"""

// Merge the per-host hash lists of versions 2 and 3: hosts present in both
// get the concatenation of the two lists.
def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles ->
[(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + hashFiles) : hashFiles]
}
// Run the same count query a few times before waiting.
for (int i = 0; i < 5; i++) {
sql """
select count(*) from $table
"""
}
def beforeGetFromFe = getTabletAndBeHostFromFe(table)
logger.info("fe tablets {}, cache dir {}", beforeGetFromFe , mergedCacheDir)
// wait compaction finish, and vacuum_stale_rowsets work
sleep(80 * 1000)

// check cache file has been deleted
beforeGetFromFe.each {
def tabletId = it.Key
def backendId = it.Value[0]
def backendHost = it.Value[1]
def be = cluster.getBeByBackendId(backendId.toLong())
def dataPath = new File("${be.path}/storage/file_cache")
def subDirs = []

// Recursively collects the names of all directories below `dir` into subDirs.
// Declared before assignment so the closure can call itself.
def collectDirs
collectDirs = { File dir ->
if (dir.exists()) {
dir.eachDir { subDir ->
subDirs << subDir.name
collectDirs(subDir)
}
}
}


collectDirs(dataPath)
logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs)
// Hashes recorded earlier for this BE host.
def cacheDir = mergedCacheDir[backendHost]

// add check
// No directory name under file_cache may start with a recorded hash.
cacheDir.each { hashFile ->
assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) },
"Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " +
"Matching subdir found in: ${subDirs}")
}
}
}
}
Loading