Conversation
…rom Arrow streams, which are lazy, to implement table registration, which in the default DataFusion library is not lazy enough.
…(XarrayContext). This leads to two failed tests, but those failures may be caused by errors in the tests themselves.
The previous implementation stored a single Arrow stream that could only be consumed once, causing subsequent queries on the same table to return empty results. This broke filters and aggregations.

Changes:
- Modify the Rust PyArrowStreamPartition to accept a factory function instead of a stream object. The factory is called on each execute() to create a fresh stream, allowing multiple queries on the same table.
- Update LazyArrowStreamTable to take a factory and schema instead of consuming a stream directly.
- Update the Python read_xarray_table to create a factory function that produces fresh XarrayRecordBatchReader instances.
- Update tests to use the new factory-based API via read_xarray_table.

This enables proper lazy evaluation while supporting multiple queries on registered tables, fixing the failing filter and aggregation tests.
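The one-shot-stream failure mode and the factory fix can be illustrated with plain Python iterators. This is a minimal sketch: the class and method names here (OneShotTable, FactoryTable, scan) are illustrative stand-ins, not the actual project API.

```python
# A one-shot stream: once consumed, later "queries" see nothing.
class OneShotTable:
    def __init__(self, stream):
        self._stream = stream  # a single iterator, consumed on first scan

    def scan(self):
        return list(self._stream)

# Factory-based table: each scan calls the factory for a fresh stream.
class FactoryTable:
    def __init__(self, factory):
        self._factory = factory  # zero-arg callable producing a new iterator

    def scan(self):
        return list(self._factory())

batches = [1, 2, 3]

one_shot = OneShotTable(iter(batches))
print(one_shot.scan())  # [1, 2, 3]
print(one_shot.scan())  # [] -- stream exhausted, second query is empty

table = FactoryTable(lambda: iter(batches))
print(table.scan())  # [1, 2, 3]
print(table.scan())  # [1, 2, 3] -- fresh stream per query
```

This is the same reason the Rust side now takes a factory called on each execute() rather than holding one stream.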
XarrayRecordBatchReader already implements __arrow_c_stream__, so there's no need to wrap it in pa.RecordBatchReader.from_stream(). The Rust code can consume it directly via ArrowArrayStreamReader::from_pyarrow_bound.
&self.schema
}

fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
CC: @maximedion2 I could use your expertise and attention here. This is new since the last review.
So, I've checked a couple times today, it seems you've been changing this part of the code a few times? Are you still working on this?
The python <> rust part is confusing me a bit, I still have to wrap my head around grabbing the GIL everywhere, but generally speaking, even without that, I'm not sure I completely follow. So there are 2 concepts here, a partition and a batch. In datafusion, from what I understand, partitions run in parallel, and batches are streamed sequentially for a given partition. For example, if you have 2 folders with 10 parquet files in each, you can spin up 2 partitions, each of which will produce 10 batches. The purpose of partitions is to allow parallelism, and the purpose of batches is to allow work to progress down the execution graph while earlier steps are running, i.e. as long as you don't need to materialize all the data, your first batch can go through operations while the second batch is read, third batch is read, etc...
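The two-folders example above can be sketched in plain Python, assuming nothing about DataFusion itself: partitions run concurrently, while each partition's batches come out in order. The folder and batch names are made up for illustration.

```python
# Sketch of the model described above: partitions run in parallel,
# each partition yields its batches sequentially.
from concurrent.futures import ThreadPoolExecutor

def read_partition(folder, n_batches):
    # Within one partition, batches stream sequentially, in order.
    return [f"{folder}/batch{i}" for i in range(n_batches)]

folders = ["folder_a", "folder_b"]  # 2 folders -> 2 partitions
with ThreadPoolExecutor(max_workers=len(folders)) as pool:
    results = list(pool.map(lambda f: read_partition(f, 10), folders))

assert len(results) == 2                    # one stream per partition
assert all(len(r) == 10 for r in results)   # 10 batches each
```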
Here from what I understand you are producing a single partition, that's the purpose of PartitionStream. That partition will produce multiple batches, sequentially, so I don't think I understand why there's a mutex?
The "state" pattern you are using is what I initially did for my zarr stream, see here https://github.com/datafusion-contrib/arrow-zarr/blob/fa08cf93f369159ce09e843d68ab90fd64a3d3b3/src/zarr_store_opener/zarr_data_stream.rs#L790. It's quite complicated, the state has to hold the futures to keep track of them, etc... Kyle refactored this part to what it looks like now (see this https://github.com/datafusion-contrib/arrow-zarr/blob/e61b8df47fc3dfd1a95b305c82420276b9efab86/src/zarr_store_opener/zarr_data_stream.rs#L757-L789), on the PR we had a discussion but in the end I did like what he did, simple and cleaner approach I think.
I think, given that the batch production logic here is fairly straightforward, that there's probably a more direct way to generate the record batch stream? I'll have to look more into this, like I said I'm still not completely comfortable with the back and forth with python haha.
Hmm, looking more at this, you might not be able to directly convert your reader into a stream because it's a python class, but maybe you can still use the try_stream macro, in which you call "read_next_batch" through the python interface? Your reader already handles returning None when it's done reading, right? So that might be a short and clean approach. Not 100% sure if the calls to the python interpreter would break this, but maybe worth a try.
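The loop being suggested — call read_next_batch() repeatedly until it returns None — looks like this in Python terms (the Rust try_stream version would do the same thing inside the macro body, holding the GIL for each call). FakeReader here is a hypothetical stand-in for the real XarrayRecordBatchReader.

```python
# Python analogue of the suggested try_stream loop: repeatedly call
# read_next_batch() until it returns None. FakeReader is a stand-in
# for the real reader class.
class FakeReader:
    def __init__(self, batches):
        self._batches = list(batches)

    def read_next_batch(self):
        # Returns the next batch, or None once exhausted.
        return self._batches.pop(0) if self._batches else None

def stream_batches(reader):
    while True:
        batch = reader.read_next_batch()
        if batch is None:   # reader signals exhaustion with None
            return
        yield batch

out = list(stream_batches(FakeReader(["b0", "b1", "b2"])))
print(out)  # ['b0', 'b1', 'b2']
```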
Thanks Maxime! I really appreciate your review! Both of these ideas were excellent notes of feedback. Since you explained all this, I did some of my homework and read all the docs in the datafusion crate. Thanks for the first explanation of the parallelism model in Datafusion -- I agree that I'll likely need to create my own execution plan instead of just using a StreamPartition. In any case, the mutex wasn't needed and try_stream makes it much cleaner.
I still need to figure out how to address #106. See the "Parallelism" note at the top of the file, but I can probably address this later:
//! ## Parallel Execution Note
//!
//! When using DataFusion's parallel execution (multiple partitions), aggregation queries
//! without ORDER BY may return partial results due to how our stream interacts with
//! DataFusion's async runtime. To ensure complete results:
//! - Add ORDER BY to aggregation queries, or
//! - Use SessionConfig().with_target_partitions(1) for single-threaded execution
//!
//! TODO(#106): Implement proper parallelism and partition handling.
Ok. I have a plan to fix #106, but it requires changing our python code. I'll merge this for now and make subsequent changes.
Fixes #17, #93.