Overview
This is a parent tracking issue for all performance optimizations needed to make xarray-sql practical for ERA5-scale datasets (~732,072 partitions, ~38.4M rows/partition, ~250 variables).
Benchmarks and profiling context: see #75 (Rerun perf_tests post DataFusion).
Completed ✅
- Projection pushdown into partition factory — Pass column projection into the partition factory to avoid loading unnecessary variables. For a dataset with 250 variables, a single-column query previously loaded all 250 arrays. (Implemented)
- Precompute coordinate arrays in `partition_metadata` — `ds.coords[dim].values` was re-accessed O(N_partitions × N_dims) times (2.9M calls for ERA5). Now extracted once per dimension before the loop. (Implemented)
Open Sub-issues
Memory & Data Path
- Replace pandas round-trip with direct numpy → Arrow RecordBatch construction (#121-placeholder)
  Current path: xarray arrays → `to_dataframe()` → pandas MultiIndex → `reset_index()` → flat DataFrame → `pa.RecordBatch.from_pandas()`. This is a 3-copy chain. For a 38.4M-row, 5-column partition: ~1.5 GB peak to produce ~307 MB of output. Direct numpy → Arrow construction avoids all pandas overhead. This is the primary per-partition memory bottleneck.
- Emit multiple small RecordBatches per partition instead of one giant batch
  Each factory currently wraps a single 38.4M-row RecordBatch. DataFusion must fully materialize the partition before processing any of it, then re-splits it to its 8,192-row default — duplicating memory. Emitting batches of ~65,536 rows via a generator lets DataFusion pipeline filtering/aggregation and reduces peak memory from O(partition_size) to O(batch_size).
- Avoid materializing all block dicts and factory closures at registration time
  `read_xarray_table()` currently forces all 732,072 partition block dicts, metadata dicts, and closures into memory before any query runs — ~1.7 GB RAM. The `LazyArrowStreamTable` constructor should accept a callable that generates `(factory, metadata)` pairs on demand, or expose a streaming registration API.
Concurrency
- Release GIL during partition load to enable DataFusion parallelism
  `PyArrowStreamPartition::execute()` holds the GIL for the entire `pivot()` + `to_dataframe()` call — serializing all partition loading through the GIL regardless of DataFusion's thread-pool size. Options: `tokio::task::spawn_blocking`, or restructuring the factory to release the GIL at numpy/Arrow boundaries (many numpy ops already release the GIL).
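A Python-side illustration of why this matters: running factories on a thread pool only scales if the heavy work inside them drops the GIL. The helper below is hypothetical, not part of the codebase:

```python
from concurrent.futures import ThreadPoolExecutor

def load_partitions_threaded(factories, max_workers=8):
    """Run partition factories on a thread pool. This only yields real
    parallelism when the numpy/Arrow work inside each factory releases
    the GIL; if the GIL is held for the whole load (the current
    behavior described above), the threads serialize."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda factory: factory(), factories))
```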
Additional Pushdown
- Limit pushdown — avoid loading full partitions for LIMIT queries
  `SELECT * FROM ds LIMIT 100` currently materializes all 38.4M rows of the first matching partition. The `scan()` method can pass the limit through the partition stream, and the factory can stop early once enough rows have been produced.
Chunking & Metadata
- Document and support multi-dimensional chunking for pruning on non-time dimensions
  Currently, filter pushdown only prunes on the chunking dimension. For ERA5 chunked by time, `WHERE lat BETWEEN 30 AND 60` reads all 732,072 partitions. Multi-dimensional chunking (`chunks={'time': 1, 'lat': 90, 'lon': 180}`) enables lat/lon pruning too. `PrunableStreamingTable` already stores multi-dimensional bounds — this is primarily a documentation, testing, and chunk-size recommendation task.
- Cache/serialize partition metadata across sessions
  `partition_metadata()` recomputes coordinate min/max bounds for all partitions on every session start. For ERA5 over GCS, this adds significant latency: 732,072 coordinate reads, each with network overhead. Persist bounds to a sidecar file (JSON/Parquet) keyed by dataset path + chunk spec. API sketch:

  ```python
  table = read_xarray_table(ds, chunks={'time': 1}, metadata_cache='./era5_meta.parquet')
  ```
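For the multi-dim chunking item, the pruning decision reduces to an interval-overlap check per dimension. This sketch uses illustrative dict shapes, not `PrunableStreamingTable`'s actual interface:

```python
def partition_overlaps(bounds, predicate):
    """Return True if a partition's coordinate bounds can satisfy an
    interval predicate such as `lat BETWEEN 30 AND 60`.
    bounds={'lat': (min, max)}, predicate={'lat': (lo, hi)}."""
    for dim, (lo, hi) in predicate.items():
        bmin, bmax = bounds[dim]
        if bmax < lo or bmin > hi:
            return False  # disjoint intervals: prune this partition
    return True
```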
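For the metadata-cache item, the sidecar could be as simple as the JSON sketch below (the issue suggests JSON or Parquet); the function name and shape are illustrative only, with `compute_bounds` standing in for the expensive `partition_metadata()` pass:

```python
import json
import os

def load_or_compute_bounds(cache_path, compute_bounds):
    """Load per-partition coordinate bounds from a JSON sidecar if it
    exists; otherwise run the expensive scan once and persist it, so
    later session starts skip the 732k remote coordinate reads."""
    if os.path.exists(cache_path):
        with open(cache_path) as f:
            return json.load(f)
    bounds = compute_bounds()
    with open(cache_path, "w") as f:
        json.dump(bounds, f)
    return bounds
```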
Priority Order (suggested)
1. Replace pandas round-trip (Fugue integration #13 above) — biggest per-partition memory win
2. Emit multiple RecordBatches — enables pipelining
3. Release GIL — enables true parallelism
4. Lazy partition registration — reduces startup memory
5. Limit pushdown — improves interactive queries
6. Metadata caching — reduces remote-dataset startup latency
7. Multi-dim chunking docs — enables spatial/non-time pruning