
Collapse scan parameters into constructors, remove with_* functions#88

Open
robertbuessow wants to merge 14 commits into main from rb-split-scan

Conversation

@robertbuessow

Summary

  • All scan configuration is now passed directly to new_scan / new_incremental_scan as keyword arguments, using Int64 with -1 as the "not set" sentinel (consistent with the existing snapshot_id / SNAPSHOT_ID_NONE convention)
  • The scan is built immediately inside the Rust constructor — build! is no longer a separate step, and the builder: Option<…> / scan: Option<…> fields are replaced by plain scan: TableScan and file_io: FileIO
  • All standalone iceberg_*_with_* FFI functions deleted from Rust, along with the macros that generated them (impl_select_columns!, impl_scan_builder_method!, impl_with_serialization_concurrency_limit!, impl_scan_build!)
  • Julia build!, select_columns!, and all with_*! functions removed; exports updated; tests and doc examples updated throughout

Before:

scan = new_scan(table)
select_columns!(scan, ["col1", "col2"])
with_batch_size!(scan, UInt(1024))
with_snapshot_id!(scan, snapshot_id)
stream = scan!(scan)

After:

scan = new_scan(table; column_names=["col1", "col2"], batch_size=Int64(1024), snapshot_id=snapshot_id)
stream = scan!(scan)

Test plan

  • make run-containers to start MinIO + Iceberg REST catalog
  • make test — all existing scan, incremental scan, split-scan, and catalog tests pass
  • Verify new_scan(table) with no kwargs still works (all defaults preserved)

🤖 Generated with Claude Code

robertbuessow and others added 8 commits April 22, 2026 15:52
Adds a split-scan API for incremental scans (append files + positional
delete files), mirroring the existing full-scan split API. Positional
deletes are pre-materialized at planning time as (file_path, Vec<u64>)
to avoid exposing the private DeleteVector type across the crate
boundary. Append files are read via the public StreamsInto<ArrowReader>
trait by wrapping each task in a single-element stream.

Also adds file_path accessors for all three file handle types
(FileScanHandle, IncrementalAppendFileHandle, IncrementalPosDeleteFileHandle)
and exposes batch_size on IcebergArrowReaderContext for delete-task
batch sizing.

Naming cleanup throughout:
- prefetch_depth removed (was wired up but never applied to the stream)
- AppendTask/PosDeleteTask → AppendFile/PosDeleteFile
- free_incremental_scan!, free_file_scan, free_file_scan_stream,
  free_incremental_*_file, free_incremental_*_file_stream collapsed
  onto free_scan!, free_file, free_file_stream via multiple dispatch
- file_scan_record_count / append_file_record_count → record_count
- file_scan_file_path → file_path
- OpaqueResponse moved from full.jl to scan_common.jl (shared)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Exposes the number of deleted row positions in a positional-delete file
via `record_count(::IncrementalPosDeleteFileHandle)`. Unlike append
files (where record_count may be unavailable), the count is always
known because positions are fully materialized at planning time, so the
return type is plain Int64.

Adds a test verifying the count is non-negative per file and sums to
the expected total of 1 for the incremental test table.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Renames following Julia's convention that functions mutating or consuming
their arguments end with !:

  next_file      → next_file!       (advances a stream)
  read_file      → read_file!       (consumes the file handle)
  read_file_scan → read_file_scan!  (consumes the file handle)
  free_stream    → free_stream!
  free_table     → free_table!
  free_batch     → free_batch!
  free_file      → free_file!
  free_file_stream → free_file_stream!
  free_reader    → free_reader!

Functions that already had ! (free_scan!, free_catalog!, free_transaction!,
free_writer!, etc.) are unchanged. The underlying Rust FFI symbol names
are unchanged.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
All scan configuration is now passed directly to `new_scan` / `new_incremental_scan`
as keyword arguments (Int64 with -1 as "not set" sentinel, consistent with the
existing snapshot_id convention). The scan is also built immediately inside the
Rust constructor — `build!` is no longer a separate step.

Changes:
- Rust: `iceberg_new_scan` and `iceberg_new_incremental_scan` now accept all
  parameters (column selection, concurrency limits, batch size, metadata columns,
  snapshot id, task prefetch depth) and call `builder.build()` before returning.
  Scan structs simplified: `builder: Option<…>` and `scan: Option<…>` replaced
  by a plain `scan: TableScan` / `IncrementalTableScan` and `file_io: FileIO`.
- Rust: removed all standalone `iceberg_*_with_*` FFI functions and the macros
  that generated them (`impl_select_columns!`, `impl_scan_builder_method!`,
  `impl_with_serialization_concurrency_limit!`, `impl_scan_build!`).
- Julia: `new_scan` and `new_incremental_scan` expose all parameters as keyword
  args; `build!`, `select_columns!`, and all `with_*!` functions removed.
- Tests and exports updated throughout.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
robertbuessow added the enhancement (New feature or request) and rust (Pull requests that update rust code) labels on Apr 24, 2026
robertbuessow and others added 3 commits April 27, 2026 15:43
Log all scan performance parameters (data_file_concurrency,
manifest_file_concurrency, manifest_entry_concurrency, batch_size,
serialization_concurrency, task_prefetch_depth) via tracing::info! in
both iceberg_new_scan and iceberg_new_incremental_scan, using the
resolved 0-sentinel values that are stored on the scan struct. The
manifest concurrency params are logged as raw -1-sentinel values since
they are passed directly to the builder and not stored.

Also fix a stale comment in the split-scan read path (stream of tasks,
not batches) and add a file_metadata(scan) convenience function to
Julia that collects all file paths and record counts from manifest
metadata in a single pass without opening any data files.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Add per-file batch prefetch queue to the split-scan path so batches are
decoded and serialized ahead of Julia's next_batch calls:

- iceberg_create_reader gains a batch_prefetch_depth parameter (0 = default 4)
  stored in IcebergArrowReaderContext
- iceberg_read_file_scan_task now spawns a background Tokio task that eagerly
  drains the serialized Arrow stream into a bounded mpsc channel; Julia's
  next_batch receives from that channel without waiting for decode
- The shared ArrowReader (cloned per task) means all per-file tasks share one
  CachingDeleteFileLoader, so delete files are fetched at most once per pipeline
  run regardless of how many data files reference them

Performance monitoring (SplitScanStats) added to table.rs with lock-free
atomic counters tracking: files opened, batches/bytes produced,
fetch+serialize time, channel backpressure time (producer blocked = Julia
slow), and next_batch wait time (Julia blocked = producer slow). Exposed via
iceberg_print_split_scan_stats() / iceberg_reset_split_scan_stats() FFI and
RustyIceberg.print_split_scan_stats() / reset_split_scan_stats!() in Julia.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Comment thread — src/RustyIceberg.jl

Free the memory associated with an Iceberg table.
"""
-function free_table(table::Table)
+function free_table!(table::Table)

Contributor comment: Table is still a pointer, and its value is not mutated, no?

robertbuessow and others added 3 commits April 28, 2026 19:24
Add IcebergWarmFileScanStream — a new scan path modelled on
ordered_file_pipeline.rs that yields one IcebergWarmFileScan per file
instead of a single flat batch stream. Concurrently processes up to
task_prefetch_depth files, each backed by its own bounded mpsc channel
(capacity = batch_prefetch_depth) and a 100 MB semaphore for
backpressure. Permits are released as Julia drains batches via the
per-file IcebergArrowStream.

Key design decisions:
- Shared ArrowReader cloned per task so all files share one
  CachingDeleteFileLoader (delete files fetched at most once per run)
- FuturesOrdered maintains strict manifest order while keeping N
  tasks in flight (same sliding-window pattern as ordered_file_pipeline)
- batch_prefetch_depth flows from IcebergArrowReaderContext through
  run_warm into spawn_file_task's channel capacity

Stats (WarmScanStats) track wall time, throughput, parallelism,
per-phase timings (reader setup, fetch+decode, serialize, semaphore),
plus batch_send_ns (per-file channel) and file_send_ns (outer channel)
to distinguish which channel is the bottleneck.

Julia API: plan_files_warm / next_file! / read_file! / free_warm_file!
with print_warm_scan_stats / reset_warm_scan_stats!.

Also update tests to use read_file! instead of the old read_file_scan!
name, and fix doc comment warnings on export_runtime_op! invocations.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Extend the warm file stream to cover incremental scans (append files):

- Add run_warm_orchestrator<T, S, F> — a single generic FuturesOrdered
  loop parameterised by a spawn_fn closure, replacing the two near-identical
  run_warm / run_warm_append bodies. Both are now thin wrappers.

- Add drain_batch_stream helper (phases 2-4: fetch+decode → serialize →
  semaphore → channel send) shared by process_file_inner (full scan) and
  process_incremental_append_file_inner (incremental).

- Add run_file_task generic wrapper for the error-send + track_task_end
  pattern, eliminating duplicate boilerplate in process_file and
  process_incremental_append_file.

- Add iceberg_plan_incremental_warm FFI (incremental.rs): calls plan_files()
  once, returns IcebergWarmFileScanStream for append files + standard
  IcebergIncrementalPosDeleteFileStream for delete files via the new
  IcebergWarmIncrementalStreamsResponse type.

- Add plan_incremental_warm(scan, reader) Julia API in incremental.jl,
  returning (WarmFileScanStream, DeleteFileStream) — same consumer code
  works for both full and incremental append files.

- Fix export renames: free_table!/free_batch!/free_stream! in RustyIceberg.jl
  and writer_tests.jl.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>