diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index c4e42de..2f94f32 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -123,23 +123,21 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow-arith" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad08897b81588f60ba983e3ca39bda2b179bdd84dced378e7df81a5313802ef8" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "chrono", - "num", + "num-traits", ] [[package]] name = "arrow-array" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8548ca7c070d8db9ce7aa43f37393e4bfcf3f2d3681df278490772fd1673d08d" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "ahash 0.8.12", "arrow-buffer", @@ -148,29 +146,31 @@ dependencies = [ "chrono", "half", "hashbrown 0.16.0", - "num", + "num-complex", + "num-integer", + "num-traits", ] [[package]] name = "arrow-buffer" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e003216336f70446457e280807a73899dd822feaf02087d31febca1363e2fccc" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "bytes", "half", - "num", + "num-bigint", + "num-traits", ] [[package]] name = "arrow-cast" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "919418a0681298d3a77d1a315f625916cb5678ad0d74b9c60108eb15fd083023" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", + "arrow-ord", "arrow-schema", "arrow-select", "atoi", @@ -178,27 +178,26 @@ dependencies = [ "chrono", "half", "lexical-core", - "num", + "num-traits", "ryu", ] [[package]] name = "arrow-data" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5c64fff1d142f833d78897a772f2e5b55b36cb3e6320376f0961ab0db7bd6d0" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-buffer", "arrow-schema", "half", - "num", + "num-integer", + "num-traits", ] [[package]] name = "arrow-ipc" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d3594dcddccc7f20fd069bc8e9828ce37220372680ff638c5e00dea427d88f5" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", @@ -210,9 +209,8 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c8f82583eb4f8d84d4ee55fd1cb306720cddead7596edce95b50ee418edf66f" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", @@ -223,29 +221,26 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" [[package]] name = "arrow-select" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c41dbbd1e97bfcaee4fcb30e29105fb2c75e4d82ae4de70b792a5d3f66b2e7a" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "ahash 0.8.12", "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", - "num", + "num-traits", ] [[package]] name = "arrow-string" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53f5183c150fbc619eede22b861ea7c0eebed8eaac0333eaa7f6da5205fd504d" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", @@ -253,7 +248,7 @@ dependencies = [ "arrow-schema", "arrow-select", "memchr", - "num", + "num-traits", "regex", "regex-syntax", ] @@ -1518,7 +1513,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=ae83309fd198ec30f052a7e9f983711c5f581aea#ae83309fd198ec30f052a7e9f983711c5f581aea" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=cd1daca8d45eb2a78bc90f1eec18435502c6bc04#cd1daca8d45eb2a78bc90f1eec18435502c6bc04" dependencies = [ "anyhow", "apache-avro", @@ -1563,7 +1558,6 @@ dependencies = [ "serde_repr", "serde_with", "strum", - "thrift", "tokio", "typed-builder", "url", @@ -1943,9 +1937,9 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" [[package]] name = "lz4_flex" -version = "0.11.5" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" +checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e" dependencies = [ "twox-hash", ] @@ -2079,20 +2073,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "num" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" -dependencies = [ - "num-bigint", - "num-complex", - "num-integer", - "num-iter", - "num-rational", - "num-traits", -] - [[package]] name = "num-bigint" version = "0.4.6" @@ -2128,28 +2108,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-iter" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - -[[package]] -name = "num-rational" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" -dependencies = [ - "num-bigint", - "num-integer", - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.19" @@ -2404,9 +2362,8 @@ dependencies = [ [[package]] name = "parquet" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0dbd48ad52d7dccf8ea1b90a3ddbfaea4f69878dd7683e51c507d4bc52b5b27" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -2425,8 +2382,9 @@ dependencies = [ "half", "hashbrown 0.16.0", "lz4_flex", - "num", "num-bigint", + "num-integer", + "num-traits", "paste", "seq-macro", "simdutf8", @@ -3579,15 +3537,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "thrift" version = "0.17.0" @@ -3596,9 +3545,7 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding", - "log", "ordered-float 2.10.1", - "threadpool", ] [[package]] diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 29c9ec3..e0bd02f 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -12,14 +12,14 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "ae83309fd198ec30f052a7e9f983711c5f581aea" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "cd1daca8d45eb2a78bc90f1eec18435502c6bc04" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" libc = "0.2" anyhow = "1.0" -arrow-array = "56.2.0" -arrow-ipc = "56.2.0" +arrow-array = { git = "https://github.com/apache/arrow-rs", rev = "fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" } +arrow-ipc = { git = "https://github.com/apache/arrow-rs", rev = "fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" } tracing-subscriber = "0.3" tracing = "0.1" once_cell = "1.19" diff --git a/iceberg_rust_ffi/src/full.rs b/iceberg_rust_ffi/src/full.rs index 7ede34a..fae6c86 100644 --- a/iceberg_rust_ffi/src/full.rs +++ b/iceberg_rust_ffi/src/full.rs @@ -54,6 +54,8 @@ impl_with_batch_size!(iceberg_scan_with_batch_size, IcebergScan); impl_scan_builder_method!(iceberg_scan_with_file_column, IcebergScan, with_file_column); +impl_scan_builder_method!(iceberg_scan_with_pos_column, IcebergScan, with_pos_column); + impl_scan_build!(iceberg_scan_build, IcebergScan); // Async function to initialize stream from a table scan diff --git a/iceberg_rust_ffi/src/incremental.rs b/iceberg_rust_ffi/src/incremental.rs index 3d85368..4f8ee1e 100644 --- a/iceberg_rust_ffi/src/incremental.rs +++ b/iceberg_rust_ffi/src/incremental.rs @@ -130,6 +130,12 @@ impl_scan_builder_method!( with_file_column ); +impl_scan_builder_method!( + iceberg_incremental_scan_with_pos_column, + IcebergIncrementalScan, + with_pos_column +); + impl_scan_build!(iceberg_incremental_scan_build, IcebergIncrementalScan); // Get unzipped Arrow streams from incremental scan (async) diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index c372924..b4d8276 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -11,9 +11,9 @@ export IcebergException export new_incremental_scan, free_incremental_scan! export table_open, free_table, new_scan, free_scan! export select_columns!, with_batch_size!, with_data_file_concurrency_limit!, with_manifest_entry_concurrency_limit! -export with_file_column! +export with_file_column!, with_pos_column! export scan!, next_batch, free_batch, free_stream -export FILE_COLUMN +export FILE_COLUMN, POS_COLUMN # Always use the JLL library - override via Preferences if needed for local development # To use a local build, set the preference: diff --git a/src/full.jl b/src/full.jl index b64a26e..68cb3b8 100644 --- a/src/full.jl +++ b/src/full.jl @@ -133,6 +133,32 @@ function with_file_column!(scan::Scan) return nothing end +""" + with_pos_column!(scan::Scan) + +Add the _pos metadata column to the scan. + +The _pos column contains the position of each row within its data file, which can +be useful for tracking row locations and debugging. + +# Example +```julia +scan = new_scan(table) +with_pos_column!(scan) +stream = scan!(scan) +``` +""" +function with_pos_column!(scan::Scan) + result = @ccall rust_lib.iceberg_scan_with_pos_column( + convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}} + )::Cint + + if result != 0 + error("Failed to add pos column to scan") + end + return nothing +end + """ build!(scan::Scan) diff --git a/src/incremental.jl b/src/incremental.jl index a18254c..5bfae67 100644 --- a/src/incremental.jl +++ b/src/incremental.jl @@ -186,6 +186,32 @@ function with_file_column!(scan::IncrementalScan) return nothing end +""" + with_pos_column!(scan::IncrementalScan) + +Add the _pos metadata column to the incremental scan. + +The _pos column contains the position of each row within its data file, which can +be useful for tracking row locations during incremental scans. + +# Example +```julia +scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id) +with_pos_column!(scan) +inserts_stream, deletes_stream = scan!(scan) +``` +""" +function with_pos_column!(scan::IncrementalScan) + result = @ccall rust_lib.iceberg_incremental_scan_with_pos_column( + convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}} + )::Cint + + if result != 0 + error("Failed to add pos column to incremental scan") + end + return nothing +end + """ build!(scan::IncrementalScan) diff --git a/src/scan_common.jl b/src/scan_common.jl index c002e36..cf94dbd 100644 --- a/src/scan_common.jl +++ b/src/scan_common.jl @@ -27,6 +27,25 @@ stream = scan!(scan) """ const FILE_COLUMN = "_file" +""" + POS_COLUMN + +The name of the metadata column containing row positions within files (_pos). + +This constant can be used with the `select_columns!` function to include +position information in query results. It corresponds to the _pos metadata +column in Iceberg tables, which represents the row's position within its data file. + +# Example +```julia +# Select specific columns including the position +scan = new_scan(table) +select_columns!(scan, ["id", "name", POS_COLUMN]) +stream = scan!(scan) +``` +""" +const POS_COLUMN = "_pos" + """ BatchResponse diff --git a/test/runtests.jl b/test/runtests.jl index a077b90..a61ba77 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -688,6 +688,11 @@ end # Verify file column contains file paths (strings ending in .parquet) file_paths = df._file @test all(endswith.(file_paths, ".parquet")) + # Verify file paths are non-empty strings with proper structure + @test all(length.(file_paths) .> 0) + # Should be full S3 paths like "s3://warehouse/tpch.sf01/customer/data/data_customer-00000.parquet" + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test eltype(file_paths) <: AbstractString RustyIceberg.free_batch(batch_ptr) println("✅ select_columns! with with_file_column! test passed for full scan") @@ -722,6 +727,9 @@ end # Verify file column contains file paths file_paths = df._file @test all(endswith.(file_paths, ".parquet")) + @test all(length.(file_paths) .> 0) + @test all(startswith.(file_paths, "s3://warehouse/incremental/")) + @test eltype(file_paths) <: AbstractString RustyIceberg.free_batch(batch_ptr) println("✅ select_columns! with with_file_column! test passed for incremental scan") @@ -760,6 +768,9 @@ end # Verify file column contains file paths file_paths = df._file @test all(endswith.(file_paths, ".parquet")) + @test all(length.(file_paths) .> 0) + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test eltype(file_paths) <: AbstractString RustyIceberg.free_batch(batch_ptr) println("✅ select_columns! with FILE_COLUMN constant test passed") @@ -769,6 +780,181 @@ end RustyIceberg.free_table(table) end end + + @testset "select_columns! with with_pos_column! - Full Scan" begin + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + + # Select specific columns AND include pos metadata + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name"]) + RustyIceberg.with_pos_column!(scan) + stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(stream) + @test batch_ptr != C_NULL + + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + + # Should have selected columns plus pos column + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_pos" in names(df) + @test !isempty(df) + + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer + + # Verify positions are sequential within the batch (starting from 0) + # Positions should start from 0 for the first row in each file + @test minimum(positions) == 0 + # Check that positions are unique and sequential (no gaps or duplicates in same file) + @test length(unique(positions)) == length(positions) + @test maximum(positions) == length(positions) - 1 + + RustyIceberg.free_batch(batch_ptr) + println("✅ select_columns! with with_pos_column! test passed for full scan") + finally + RustyIceberg.free_stream(stream) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + end + + @testset "select_columns! with with_pos_column! - Incremental Scan" begin + table = RustyIceberg.table_open(incremental_path) + scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id) + + # Select specific column AND include pos metadata for incremental scan + RustyIceberg.select_columns!(scan, ["n"]) + RustyIceberg.with_pos_column!(scan) + inserts_stream, deletes_stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(inserts_stream) + if batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + + # Should have the selected column "n" plus pos column + @test "n" in names(df) + @test "_pos" in names(df) + @test !isempty(df) + + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer + + # Verify positions are sequential within the batch + @test minimum(positions) == 0 + @test length(unique(positions)) == length(positions) + @test maximum(positions) == length(positions) - 1 + + RustyIceberg.free_batch(batch_ptr) + println("✅ select_columns! with with_pos_column! test passed for incremental scan") + end + finally + RustyIceberg.free_stream(inserts_stream) + RustyIceberg.free_stream(deletes_stream) + RustyIceberg.free_incremental_scan!(scan) + RustyIceberg.free_table(table) + end + end + + @testset "select_columns! with POS_COLUMN constant" begin + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + + # Select columns including POS_COLUMN constant + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name", RustyIceberg.POS_COLUMN]) + stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(stream) + @test batch_ptr != C_NULL + + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + + # Should have selected columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + # POS_COLUMN should be "_pos" + @test "_pos" in names(df) + @test !isempty(df) + + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer + + # Verify positions are sequential within the batch + @test minimum(positions) == 0 + @test length(unique(positions)) == length(positions) + @test maximum(positions) == length(positions) - 1 + + RustyIceberg.free_batch(batch_ptr) + println("✅ select_columns! with POS_COLUMN constant test passed") + finally + RustyIceberg.free_stream(stream) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + end + + @testset "with_file_column! and with_pos_column! combined" begin + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + + # Select columns and include both file and pos metadata + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name"]) + RustyIceberg.with_file_column!(scan) + RustyIceberg.with_pos_column!(scan) + stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(stream) + @test batch_ptr != C_NULL + + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + + # Should have selected columns plus both metadata columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_file" in names(df) + @test "_pos" in names(df) + @test !isempty(df) + + # Verify both metadata columns + file_paths = df._file + @test all(endswith.(file_paths, ".parquet")) + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test all(df._pos .>= 0) + @test eltype(df._pos) <: Integer + + # Verify positions are sequential within the batch + positions = df._pos + @test minimum(positions) == 0 + @test length(unique(positions)) == length(positions) + @test maximum(positions) == length(positions) - 1 + + RustyIceberg.free_batch(batch_ptr) + println("✅ with_file_column! and with_pos_column! combined test passed") + finally + RustyIceberg.free_stream(stream) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + end end end # End of testset