Skip to content
Merged
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7a5167c
add ut for cooldown on be
pengxiangyu Feb 28, 2023
0bca8b4
add ut for cooldown on be
pengxiangyu Mar 1, 2023
dd5f761
add ut for cooldown on be
pengxiangyu Mar 1, 2023
9070c4c
add ut for cooldown on be
pengxiangyu Mar 1, 2023
bae5d9c
add ut for cooldown on be
pengxiangyu Mar 1, 2023
a2bfed7
add ut for cooldown on be
pengxiangyu Mar 1, 2023
8153afe
add ut for cooldown on be
pengxiangyu Mar 1, 2023
056da2b
add ut for cooldown on be
pengxiangyu Mar 1, 2023
95ad9db
add ut for cooldown on be
pengxiangyu Mar 1, 2023
4567bd0
add ut for cooldown on be
pengxiangyu Mar 2, 2023
af7103d
add ut for cooldown on be
pengxiangyu Mar 2, 2023
74eac86
add ut for cooldown on be
pengxiangyu Mar 2, 2023
f295cf7
add ut for cooldown on be
pengxiangyu Mar 2, 2023
4212a54
add ut for cooldown on be
pengxiangyu Mar 2, 2023
cb4dd81
add ut for cooldown on be
pengxiangyu Mar 2, 2023
a5d074e
add ut for cooldown on be
pengxiangyu Mar 2, 2023
c88d836
add ut for cooldown on be
pengxiangyu Mar 2, 2023
b416b87
add ut for cooldown on be
pengxiangyu Mar 2, 2023
68a5eef
add ut for cooldown on be
pengxiangyu Mar 2, 2023
2982d2c
add ut for cooldown on be
pengxiangyu Mar 3, 2023
8883d26
add ut for cooldown on be
pengxiangyu Mar 3, 2023
4d07491
add ut for cooldown on be
pengxiangyu Mar 3, 2023
40f256f
add ut for cooldown on be
pengxiangyu Mar 3, 2023
b95edb7
add ut for cooldown on be
pengxiangyu Mar 3, 2023
cd0a776
add ut for cooldown on be
pengxiangyu Mar 3, 2023
b309226
add ut for cooldown on be
pengxiangyu Mar 3, 2023
86abbb4
add ut for cooldown on be
pengxiangyu Mar 3, 2023
63c0d88
add ut for cooldown on be
pengxiangyu Mar 3, 2023
bb68e7d
add ut for cooldown on be
pengxiangyu Mar 3, 2023
db486ba
add ut for cooldown on be
pengxiangyu Mar 6, 2023
aa17de1
add ut for cooldown on be
pengxiangyu Mar 6, 2023
893fdf3
add ut for cooldown on be
pengxiangyu Mar 6, 2023
710881c
add ut for cooldown on be
pengxiangyu Mar 6, 2023
8154c66
add ut for cooldown on be
pengxiangyu Mar 6, 2023
3d56f62
add ut for cooldown on be
pengxiangyu Mar 6, 2023
8ee945c
Merge branch 'apache:master' into add_be_ut
pengxiangyu Mar 6, 2023
7a411ed
add ut for cooldown on be
pengxiangyu Mar 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 196 additions & 34 deletions be/test/olap/tablet_cooldown_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
#include "common/status.h"
#include "exec/tablet_info.h"
#include "gen_cpp/internal_service.pb.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/local_file_writer.h"
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_system.h"
#include "olap/delta_writer.h"
#include "olap/rowset/beta_rowset.h"
Expand All @@ -37,25 +41,144 @@ namespace doris {

static StorageEngine* k_engine = nullptr;

static const std::string kTestDir = "./ut_dir/tablet_cooldown_test";
static const std::string kTestDir = "ut_dir/tablet_cooldown_test";
static constexpr int64_t kResourceId = 10000;
static constexpr int64_t kStoragePolicyId = 10002;
static constexpr int64_t kTabletId = 10005;
static constexpr int64_t kTabletId2 = 10006;
static constexpr int64_t kReplicaId = 10009;
static constexpr int32_t kSchemaHash = 270068377;
static constexpr int64_t kReplicaId2 = 10010;
static constexpr int32_t kSchemaHash2 = 270068381;

static constexpr int32_t kTxnId = 20003;
static constexpr int32_t kPartitionId = 30003;
static constexpr int32_t kTxnId2 = 40003;
static constexpr int32_t kPartitionId2 = 50003;

using io::Path;

static io::FileSystemSPtr s_fs;

static std::string get_remote_path(const Path& path) {
return fmt::format("{}/remote/{}", config::storage_root_path, path.string());
}

class FileWriterMock : public io::FileWriter {
public:
FileWriterMock(Path path) : io::FileWriter(std::move(path)) {
io::global_local_filesystem()->create_file(get_remote_path(_path), &_local_file_writer);
}

~FileWriterMock() override = default;

Status close() override { return _local_file_writer->close(); }

Status abort() override { return _local_file_writer->abort(); }

Status append(const Slice& data) override { return _local_file_writer->append(data); }

Status appendv(const Slice* data, size_t data_cnt) override {
return _local_file_writer->appendv(data, data_cnt);
}

Status write_at(size_t offset, const Slice& data) override {
return _local_file_writer->write_at(offset, data);
}

Status finalize() override { return _local_file_writer->finalize(); }

size_t bytes_appended() const override { return _local_file_writer->bytes_appended(); }

io::FileSystemSPtr fs() const override { return s_fs; }

private:
std::unique_ptr<io::FileWriter> _local_file_writer;
};

class RemoteFileSystemMock : public io::RemoteFileSystem {
public:
RemoteFileSystemMock(Path root_path, std::string&& id, io::FileSystemType type)
: RemoteFileSystem(std::move(root_path), std::move(id), type) {
_local_fs = io::LocalFileSystem::create(get_remote_path(_root_path));
}
~RemoteFileSystemMock() override = default;

Status create_file(const Path& path, io::FileWriterPtr* writer) override {
Path fs_path = path;
*writer = std::make_unique<FileWriterMock>(fs_path);
return Status::OK();
}

Status open_file(const Path& path, io::FileReaderSPtr* reader, IOContext* io_ctx) override {
return _local_fs->open_file(get_remote_path(path), reader, io_ctx);
}

Status delete_file(const Path& path) override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: unknown type name 'Path'; did you mean 'io::Path'? [clang-diagnostic-error]

Suggested change
Status delete_file(const Path& path) override {
Status delete_file(const io::Path& path) override {

be/src/io/fs/path.h:24: 'io::Path' declared here

using Path = std::filesystem::path;
      ^

return _local_fs->delete_file(get_remote_path(path));
}

Status create_directory(const Path& path) override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: unknown type name 'Path'; did you mean 'io::Path'? [clang-diagnostic-error]

Suggested change
Status create_directory(const Path& path) override {
Status create_directory(const io::Path& path) override {

be/src/io/fs/path.h:24: 'io::Path' declared here

using Path = std::filesystem::path;
      ^

return _local_fs->create_directory(get_remote_path(path));
}

Status delete_directory(const Path& path) override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: unknown type name 'Path'; did you mean 'io::Path'? [clang-diagnostic-error]

Suggested change
Status delete_directory(const Path& path) override {
Status delete_directory(const io::Path& path) override {

be/src/io/fs/path.h:24: 'io::Path' declared here

using Path = std::filesystem::path;
      ^

return _local_fs->delete_directory(get_remote_path(path));
}

Status link_file(const Path& src, const Path& dest) override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: unknown type name 'Path'; did you mean 'io::Path'? [clang-diagnostic-error]

Suggested change
Status link_file(const Path& src, const Path& dest) override {
Status link_file(const io::Path& src, const Path& dest) override {

be/src/io/fs/path.h:24: 'io::Path' declared here

using Path = std::filesystem::path;
      ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: unknown type name 'Path'; did you mean 'io::Path'? [clang-diagnostic-error]

Suggested change
Status link_file(const Path& src, const Path& dest) override {
Status link_file(const Path& src, const io::Path& dest) override {

be/src/io/fs/path.h:24: 'io::Path' declared here

using Path = std::filesystem::path;
      ^

return _local_fs->link_file(get_remote_path(src), get_remote_path(dest));
}

Status exists(const Path& path, bool* res) const override {
return _local_fs->exists(get_remote_path(path), res);
}

Status file_size(const Path& path, size_t* file_size) const override {
return _local_fs->file_size(get_remote_path(path), file_size);
}

Status list(const Path& path, std::vector<Path>* files) override {
std::vector<Path> local_paths;
RETURN_IF_ERROR(_local_fs->list(get_remote_path(path), &local_paths));
for (Path path : local_paths) {
files->emplace_back(path.string().substr(config::storage_root_path.size() + 1));
}
return Status::OK();
}

Status upload(const Path& local_path, const Path& dest_path) override {
return _local_fs->link_file(local_path.string(), get_remote_path(dest_path));
}

Status batch_upload(const std::vector<Path>& local_paths,
const std::vector<Path>& dest_paths) override {
for (int i = 0; i < local_paths.size(); ++i) {
RETURN_IF_ERROR(upload(local_paths[i], dest_paths[i]));
}
return Status::OK();
}

Status batch_delete(const std::vector<Path>& paths) override {
for (int i = 0; i < paths.size(); ++i) {
RETURN_IF_ERROR(delete_file(paths[i]));
}
return Status::OK();
}

Status connect() override { return Status::OK(); }

private:
std::shared_ptr<io::LocalFileSystem> _local_fs;
};

// remove DISABLED_ when need run this test
#define TabletCooldownTest DISABLED_TabletCooldownTest
class TabletCooldownTest : public testing::Test {
public:
static void SetUpTestSuite() {
S3Conf s3_conf;
s3_conf.ak = config::test_s3_ak;
s3_conf.sk = config::test_s3_sk;
s3_conf.endpoint = config::test_s3_endpoint;
s3_conf.region = config::test_s3_region;
s3_conf.bucket = config::test_s3_bucket;
s3_conf.prefix = config::test_s3_prefix + "/tablet_cooldown_test";
auto s3_fs = io::S3FileSystem::create(std::move(s3_conf), std::to_string(kResourceId));
ASSERT_TRUE(s3_fs->connect().ok());
put_storage_resource(kResourceId, {s3_fs, 1});
s_fs.reset(new RemoteFileSystemMock("test_path", std::to_string(kResourceId),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: allocating an object of abstract class type 'doris::RemoteFileSystemMock' [clang-diagnostic-error]

        s_fs.reset(new RemoteFileSystemMock("test_path", std::to_string(kResourceId),
                       ^

be/src/io/fs/file_system.h:66: unimplemented pure virtual method 'exists' in 'RemoteFileSystemMock'

    virtual Status exists(const Path& path, bool* res) const = 0;
                   ^

be/src/io/fs/file_system.h:68: unimplemented pure virtual method 'file_size' in 'RemoteFileSystemMock'

    virtual Status file_size(const Path& path, size_t* file_size) const = 0;
                   ^

io::FileSystemType::S3));
StorageResource resource = {s_fs, 1};
put_storage_resource(kResourceId, resource);
auto storage_policy = std::make_shared<StoragePolicy>();
storage_policy->name = "TabletCooldownTest";
storage_policy->version = 1;
Expand All @@ -70,6 +193,8 @@ class TabletCooldownTest : public testing::Test {

FileUtils::remove_all(config::storage_root_path);
FileUtils::create_dir(config::storage_root_path);
FileUtils::create_dir(get_remote_path(fmt::format("data/{}", kTabletId)));
FileUtils::create_dir(get_remote_path(fmt::format("data/{}", kTabletId2)));

std::vector<StorePath> paths {{config::storage_root_path, -1}};

Expand Down Expand Up @@ -145,11 +270,13 @@ static TDescriptorTable create_descriptor_tablet_with_sequence_col() {
return desc_tbl_builder.desc_tbl();
}

TEST_F(TabletCooldownTest, normal) {
void createTablet(StorageEngine* engine, TabletSharedPtr* tablet, int64_t replica_id,
int32_t schema_hash, int64_t tablet_id, int64_t txn_id, int64_t partition_id) {
// create tablet
TCreateTabletReq request;
create_tablet_request_with_sequence_col(10005, 270068377, &request);
Status st = k_engine->create_tablet(request);
create_tablet_request_with_sequence_col(tablet_id, schema_hash, &request);
request.__set_replica_id(replica_id);
Status st = engine->create_tablet(request);
ASSERT_EQ(Status::OK(), st);

TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
Expand All @@ -163,8 +290,9 @@ TEST_F(TabletCooldownTest, normal) {
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, 30003,
load_id, tuple_desc, &(tuple_desc->slots()), false, &param};

WriteRequest write_req = {tablet_id, schema_hash, WriteType::LOAD, txn_id, partition_id,
load_id, tuple_desc, &(tuple_desc->slots()), false, &param};
DeltaWriter* delta_writer = nullptr;
DeltaWriter::open(&write_req, &delta_writer);
ASSERT_NE(delta_writer, nullptr);
Expand Down Expand Up @@ -203,33 +331,42 @@ TEST_F(TabletCooldownTest, normal) {
delete delta_writer;

// publish version success
TabletSharedPtr tablet =
k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
OlapMeta* meta = tablet->data_dir()->get_meta();
*tablet = engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
OlapMeta* meta = (*tablet)->data_dir()->get_meta();
Version version;
version.first = tablet->rowset_with_max_version()->end_version() + 1;
version.second = tablet->rowset_with_max_version()->end_version() + 1;
version.first = (*tablet)->rowset_with_max_version()->end_version() + 1;
version.second = (*tablet)->rowset_with_max_version()->end_version() + 1;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
write_req.txn_id, write_req.partition_id, &tablet_related_rs);
engine->txn_manager()->get_txn_related_tablets(write_req.txn_id, write_req.partition_id,
&tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid(), version);
st = engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
(*tablet)->tablet_id(), (*tablet)->schema_hash(),
(*tablet)->tablet_uid(), version);
ASSERT_EQ(Status::OK(), st);
st = tablet->add_inc_rowset(rowset);
st = (*tablet)->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);
}
EXPECT_EQ(1, tablet->num_rows());
EXPECT_EQ(1, (*tablet)->num_rows());
}

TEST_F(TabletCooldownTest, normal) {
TabletSharedPtr tablet1;
TabletSharedPtr tablet2;
createTablet(k_engine, &tablet1, kReplicaId, kSchemaHash, kTabletId, kTxnId, kPartitionId);
createTablet(k_engine, &tablet2, kReplicaId2, kSchemaHash2, kTabletId2, kTxnId2, kPartitionId2);
// test cooldown
tablet->set_storage_policy_id(kStoragePolicyId);
st = tablet->cooldown(); // rowset [0-1]
tablet1->set_storage_policy_id(kStoragePolicyId);
Status st = tablet1->cooldown(); // rowset [0-1]
ASSERT_NE(Status::OK(), st);
tablet1->update_cooldown_conf(1, kReplicaId);
// cooldown for upload node
st = tablet1->cooldown(); // rowset [0-1]
ASSERT_EQ(Status::OK(), st);
st = tablet->cooldown(); // rowset [2-2]
st = tablet1->cooldown(); // rowset [2-2]
ASSERT_EQ(Status::OK(), st);
auto rs = tablet->get_rowset_by_version({2, 2});
auto rs = tablet1->get_rowset_by_version({2, 2});
ASSERT_FALSE(rs->is_local());

// test read
Expand All @@ -238,6 +375,31 @@ TEST_F(TabletCooldownTest, normal) {
st = std::static_pointer_cast<BetaRowset>(rs)->load_segments(&segments);
ASSERT_EQ(Status::OK(), st);
ASSERT_EQ(segments.size(), 1);

st = io::global_local_filesystem()->link_file(
get_remote_path(fmt::format("data/{}/{}.{}.meta", kTabletId, kReplicaId, 1)),
get_remote_path(fmt::format("data/{}/{}.{}.meta", kTabletId2, kReplicaId, 2)));
ASSERT_EQ(Status::OK(), st);
// follow cooldown
tablet2->set_storage_policy_id(kStoragePolicyId);
tablet2->update_cooldown_conf(1, 111111111);
st = tablet2->cooldown(); // rowset [0-1]
ASSERT_NE(Status::OK(), st);
tablet2->update_cooldown_conf(1, kReplicaId);
st = tablet2->cooldown(); // rowset [0-1]
ASSERT_NE(Status::OK(), st);
tablet2->update_cooldown_conf(2, kReplicaId);
st = tablet2->cooldown(); // rowset [0-1]
ASSERT_EQ(Status::OK(), st);
auto rs2 = tablet2->get_rowset_by_version({2, 2});
ASSERT_FALSE(rs2->is_local());

// test read tablet2
ASSERT_EQ(Status::OK(), st);
std::vector<segment_v2::SegmentSharedPtr> segments2;
st = std::static_pointer_cast<BetaRowset>(rs2)->load_segments(&segments2);
ASSERT_EQ(Status::OK(), st);
ASSERT_EQ(segments2.size(), 1);
}

} // namespace doris