diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 57c4f62..72a652e 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -24,6 +24,18 @@ jobs: os: ubuntu-latest arch: x64 + rust-test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + with: + workspaces: ./iceberg_rust_ffi + - name: Run Rust tests + run: cargo test + working-directory: iceberg_rust_ffi + rust-format-check: runs-on: ubuntu-latest steps: diff --git a/iceberg_rust_ffi/src/catalog.rs b/iceberg_rust_ffi/src/catalog.rs index 9e4f102..48b216b 100644 --- a/iceberg_rust_ffi/src/catalog.rs +++ b/iceberg_rust_ffi/src/catalog.rs @@ -3,7 +3,7 @@ use crate::response::{ IcebergStringListResponse, }; /// Catalog support for iceberg_rust_ffi -use crate::IcebergTable; +use crate::{unexpected, IcebergTable}; use anyhow::Result; use async_trait::async_trait; use iceberg::io::{ @@ -155,16 +155,14 @@ impl StorageCredentialsLoader for RestCredentialsLoader { .catalog .get() .and_then(|w| w.upgrade()) - .ok_or_else(|| { - Error::new(ErrorKind::Unexpected, "Catalog reference is not available") - })?; + .ok_or_else(|| unexpected("Catalog reference is not available"))?; let response = catalog.load_table_credentials(table_ident).await?; response .storage_credentials .into_iter() .filter(|c| location.starts_with(&c.prefix)) .max_by_key(|c| c.prefix.len()) - .ok_or_else(|| Error::new(ErrorKind::Unexpected, "No matching credential for location")) + .ok_or_else(|| unexpected("No matching credential for location")) } } diff --git a/iceberg_rust_ffi/src/full.rs b/iceberg_rust_ffi/src/full.rs index 1cb7285..aa6815d 100644 --- a/iceberg_rust_ffi/src/full.rs +++ b/iceberg_rust_ffi/src/full.rs @@ -1,50 +1,99 @@ use std::ffi::{c_char, c_void, CStr}; use std::ptr; +use iceberg::io::FileIO; use iceberg::scan::{TableScan, TableScanBuilder}; use object_store_ffi::{ export_runtime_op, with_cancellation, CResult, 
NotifyGuard, ResponseGuard, RT, }; -use tokio::sync::Mutex as AsyncMutex; - use crate::scan_common::*; -use crate::{IcebergArrowStream, IcebergArrowStreamResponse, IcebergTable}; +use crate::{ + IcebergArrowStream, IcebergArrowStreamResponse, IcebergFileScanStream, + IcebergFileScanStreamResponse, IcebergTable, +}; -/// Struct for regular (full) scan builder and scan +/// Holds state for a full table scan across its lifecycle: +/// 1. Construction: `builder` is set, everything else is None/0. +/// 2. Configuration: Julia calls with_* methods that transform `builder`. +/// 3. Build: `builder` is consumed → `scan` is populated. +/// 4. Stream: `scan` + `file_io` + `batch_size` are consumed by +/// `iceberg_arrow_stream` to create the file-parallel pipeline. #[repr(C)] pub struct IcebergScan { pub builder: Option>, pub scan: Option, - /// 0 = auto-detect (num_cpus) + /// Serialization thread count (0 = auto-detect). Currently unused by the + /// pipeline (kept for future use / backward compat with incremental scan). pub serialization_concurrency: usize, + /// Cloned from the Table at construction time. Passed to the pipeline so + /// each per-file ArrowReader can open its own parquet file. + pub file_io: FileIO, + /// Captured when Julia calls with_batch_size. Forwarded to each per-file + /// ArrowReaderBuilder inside the pipeline. + pub batch_size: Option, + /// How many parquet files to process concurrently in the pipeline. + /// Set by with_data_file_concurrency_limit (0 = auto-detect). + pub file_concurrency: usize, + /// How many FileScan items to queue in the outer channel of the nested pipeline. + /// 0 = use file_concurrency as the default. + pub file_prefetch_depth: usize, } unsafe impl Send for IcebergScan {} -/// Create a new scan builder +/// Create a new scan builder from an opened table. +/// Captures `file_io` from the table for later use by the pipeline. 
#[no_mangle] pub extern "C" fn iceberg_new_scan(table: *mut IcebergTable) -> *mut IcebergScan { if table.is_null() { return ptr::null_mut(); } let table_ref = unsafe { &*table }; + let file_io = table_ref.table.file_io().clone(); let scan_builder = table_ref.table.scan(); Box::into_raw(Box::new(IcebergScan { builder: Some(scan_builder), scan: None, serialization_concurrency: 0, + file_io, + batch_size: None, + file_concurrency: 0, + file_prefetch_depth: 0, })) } -// Use macros from scan_common for shared functionality +// ── Scan builder configuration (via macros from scan_common.rs) ───────── + impl_select_columns!(iceberg_select_columns, IcebergScan); -impl_scan_builder_method!( - iceberg_scan_with_data_file_concurrency_limit, - IcebergScan, - with_data_file_concurrency_limit, - n: usize -); +/// Set file concurrency. Hand-written (not macro-generated) because we need +/// to both (a) forward the value to the iceberg-rs scan builder and (b) +/// capture it in `file_concurrency` for our pipeline. 
+#[no_mangle] +pub extern "C" fn iceberg_scan_with_data_file_concurrency_limit( + scan: &mut *mut IcebergScan, + n: usize, +) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; + } + let scan_ref = unsafe { Box::from_raw(*scan) }; + if scan_ref.builder.is_none() { + return CResult::Error; + } + *scan = Box::into_raw(Box::new(IcebergScan { + builder: scan_ref + .builder + .map(|b| b.with_data_file_concurrency_limit(n)), + scan: scan_ref.scan, + serialization_concurrency: scan_ref.serialization_concurrency, + file_io: scan_ref.file_io, + batch_size: scan_ref.batch_size, + file_concurrency: n, + file_prefetch_depth: scan_ref.file_prefetch_depth, + })); + CResult::Ok +} impl_scan_builder_method!( iceberg_scan_with_manifest_file_concurrency_limit, @@ -73,6 +122,8 @@ impl_with_serialization_concurrency_limit!( IcebergScan ); +impl_with_file_prefetch_depth!(iceberg_scan_with_file_prefetch_depth, IcebergScan); + impl_scan_builder_method!( iceberg_scan_with_snapshot_id, IcebergScan, @@ -80,7 +131,36 @@ impl_scan_builder_method!( snapshot_id: i64 ); -// Async function to initialize stream from a table scan +// ── Stream creation ───────────────────────────────────────────────────── +// +// Instead of calling iceberg-rs's `scan.to_arrow()` (which uses +// `try_for_each_concurrent` internally and interleaves batches across +// files in arbitrary order), we: +// 1. Call `scan.plan_files()` to get an ordered list of FileScanTasks. +// 2. Feed them into our own file-parallel pipeline +// (ordered_file_pipeline.rs) which processes N files concurrently +// but yields batches in strict file-then-row order. + +/// Resolve the pipeline tuning parameters from a configured scan. +/// Returns `(concurrency, prefetch_depth, file_io, batch_size)`. +/// A stored value of 0 means "auto": concurrency defaults to available +/// parallelism; prefetch_depth defaults to concurrency. 
+fn resolve_pipeline_params(scan: &IcebergScan) -> (usize, usize, FileIO, Option) { + let concurrency = if scan.file_concurrency == 0 { + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + } else { + scan.file_concurrency + }; + let prefetch_depth = if scan.file_prefetch_depth == 0 { + concurrency + } else { + scan.file_prefetch_depth + }; + (concurrency, prefetch_depth, scan.file_io.clone(), scan.batch_size) +} + export_runtime_op!( iceberg_arrow_stream, IcebergArrowStreamResponse, @@ -88,41 +168,74 @@ export_runtime_op!( if scan.is_null() { return Err(anyhow::anyhow!("Null scan pointer provided")); } - let scan_ptr = unsafe { &*scan }; + let scan_ptr = unsafe { &mut *scan }; let scan_ref = &scan_ptr.scan; if scan_ref.is_none() { return Err(anyhow::anyhow!("Scan not initialized")); } - - // Determine concurrency (0 = auto-detect) - let serialization_concurrency = scan_ptr.serialization_concurrency; - let serialization_concurrency = if serialization_concurrency == 0 { - std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1) - } else { - serialization_concurrency - }; - - Ok((scan_ref.as_ref().unwrap(), serialization_concurrency)) + let (concurrency, prefetch_depth, file_io, batch_size) = + resolve_pipeline_params(scan_ptr); + Ok((scan_ref.as_ref().unwrap(), concurrency, prefetch_depth, file_io, batch_size)) }, result_tuple, async { - let (scan_ref, serialization_concurrency) = result_tuple; + let (scan_ref, concurrency, prefetch_depth, file_io, batch_size) = result_tuple; - let stream = scan_ref.to_arrow().await?; + // Collect the ordered file task list from iceberg-rs. + use futures::TryStreamExt; + let tasks: Vec = + scan_ref.plan_files().await?.try_collect().await?; - // Transform stream: RecordBatch -> ArrowBatch with parallel serialization - let serialized_stream = crate::transform_stream_with_parallel_serialization( - stream, - serialization_concurrency - ); + // Hand off to the file-parallel pipeline. 
+ let stream = crate::ordered_file_pipeline::create_pipeline( + tasks, file_io, batch_size, concurrency, prefetch_depth, + ) + .await?; - Ok::(IcebergArrowStream { - stream: AsyncMutex::new(serialized_stream), - }) + Ok::(stream) }, scan: *mut IcebergScan ); impl_scan_free!(iceberg_scan_free, IcebergScan); + +// ── Nested stream creation ─────────────────────────────────────────────── +// +// Returns an IcebergFileScanStream whose items are per-file (filename, +// record_count, inner-batch-stream) tuples, yielded in strict file order. +// The flat iceberg_arrow_stream is implemented as a flattening wrapper over +// this same nested pipeline. + +export_runtime_op!( + iceberg_file_scan_stream, + IcebergFileScanStreamResponse, + || { + if scan.is_null() { + return Err(anyhow::anyhow!("Null scan pointer provided")); + } + let scan_ptr = unsafe { &mut *scan }; + let scan_ref = &scan_ptr.scan; + if scan_ref.is_none() { + return Err(anyhow::anyhow!("Scan not initialized")); + } + let (concurrency, prefetch_depth, file_io, batch_size) = + resolve_pipeline_params(scan_ptr); + Ok((scan_ref.as_ref().unwrap(), concurrency, prefetch_depth, file_io, batch_size)) + }, + result_tuple, + async { + let (scan_ref, concurrency, prefetch_depth, file_io, batch_size) = result_tuple; + + use futures::TryStreamExt; + let tasks: Vec = + scan_ref.plan_files().await?.try_collect().await?; + + let stream = crate::ordered_file_pipeline::create_nested_pipeline( + tasks, file_io, batch_size, concurrency, prefetch_depth, + ) + .await?; + + Ok::(stream) + }, + scan: *mut IcebergScan +); diff --git a/iceberg_rust_ffi/src/incremental.rs b/iceberg_rust_ffi/src/incremental.rs index 9e1d1fe..fff8b2c 100644 --- a/iceberg_rust_ffi/src/incremental.rs +++ b/iceberg_rust_ffi/src/incremental.rs @@ -22,6 +22,11 @@ pub struct IcebergIncrementalScan { pub scan: Option, /// 0 = auto-detect (num_cpus) pub serialization_concurrency: usize, + // Present for macro compatibility with IcebergScan; unused for 
incremental. + pub file_io: Option, + pub batch_size: Option, + pub file_concurrency: usize, + pub file_prefetch_depth: usize, } unsafe impl Send for IcebergIncrementalScan {} @@ -102,6 +107,10 @@ pub extern "C" fn iceberg_new_incremental_scan( builder: Some(scan_builder), scan: None, serialization_concurrency: 0, + file_io: None, + batch_size: None, + file_concurrency: 0, + file_prefetch_depth: 0, })) } diff --git a/iceberg_rust_ffi/src/lib.rs b/iceberg_rust_ffi/src/lib.rs index dee2aad..6c5107a 100644 --- a/iceberg_rust_ffi/src/lib.rs +++ b/iceberg_rust_ffi/src/lib.rs @@ -1,6 +1,10 @@ use futures::StreamExt; use std::ffi::{c_char, c_void}; +pub(crate) fn unexpected(msg: impl std::fmt::Display) -> iceberg::Error { + iceberg::Error::new(iceberg::ErrorKind::Unexpected, msg.to_string()) +} + use anyhow::Result; use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; @@ -31,6 +35,12 @@ mod writer; // Column-based writer module (zero-copy from Julia) mod writer_columns; +// Profiling stats for the file-parallel pipeline +mod pipeline_stats; + +// Ordered file-parallel pipeline +mod ordered_file_pipeline; + // Response types module mod response; @@ -46,8 +56,9 @@ pub use response::{ IcebergStringListResponse, }; pub use table::{ - ArrowBatch, IcebergArrowStream, IcebergArrowStreamResponse, IcebergBatchResponse, IcebergTable, - IcebergTableResponse, + ArrowBatch, IcebergArrowStream, IcebergArrowStreamResponse, IcebergBatchResponse, + IcebergFileScan, IcebergFileScanResponse, IcebergFileScanStream, IcebergFileScanStreamResponse, + IcebergTable, IcebergTableResponse, }; pub use transaction::{IcebergDataFiles, IcebergTransaction, IcebergTransactionResponse}; pub use writer::{ @@ -158,23 +169,14 @@ pub(crate) fn transform_stream_with_parallel_serialization( .await { Ok(Ok(arrow_batch)) => Ok(arrow_batch), - Ok(Err(e)) => Err(iceberg::Error::new( - iceberg::ErrorKind::Unexpected, - e.to_string(), - )), - Err(e) => Err(iceberg::Error::new( - 
iceberg::ErrorKind::Unexpected, - format!("Serialization task panicked: {}", e), - )), + Ok(Err(e)) => Err(unexpected(e)), + Err(e) => Err(unexpected(format!("Serialization task panicked: {e}"))), } } - Err(e) => Err(iceberg::Error::new( - iceberg::ErrorKind::Unexpected, - format!("Stream error: {}", e), - )), + Err(e) => Err(unexpected(format!("Stream error: {e}"))), } }) - .buffer_unordered(concurrency) + .buffered(concurrency) .boxed() } diff --git a/iceberg_rust_ffi/src/ordered_file_pipeline.rs b/iceberg_rust_ffi/src/ordered_file_pipeline.rs new file mode 100644 index 0000000..724b13a --- /dev/null +++ b/iceberg_rust_ffi/src/ordered_file_pipeline.rs @@ -0,0 +1,383 @@ +//! File-parallel pipeline for reading Iceberg tables with strict ordering. +//! +//! # Problem +//! iceberg-rs's `to_arrow()` uses `try_for_each_concurrent` which interleaves +//! batches from different files in arbitrary order. We need strict +//! file-then-row ordering. +//! +//! # Approach +//! We call `plan_files()` to get an ordered list of FileScanTasks, then +//! process N files concurrently while yielding batches in strict file order. +//! +//! Each file task is a background tokio task that: +//! 1. Builds a per-file ArrowReader (same iceberg-rs code path as to_arrow) +//! 2. Reads row groups sequentially → RecordBatch stream +//! 3. Serializes each batch to Arrow IPC (via spawn_blocking) +//! 4. Sends serialized batches into a per-file mpsc channel +//! +//! A consumer (`run_nested`) uses FuturesUnordered to maintain N tasks in +//! flight and yield per-file FileScan values as they become ready. Each +//! FileScan wraps the file's batch channel as an IcebergArrowStream with +//! integrated semaphore-permit release. +//! +//! # Memory bounding +//! Each file task has its own Semaphore(MAX_BUFFERED_BYTES_PER_TASK). After +//! serializing a batch, the task acquires byte_len permits. If the budget is +//! exhausted, the task yields (async, not blocking) until the consumer drains +//! 
batches and releases permits. This caps each file's buffered output to +//! ~100MB independently. + +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Instant; + +use futures::StreamExt; +use iceberg::arrow::ArrowReaderBuilder; +use iceberg::io::FileIO; +use iceberg::scan::FileScanTask; +use tokio::sync::{mpsc, Mutex as AsyncMutex, Semaphore}; + +use crate::pipeline_stats::{MAX_BUFFERED_BYTES_PER_TASK, STATS}; +use crate::table::{ArrowBatch, IcebergArrowStream, IcebergFileScanStream}; +use crate::unexpected; + +/// Hard cap on file-level concurrency to keep total memory bounded. +const MAX_FILE_CONCURRENCY: usize = 16; + +/// Time an async expression and record its duration into a STATS field. +/// +/// ```ignore +/// let result = timed!(serialize_ns, { tokio::task::spawn_blocking(...) }) +/// .map_err(...)?; +/// ``` +macro_rules! timed { + ($field:ident, $fut:expr) => {{ + let _start = Instant::now(); + let _result = $fut.await; + STATS.add_elapsed(&STATS.$field, _start); + _result + }}; +} + +// =========================================================================== +// Pipeline implementation +// =========================================================================== + +/// A serialized Arrow IPC batch bundled with its byte size and a reference +/// to the originating file task's semaphore. The consumer releases permits +/// after forwarding the batch to Julia, unblocking the producer. +struct BufferedBatch { + batch: ArrowBatch, + byte_len: usize, + semaphore: Arc, +} + +/// Internal per-file scan result: filename, record count, and a prefetched +/// inner batch stream. Used as the item type of the nested pipeline channel. +pub struct FileScan { + pub filename: String, + pub record_count: i64, + pub stream: IcebergArrowStream, +} + +/// Convert a per-file mpsc receiver into an IcebergArrowStream. +/// Semaphore permits are released and STATS updated as each batch is yielded. 
+fn make_file_stream( + file_rx: mpsc::Receiver>, +) -> IcebergArrowStream { + let stream = futures::stream::unfold(file_rx, |mut rx| async move { + rx.recv().await.map(|item| { + let result = item.map(|buf| { + buf.semaphore.add_permits(buf.byte_len); + STATS.track_buffer_release(buf.byte_len as u64); + buf.batch + }); + (result, rx) + }) + }) + .boxed(); + IcebergArrowStream { + stream: AsyncMutex::new(stream), + } +} + +/// Create the nested file-parallel pipeline and return it as an +/// IcebergFileScanStream. Each item in the outer stream is a FileScan +/// carrying the filename, record count, and a prefetched inner batch stream. +pub async fn create_nested_pipeline( + tasks: Vec, + file_io: FileIO, + batch_size: Option, + concurrency: usize, + prefetch_depth: usize, +) -> anyhow::Result { + if concurrency > MAX_FILE_CONCURRENCY { + anyhow::bail!( + "file concurrency {concurrency} exceeds hard cap {MAX_FILE_CONCURRENCY}" + ); + } + + STATS.reset(); + + let (tx, rx) = mpsc::channel::>(prefetch_depth); + + tokio::spawn(run_nested(tasks, file_io, batch_size, concurrency, tx)); + + let stream = futures::stream::unfold(rx, |mut rx| async move { + rx.recv().await.map(|item| (item, rx)) + }) + .boxed(); + + Ok(IcebergFileScanStream { + stream: AsyncMutex::new(stream), + }) +} + +/// Create the flat file-parallel pipeline and return it as an IcebergArrowStream. +/// +/// Implemented by creating the nested pipeline and flattening it, so all +/// ordering, backpressure, and stats logic lives in one place. +pub async fn create_pipeline( + tasks: Vec, + file_io: FileIO, + batch_size: Option, + concurrency: usize, + prefetch_depth: usize, +) -> anyhow::Result { + // Outer channel: flatten task → Julia (via IcebergArrowStream). 
+ let (tx, rx) = mpsc::channel(concurrency * 2); + + let nested = create_nested_pipeline(tasks, file_io, batch_size, concurrency, prefetch_depth).await?; + + tokio::spawn(run_flat(nested, tx)); + + let stream = futures::stream::unfold(rx, |mut rx| async move { + rx.recv().await.map(|item| (item, rx)) + }) + .boxed(); + + Ok(IcebergArrowStream { + stream: AsyncMutex::new(stream), + }) +} + +/// Flatten the nested pipeline into a single batch channel (backwards-compat). +async fn run_flat( + nested: IcebergFileScanStream, + tx: mpsc::Sender>, +) { + let pipeline_start = Instant::now(); + + let mut outer = nested.stream.lock().await; + while let Some(item) = outer.next().await { + match item { + Ok(file_scan) => { + let mut inner = file_scan.stream.stream.lock().await; + while let Some(batch_result) = inner.next().await { + if tx.send(batch_result).await.is_err() { + return; + } + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + return; + } + } + } + + STATS.store_elapsed(&STATS.pipeline_wall_ns, pipeline_start); +} + +/// Main nested consumer loop. Keeps `concurrency` file tasks in flight concurrently using +/// FuturesUnordered and wraps each completed task's receiver as an +/// IcebergArrowStream before sending a FileScan to the outer channel. +async fn run_nested( + tasks: Vec, + file_io: FileIO, + batch_size: Option, + concurrency: usize, + tx: mpsc::Sender>, +) { + use futures::stream::FuturesUnordered; + + // For the nested path this records "time until last FileScan was handed to + // the consumer". For the flat path run_flat overwrites it with the full + // end-to-end time once all batches are drained, so the flat metric is + // unaffected. + let pipeline_start = Instant::now(); + + let mut in_flight = FuturesUnordered::new(); + let mut task_iter = tasks.into_iter(); + + // Seed the first N file tasks. 
+ for _ in 0..concurrency { + if let Some(task) = task_iter.next() { + in_flight.push(spawn_file_task_with_meta(task, file_io.clone(), batch_size)); + } + } + + while let Some(file_result) = in_flight.next().await { + // Eagerly start the next file to keep N tasks in flight. + if let Some(task) = task_iter.next() { + in_flight.push(spawn_file_task_with_meta(task, file_io.clone(), batch_size)); + } + + match file_result { + Ok((filename, record_count, file_rx)) => { + let stream = make_file_stream(file_rx); + let file_scan = FileScan { + filename, + record_count, + stream, + }; + if timed!(file_dispatch_wait_ns, tx.send(Ok(file_scan))).is_err() { + return; // outer consumer dropped the stream + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + return; + } + } + } + + STATS.store_elapsed(&STATS.pipeline_wall_ns, pipeline_start); +} + +/// Spawn a single file task. Returns a future that resolves immediately to +/// the file's metadata and the receiving end of its batch channel. +/// +/// The actual work (parquet I/O, decode, serialize) happens in the background +/// tokio task. The future resolves immediately to (filename, record_count, +/// Receiver) so that FuturesUnordered can poll it alongside other tasks. +fn spawn_file_task_with_meta( + task: FileScanTask, + file_io: FileIO, + batch_size: Option, +) -> impl std::future::Future< + Output = Result< + (String, i64, mpsc::Receiver>), + iceberg::Error, + >, +> { + let filename = task.data_file_path().to_string(); + let record_count = task.record_count.unwrap_or(0) as i64; + + let sem = Arc::new(Semaphore::new(MAX_BUFFERED_BYTES_PER_TASK)); + let (file_tx, file_rx) = mpsc::channel(8); + + tokio::spawn(process_file(task, file_io, batch_size, sem, file_tx)); + + async move { Ok((filename, record_count, file_rx)) } +} + +/// Wrapper around process_file_inner that ensures errors are sent to the +/// channel and stats are updated even on failure. 
When this function +/// returns, `tx` is dropped, causing the consumer's `file_rx.recv()` to +/// return None — signaling that this file is done. +async fn process_file( + task: FileScanTask, + file_io: FileIO, + batch_size: Option, + semaphore: Arc, + tx: mpsc::Sender>, +) { + STATS.track_task_start(); + let result = process_file_inner(task, file_io, batch_size, semaphore, &tx).await; + if let Err(e) = result { + let _ = tx.send(Err(e)).await; + } + STATS.track_task_end(); +} + +/// Process a single parquet file through four timed phases: +/// 1. Reader setup — build ArrowReader, open file, resolve schema +/// 2. Fetch + decode — I/O, ZSTD decompression, column assembly +/// 3. Serialize — RecordBatch → Arrow IPC for transfer to Julia +/// 4. Backpressure — wait on semaphore if buffered too far ahead +/// +/// Each phase's cumulative time is recorded in STATS for the summary. +async fn process_file_inner( + task: FileScanTask, + file_io: FileIO, + batch_size: Option, + semaphore: Arc, + tx: &mpsc::Sender>, +) -> Result<(), iceberg::Error> { + // ── Phase 1: Reader setup ─────────────────────────────────────────── + // Builds a per-file ArrowReader using the same iceberg-rs code path as + // to_arrow(): opens parquet metadata, resolves schema, loads delete + // files, builds row filters. with_data_file_concurrency_limit(1) means + // this reader processes one file (the one we give it). 
+ let setup_start = Instant::now(); + let mut builder = ArrowReaderBuilder::new(file_io).with_data_file_concurrency_limit(1); + if let Some(bs) = batch_size { + builder = builder.with_batch_size(bs); + } + let reader = builder.build(); + let task_stream = Box::pin(futures::stream::once(async { Ok(task) })); + let batch_stream = reader + .read(task_stream) + .map_err(|e| unexpected(e))?; + STATS.add_elapsed(&STATS.reader_setup_ns, setup_start); + + tokio::pin!(batch_stream); + + loop { + // ── Phase 2: Fetch + decode ───────────────────────────────────── + // Each .next() call fetches compressed parquet pages from storage, + // decompresses (ZSTD), decodes column encodings, and assembles a + // RecordBatch. These are inseparable without forking parquet-rs. + let batch_opt = timed!(fetch_decode_ns, batch_stream.next()); + + let batch = match batch_opt { + Some(Ok(b)) => b, + Some(Err(e)) => return Err(e), + None => break, // end of file + }; + + // ── Phase 3: Serialize to Arrow IPC ───────────────────────────── + // CPU-bound work, offloaded to the blocking thread pool to avoid + // starving the tokio executor. + let serialized = timed!( + serialize_ns, + tokio::task::spawn_blocking(move || crate::serialize_record_batch(batch)) + ) + .map_err(|e| unexpected(format!("serialize panicked: {e}")))? + .map_err(|e| unexpected(e))?; + + let byte_len = serialized.length; + STATS.batches_produced.fetch_add(1, Ordering::Relaxed); + STATS + .bytes_produced + .fetch_add(byte_len as u64, Ordering::Relaxed); + + // ── Phase 4: Backpressure ─────────────────────────────────────── + // Acquire permits equal to the serialized size. If this file task + // has produced > MAX_BUFFERED_BYTES_PER_TASK ahead of the consumer, + // this yields (async, not thread-blocking) until permits are freed. 
+ let _permit = timed!(semaphore_wait_ns, semaphore.acquire_many(byte_len as u32)) + .map_err(|e| unexpected(format!("semaphore: {e}")))?; + // Detach the permit — the consumer releases it via add_permits(). + std::mem::forget(_permit); + + STATS.track_buffer_add(byte_len as u64); + + // Send the batch to this file's channel. + if tx + .send(Ok(BufferedBatch { + batch: serialized, + byte_len, + semaphore: semaphore.clone(), + })) + .await + .is_err() + { + return Ok(()); // consumer dropped the receiver + } + } + + STATS.files_completed.fetch_add(1, Ordering::Relaxed); + Ok(()) +} diff --git a/iceberg_rust_ffi/src/pipeline_stats.rs b/iceberg_rust_ffi/src/pipeline_stats.rs new file mode 100644 index 0000000..252c007 --- /dev/null +++ b/iceberg_rust_ffi/src/pipeline_stats.rs @@ -0,0 +1,424 @@ +// =========================================================================== +// Pipeline statistics — timing and throughput counters for the file-parallel +// scan pipeline. Global atomics accumulate data across all file tasks and are +// readable from Julia via `@ccall iceberg_print_pipeline_stats()`. +// =========================================================================== + +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::time::Instant; + +pub(crate) const MAX_BUFFERED_BYTES_PER_TASK: usize = 100 * 1024 * 1024; + +/// Accumulates profiling data across all file tasks in a pipeline run. +/// All fields are atomics so concurrent file tasks can update without locks. 
+pub(crate) struct PipelineStats { + // ── Overall ── + pub pipeline_wall_ns: AtomicU64, + pub files_completed: AtomicUsize, + pub batches_produced: AtomicUsize, + pub bytes_produced: AtomicU64, + + // ── Concurrency ── + pub peak_concurrency: AtomicUsize, + pub active_tasks: AtomicUsize, + + // ── Per-phase cumulative time (ns), summed across all file tasks ── + /// Time to build ArrowReader and call reader.read() (opens parquet, + /// loads metadata, resolves schema, loads delete files, builds filters). + pub reader_setup_ns: AtomicU64, + /// Time inside batch_stream.next() — fetches compressed pages from + /// storage, decompresses (ZSTD), decodes pages, assembles columns. + pub fetch_decode_ns: AtomicU64, + /// Time in spawn_blocking(serialize_record_batch) — writes RecordBatch + /// to Arrow IPC wire format for transfer to Julia. + pub serialize_ns: AtomicU64, + /// Time blocked on per-file semaphore (backpressure from consumer). + pub semaphore_wait_ns: AtomicU64, + + // ── Dispatch ── + /// Time `run_nested` blocks on the outer channel waiting for the consumer + /// to call `next_file_scan` (backpressure from a slow consumer). + pub file_dispatch_wait_ns: AtomicU64, + + // ── Memory ── + /// Live counter of serialized bytes buffered across all file tasks. + pub buffered_bytes: AtomicU64, + /// High-water mark of buffered_bytes. 
+ pub peak_buffered_bytes: AtomicU64, +} + +impl PipelineStats { + pub(crate) const fn new() -> Self { + Self { + pipeline_wall_ns: AtomicU64::new(0), + files_completed: AtomicUsize::new(0), + batches_produced: AtomicUsize::new(0), + bytes_produced: AtomicU64::new(0), + peak_concurrency: AtomicUsize::new(0), + active_tasks: AtomicUsize::new(0), + reader_setup_ns: AtomicU64::new(0), + fetch_decode_ns: AtomicU64::new(0), + serialize_ns: AtomicU64::new(0), + semaphore_wait_ns: AtomicU64::new(0), + file_dispatch_wait_ns: AtomicU64::new(0), + buffered_bytes: AtomicU64::new(0), + peak_buffered_bytes: AtomicU64::new(0), + } + } + + pub(crate) fn reset(&self) { + self.pipeline_wall_ns.store(0, Ordering::Relaxed); + self.files_completed.store(0, Ordering::Relaxed); + self.batches_produced.store(0, Ordering::Relaxed); + self.bytes_produced.store(0, Ordering::Relaxed); + self.peak_concurrency.store(0, Ordering::Relaxed); + self.active_tasks.store(0, Ordering::Relaxed); + self.reader_setup_ns.store(0, Ordering::Relaxed); + self.fetch_decode_ns.store(0, Ordering::Relaxed); + self.serialize_ns.store(0, Ordering::Relaxed); + self.semaphore_wait_ns.store(0, Ordering::Relaxed); + self.file_dispatch_wait_ns.store(0, Ordering::Relaxed); + self.buffered_bytes.store(0, Ordering::Relaxed); + self.peak_buffered_bytes.store(0, Ordering::Relaxed); + } + + pub(crate) fn track_task_start(&self) { + let prev = self.active_tasks.fetch_add(1, Ordering::Relaxed); + self.peak_concurrency.fetch_max(prev + 1, Ordering::Relaxed); + } + + pub(crate) fn track_task_end(&self) { + self.active_tasks.fetch_sub(1, Ordering::Relaxed); + } + + pub(crate) fn add_elapsed(&self, field: &AtomicU64, start: Instant) { + field.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed); + } + + pub(crate) fn store_elapsed(&self, field: &AtomicU64, start: Instant) { + field.store(start.elapsed().as_nanos() as u64, Ordering::Relaxed); + } + + pub(crate) fn track_buffer_add(&self, bytes: u64) { + let prev = 
self.buffered_bytes.fetch_add(bytes, Ordering::Relaxed); + self.peak_buffered_bytes + .fetch_max(prev + bytes, Ordering::Relaxed); + } + + /// Release buffered bytes. Uses saturating_sub to guard against underflow + /// caused by lingering tasks from a previous scan releasing after a reset(). + pub(crate) fn track_buffer_release(&self, bytes: u64) { + self.buffered_bytes + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { + Some(v.saturating_sub(bytes)) + }) + .ok(); + } + + pub(crate) fn print_summary(&self) { + let wall_ms = self.pipeline_wall_ns.load(Ordering::Relaxed) as f64 / 1e6; + let files = self.files_completed.load(Ordering::Relaxed); + let batches = self.batches_produced.load(Ordering::Relaxed); + let bytes = self.bytes_produced.load(Ordering::Relaxed); + let peak = self.peak_concurrency.load(Ordering::Relaxed); + let setup_ms = self.reader_setup_ns.load(Ordering::Relaxed) as f64 / 1e6; + let fd_ms = self.fetch_decode_ns.load(Ordering::Relaxed) as f64 / 1e6; + let ser_ms = self.serialize_ns.load(Ordering::Relaxed) as f64 / 1e6; + let sem_ms = self.semaphore_wait_ns.load(Ordering::Relaxed) as f64 / 1e6; + let dispatch_ms = self.file_dispatch_wait_ns.load(Ordering::Relaxed) as f64 / 1e6; + let peak_buf = + self.peak_buffered_bytes.load(Ordering::Relaxed) as f64 / (1024.0 * 1024.0); + + let bytes_mb = bytes as f64 / (1024.0 * 1024.0); + let throughput = if wall_ms > 0.0 { + bytes_mb / (wall_ms / 1000.0) + } else { + 0.0 + }; + let file_wall_ms = setup_ms + fd_ms + ser_ms + sem_ms; + let parallelism = if wall_ms > 0.0 { + file_wall_ms / wall_ms + } else { + 0.0 + }; + let limit_mb = MAX_BUFFERED_BYTES_PER_TASK / (1024 * 1024); + + // Box layout: "│ " + content + padding + " │", total = BOX chars. + // All content uses ASCII only so byte len = display width. 
+        const BOX: usize = 68; // total width including borders
+
+        let row = |content: &str| {
+            let pad = (BOX - 4).saturating_sub(content.len());
+            println!("│ {}{:pad$} │", content, "", pad = pad);
+        };
+        let dashes = |n: usize| -> String { "─".repeat(n) };
+        let sep = |label: &str| {
+            let fill = (BOX - 8).saturating_sub(label.len());
+            println!("│ {} {} {} │", dashes(2), label, dashes(fill));
+        };
+        let border = |left: char, right: char| {
+            println!("{}{}{}", left, dashes(BOX - 2), right);
+        };
+
+        border('┌', '┐');
+        row("Pipeline Stats");
+        row("");
+        row(&format!("wall time: {:>9.1} ms", wall_ms));
+        row(&format!(
+            "files: {:>9} peak concurrency: {}",
+            files, peak
+        ));
+        row(&format!(
+            "batches: {:>9} serialized: {:.1} MB",
+            batches, bytes_mb
+        ));
+        row(&format!(
+            "throughput: {:>9.1} MB/s parallelism: {:.1}x",
+            throughput, parallelism
+        ));
+        sep("time across all file tasks (sum)");
+        row(&format!(
+            "reader setup: {:>9.1} ms (open, metadata, deletes)",
+            setup_ms
+        ));
+        row(&format!(
+            "fetch+decode: {:>9.1} ms (I/O + ZSTD + decode)",
+            fd_ms
+        ));
+        row(&format!(
+            "serialize IPC: {:>9.1} ms (RecordBatch -> Arrow IPC)",
+            ser_ms
+        ));
+        row(&format!(
+            "batch push wait: {:>9.1} ms (backpressure)",
+            sem_ms
+        ));
+        sep("dispatch");
+        row(&format!(
+            "file push wait: {:>9.1} ms (backpressure)",
+            dispatch_ms
+        ));
+        sep("memory");
+        row(&format!(
+            "peak buffered: {:>9.1} MB (limit: {} MB/task)",
+            peak_buf, limit_mb
+        ));
+        border('└', '┘');
+    }
+}
+
+pub(crate) static STATS: PipelineStats = PipelineStats::new();
+
+// ── FFI exports (called from Julia benchmark teardown) ────────────────────
+
+#[no_mangle]
+pub extern "C" fn iceberg_print_pipeline_stats() {
+    STATS.print_summary();
+}
+
+#[no_mangle]
+pub extern "C" fn iceberg_reset_pipeline_stats() {
+    STATS.reset();
+}
+
+// =============================================================================
+// Tests
+// =============================================================================
+
+#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + fn fresh() -> PipelineStats { + PipelineStats::new() + } + + // ── reset ───────────────────────────────────────────────────────────────── + + #[test] + fn reset_clears_every_field() { + let s = fresh(); + s.pipeline_wall_ns.store(1, Ordering::Relaxed); + s.files_completed.store(2, Ordering::Relaxed); + s.batches_produced.store(3, Ordering::Relaxed); + s.bytes_produced.store(4, Ordering::Relaxed); + s.peak_concurrency.store(5, Ordering::Relaxed); + s.active_tasks.store(6, Ordering::Relaxed); + s.reader_setup_ns.store(7, Ordering::Relaxed); + s.fetch_decode_ns.store(8, Ordering::Relaxed); + s.serialize_ns.store(9, Ordering::Relaxed); + s.semaphore_wait_ns.store(10, Ordering::Relaxed); + s.file_dispatch_wait_ns.store(11, Ordering::Relaxed); + s.buffered_bytes.store(12, Ordering::Relaxed); + s.peak_buffered_bytes.store(13, Ordering::Relaxed); + + s.reset(); + + assert_eq!(s.pipeline_wall_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.files_completed.load(Ordering::Relaxed), 0); + assert_eq!(s.batches_produced.load(Ordering::Relaxed), 0); + assert_eq!(s.bytes_produced.load(Ordering::Relaxed), 0); + assert_eq!(s.peak_concurrency.load(Ordering::Relaxed), 0); + assert_eq!(s.active_tasks.load(Ordering::Relaxed), 0); + assert_eq!(s.reader_setup_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.fetch_decode_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.serialize_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.semaphore_wait_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.file_dispatch_wait_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.buffered_bytes.load(Ordering::Relaxed), 0); + assert_eq!(s.peak_buffered_bytes.load(Ordering::Relaxed), 0); + } + + // ── track_task_start / track_task_end ───────────────────────────────────── + + #[test] + fn task_start_increments_active_and_updates_peak() { + let s = fresh(); + s.track_task_start(); + assert_eq!(s.active_tasks.load(Ordering::Relaxed), 1); + 
assert_eq!(s.peak_concurrency.load(Ordering::Relaxed), 1); + + s.track_task_start(); + assert_eq!(s.active_tasks.load(Ordering::Relaxed), 2); + assert_eq!(s.peak_concurrency.load(Ordering::Relaxed), 2); + } + + #[test] + fn task_end_decrements_active_without_touching_peak() { + let s = fresh(); + s.track_task_start(); + s.track_task_start(); + s.track_task_end(); + assert_eq!(s.active_tasks.load(Ordering::Relaxed), 1); + assert_eq!(s.peak_concurrency.load(Ordering::Relaxed), 2); // high-water mark unchanged + } + + #[test] + fn peak_concurrency_is_high_water_mark() { + let s = fresh(); + s.track_task_start(); // active 1, peak 1 + s.track_task_start(); // active 2, peak 2 + s.track_task_start(); // active 3, peak 3 + s.track_task_end(); // active 2, peak 3 + s.track_task_end(); // active 1, peak 3 + s.track_task_end(); // active 0, peak 3 + s.track_task_start(); // active 1 — below previous peak, peak stays 3 + assert_eq!(s.active_tasks.load(Ordering::Relaxed), 1); + assert_eq!(s.peak_concurrency.load(Ordering::Relaxed), 3); + } + + // ── track_buffer_add / track_buffer_release ─────────────────────────────── + + #[test] + fn buffer_add_increments_bytes_and_peak() { + let s = fresh(); + s.track_buffer_add(100); + assert_eq!(s.buffered_bytes.load(Ordering::Relaxed), 100); + assert_eq!(s.peak_buffered_bytes.load(Ordering::Relaxed), 100); + + s.track_buffer_add(50); + assert_eq!(s.buffered_bytes.load(Ordering::Relaxed), 150); + assert_eq!(s.peak_buffered_bytes.load(Ordering::Relaxed), 150); + } + + #[test] + fn buffer_release_decrements_bytes_without_touching_peak() { + let s = fresh(); + s.track_buffer_add(200); + s.track_buffer_release(80); + assert_eq!(s.buffered_bytes.load(Ordering::Relaxed), 120); + assert_eq!(s.peak_buffered_bytes.load(Ordering::Relaxed), 200); // peak unchanged + } + + #[test] + fn buffer_peak_does_not_decrease_after_release_and_smaller_add() { + let s = fresh(); + s.track_buffer_add(500); // peak → 500 + s.track_buffer_release(500); // current 
→ 0 + s.track_buffer_add(10); // current → 10, peak stays 500 + assert_eq!(s.buffered_bytes.load(Ordering::Relaxed), 10); + assert_eq!(s.peak_buffered_bytes.load(Ordering::Relaxed), 500); + } + + #[test] + fn buffer_release_saturates_at_zero_instead_of_wrapping() { + let s = fresh(); + // Simulate a release arriving after a stats reset (lingering task). + s.track_buffer_release(100); // would have wrapped to u64::MAX without saturating_sub + assert_eq!(s.buffered_bytes.load(Ordering::Relaxed), 0); + assert_eq!(s.peak_buffered_bytes.load(Ordering::Relaxed), 0); + } + + // ── add_elapsed ─────────────────────────────────────────────────────────── + + #[test] + fn add_elapsed_accumulates_into_field() { + let s = fresh(); + let start = Instant::now(); + std::thread::sleep(Duration::from_millis(2)); + s.add_elapsed(&s.reader_setup_ns, start); + let after_first = s.reader_setup_ns.load(Ordering::Relaxed); + assert!(after_first > 0, "first add_elapsed should record > 0 ns"); + + let start2 = Instant::now(); + std::thread::sleep(Duration::from_millis(2)); + s.add_elapsed(&s.reader_setup_ns, start2); + let after_second = s.reader_setup_ns.load(Ordering::Relaxed); + assert!(after_second > after_first, "second add_elapsed should accumulate"); + } + + #[test] + fn add_elapsed_is_independent_per_field() { + let s = fresh(); + let start = Instant::now(); + std::thread::sleep(Duration::from_millis(2)); + s.add_elapsed(&s.fetch_decode_ns, start); + + // Unrelated fields stay at 0. 
+ assert_eq!(s.serialize_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.semaphore_wait_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.reader_setup_ns.load(Ordering::Relaxed), 0); + assert!(s.fetch_decode_ns.load(Ordering::Relaxed) > 0); + } + + // ── store_elapsed ───────────────────────────────────────────────────────── + + #[test] + fn store_elapsed_overwrites_previous_value() { + let s = fresh(); + s.pipeline_wall_ns.store(999_999_999, Ordering::Relaxed); // some large value + + let start = Instant::now(); + std::thread::sleep(Duration::from_millis(2)); + s.store_elapsed(&s.pipeline_wall_ns, start); + let stored = s.pipeline_wall_ns.load(Ordering::Relaxed); + + // Stored value is the elapsed time, which is much less than 999_999_999 ns (1 s). + assert!(stored > 0); + assert!(stored < 999_999_999, "store_elapsed should overwrite, not accumulate"); + } + + #[test] + fn file_dispatch_wait_accumulates_via_add_elapsed() { + let s = fresh(); + let start = Instant::now(); + std::thread::sleep(Duration::from_millis(2)); + s.add_elapsed(&s.file_dispatch_wait_ns, start); + assert!(s.file_dispatch_wait_ns.load(Ordering::Relaxed) > 0); + // Other fields are unaffected. + assert_eq!(s.semaphore_wait_ns.load(Ordering::Relaxed), 0); + } + + #[test] + fn store_elapsed_does_not_affect_other_fields() { + let s = fresh(); + let start = Instant::now(); + std::thread::sleep(Duration::from_millis(2)); + s.store_elapsed(&s.pipeline_wall_ns, start); + + assert_eq!(s.reader_setup_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.fetch_decode_ns.load(Ordering::Relaxed), 0); + assert_eq!(s.serialize_ns.load(Ordering::Relaxed), 0); + } +} diff --git a/iceberg_rust_ffi/src/scan_common.rs b/iceberg_rust_ffi/src/scan_common.rs index daac382..a85645b 100644 --- a/iceberg_rust_ffi/src/scan_common.rs +++ b/iceberg_rust_ffi/src/scan_common.rs @@ -38,6 +38,10 @@ macro_rules! 
impl_select_columns { builder: scan_ref.builder.map(|b| b.select(columns)), scan: scan_ref.scan, serialization_concurrency: scan_ref.serialization_concurrency, + file_io: scan_ref.file_io, + batch_size: scan_ref.batch_size, + file_concurrency: scan_ref.file_concurrency, + file_prefetch_depth: scan_ref.file_prefetch_depth, })); CResult::Ok @@ -84,6 +88,10 @@ macro_rules! impl_scan_builder_method { builder: scan_ref.builder.map(|b| b.$builder_method($($param),*)), scan: scan_ref.scan, serialization_concurrency: scan_ref.serialization_concurrency, + file_io: scan_ref.file_io, + batch_size: scan_ref.batch_size, + file_concurrency: scan_ref.file_concurrency, + file_prefetch_depth: scan_ref.file_prefetch_depth, })); CResult::Ok @@ -111,6 +119,10 @@ macro_rules! impl_with_batch_size { builder: scan_ref.builder.map(|b| b.with_batch_size(Some(n))), scan: None, serialization_concurrency: scan_ref.serialization_concurrency, + file_io: scan_ref.file_io, + batch_size: Some(n), + file_concurrency: scan_ref.file_concurrency, + file_prefetch_depth: scan_ref.file_prefetch_depth, })); CResult::Ok @@ -138,6 +150,10 @@ macro_rules! impl_scan_build { builder: None, scan: Some(built_scan), serialization_concurrency: scan_ref.serialization_concurrency, + file_io: scan_ref.file_io, + batch_size: scan_ref.batch_size, + file_concurrency: scan_ref.file_concurrency, + file_prefetch_depth: scan_ref.file_prefetch_depth, })); CResult::Ok } @@ -178,10 +194,27 @@ macro_rules! impl_with_serialization_concurrency_limit { }; } +/// Macro to generate with_file_prefetch_depth function for any scan type +macro_rules! 
impl_with_file_prefetch_depth { + ($fn_name:ident, $scan_type:ident) => { + #[no_mangle] + pub extern "C" fn $fn_name(scan: &mut *mut $scan_type, n: usize) -> CResult { + if scan.is_null() || (*scan).is_null() { + return CResult::Error; + } + let mut scan_ref = unsafe { Box::from_raw(*scan) }; + scan_ref.file_prefetch_depth = n; + *scan = Box::into_raw(scan_ref); + CResult::Ok + } + }; +} + // Re-export macros for use in other modules pub(crate) use impl_scan_build; pub(crate) use impl_scan_builder_method; pub(crate) use impl_scan_free; pub(crate) use impl_select_columns; pub(crate) use impl_with_batch_size; +pub(crate) use impl_with_file_prefetch_depth; pub(crate) use impl_with_serialization_concurrency_limit; diff --git a/iceberg_rust_ffi/src/table.rs b/iceberg_rust_ffi/src/table.rs index 2e089a8..e6175f0 100644 --- a/iceberg_rust_ffi/src/table.rs +++ b/iceberg_rust_ffi/src/table.rs @@ -1,6 +1,7 @@ use crate::response::IcebergBoxedResponse; /// Table and streaming support for iceberg_rust_ffi use crate::{CResult, Context, RawResponse}; +use crate::ordered_file_pipeline::FileScan; use iceberg::io::{FileIOBuilder, OpenDalRoutingStorageFactory}; use iceberg::table::StaticTable; use iceberg::table::Table; @@ -77,6 +78,67 @@ impl RawResponse for IcebergBatchResponse { } } +/// Outer stream of per-file scans from the nested pipeline. +pub struct IcebergFileScanStream { + pub stream: AsyncMutex< + futures::stream::BoxStream<'static, Result>, + >, +} + +unsafe impl Send for IcebergFileScanStream {} + +/// C-compatible per-file scan item returned to Julia. +/// Owns `filename` (must be freed via iceberg_file_scan_free) and `stream`. +#[repr(C)] +pub struct IcebergFileScan { + /// Null-terminated file path. Owned; freed by iceberg_file_scan_free. + pub filename: *mut c_char, + pub record_count: i64, + /// Inner batch stream. Owned; freed by iceberg_file_scan_free. + /// Callers must NOT call iceberg_arrow_stream_free on this pointer. 
+ pub stream: *mut IcebergArrowStream, +} + +unsafe impl Send for IcebergFileScan {} + +pub type IcebergFileScanStreamResponse = IcebergBoxedResponse; + +/// Response for iceberg_next_file_scan (mirrors IcebergBatchResponse). +#[repr(transparent)] +pub struct IcebergFileScanResponse(pub IcebergBoxedResponse); + +unsafe impl Send for IcebergFileScanResponse {} + +impl RawResponse for IcebergFileScanResponse { + type Payload = Option; + + fn result_mut(&mut self) -> &mut CResult { + &mut self.0.result + } + fn context_mut(&mut self) -> &mut *const Context { + &mut self.0.context + } + fn error_message_mut(&mut self) -> &mut *mut c_char { + &mut self.0.error_message + } + fn set_payload(&mut self, payload: Option) { + match payload.flatten() { + Some(fs) => { + let filename = std::ffi::CString::new(fs.filename) + .unwrap_or_default() + .into_raw(); + let stream = Box::into_raw(Box::new(fs.stream)); + self.0.value = Box::into_raw(Box::new(IcebergFileScan { + filename, + record_count: fs.record_count, + stream, + })); + } + None => self.0.value = ptr::null_mut(), + } + } +} + /// Synchronous operations for table and batch management /// Free a table @@ -114,6 +176,77 @@ pub extern "C" fn iceberg_arrow_stream_free(stream: *mut IcebergArrowStream) { } } +/// Free a file scan (its owned filename and inner stream). +#[no_mangle] +pub extern "C" fn iceberg_file_scan_free(scan: *mut IcebergFileScan) { + if scan.is_null() { + return; + } + unsafe { + let scan = Box::from_raw(scan); + if !scan.filename.is_null() { + let _ = std::ffi::CString::from_raw(scan.filename); + } + if !scan.stream.is_null() { + let _ = Box::from_raw(scan.stream); + } + } +} + +/// Free a file scan stream. +#[no_mangle] +pub extern "C" fn iceberg_file_scan_stream_free(stream: *mut IcebergFileScanStream) { + if !stream.is_null() { + unsafe { + let _ = Box::from_raw(stream); + } + } +} + +/// Return the record count of a file scan. Returns -1 on null input. 
+#[no_mangle] +pub extern "C" fn iceberg_file_scan_record_count(scan: *const IcebergFileScan) -> i64 { + if scan.is_null() { + return -1; + } + unsafe { (*scan).record_count } +} + +/// Return a borrowed pointer to the null-terminated filename of a file scan. +/// The pointer is valid for the lifetime of the IcebergFileScan. +/// The caller must NOT free this pointer; iceberg_file_scan_free handles it. +#[no_mangle] +pub extern "C" fn iceberg_file_scan_filename(scan: *const IcebergFileScan) -> *const c_char { + if scan.is_null() { + return ptr::null(); + } + unsafe { (*scan).filename } +} + +// Get next file scan from outer stream (async) +export_runtime_op!( + iceberg_next_file_scan, + IcebergFileScanResponse, + || { + if stream.is_null() { + return Err(anyhow::anyhow!("Null file scan stream pointer")); + } + let stream_ref = unsafe { &*stream }; + Ok(stream_ref) + }, + stream_ref, + async { + use futures::StreamExt; + let mut guard = stream_ref.stream.lock().await; + match guard.next().await { + Some(Ok(fs)) => Ok(Some(fs)), + Some(Err(e)) => Err(anyhow::anyhow!("Error reading file scan: {}", e)), + None => Ok(None), + } + }, + stream: *mut IcebergFileScanStream +); + // FFI Export functions for table operations // These functions are exported to be called from Julia via the FFI diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index 2179f5a..4ed921c 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -15,8 +15,11 @@ export new_incremental_scan, free_incremental_scan! export table_open, free_table, new_scan, free_scan! export table_location, table_uuid, table_format_version, table_last_sequence_number, table_last_updated_ms, table_schema export select_columns!, with_batch_size!, with_data_file_concurrency_limit!, with_manifest_entry_concurrency_limit! -export with_file_column!, with_pos_column! +export with_file_column!, with_pos_column!, with_file_prefetch_depth! 
export scan!, next_batch, free_batch, free_stream +export scan_nested!, nested_arrow_stream, next_file_scan +export FileScanStream, file_scan_record_count, file_scan_filename, file_scan_arrow_stream +export free_file_scan!, free_file_scan_stream! export FILE_COLUMN, POS_COLUMN export Catalog, catalog_create_rest, free_catalog export load_table, list_tables, list_namespaces, table_exists, create_table, drop_table, drop_namespace, create_namespace diff --git a/src/full.jl b/src/full.jl index 9ac088b..a29f074 100644 --- a/src/full.jl +++ b/src/full.jl @@ -206,6 +206,23 @@ function with_serialization_concurrency_limit!(scan::Scan, n::UInt) return nothing end +""" + with_file_prefetch_depth!(scan::Scan, n::UInt) + +Set how many FileScan tasks are queued ahead in the outer FileScanStream. +Higher values keep the Julia consumer busy but use more memory. +""" +function with_file_prefetch_depth!(scan::Scan, n::UInt) + result = GC.@preserve scan @ccall rust_lib.iceberg_scan_with_file_prefetch_depth( + convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}}, + n::Csize_t + )::Cint + if result != 0 + throw(IcebergException("Failed to set file prefetch depth", result)) + end + return nothing +end + """ with_snapshot_id!(scan::Scan, snapshot_id::Int64) @@ -297,3 +314,130 @@ function free_scan!(scan::Scan) convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}} )::Cvoid end + +# ── Nested file-scan pipeline ────────────────────────────────────────────── +# +# Returns a FileScanStream (outer stream). Each item yielded by next_file_scan +# is an opaque FileScan pointer carrying the filename, record count, and an +# inner IcebergArrowStream of prefetched batches. +# +# Ownership rules: +# - Call free_file_scan! after processing each FileScan. +# - Do NOT call free_stream on the pointer returned by file_scan_arrow_stream; +# iceberg_file_scan_free handles that. +# - Call free_file_scan_stream! after the outer loop completes. 
+ +"""Opaque pointer type for an outer file-scan stream (IcebergFileScanStream).""" +const FileScanStream = Ptr{Cvoid} + +const FileScanStreamResponse = Response{FileScanStream} + +""" + nested_arrow_stream(scan::Scan)::FileScanStream + +Initialize a nested Arrow stream for the scan asynchronously. +Returns an outer FileScanStream whose items are per-file scans. +""" +function nested_arrow_stream(scan::Scan) + response = FileScanStreamResponse() + + async_ccall(response) do handle + @ccall rust_lib.iceberg_file_scan_stream( + scan.ptr::Ptr{Cvoid}, + response::Ref{FileScanStreamResponse}, + handle::Ptr{Cvoid} + )::Cint + end + + @throw_on_error(response, "iceberg_file_scan_stream", IcebergException) + + return response.value +end + +""" + scan_nested!(scan::Scan)::FileScanStream + +Build the scan and return a nested FileScanStream (one item per file). +""" +function scan_nested!(scan::Scan) + build!(scan) + return nested_arrow_stream(scan) +end + +# Response type for next_file_scan +const FileScanResponse = Response{Ptr{Cvoid}} + +""" + next_file_scan(stream::FileScanStream)::Ptr{Cvoid} + +Wait for the next per-file scan from the outer stream asynchronously. +Returns C_NULL when the stream is exhausted. +""" +function next_file_scan(stream::FileScanStream) + response = FileScanResponse() + + async_ccall(response) do handle + @ccall rust_lib.iceberg_next_file_scan( + stream::FileScanStream, + response::Ref{FileScanResponse}, + handle::Ptr{Cvoid} + )::Cint + end + + @throw_on_error(response, "iceberg_next_file_scan", IcebergException) + + return response.value +end + +""" + file_scan_record_count(fs::Ptr{Cvoid})::Int64 + +Return the record count of the file scan. Returns -1 on null input. +""" +function file_scan_record_count(fs::Ptr{Cvoid})::Int64 + return @ccall rust_lib.iceberg_file_scan_record_count(fs::Ptr{Cvoid})::Int64 +end + +""" + file_scan_filename(fs::Ptr{Cvoid})::String + +Return the file path of the file scan as a Julia String. 
+""" +function file_scan_filename(fs::Ptr{Cvoid})::String + ptr = @ccall rust_lib.iceberg_file_scan_filename(fs::Ptr{Cvoid})::Ptr{Cchar} + ptr == C_NULL && return "" + return unsafe_string(ptr) +end + +""" + file_scan_arrow_stream(fs::Ptr{Cvoid})::ArrowStream + +Return the inner batch stream of the file scan (borrowed pointer). +Do NOT call free_stream on the returned pointer; use free_file_scan! instead. +""" +function file_scan_arrow_stream(fs::Ptr{Cvoid})::ArrowStream + # IcebergFileScan layout (repr(C)): + # filename: *mut c_char (offset 0, 8 bytes) + # record_count: i64 (offset 8, 8 bytes) + # stream: *mut IcebergArrowStream (offset 16, 8 bytes) + offset = sizeof(Ptr{Cvoid}) + sizeof(Int64) + return unsafe_load(convert(Ptr{Ptr{Cvoid}}, fs + offset)) +end + +""" + free_file_scan!(fs::Ptr{Cvoid}) + +Free the memory associated with a file scan (filename and inner stream). +""" +function free_file_scan!(fs::Ptr{Cvoid}) + @ccall rust_lib.iceberg_file_scan_free(fs::Ptr{Cvoid})::Cvoid +end + +""" + free_file_scan_stream!(stream::FileScanStream) + +Free the memory associated with a file scan stream. +""" +function free_file_scan_stream!(stream::FileScanStream) + @ccall rust_lib.iceberg_file_scan_stream_free(stream::FileScanStream)::Cvoid +end diff --git a/test/scan_tests.jl b/test/scan_tests.jl index c8e0358..a213e2a 100644 --- a/test/scan_tests.jl +++ b/test/scan_tests.jl @@ -1223,3 +1223,237 @@ end end end end + +@testset "Nested File-Scan API" begin + nations_path = "s3://warehouse/tpch.sf01/nation/metadata/00001-44f668fe-3688-49d5-851f-36e75d143321.metadata.json" + customer_path = "s3://warehouse/tpch.sf01/customer/metadata/00001-76f6e7e4-b34f-492f-b6a1-cc9f8c8f4975.metadata.json" + + @testset "nested_arrow_stream called directly after build!" begin + # scan_nested! = build! + nested_arrow_stream; exercise the two-step path. 
+ table = RustyIceberg.table_open(nations_path) + scan = RustyIceberg.new_scan(table) + RustyIceberg.build!(scan) + outer = RustyIceberg.nested_arrow_stream(scan) + @test outer != C_NULL + + row_count = 0 + try + fs_ptr = RustyIceberg.next_file_scan(outer) + while fs_ptr != C_NULL + inner = RustyIceberg.file_scan_arrow_stream(fs_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + row_count += nrow(DataFrame(arrow_table)) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + end + RustyIceberg.free_file_scan!(fs_ptr) + fs_ptr = RustyIceberg.next_file_scan(outer) + end + finally + RustyIceberg.free_file_scan_stream!(outer) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + + @test row_count == 25 + println("✅ nested_arrow_stream called directly after build! returns correct data") + end + + @testset "Basic iteration — filenames and record counts" begin + table = RustyIceberg.table_open(nations_path) + scan = RustyIceberg.new_scan(table) + outer = RustyIceberg.scan_nested!(scan) + @test outer != C_NULL + + file_count = 0 + total_records = 0 + try + fs_ptr = RustyIceberg.next_file_scan(outer) + while fs_ptr != C_NULL + file_count += 1 + fname = RustyIceberg.file_scan_filename(fs_ptr) + nrec = RustyIceberg.file_scan_record_count(fs_ptr) + + @test !isempty(fname) + @test endswith(fname, ".parquet") + @test startswith(fname, "s3://") + @test nrec > 0 + total_records += nrec + + # Drain the inner stream. 
+ inner = RustyIceberg.file_scan_arrow_stream(fs_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + while batch_ptr != C_NULL + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + end + + RustyIceberg.free_file_scan!(fs_ptr) + fs_ptr = RustyIceberg.next_file_scan(outer) + end + finally + RustyIceberg.free_file_scan_stream!(outer) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + + @test file_count > 0 + @test total_records == 25 # nations table has exactly 25 rows + println("✅ Basic iteration: $file_count file(s), $total_records total records") + end + + @testset "Row counts match flat scan" begin + # Collect total rows via nested pipeline. + table_n = RustyIceberg.table_open(customer_path) + scan_n = RustyIceberg.new_scan(table_n) + outer = RustyIceberg.scan_nested!(scan_n) + + nested_rows = 0 + try + fs_ptr = RustyIceberg.next_file_scan(outer) + while fs_ptr != C_NULL + inner = RustyIceberg.file_scan_arrow_stream(fs_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + nested_rows += nrow(DataFrame(arrow_table)) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + end + RustyIceberg.free_file_scan!(fs_ptr) + fs_ptr = RustyIceberg.next_file_scan(outer) + end + finally + RustyIceberg.free_file_scan_stream!(outer) + RustyIceberg.free_scan!(scan_n) + RustyIceberg.free_table(table_n) + end + + # Collect total rows via flat pipeline. 
+ table_f = RustyIceberg.table_open(customer_path) + scan_f = RustyIceberg.new_scan(table_f) + stream_f = RustyIceberg.scan!(scan_f) + + flat_rows = 0 + try + batch_ptr = RustyIceberg.next_batch(stream_f) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + flat_rows += nrow(DataFrame(arrow_table)) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream_f) + end + finally + RustyIceberg.free_stream(stream_f) + RustyIceberg.free_scan!(scan_f) + RustyIceberg.free_table(table_f) + end + + @test nested_rows == flat_rows + @test nested_rows > 0 + println("✅ Row count consistency: nested=$nested_rows, flat=$flat_rows") + end + + @testset "Correct data — nations table" begin + table = RustyIceberg.table_open(nations_path) + scan = RustyIceberg.new_scan(table) + outer = RustyIceberg.scan_nested!(scan) + + rows = Tuple[] + try + fs_ptr = RustyIceberg.next_file_scan(outer) + while fs_ptr != C_NULL + inner = RustyIceberg.file_scan_arrow_stream(fs_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + @test names(df) == ["n_nationkey", "n_name", "n_regionkey", "n_comment"] + for row in eachrow(df) + push!(rows, Tuple(row)) + end + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + end + RustyIceberg.free_file_scan!(fs_ptr) + fs_ptr = RustyIceberg.next_file_scan(outer) + end + finally + RustyIceberg.free_file_scan_stream!(outer) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + + sort!(rows, by = x -> x[1]) + @test length(rows) == 25 + @test rows[1] == (0, "ALGERIA", 0, "furiously regular requests. 
platelets affix furious") + @test rows[25] == (24, "UNITED STATES", 1, "ly ironic requests along the slyly bold ideas hang after the blithely special notornis; blithely even accounts") + println("✅ Correct data verified for nations table via nested API") + end + + @testset "Builder methods — select_columns! and with_batch_size!" begin + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name"]) + RustyIceberg.with_batch_size!(scan, UInt(10)) + outer = RustyIceberg.scan_nested!(scan) + + try + fs_ptr = RustyIceberg.next_file_scan(outer) + while fs_ptr != C_NULL + inner = RustyIceberg.file_scan_arrow_stream(fs_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + @test names(df) == ["c_custkey", "c_name"] + @test nrow(df) <= 10 + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + end + RustyIceberg.free_file_scan!(fs_ptr) + fs_ptr = RustyIceberg.next_file_scan(outer) + end + finally + RustyIceberg.free_file_scan_stream!(outer) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + println("✅ select_columns! and with_batch_size! work with nested API") + end + + @testset "Safe early drop of FileScan" begin + # Drop a FileScan after reading only the first batch — must not crash or hang. + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + RustyIceberg.with_batch_size!(scan, UInt(1)) + outer = RustyIceberg.scan_nested!(scan) + + try + fs_ptr = RustyIceberg.next_file_scan(outer) + @test fs_ptr != C_NULL + + # Read exactly one batch then abandon the rest. 
+ inner = RustyIceberg.file_scan_arrow_stream(fs_ptr) + batch_ptr = RustyIceberg.next_batch(inner) + if batch_ptr != C_NULL + RustyIceberg.free_batch(batch_ptr) + end + # Drop the file scan without draining inner stream. + RustyIceberg.free_file_scan!(fs_ptr) + # Drop the outer stream without consuming remaining files. + finally + RustyIceberg.free_file_scan_stream!(outer) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + println("✅ Safe early drop of FileScan does not crash") + end +end