From 4b47b2e878ef5bd0e76aa858442f8cdd82c1779d Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 2 Jan 2026 22:48:12 +0800 Subject: [PATCH] refactor: change DataFile.partition_spec_id to be optional --- src/iceberg/delete_file_index.cc | 40 ++++++++++++------- src/iceberg/manifest/manifest_entry.h | 8 ++-- src/iceberg/manifest/manifest_reader.cc | 9 +++-- .../manifest/manifest_reader_internal.h | 2 +- src/iceberg/test/delete_file_index_test.cc | 4 +- src/iceberg/test/manifest_group_test.cc | 2 +- 6 files changed, 41 insertions(+), 24 deletions(-) diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc index 7c1a12f0a..7c8c35032 100644 --- a/src/iceberg/delete_file_index.cc +++ b/src/iceberg/delete_file_index.cc @@ -168,7 +168,7 @@ Result CanContainEqDeletesForFile(const DataFile& data_file, Status PositionDeletes::Add(ManifestEntry&& entry) { ICEBERG_PRECHECK(entry.sequence_number.has_value(), - "Missing sequence number for position delete: {}", + "Missing sequence number from position delete: {}", entry.data_file->file_path); files_.emplace_back(std::move(entry)); indexed_ = false; @@ -213,7 +213,7 @@ void PositionDeletes::IndexIfNeeded() { Status EqualityDeletes::Add(ManifestEntry&& entry) { ICEBERG_PRECHECK(entry.sequence_number.has_value(), - "Missing sequence number for equality delete: {}", + "Missing sequence number from equality delete: {}", entry.data_file->file_path); files_.emplace_back(&schema_, std::move(entry)); indexed_ = false; @@ -343,7 +343,7 @@ Result>> DeleteFileIndex::ForEntry( const ManifestEntry& entry) const { ICEBERG_PRECHECK(entry.data_file != nullptr, "Manifest entry has null data file"); ICEBERG_PRECHECK(entry.sequence_number.has_value(), - "Missing sequence number for data file: {}", + "Missing sequence number from data file: {}", entry.data_file->file_path); return ForDataFile(entry.sequence_number.value(), *entry.data_file); } @@ -396,8 +396,11 @@ Result>> DeleteFileIndex::FindEqPartitionD return {}; } - auto deletes = - eq_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition); + ICEBERG_PRECHECK(data_file.partition_spec_id.has_value(), + "Missing partition spec id from data file {}", data_file.file_path); + + auto deletes = eq_deletes_by_partition_->get(data_file.partition_spec_id.value(), + data_file.partition); if (!deletes.has_value()) { return {}; } @@ -410,8 +413,11 @@ Result>> DeleteFileIndex::FindPosPartition return {}; } - auto deletes = - pos_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition); + ICEBERG_PRECHECK(data_file.partition_spec_id.has_value(), + "Missing partition spec id from data file {}", data_file.file_path); + + auto deletes = pos_deletes_by_partition_->get(data_file.partition_spec_id.value(), + data_file.partition); if (!deletes.has_value()) { return {}; } @@ -606,7 +612,7 @@ Result> DeleteFileIndex::Builder::LoadDeleteFiles() { for (auto& entry : entries) { ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); ICEBERG_CHECK(entry.sequence_number.has_value(), - "Missing sequence number for delete file: {}", + "Missing sequence number from delete file: {}", entry.data_file->file_path); if (entry.sequence_number.value() > min_sequence_number_) { auto& file = *entry.data_file; @@ -628,8 +634,8 @@ Result> DeleteFileIndex::Builder::LoadDeleteFiles() { Status DeleteFileIndex::Builder::AddDV( std::unordered_map& dv_by_path, ManifestEntry&& entry) { ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); - ICEBERG_PRECHECK(entry.sequence_number.has_value(), "Missing sequence number for DV {}", - entry.data_file->file_path); + ICEBERG_PRECHECK(entry.sequence_number.has_value(), + "Missing sequence number from DV {}", entry.data_file->file_path); const auto& path = entry.data_file->referenced_data_file; ICEBERG_PRECHECK(path.has_value(), "DV must have a referenced data file"); @@ -649,7 +655,7 @@ Status DeleteFileIndex::Builder::AddPositionDelete( ManifestEntry&& entry) { ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); ICEBERG_PRECHECK(entry.sequence_number.has_value(), - "Missing sequence number for position delete {}", + "Missing sequence number from position delete {}", entry.data_file->file_path); ICEBERG_ASSIGN_OR_RAISE(auto referenced_path, @@ -664,7 +670,10 @@ Status DeleteFileIndex::Builder::AddPositionDelete( ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry))); } else { // Partition-scoped position delete - int32_t spec_id = entry.data_file->partition_spec_id; + ICEBERG_PRECHECK(entry.data_file->partition_spec_id.has_value(), + "Missing partition spec id from position delete {}", + entry.data_file->file_path); + int32_t spec_id = entry.data_file->partition_spec_id.value(); const auto& partition = entry.data_file->partition; auto existing = deletes_by_partition.get(spec_id, partition); @@ -686,10 +695,13 @@ Status DeleteFileIndex::Builder::AddEqualityDelete( ManifestEntry&& entry) { ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); ICEBERG_PRECHECK(entry.sequence_number.has_value(), - "Missing sequence number for equality delete {}", + "Missing sequence number from equality delete {}", + entry.data_file->file_path); + ICEBERG_PRECHECK(entry.data_file->partition_spec_id.has_value(), + "Missing partition spec id from equality delete {}", entry.data_file->file_path); - int32_t spec_id = entry.data_file->partition_spec_id; + int32_t spec_id = entry.data_file->partition_spec_id.value(); auto spec_it = specs_by_id_.find(spec_id); if (spec_it == specs_by_id_.end()) { diff --git a/src/iceberg/manifest/manifest_entry.h b/src/iceberg/manifest/manifest_entry.h index 1e4af0445..843bf0aef 100644 --- a/src/iceberg/manifest/manifest_entry.h +++ b/src/iceberg/manifest/manifest_entry.h @@ -147,9 +147,6 @@ struct ICEBERG_EXPORT DataFile { /// order, and should set sort order id to null. Readers must ignore sort order id for /// position delete files. std::optional sort_order_id; - /// This field is not included in spec, so it is not serialized into the manifest file. - /// It is just store in memory representation used in process. - int32_t partition_spec_id = PartitionSpec::kInitialSpecId; /// Field id: 142 /// The _row_id for the first row in the data file. /// @@ -178,6 +175,11 @@ struct ICEBERG_EXPORT DataFile { /// present std::optional content_size_in_bytes; + /// \brief Partition spec id for this data file. + /// \note This field is for internal use only and will not be persisted to manifest + /// entry. + std::optional partition_spec_id; + static constexpr int32_t kContentFieldId = 134; inline static const SchemaField kContent = SchemaField::MakeOptional( kContentFieldId, "content", int32(), diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 834a30bb2..b03c83421 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -814,11 +814,13 @@ Result ManifestReaderImpl::GetMetricsEvaluator() { return metrics_evaluator_.get(); } -bool ManifestReaderImpl::InPartitionSet(const DataFile& file) const { +Result ManifestReaderImpl::InPartitionSet(const DataFile& file) const { if (!partition_set_) { return true; } - return partition_set_->contains(file.partition_spec_id, file.partition); + ICEBERG_PRECHECK(file.partition_spec_id.has_value(), + "Missing partition spec id from data file {}", file.file_path); + return partition_set_->contains(file.partition_spec_id.value(), file.partition); } Status ManifestReaderImpl::OpenReader(std::shared_ptr projection) { @@ -943,7 +945,8 @@ Result> ManifestReaderImpl::ReadEntries(bool only_liv continue; } } - if (!InPartitionSet(*entry.data_file)) { + ICEBERG_ASSIGN_OR_RAISE(bool in_partition_set, InPartitionSet(*entry.data_file)); + if (!in_partition_set) { continue; } } diff --git a/src/iceberg/manifest/manifest_reader_internal.h b/src/iceberg/manifest/manifest_reader_internal.h index 42263808d..2b4b1e0ba 100644 --- a/src/iceberg/manifest/manifest_reader_internal.h +++ b/src/iceberg/manifest/manifest_reader_internal.h @@ -96,7 +96,7 @@ class ManifestReaderImpl : public ManifestReader { Result GetMetricsEvaluator(); /// \brief Check if a partition is in the partition set. - bool InPartitionSet(const DataFile& file) const; + Result InPartitionSet(const DataFile& file) const; // Fields set at construction const std::string manifest_path_; diff --git a/src/iceberg/test/delete_file_index_test.cc b/src/iceberg/test/delete_file_index_test.cc index 8c87772ed..cf75fe2dc 100644 --- a/src/iceberg/test/delete_file_index_test.cc +++ b/src/iceberg/test/delete_file_index_test.cc @@ -108,8 +108,8 @@ class DeleteFileIndexTest : public testing::TestWithParam { .partition = partition, .record_count = 1, .file_size_in_bytes = 10, - .partition_spec_id = spec_id, .referenced_data_file = referenced_file, + .partition_spec_id = spec_id, }); } @@ -141,10 +141,10 @@ class DeleteFileIndexTest : public testing::TestWithParam { .partition = partition, .record_count = 1, .file_size_in_bytes = 10, - .partition_spec_id = spec_id, .referenced_data_file = referenced_file, .content_offset = content_offset, .content_size_in_bytes = content_size, + .partition_spec_id = spec_id, }); } diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc index 32466c228..c7655342b 100644 --- a/src/iceberg/test/manifest_group_test.cc +++ b/src/iceberg/test/manifest_group_test.cc @@ -98,8 +98,8 @@ class ManifestGroupTest : public testing::TestWithParam { .partition = partition, .record_count = 1, .file_size_in_bytes = 10, - .partition_spec_id = spec_id, .referenced_data_file = referenced_file, + .partition_spec_id = spec_id, }); }