12 changes: 12 additions & 0 deletions .github/workflows/CI.yml
@@ -24,6 +24,18 @@ jobs:
          os: ubuntu-latest
          arch: x64

+  rust-test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: dtolnay/rust-toolchain@stable
+      - uses: Swatinem/rust-cache@v2
+        with:
+          workspaces: ./iceberg_rust_ffi
+      - name: Run Rust tests
+        run: cargo test
+        working-directory: iceberg_rust_ffi
+
  rust-format-check:
    runs-on: ubuntu-latest
    steps:
8 changes: 3 additions & 5 deletions iceberg_rust_ffi/src/catalog.rs
@@ -3,7 +3,7 @@ use crate::response::{
IcebergStringListResponse,
};
/// Catalog support for iceberg_rust_ffi
-use crate::IcebergTable;
+use crate::{unexpected, IcebergTable};
use anyhow::Result;
use async_trait::async_trait;
use iceberg::io::{
@@ -155,16 +155,14 @@ impl StorageCredentialsLoader for RestCredentialsLoader {
.catalog
.get()
.and_then(|w| w.upgrade())
-            .ok_or_else(|| {
-                Error::new(ErrorKind::Unexpected, "Catalog reference is not available")
-            })?;
+            .ok_or_else(|| unexpected("Catalog reference is not available"))?;
let response = catalog.load_table_credentials(table_ident).await?;
response
.storage_credentials
.into_iter()
.filter(|c| location.starts_with(&c.prefix))
.max_by_key(|c| c.prefix.len())
-            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "No matching credential for location"))
+            .ok_or_else(|| unexpected("No matching credential for location"))
}
}
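
The selection above is a longest-prefix match: among all credentials whose `prefix` covers the requested `location`, the most specific one wins. A minimal standalone sketch of just that rule (types simplified; `StorageCredential` and its fields here are illustrative stand-ins, not this crate's real definitions):

struct StorageCredential {
    prefix: String,
    token: String, // stand-in for the real credential payload
}

/// Keep only credentials whose prefix covers `location`, then pick the
/// most specific (longest) prefix.
fn select_credential(creds: Vec<StorageCredential>, location: &str) -> Option<StorageCredential> {
    creds
        .into_iter()
        .filter(|c| location.starts_with(&c.prefix))
        .max_by_key(|c| c.prefix.len())
}

fn main() {
    let creds = vec![
        StorageCredential { prefix: "s3://bucket/".into(), token: "broad".into() },
        StorageCredential { prefix: "s3://bucket/table/".into(), token: "narrow".into() },
    ];
    let hit = select_credential(creds, "s3://bucket/table/data/x.parquet").unwrap();
    assert_eq!(hit.token, "narrow"); // the more specific prefix wins
}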

187 changes: 150 additions & 37 deletions iceberg_rust_ffi/src/full.rs
@@ -1,50 +1,99 @@
use std::ffi::{c_char, c_void, CStr};
use std::ptr;

use iceberg::io::FileIO;
use iceberg::scan::{TableScan, TableScanBuilder};
use object_store_ffi::{
export_runtime_op, with_cancellation, CResult, NotifyGuard, ResponseGuard, RT,
};
use tokio::sync::Mutex as AsyncMutex;

use crate::scan_common::*;
-use crate::{IcebergArrowStream, IcebergArrowStreamResponse, IcebergTable};
+use crate::{
+    IcebergArrowStream, IcebergArrowStreamResponse, IcebergFileScanStream,
+    IcebergFileScanStreamResponse, IcebergTable,
+};

-/// Struct for regular (full) scan builder and scan
+/// Holds state for a full table scan across its lifecycle:
+/// 1. Construction: `builder` is set, everything else is None/0.
+/// 2. Configuration: Julia calls with_* methods that transform `builder`.
+/// 3. Build: `builder` is consumed → `scan` is populated.
+/// 4. Stream: `scan` + `file_io` + `batch_size` are consumed by
+///    `iceberg_arrow_stream` to create the file-parallel pipeline.
#[repr(C)]
pub struct IcebergScan {
pub builder: Option<TableScanBuilder<'static>>,
pub scan: Option<TableScan>,
-    /// 0 = auto-detect (num_cpus)
+    /// Serialization thread count (0 = auto-detect). Currently unused by the
+    /// pipeline (kept for future use / backward compat with incremental scan).
pub serialization_concurrency: usize,
/// Cloned from the Table at construction time. Passed to the pipeline so
/// each per-file ArrowReader can open its own parquet file.
pub file_io: FileIO,
/// Captured when Julia calls with_batch_size. Forwarded to each per-file
/// ArrowReaderBuilder inside the pipeline.
pub batch_size: Option<usize>,
/// How many parquet files to process concurrently in the pipeline.
/// Set by with_data_file_concurrency_limit (0 = auto-detect).
pub file_concurrency: usize,
/// How many FileScan items to queue in the outer channel of the nested pipeline.
/// 0 = use file_concurrency as the default.
pub file_prefetch_depth: usize,
}

unsafe impl Send for IcebergScan {}

-/// Create a new scan builder
+/// Create a new scan builder from an opened table.
+/// Captures `file_io` from the table for later use by the pipeline.
#[no_mangle]
pub extern "C" fn iceberg_new_scan(table: *mut IcebergTable) -> *mut IcebergScan {
if table.is_null() {
return ptr::null_mut();
}
let table_ref = unsafe { &*table };
let file_io = table_ref.table.file_io().clone();
let scan_builder = table_ref.table.scan();
Box::into_raw(Box::new(IcebergScan {
builder: Some(scan_builder),
scan: None,
serialization_concurrency: 0,
file_io,
batch_size: None,
file_concurrency: 0,
file_prefetch_depth: 0,
}))
}
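// Ownership note: the returned pointer owns the IcebergScan; the caller must
// eventually release it exactly once via iceberg_scan_free (see below), or
// the allocation leaks.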

-// Use macros from scan_common for shared functionality
+// ── Scan builder configuration (via macros from scan_common.rs) ─────────

impl_select_columns!(iceberg_select_columns, IcebergScan);

-impl_scan_builder_method!(
-    iceberg_scan_with_data_file_concurrency_limit,
-    IcebergScan,
-    with_data_file_concurrency_limit,
-    n: usize
-);
+/// Set file concurrency. Hand-written (not macro-generated) because we need
+/// to both (a) forward the value to the iceberg-rs scan builder and (b)
+/// capture it in `file_concurrency` for our pipeline.
+#[no_mangle]
+pub extern "C" fn iceberg_scan_with_data_file_concurrency_limit(
+    scan: &mut *mut IcebergScan,
+    n: usize,
+) -> CResult {
+    if scan.is_null() || (*scan).is_null() {
+        return CResult::Error;
+    }
+    // Check the builder before taking ownership: an early error return after
+    // Box::from_raw would drop the scan and leave the caller's `*scan` dangling.
+    if unsafe { (**scan).builder.is_none() } {
+        return CResult::Error;
+    }
+    let scan_ref = unsafe { Box::from_raw(*scan) };
+    *scan = Box::into_raw(Box::new(IcebergScan {
+        builder: scan_ref
+            .builder
+            .map(|b| b.with_data_file_concurrency_limit(n)),
+        scan: scan_ref.scan,
+        serialization_concurrency: scan_ref.serialization_concurrency,
+        file_io: scan_ref.file_io,
+        batch_size: scan_ref.batch_size,
+        file_concurrency: n,
+        file_prefetch_depth: scan_ref.file_prefetch_depth,
+    }));
+    CResult::Ok
+}

impl_scan_builder_method!(
iceberg_scan_with_manifest_file_concurrency_limit,
@@ -73,56 +73,120 @@ impl_with_serialization_concurrency_limit!(
IcebergScan
);

impl_with_file_prefetch_depth!(iceberg_scan_with_file_prefetch_depth, IcebergScan);

impl_scan_builder_method!(
iceberg_scan_with_snapshot_id,
IcebergScan,
snapshot_id,
snapshot_id: i64
);

-// Async function to initialize stream from a table scan
+// ── Stream creation ─────────────────────────────────────────────────────
+//
+// Instead of calling iceberg-rs's `scan.to_arrow()` (which uses
+// `try_for_each_concurrent` internally and interleaves batches across
+// files in arbitrary order), we:
+// 1. Call `scan.plan_files()` to get an ordered list of FileScanTasks.
+// 2. Feed them into our own file-parallel pipeline
+//    (ordered_file_pipeline.rs) which processes N files concurrently
+//    but yields batches in strict file-then-row order.
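//
// One way to get that guarantee (and what the serialization path in lib.rs
// switches to in this PR) is `StreamExt::buffered`, which polls up to `n`
// futures concurrently but yields results in input order, unlike
// `buffer_unordered`, which yields in completion order. A minimal standalone
// sketch of that property, separate from this crate's real pipeline:
//
//     use futures::{executor::block_on, stream, StreamExt};
//
//     async fn demo() {
//         let out: Vec<u32> = stream::iter(1..=4)
//             .map(|n| async move { n * 10 }) // stand-in for "decode one file"
//             .buffered(2) // two in flight, but output order == input order
//             .collect()
//             .await;
//         assert_eq!(out, vec![10, 20, 30, 40]);
//     }
//
//     fn main() { block_on(demo()); }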

/// Resolve the pipeline tuning parameters from a configured scan.
/// Returns `(concurrency, prefetch_depth, file_io, batch_size)`.
/// A stored value of 0 means "auto": concurrency defaults to available
/// parallelism; prefetch_depth defaults to concurrency.
fn resolve_pipeline_params(scan: &IcebergScan) -> (usize, usize, FileIO, Option<usize>) {
let concurrency = if scan.file_concurrency == 0 {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
} else {
scan.file_concurrency
};
let prefetch_depth = if scan.file_prefetch_depth == 0 {
concurrency
} else {
scan.file_prefetch_depth
};
(concurrency, prefetch_depth, scan.file_io.clone(), scan.batch_size)
}
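// Worked example: with both knobs left at 0 (auto) on an 8-core machine this
// resolves to (8, 8, file_io, batch_size); with file_concurrency = 4 and
// file_prefetch_depth = 16 it resolves to (4, 16, file_io, batch_size).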

export_runtime_op!(
iceberg_arrow_stream,
IcebergArrowStreamResponse,
|| {
if scan.is_null() {
return Err(anyhow::anyhow!("Null scan pointer provided"));
}
-        let scan_ptr = unsafe { &*scan };
+        let scan_ptr = unsafe { &mut *scan };
let scan_ref = &scan_ptr.scan;
if scan_ref.is_none() {
return Err(anyhow::anyhow!("Scan not initialized"));
}

-        // Determine concurrency (0 = auto-detect)
-        let serialization_concurrency = scan_ptr.serialization_concurrency;
-        let serialization_concurrency = if serialization_concurrency == 0 {
-            std::thread::available_parallelism()
-                .map(|n| n.get())
-                .unwrap_or(1)
-        } else {
-            serialization_concurrency
-        };
-
-        Ok((scan_ref.as_ref().unwrap(), serialization_concurrency))
+        let (concurrency, prefetch_depth, file_io, batch_size) =
+            resolve_pipeline_params(scan_ptr);
+        Ok((scan_ref.as_ref().unwrap(), concurrency, prefetch_depth, file_io, batch_size))
},
result_tuple,
async {
-        let (scan_ref, serialization_concurrency) = result_tuple;
+        let (scan_ref, concurrency, prefetch_depth, file_io, batch_size) = result_tuple;

-        let stream = scan_ref.to_arrow().await?;
+        // Collect the ordered file task list from iceberg-rs.
+        use futures::TryStreamExt;
+        let tasks: Vec<iceberg::scan::FileScanTask> =
+            scan_ref.plan_files().await?.try_collect().await?;

-        // Transform stream: RecordBatch -> ArrowBatch with parallel serialization
-        let serialized_stream = crate::transform_stream_with_parallel_serialization(
-            stream,
-            serialization_concurrency
-        );
+        // Hand off to the file-parallel pipeline.
+        let stream = crate::ordered_file_pipeline::create_pipeline(
+            tasks, file_io, batch_size, concurrency, prefetch_depth,
+        )
+        .await?;

-        Ok::<IcebergArrowStream, anyhow::Error>(IcebergArrowStream {
-            stream: AsyncMutex::new(serialized_stream),
-        })
+        Ok::<IcebergArrowStream, anyhow::Error>(stream)
},
scan: *mut IcebergScan
);

impl_scan_free!(iceberg_scan_free, IcebergScan);

// ── Nested stream creation ───────────────────────────────────────────────
//
// Returns an IcebergFileScanStream whose items are per-file (filename,
// record_count, inner-batch-stream) tuples, yielded in strict file order.
// The flat iceberg_arrow_stream is implemented as a flattening wrapper over
// this same nested pipeline.
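//
// Conceptually, flattening a nested stream this way preserves file order
// because each inner stream is drained before the next file's begins. A
// standalone sketch with toy types (not this crate's real stream items):
//
//     use futures::{executor::block_on, stream, StreamExt};
//
//     async fn demo() {
//         let nested = stream::iter(vec![
//             ("a.parquet", stream::iter(vec![1, 2])),
//             ("b.parquet", stream::iter(vec![3])),
//         ]);
//         let flat: Vec<i32> = nested.flat_map(|(_file, batches)| batches).collect().await;
//         assert_eq!(flat, vec![1, 2, 3]); // strict file-then-row order
//     }
//
//     fn main() { block_on(demo()); }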

export_runtime_op!(
iceberg_file_scan_stream,
IcebergFileScanStreamResponse,
|| {
if scan.is_null() {
return Err(anyhow::anyhow!("Null scan pointer provided"));
}
let scan_ptr = unsafe { &mut *scan };
let scan_ref = &scan_ptr.scan;
if scan_ref.is_none() {
return Err(anyhow::anyhow!("Scan not initialized"));
}
let (concurrency, prefetch_depth, file_io, batch_size) =
resolve_pipeline_params(scan_ptr);
Ok((scan_ref.as_ref().unwrap(), concurrency, prefetch_depth, file_io, batch_size))
},
result_tuple,
async {
let (scan_ref, concurrency, prefetch_depth, file_io, batch_size) = result_tuple;

use futures::TryStreamExt;
let tasks: Vec<iceberg::scan::FileScanTask> =
scan_ref.plan_files().await?.try_collect().await?;

let stream = crate::ordered_file_pipeline::create_nested_pipeline(
tasks, file_io, batch_size, concurrency, prefetch_depth,
)
.await?;

Ok::<IcebergFileScanStream, anyhow::Error>(stream)
},
scan: *mut IcebergScan
);
9 changes: 9 additions & 0 deletions iceberg_rust_ffi/src/incremental.rs
@@ -22,6 +22,11 @@ pub struct IcebergIncrementalScan {
pub scan: Option<IncrementalTableScan>,
/// 0 = auto-detect (num_cpus)
pub serialization_concurrency: usize,
+    // Present for macro compatibility with IcebergScan; unused for incremental.
+    pub file_io: Option<iceberg::io::FileIO>,
+    pub batch_size: Option<usize>,
+    pub file_concurrency: usize,
+    pub file_prefetch_depth: usize,
}

unsafe impl Send for IcebergIncrementalScan {}
@@ -102,6 +107,10 @@ pub extern "C" fn iceberg_new_incremental_scan(
builder: Some(scan_builder),
scan: None,
serialization_concurrency: 0,
+        file_io: None,
+        batch_size: None,
+        file_concurrency: 0,
+        file_prefetch_depth: 0,
}))
}

32 changes: 17 additions & 15 deletions iceberg_rust_ffi/src/lib.rs
@@ -1,6 +1,10 @@
use futures::StreamExt;
use std::ffi::{c_char, c_void};

/// Shorthand for an `iceberg::Error` with `ErrorKind::Unexpected`.
pub(crate) fn unexpected(msg: impl std::fmt::Display) -> iceberg::Error {
    iceberg::Error::new(iceberg::ErrorKind::Unexpected, msg.to_string())
}

use anyhow::Result;
use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
@@ -31,6 +35,12 @@ mod writer;
// Column-based writer module (zero-copy from Julia)
mod writer_columns;

// Profiling stats for the file-parallel pipeline
mod pipeline_stats;

// Ordered file-parallel pipeline
mod ordered_file_pipeline;

// Response types module
mod response;

@@ -46,8 +56,9 @@ pub use response::{
IcebergStringListResponse,
};
pub use table::{
-    ArrowBatch, IcebergArrowStream, IcebergArrowStreamResponse, IcebergBatchResponse, IcebergTable,
-    IcebergTableResponse,
+    ArrowBatch, IcebergArrowStream, IcebergArrowStreamResponse, IcebergBatchResponse,
+    IcebergFileScan, IcebergFileScanResponse, IcebergFileScanStream, IcebergFileScanStreamResponse,
+    IcebergTable, IcebergTableResponse,
};
pub use transaction::{IcebergDataFiles, IcebergTransaction, IcebergTransactionResponse};
pub use writer::{
@@ -158,23 +169,14 @@ pub(crate) fn transform_stream_with_parallel_serialization(
.await
{
Ok(Ok(arrow_batch)) => Ok(arrow_batch),
-                    Ok(Err(e)) => Err(iceberg::Error::new(
-                        iceberg::ErrorKind::Unexpected,
-                        e.to_string(),
-                    )),
-                    Err(e) => Err(iceberg::Error::new(
-                        iceberg::ErrorKind::Unexpected,
-                        format!("Serialization task panicked: {}", e),
-                    )),
+                    Ok(Err(e)) => Err(unexpected(e)),
+                    Err(e) => Err(unexpected(format!("Serialization task panicked: {e}"))),
}
}
-            Err(e) => Err(iceberg::Error::new(
-                iceberg::ErrorKind::Unexpected,
-                format!("Stream error: {}", e),
-            )),
+            Err(e) => Err(unexpected(format!("Stream error: {e}"))),
}
})
-        .buffer_unordered(concurrency)
+        .buffered(concurrency)
.boxed()
}
