Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::util::available_parallelism;
use crate::{Error, ErrorKind};

/// Default gap between byte ranges below which they are coalesced into a
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ pub mod arrow;
pub(crate) mod delete_file_index;
pub mod encryption;
pub mod test_utils;
mod utils;
pub mod writer;

mod delete_vector;
pub mod metadata_columns;
pub mod puffin;
/// Utility functions and modules.
pub mod util;
35 changes: 34 additions & 1 deletion crates/iceberg/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::util::available_parallelism;
use crate::{Error, ErrorKind, Result};

/// A stream of arrow [`RecordBatch`]es.
Expand Down Expand Up @@ -683,6 +683,39 @@ pub mod tests {
}
}

/// Creates a fixture with 5 snapshots chained as:
/// S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
/// Useful for testing snapshot history traversal.
pub fn new_with_deep_history() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let table_metadata1_location = table_location.join("metadata/v1.json");

let file_io = FileIO::new_with_fs();

let table_metadata = {
let json_str = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2_deep_history.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
serde_json::from_str::<TableMetadata>(&json_str).unwrap()
};

let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
.build()
.unwrap();

Self {
table_location: table_location.to_str().unwrap().to_string(),
table,
}
}

pub fn new_unpartitioned() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

use std::num::NonZeroUsize;

/// Utilities for working with snapshots.
pub mod snapshot;

// Use a default value of 1 as the safest option.
// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
// for more details.
Expand Down
185 changes: 185 additions & 0 deletions crates/iceberg/src/util/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::spec::{SnapshotRef, TableMetadataRef};

struct Ancestors {
Comment thread
blackmwk marked this conversation as resolved.
next: Option<SnapshotRef>,
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
}

impl Iterator for Ancestors {
type Item = SnapshotRef;

fn next(&mut self) -> Option<Self::Item> {
let snapshot = self.next.take()?;
self.next = snapshot
.parent_snapshot_id()
.and_then(|id| (self.get_snapshot)(id));
Some(snapshot)
}
}

/// Iterate starting from `snapshot_id` (inclusive) to the root snapshot.
pub fn ancestors_of(
Comment thread
blackmwk marked this conversation as resolved.
table_metadata: &TableMetadataRef,
snapshot_id: i64,
) -> impl Iterator<Item = SnapshotRef> + Send {
let initial = table_metadata.snapshot_by_id(snapshot_id).cloned();
let table_metadata = table_metadata.clone();
Ancestors {
next: initial,
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
}
}

/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive).
pub fn ancestors_between(
Comment thread
CTTY marked this conversation as resolved.
table_metadata: &TableMetadataRef,
latest_snapshot_id: i64,
oldest_snapshot_id: Option<i64>,
) -> impl Iterator<Item = SnapshotRef> + Send {
ancestors_of(table_metadata, latest_snapshot_id).take_while(move |snapshot| {
oldest_snapshot_id
.map(|id| snapshot.snapshot_id() != id)
.unwrap_or(true)
})
}

#[cfg(test)]
mod tests {
use super::*;
use crate::scan::tests::TableTestFixture;

// Five snapshots chained as: S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
const S1: i64 = 3051729675574597004;
const S2: i64 = 3055729675574597004;
const S3: i64 = 3056729675574597004;
const S4: i64 = 3057729675574597004;
const S5: i64 = 3059729675574597004;

fn metadata() -> TableMetadataRef {
let fixture = TableTestFixture::new_with_deep_history();
std::sync::Arc::new(fixture.table.metadata().clone())
}

// --- ancestors_of ---

#[test]
fn test_ancestors_of_nonexistent_snapshot_returns_empty() {
let meta = metadata();
let ids: Vec<i64> = ancestors_of(&meta, 999).map(|s| s.snapshot_id()).collect();
assert!(ids.is_empty());
}

#[test]
fn test_ancestors_of_root_returns_only_root() {
let meta = metadata();
let ids: Vec<i64> = ancestors_of(&meta, S1).map(|s| s.snapshot_id()).collect();
assert_eq!(ids, vec![S1]);
}

#[test]
fn test_ancestors_of_leaf_returns_full_chain() {
let meta = metadata();
let ids: Vec<i64> = ancestors_of(&meta, S5).map(|s| s.snapshot_id()).collect();
assert_eq!(ids, vec![S5, S4, S3, S2, S1]);
}

#[test]
fn test_ancestors_of_mid_chain_returns_partial_chain() {
let meta = metadata();
let ids: Vec<i64> = ancestors_of(&meta, S3).map(|s| s.snapshot_id()).collect();
assert_eq!(ids, vec![S3, S2, S1]);
}

#[test]
fn test_ancestors_of_second_snapshot() {
let meta = metadata();
let ids: Vec<i64> = ancestors_of(&meta, S2).map(|s| s.snapshot_id()).collect();
assert_eq!(ids, vec![S2, S1]);
}

// --- ancestors_between ---

#[test]
fn test_ancestors_between_same_id_returns_empty() {
let meta = metadata();
let ids: Vec<i64> = ancestors_between(&meta, S3, Some(S3))
.map(|s| s.snapshot_id())
.collect();
assert!(ids.is_empty());
}

#[test]
fn test_ancestors_between_no_oldest_returns_all_ancestors() {
let meta = metadata();
let ids: Vec<i64> = ancestors_between(&meta, S5, None)
.map(|s| s.snapshot_id())
.collect();
assert_eq!(ids, vec![S5, S4, S3, S2, S1]);
}

#[test]
fn test_ancestors_between_excludes_oldest_snapshot() {
let meta = metadata();
// S5 down to (but not including) S2
let ids: Vec<i64> = ancestors_between(&meta, S5, Some(S2))
.map(|s| s.snapshot_id())
.collect();
assert_eq!(ids, vec![S5, S4, S3]);
}

#[test]
fn test_ancestors_between_adjacent_snapshots() {
let meta = metadata();
// S3 down to (but not including) S2 — only S3 itself
let ids: Vec<i64> = ancestors_between(&meta, S3, Some(S2))
.map(|s| s.snapshot_id())
.collect();
assert_eq!(ids, vec![S3]);
}

#[test]
fn test_ancestors_between_leaf_and_root() {
let meta = metadata();
// S5 down to (but not including) S1
let ids: Vec<i64> = ancestors_between(&meta, S5, Some(S1))
.map(|s| s.snapshot_id())
.collect();
assert_eq!(ids, vec![S5, S4, S3, S2]);
}

#[test]
fn test_ancestors_between_nonexistent_oldest_returns_full_chain() {
let meta = metadata();
// oldest_snapshot_id doesn't exist in the chain, so take_while never stops
let ids: Vec<i64> = ancestors_between(&meta, S5, Some(999))
.map(|s| s.snapshot_id())
.collect();
assert_eq!(ids, vec![S5, S4, S3, S2, S1]);
}

#[test]
fn test_ancestors_between_nonexistent_latest_returns_empty() {
let meta = metadata();
let ids: Vec<i64> = ancestors_between(&meta, 999, Some(S1))
.map(|s| s.snapshot_id())
.collect();
assert!(ids.is_empty());
}
}
104 changes: 104 additions & 0 deletions crates/iceberg/testdata/example_table_metadata_v2_deep_history.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
{
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "s3://bucket/test/location",
"last-sequence-number": 34,
"last-updated-ms": 1602638573590,
"last-column-id": 3,
"current-schema-id": 1,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{"id": 1, "name": "x", "required": true, "type": "long"}
]
},
{
"type": "struct",
"schema-id": 1,
"identifier-field-ids": [1, 2],
"fields": [
{"id": 1, "name": "x", "required": true, "type": "long"},
{"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"},
{"id": 3, "name": "z", "required": true, "type": "long"}
]
}
],
"default-spec-id": 0,
"partition-specs": [
{
"spec-id": 0,
"fields": [
{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}
]
}
],
"last-partition-id": 1000,
"default-sort-order-id": 3,
"sort-orders": [
{
"order-id": 3,
"fields": [
{"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"},
{"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"}
]
}
],
"properties": {},
"current-snapshot-id": 3059729675574597004,
"snapshots": [
{
"snapshot-id": 3051729675574597004,
"timestamp-ms": 1515100955770,
"sequence-number": 0,
"summary": {"operation": "append"},
"manifest-list": "s3://bucket/metadata/snap-3051729675574597004.avro"
},
{
"snapshot-id": 3055729675574597004,
"parent-snapshot-id": 3051729675574597004,
"timestamp-ms": 1555100955770,
"sequence-number": 1,
"summary": {"operation": "append"},
"manifest-list": "s3://bucket/metadata/snap-3055729675574597004.avro",
"schema-id": 1
},
{
"snapshot-id": 3056729675574597004,
"parent-snapshot-id": 3055729675574597004,
"timestamp-ms": 1575100955770,
"sequence-number": 2,
"summary": {"operation": "append"},
"manifest-list": "s3://bucket/metadata/snap-3056729675574597004.avro",
"schema-id": 1
},
{
"snapshot-id": 3057729675574597004,
"parent-snapshot-id": 3056729675574597004,
"timestamp-ms": 1595100955770,
"sequence-number": 3,
"summary": {"operation": "overwrite"},
"manifest-list": "s3://bucket/metadata/snap-3057729675574597004.avro",
"schema-id": 1
},
{
"snapshot-id": 3059729675574597004,
"parent-snapshot-id": 3057729675574597004,
"timestamp-ms": 1602638573590,
"sequence-number": 4,
"summary": {"operation": "append"},
"manifest-list": "s3://bucket/metadata/snap-3059729675574597004.avro",
"schema-id": 1
}
],
"snapshot-log": [
{"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770},
{"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770},
{"snapshot-id": 3056729675574597004, "timestamp-ms": 1575100955770},
{"snapshot-id": 3057729675574597004, "timestamp-ms": 1595100955770},
{"snapshot-id": 3059729675574597004, "timestamp-ms": 1602638573590}
],
"metadata-log": [],
"refs": {"main": {"snapshot-id": 3059729675574597004, "type": "branch"}}
}
Loading