Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ struct PAIMON_EXPORT Options {
/// "global-index.external-path" - Global index root directory, if not set, the global index
/// files will be stored under the index directory.
static const char GLOBAL_INDEX_EXTERNAL_PATH[];
/// "scan.tag-name" - Optional tag name used in case of "from-snapshot" scan mode.
static const char SCAN_TAG_NAME[];
};

static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
6 changes: 5 additions & 1 deletion src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ set(PAIMON_CORE_SRCS
core/table/source/table_read.cpp
core/table/source/table_scan.cpp
core/table/source/data_evolution_batch_scan.cpp
core/tag/tag.cpp
core/utils/field_mapping.cpp
core/utils/fields_comparator.cpp
core/utils/file_store_path_factory.cpp
Expand All @@ -273,7 +274,8 @@ set(PAIMON_CORE_SRCS
core/utils/partition_path_utils.cpp
core/utils/primary_key_table_utils.cpp
core/utils/snapshot_manager.cpp
core/utils/special_field_ids.cpp)
core/utils/special_field_ids.cpp
core/utils/tag_manager.cpp)

add_paimon_lib(paimon
SOURCES
Expand Down Expand Up @@ -563,6 +565,7 @@ if(PAIMON_BUILD_TESTS)
core/table/source/split_generator_test.cpp
core/table/source/startup_mode_test.cpp
core/table/source/table_scan_test.cpp
core/tag/tag_test.cpp
core/utils/branch_manager_test.cpp
core/utils/field_mapping_test.cpp
core/utils/fields_comparator_test.cpp
Expand All @@ -572,6 +575,7 @@ if(PAIMON_BUILD_TESTS)
core/utils/offset_row_test.cpp
core/utils/partition_path_utils_test.cpp
core/utils/snapshot_manager_test.cpp
core/utils/tag_manager_test.cpp
core/utils/primary_key_table_utils_test.cpp
core/utils/index_file_path_factories_test.cpp
STATIC_LINK_LIBS
Expand Down
1 change: 1 addition & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,5 @@ const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
const char Options::SCAN_TAG_NAME[] = "scan.tag-name";
} // namespace paimon
12 changes: 12 additions & 0 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ struct CoreOptions::Impl {
bool legacy_partition_name_enabled = true;
bool global_index_enabled = true;
std::optional<std::string> global_index_external_path;

std::optional<std::string> scan_tag_name;
};

// Parse configurations from a map and return a populated CoreOptions object
Expand Down Expand Up @@ -479,6 +481,12 @@ Result<CoreOptions> CoreOptions::FromMap(
if (!global_index_external_path.empty()) {
impl->global_index_external_path = global_index_external_path;
}
// Parse scan.tag-name
std::string scan_tag_name;
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::SCAN_TAG_NAME, &scan_tag_name));
if (!scan_tag_name.empty()) {
impl->scan_tag_name = scan_tag_name;
}

return options;
}
Expand Down Expand Up @@ -775,4 +783,8 @@ Result<std::optional<std::string>> CoreOptions::CreateGlobalIndexExternalPath()
return std::optional<std::string>(path.ToString());
}

std::optional<std::string> CoreOptions::GetScanTagName() const {
return impl_->scan_tag_name;
}

} // namespace paimon
2 changes: 2 additions & 0 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class PAIMON_EXPORT CoreOptions {
bool GlobalIndexEnabled() const;
Result<std::optional<std::string>> CreateGlobalIndexExternalPath() const;

std::optional<std::string> GetScanTagName() const;

const std::map<std::string, std::string>& ToMap() const;

private:
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_TRUE(core_options.LegacyPartitionNameEnabled());
ASSERT_TRUE(core_options.GlobalIndexEnabled());
ASSERT_FALSE(core_options.GetGlobalIndexExternalPath());
ASSERT_EQ(std::nullopt, core_options.GetScanTagName());
}

TEST(CoreOptionsTest, TestFromMap) {
Expand Down Expand Up @@ -146,6 +147,7 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::PARTITION_GENERATE_LEGACY_NAME, "false"},
{Options::GLOBAL_INDEX_ENABLED, "false"},
{Options::GLOBAL_INDEX_EXTERNAL_PATH, "FILE:///tmp/global_index/"},
{Options::SCAN_TAG_NAME, "test-tag"},
};

ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
Expand Down Expand Up @@ -216,6 +218,7 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_FALSE(core_options.GlobalIndexEnabled());
ASSERT_TRUE(core_options.GetGlobalIndexExternalPath());
ASSERT_EQ(core_options.GetGlobalIndexExternalPath().value(), "FILE:///tmp/global_index/");
ASSERT_EQ(core_options.GetScanTagName().value(), "test-tag");
}

TEST(CoreOptionsTest, TestInvalidCase) {
Expand Down
11 changes: 10 additions & 1 deletion src/paimon/core/table/source/abstract_table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "paimon/core/table/source/snapshot/full_starting_scanner.h"
#include "paimon/core/table/source/snapshot/snapshot_reader.h"
#include "paimon/core/table/source/snapshot/static_from_snapshot_starting_scanner.h"
#include "paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h"
#include "paimon/table/source/startup_mode.h"
#include "paimon/table/source/table_scan.h"
namespace paimon {
Expand All @@ -51,16 +52,24 @@ class AbstractTableScan : public TableScan {
return std::shared_ptr<StartingScanner>(new FullStartingScanner(snapshot_manager));
}
} else if (startup_mode == StartupMode::FromSnapshot()) {
const std::optional<std::string> scan_tag_name = core_options_.GetScanTagName();
if (specified_snapshot_id != std::nullopt) {
return is_streaming
? std::shared_ptr<StartingScanner>(
new ContinuousFromSnapshotStartingScanner(
snapshot_manager, specified_snapshot_id.value()))
: std::shared_ptr<StartingScanner>(new StaticFromSnapshotStartingScanner(
snapshot_manager, specified_snapshot_id.value()));
} else if (scan_tag_name != std::nullopt) {
if (is_streaming) {
return Status::Invalid("Cannot scan from tag in streaming mode");
}
return std::make_shared<StaticFromTagStartingScanner>(snapshot_manager,
scan_tag_name.value());
} else {
return Status::Invalid(
"scan.snapshot-id must be set when startup mode is FROM_SNAPSHOT");
"scan.snapshot-id or scan.tag-name must be set when startup mode is "
"FROM_SNAPSHOT");
}
} else if (startup_mode == StartupMode::FromSnapshotFull()) {
if (specified_snapshot_id != std::nullopt) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>

#include "paimon/core/table/source/snapshot/starting_scanner.h"
#include "paimon/core/utils/tag_manager.h"

namespace paimon {
/// `StartingScanner` for the `CoreOptions::GetScanTagName()` of a batch read.
class StaticFromTagStartingScanner : public StartingScanner {
public:
StaticFromTagStartingScanner(const std::shared_ptr<SnapshotManager>& snapshot_manager,
const std::string& tag_name)
: StartingScanner(snapshot_manager) {
tag_name_ = tag_name;
}

Result<std::shared_ptr<ScanResult>> Scan(
const std::shared_ptr<SnapshotReader>& snapshot_reader) override {
const TagManager tag_manager(snapshot_manager_->Fs(), snapshot_manager_->RootPath(),
snapshot_manager_->Branch());
PAIMON_ASSIGN_OR_RAISE(const Tag tag, tag_manager.GetOrThrow(tag_name_));
PAIMON_ASSIGN_OR_RAISE(const Snapshot snapshot, tag.TrimToSnapshot());
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<Plan> plan,
snapshot_reader->WithMode(ScanMode::ALL)->WithSnapshot(snapshot)->Read());
return std::make_shared<StartingScanner::CurrentSnapshot>(plan);
}

private:
std::string tag_name_;
};
} // namespace paimon
115 changes: 115 additions & 0 deletions src/paimon/core/tag/tag.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/core/tag/tag.h"

#include <cassert>
#include <stdexcept>
#include <utility>

#include "paimon/common/utils/rapidjson_util.h"
#include "paimon/fs/file_system.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "rapidjson/allocators.h"
#include "rapidjson/document.h"
#include "rapidjson/rapidjson.h"

namespace paimon {

Tag::Tag(const std::optional<int32_t>& version, int64_t id, int64_t schema_id,
const std::string& base_manifest_list,
const std::optional<int64_t>& base_manifest_list_size,
const std::string& delta_manifest_list,
const std::optional<int64_t>& delta_manifest_list_size,
const std::optional<std::string>& changelog_manifest_list,
const std::optional<int64_t>& changelog_manifest_list_size,
const std::optional<std::string>& index_manifest, const std::string& commit_user,
int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis,
const std::optional<std::map<int32_t, int64_t>>& log_offsets,
const std::optional<int64_t>& total_record_count,
const std::optional<int64_t>& delta_record_count,
const std::optional<int64_t>& changelog_record_count,
const std::optional<int64_t>& watermark, const std::optional<std::string>& statistics,
const std::optional<std::map<std::string, std::string>>& properties,
const std::optional<int64_t>& next_row_id, const std::optional<int64_t>& tag_create_time,
const std::optional<int64_t>& tag_time_retained)
: Snapshot(version, id, schema_id, base_manifest_list, base_manifest_list_size,
delta_manifest_list, delta_manifest_list_size, changelog_manifest_list,
changelog_manifest_list_size, index_manifest, commit_user, commit_identifier,
commit_kind, time_millis, log_offsets, total_record_count, delta_record_count,
changelog_record_count, watermark, statistics, properties, next_row_id),
tag_create_time_(tag_create_time),
tag_time_retained_(tag_time_retained) {}

bool Tag::operator==(const Tag& other) const {
if (this == &other) {
return true;
}
return Snapshot::operator==(other) && tag_create_time_ == other.tag_create_time_ &&
tag_time_retained_ == other.tag_time_retained_;
}

bool Tag::TEST_Equal(const Tag& other) const {
if (this == &other) {
return true;
}

return Snapshot::TEST_Equal(other) && tag_create_time_ == other.tag_create_time_ &&
tag_time_retained_ == other.tag_time_retained_;
}

Result<Snapshot> Tag::TrimToSnapshot() const {
return Snapshot(Version(), Id(), SchemaId(), BaseManifestList(), BaseManifestListSize(),
DeltaManifestList(), DeltaManifestListSize(), ChangelogManifestList(),
ChangelogManifestListSize(), IndexManifest(), CommitUser(), CommitIdentifier(),
GetCommitKind(), TimeMillis(), LogOffsets(), TotalRecordCount(),
DeltaRecordCount(), ChangelogRecordCount(), Watermark(), Statistics(),
Properties(), NextRowId());
}

rapidjson::Value Tag::ToJson(rapidjson::Document::AllocatorType* allocator) const noexcept(false) {
rapidjson::Value obj(rapidjson::kObjectType);
obj = Snapshot::ToJson(allocator);
if (tag_create_time_ != std::nullopt) {
obj.AddMember(rapidjson::StringRef(FIELD_TAG_CREATE_TIME),
RapidJsonUtil::SerializeValue(tag_create_time_.value(), allocator).Move(),
*allocator);
}
if (tag_time_retained_ != std::nullopt) {
obj.AddMember(rapidjson::StringRef(FIELD_TAG_TIME_RETAINED),
RapidJsonUtil::SerializeValue(tag_time_retained_.value(), allocator).Move(),
*allocator);
}
return obj;
}

void Tag::FromJson(const rapidjson::Value& obj) noexcept(false) {
Snapshot::FromJson(obj);
tag_create_time_ =
RapidJsonUtil::DeserializeKeyValue<std::optional<int64_t>>(obj, FIELD_TAG_CREATE_TIME);
tag_time_retained_ =
RapidJsonUtil::DeserializeKeyValue<std::optional<int64_t>>(obj, FIELD_TAG_TIME_RETAINED);
}

Result<Tag> Tag::FromPath(const std::shared_ptr<FileSystem>& fs, const std::string& path) {
std::string json_str;
PAIMON_RETURN_NOT_OK(fs->ReadFile(path, &json_str));
Tag tag;
PAIMON_RETURN_NOT_OK(RapidJsonUtil::FromJsonString(json_str, &tag));
return tag;
}
} // namespace paimon
86 changes: 86 additions & 0 deletions src/paimon/core/tag/tag.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <string>

#include "paimon/common/utils/jsonizable.h"
#include "paimon/core/snapshot.h"
#include "paimon/result.h"
#include "rapidjson/allocators.h"
#include "rapidjson/document.h"
#include "rapidjson/rapidjson.h"

namespace paimon {
class FileSystem;

/// Snapshot with tagCreateTime and tagTimeRetained.
class Tag : public Snapshot {
public:
static constexpr char FIELD_TAG_CREATE_TIME[] = "tagCreateTime";
static constexpr char FIELD_TAG_TIME_RETAINED[] = "tagTimeRetained";

JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(Tag);

Tag(const std::optional<int32_t>& version, int64_t id, int64_t schema_id,
const std::string& base_manifest_list,
const std::optional<int64_t>& base_manifest_list_size,
const std::string& delta_manifest_list,
const std::optional<int64_t>& delta_manifest_list_size,
const std::optional<std::string>& changelog_manifest_list,
const std::optional<int64_t>& changelog_manifest_list_size,
const std::optional<std::string>& index_manifest, const std::string& commit_user,
int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis,
const std::optional<std::map<int32_t, int64_t>>& log_offsets,
const std::optional<int64_t>& total_record_count,
const std::optional<int64_t>& delta_record_count,
const std::optional<int64_t>& changelog_record_count,
const std::optional<int64_t>& watermark, const std::optional<std::string>& statistics,
const std::optional<std::map<std::string, std::string>>& properties,
const std::optional<int64_t>& next_row_id, const std::optional<int64_t>& tag_create_time,
const std::optional<int64_t>& tag_time_retained);

bool operator==(const Tag& other) const;
bool TEST_Equal(const Tag& other) const;

std::optional<int64_t> TagCreateTime() const {
return tag_create_time_;
}

std::optional<int64_t> TagTimeRetained() const {
return tag_time_retained_;
}

Result<Snapshot> TrimToSnapshot() const;

rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
noexcept(false) override;

void FromJson(const rapidjson::Value& obj) noexcept(false) override;

static Result<Tag> FromPath(const std::shared_ptr<FileSystem>& fs, const std::string& path);

private:
std::optional<int64_t> tag_create_time_;
std::optional<int64_t> tag_time_retained_;
};
} // namespace paimon
Loading
Loading