diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index d61f4cc..c2cd772 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1638,7 +1638,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.7.18" +version = "0.7.19" dependencies = [ "anyhow", "arrow-array", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 0cff2e5..f76f726 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.7.18" +version = "0.7.19" edition = "2021" [lib] diff --git a/iceberg_rust_ffi/src/writer.rs b/iceberg_rust_ffi/src/writer.rs index efc194a..19b7e51 100644 --- a/iceberg_rust_ffi/src/writer.rs +++ b/iceberg_rust_ffi/src/writer.rs @@ -1,12 +1,20 @@ /// Writer support for iceberg_rust_ffi /// -/// This module provides FFI bindings for the iceberg-rust Writer API, -/// enabling Julia to write Arrow data to Parquet files and produce DataFiles -/// for use with the Transaction API. -use std::ffi::{c_char, c_void}; +/// Encoding is handled by a global pool of N=available_parallelism OS threads shared +/// across all writers. Per-writer ordering is guaranteed by the per-writer +/// `Arc>` inside WriterState: only one pool thread encodes +/// a given writer at a time, and the FIFO global queue ensures batches are submitted +/// in order. 
+use std::any::Any; +use std::cell::RefCell; +use std::ffi::{c_char, c_void, CString}; use std::io::Cursor; -use std::sync::Arc; +use std::panic::{catch_unwind, AssertUnwindSafe}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, OnceLock}; +use std::thread; +use arrow_array::RecordBatch; use arrow_ipc::reader::StreamReader; use arrow_schema::SchemaRef as ArrowSchemaRef; use iceberg::arrow::schema_to_arrow_schema; @@ -18,8 +26,8 @@ use iceberg::writer::file_writer::location_generator::{ use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; -use parquet::basic::Compression; -use parquet::file::properties::WriterProperties; +use parquet::basic::{Compression, Encoding}; +use parquet::file::properties::{EnabledStatistics, WriterProperties}; /// Compression codec values (must match Julia's CompressionCodec enum) const COMPRESSION_UNCOMPRESSED: i32 = 0; @@ -28,7 +36,6 @@ const COMPRESSION_GZIP: i32 = 2; const COMPRESSION_LZ4: i32 = 3; const COMPRESSION_ZSTD: i32 = 4; -/// Convert FFI compression code to parquet Compression fn compression_from_code(code: i32) -> Compression { match code { COMPRESSION_UNCOMPRESSED => Compression::UNCOMPRESSED, @@ -36,7 +43,52 @@ fn compression_from_code(code: i32) -> Compression { COMPRESSION_GZIP => Compression::GZIP(Default::default()), COMPRESSION_LZ4 => Compression::LZ4, COMPRESSION_ZSTD => Compression::ZSTD(Default::default()), - _ => Compression::SNAPPY, // Default to SNAPPY for unknown values + _ => Compression::SNAPPY, + } +} + +/// Parquet writer properties passed from Julia (must match Julia's ParquetWriterProperties layout). +/// Fields are ordered largest-to-smallest to avoid gaps in the repr(C) layout. +/// Size fields use 0 to mean "use parquet default". 
+#[repr(C)] +pub struct ParquetWriterPropertiesFFI { + /// Maximum number of rows per row group (0 = parquet default: 1 048 576) + pub max_row_group_size: i64, + /// Target uncompressed data page size in bytes (0 = parquet default: 1 048 576) + pub data_page_size: i64, + /// Number of rows encoded per column chunk within a row group (0 = parquet default: 1024) + pub write_batch_size: i64, + /// Compression codec (see COMPRESSION_* constants) + pub compression_codec: i32, + /// Whether to enable dictionary encoding globally + pub dictionary_enabled: bool, + /// Force PLAIN encoding for all columns, bypassing DELTA_BINARY_PACKED default for INT64/INT32 + pub use_plain_encoding: bool, + /// Collect per-page and per-row-group min/max statistics (default true; set false to skip) + pub statistics_enabled: bool, +} + +impl ParquetWriterPropertiesFFI { + fn to_writer_properties(&self) -> WriterProperties { + let mut builder = WriterProperties::builder() + .set_compression(compression_from_code(self.compression_codec)) + .set_dictionary_enabled(self.dictionary_enabled); + if self.max_row_group_size > 0 { + builder = builder.set_max_row_group_row_count(Some(self.max_row_group_size as usize)); + } + if self.data_page_size > 0 { + builder = builder.set_data_page_size_limit(self.data_page_size as usize); + } + if self.write_batch_size > 0 { + builder = builder.set_write_batch_size(self.write_batch_size as usize); + } + if self.use_plain_encoding { + builder = builder.set_encoding(Encoding::PLAIN); + } + if !self.statistics_enabled { + builder = builder.set_statistics_enabled(EnabledStatistics::None); + } + builder.build() } } @@ -44,6 +96,9 @@ use crate::response::IcebergBoxedResponse; use crate::table::IcebergTable; use crate::transaction::IcebergDataFiles; use crate::util::parse_c_string; +use crate::writer_columns::{ + build_arrow_array_gathered, ColumnDescriptor, GatheredColumnDescriptor, SliceRef, +}; use object_store_ffi::{ export_runtime_op, with_cancellation, CResult, 
NotifyGuard, ResponseGuard, RT, }; @@ -52,12 +107,188 @@ use object_store_ffi::{ type ConcreteDataFileWriter = DataFileWriter; -/// Opaque writer handle for FFI -/// Holds the DataFileWriter which can write RecordBatches and produce DataFiles +/// Encode task submitted to the global worker pool. +struct EncodeTask { + batch: RecordBatch, + state: Arc, +} + +// Safety: RecordBatch is Send; WriterState fields are Send. +unsafe impl Send for EncodeTask {} + +/// Shared mutable state for one IcebergDataFileWriter. +/// Owned by the IcebergDataFileWriter and shared with pool workers via Arc. +pub(crate) struct WriterState { + /// The underlying Parquet writer. Protected by a Mutex so pool workers can access it + /// concurrently (though at most one worker encodes a given writer at a time due to the + /// bounded per-writer channel). Set to None when the writer is closed or freed. + writer: Mutex>, + /// Number of encode tasks submitted to the pool but not yet completed. + pending: AtomicUsize, + /// Notified when `pending` drops to zero, so iceberg_writer_close can wait efficiently. + done_notify: tokio::sync::Notify, + /// First encode error encountered by a pool worker, if any. + error: Mutex>, +} + +// Safety: ConcreteDataFileWriter is Send (verified by its use in spawn_blocking previously). +unsafe impl Send for WriterState {} +unsafe impl Sync for WriterState {} + +/// Global pool of N=available_parallelism encode worker threads shared across all writers. +struct GlobalWorkerPool { + task_tx: tokio::sync::mpsc::Sender, +} + +static GLOBAL_ENCODE_POOL: OnceLock = OnceLock::new(); + +// Thread-local storage for the most recent synchronous gather error. +// Set by iceberg_writer_write_gathered_columns on failure; consumed by iceberg_take_gather_error. +thread_local! 
{ + static LAST_GATHER_ERROR: RefCell> = const { RefCell::new(None) }; +} + +fn store_gather_error(e: &anyhow::Error) { + let msg = format!("{:#}", e); + LAST_GATHER_ERROR.with(|cell| { + *cell.borrow_mut() = CString::new(msg).ok(); + }); +} + +/// Returns a heap-allocated C string with the most recent gather error on this thread, +/// or NULL if none. Must be called on the same thread as the failed write call, immediately +/// after it returns. The caller must free the returned string with `iceberg_destroy_cstring`. +#[no_mangle] +pub extern "C" fn iceberg_take_gather_error() -> *mut c_char { + LAST_GATHER_ERROR.with(|cell| { + cell.borrow_mut() + .take() + .map(|s| s.into_raw()) + .unwrap_or(std::ptr::null_mut()) + }) +} + +/// Formats a Rust panic payload into an anyhow error, preserving the message where possible. +fn format_panic_error(panic: Box) -> anyhow::Error { + let msg = if let Some(s) = panic.downcast_ref::<&str>() { + format!("encode worker panicked: {}", s) + } else if let Some(s) = panic.downcast_ref::() { + format!("encode worker panicked: {}", s) + } else { + "encode worker panicked (no string payload)".to_string() + }; + anyhow::anyhow!(msg) +} + +/// Body of each encode worker thread: receives tasks from the shared channel and encodes them. +fn encode_worker_loop( + task_rx: Arc>>, + handle: tokio::runtime::Handle, +) { + loop { + // Acquire the shared receiver lock, then wait for a task. + // The lock is released as soon as recv() returns, so workers are not serialized + // during encoding — only during task pickup. + let task = { + let mut rx = handle.block_on(task_rx.lock()); + match handle.block_on(rx.recv()) { + Some(t) => t, + None => break, // sender dropped → pool shutting down + } + }; + + // Clone state before moving task into the closure so we can always decrement + // pending even if the closure panics. 
+ let state = task.state.clone(); + let handle_enc = handle.clone(); + let encode_result = catch_unwind(AssertUnwindSafe(move || { + let mut guard = task.state.writer.lock().unwrap_or_else(|e| e.into_inner()); + match guard.as_mut() { + Some(w) => handle_enc + .block_on(w.write(task.batch)) + .map_err(|e| anyhow::anyhow!("write batch: {}", e)), + None => Err(anyhow::anyhow!("writer already closed")), + } + })); + + let err = match encode_result { + Ok(Ok(())) => None, + Ok(Err(e)) => Some(e), + Err(panic) => Some(format_panic_error(panic)), + }; + if let Some(e) = err { + let mut slot = state.error.lock().unwrap_or_else(|e| e.into_inner()); + if slot.is_none() { + *slot = Some(e); + } + } + + // Always decrement pending; notify close() if this was the last task. + let prev = state.pending.fetch_sub(1, Ordering::AcqRel); + if prev == 1 { + state.done_notify.notify_one(); + } + } +} + +/// Desired encode worker count. 0 means "use available_parallelism". +/// Must be set before the first iceberg_writer_new call. +static ENCODE_WORKERS: AtomicUsize = AtomicUsize::new(0); + +/// Set the number of encode worker threads in the global pool. +/// Must be called before any writer is created. Returns 0 on success, 1 if the pool is +/// already initialized (call ignored). +#[no_mangle] +pub extern "C" fn iceberg_set_encode_workers(n: i32) -> i32 { + if GLOBAL_ENCODE_POOL.get().is_some() { + return 1; + } + if n > 0 { + ENCODE_WORKERS.store(n as usize, Ordering::Relaxed); + } + 0 +} + +/// Initialize the global encode pool on first call. +/// Must be called from within a Tokio runtime (iceberg_writer_new satisfies this). +fn get_or_init_encode_pool() -> &'static GlobalWorkerPool { + GLOBAL_ENCODE_POOL.get_or_init(|| { + let configured = ENCODE_WORKERS.load(Ordering::Relaxed); + let n = if configured > 0 { + configured + } else { + // available_parallelism() only fails on unusual platforms (embedded, some sandboxes). 
+ // On Linux/macOS/Windows it always succeeds, so the unwrap never fires in practice. + thread::available_parallelism().unwrap().get() + }; + let handle = tokio::runtime::Handle::current(); + // Buffer 2× workers — drain tasks are rarely blocked on submit. + let (task_tx, task_rx) = tokio::sync::mpsc::channel::(n * 2); + let task_rx = Arc::new(tokio::sync::Mutex::new(task_rx)); + + for i in 0..n { + let task_rx = task_rx.clone(); + let handle = handle.clone(); + thread::Builder::new() + .name(format!("iceberg-encode-{}", i)) + .spawn(move || encode_worker_loop(task_rx, handle)) + .expect("failed to spawn iceberg encode worker"); + } + + GlobalWorkerPool { task_tx } + }) +} + +/// Opaque writer handle for FFI. +/// +/// Writing is pipelined: Julia gathers a RecordBatch and submits it directly to the +/// global encode pool, then returns immediately. Pool workers (N = available_parallelism) +/// encode Parquet concurrently across all active writers. pub struct IcebergDataFileWriter { - pub(crate) writer: Option, - /// The Arrow schema for this table, used by write_columns to create RecordBatches + /// Arrow schema for this table, used by write_columns to create RecordBatches. pub(crate) arrow_schema: ArrowSchemaRef, + /// Shared state: owns the ConcreteDataFileWriter, tracks pending count and errors. + pub(crate) writer_state: Arc, } unsafe impl Send for IcebergDataFileWriter {} @@ -69,25 +300,229 @@ pub type IcebergDataFileWriterResponse = IcebergBoxedResponse; -/// Free a writer +/// Store an error in the writer state (first error wins). +fn store_writer_error(writer_ref: &IcebergDataFileWriter, e: anyhow::Error) { + let mut slot = writer_ref + .writer_state + .error + .lock() + .unwrap_or_else(|e| e.into_inner()); + if slot.is_none() { + *slot = Some(e); + } +} + +/// Build a `RecordBatch` from a slice of `GatheredColumnDescriptor`s. 
+/// +/// # Safety +/// All pointers inside each `GatheredColumnDescriptor` must be valid for the duration of +/// this call (callers hold `GC.@preserve` or equivalent). +unsafe fn build_record_batch( + arrow_schema: ArrowSchemaRef, + col_descs: I, +) -> Result +where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, +{ + let iter = col_descs.into_iter(); + let mut arrays = Vec::with_capacity(iter.len()); + for (i, desc) in iter.enumerate() { + arrays.push(unsafe { build_arrow_array_gathered(&desc, arrow_schema.field(i))? }); + } + RecordBatch::try_new(arrow_schema, arrays).map_err(|e| anyhow::anyhow!("RecordBatch: {}", e)) +} + +/// Submit a `RecordBatch` to the global encode pool. +/// +/// Increments the writer's pending count before sending and rolls it back on channel failure. +fn submit_batch( + writer_ref: &IcebergDataFileWriter, + pool: &GlobalWorkerPool, + batch: RecordBatch, +) -> Result<(), anyhow::Error> { + writer_ref + .writer_state + .pending + .fetch_add(1, Ordering::AcqRel); + let task = EncodeTask { + batch, + state: writer_ref.writer_state.clone(), + }; + match pool.task_tx.blocking_send(task) { + Ok(()) => Ok(()), + Err(_) => { + let prev = writer_ref + .writer_state + .pending + .fetch_sub(1, Ordering::AcqRel); + if prev == 1 { + writer_ref.writer_state.done_notify.notify_one(); + } + Err(anyhow::anyhow!("encode pool channel closed unexpectedly")) + } + } +} + +/// Validates column count, builds a `RecordBatch` from pre-built gathered descriptors, +/// and submits it to the encode pool. 
+unsafe fn write_gathered_inner( + writer_ref: &IcebergDataFileWriter, + pool: &GlobalWorkerPool, + arrow_schema: ArrowSchemaRef, + num_columns: usize, + col_descs: I, +) -> Result<(), anyhow::Error> +where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, +{ + if num_columns != arrow_schema.fields().len() { + return Err(anyhow::anyhow!( + "Column count mismatch: got {} but schema has {}", + num_columns, + arrow_schema.fields().len() + )); + } + let batch = unsafe { build_record_batch(arrow_schema, col_descs) }?; + submit_batch(writer_ref, pool, batch) +} + +/// Validates column count, builds a `RecordBatch` from flat `ColumnDescriptor`s (each +/// treated as a single sequential slice), and submits it to the encode pool. +/// +/// Each `SliceRef` is constructed on the stack and used within the same loop iteration, +/// so no heap allocation is needed for the descriptor conversion. +unsafe fn write_columns_inner( + writer_ref: &IcebergDataFileWriter, + pool: &GlobalWorkerPool, + arrow_schema: ArrowSchemaRef, + col_descs: &[ColumnDescriptor], +) -> Result<(), anyhow::Error> { + if col_descs.len() != arrow_schema.fields().len() { + return Err(anyhow::anyhow!( + "Column count mismatch: got {} but schema has {}", + col_descs.len(), + arrow_schema.fields().len() + )); + } + let mut arrays = Vec::with_capacity(col_descs.len()); + for (i, d) in col_descs.iter().enumerate() { + // SliceRef lives on the stack for exactly this iteration; the raw pointer + // is consumed by build_arrow_array_gathered before the next iteration begins. + let slice = SliceRef { + data_ptr: d.data_ptr, + lengths_ptr: d.lengths_ptr, + validity_ptr: d.validity_ptr, + sel_ptr: std::ptr::null(), + len: d.num_rows, + }; + let desc = GatheredColumnDescriptor { + slices: &slice as *const SliceRef, + num_slices: 1, + total_rows: d.num_rows, + column_type: d.column_type, + is_nullable: d.is_nullable, + }; + arrays.push(unsafe { build_arrow_array_gathered(&desc, arrow_schema.field(i))? 
}); + } + let batch = RecordBatch::try_new(arrow_schema, arrays) + .map_err(|e| anyhow::anyhow!("RecordBatch: {}", e))?; + submit_batch(writer_ref, pool, batch) +} + +/// Gather column data from Julia memory into Arrow arrays in the calling thread, then +/// submit the RecordBatch to the global encode pool asynchronously. +/// +/// Julia keeps source arrays alive via `GC.@preserve` for the duration of this call. +/// After this function returns, all Julia pointers have been consumed and Julia may safely +/// release the source data. Encode is still asynchronous in the global pool; call +/// `iceberg_writer_close` to wait for all pending encodes. +/// +/// Returns 0 on success, -1 on error (error stored in writer state, propagated on close). +#[no_mangle] +pub extern "C" fn iceberg_writer_write_gathered_columns( + writer: *mut IcebergDataFileWriter, + columns: *const GatheredColumnDescriptor, + num_columns: usize, +) -> i32 { + if writer.is_null() || columns.is_null() || num_columns == 0 { + return -1; + } + let writer_ref = unsafe { &*writer }; + let pool = match GLOBAL_ENCODE_POOL.get() { + Some(p) => p, + None => { + eprintln!("[iceberg] encode pool not initialized; call iceberg_writer_new first"); + return -1; + } + }; + let arrow_schema = writer_ref.arrow_schema.clone(); + let col_descs = unsafe { std::slice::from_raw_parts(columns, num_columns) }; + if let Err(e) = unsafe { + write_gathered_inner( + writer_ref, + pool, + arrow_schema, + num_columns, + col_descs.iter().copied(), + ) + } { + store_gather_error(&e); + store_writer_error(writer_ref, e); + return -1; + } + 0 +} + +/// Synchronous write of flat column data: copies each column from Julia memory into +/// Rust-owned Arrow arrays in the calling thread, then submits to the global encode +/// pool asynchronously. +/// +/// Each `ColumnDescriptor` is treated as a single sequential slice (no scatter/gather). +/// Returns 0 on success, -1 on error (error stored in writer state, propagated on close). 
+#[no_mangle] +pub extern "C" fn iceberg_writer_write_columns( + writer: *mut IcebergDataFileWriter, + columns: *const ColumnDescriptor, + num_columns: usize, +) -> i32 { + if writer.is_null() || columns.is_null() || num_columns == 0 { + return -1; + } + let writer_ref = unsafe { &*writer }; + let pool = match GLOBAL_ENCODE_POOL.get() { + Some(p) => p, + None => { + eprintln!("[iceberg] encode pool not initialized; call iceberg_writer_new first"); + return -1; + } + }; + let arrow_schema = writer_ref.arrow_schema.clone(); + let col_descs = unsafe { std::slice::from_raw_parts(columns, num_columns) }; + if let Err(e) = unsafe { write_columns_inner(writer_ref, pool, arrow_schema, col_descs) } { + store_writer_error(writer_ref, e); + return -1; + } + 0 +} + +/// Free a writer. Poisons the writer state so any in-flight pool tasks fail gracefully. #[no_mangle] pub extern "C" fn iceberg_writer_free(writer: *mut IcebergDataFileWriter) { if !writer.is_null() { unsafe { - let _ = Box::from_raw(writer); + let boxed = Box::from_raw(writer); + // Poison the ConcreteDataFileWriter so any in-flight pool tasks return an error + // rather than writing to a partially-freed writer. + let _ = boxed.writer_state.writer.lock().unwrap().take(); } } } -// Create a new DataFileWriter from a table with configuration options -// -// This creates the full writer chain: -// ParquetWriterBuilder -> RollingFileWriterBuilder -> DataFileWriterBuilder -> build() +// Create a new DataFileWriter from a table with configuration options. // -// Configuration options: -// - `prefix`: File name prefix (e.g., "data" produces files like "data-xxx.parquet") -// - `target_file_size_bytes`: Target size for rolling to a new file (0 = use default 512 MB) -// - `compression_codec`: Compression codec (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZ4, 4=ZSTD) +// The global encode pool (N = available_parallelism threads) is initialized on the first call. 
export_runtime_op!( iceberg_writer_new, IcebergDataFileWriterResponse, @@ -95,14 +530,18 @@ export_runtime_op!( if table.is_null() { return Err(anyhow::anyhow!("Null table pointer provided")); } + if parquet_props.is_null() { + return Err(anyhow::anyhow!("Null parquet_props pointer provided")); + } let prefix_str = parse_c_string(prefix, "prefix")?; let table_ref = unsafe { &*table }; - Ok((table_ref, prefix_str, target_file_size_bytes, compression_codec)) + let props = unsafe { &*parquet_props }; + Ok((table_ref, prefix_str, target_file_size_bytes, props.to_writer_properties())) }, result_tuple, async { - let (table_ref, prefix_str, target_file_size_bytes, compression_codec) = result_tuple; + let (table_ref, prefix_str, target_file_size_bytes, writer_props) = result_tuple; let table = &table_ref.table; // Create LocationGenerator from table metadata @@ -116,12 +555,6 @@ export_runtime_op!( DataFileFormat::Parquet, ); - // Create WriterProperties with compression - let compression = compression_from_code(compression_codec); - let writer_props = WriterProperties::builder() - .set_compression(compression) - .build(); - // Create ParquetWriterBuilder with table schema and configured properties let parquet_writer_builder = ParquetWriterBuilder::new( writer_props, @@ -146,11 +579,8 @@ export_runtime_op!( ) }; - // Create DataFileWriterBuilder - let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder); - - // Build the writer (no partition key for basic writes) - let writer = data_file_writer_builder + // Build the concrete DataFileWriter + let concrete_writer = DataFileWriterBuilder::new(rolling_file_writer_builder) .build(None) .await .map_err(|e| anyhow::anyhow!("Failed to build data file writer: {}", e))?; @@ -161,81 +591,83 @@ export_runtime_op!( .map_err(|e| anyhow::anyhow!("Failed to convert schema to Arrow: {}", e))? ); + // Initialize global pool (no-op if already running). 
+ get_or_init_encode_pool(); + + let writer_state = Arc::new(WriterState { + writer: Mutex::new(Some(concrete_writer)), + pending: AtomicUsize::new(0), + done_notify: tokio::sync::Notify::new(), + error: Mutex::new(None), + }); + Ok::(IcebergDataFileWriter { - writer: Some(writer), arrow_schema, + writer_state, }) }, table: *mut IcebergTable, prefix: *const c_char, target_file_size_bytes: i64, - compression_codec: i32 + parquet_props: *const ParquetWriterPropertiesFFI ); -// Write Arrow IPC data to the writer -// -// The data must be in Arrow IPC stream format (as produced by Arrow.tobuffer in Julia). -// This deserializes the IPC data to RecordBatch and writes it to the Parquet file. -export_runtime_op!( - iceberg_writer_write, - crate::IcebergResponse, - || { - if writer.is_null() { - return Err(anyhow::anyhow!("Null writer pointer provided")); - } - if arrow_ipc_data.is_null() { - return Err(anyhow::anyhow!("Null arrow_ipc_data pointer provided")); - } - if arrow_ipc_len == 0 { - return Err(anyhow::anyhow!("Arrow IPC data length is zero")); - } +/// Write Arrow IPC data synchronously: copy IPC bytes from Julia, deserialize the stream, +/// and submit each RecordBatch to the global encode pool. +/// +/// Returns 0 on success, -1 on error (error stored in writer state, propagated on close). 
+#[no_mangle] +pub extern "C" fn iceberg_writer_write( + writer: *mut IcebergDataFileWriter, + arrow_ipc_data: *const u8, + arrow_ipc_len: usize, +) -> i32 { + if writer.is_null() || arrow_ipc_data.is_null() || arrow_ipc_len == 0 { + return -1; + } - // Copy the IPC data into a Vec for safe use across await points - let ipc_bytes = unsafe { - std::slice::from_raw_parts(arrow_ipc_data, arrow_ipc_len).to_vec() - }; + let writer_ref = unsafe { &*writer }; - let writer_ref = unsafe { &mut *writer }; - Ok((writer_ref, ipc_bytes)) - }, - result_tuple, - async { - let (writer_ref, ipc_bytes) = result_tuple; + let pool = match GLOBAL_ENCODE_POOL.get() { + Some(p) => p, + None => { + eprintln!("[iceberg:sync] encode pool not initialized; call iceberg_writer_new first"); + return -1; + } + }; - // Deserialize Arrow IPC to RecordBatch - let cursor = Cursor::new(ipc_bytes); - let mut reader = StreamReader::try_new(cursor, None) - .map_err(|e| anyhow::anyhow!("Failed to create Arrow IPC reader: {}", e))?; + let ipc_bytes = unsafe { std::slice::from_raw_parts(arrow_ipc_data, arrow_ipc_len).to_vec() }; - // Get the writer - let writer = writer_ref - .writer - .as_mut() - .ok_or_else(|| anyhow::anyhow!("Writer has been closed"))?; - - // Read and write all batches from the IPC stream - while let Some(batch_result) = reader.next() { - let batch = batch_result - .map_err(|e| anyhow::anyhow!("Failed to read Arrow IPC batch: {}", e))?; - writer - .write(batch) - .await - .map_err(|e| anyhow::anyhow!("Failed to write batch: {}", e))?; + let cursor = Cursor::new(ipc_bytes); + let reader = match StreamReader::try_new(cursor, None) { + Ok(r) => r, + Err(e) => { + store_writer_error(writer_ref, anyhow::anyhow!("IPC reader: {}", e)); + return -1; } + }; - Ok::<(), anyhow::Error>(()) - }, - writer: *mut IcebergDataFileWriter, - arrow_ipc_data: *const u8, - arrow_ipc_len: usize -); + for batch_result in reader { + let batch = match batch_result { + Ok(b) => b, + Err(e) => { + 
store_writer_error(writer_ref, anyhow::anyhow!("IPC batch: {}", e)); + return -1; + } + }; + if let Err(e) = submit_batch(writer_ref, pool, batch) { + store_writer_error(writer_ref, e); + return -1; + } + } -// Close the writer and return the produced DataFiles -// -// This flushes any remaining data, closes the Parquet file(s), and returns -// the DataFiles metadata that can be used with fast_append! in a Transaction. + 0 +} + +// Close the writer and return the produced DataFiles. // -// After calling this, the writer cannot be used again. +// Waits for all pending pool encodes to complete, then finalizes the Parquet file +// and returns the DataFiles metadata. export_runtime_op!( iceberg_writer_close, IcebergWriterCloseResponse, @@ -248,14 +680,45 @@ export_runtime_op!( }, writer_ref, async { - // Take the writer out - let mut writer = writer_ref + // Wait for all pending pool encodes to complete. + // Uses a timeout to guard against a dead worker thread (e.g. panic outside + // catch_unwind) that would otherwise leave pending > 0 forever. + // Tokio's Notify preserves the notification if notify_one() fired before + // notified() is polled, so the check-then-wait sequence is race-free. + let state = &writer_ref.writer_state; + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120); + loop { + if state.pending.load(Ordering::Acquire) == 0 { + break; + } + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + return Err(anyhow::anyhow!( + "Timed out waiting for {} encode task(s) to complete; \ + an encode worker may have crashed", + state.pending.load(Ordering::Acquire) + )); + } + tokio::select! 
{ + _ = state.done_notify.notified() => {} + _ = tokio::time::sleep(remaining) => {} + } + } + + // Propagate any encode error + if let Some(e) = state.error.lock().unwrap().take() { + return Err(e); + } + + // Take the concrete writer and finalize the Parquet file + let mut concrete = state .writer + .lock() + .unwrap() .take() - .ok_or_else(|| anyhow::anyhow!("Writer has already been closed"))?; + .ok_or_else(|| anyhow::anyhow!("Writer already closed"))?; - // Close the writer to get the DataFiles - let data_files = writer + let data_files = concrete .close() .await .map_err(|e| anyhow::anyhow!("Failed to close writer: {}", e))?; diff --git a/iceberg_rust_ffi/src/writer_columns.rs b/iceberg_rust_ffi/src/writer_columns.rs index f892d0c..7e1665a 100644 --- a/iceberg_rust_ffi/src/writer_columns.rs +++ b/iceberg_rust_ffi/src/writer_columns.rs @@ -11,16 +11,9 @@ use arrow_array::{ Date32Type, Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, TimestampMicrosecondType, }, - ArrayRef, BooleanArray, PrimitiveArray, RecordBatch, StringArray, -}; -use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer}; -use arrow_schema::DataType; - -use crate::writer::IcebergDataFileWriter; -use iceberg::writer::IcebergWriter; -use object_store_ffi::{ - export_runtime_op, with_cancellation, CResult, NotifyGuard, ResponseGuard, RT, + ArrayRef, BooleanArray, PrimitiveArray, StringArray, }; +use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; /// Column type codes (must match Julia's ColumnType enum) pub const COLUMN_TYPE_INT32: i32 = 0; @@ -65,248 +58,318 @@ pub struct ColumnDescriptor { unsafe impl Send for ColumnDescriptor {} unsafe impl Sync for ColumnDescriptor {} -/// Build an Arrow array from a ColumnDescriptor and its corresponding schema field. -/// The schema field is used to extract precision/scale for decimal columns. 
-unsafe fn build_arrow_array( - desc: &ColumnDescriptor, +// ============================================================================= +// Scattered-gather writer: pass raw source pointers + selection indices to Rust, +// which gathers the data directly into Arrow arrays — eliminating the Julia-side +// staging copy for non-converting numeric column types. +// ============================================================================= + +/// A reference to one slice of source column data. +/// `sel_ptr = null` → sequential (identity) access: read data[0..len]. +/// `sel_ptr != null` → scattered access: read data[sel[i]-1] for i in 0..len (1-based Julia indices). +/// `validity_ptr = null` → all rows valid (non-nullable or known all-valid slice). +/// `lengths_ptr != null` → string column: data_ptr is Ptr{UInt8}[], lengths_ptr is Int64[] of byte lengths per string. +/// Fields are all 8 bytes — no padding, total 40 bytes. +#[repr(C)] +#[derive(Clone, Copy)] +pub struct SliceRef { + pub data_ptr: *const c_void, + pub lengths_ptr: *const i64, + pub validity_ptr: *const u8, + pub sel_ptr: *const i64, + pub len: usize, +} + +unsafe impl Send for SliceRef {} +unsafe impl Sync for SliceRef {} + +/// Gathered column descriptor: gather `num_slices` SliceRefs into one Arrow column. +/// `total_rows` must equal the sum of all `slice.len` values. +/// Fields ordered largest-to-smallest; 3 bytes trailing padding → 32 bytes total. +#[repr(C)] +#[derive(Clone, Copy)] +pub struct GatheredColumnDescriptor { + pub slices: *const SliceRef, + pub num_slices: usize, + pub total_rows: usize, + pub column_type: i32, + pub is_nullable: bool, +} + +unsafe impl Send for GatheredColumnDescriptor {} +unsafe impl Sync for GatheredColumnDescriptor {} + +/// Merges the per-slice validity bitmaps from all slices into a single output bitmap. +/// +/// Each slice contributes `slice.len` output rows. Slices with a null `validity_ptr` are +/// all-valid. 
Slices with a bitmap may be misaligned relative to the output (each slice +/// starts at a different `out` offset), so bits are copied one at a time with a shift. +/// The selection vector (`sel_ptr`) governs which *source data* elements to read; the +/// validity bitmap is always indexed by output row position, so sequential and scattered +/// slices are treated identically here. +/// +/// Returns `None` if every slice is all-valid (no null buffer needed). +unsafe fn build_null_buffer_gathered(slices: &[SliceRef], total_rows: usize) -> Option { + if !slices.iter().any(|s| !s.validity_ptr.is_null()) { + return None; + } + let mut bits = vec![0u8; (total_rows + 7) / 8]; + let mut out = 0usize; + for slice in slices { + if slice.validity_ptr.is_null() { + // All rows in this slice are valid — set one bit per output row. + for i in 0..slice.len { + bits[(out + i) / 8] |= 1u8 << ((out + i) % 8); + } + } else { + // Copy validity bits from the slice's bitmap into the output bitmap, + // re-aligning from source bit position i to output bit position (out + i). + for i in 0..slice.len { + let b = (*slice.validity_ptr.add(i / 8) >> (i % 8)) & 1; + bits[(out + i) / 8] |= b << ((out + i) % 8); + } + } + out += slice.len; + } + Some(NullBuffer::new(BooleanBuffer::new( + Buffer::from(bits), + 0, + total_rows, + ))) +} + +/// Gather all slices for a column into an Arrow array. +pub(crate) unsafe fn build_arrow_array_gathered( + desc: &GatheredColumnDescriptor, schema_field: &arrow_schema::Field, ) -> Result { - let null_buffer = if desc.is_nullable && !desc.validity_ptr.is_null() { - // Julia's BitVector stores bits packed in UInt64 chunks, which we can use directly - // since Arrow also uses little-endian bit-packed format. - // We just need to copy the right number of bytes. 
- let num_bytes = (desc.num_rows + 7) / 8; - let validity_slice = std::slice::from_raw_parts(desc.validity_ptr, num_bytes); - Some(NullBuffer::new(BooleanBuffer::new( - Buffer::from(validity_slice.to_vec()), - 0, - desc.num_rows, - ))) + let slices = std::slice::from_raw_parts(desc.slices, desc.num_slices); + let total = desc.total_rows; + let null_buf = if desc.is_nullable { + build_null_buffer_gathered(slices, total) } else { None }; + // Macro gathers a primitive numeric type from all slices. + // sel_ptr=null → sequential copy; sel_ptr!=null → indirect gather (1-based indices). + macro_rules! gather_primitive { + ($T:ty, $ArrowType:ty) => {{ + let mut values = Vec::<$T>::with_capacity(total); + for slice in slices { + let src = slice.data_ptr as *const $T; + if slice.sel_ptr.is_null() { + values.extend_from_slice(std::slice::from_raw_parts(src, slice.len)); + } else { + for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { + values.push(*src.add((idx - 1) as usize)); + } + } + } + Arc::new(PrimitiveArray::<$ArrowType>::new( + ScalarBuffer::from(values), + null_buf, + )) as ArrayRef + }}; + } + let array: ArrayRef = match desc.column_type { - COLUMN_TYPE_INT32 => { - let data = std::slice::from_raw_parts(desc.data_ptr as *const i32, desc.num_rows); - let buffer = ScalarBuffer::from(data.to_vec()); - Arc::new(PrimitiveArray::::new(buffer, null_buffer)) - } - COLUMN_TYPE_INT64 => { - let data = std::slice::from_raw_parts(desc.data_ptr as *const i64, desc.num_rows); - let buffer = ScalarBuffer::from(data.to_vec()); - Arc::new(PrimitiveArray::::new(buffer, null_buffer)) - } - COLUMN_TYPE_FLOAT32 => { - let data = std::slice::from_raw_parts(desc.data_ptr as *const f32, desc.num_rows); - let buffer = ScalarBuffer::from(data.to_vec()); - Arc::new(PrimitiveArray::::new(buffer, null_buffer)) - } - COLUMN_TYPE_FLOAT64 => { - let data = std::slice::from_raw_parts(desc.data_ptr as *const f64, desc.num_rows); - let buffer = ScalarBuffer::from(data.to_vec()); - 
Arc::new(PrimitiveArray::::new(buffer, null_buffer)) - } - COLUMN_TYPE_DATE => { - // Date is stored as Int32 (days since epoch) - let data = std::slice::from_raw_parts(desc.data_ptr as *const i32, desc.num_rows); - let buffer = ScalarBuffer::from(data.to_vec()); - Arc::new(PrimitiveArray::::new(buffer, null_buffer)) - } + COLUMN_TYPE_INT32 => gather_primitive!(i32, Int32Type), + COLUMN_TYPE_INT64 => gather_primitive!(i64, Int64Type), + COLUMN_TYPE_FLOAT32 => gather_primitive!(f32, Float32Type), + COLUMN_TYPE_FLOAT64 => gather_primitive!(f64, Float64Type), + COLUMN_TYPE_DATE => gather_primitive!(i32, Date32Type), COLUMN_TYPE_TIMESTAMP => { - // Timestamp without timezone (Iceberg `timestamp`) - // Stored as Int64 microseconds since epoch - let data = std::slice::from_raw_parts(desc.data_ptr as *const i64, desc.num_rows); - let buffer = ScalarBuffer::from(data.to_vec()); + let mut values = Vec::::with_capacity(total); + for slice in slices { + let src = slice.data_ptr as *const i64; + if slice.sel_ptr.is_null() { + values.extend_from_slice(std::slice::from_raw_parts(src, slice.len)); + } else { + for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { + values.push(*src.add((idx - 1) as usize)); + } + } + } Arc::new(PrimitiveArray::::new( - buffer, - null_buffer, + ScalarBuffer::from(values), + null_buf, )) } COLUMN_TYPE_TIMESTAMPTZ => { - // Timestamp with UTC timezone (Iceberg `timestamptz`) - // Stored as Int64 microseconds since epoch, with timezone metadata - let data = std::slice::from_raw_parts(desc.data_ptr as *const i64, desc.num_rows); - let buffer = ScalarBuffer::from(data.to_vec()); + let mut values = Vec::::with_capacity(total); + for slice in slices { + let src = slice.data_ptr as *const i64; + if slice.sel_ptr.is_null() { + values.extend_from_slice(std::slice::from_raw_parts(src, slice.len)); + } else { + for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { + values.push(*src.add((idx - 1) as usize)); + } + } + } Arc::new( - 
PrimitiveArray::::new(buffer, null_buffer) - .with_timezone("UTC"), + PrimitiveArray::::new( + ScalarBuffer::from(values), + null_buf, + ) + .with_timezone("UTC"), ) } COLUMN_TYPE_BOOLEAN => { - let data = std::slice::from_raw_parts(desc.data_ptr as *const u8, desc.num_rows); - // Convert bytes to boolean buffer - let mut bits = vec![0u8; (desc.num_rows + 7) / 8]; - for (i, &val) in data.iter().enumerate() { - if val != 0 { - bits[i / 8] |= 1 << (i % 8); + let mut bits = vec![0u8; (total + 7) / 8]; + let mut out = 0usize; + for slice in slices { + let src = slice.data_ptr as *const u8; + if slice.sel_ptr.is_null() { + let data = std::slice::from_raw_parts(src, slice.len); + for (i, &v) in data.iter().enumerate() { + if v != 0 { + bits[(out + i) / 8] |= 1 << ((out + i) % 8); + } + } + } else { + for (i, &idx) in std::slice::from_raw_parts(slice.sel_ptr, slice.len) + .iter() + .enumerate() + { + if *src.add((idx - 1) as usize) != 0 { + bits[(out + i) / 8] |= 1 << ((out + i) % 8); + } + } } + out += slice.len; } - let values = BooleanBuffer::new(Buffer::from(bits), 0, desc.num_rows); - Arc::new(BooleanArray::new(values, null_buffer)) + Arc::new(BooleanArray::new( + BooleanBuffer::new(Buffer::from(bits), 0, total), + null_buf, + )) } COLUMN_TYPE_STRING => { - // String data passed from Julia: - // - data_ptr: pointer to array of string pointers (each pointing to UTF-8 bytes) - // - lengths_ptr: pointer to array of string lengths (Int64) - // Note: While we avoid copying on the Julia side (just passing pointers), - // Arrow's StringArray copies the data into its own contiguous buffer below. 
- if desc.lengths_ptr.is_null() { - return Err(anyhow::anyhow!("String column requires lengths")); - } - let str_ptrs = - std::slice::from_raw_parts(desc.data_ptr as *const *const u8, desc.num_rows); - let str_lens = std::slice::from_raw_parts(desc.lengths_ptr, desc.num_rows); - - // Build string references, then Arrow copies them into its internal buffer - let mut strings: Vec> = Vec::with_capacity(desc.num_rows); - for i in 0..desc.num_rows { - let is_null: bool = if let Some(ref nb) = null_buffer { - nb.is_null(i) - } else { - false - }; - if is_null { - strings.push(None); - } else { - let ptr = str_ptrs[i]; - let len = str_lens[i] as usize; - let bytes = std::slice::from_raw_parts(ptr, len); - let s = std::str::from_utf8(bytes) - .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in string column: {}", e))?; - strings.push(Some(s)); + // String columns do not support selection vectors. Julia strings are + // heap-allocated with non-contiguous addresses, so the caller must build + // str_ptrs/str_lens arrays up-front — any row selection is already applied + // before add_string_slice! is called. sel_ptr is therefore always null here. + // data_ptr = *const *const u8, lengths_ptr = *const i64. + // + // Build the Arrow StringArray directly: one pass copies string bytes into a + // contiguous values buffer and tracks cumulative offsets. This avoids the + // intermediate Vec> and skips UTF-8 validation — Julia strings + // are guaranteed valid UTF-8. 
+ let null_buf = if desc.is_nullable { + build_null_buffer_gathered(slices, total) + } else { + None + }; + let mut offsets = Vec::::with_capacity(total + 1); + offsets.push(0i32); + let mut values = Vec::::new(); + for slice in slices { + if slice.lengths_ptr.is_null() { + return Err(anyhow::anyhow!("String column requires lengths_ptr")); + } + let ptrs = + std::slice::from_raw_parts(slice.data_ptr as *const *const u8, slice.len); + let lens = std::slice::from_raw_parts(slice.lengths_ptr, slice.len); + for i in 0..slice.len { + let is_null = !slice.validity_ptr.is_null() + && ((*slice.validity_ptr.add(i / 8) >> (i % 8)) & 1) == 0; + if !is_null { + values.extend_from_slice(std::slice::from_raw_parts( + ptrs[i], + lens[i] as usize, + )); + } + offsets.push(values.len() as i32); } } - Arc::new(StringArray::from(strings)) + // SAFETY: offsets are monotonically non-decreasing by construction; values + // bytes come from Julia String objects (valid UTF-8) kept alive in col.preserve. + Arc::new(unsafe { + StringArray::new_unchecked( + OffsetBuffer::new(ScalarBuffer::from(offsets)), + Buffer::from_vec(values), + null_buf, + ) + }) } COLUMN_TYPE_UUID => { - // UUID is stored as 16 bytes (UInt128 in Julia) - // Store as fixed-size binary (16 bytes per value) - let data = std::slice::from_raw_parts(desc.data_ptr as *const u8, desc.num_rows * 16); - - // Build the array using the builder or from_iter_values - let values: Vec<&[u8]> = data.chunks(16).collect(); + let mut data: Vec = Vec::with_capacity(total * 16); + for slice in slices { + let src = slice.data_ptr as *const u8; + if slice.sel_ptr.is_null() { + data.extend_from_slice(std::slice::from_raw_parts(src, slice.len * 16)); + } else { + for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { + data.extend_from_slice(std::slice::from_raw_parts( + src.add((idx - 1) as usize * 16), + 16, + )); + } + } + } + let chunks: Vec<&[u8]> = data.chunks(16).collect(); Arc::new( - 
arrow_array::FixedSizeBinaryArray::try_from_iter(values.into_iter()) - .map_err(|e| anyhow::anyhow!("Failed to create UUID array: {}", e))?, + arrow_array::FixedSizeBinaryArray::try_from_iter(chunks.into_iter()) + .map_err(|e| anyhow::anyhow!("Failed to build UUID array: {}", e))?, ) } COLUMN_TYPE_DECIMAL_INT32 | COLUMN_TYPE_DECIMAL_INT64 | COLUMN_TYPE_DECIMAL_INT128 => { - // All decimal variants map to Arrow Decimal128. Precision and scale come from - // the schema field (set when the Iceberg table was created). let (precision, scale) = match schema_field.data_type() { - DataType::Decimal128(p, s) => (*p, *s), - dt => { - return Err(anyhow::anyhow!( - "Expected Decimal128 schema field for decimal column, got {:?}", - dt - )) - } + arrow_schema::DataType::Decimal128(p, s) => (*p, *s), + dt => return Err(anyhow::anyhow!("Expected Decimal128, got {:?}", dt)), }; - - // Convert the backing integer representation to i128 values - let i128_values: Vec = match desc.column_type { - COLUMN_TYPE_DECIMAL_INT32 => { - let data = - std::slice::from_raw_parts(desc.data_ptr as *const i32, desc.num_rows); - data.iter().map(|&v| v as i128).collect() + let mut values = Vec::::with_capacity(total); + for slice in slices { + match desc.column_type { + COLUMN_TYPE_DECIMAL_INT32 => { + let src = slice.data_ptr as *const i32; + if slice.sel_ptr.is_null() { + values.extend( + std::slice::from_raw_parts(src, slice.len) + .iter() + .map(|&v| v as i128), + ); + } else { + for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { + values.push(*src.add((idx - 1) as usize) as i128); + } + } + } + COLUMN_TYPE_DECIMAL_INT64 => { + let src = slice.data_ptr as *const i64; + if slice.sel_ptr.is_null() { + values.extend( + std::slice::from_raw_parts(src, slice.len) + .iter() + .map(|&v| v as i128), + ); + } else { + for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { + values.push(*src.add((idx - 1) as usize) as i128); + } + } + } + _ => { + // DECIMAL_INT128: i128 layout 
matches Julia Int128 + let src = slice.data_ptr as *const i128; + if slice.sel_ptr.is_null() { + values.extend_from_slice(std::slice::from_raw_parts(src, slice.len)); + } else { + for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { + values.push(*src.add((idx - 1) as usize)); + } + } + } } - COLUMN_TYPE_DECIMAL_INT64 => { - let data = - std::slice::from_raw_parts(desc.data_ptr as *const i64, desc.num_rows); - data.iter().map(|&v| v as i128).collect() - } - _ => { - // COLUMN_TYPE_DECIMAL_INT128: Julia Int128 and Rust i128 share the same - // 16-byte little-endian layout on all supported platforms. - let data = - std::slice::from_raw_parts(desc.data_ptr as *const i128, desc.num_rows); - data.to_vec() - } - }; - - let buffer = ScalarBuffer::::from(i128_values); + } Arc::new( - PrimitiveArray::::new(buffer, null_buffer) + PrimitiveArray::::new(ScalarBuffer::from(values), null_buf) .with_precision_and_scale(precision, scale) - .map_err(|e| anyhow::anyhow!("Failed to set decimal precision/scale: {}", e))?, + .map_err(|e| anyhow::anyhow!("Decimal precision/scale: {}", e))?, ) } - _ => { - return Err(anyhow::anyhow!("Unknown column type: {}", desc.column_type)); - } + _ => return Err(anyhow::anyhow!("Unknown column type: {}", desc.column_type)), }; - Ok(array) } - -// Write columns directly to the Parquet writer. -// Accepts an array of ColumnDescriptors and builds a RecordBatch from them, -// then writes to the underlying Parquet writer. -// The caller must ensure all pointers are valid and point to appropriately sized data. 
-export_runtime_op!( - iceberg_writer_write_columns, - crate::IcebergResponse, - || { - if writer.is_null() { - return Err(anyhow::anyhow!("Null writer pointer provided")); - } - if columns.is_null() || num_columns == 0 { - return Err(anyhow::anyhow!("No columns provided")); - } - - // Copy column descriptors for safe use across await - let cols: Vec = unsafe { - std::slice::from_raw_parts(columns, num_columns).to_vec() - }; - - let writer_ref = unsafe { &mut *writer }; - Ok((writer_ref, cols)) - }, - result_tuple, - async { - let (writer_ref, cols) = result_tuple; - - // Get the writer's schema (stored when writer was created) - let arrow_schema = writer_ref.arrow_schema.clone(); - - // Validate column count matches schema - if cols.len() != arrow_schema.fields().len() { - return Err(anyhow::anyhow!( - "Column count mismatch: got {} columns but schema has {} fields", - cols.len(), - arrow_schema.fields().len() - )); - } - - // Get the writer - let iceberg_writer = writer_ref - .writer - .as_mut() - .ok_or_else(|| anyhow::anyhow!("Writer has been closed"))?; - - // Build Arrow arrays from column descriptors - let mut arrays: Vec = Vec::with_capacity(cols.len()); - - for (i, desc) in cols.iter().enumerate() { - let schema_field = arrow_schema.field(i); - let array = unsafe { build_arrow_array(desc, schema_field)? 
}; - arrays.push(array); - } - - // Create record batch using the table's Arrow schema (with proper field IDs) - let batch = RecordBatch::try_new(arrow_schema, arrays) - .map_err(|e| anyhow::anyhow!("Failed to create RecordBatch: {}", e))?; - - // Write the batch - iceberg_writer - .write(batch) - .await - .map_err(|e| anyhow::anyhow!("Failed to write batch: {}", e))?; - - Ok::<(), anyhow::Error>(()) - }, - writer: *mut IcebergDataFileWriter, - columns: *const ColumnDescriptor, - num_columns: usize -); diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index 3cdeeff..41551f6 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -33,13 +33,14 @@ export IcebergTimestampNs, IcebergTimestamptzNs export IcebergString, IcebergUuid, IcebergBinary, IcebergDecimal export Transaction, DataFiles, free_transaction!, free_data_files!, commit, transaction export FastAppendAction, free_fast_append_action!, add_data_files, apply, with_fast_append -export DataFileWriter, free_writer!, close_writer, write_columns +export DataFileWriter, free_writer!, close_writer, write_columns, set_encode_workers! export WriterConfig, CompressionCodec, UNCOMPRESSED, SNAPPY, GZIP, LZ4, ZSTD -export ColumnDescriptor, ColumnType +export ColumnDescriptor, ColumnBatch, ColumnType export COLUMN_TYPE_INT32, COLUMN_TYPE_INT64, COLUMN_TYPE_FLOAT32, COLUMN_TYPE_FLOAT64 export COLUMN_TYPE_STRING, COLUMN_TYPE_DATE, COLUMN_TYPE_TIMESTAMP, COLUMN_TYPE_TIMESTAMPTZ, COLUMN_TYPE_BOOLEAN, COLUMN_TYPE_UUID export COLUMN_TYPE_DECIMAL_INT32, COLUMN_TYPE_DECIMAL_INT64, COLUMN_TYPE_DECIMAL_INT128 export julia_type_to_column_type +export GatheredColumn, GatheredBatch, add_slice!, add_string_slice! # Always use the JLL library - override via Preferences if needed for local development # To use a local build, set the preference: diff --git a/src/writer.jl b/src/writer.jl index 563d01a..901144a 100644 --- a/src/writer.jl +++ b/src/writer.jl @@ -36,16 +36,24 @@ Configuration options for the DataFileWriter. 
# Fields - `prefix::String`: Prefix for output file names (default: "data") -- `target_file_size_bytes::Int`: Target size for rolling to a new file (default: 512 MB) -- `compression::CompressionCodec`: Compression codec for Parquet files (default: UNCOMPRESSED) +- `target_file_size_bytes::Int`: Target file size before rolling to a new file (default: 512 MB) +- `compression::CompressionCodec`: Parquet compression codec (default: UNCOMPRESSED) +- `dictionary_enabled::Bool`: Enable Parquet dictionary encoding globally (default: true) +- `max_row_group_size::Int`: Maximum rows per Parquet row group, 0 = parquet default (1 048 576) +- `data_page_size::Int`: Target uncompressed data page size in bytes, 0 = parquet default (1 048 576) +- `write_batch_size::Int`: Rows encoded per column chunk within a row group, 0 = parquet default (1024). + Increasing this (e.g. 65536) reduces encoding overhead for large row groups. +- `plain_encoding::Bool`: Force PLAIN encoding for all columns, bypassing the DELTA_BINARY_PACKED + default that parquet-rs uses for INT64/INT32 under PARQUET_2_0 (default: false). 
# Example ```julia -# Override defaults: use smaller file size and ZSTD compression config = WriterConfig( prefix = "my_data", - target_file_size_bytes = 128 * 1024 * 1024, # 128 MB (default is 512 MB) - compression = ZSTD + compression = ZSTD, + dictionary_enabled = false, # disable dict encoding for benchmarking + plain_encoding = true, # use PLAIN to avoid DELTA_BINARY_PACKED overhead + write_batch_size = 65536, # larger batches for better throughput ) writer = DataFileWriter(table, config) ``` @@ -54,6 +62,24 @@ writer = DataFileWriter(table, config) prefix::String = "data" target_file_size_bytes::Int = DEFAULT_TARGET_FILE_SIZE_BYTES compression::CompressionCodec = UNCOMPRESSED + dictionary_enabled::Bool = true + max_row_group_size::Int = 0 + data_page_size::Int = 0 + write_batch_size::Int = 0 + plain_encoding::Bool = false + statistics_enabled::Bool = true +end + +# C-layout struct matching ParquetWriterPropertiesFFI in Rust. +# Fields are ordered largest-to-smallest to match repr(C) padding rules. +struct ParquetWriterPropertiesFFI + max_row_group_size::Int64 + data_page_size::Int64 + write_batch_size::Int64 + compression_codec::Int32 + dictionary_enabled::Bool + use_plain_encoding::Bool + statistics_enabled::Bool end """ @@ -120,6 +146,24 @@ function get_column_metadata(table::Table)::Dict{Symbol, Vector{Pair{String, Str return colmeta end +""" + set_encode_workers!(n::Int) + +Set the number of threads in the global Parquet encode worker pool. + +Must be called before the first `DataFileWriter` is created (i.e. before the pool is +initialized). Throws if the pool is already running. Defaults to `Sys.CPU_THREADS` +if not set. +""" +function set_encode_workers!(n::Int) + n > 0 || throw(ArgumentError("n must be positive, got $n")) + ret = @ccall rust_lib.iceberg_set_encode_workers(n::Cint)::Int32 + ret == 0 || throw(IcebergException( + "set_encode_workers! 
must be called before creating any DataFileWriter" + )) + return nothing +end + """ DataFileWriter(table::Table, config::WriterConfig) -> DataFileWriter DataFileWriter(table::Table; prefix="data", target_file_size_bytes=512MB, compression=UNCOMPRESSED) -> DataFileWriter @@ -157,13 +201,22 @@ free_writer!(writer) """ function DataFileWriter(table::Table, config::WriterConfig) response = DataFileWriterResponse() - - async_ccall(response, config.prefix) do handle + parquet_props = Ref(ParquetWriterPropertiesFFI( + Int64(config.max_row_group_size), + Int64(config.data_page_size), + Int64(config.write_batch_size), + Int32(config.compression), + config.dictionary_enabled, + config.plain_encoding, + config.statistics_enabled, + )) + + async_ccall(response, config.prefix, parquet_props) do handle @ccall rust_lib.iceberg_writer_new( table::Table, config.prefix::Cstring, config.target_file_size_bytes::Int64, - Int32(config.compression)::Int32, + parquet_props::Ref{ParquetWriterPropertiesFFI}, response::Ref{DataFileWriterResponse}, handle::Ptr{Cvoid} )::Cint @@ -365,23 +418,14 @@ end # Internal helper to write raw IPC bytes to the Rust writer function _write_ipc_bytes(writer::DataFileWriter, ipc_bytes::Vector{UInt8}) - ipc_data = pointer(ipc_bytes) - ipc_len = length(ipc_bytes) - - response = Response{Cvoid}(-1, nothing, C_NULL, C_NULL) - - async_ccall(response, ipc_bytes) do handle + ret = GC.@preserve ipc_bytes begin @ccall rust_lib.iceberg_writer_write( writer.ptr::Ptr{Cvoid}, - ipc_data::Ptr{UInt8}, - ipc_len::Csize_t, - response::Ref{Response{Cvoid}}, - handle::Ptr{Cvoid} - )::Cint + pointer(ipc_bytes)::Ptr{UInt8}, + length(ipc_bytes)::Csize_t, + )::Int32 end - - @throw_on_error(response, "write", IcebergException) - + ret == 0 || throw(IcebergException("write failed (see writer close for details)")) return nothing end @@ -446,6 +490,47 @@ end # Column-based writing (zero-copy from Julia) # 
========================================================================================== +""" + SliceRef + +FFI reference to a single slice of source column data for the scattered-gather writer. + +- `data_ptr`: pointer to source data array (T[]) or string pointers (Ptr{UInt8}[]) +- `lengths_ptr`: for string columns, pointer to lengths array; null for other types +- `validity_ptr`: pointer to validity bitmap (BitVector.chunks); null if all rows valid +- `sel_ptr`: pointer to selection index array (1-based Julia indices); null for sequential access +- `len`: number of rows in this slice + +All fields are 8 bytes — total struct size is 40 bytes with no padding. +""" +struct SliceRef + data_ptr::Ptr{Cvoid} + lengths_ptr::Ptr{Int64} + validity_ptr::Ptr{UInt8} + sel_ptr::Ptr{Int64} + len::Csize_t +end + +""" + GatheredColumnDescriptor + +FFI descriptor for a column to be gathered from multiple SliceRefs. +Pass an array of these to `write_columns`. + +- `slices_ptr`: pointer to array of SliceRef structs +- `num_slices`: number of SliceRef entries +- `total_rows`: sum of all slice lengths +- `column_type`: ColumnType enum value +- `is_nullable`: whether the column may contain null values +""" +struct GatheredColumnDescriptor + slices_ptr::Ptr{SliceRef} + num_slices::Csize_t + total_rows::Csize_t + column_type::Int32 + is_nullable::Bool +end + """ ColumnType @@ -632,6 +717,47 @@ function Base.push!( return batch end +""" + push!(batch::ColumnBatch, data::Vector{String}, str_ptrs::Vector{Ptr{UInt8}}, str_lens::Vector{Int64}; validity=nothing, length=nothing, column_type=nothing) + +Add a string column to the batch using pre-allocated pointer/length buffers. +The caller is responsible for filling `str_ptrs` and `str_lens` before calling this. +Avoids allocating new pointer/length arrays on every write. 
+""" +function Base.push!( + batch::ColumnBatch, + data::Vector{String}, + str_ptrs::Vector{Ptr{UInt8}}, + str_lens::Vector{Int64}; + validity::Union{Nothing, BitVector}=nothing, + length::Union{Nothing, Int}=nothing, + column_type::Union{Nothing, ColumnType}=nothing, +) + num_rows = length === nothing ? Base.length(str_ptrs) : length + is_nullable = validity !== nothing + col_type = column_type === nothing ? COLUMN_TYPE_STRING : column_type + + push!(batch.arrays_to_preserve, data, str_ptrs, str_lens) + + validity_ptr = if is_nullable + push!(batch.arrays_to_preserve, validity) + Ptr{UInt8}(pointer(validity.chunks)) + else + Ptr{UInt8}(C_NULL) + end + + desc = ColumnDescriptor( + Ptr{Cvoid}(pointer(str_ptrs)), + pointer(str_lens), + validity_ptr, + Csize_t(num_rows), + Int32(col_type), + is_nullable + ) + push!(batch.descriptors, desc) + return batch +end + """ push!(batch::ColumnBatch, data::Vector{T}; validity=nothing, length=nothing, column_type=nothing) where T @@ -712,29 +838,18 @@ write_columns(writer, [desc], (data, validity)) # Arrays preserved during call ``` """ function write_columns(writer::DataFileWriter, columns::Vector{ColumnDescriptor}, arrays_to_preserve) - if writer.ptr == C_NULL - throw(IcebergException("Writer has been freed")) - end + writer.ptr == C_NULL && throw(IcebergException("Writer has been freed")) + isempty(columns) && throw(IcebergException("No columns provided")) - if isempty(columns) - throw(IcebergException("No columns provided")) - end - - response = Response{Cvoid}(-1, nothing, C_NULL, C_NULL) - - # Pass arrays_to_preserve to async_ccall so GC.@preserve keeps them alive - async_ccall(response, columns, arrays_to_preserve) do handle + ret = GC.@preserve columns arrays_to_preserve begin @ccall rust_lib.iceberg_writer_write_columns( writer.ptr::Ptr{Cvoid}, pointer(columns)::Ptr{ColumnDescriptor}, length(columns)::Csize_t, - response::Ref{Response{Cvoid}}, - handle::Ptr{Cvoid} - )::Cint + )::Int32 end - @throw_on_error(response, 
"write_columns", IcebergException) - + ret == 0 || throw(IcebergException("write_columns failed (see writer close for details)")) return nothing end @@ -767,3 +882,280 @@ write_columns(writer, batch) function write_columns(writer::DataFileWriter, batch::ColumnBatch) write_columns(writer, batch.descriptors, batch.arrays_to_preserve) end + +# ========================================================================================== +# High-level gathered-column API +# ========================================================================================== + +""" + GatheredColumn + +Accumulates one or more source slices for a single column. Rust gathers the data +directly from source buffers when the batch is written, avoiding a Julia-side staging +copy for numeric columns. + +Typical usage: + +```julia +col = GatheredColumn(COLUMN_TYPE_INT64) +add_slice!(col, src_array) # sequential: all rows +add_slice!(col, src_array2; sel=sel_indices) # scattered: rows at sel_indices +add_slice!(col, src_array3; validity=valid_bv) # nullable slice + +str_col = GatheredColumn(COLUMN_TYPE_STRING; nullable=true) +add_string_slice!(str_col, ["a", "", "c"]; validity=BitVector([true, false, true])) +``` + +For string columns use `add_string_slice!` instead of `add_slice!`. Selection vectors +are not supported for strings: Julia strings are non-contiguous, so the caller must +build `str_ptrs`/`str_lens` arrays up-front — any row selection is applied on the Julia +side before calling `add_string_slice!`. +""" +mutable struct GatheredColumn + slices::Vector{SliceRef} + total_rows::Int + column_type::ColumnType + is_nullable::Bool + preserve::Vector{Any} # source arrays kept alive until write +end + +GatheredColumn(column_type::ColumnType; nullable::Bool=false) = + GatheredColumn(SliceRef[], 0, column_type, nullable, Any[]) + +""" + add_slice!(col::GatheredColumn, data::AbstractVector{T}; + sel=nothing, validity=nothing) + +Append a slice of `data` to `col`. 
+ +- `sel`: optional `Vector{Int64}` of 1-based row indices into `data` to select. + If omitted, all rows of `data` are used sequentially. +- `validity`: optional `BitVector` (length = number of selected rows, `true` = valid). + Providing this marks the column as nullable. +""" +function add_slice!( + col::GatheredColumn, + data::AbstractVector{T}; + sel::Union{Nothing, Vector{Int64}} = nothing, + validity::Union{Nothing, BitVector} = nothing, +) where T + len = sel === nothing ? length(data) : length(sel) + + sel_ptr = if sel !== nothing + push!(col.preserve, sel) + pointer(sel) + else + Ptr{Int64}(C_NULL) + end + + validity_ptr = if validity !== nothing + col.is_nullable = true + push!(col.preserve, validity) + Ptr{UInt8}(pointer(validity.chunks)) + else + Ptr{UInt8}(C_NULL) + end + + push!(col.preserve, data) + push!(col.slices, SliceRef( + Ptr{Cvoid}(pointer(data)), + Ptr{Int64}(C_NULL), # lengths_ptr unused for non-string types + validity_ptr, + sel_ptr, + Csize_t(len), + )) + col.total_rows += len + return col +end + +""" + add_string_slice!(col::GatheredColumn, strings::Vector{String}; validity=nothing) + +Append a string slice to `col` from a plain `Vector{String}`. + +- `validity`: optional `BitVector` (`true` = valid, `false` = null). Marking a row null + does not require a placeholder in `strings`, but the vector must still be the same length. + +```julia +col = GatheredColumn(COLUMN_TYPE_STRING; nullable=true) +add_string_slice!(col, ["hello", "", "world"]; validity=BitVector([true, false, true])) +``` + +For performance-critical paths where pointer/length arrays are pre-allocated, use the +lower-level `add_string_slice!(col, str_ptrs, str_lens; validity)` overload directly. 
+""" +function add_string_slice!( + col::GatheredColumn, + strings::Vector{String}; + validity::Union{Nothing, BitVector} = nothing, +) + n = length(strings) + is_nullable = validity !== nothing + str_ptrs = Vector{Ptr{UInt8}}(undef, n) + str_lens = Vector{Int64}(undef, n) + for i in 1:n + if is_nullable && !validity[i] + str_ptrs[i] = Ptr{UInt8}(C_NULL) + str_lens[i] = 0 + else + str_ptrs[i] = pointer(strings[i]) + str_lens[i] = ncodeunits(strings[i]) + end + end + push!(col.preserve, strings) # keep String objects alive so pointers remain valid + return add_string_slice!(col, str_ptrs, str_lens; validity) +end + +""" + add_string_slice!(col::GatheredColumn, str_ptrs, str_lens; validity=nothing) + +Low-level overload: append a string slice from pre-built pointer/length arrays. +`str_ptrs` is a `Vector{Ptr{UInt8}}` of pointers to UTF-8 string data and `str_lens` +is a `Vector{Int64}` of corresponding byte lengths. The caller is responsible for keeping +the pointed-to string bytes alive until `write_columns` returns. +""" +function add_string_slice!( + col::GatheredColumn, + str_ptrs::Vector{Ptr{UInt8}}, + str_lens::Vector{Int64}; + validity::Union{Nothing, BitVector} = nothing, +) + len = length(str_ptrs) + + validity_ptr = if validity !== nothing + col.is_nullable = true + push!(col.preserve, validity) + Ptr{UInt8}(pointer(validity.chunks)) + else + Ptr{UInt8}(C_NULL) + end + + push!(col.preserve, str_ptrs, str_lens) + push!(col.slices, SliceRef( + Ptr{Cvoid}(pointer(str_ptrs)), + pointer(str_lens), + validity_ptr, + Ptr{Int64}(C_NULL), + Csize_t(len), + )) + col.total_rows += len + return col +end + +""" + GatheredBatch + +Collects a `GatheredColumn` per output column, then writes all of them in one call. 
+ +```julia +batch = GatheredBatch() +push!(batch, col_int64) +push!(batch, col_float64) +write_columns(writer, batch) +``` + +You can also push a single-slice column inline without building a `GatheredColumn` +explicitly: + +```julia +batch = GatheredBatch() +push!(batch, src_ints, COLUMN_TYPE_INT64) +push!(batch, src_floats, COLUMN_TYPE_FLOAT64; sel=indices, validity=valid_bv) +write_columns(writer, batch) +``` +""" +mutable struct GatheredBatch + columns::Vector{GatheredColumn} +end + +GatheredBatch() = GatheredBatch(GatheredColumn[]) + +""" + push!(batch::GatheredBatch, col::GatheredColumn) + +Append an already-built `GatheredColumn` to the batch. +""" +Base.push!(batch::GatheredBatch, col::GatheredColumn) = (push!(batch.columns, col); batch) + +""" + push!(batch::GatheredBatch, data::AbstractVector, column_type::ColumnType; + sel=nothing, validity=nothing, nullable=false) + +Convenience: create a single-slice `GatheredColumn` from `data` and append it. +""" +function Base.push!( + batch::GatheredBatch, + data::AbstractVector, + column_type::ColumnType; + sel::Union{Nothing, Vector{Int64}} = nothing, + validity::Union{Nothing, BitVector} = nothing, + nullable::Bool = validity !== nothing, +) + col = GatheredColumn(column_type; nullable) + add_slice!(col, data; sel, validity) + push!(batch.columns, col) + return batch +end + +""" + write_columns(writer::DataFileWriter, batch::GatheredBatch[, extra_preserve]) + +Gather column data from Julia memory synchronously, then encode asynchronously. + +Gathers all column data from Julia memory in the calling thread using a plain blocking +`ccall`. Encode runs asynchronously in the global worker pool. + +`extra_preserve` (optional) is an additional collection of objects whose memory must +stay alive during the gather (e.g. source string arrays for zero-copy string columns). + +The source data pointed to by the `GatheredBatch` slices and `extra_preserve` must be +valid for the duration of this call. 
After the call returns, all Julia pointers have +been consumed and the source data may be safely released. +""" +function write_columns( + writer::DataFileWriter, + batch::GatheredBatch, + extra_preserve = nothing, +) + isempty(batch.columns) && throw(IcebergException("GatheredBatch has no columns")) + writer.ptr == C_NULL && throw(IcebergException("Writer has been freed")) + + all_slice_arrays = Vector{Vector{SliceRef}}(undef, length(batch.columns)) + descriptors = Vector{GatheredColumnDescriptor}(undef, length(batch.columns)) + preserve = Any[] + + for (i, col) in enumerate(batch.columns) + slices = col.slices + all_slice_arrays[i] = slices + append!(preserve, col.preserve) + push!(preserve, slices) + descriptors[i] = GatheredColumnDescriptor( + pointer(slices), + Csize_t(length(slices)), + Csize_t(col.total_rows), + Int32(col.column_type), + col.is_nullable, + ) + end + extra_preserve !== nothing && append!(preserve, extra_preserve) + + ret = GC.@preserve preserve all_slice_arrays descriptors begin + @ccall rust_lib.iceberg_writer_write_gathered_columns( + writer.ptr::Ptr{Cvoid}, + pointer(descriptors)::Ptr{GatheredColumnDescriptor}, + length(descriptors)::Csize_t, + )::Int32 + end + if ret != 0 + err_ptr = @ccall rust_lib.iceberg_take_gather_error()::Ptr{Cchar} + msg = if err_ptr != C_NULL + s = unsafe_string(err_ptr) + @ccall rust_lib.iceberg_destroy_cstring(err_ptr::Ptr{Cchar})::Cint + s + else + "gather failed (see writer close for details)" + end + throw(IcebergException("write_columns (gathered): $(msg)")) + end + return nothing +end diff --git a/test/writer_tests.jl b/test/writer_tests.jl index 47f67f3..1c15dd7 100644 --- a/test/writer_tests.jl +++ b/test/writer_tests.jl @@ -1148,3 +1148,267 @@ end println("\n✅ write_columns decimal nullable tests completed!") end + +@testset "Writer write_columns (GatheredBatch) API" begin + println("Testing write_columns with GatheredBatch (gathered-column) API...") + + catalog_uri = get_catalog_uri() + props = 
get_catalog_properties()
+
+    catalog = nothing
+    table = C_NULL
+    data_files = nothing
+    test_namespace = nothing
+    table_name = nothing
+
+    try
+        catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props)
+        @test catalog !== nothing
+
+        test_namespace = ["test_gathered_$(round(Int, time() * 1000))"]
+        RustyIceberg.create_namespace(catalog, test_namespace)
+
+        # Schema: id (non-nullable long), score (nullable double), tag (nullable string)
+        schema = Schema([
+            Field(Int32(1), "id", IcebergLong(); required=true),
+            Field(Int32(2), "score", IcebergDouble(); required=false),
+            Field(Int32(3), "tag", IcebergString(); required=false),
+        ])
+
+        table_name = "gathered_test_$(round(Int, time() * 1000))"
+        table = RustyIceberg.create_table(catalog, test_namespace, table_name, schema)
+        @test table != C_NULL
+        println("✅ Table created")
+
+        # --- Data layout (4 rows) ---
+        # id: [1, 2, 3, 4] — non-nullable, single sequential slice
+        # score: [1.1, null, 3.3, null] — nullable; three slices (sequential and scattered), each with validity
+        # tag: ["alpha", null, "gamma", null] — nullable string via add_string_slice!
+
+        data_files = RustyIceberg.with_data_file_writer(table) do writer
+            batch = RustyIceberg.GatheredBatch()
+
+            # id: single sequential slice, no nulls
+            id_data = Int64[1, 2, 3, 4]
+            id_col = RustyIceberg.GatheredColumn(RustyIceberg.COLUMN_TYPE_INT64)
+            RustyIceberg.add_slice!(id_col, id_data)
+            push!(batch, id_col)
+            println("✅ id column built (sequential, non-nullable)")
+
+            # score: three slices (mixing sequential and scattered access), each with a validity mask
+            # Slice 1 — sequential: src = [1.1, 9.9], validity = [true, false] → rows 0 (1.1) and 1 (null)
+            # Slice 2 — scattered: src = [99.9, 3.3, 88.8], sel = [2] (1-based) → row 2 (3.3)
+            # Slice 3 — scattered: src = [7.7, 8.8], sel = [1], validity = [false] → row 3 (null)
+            score_src1 = Float64[1.1, 9.9]
+            score_valid1 = BitVector([true, false])
+
+            score_src2 = Float64[99.9, 3.3, 88.8]
+            score_sel2 = Int64[2] # picks index 2 → 3.3 (1-based)
+            score_valid2 = BitVector([true])
+
+            score_src3 = Float64[7.7, 8.8]
+            score_valid3 = BitVector([false]) # null for row 3
+
+            score_col = RustyIceberg.GatheredColumn(RustyIceberg.COLUMN_TYPE_FLOAT64; nullable=true)
+            RustyIceberg.add_slice!(score_col, score_src1; validity=score_valid1)
+            RustyIceberg.add_slice!(score_col, score_src2; sel=score_sel2, validity=score_valid2)
+            RustyIceberg.add_slice!(score_col, score_src3; sel=Int64[1], validity=score_valid3)
+            push!(batch, score_col)
+            println("✅ score column built (scattered + nullable)")
+
+            # tag: string column via the high-level add_string_slice!
overload + # Row 0: "alpha", row 1: null, row 2: "gamma", row 3: null + tag_col = RustyIceberg.GatheredColumn(RustyIceberg.COLUMN_TYPE_STRING; nullable=true) + RustyIceberg.add_string_slice!( + tag_col, + ["alpha", "", "gamma", ""]; + validity=BitVector([true, false, true, false]) + ) + push!(batch, tag_col) + println("✅ tag column built (string via add_string_slice!)") + + RustyIceberg.write_columns(writer, batch) + println("✅ Batch written via write_columns (GatheredBatch)") + end + @test data_files !== nothing && data_files.ptr != C_NULL + println("✅ Writer closed") + + updated_table = RustyIceberg.with_transaction(table, catalog) do tx + RustyIceberg.with_fast_append(tx) do action + RustyIceberg.add_data_files(action, data_files) + end + end + @test updated_table != C_NULL + println("✅ Transaction committed") + + tbl = read_table_data(updated_table) + @test tbl !== nothing + @test length(tbl.id) == 4 + println("✅ Read $(length(tbl.id)) rows") + + perm = sortperm(tbl.id) + sorted_ids = tbl.id[perm] + sorted_scores = tbl.score[perm] + sorted_tags = tbl.tag[perm] + + # Verify id column (non-nullable, sequential) + @test sorted_ids == Int64[1, 2, 3, 4] + println("✅ id values correct") + + # Verify score column (nullable, scattered slices) + @test !ismissing(sorted_scores[1]) && sorted_scores[1] ≈ 1.1 + @test ismissing(sorted_scores[2]) + @test !ismissing(sorted_scores[3]) && sorted_scores[3] ≈ 3.3 + @test ismissing(sorted_scores[4]) + println("✅ score values correct (including nulls)") + + # Verify tag column (nullable string) + @test !ismissing(sorted_tags[1]) && sorted_tags[1] == "alpha" + @test ismissing(sorted_tags[2]) + @test !ismissing(sorted_tags[3]) && sorted_tags[3] == "gamma" + @test ismissing(sorted_tags[4]) + println("✅ tag values correct (including nulls)") + + RustyIceberg.free_table(updated_table) + + finally + if data_files !== nothing && data_files.ptr != C_NULL + RustyIceberg.free_data_files!(data_files) + end + if table != C_NULL + 
RustyIceberg.free_table(table) + end + if table_name !== nothing && test_namespace !== nothing && catalog !== nothing + RustyIceberg.drop_table(catalog, test_namespace, table_name) + end + if test_namespace !== nothing && catalog !== nothing + RustyIceberg.drop_namespace(catalog, test_namespace) + end + if catalog !== nothing + RustyIceberg.free_catalog!(catalog) + end + end + + println("\n✅ write_columns (GatheredBatch) API tests completed!") +end + +@testset "Writer WriterConfig parquet properties" begin + println("Testing WriterConfig parquet writer properties...") + + catalog_uri = get_catalog_uri() + props = get_catalog_properties() + + catalog = nothing + try + catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props) + @test catalog !== nothing + println("✅ Catalog created successfully") + + call_count = Ref(0) + + # Write n_rows with the given config, commit, read back, return the NamedTuple. + # Creates and cleans up its own namespace/table. + function write_read_config(config::RustyIceberg.WriterConfig, n_rows::Int=5) + call_count[] += 1 + uid = "$(round(Int, time() * 1000))_$(call_count[])" + ns = ["test_wrcfg_$uid"] + tn = "wrcfg_$uid" + schema = Schema([ + Field(Int32(1), "id", IcebergLong(); required=true), + Field(Int32(2), "value", IcebergDouble(); required=false), + ]) + table = C_NULL + updated_table = C_NULL + data_files = nothing + try + RustyIceberg.create_namespace(catalog, ns) + table = RustyIceberg.create_table(catalog, ns, tn, schema) + data_files = RustyIceberg.with_data_file_writer(table, config) do writer + write(writer, ( + id = Int64.(1:n_rows), + value = Float64.(1:n_rows) .* 1.1, + )) + end + updated_table = RustyIceberg.with_transaction(table, catalog) do tx + RustyIceberg.with_fast_append(tx) do action + RustyIceberg.add_data_files(action, data_files) + end + end + return read_table_data(updated_table) + finally + if updated_table != C_NULL + RustyIceberg.free_table(updated_table) + end + if data_files !== nothing 
&& data_files.ptr != C_NULL + RustyIceberg.free_data_files!(data_files) + end + if table != C_NULL + RustyIceberg.free_table(table) + RustyIceberg.drop_table(catalog, ns, tn) + RustyIceberg.drop_namespace(catalog, ns) + end + end + end + + # --- Compression codecs --- + for codec in [RustyIceberg.SNAPPY, RustyIceberg.GZIP, RustyIceberg.LZ4, RustyIceberg.ZSTD] + tbl = write_read_config(RustyIceberg.WriterConfig(compression=codec)) + @test !isnothing(tbl) + @test length(tbl.id) == 5 + perm = sortperm(tbl.id) + @test tbl.id[perm] == Int64[1, 2, 3, 4, 5] + @test tbl.value[perm] ≈ Float64[1, 2, 3, 4, 5] .* 1.1 + println("✅ Compression $codec: data correct") + end + + # --- plain_encoding=true (bypasses DELTA_BINARY_PACKED for INT64/INT32) --- + tbl = write_read_config(RustyIceberg.WriterConfig(plain_encoding=true)) + @test !isnothing(tbl) && length(tbl.id) == 5 + @test tbl.id[sortperm(tbl.id)] == Int64[1, 2, 3, 4, 5] + println("✅ plain_encoding=true: data correct") + + # --- dictionary_enabled=false --- + tbl = write_read_config(RustyIceberg.WriterConfig(dictionary_enabled=false)) + @test !isnothing(tbl) && length(tbl.id) == 5 + @test tbl.id[sortperm(tbl.id)] == Int64[1, 2, 3, 4, 5] + println("✅ dictionary_enabled=false: data correct") + + # --- statistics_enabled=false --- + tbl = write_read_config(RustyIceberg.WriterConfig(statistics_enabled=false)) + @test !isnothing(tbl) && length(tbl.id) == 5 + @test tbl.id[sortperm(tbl.id)] == Int64[1, 2, 3, 4, 5] + println("✅ statistics_enabled=false: data correct") + + # --- max_row_group_size=3 with 10 rows → forces ≥4 row groups --- + tbl = write_read_config(RustyIceberg.WriterConfig(max_row_group_size=3), 10) + @test !isnothing(tbl) && length(tbl.id) == 10 + @test tbl.id[sortperm(tbl.id)] == Int64.(1:10) + println("✅ max_row_group_size=3 (10 rows, multiple row groups): all rows intact") + + # --- write_batch_size=2 (rows encoded per column chunk within a row group) --- + tbl = 
write_read_config(RustyIceberg.WriterConfig(write_batch_size=2)) + @test !isnothing(tbl) && length(tbl.id) == 5 + @test tbl.id[sortperm(tbl.id)] == Int64[1, 2, 3, 4, 5] + println("✅ write_batch_size=2: data correct") + + # --- Combined: ZSTD + plain_encoding + no dict + large write_batch_size --- + config = RustyIceberg.WriterConfig( + compression = RustyIceberg.ZSTD, + plain_encoding = true, + dictionary_enabled = false, + write_batch_size = 65536, + ) + tbl = write_read_config(config, 8) + @test !isnothing(tbl) && length(tbl.id) == 8 + @test tbl.id[sortperm(tbl.id)] == Int64.(1:8) + println("✅ Combined (ZSTD + plain + no dict + large batch_size): data correct") + + finally + if catalog !== nothing + RustyIceberg.free_catalog!(catalog) + println("✅ Catalog cleaned up") + end + end + + println("\n✅ WriterConfig parquet properties tests completed!") +end