diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 6fde083581c92a..b842f82d597a4a 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -23,6 +23,10 @@ #include "common/status.h" #include "exec/tablet_info.h" #include "gen_cpp/internal_service.pb.h" +#include "io/fs/file_writer.h" +#include "io/fs/local_file_system.h" +#include "io/fs/local_file_writer.h" +#include "io/fs/remote_file_system.h" #include "io/fs/s3_file_system.h" #include "olap/delta_writer.h" #include "olap/rowset/beta_rowset.h" @@ -37,25 +41,144 @@ namespace doris { static StorageEngine* k_engine = nullptr; -static const std::string kTestDir = "./ut_dir/tablet_cooldown_test"; +static const std::string kTestDir = "ut_dir/tablet_cooldown_test"; static constexpr int64_t kResourceId = 10000; static constexpr int64_t kStoragePolicyId = 10002; +static constexpr int64_t kTabletId = 10005; +static constexpr int64_t kTabletId2 = 10006; +static constexpr int64_t kReplicaId = 10009; +static constexpr int32_t kSchemaHash = 270068377; +static constexpr int64_t kReplicaId2 = 10010; +static constexpr int32_t kSchemaHash2 = 270068381; + +static constexpr int32_t kTxnId = 20003; +static constexpr int32_t kPartitionId = 30003; +static constexpr int32_t kTxnId2 = 40003; +static constexpr int32_t kPartitionId2 = 50003; + +using io::Path; + +static io::FileSystemSPtr s_fs; + +static std::string get_remote_path(const Path& path) { + return fmt::format("{}/remote/{}", config::storage_root_path, path.string()); +} + +class FileWriterMock : public io::FileWriter { +public: + FileWriterMock(Path path) : io::FileWriter(std::move(path)) { + io::global_local_filesystem()->create_file(get_remote_path(_path), &_local_file_writer); + } + + ~FileWriterMock() override = default; + + Status close() override { return _local_file_writer->close(); } + + Status abort() override { return _local_file_writer->abort(); } + + Status append(const Slice& data) override { return _local_file_writer->append(data); } + + Status appendv(const Slice* data, size_t data_cnt) override { + return _local_file_writer->appendv(data, data_cnt); + } + + Status write_at(size_t offset, const Slice& data) override { + return _local_file_writer->write_at(offset, data); + } + + Status finalize() override { return _local_file_writer->finalize(); } + + size_t bytes_appended() const override { return _local_file_writer->bytes_appended(); } + + io::FileSystemSPtr fs() const override { return s_fs; } + +private: + std::unique_ptr _local_file_writer; +}; + +class RemoteFileSystemMock : public io::RemoteFileSystem { +public: + RemoteFileSystemMock(Path root_path, std::string&& id, io::FileSystemType type) + : RemoteFileSystem(std::move(root_path), std::move(id), type) { + _local_fs = io::LocalFileSystem::create(get_remote_path(_root_path)); + } + ~RemoteFileSystemMock() override = default; + + Status create_file(const Path& path, io::FileWriterPtr* writer) override { + Path fs_path = path; + *writer = std::make_unique(fs_path); + return Status::OK(); + } + + Status open_file(const Path& path, io::FileReaderSPtr* reader, IOContext* io_ctx) override { + return _local_fs->open_file(get_remote_path(path), reader, io_ctx); + } + + Status delete_file(const Path& path) override { + return _local_fs->delete_file(get_remote_path(path)); + } + + Status create_directory(const Path& path) override { + return _local_fs->create_directory(get_remote_path(path)); + } + + Status delete_directory(const Path& path) override { + return _local_fs->delete_directory(get_remote_path(path)); + } + + Status link_file(const Path& src, const Path& dest) override { + return _local_fs->link_file(get_remote_path(src), get_remote_path(dest)); + } + + Status exists(const Path& path, bool* res) const override { + return _local_fs->exists(get_remote_path(path), res); + } + + Status file_size(const Path& path, size_t* file_size) const override { + return _local_fs->file_size(get_remote_path(path), file_size); + } + + Status list(const Path& path, std::vector* files) override { + std::vector local_paths; + RETURN_IF_ERROR(_local_fs->list(get_remote_path(path), &local_paths)); + for (Path path : local_paths) { + files->emplace_back(path.string().substr(config::storage_root_path.size() + 1)); + } + return Status::OK(); + } + + Status upload(const Path& local_path, const Path& dest_path) override { + return _local_fs->link_file(local_path.string(), get_remote_path(dest_path)); + } + + Status batch_upload(const std::vector& local_paths, + const std::vector& dest_paths) override { + for (int i = 0; i < local_paths.size(); ++i) { + RETURN_IF_ERROR(upload(local_paths[i], dest_paths[i])); + } + return Status::OK(); + } + + Status batch_delete(const std::vector& paths) override { + for (int i = 0; i < paths.size(); ++i) { + RETURN_IF_ERROR(delete_file(paths[i])); + } + return Status::OK(); + } + + Status connect() override { return Status::OK(); } + +private: + std::shared_ptr _local_fs; +}; -// remove DISABLED_ when need run this test -#define TabletCooldownTest DISABLED_TabletCooldownTest class TabletCooldownTest : public testing::Test { public: static void SetUpTestSuite() { - S3Conf s3_conf; - s3_conf.ak = config::test_s3_ak; - s3_conf.sk = config::test_s3_sk; - s3_conf.endpoint = config::test_s3_endpoint; - s3_conf.region = config::test_s3_region; - s3_conf.bucket = config::test_s3_bucket; - s3_conf.prefix = config::test_s3_prefix + "/tablet_cooldown_test"; - auto s3_fs = io::S3FileSystem::create(std::move(s3_conf), std::to_string(kResourceId)); - ASSERT_TRUE(s3_fs->connect().ok()); - put_storage_resource(kResourceId, {s3_fs, 1}); + s_fs.reset(new RemoteFileSystemMock("test_path", std::to_string(kResourceId), + io::FileSystemType::S3)); + StorageResource resource = {s_fs, 1}; + put_storage_resource(kResourceId, resource); auto storage_policy = std::make_shared(); storage_policy->name = "TabletCooldownTest"; storage_policy->version = 1; @@ -70,6 +193,8 @@ class TabletCooldownTest : public testing::Test { FileUtils::remove_all(config::storage_root_path); FileUtils::create_dir(config::storage_root_path); + FileUtils::create_dir(get_remote_path(fmt::format("data/{}", kTabletId))); + FileUtils::create_dir(get_remote_path(fmt::format("data/{}", kTabletId2))); std::vector paths {{config::storage_root_path, -1}}; @@ -145,11 +270,13 @@ static TDescriptorTable create_descriptor_tablet_with_sequence_col() { return desc_tbl_builder.desc_tbl(); } -TEST_F(TabletCooldownTest, normal) { +void createTablet(StorageEngine* engine, TabletSharedPtr* tablet, int64_t replica_id, + int32_t schema_hash, int64_t tablet_id, int64_t txn_id, int64_t partition_id) { // create tablet TCreateTabletReq request; - create_tablet_request_with_sequence_col(10005, 270068377, &request); - Status st = k_engine->create_tablet(request); + create_tablet_request_with_sequence_col(tablet_id, schema_hash, &request); + request.__set_replica_id(replica_id); + Status st = engine->create_tablet(request); ASSERT_EQ(Status::OK(), st); TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col(); @@ -163,8 +290,9 @@ TEST_F(TabletCooldownTest, normal) { PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003, 30003, - load_id, tuple_desc, &(tuple_desc->slots()), false, ¶m}; + + WriteRequest write_req = {tablet_id, schema_hash, WriteType::LOAD, txn_id, partition_id, + load_id, tuple_desc, &(tuple_desc->slots()), false, ¶m}; DeltaWriter* delta_writer = nullptr; DeltaWriter::open(&write_req, &delta_writer); ASSERT_NE(delta_writer, nullptr); @@ -203,33 +331,42 @@ TEST_F(TabletCooldownTest, normal) { delete delta_writer; // publish version success - TabletSharedPtr tablet = - k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); - OlapMeta* meta = tablet->data_dir()->get_meta(); + *tablet = engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + OlapMeta* meta = (*tablet)->data_dir()->get_meta(); Version version; - version.first = tablet->rowset_with_max_version()->end_version() + 1; - version.second = tablet->rowset_with_max_version()->end_version() + 1; + version.first = (*tablet)->rowset_with_max_version()->end_version() + 1; + version.second = (*tablet)->rowset_with_max_version()->end_version() + 1; std::map tablet_related_rs; - StorageEngine::instance()->txn_manager()->get_txn_related_tablets( - write_req.txn_id, write_req.partition_id, &tablet_related_rs); + engine->txn_manager()->get_txn_related_tablets(write_req.txn_id, write_req.partition_id, + &tablet_related_rs); for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; - st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - tablet->tablet_id(), tablet->schema_hash(), - tablet->tablet_uid(), version); + st = engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + (*tablet)->tablet_id(), (*tablet)->schema_hash(), + (*tablet)->tablet_uid(), version); ASSERT_EQ(Status::OK(), st); - st = tablet->add_inc_rowset(rowset); + st = (*tablet)->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st); } - EXPECT_EQ(1, tablet->num_rows()); + EXPECT_EQ(1, (*tablet)->num_rows()); +} +TEST_F(TabletCooldownTest, normal) { + TabletSharedPtr tablet1; + TabletSharedPtr tablet2; + createTablet(k_engine, &tablet1, kReplicaId, kSchemaHash, kTabletId, kTxnId, kPartitionId); + createTablet(k_engine, &tablet2, kReplicaId2, kSchemaHash2, kTabletId2, kTxnId2, kPartitionId2); // test cooldown - tablet->set_storage_policy_id(kStoragePolicyId); - st = tablet->cooldown(); // rowset [0-1] + tablet1->set_storage_policy_id(kStoragePolicyId); + Status st = tablet1->cooldown(); // rowset [0-1] + ASSERT_NE(Status::OK(), st); + tablet1->update_cooldown_conf(1, kReplicaId); + // cooldown for upload node + st = tablet1->cooldown(); // rowset [0-1] ASSERT_EQ(Status::OK(), st); - st = tablet->cooldown(); // rowset [2-2] + st = tablet1->cooldown(); // rowset [2-2] ASSERT_EQ(Status::OK(), st); - auto rs = tablet->get_rowset_by_version({2, 2}); + auto rs = tablet1->get_rowset_by_version({2, 2}); ASSERT_FALSE(rs->is_local()); // test read @@ -238,6 +375,31 @@ TEST_F(TabletCooldownTest, normal) { st = std::static_pointer_cast(rs)->load_segments(&segments); ASSERT_EQ(Status::OK(), st); ASSERT_EQ(segments.size(), 1); + + st = io::global_local_filesystem()->link_file( + get_remote_path(fmt::format("data/{}/{}.{}.meta", kTabletId, kReplicaId, 1)), + get_remote_path(fmt::format("data/{}/{}.{}.meta", kTabletId2, kReplicaId, 2))); + ASSERT_EQ(Status::OK(), st); + // follow cooldown + tablet2->set_storage_policy_id(kStoragePolicyId); + tablet2->update_cooldown_conf(1, 111111111); + st = tablet2->cooldown(); // rowset [0-1] + ASSERT_NE(Status::OK(), st); + tablet2->update_cooldown_conf(1, kReplicaId); + st = tablet2->cooldown(); // rowset [0-1] + ASSERT_NE(Status::OK(), st); + tablet2->update_cooldown_conf(2, kReplicaId); + st = tablet2->cooldown(); // rowset [0-1] + ASSERT_EQ(Status::OK(), st); + auto rs2 = tablet2->get_rowset_by_version({2, 2}); + ASSERT_FALSE(rs2->is_local()); + + // test read tablet2 + ASSERT_EQ(Status::OK(), st); + std::vector segments2; + st = std::static_pointer_cast(rs2)->load_segments(&segments2); + ASSERT_EQ(Status::OK(), st); + ASSERT_EQ(segments2.size(), 1); } } // namespace doris