From 76e949d0989a5457aa483e113e329c1ccb14e450 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Tue, 27 Jan 2026 17:19:36 +0800 Subject: [PATCH 1/2] feat: introduce scan.tag-name option to specify scanning from tag for reading given tag --- include/paimon/defs.h | 2 + src/paimon/CMakeLists.txt | 6 +- src/paimon/common/defs.cpp | 1 + src/paimon/core/core_options.cpp | 12 + src/paimon/core/core_options.h | 2 + src/paimon/core/core_options_test.cpp | 1 + .../core/table/source/abstract_table_scan.h | 11 +- .../static_from_tag_starting_scanner.h | 49 ++++ src/paimon/core/tag/tag.cpp | 115 +++++++++ src/paimon/core/tag/tag.h | 86 +++++++ src/paimon/core/tag/tag_test.cpp | 240 ++++++++++++++++++ src/paimon/core/utils/snapshot_manager.cpp | 8 + src/paimon/core/utils/snapshot_manager.h | 2 + src/paimon/core/utils/tag_manager.cpp | 61 +++++ src/paimon/core/utils/tag_manager.h | 49 ++++ src/paimon/core/utils/tag_manager_test.cpp | 38 +++ test/inte/scan_inte_test.cpp | 5 +- 17 files changed, 684 insertions(+), 4 deletions(-) create mode 100644 src/paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h create mode 100644 src/paimon/core/tag/tag.cpp create mode 100644 src/paimon/core/tag/tag.h create mode 100644 src/paimon/core/tag/tag_test.cpp create mode 100644 src/paimon/core/utils/tag_manager.cpp create mode 100644 src/paimon/core/utils/tag_manager.h create mode 100644 src/paimon/core/utils/tag_manager_test.cpp diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 65367bb5..857a9ec9 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -287,6 +287,8 @@ struct PAIMON_EXPORT Options { /// "global-index.external-path" - Global index root directory, if not set, the global index /// files will be stored under the index directory. static const char GLOBAL_INDEX_EXTERNAL_PATH[]; + /// "scan.tag-name" - Optional tag name used in case of "from-snapshot" scan mode. + static const char SCAN_TAG_NAME[]; }; static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits::max(); diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index ead27e63..d18b7ac7 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -265,6 +265,7 @@ set(PAIMON_CORE_SRCS core/table/source/table_read.cpp core/table/source/table_scan.cpp core/table/source/data_evolution_batch_scan.cpp + core/tag/tag.cpp core/utils/field_mapping.cpp core/utils/fields_comparator.cpp core/utils/file_store_path_factory.cpp @@ -273,7 +274,8 @@ set(PAIMON_CORE_SRCS core/utils/partition_path_utils.cpp core/utils/primary_key_table_utils.cpp core/utils/snapshot_manager.cpp - core/utils/special_field_ids.cpp) + core/utils/special_field_ids.cpp + core/utils/tag_manager.cpp) add_paimon_lib(paimon SOURCES @@ -563,6 +565,7 @@ if(PAIMON_BUILD_TESTS) core/table/source/split_generator_test.cpp core/table/source/startup_mode_test.cpp core/table/source/table_scan_test.cpp + core/tag/tag_test.cpp core/utils/branch_manager_test.cpp core/utils/field_mapping_test.cpp core/utils/fields_comparator_test.cpp @@ -572,6 +575,7 @@ if(PAIMON_BUILD_TESTS) core/utils/offset_row_test.cpp core/utils/partition_path_utils_test.cpp core/utils/snapshot_manager_test.cpp + core/utils/tag_manager_test.cpp core/utils/primary_key_table_utils_test.cpp core/utils/index_file_path_factories_test.cpp STATIC_LINK_LIBS diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 12143c24..35bbac8e 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -81,4 +81,5 @@ const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name"; const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor"; const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled"; const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path"; +const char Options::SCAN_TAG_NAME[] = "scan.tag-name"; } // namespace paimon diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index dadc38a2..1e3fa599 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -304,6 +304,8 @@ struct CoreOptions::Impl { bool legacy_partition_name_enabled = true; bool global_index_enabled = true; std::optional global_index_external_path; + + std::optional scan_tag_name; }; // Parse configurations from a map and return a populated CoreOptions object @@ -479,6 +481,12 @@ Result CoreOptions::FromMap( if (!global_index_external_path.empty()) { impl->global_index_external_path = global_index_external_path; } + // Parse scan.tag-name + std::string scan_tag_name; + PAIMON_RETURN_NOT_OK(parser.ParseString(Options::SCAN_TAG_NAME, &scan_tag_name)); + if (!scan_tag_name.empty()) { + impl->scan_tag_name = scan_tag_name; + } return options; } @@ -775,4 +783,8 @@ Result> CoreOptions::CreateGlobalIndexExternalPath() return std::optional(path.ToString()); } +std::optional CoreOptions::GetScanTagName() const { + return impl_->scan_tag_name; +} + } // namespace paimon diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index f1f31a1b..ef8bca2d 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -118,6 +118,8 @@ class PAIMON_EXPORT CoreOptions { bool GlobalIndexEnabled() const; Result> CreateGlobalIndexExternalPath() const; + std::optional GetScanTagName() const; + const std::map& ToMap() const; private: diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index fa005d4c..9bb6d2d5 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -88,6 +88,7 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_TRUE(core_options.LegacyPartitionNameEnabled()); ASSERT_TRUE(core_options.GlobalIndexEnabled()); ASSERT_FALSE(core_options.GetGlobalIndexExternalPath()); + ASSERT_EQ(std::nullopt, core_options.GetScanTagName()); } TEST(CoreOptionsTest, TestFromMap) { diff --git a/src/paimon/core/table/source/abstract_table_scan.h b/src/paimon/core/table/source/abstract_table_scan.h index d6968a79..35eed942 100644 --- a/src/paimon/core/table/source/abstract_table_scan.h +++ b/src/paimon/core/table/source/abstract_table_scan.h @@ -25,6 +25,7 @@ #include "paimon/core/table/source/snapshot/full_starting_scanner.h" #include "paimon/core/table/source/snapshot/snapshot_reader.h" #include "paimon/core/table/source/snapshot/static_from_snapshot_starting_scanner.h" +#include "paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h" #include "paimon/table/source/startup_mode.h" #include "paimon/table/source/table_scan.h" namespace paimon { @@ -51,6 +52,7 @@ class AbstractTableScan : public TableScan { return std::shared_ptr(new FullStartingScanner(snapshot_manager)); } } else if (startup_mode == StartupMode::FromSnapshot()) { + const std::optional scan_tag_name = core_options_.GetScanTagName(); if (specified_snapshot_id != std::nullopt) { return is_streaming ? std::shared_ptr( @@ -58,9 +60,16 @@ class AbstractTableScan : public TableScan { snapshot_manager, specified_snapshot_id.value())) : std::shared_ptr(new StaticFromSnapshotStartingScanner( snapshot_manager, specified_snapshot_id.value())); + } else if (scan_tag_name != std::nullopt) { + if (is_streaming) { + return Status::Invalid("Cannot scan from tag in streaming mode"); + } + return std::make_shared(snapshot_manager, + scan_tag_name.value()); } else { return Status::Invalid( - "scan.snapshot-id must be set when startup mode is FROM_SNAPSHOT"); + "scan.snapshot-id or scan.tag-name must be set when startup mode is " + "FROM_SNAPSHOT"); } } else if (startup_mode == StartupMode::FromSnapshotFull()) { if (specified_snapshot_id != std::nullopt) { diff --git a/src/paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h b/src/paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h new file mode 100644 index 00000000..3a936b87 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h @@ -0,0 +1,49 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/core/table/source/snapshot/starting_scanner.h" +#include "paimon/core/utils/tag_manager.h" + +namespace paimon { +/// `StartingScanner` for the `CoreOptions::GetScanTagName()` of a batch read. +class StaticFromTagStartingScanner : public StartingScanner { + public: + StaticFromTagStartingScanner(const std::shared_ptr& snapshot_manager, + const std::string& tag_name) + : StartingScanner(snapshot_manager) { + tag_name_ = tag_name; + } + + Result> Scan( + const std::shared_ptr& snapshot_reader) override { + const TagManager tag_manager(snapshot_manager_->Fs(), snapshot_manager_->RootPath(), + snapshot_manager_->Branch()); + PAIMON_ASSIGN_OR_RAISE(const Tag tag, tag_manager.GetOrThrow(tag_name_)); + PAIMON_ASSIGN_OR_RAISE(const Snapshot snapshot, tag.TrimToSnapshot()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr plan, + snapshot_reader->WithMode(ScanMode::ALL)->WithSnapshot(snapshot)->Read()); + return std::make_shared(plan); + } + + private: + std::string tag_name_; +}; +} // namespace paimon diff --git a/src/paimon/core/tag/tag.cpp b/src/paimon/core/tag/tag.cpp new file mode 100644 index 00000000..764cb618 --- /dev/null +++ b/src/paimon/core/tag/tag.cpp @@ -0,0 +1,115 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/tag/tag.h" + +#include +#include +#include + +#include "paimon/common/utils/rapidjson_util.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "rapidjson/allocators.h" +#include "rapidjson/document.h" +#include "rapidjson/rapidjson.h" + +namespace paimon { + +Tag::Tag(const std::optional& version, int64_t id, int64_t schema_id, + const std::string& base_manifest_list, + const std::optional& base_manifest_list_size, + const std::string& delta_manifest_list, + const std::optional& delta_manifest_list_size, + const std::optional& changelog_manifest_list, + const std::optional& changelog_manifest_list_size, + const std::optional& index_manifest, const std::string& commit_user, + int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis, + const std::optional>& log_offsets, + const std::optional& total_record_count, + const std::optional& delta_record_count, + const std::optional& changelog_record_count, + const std::optional& watermark, const std::optional& statistics, + const std::optional>& properties, + const std::optional& next_row_id, const std::optional& tag_create_time, + const std::optional& tag_time_retained) + : Snapshot(version, id, schema_id, base_manifest_list, base_manifest_list_size, + delta_manifest_list, delta_manifest_list_size, changelog_manifest_list, + changelog_manifest_list_size, index_manifest, commit_user, commit_identifier, + commit_kind, time_millis, log_offsets, total_record_count, delta_record_count, + changelog_record_count, watermark, statistics, properties, next_row_id), + tag_create_time_(tag_create_time), + tag_time_retained_(tag_time_retained) {} + +bool Tag::operator==(const Tag& other) const { + if (this == &other) { + return true; + } + return Snapshot::operator==(other) && tag_create_time_ == other.tag_create_time_ && + tag_time_retained_ == other.tag_time_retained_; +} + +bool Tag::TEST_Equal(const Tag& other) const { + if (this == &other) { + return true; + } + + return Snapshot::TEST_Equal(other) && tag_create_time_ == other.tag_create_time_ && + tag_time_retained_ == other.tag_time_retained_; +} + +Result Tag::TrimToSnapshot() const { + return Snapshot(Version(), Id(), SchemaId(), BaseManifestList(), BaseManifestListSize(), + DeltaManifestList(), DeltaManifestListSize(), ChangelogManifestList(), + ChangelogManifestListSize(), IndexManifest(), CommitUser(), CommitIdentifier(), + GetCommitKind(), TimeMillis(), LogOffsets(), TotalRecordCount(), + DeltaRecordCount(), ChangelogRecordCount(), Watermark(), Statistics(), + Properties(), NextRowId()); +} + +rapidjson::Value Tag::ToJson(rapidjson::Document::AllocatorType* allocator) const noexcept(false) { + rapidjson::Value obj(rapidjson::kObjectType); + obj = Snapshot::ToJson(allocator); + if (tag_create_time_ != std::nullopt) { + obj.AddMember(rapidjson::StringRef(FIELD_TAG_CREATE_TIME), + RapidJsonUtil::SerializeValue(tag_create_time_.value(), allocator).Move(), + *allocator); + } + if (tag_time_retained_ != std::nullopt) { + obj.AddMember(rapidjson::StringRef(FIELD_TAG_TIME_RETAINED), + RapidJsonUtil::SerializeValue(tag_time_retained_.value(), allocator).Move(), + *allocator); + } + return obj; +} + +void Tag::FromJson(const rapidjson::Value& obj) noexcept(false) { + Snapshot::FromJson(obj); + tag_create_time_ = + RapidJsonUtil::DeserializeKeyValue>(obj, FIELD_TAG_CREATE_TIME); + tag_time_retained_ = + RapidJsonUtil::DeserializeKeyValue>(obj, FIELD_TAG_TIME_RETAINED); +} + +Result Tag::FromPath(const std::shared_ptr& fs, const std::string& path) { + std::string json_str; + PAIMON_RETURN_NOT_OK(fs->ReadFile(path, &json_str)); + Tag tag; + PAIMON_RETURN_NOT_OK(RapidJsonUtil::FromJsonString(json_str, &tag)); + return tag; +} +} // namespace paimon diff --git a/src/paimon/core/tag/tag.h b/src/paimon/core/tag/tag.h new file mode 100644 index 00000000..e509678b --- /dev/null +++ b/src/paimon/core/tag/tag.h @@ -0,0 +1,86 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/utils/jsonizable.h" +#include "paimon/core/snapshot.h" +#include "paimon/result.h" +#include "rapidjson/allocators.h" +#include "rapidjson/document.h" +#include "rapidjson/rapidjson.h" + +namespace paimon { +class FileSystem; + +/// Snapshot with tagCreateTime and tagTimeRetained. +class Tag : public Snapshot { + public: + static constexpr char FIELD_TAG_CREATE_TIME[] = "tagCreateTime"; + static constexpr char FIELD_TAG_TIME_RETAINED[] = "tagTimeRetained"; + + JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(Tag); + + Tag(const std::optional& version, int64_t id, int64_t schema_id, + const std::string& base_manifest_list, + const std::optional& base_manifest_list_size, + const std::string& delta_manifest_list, + const std::optional& delta_manifest_list_size, + const std::optional& changelog_manifest_list, + const std::optional& changelog_manifest_list_size, + const std::optional& index_manifest, const std::string& commit_user, + int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis, + const std::optional>& log_offsets, + const std::optional& total_record_count, + const std::optional& delta_record_count, + const std::optional& changelog_record_count, + const std::optional& watermark, const std::optional& statistics, + const std::optional>& properties, + const std::optional& next_row_id, const std::optional& tag_create_time, + const std::optional& tag_time_retained); + + bool operator==(const Tag& other) const; + bool TEST_Equal(const Tag& other) const; + + std::optional TagCreateTime() const { + return tag_create_time_; + } + + std::optional TagTimeRetained() const { + return tag_time_retained_; + } + + Result TrimToSnapshot() const; + + rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const + noexcept(false) override; + + void FromJson(const rapidjson::Value& obj) noexcept(false) override; + + static Result FromPath(const std::shared_ptr& fs, const std::string& path); + + private: + std::optional tag_create_time_; + std::optional tag_time_retained_; +}; +} // namespace paimon diff --git a/src/paimon/core/tag/tag_test.cpp b/src/paimon/core/tag/tag_test.cpp new file mode 100644 index 00000000..a4bc8dd5 --- /dev/null +++ b/src/paimon/core/tag/tag_test.cpp @@ -0,0 +1,240 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/tag/tag.h" + +#include "gtest/gtest.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class TagTest : public testing::Test { + public: + static std::string ReplaceAll(const std::string& str) { + std::string replaced_str = StringUtils::Replace(str, " ", ""); + replaced_str = StringUtils::Replace(replaced_str, "\t", ""); + replaced_str = StringUtils::Replace(replaced_str, "\n", ""); + return replaced_str; + } +}; + +TEST_F(TagTest, TestSimple) { + const std::map log_offset = {{25, 30}}; + const std::map properties = {{"key1", "value1"}, {"key2", "value2"}}; + const auto tag_create_time = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count(); + const Tag tag( + /*version=*/5, /*id=*/10, /*schema_id=*/15, /*base_manifest_list=*/"base_manifest_list", 10, + /*delta_manifest_list=*/"delta_manifest_list", 20, + /*changelog_manifest_list=*/"changelog_manifest_list", 30, + /*index_manifest=*/"index_manifest", + /*commit_user=*/"commit_user_01", /*commit_identifier=*/20, + /*commit_kind=*/Snapshot::CommitKind::Compact(), /*time_millis=*/1234, log_offset, + /*total_record_count=*/35, + /*delta_record_count=*/40, /*changelog_record_count=*/45, /*watermark=*/50, + /*statistics=*/"statistic_test", properties, /*next_row_id=*/0, + /*tag_create_time=*/tag_create_time, /*tag_time_retained=*/5000); + ASSERT_EQ(5, tag.Version()); + ASSERT_EQ(10, tag.Id()); + ASSERT_EQ(15, tag.SchemaId()); + ASSERT_EQ("base_manifest_list", tag.BaseManifestList()); + ASSERT_EQ(10, tag.BaseManifestListSize().value()); + ASSERT_EQ("delta_manifest_list", tag.DeltaManifestList()); + ASSERT_EQ(20, tag.DeltaManifestListSize().value()); + ASSERT_EQ("changelog_manifest_list", tag.ChangelogManifestList().value()); + ASSERT_EQ(30, tag.ChangelogManifestListSize().value()); + ASSERT_EQ("index_manifest", tag.IndexManifest().value()); + ASSERT_EQ("commit_user_01", tag.CommitUser()); + ASSERT_EQ(20, tag.CommitIdentifier()); + ASSERT_EQ(Snapshot::CommitKind::Compact(), tag.GetCommitKind()); + ASSERT_EQ(1234, tag.TimeMillis()); + ASSERT_EQ(log_offset, tag.LogOffsets().value()); + ASSERT_EQ(35, tag.TotalRecordCount().value()); + ASSERT_EQ(40, tag.DeltaRecordCount().value()); + ASSERT_EQ(45, tag.ChangelogRecordCount().value()); + ASSERT_EQ(50, tag.Watermark().value()); + ASSERT_EQ("statistic_test", tag.Statistics().value()); + ASSERT_EQ(properties, tag.Properties().value()); + ASSERT_EQ(0, tag.NextRowId().value()); + ASSERT_EQ(tag_create_time, tag.TagCreateTime().value()); + ASSERT_EQ(5000, tag.TagTimeRetained().value()); +} + +TEST_F(TagTest, TestFromPath) { + const std::string data_path = + paimon::test::GetDataDir() + "/orc/append_09.db/append_09/snapshot/snapshot-1"; + const auto fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(Tag tag, Tag::FromPath(fs, data_path)); + ASSERT_EQ(3, tag.Version()); + ASSERT_EQ(1, tag.Id()); + ASSERT_EQ(0, tag.SchemaId()); + ASSERT_EQ("manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-0", tag.BaseManifestList()); + ASSERT_EQ(std::nullopt, tag.BaseManifestListSize()); + ASSERT_EQ("manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-1", tag.DeltaManifestList()); + ASSERT_EQ(std::nullopt, tag.DeltaManifestListSize()); + ASSERT_EQ(std::nullopt, tag.ChangelogManifestList()); + ASSERT_EQ(std::nullopt, tag.ChangelogManifestListSize()); + ASSERT_EQ(std::nullopt, tag.IndexManifest()); + ASSERT_EQ("b02e4322-9c5f-41e1-a560-c0156fdf7b9c", tag.CommitUser()); + ASSERT_EQ(9223372036854775807ll, tag.CommitIdentifier()); + ASSERT_EQ(Snapshot::CommitKind::Append(), tag.GetCommitKind()); + ASSERT_EQ(1721614343270ll, tag.TimeMillis()); + ASSERT_EQ((std::map()), tag.LogOffsets().value()); + ASSERT_EQ(5, tag.TotalRecordCount().value()); + ASSERT_EQ(5, tag.DeltaRecordCount().value()); + ASSERT_EQ(0, tag.ChangelogRecordCount().value()); + ASSERT_EQ(std::nullopt, tag.Watermark()); + ASSERT_EQ(std::nullopt, tag.Statistics()); + ASSERT_EQ(std::nullopt, tag.Properties()); + ASSERT_EQ(std::nullopt, tag.NextRowId()); + ASSERT_EQ(std::nullopt, tag.TagCreateTime()); + ASSERT_EQ(std::nullopt, tag.TagTimeRetained()); +} + +TEST_F(TagTest, TestJsonizable) { + const std::string json_str = R"({ + "version" : 3, + "id" : 1, + "schemaId" : 0, + "baseManifestList" : "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-0", + "baseManifestListSize" : 20, + "deltaManifestList" : "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-1", + "deltaManifestListSize" : 50, + "changelogManifestList" : null, + "commitUser" : "0e4d92f7-53b0-40d6-a7c0-102bf3801e6a", + "commitIdentifier" : 9223372036854775807, + "commitKind" : "OVERWRITE", + "timeMillis" : 1711692199281, + "logOffsets" : { }, + "totalRecordCount" : 3, + "deltaRecordCount" : 3, + "changelogRecordCount" : 0, + "tagCreateTime": 1769503588, + "tagTimeRetained": 1000 + })"; + + Tag tag; + ASSERT_OK(RapidJsonUtil::FromJsonString(json_str, &tag)); + + const Tag expected_tag( + /*version=*/3, /*id=*/1, /*schema_id=*/0, /*base_manifest_list=*/ + "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-0", /*base_manifest_list_size=*/20, + /*delta_manifest_list=*/"manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-1", + /*delta_manifest_list_size=*/50, /*changelog_manifest_list=*/std::nullopt, + /*changelog_manifest_list_size=*/std::nullopt, /*index_manifest=*/std::nullopt, + /*commit_user=*/"0e4d92f7-53b0-40d6-a7c0-102bf3801e6a", + /*commit_identifier=*/9223372036854775807ll, + /*commit_kind=*/Snapshot::CommitKind::Overwrite(), /*time_millis=*/1711692199281ll, + /*log_offsets=*/std::map(), + /*total_record_count=*/3, /*delta_record_count=*/3, /*changelog_record_count=*/0, + /*watermark=*/std::nullopt, /*statistics=*/std::nullopt, /*properties=*/std::nullopt, + /*next_row_id=*/std::nullopt, /*tag_create_time=*/1769503588, /*tag_time_retained=*/1000); + ASSERT_EQ(expected_tag, tag); + + ASSERT_OK_AND_ASSIGN(std::string new_json_str, tag.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(new_json_str)); +} + +TEST_F(TagTest, TestSerializeAndDeserialize) { + const auto se_and_de = [&](const std::string& data_path) { + auto fs = std::make_shared(); + std::string json_str; + ASSERT_OK(fs->ReadFile(data_path, &json_str)); + ASSERT_OK_AND_ASSIGN(Tag tag, Tag::FromPath(fs, data_path)); + ASSERT_OK_AND_ASSIGN(std::string se_json_str, tag.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(se_json_str)); + }; + auto se_and_de_from_str = [&](const std::string& json_str) { + Tag tag; + ASSERT_OK(RapidJsonUtil::FromJsonString(json_str, &tag)); + ASSERT_OK_AND_ASSIGN(std::string se_json_str, tag.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(se_json_str)); + }; + { + const std::string data_path = + paimon::test::GetDataDir() + + "/orc/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/snapshot/snapshot-1"; + se_and_de(data_path); + } + { + // with tagCreateTime + const std::string json_str = R"({ + "version" : 3, + "id" : 10, + "schemaId" : 2, + "baseManifestList" : "base-manifest-list-1", + "baseManifestListSize" : 100, + "deltaManifestList" : "delta-manifest-list-2", + "deltaManifestListSize" : 200, + "changelogManifestList" : null, + "commitUser" : "commit-usr-3", + "commitIdentifier" : 12, + "commitKind" : "APPEND", + "timeMillis" : 1749724197266, + "logOffsets" : { + "0" : 1, + "1" : 3 + }, + "totalRecordCount" : 1024, + "deltaRecordCount" : 4096, + "watermark" : 1749724196266, + "statistics" : "statistics-4", + "properties" : { + "key0" : "value0", + "key1" : "value1" + }, + "tagCreateTime": 1769503988 + })"; + se_and_de_from_str(json_str); + } + { + // with tagTimeRetained + const std::string json_str = R"({ + "version" : 3, + "id" : 10, + "schemaId" : 2, + "baseManifestList" : "base-manifest-list-1", + "baseManifestListSize" : 100, + "deltaManifestList" : "delta-manifest-list-2", + "deltaManifestListSize" : 200, + "changelogManifestList" : null, + "commitUser" : "commit-usr-3", + "commitIdentifier" : 12, + "commitKind" : "APPEND", + "timeMillis" : 1749724197266, + "logOffsets" : { + "0" : 1, + "1" : 3 + }, + "totalRecordCount" : 1024, + "deltaRecordCount" : 4096, + "watermark" : 1749724196266, + "statistics" : "statistics-4", + "properties" : { + "key0" : "value0", + "key1" : "value1" + }, + "tagTimeRetained" : 900 + })"; + se_and_de_from_str(json_str); + } +} +} // namespace paimon::test diff --git a/src/paimon/core/utils/snapshot_manager.cpp b/src/paimon/core/utils/snapshot_manager.cpp index fc5e2cce..0990d958 100644 --- a/src/paimon/core/utils/snapshot_manager.cpp +++ b/src/paimon/core/utils/snapshot_manager.cpp @@ -42,6 +42,14 @@ SnapshotManager::SnapshotManager(const std::shared_ptr& fs, SnapshotManager::~SnapshotManager() = default; +const std::shared_ptr& SnapshotManager::Fs() const { + return fs_; +} + +const std::string& SnapshotManager::RootPath() const { + return root_path_; +} + const std::string& SnapshotManager::Branch() const { return branch_; } diff --git a/src/paimon/core/utils/snapshot_manager.h b/src/paimon/core/utils/snapshot_manager.h index 3b733082..688f1518 100644 --- a/src/paimon/core/utils/snapshot_manager.h +++ b/src/paimon/core/utils/snapshot_manager.h @@ -45,6 +45,8 @@ class SnapshotManager { const std::string& branch); ~SnapshotManager(); + const std::shared_ptr& Fs() const; + const std::string& RootPath() const; const std::string& Branch() const; Result> LatestSnapshot() const; std::string SnapshotDirectory() const; diff --git a/src/paimon/core/utils/tag_manager.cpp b/src/paimon/core/utils/tag_manager.cpp new file mode 100644 index 00000000..b57ff89a --- /dev/null +++ b/src/paimon/core/utils/tag_manager.cpp @@ -0,0 +1,61 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/utils/tag_manager.h" + +#include + +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/tag/tag.h" +#include "paimon/core/utils/branch_manager.h" + +namespace paimon { + +TagManager::TagManager(const std::shared_ptr& fs, const std::string& root_path) + : TagManager(fs, root_path, BranchManager::DEFAULT_MAIN_BRANCH) {} + +TagManager::TagManager(const std::shared_ptr& fs, const std::string& root_path, + const std::string& branch) + : fs_(fs), root_path_(root_path), branch_(BranchManager::NormalizeBranch(branch)) {} + +Result TagManager::GetOrThrow(const std::string& tag_name) const { + PAIMON_ASSIGN_OR_RAISE(std::optional tag, Get(tag_name)); + if (tag == std::nullopt) { + return Status::NotExist(fmt::format("Tag '{}' doesn't exist.", tag_name)); + } + return tag.value(); +} + +Result> TagManager::Get(const std::string& tag_name) const { + std::string tag_path = TagPath(tag_name); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(tag_path)); + if (!is_exist) { + return std::optional(); + } + PAIMON_ASSIGN_OR_RAISE(Tag tag, Tag::FromPath(fs_, tag_path)); + return std::optional(std::move(tag)); +} + +std::string TagManager::TagPath(const std::string& tag_name) const { + return PathUtil::JoinPath(BranchManager::BranchPath(root_path_, branch_), + "/tag/" + std::string(TAG_PREFIX) + tag_name); +} +} // namespace paimon diff --git a/src/paimon/core/utils/tag_manager.h b/src/paimon/core/utils/tag_manager.h new file mode 100644 index 00000000..0aad3022 --- /dev/null +++ b/src/paimon/core/utils/tag_manager.h @@ -0,0 +1,49 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/core/tag/tag.h" + +namespace paimon { + +class FileSystem; + +/// Manager for `Tag`. +class TagManager { + public: + static constexpr char TAG_PREFIX[] = "tag-"; + + TagManager(const std::shared_ptr& fs, const std::string& root_path); + TagManager(const std::shared_ptr& fs, const std::string& root_path, + const std::string& branch); + + Result GetOrThrow(const std::string& tag_name) const; + + Result> Get(const std::string& tag_name) const; + + std::string TagPath(const std::string& tag_name) const; + + private: + std::shared_ptr fs_; + std::string root_path_; + std::string branch_; +}; +} // namespace paimon diff --git a/src/paimon/core/utils/tag_manager_test.cpp b/src/paimon/core/utils/tag_manager_test.cpp new file mode 100644 index 00000000..069e09b5 --- /dev/null +++ b/src/paimon/core/utils/tag_manager_test.cpp @@ -0,0 +1,38 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/utils/tag_manager.h" + +#include "gtest/gtest.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(TagManagerTest, TestGet) { + ASSERT_OK_AND_ASSIGN(auto tag, + TagManager(std::make_shared(), + paimon::test::GetDataDir() + "/orc/append_09.db/append_09") + .Get("tag")); + ASSERT_EQ(tag, std::nullopt); +} + +TEST(TagManagerTest, TestTagPath) { + ASSERT_EQ(TagManager(nullptr, "/root").TagPath("data"), "/root/tag/tag-data"); + ASSERT_EQ(TagManager(nullptr, "/root", "data").TagPath("data"), + "/root/branch/branch-data/tag/tag-data"); +} +} // namespace paimon::test diff --git a/test/inte/scan_inte_test.cpp b/test/inte/scan_inte_test.cpp index f91cb558..e4def380 100644 --- a/test/inte/scan_inte_test.cpp +++ b/test/inte/scan_inte_test.cpp @@ -928,8 +928,9 @@ TEST_F(ScanInteTest, TestScanAppendWithInvalidOptions) { .WithStreamingMode(true); ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); - ASSERT_NOK_WITH_MSG(table_scan->CreatePlan(), - "scan.snapshot-id must be set when startup mode is FROM_SNAPSHOT"); + ASSERT_NOK_WITH_MSG( + table_scan->CreatePlan(), + "scan.snapshot-id or scan.tag-name must be set when startup mode is FROM_SNAPSHOT"); } { ScanContextBuilder context_builder(table_path); From f390c5e186c445b4aadb1e353f9f2a4ef931f2f8 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Wed, 28 Jan 2026 13:30:39 +0800 Subject: [PATCH 2/2] feat: introduce scan.tag-name option to specify scanning from tag for reading given tag --- src/paimon/core/core_options_test.cpp | 2 ++ src/paimon/core/utils/tag_manager_test.cpp | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 9bb6d2d5..1bc2cd91 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -147,6 +147,7 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::PARTITION_GENERATE_LEGACY_NAME, "false"}, {Options::GLOBAL_INDEX_ENABLED, "false"}, {Options::GLOBAL_INDEX_EXTERNAL_PATH, "FILE:///tmp/global_index/"}, + {Options::SCAN_TAG_NAME, "test-tag"}, }; ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); @@ -217,6 +218,7 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_FALSE(core_options.GlobalIndexEnabled()); ASSERT_TRUE(core_options.GetGlobalIndexExternalPath()); ASSERT_EQ(core_options.GetGlobalIndexExternalPath().value(), "FILE:///tmp/global_index/"); + ASSERT_EQ(core_options.GetScanTagName().value(), "test-tag"); } TEST(CoreOptionsTest, TestInvalidCase) { diff --git a/src/paimon/core/utils/tag_manager_test.cpp b/src/paimon/core/utils/tag_manager_test.cpp index 069e09b5..a1a37b4b 100644 --- a/src/paimon/core/utils/tag_manager_test.cpp +++ b/src/paimon/core/utils/tag_manager_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.