From 2825a71c5ad1d7b66914820ac7091b55ba744927 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 12 Mar 2025 22:41:30 -0400 Subject: [PATCH 1/2] refactor: Split transaction module --- crates/iceberg/src/transaction.rs | 1046 ------------------ crates/iceberg/src/transaction/append.rs | 373 +++++++ crates/iceberg/src/transaction/mod.rs | 326 ++++++ crates/iceberg/src/transaction/snapshot.rs | 309 ++++++ crates/iceberg/src/transaction/sort_order.rs | 132 +++ 5 files changed, 1140 insertions(+), 1046 deletions(-) delete mode 100644 crates/iceberg/src/transaction.rs create mode 100644 crates/iceberg/src/transaction/append.rs create mode 100644 crates/iceberg/src/transaction/mod.rs create mode 100644 crates/iceberg/src/transaction/snapshot.rs create mode 100644 crates/iceberg/src/transaction/sort_order.rs diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs deleted file mode 100644 index 15d5c99a1a..0000000000 --- a/crates/iceberg/src/transaction.rs +++ /dev/null @@ -1,1046 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This module contains transaction api. - -use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; -use std::future::Future; -use std::mem::discriminant; -use std::ops::RangeFrom; - -use arrow_array::StringArray; -use futures::TryStreamExt; -use uuid::Uuid; - -use crate::error::Result; -use crate::io::OutputFile; -use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter, - ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention, - SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH, -}; -use crate::table::Table; -use crate::writer::file_writer::ParquetWriter; -use crate::TableUpdate::UpgradeFormatVersion; -use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; - -const META_ROOT_PATH: &str = "metadata"; - -/// Table transaction. -pub struct Transaction<'a> { - table: &'a Table, - updates: Vec, - requirements: Vec, -} - -impl<'a> Transaction<'a> { - /// Creates a new transaction. - pub fn new(table: &'a Table) -> Self { - Self { - table, - updates: vec![], - requirements: vec![], - } - } - - fn append_updates(&mut self, updates: Vec) -> Result<()> { - for update in &updates { - for up in &self.updates { - if discriminant(up) == discriminant(update) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot apply update with same type at same time: {:?}", - update - ), - )); - } - } - } - self.updates.extend(updates); - Ok(()) - } - - fn append_requirements(&mut self, requirements: Vec) -> Result<()> { - self.requirements.extend(requirements); - Ok(()) - } - - /// Sets table to a new version. 
- pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.table.metadata().format_version(); - match current_version.cmp(&format_version) { - Ordering::Greater => { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot downgrade table version from {} to {}", - current_version, format_version - ), - )); - } - Ordering::Less => { - self.append_updates(vec![UpgradeFormatVersion { format_version }])?; - } - Ordering::Equal => { - // Do nothing. - } - } - Ok(self) - } - - /// Update table's property. - pub fn set_properties(mut self, props: HashMap) -> Result { - self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?; - Ok(self) - } - - fn generate_unique_snapshot_id(&self) -> i64 { - let generate_random_id = || -> i64 { - let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); - let snapshot_id = (lhs ^ rhs) as i64; - if snapshot_id < 0 { - -snapshot_id - } else { - snapshot_id - } - }; - let mut snapshot_id = generate_random_id(); - while self - .table - .metadata() - .snapshots() - .any(|s| s.snapshot_id() == snapshot_id) - { - snapshot_id = generate_random_id(); - } - snapshot_id - } - - /// Creates a fast append action. - pub fn fast_append( - self, - commit_uuid: Option, - key_metadata: Vec, - ) -> Result> { - let snapshot_id = self.generate_unique_snapshot_id(); - FastAppendAction::new( - self, - snapshot_id, - commit_uuid.unwrap_or_else(Uuid::now_v7), - key_metadata, - HashMap::new(), - ) - } - - /// Creates replace sort order action. - pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { - ReplaceSortOrderAction { - tx: self, - sort_fields: vec![], - } - } - - /// Remove properties in table. - pub fn remove_properties(mut self, keys: Vec) -> Result { - self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?; - Ok(self) - } - - /// Commit transaction. - pub async fn commit(self, catalog: &dyn Catalog) -> Result { - let table_commit = TableCommit::builder() - .ident(self.table.identifier().clone()) - .updates(self.updates) - .requirements(self.requirements) - .build(); - - catalog.update_table(table_commit).await - } -} - -/// FastAppendAction is a transaction action for fast append data files to the table. -pub struct FastAppendAction<'a> { - snapshot_produce_action: SnapshotProduceAction<'a>, - check_duplicate: bool, -} - -impl<'a> FastAppendAction<'a> { - #[allow(clippy::too_many_arguments)] - pub(crate) fn new( - tx: Transaction<'a>, - snapshot_id: i64, - commit_uuid: Uuid, - key_metadata: Vec, - snapshot_properties: HashMap, - ) -> Result { - Ok(Self { - snapshot_produce_action: SnapshotProduceAction::new( - tx, - snapshot_id, - key_metadata, - commit_uuid, - snapshot_properties, - )?, - check_duplicate: true, - }) - } - - /// Set whether to check duplicate files - pub fn with_check_duplicate(mut self, v: bool) -> Self { - self.check_duplicate = v; - self - } - - /// Add data files to the snapshot. 
- pub fn add_data_files( - &mut self, - data_files: impl IntoIterator, - ) -> Result<&mut Self> { - self.snapshot_produce_action.add_data_files(data_files)?; - Ok(self) - } - - /// Adds existing parquet files - #[allow(dead_code)] - async fn add_parquet_files(mut self, file_path: Vec) -> Result> { - if !self - .snapshot_produce_action - .tx - .table - .metadata() - .default_spec - .is_unpartitioned() - { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Appending to partitioned tables is not supported", - )); - } - - let table_metadata = self.snapshot_produce_action.tx.table.metadata(); - - let data_files = ParquetWriter::parquet_files_to_data_files( - self.snapshot_produce_action.tx.table.file_io(), - file_path, - table_metadata, - ) - .await?; - - self.add_data_files(data_files)?; - - self.apply().await - } - - /// Finished building the action and apply it to the transaction. - pub async fn apply(self) -> Result> { - // Checks duplicate files - if self.check_duplicate { - let new_files: HashSet<&str> = self - .snapshot_produce_action - .added_data_files - .iter() - .map(|df| df.file_path.as_str()) - .collect(); - - let mut manifest_stream = self - .snapshot_produce_action - .tx - .table - .inspect() - .manifests() - .scan() - .await?; - let mut referenced_files = Vec::new(); - - while let Some(batch) = manifest_stream.try_next().await? { - let file_path_array = batch - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "Failed to downcast file_path column to StringArray", - ) - })?; - - for i in 0..batch.num_rows() { - let file_path = file_path_array.value(i); - if new_files.contains(file_path) { - referenced_files.push(file_path.to_string()); - } - } - } - - if !referenced_files.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot add files that are already referenced by table, files: {}", - referenced_files.join(", ") - ), - )); - } - } - - self.snapshot_produce_action - .apply(FastAppendOperation, DefaultManifestProcess) - .await - } -} - -struct FastAppendOperation; - -impl SnapshotProduceOperation for FastAppendOperation { - fn operation(&self) -> Operation { - Operation::Append - } - - async fn delete_entries( - &self, - _snapshot_produce: &SnapshotProduceAction<'_>, - ) -> Result> { - Ok(vec![]) - } - - async fn existing_manifest( - &self, - snapshot_produce: &SnapshotProduceAction<'_>, - ) -> Result> { - let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { - return Ok(vec![]); - }; - - let manifest_list = snapshot - .load_manifest_list( - snapshot_produce.tx.table.file_io(), - &snapshot_produce.tx.table.metadata_ref(), - ) - .await?; - - Ok(manifest_list - .entries() - .iter() - .filter(|entry| entry.has_added_files() || entry.has_existing_files()) - .cloned() - .collect()) - } -} - -trait SnapshotProduceOperation: Send + Sync { - fn operation(&self) -> Operation; - #[allow(unused)] - fn delete_entries( - &self, - snapshot_produce: &SnapshotProduceAction, - ) -> impl Future>> + Send; - fn existing_manifest( - &self, - snapshot_produce: &SnapshotProduceAction, - ) -> impl Future>> + Send; -} - -struct DefaultManifestProcess; - -impl ManifestProcess for DefaultManifestProcess { - fn process_manifeset(&self, manifests: Vec) -> Vec { - manifests - } -} - -trait ManifestProcess: Send + Sync { - fn process_manifeset(&self, manifests: Vec) -> Vec; -} - -struct SnapshotProduceAction<'a> { - tx: Transaction<'a>, - snapshot_id: i64, - key_metadata: Vec, - 
commit_uuid: Uuid, - snapshot_properties: HashMap, - added_data_files: Vec, - // A counter used to generate unique manifest file names. - // It starts from 0 and increments for each new manifest file. - // Note: This counter is limited to the range of (0..u64::MAX). - manifest_counter: RangeFrom, -} - -impl<'a> SnapshotProduceAction<'a> { - pub(crate) fn new( - tx: Transaction<'a>, - snapshot_id: i64, - key_metadata: Vec, - commit_uuid: Uuid, - snapshot_properties: HashMap, - ) -> Result { - Ok(Self { - tx, - snapshot_id, - commit_uuid, - snapshot_properties, - added_data_files: vec![], - manifest_counter: (0..), - key_metadata, - }) - } - - // Check if the partition value is compatible with the partition type. - fn validate_partition_value( - partition_value: &Struct, - partition_type: &StructType, - ) -> Result<()> { - if partition_value.fields().len() != partition_type.fields().len() { - return Err(Error::new( - ErrorKind::DataInvalid, - "Partition value is not compatible with partition type", - )); - } - - for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) { - if !field - .field_type - .as_primitive_type() - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "Partition field should only be primitive type.", - ) - })? - .compatible(&value.as_primitive_literal().unwrap()) - { - return Err(Error::new( - ErrorKind::DataInvalid, - "Partition value is not compatible partition type", - )); - } - } - Ok(()) - } - - /// Add data files to the snapshot. - pub fn add_data_files( - &mut self, - data_files: impl IntoIterator, - ) -> Result<&mut Self> { - let data_files: Vec = data_files.into_iter().collect(); - for data_file in &data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { - return Err(Error::new( - ErrorKind::DataInvalid, - "Only data content type is allowed for fast append", - )); - } - Self::validate_partition_value( - data_file.partition(), - self.tx.table.metadata().default_partition_type(), - )?; - } - self.added_data_files.extend(data_files); - Ok(self) - } - - fn new_manifest_output(&mut self) -> Result { - let new_manifest_path = format!( - "{}/{}/{}-m{}.{}", - self.tx.table.metadata().location(), - META_ROOT_PATH, - self.commit_uuid, - self.manifest_counter.next().unwrap(), - DataFileFormat::Avro - ); - self.tx.table.file_io().new_output(new_manifest_path) - } - - // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); - let snapshot_id = self.snapshot_id; - let manifest_entries = added_data_files.into_iter().map(|data_file| { - let builder = ManifestEntry::builder() - .status(crate::spec::ManifestStatus::Added) - .data_file(data_file); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { - builder.snapshot_id(snapshot_id).build() - } else { - // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when - // commit failed. 
- builder.build() - } - }); - let mut writer = { - let builder = ManifestWriterBuilder::new( - self.new_manifest_output()?, - Some(self.snapshot_id), - self.key_metadata.clone(), - self.tx.table.metadata().current_schema().clone(), - self.tx - .table - .metadata() - .default_partition_spec() - .as_ref() - .clone(), - ); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { - builder.build_v1() - } else { - builder.build_v2_data() - } - }; - for entry in manifest_entries { - writer.add_entry(entry)?; - } - writer.write_manifest_file().await - } - - async fn manifest_file( - &mut self, - snapshot_produce_operation: &OP, - manifest_process: &MP, - ) -> Result> { - let added_manifest = self.write_added_manifest().await?; - let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; - // # TODO - // Support process delete entries. - - let mut manifest_files = vec![added_manifest]; - manifest_files.extend(existing_manifests); - let manifest_files = manifest_process.process_manifeset(manifest_files); - Ok(manifest_files) - } - - // # TODO - // Fulfill this function - fn summary(&self, snapshot_produce_operation: &OP) -> Summary { - Summary { - operation: snapshot_produce_operation.operation(), - additional_properties: self.snapshot_properties.clone(), - } - } - - fn generate_manifest_list_file_path(&self, attempt: i64) -> String { - format!( - "{}/{}/snap-{}-{}-{}.{}", - self.tx.table.metadata().location(), - META_ROOT_PATH, - self.snapshot_id, - attempt, - self.commit_uuid, - DataFileFormat::Avro - ) - } - - /// Finished building the action and apply it to the transaction. - pub async fn apply( - mut self, - snapshot_produce_operation: OP, - process: MP, - ) -> Result> { - let new_manifests = self - .manifest_file(&snapshot_produce_operation, &process) - .await?; - let next_seq_num = self.tx.table.metadata().next_sequence_number(); - - let summary = self.summary(&snapshot_produce_operation); - - let manifest_list_path = self.generate_manifest_list_file_path(0); - - let mut manifest_list_writer = match self.tx.table.metadata().format_version() { - FormatVersion::V1 => ManifestListWriter::v1( - self.tx - .table - .file_io() - .new_output(manifest_list_path.clone())?, - self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), - ), - FormatVersion::V2 => ManifestListWriter::v2( - self.tx - .table - .file_io() - .new_output(manifest_list_path.clone())?, - self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), - next_seq_num, - ), - }; - manifest_list_writer.add_manifests(new_manifests.into_iter())?; - manifest_list_writer.close().await?; - - let commit_ts = chrono::Utc::now().timestamp_millis(); - let new_snapshot = Snapshot::builder() - .with_manifest_list(manifest_list_path) - .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id()) - .with_sequence_number(next_seq_num) - .with_summary(summary) - .with_schema_id(self.tx.table.metadata().current_schema_id()) - .with_timestamp_ms(commit_ts) - .build(); - - self.tx.append_updates(vec![ - TableUpdate::AddSnapshot { - snapshot: new_snapshot, - }, - TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRANCH.to_string(), - reference: SnapshotReference::new( - self.snapshot_id, - SnapshotRetention::branch(None, None, None), - ), - }, - ])?; - self.tx.append_requirements(vec![ - TableRequirement::UuidMatch { - uuid: self.tx.table.metadata().uuid(), - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: 
self.tx.table.metadata().current_snapshot_id(), - }, - ])?; - Ok(self.tx) - } -} - -/// Transaction action for replacing sort order. -pub struct ReplaceSortOrderAction<'a> { - tx: Transaction<'a>, - sort_fields: Vec, -} - -impl<'a> ReplaceSortOrderAction<'a> { - /// Adds a field for sorting in ascending order. - pub fn asc(self, name: &str, null_order: NullOrder) -> Result { - self.add_sort_field(name, SortDirection::Ascending, null_order) - } - - /// Adds a field for sorting in descending order. - pub fn desc(self, name: &str, null_order: NullOrder) -> Result { - self.add_sort_field(name, SortDirection::Descending, null_order) - } - - /// Finished building the action and apply it to the transaction. - pub fn apply(mut self) -> Result> { - let unbound_sort_order = SortOrder::builder() - .with_fields(self.sort_fields) - .build_unbound()?; - - let updates = vec![ - TableUpdate::AddSortOrder { - sort_order: unbound_sort_order, - }, - TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }, - ]; - - let requirements = vec![ - TableRequirement::CurrentSchemaIdMatch { - current_schema_id: self.tx.table.metadata().current_schema().schema_id(), - }, - TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id, - }, - ]; - - self.tx.append_requirements(requirements)?; - self.tx.append_updates(updates)?; - Ok(self.tx) - } - - fn add_sort_field( - mut self, - name: &str, - sort_direction: SortDirection, - null_order: NullOrder, - ) -> Result { - let field_id = self - .tx - .table - .metadata() - .current_schema() - .field_id_by_name(name) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Cannot find field {} in table schema", name), - ) - })?; - - let sort_field = SortField::builder() - .source_id(field_id) - .transform(Transform::Identity) - .direction(sort_direction) - .null_order(null_order) - .build(); - - self.sort_fields.push(sort_field); - Ok(self) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::fs::File; - use std::io::BufReader; - - use crate::io::FileIOBuilder; - use crate::scan::tests::TableTestFixture; - use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct, - TableMetadata, - }; - use crate::table::Table; - use crate::transaction::{Transaction, MAIN_BRANCH}; - use crate::{TableIdent, TableRequirement, TableUpdate}; - - fn make_v1_table() -> Table { - let file = File::open(format!( - "{}/testdata/table_metadata/{}", - env!("CARGO_MANIFEST_DIR"), - "TableMetadataV1Valid.json" - )) - .unwrap(); - let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); - - Table::builder() - .metadata(resp) - .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) - .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIOBuilder::new("memory").build().unwrap()) - .build() - .unwrap() - } - - fn make_v2_table() -> Table { - let file = File::open(format!( - "{}/testdata/table_metadata/{}", - env!("CARGO_MANIFEST_DIR"), - "TableMetadataV2Valid.json" - )) - .unwrap(); - let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); - - Table::builder() - .metadata(resp) - .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) - .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIOBuilder::new("memory").build().unwrap()) - .build() - .unwrap() - } - - fn 
make_v2_minimal_table() -> Table { - let file = File::open(format!( - "{}/testdata/table_metadata/{}", - env!("CARGO_MANIFEST_DIR"), - "TableMetadataV2ValidMinimal.json" - )) - .unwrap(); - let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); - - Table::builder() - .metadata(resp) - .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) - .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIOBuilder::new("memory").build().unwrap()) - .build() - .unwrap() - } - - #[test] - fn test_upgrade_table_version_v1_to_v2() { - let table = make_v1_table(); - let tx = Transaction::new(&table); - let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); - - assert_eq!( - vec![TableUpdate::UpgradeFormatVersion { - format_version: FormatVersion::V2 - }], - tx.updates - ); - } - - #[test] - fn test_upgrade_table_version_v2_to_v2() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); - - assert!( - tx.updates.is_empty(), - "Upgrade table to same version should not generate any updates" - ); - assert!( - tx.requirements.is_empty(), - "Upgrade table to same version should not generate any requirements" - ); - } - - #[test] - fn test_downgrade_table_version() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx.upgrade_table_version(FormatVersion::V1); - - assert!(tx.is_err(), "Downgrade table version should fail!"); - } - - #[test] - fn test_set_table_property() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .set_properties(HashMap::from([("a".to_string(), "b".to_string())])) - .unwrap(); - - assert_eq!( - vec![TableUpdate::SetProperties { - updates: HashMap::from([("a".to_string(), "b".to_string())]) - }], - tx.updates - ); - } - - #[test] - fn test_remove_property() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .remove_properties(vec!["a".to_string(), "b".to_string()]) - .unwrap(); - - assert_eq!( - vec![TableUpdate::RemoveProperties { - removals: vec!["a".to_string(), "b".to_string()] - }], - tx.updates - ); - } - - #[test] - fn test_replace_sort_order() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx.replace_sort_order().apply().unwrap(); - - assert_eq!( - vec![ - TableUpdate::AddSortOrder { - sort_order: Default::default() - }, - TableUpdate::SetDefaultSortOrder { sort_order_id: -1 } - ], - tx.updates - ); - - assert_eq!( - vec![ - TableRequirement::CurrentSchemaIdMatch { - current_schema_id: 1 - }, - TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: 3 - } - ], - tx.requirements - ); - } - - #[tokio::test] - async fn test_fast_append_action() { - let table = make_v2_minimal_table(); - let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); - - // check add data file with incompatible partition value - let data_file = DataFileBuilder::default() - .content(DataContentType::Data) - .file_path("test/3.parquet".to_string()) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::string("test"))])) - .build() - .unwrap(); - assert!(action.add_data_files(vec![data_file.clone()]).is_err()); - - let data_file = DataFileBuilder::default() - .content(DataContentType::Data) - .file_path("test/3.parquet".to_string()) - .file_format(DataFileFormat::Parquet) 
- .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(300))])) - .build() - .unwrap(); - action.add_data_files(vec![data_file.clone()]).unwrap(); - let tx = action.apply().await.unwrap(); - - // check updates and requirements - assert!( - matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) - ); - assert_eq!( - vec![ - TableRequirement::UuidMatch { - uuid: tx.table.metadata().uuid() - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: tx.table.metadata().current_snapshot_id - } - ], - tx.requirements - ); - - // check manifest list - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { - snapshot - } else { - unreachable!() - }; - let manifest_list = new_snapshot - .load_manifest_list(table.file_io(), table.metadata()) - .await - .unwrap(); - assert_eq!(1, manifest_list.entries().len()); - assert_eq!( - manifest_list.entries()[0].sequence_number, - new_snapshot.sequence_number() - ); - - // check manifset - let manifest = manifest_list.entries()[0] - .load_manifest(table.file_io()) - .await - .unwrap(); - assert_eq!(1, manifest.entries().len()); - assert_eq!( - new_snapshot.sequence_number(), - manifest.entries()[0] - .sequence_number() - .expect("Inherit sequence number by load manifest") - ); - - assert_eq!( - new_snapshot.snapshot_id(), - manifest.entries()[0].snapshot_id().unwrap() - ); - assert_eq!(data_file, *manifest.entries()[0].data_file()); - } - - #[test] - fn test_do_same_update_in_same_transaction() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .remove_properties(vec!["a".to_string(), "b".to_string()]) - .unwrap(); - - let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]); - - assert!( - tx.is_err(), - "Should not allow to do same kinds update in same transaction" - ); - } - - #[tokio::test] - async fn test_add_existing_parquet_files_to_unpartitioned_table() { - let mut fixture = TableTestFixture::new_unpartitioned(); - fixture.setup_unpartitioned_manifest_files().await; - let tx = crate::transaction::Transaction::new(&fixture.table); - - let file_paths = vec![ - format!("{}/1.parquet", &fixture.table_location), - format!("{}/2.parquet", &fixture.table_location), - format!("{}/3.parquet", &fixture.table_location), - ]; - - let fast_append_action = tx.fast_append(None, vec![]).unwrap(); - - // Attempt to add the existing Parquet files with fast append. - let new_tx = fast_append_action - .add_parquet_files(file_paths.clone()) - .await - .expect("Adding existing Parquet files should succeed"); - - let mut found_add_snapshot = false; - let mut found_set_snapshot_ref = false; - for update in new_tx.updates.iter() { - match update { - TableUpdate::AddSnapshot { .. 
} => { - found_add_snapshot = true; - } - TableUpdate::SetSnapshotRef { - ref_name, - reference, - } => { - found_set_snapshot_ref = true; - assert_eq!(ref_name, crate::transaction::MAIN_BRANCH); - assert!(reference.snapshot_id > 0); - } - _ => {} - } - } - assert!(found_add_snapshot); - assert!(found_set_snapshot_ref); - - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] { - snapshot - } else { - panic!("Expected the first update to be an AddSnapshot update"); - }; - - let manifest_list = new_snapshot - .load_manifest_list(fixture.table.file_io(), fixture.table.metadata()) - .await - .expect("Failed to load manifest list"); - - assert_eq!( - manifest_list.entries().len(), - 2, - "Expected 2 manifest list entries, got {}", - manifest_list.entries().len() - ); - - // Load the manifest from the manifest list - let manifest = manifest_list.entries()[0] - .load_manifest(fixture.table.file_io()) - .await - .expect("Failed to load manifest"); - - // Check that the manifest contains three entries. - assert_eq!(manifest.entries().len(), 3); - - // Verify each file path appears in manifest. - let manifest_paths: Vec = manifest - .entries() - .iter() - .map(|entry| entry.data_file().file_path.clone()) - .collect(); - for path in file_paths { - assert!(manifest_paths.contains(&path)); - } - } -} diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs new file mode 100644 index 0000000000..361924de02 --- /dev/null +++ b/crates/iceberg/src/transaction/append.rs @@ -0,0 +1,373 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use arrow_array::StringArray; +use futures::TryStreamExt; +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; +use crate::transaction::snapshot::{ + DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation, +}; +use crate::transaction::Transaction; +use crate::writer::file_writer::ParquetWriter; +use crate::{Error, ErrorKind}; + +/// FastAppendAction is a transaction action for fast append data files to the table. 
+pub struct FastAppendAction<'a> { + snapshot_produce_action: SnapshotProduceAction<'a>, + check_duplicate: bool, +} + +impl<'a> FastAppendAction<'a> { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + commit_uuid: Uuid, + key_metadata: Vec, + snapshot_properties: HashMap, + ) -> Result { + Ok(Self { + snapshot_produce_action: SnapshotProduceAction::new( + tx, + snapshot_id, + key_metadata, + commit_uuid, + snapshot_properties, + )?, + check_duplicate: true, + }) + } + + /// Set whether to check duplicate files + pub fn with_check_duplicate(mut self, v: bool) -> Self { + self.check_duplicate = v; + self + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator, + ) -> Result<&mut Self> { + self.snapshot_produce_action.add_data_files(data_files)?; + Ok(self) + } + + /// Adds existing parquet files + #[allow(dead_code)] + async fn add_parquet_files(mut self, file_path: Vec) -> Result> { + if !self + .snapshot_produce_action + .tx + .table + .metadata() + .default_spec + .is_unpartitioned() + { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Appending to partitioned tables is not supported", + )); + } + + let table_metadata = self.snapshot_produce_action.tx.table.metadata(); + + let data_files = ParquetWriter::parquet_files_to_data_files( + self.snapshot_produce_action.tx.table.file_io(), + file_path, + table_metadata, + ) + .await?; + + self.add_data_files(data_files)?; + + self.apply().await + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self) -> Result> { + // Checks duplicate files + if self.check_duplicate { + let new_files: HashSet<&str> = self + .snapshot_produce_action + .added_data_files + .iter() + .map(|df| df.file_path.as_str()) + .collect(); + + let mut manifest_stream = self + .snapshot_produce_action + .tx + .table + .inspect() + .manifests() + .scan() + .await?; + let mut referenced_files = Vec::new(); + + while let Some(batch) = manifest_stream.try_next().await? 
{ + let file_path_array = batch + .column(1) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Failed to downcast file_path column to StringArray", + ) + })?; + + for i in 0..batch.num_rows() { + let file_path = file_path_array.value(i); + if new_files.contains(file_path) { + referenced_files.push(file_path.to_string()); + } + } + } + + if !referenced_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add files that are already referenced by table, files: {}", + referenced_files.join(", ") + ), + )); + } + } + + self.snapshot_produce_action + .apply(FastAppendOperation, DefaultManifestProcess) + .await + } +} + +struct FastAppendOperation; + +impl SnapshotProduceOperation for FastAppendOperation { + fn operation(&self) -> Operation { + Operation::Append + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.tx.table.file_io(), + &snapshot_produce.tx.table.metadata_ref(), + ) + .await?; + + Ok(manifest_list + .entries() + .iter() + .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .cloned() + .collect()) + } +} + +#[cfg(test)] +mod tests { + use crate::scan::tests::TableTestFixture; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, MAIN_BRANCH, + }; + use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::Transaction; + use crate::{TableRequirement, TableUpdate}; + + #[tokio::test] + async fn test_fast_append_action() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let mut action = tx.fast_append(None, vec![]).unwrap(); + + // check add data file with incompatible partition value + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::string("test"))])) + .build() + .unwrap(); + assert!(action.add_data_files(vec![data_file.clone()]).is_err()); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + action.add_data_files(vec![data_file.clone()]).unwrap(); + let tx = action.apply().await.unwrap(); + + // check updates and requirements + assert!( + matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + ); + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: tx.table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: tx.table.metadata().current_snapshot_id + } + ], + tx.requirements + ); + + // check manifest list + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { + snapshot + } else { + unreachable!() + }; + let manifest_list = new_snapshot + 
.load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + assert_eq!( + manifest_list.entries()[0].sequence_number, + new_snapshot.sequence_number() + ); + + // check manifset + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(1, manifest.entries().len()); + assert_eq!( + new_snapshot.sequence_number(), + manifest.entries()[0] + .sequence_number() + .expect("Inherit sequence number by load manifest") + ); + + assert_eq!( + new_snapshot.snapshot_id(), + manifest.entries()[0].snapshot_id().unwrap() + ); + assert_eq!(data_file, *manifest.entries()[0].data_file()); + } + + #[tokio::test] + async fn test_add_existing_parquet_files_to_unpartitioned_table() { + let mut fixture = TableTestFixture::new_unpartitioned(); + fixture.setup_unpartitioned_manifest_files().await; + let tx = crate::transaction::Transaction::new(&fixture.table); + + let file_paths = vec![ + format!("{}/1.parquet", &fixture.table_location), + format!("{}/2.parquet", &fixture.table_location), + format!("{}/3.parquet", &fixture.table_location), + ]; + + let fast_append_action = tx.fast_append(None, vec![]).unwrap(); + + // Attempt to add the existing Parquet files with fast append. + let new_tx = fast_append_action + .add_parquet_files(file_paths.clone()) + .await + .expect("Adding existing Parquet files should succeed"); + + let mut found_add_snapshot = false; + let mut found_set_snapshot_ref = false; + for update in new_tx.updates.iter() { + match update { + TableUpdate::AddSnapshot { .. } => { + found_add_snapshot = true; + } + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => { + found_set_snapshot_ref = true; + assert_eq!(ref_name, MAIN_BRANCH); + assert!(reference.snapshot_id > 0); + } + _ => {} + } + } + assert!(found_add_snapshot); + assert!(found_set_snapshot_ref); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] { + snapshot + } else { + panic!("Expected the first update to be an AddSnapshot update"); + }; + + let manifest_list = new_snapshot + .load_manifest_list(fixture.table.file_io(), fixture.table.metadata()) + .await + .expect("Failed to load manifest list"); + + assert_eq!( + manifest_list.entries().len(), + 2, + "Expected 2 manifest list entries, got {}", + manifest_list.entries().len() + ); + + // Load the manifest from the manifest list + let manifest = manifest_list.entries()[0] + .load_manifest(fixture.table.file_io()) + .await + .expect("Failed to load manifest"); + + // Check that the manifest contains three entries. + assert_eq!(manifest.entries().len(), 3); + + // Verify each file path appears in manifest. + let manifest_paths: Vec = manifest + .entries() + .iter() + .map(|entry| entry.data_file().file_path.clone()) + .collect(); + for path in file_paths { + assert!(manifest_paths.contains(&path)); + } + } +} diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs new file mode 100644 index 0000000000..d3c7bc3f9f --- /dev/null +++ b/crates/iceberg/src/transaction/mod.rs @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains transaction api. + +mod append; +mod snapshot; +mod sort_order; + +use std::cmp::Ordering; +use std::collections::HashMap; +use std::mem::discriminant; + +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::FormatVersion; +use crate::table::Table; +use crate::transaction::append::FastAppendAction; +use crate::transaction::sort_order::ReplaceSortOrderAction; +use crate::TableUpdate::UpgradeFormatVersion; +use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; + +/// Table transaction. +pub struct Transaction<'a> { + table: &'a Table, + updates: Vec, + requirements: Vec, +} + +impl<'a> Transaction<'a> { + /// Creates a new transaction. + pub fn new(table: &'a Table) -> Self { + Self { + table, + updates: vec![], + requirements: vec![], + } + } + + fn append_updates(&mut self, updates: Vec) -> Result<()> { + for update in &updates { + for up in &self.updates { + if discriminant(up) == discriminant(update) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot apply update with same type at same time: {:?}", + update + ), + )); + } + } + } + self.updates.extend(updates); + Ok(()) + } + + fn append_requirements(&mut self, requirements: Vec) -> Result<()> { + self.requirements.extend(requirements); + Ok(()) + } + + /// Sets table to a new version. + pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { + let current_version = self.table.metadata().format_version(); + match current_version.cmp(&format_version) { + Ordering::Greater => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot downgrade table version from {} to {}", + current_version, format_version + ), + )); + } + Ordering::Less => { + self.append_updates(vec![UpgradeFormatVersion { format_version }])?; + } + Ordering::Equal => { + // Do nothing. + } + } + Ok(self) + } + + /// Update table's property. + pub fn set_properties(mut self, props: HashMap) -> Result { + self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?; + Ok(self) + } + + fn generate_unique_snapshot_id(&self) -> i64 { + let generate_random_id = || -> i64 { + let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); + let snapshot_id = (lhs ^ rhs) as i64; + if snapshot_id < 0 { + -snapshot_id + } else { + snapshot_id + } + }; + let mut snapshot_id = generate_random_id(); + while self + .table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + snapshot_id = generate_random_id(); + } + snapshot_id + } + + /// Creates a fast append action. + pub fn fast_append( + self, + commit_uuid: Option, + key_metadata: Vec, + ) -> Result> { + let snapshot_id = self.generate_unique_snapshot_id(); + FastAppendAction::new( + self, + snapshot_id, + commit_uuid.unwrap_or_else(Uuid::now_v7), + key_metadata, + HashMap::new(), + ) + } + + /// Creates replace sort order action. + pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { + ReplaceSortOrderAction { + tx: self, + sort_fields: vec![], + } + } + + /// Remove properties in table. 
+    pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
+        self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
+        Ok(self)
+    }
+
+    /// Commit transaction.
+    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table>
{ + let table_commit = TableCommit::builder() + .ident(self.table.identifier().clone()) + .updates(self.updates) + .requirements(self.requirements) + .build(); + + catalog.update_table(table_commit).await + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::fs::File; + use std::io::BufReader; + + use crate::io::FileIOBuilder; + use crate::spec::{FormatVersion, TableMetadata}; + use crate::table::Table; + use crate::transaction::Transaction; + use crate::{TableIdent, TableUpdate}; + + fn make_v1_table() -> Table { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV1Valid.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) + .build() + .unwrap() + } + + pub fn make_v2_table() -> Table { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2Valid.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) + .build() + .unwrap() + } + + pub fn make_v2_minimal_table() -> Table { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2ValidMinimal.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) + .build() + .unwrap() + } + + #[test] + fn test_upgrade_table_version_v1_to_v2() { + let table = make_v1_table(); + let tx = Transaction::new(&table); + let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); + + assert_eq!( + vec![TableUpdate::UpgradeFormatVersion { + format_version: FormatVersion::V2 + }], + tx.updates + ); + } + + #[test] + fn test_upgrade_table_version_v2_to_v2() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); + + assert!( + tx.updates.is_empty(), + "Upgrade table to same version should not generate any updates" + ); + assert!( + tx.requirements.is_empty(), + "Upgrade table to same version should not generate any requirements" + ); + } + + #[test] + fn test_downgrade_table_version() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx.upgrade_table_version(FormatVersion::V1); + + assert!(tx.is_err(), "Downgrade table version should fail!"); + } + + #[test] + fn test_set_table_property() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx + .set_properties(HashMap::from([("a".to_string(), "b".to_string())])) + .unwrap(); + + assert_eq!( + vec![TableUpdate::SetProperties { + updates: HashMap::from([("a".to_string(), "b".to_string())]) + }], + tx.updates + ); + } + + #[test] 
+ fn test_remove_property() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx + .remove_properties(vec!["a".to_string(), "b".to_string()]) + .unwrap(); + + assert_eq!( + vec![TableUpdate::RemoveProperties { + removals: vec!["a".to_string(), "b".to_string()] + }], + tx.updates + ); + } + + #[test] + fn test_do_same_update_in_same_transaction() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx + .remove_properties(vec!["a".to_string(), "b".to_string()]) + .unwrap(); + + let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]); + + assert!( + tx.is_err(), + "Should not allow to do same kinds update in same transaction" + ); + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs new file mode 100644 index 0000000000..e59169e2a9 --- /dev/null +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::future::Future; +use std::ops::RangeFrom; + +use uuid::Uuid; + +use crate::error::Result; +use crate::io::OutputFile; +use crate::spec::{ + DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter, + ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct, + StructType, Summary, MAIN_BRANCH, +}; +use crate::transaction::Transaction; +use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; + +const META_ROOT_PATH: &str = "metadata"; + +pub trait SnapshotProduceOperation: Send + Sync { + fn operation(&self) -> Operation; + #[allow(unused)] + fn delete_entries( + &self, + snapshot_produce: &SnapshotProduceAction, + ) -> impl Future>> + Send; + fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction, + ) -> impl Future>> + Send; +} + +pub struct DefaultManifestProcess; + +impl ManifestProcess for DefaultManifestProcess { + fn process_manifeset(&self, manifests: Vec) -> Vec { + manifests + } +} + +pub trait ManifestProcess: Send + Sync { + fn process_manifeset(&self, manifests: Vec) -> Vec; +} + +pub struct SnapshotProduceAction<'a> { + pub tx: Transaction<'a>, + snapshot_id: i64, + key_metadata: Vec, + commit_uuid: Uuid, + snapshot_properties: HashMap, + pub added_data_files: Vec, + // A counter used to generate unique manifest file names. + // It starts from 0 and increments for each new manifest file. + // Note: This counter is limited to the range of (0..u64::MAX). 
+ manifest_counter: RangeFrom, +} + +impl<'a> SnapshotProduceAction<'a> { + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + key_metadata: Vec, + commit_uuid: Uuid, + snapshot_properties: HashMap, + ) -> Result { + Ok(Self { + tx, + snapshot_id, + commit_uuid, + snapshot_properties, + added_data_files: vec![], + manifest_counter: (0..), + key_metadata, + }) + } + + // Check if the partition value is compatible with the partition type. + fn validate_partition_value( + partition_value: &Struct, + partition_type: &StructType, + ) -> Result<()> { + if partition_value.fields().len() != partition_type.fields().len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatible with partition type", + )); + } + + for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) { + if !field + .field_type + .as_primitive_type() + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Partition field should only be primitive type.", + ) + })? + .compatible(&value.as_primitive_literal().unwrap()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatible partition type", + )); + } + } + Ok(()) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator, + ) -> Result<&mut Self> { + let data_files: Vec = data_files.into_iter().collect(); + for data_file in &data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + Self::validate_partition_value( + data_file.partition(), + self.tx.table.metadata().default_partition_type(), + )?; + } + self.added_data_files.extend(data_files); + Ok(self) + } + + fn new_manifest_output(&mut self) -> Result { + let new_manifest_path = format!( + "{}/{}/{}-m{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + self.commit_uuid, + self.manifest_counter.next().unwrap(), + DataFileFormat::Avro + ); + self.tx.table.file_io().new_output(new_manifest_path) + } + + // Write manifest file for added data files and return the ManifestFile for ManifestList. + async fn write_added_manifest(&mut self) -> Result { + let added_data_files = std::mem::take(&mut self.added_data_files); + let snapshot_id = self.snapshot_id; + let manifest_entries = added_data_files.into_iter().map(|data_file| { + let builder = ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(data_file); + if self.tx.table.metadata().format_version() == FormatVersion::V1 { + builder.snapshot_id(snapshot_id).build() + } else { + // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when + // commit failed. 
+ builder.build() + } + }); + let mut writer = { + let builder = ManifestWriterBuilder::new( + self.new_manifest_output()?, + Some(self.snapshot_id), + self.key_metadata.clone(), + self.tx.table.metadata().current_schema().clone(), + self.tx + .table + .metadata() + .default_partition_spec() + .as_ref() + .clone(), + ); + if self.tx.table.metadata().format_version() == FormatVersion::V1 { + builder.build_v1() + } else { + builder.build_v2_data() + } + }; + for entry in manifest_entries { + writer.add_entry(entry)?; + } + writer.write_manifest_file().await + } + + async fn manifest_file( + &mut self, + snapshot_produce_operation: &OP, + manifest_process: &MP, + ) -> Result> { + let added_manifest = self.write_added_manifest().await?; + let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; + // # TODO + // Support process delete entries. + + let mut manifest_files = vec![added_manifest]; + manifest_files.extend(existing_manifests); + let manifest_files = manifest_process.process_manifeset(manifest_files); + Ok(manifest_files) + } + + // # TODO + // Fulfill this function + fn summary(&self, snapshot_produce_operation: &OP) -> Summary { + Summary { + operation: snapshot_produce_operation.operation(), + additional_properties: self.snapshot_properties.clone(), + } + } + + fn generate_manifest_list_file_path(&self, attempt: i64) -> String { + format!( + "{}/{}/snap-{}-{}-{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + self.snapshot_id, + attempt, + self.commit_uuid, + DataFileFormat::Avro + ) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply( + mut self, + snapshot_produce_operation: OP, + process: MP, + ) -> Result> { + let new_manifests = self + .manifest_file(&snapshot_produce_operation, &process) + .await?; + let next_seq_num = self.tx.table.metadata().next_sequence_number(); + + let summary = self.summary(&snapshot_produce_operation); + + let manifest_list_path = self.generate_manifest_list_file_path(0); + + let mut manifest_list_writer = match self.tx.table.metadata().format_version() { + FormatVersion::V1 => ManifestListWriter::v1( + self.tx + .table + .file_io() + .new_output(manifest_list_path.clone())?, + self.snapshot_id, + self.tx.table.metadata().current_snapshot_id(), + ), + FormatVersion::V2 => ManifestListWriter::v2( + self.tx + .table + .file_io() + .new_output(manifest_list_path.clone())?, + self.snapshot_id, + self.tx.table.metadata().current_snapshot_id(), + next_seq_num, + ), + }; + manifest_list_writer.add_manifests(new_manifests.into_iter())?; + manifest_list_writer.close().await?; + + let commit_ts = chrono::Utc::now().timestamp_millis(); + let new_snapshot = Snapshot::builder() + .with_manifest_list(manifest_list_path) + .with_snapshot_id(self.snapshot_id) + .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id()) + .with_sequence_number(next_seq_num) + .with_summary(summary) + .with_schema_id(self.tx.table.metadata().current_schema_id()) + .with_timestamp_ms(commit_ts) + .build(); + + self.tx.append_updates(vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot, + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference::new( + self.snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ])?; + self.tx.append_requirements(vec![ + TableRequirement::UuidMatch { + uuid: self.tx.table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: 
self.tx.table.metadata().current_snapshot_id(), + }, + ])?; + Ok(self.tx) + } +} diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs new file mode 100644 index 0000000000..4f21eef070 --- /dev/null +++ b/crates/iceberg/src/transaction/sort_order.rs @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use crate::spec::{NullOrder, SortDirection, SortField, SortOrder, Transform}; +use crate::transaction::Transaction; +use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; + +/// Transaction action for replacing sort order. +pub struct ReplaceSortOrderAction<'a> { + pub tx: Transaction<'a>, + pub sort_fields: Vec, +} + +impl<'a> ReplaceSortOrderAction<'a> { + /// Adds a field for sorting in ascending order. + pub fn asc(self, name: &str, null_order: NullOrder) -> Result { + self.add_sort_field(name, SortDirection::Ascending, null_order) + } + + /// Adds a field for sorting in descending order. + pub fn desc(self, name: &str, null_order: NullOrder) -> Result { + self.add_sort_field(name, SortDirection::Descending, null_order) + } + + /// Finished building the action and apply it to the transaction. 
+ pub fn apply(mut self) -> Result> { + let unbound_sort_order = SortOrder::builder() + .with_fields(self.sort_fields) + .build_unbound()?; + + let updates = vec![ + TableUpdate::AddSortOrder { + sort_order: unbound_sort_order, + }, + TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }, + ]; + + let requirements = vec![ + TableRequirement::CurrentSchemaIdMatch { + current_schema_id: self.tx.table.metadata().current_schema().schema_id(), + }, + TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id, + }, + ]; + + self.tx.append_requirements(requirements)?; + self.tx.append_updates(updates)?; + Ok(self.tx) + } + + fn add_sort_field( + mut self, + name: &str, + sort_direction: SortDirection, + null_order: NullOrder, + ) -> Result { + let field_id = self + .tx + .table + .metadata() + .current_schema() + .field_id_by_name(name) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Cannot find field {} in table schema", name), + ) + })?; + + let sort_field = SortField::builder() + .source_id(field_id) + .transform(Transform::Identity) + .direction(sort_direction) + .null_order(null_order) + .build(); + + self.sort_fields.push(sort_field); + Ok(self) + } +} + +#[cfg(test)] +mod tests { + use crate::transaction::tests::make_v2_table; + use crate::transaction::Transaction; + use crate::{TableRequirement, TableUpdate}; + + #[test] + fn test_replace_sort_order() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx.replace_sort_order().apply().unwrap(); + + assert_eq!( + vec![ + TableUpdate::AddSortOrder { + sort_order: Default::default() + }, + TableUpdate::SetDefaultSortOrder { sort_order_id: -1 } + ], + tx.updates + ); + + assert_eq!( + vec![ + TableRequirement::CurrentSchemaIdMatch { + current_schema_id: 1 + }, + TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id: 3 + } + ], + tx.requirements + ); + } +} From c337e2ee072bf797fcad82070c53bd80b5c7de12 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Thu, 13 Mar 2025 13:35:29 -0400 Subject: [PATCH 2/2] add visibility --- crates/iceberg/src/transaction/snapshot.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index e59169e2a9..4a3035ba99 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -33,7 +33,7 @@ use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; -pub trait SnapshotProduceOperation: Send + Sync { +pub(crate) trait SnapshotProduceOperation: Send + Sync { fn operation(&self) -> Operation; #[allow(unused)] fn delete_entries( @@ -46,7 +46,7 @@ pub trait SnapshotProduceOperation: Send + Sync { ) -> impl Future>> + Send; } -pub struct DefaultManifestProcess; +pub(crate) struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { fn process_manifeset(&self, manifests: Vec) -> Vec { @@ -54,11 +54,11 @@ impl ManifestProcess for DefaultManifestProcess { } } -pub trait ManifestProcess: Send + Sync { +pub(crate) trait ManifestProcess: Send + Sync { fn process_manifeset(&self, manifests: Vec) -> Vec; } -pub struct SnapshotProduceAction<'a> { +pub(crate) struct SnapshotProduceAction<'a> { pub tx: Transaction<'a>, snapshot_id: i64, key_metadata: Vec,