
Parallel Xarray record batch reading via blocks partition factory. #108

Merged
alxmrs merged 1 commit into main from fix-106 on Feb 9, 2026

Conversation

@alxmrs
Owner

@alxmrs alxmrs commented Feb 1, 2026

Fixes #106.

@alxmrs
Owner Author

alxmrs commented Feb 1, 2026

@maximedion2 May I have your review? Also, may I add you as a collaborator on this project? :)

@alxmrs alxmrs marked this pull request as ready for review February 1, 2026 01:16
@maximedion2
Collaborator

> @maximedion2 May I have your review? Also, may I add you as a collaborator on this project? :)

Sure! I'll take a closer look tomorrow. So this should allow for the creation of parallel partitions, right? After this is merged in, I can give this a try and compare with what I built so far?

Yeah, you can add me. So since I added the Python bindings, and I've also been digging pretty deep into the "query engine" part of DataFusion, I'm realizing that I don't really have to build everything as one big crate/package with like one entry point that exposes everything I want to do. DataFusion, on both the Rust and Python side, is modular in the sense that you can spin up vanilla DataFusion (or even something like SedonaDB, built directly on top of DataFusion) and just register a custom execution plan, optimizer rules, UDFs, etc., and if you're careful everything will just work. Or you can pick something from SedonaDB, something from a custom package (e.g. to read Zarr), register both with vanilla DataFusion, and combine as needed.

All that to say: there can be multiple Zarr engines to feed into DataFusion, one can pick whichever works best for a given use case, and I can expose other functionality I want to implement as standalone components too; those can be mixed and matched with various Zarr engines, and so on.

For example, as of now, I suspect (but I don't know for sure yet) that what I wrote would be more efficient and faster, e.g. due to reading the raw data directly and not going through pandas; but on the other hand, my implementation doesn't handle any of the transformations xarray does under the hood, like applying scaling factors and automatically transforming data based on the metadata. I'll get to it at some point, but I'll have to look up a bunch of xarray docs, and if new things are introduced I'd have to update my code, etc., whereas what you did starts with xarray and picks up any standards and features automatically. So I can already see pros and cons for both approaches.
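To make the scaling-factor point concrete, here is a minimal sketch (not code from either project) of the CF-convention-style decoding xarray applies automatically when attributes like scale_factor, add_offset, and _FillValue are present; a raw-chunk reader would have to reimplement each such convention by hand:

```rust
/// Decoding parameters read from a variable's attributes
/// (names follow the CF conventions that xarray understands).
struct CfDecoder {
    scale_factor: f64,
    add_offset: f64,
    fill_value: i16,
}

impl CfDecoder {
    /// decoded = raw * scale_factor + add_offset, with _FillValue mapped to NaN.
    fn decode(&self, raw: &[i16]) -> Vec<f64> {
        raw.iter()
            .map(|&v| {
                if v == self.fill_value {
                    f64::NAN
                } else {
                    f64::from(v) * self.scale_factor + self.add_offset
                }
            })
            .collect()
    }
}

fn main() {
    // e.g. temperatures packed as i16 with a 0.01 scale and 273.15 offset
    let dec = CfDecoder { scale_factor: 0.01, add_offset: 273.15, fill_value: -32768 };
    let decoded = dec.decode(&[0, 1500, -32768]);
    println!("{:?}", decoded); // ≈ [273.15, 288.15, NaN]
}
```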

In any case, looking forward to collaborating more on this!

@alxmrs
Owner Author

alxmrs commented Feb 1, 2026

> So this should allow for the creation of parallel partitions, right? After this is merged in, I can give this a try and compare with what I built so far?

Yes and yes! I might have to defer to you for the best way to get a hotfix. I think after this lands, I may make a release for easier distribution.

@alxmrs
Owner Author

alxmrs commented Feb 1, 2026

> I'm realizing that I don't really have to build everything as one big crate/package with like one entry point that exposes everything I want to do.

I would love to contribute to core components of this RDBMS with you. I especially have opinions about the query engine. I have a colleague who would be really down to contribute as well.

WRT sedonadb, maybe @ljstrnadiii would like to join us in this effort.

@alxmrs
Owner Author

alxmrs commented Feb 1, 2026

> For example, as of now, I suspect (but I don't know for sure yet) that what I wrote would be more efficient and faster, e.g. due to reading the raw data directly and not going through pandas; but on the other hand, my implementation doesn't handle any of the transformations xarray does under the hood, like applying scaling factors and automatically transforming data based on the metadata. I'll get to it at some point, but I'll have to look up a bunch of xarray docs, and if new things are introduced I'd have to update my code, etc., whereas what you did starts with xarray and picks up any standards and features automatically. So I can already see pros and cons for both approaches.

Totally agree! And, I like that datafusion/arrow works as a substrate to join across these different models. For example, we could join data from a tile server via Xarray with Zarr data backed by your engine.

Beyond this, I am very interested in developing bespoke query optimizers for this data domain, say, to make joins efficient.

@ljstrnadiii
Contributor

> I'm realizing that I don't really have to build everything as one big crate/package with like one entry point that exposes everything I want to do.
>
> I would love to contribute to core components of this RDBMS with you. I especially have opinions about the query engine. I have a colleague who would be really down to contribute as well.
>
> WRT sedonadb, maybe @ljstrnadiii would like to join us in this effort.

What did you have in mind exactly? I'm looking for an excuse to learn Rust, and to learn more about this repo and how it might integrate with SedonaDB. In particular, I'd like to see spatial joins, zonal stats, etc., and predicate handling at the block level if spatial dims and a transform are present, for example.

.collect();

// Create the StreamingTable with multiple partitions
let table = StreamingTable::try_new(schema_ref, partitions).map_err(|e| {
Collaborator

Yep, looking at the docs I think this makes sense: this creates a TableProvider with all your partitions, which implements scan, which produces an ExecutionPlan (without you explicitly building the ExecutionPlan).

One thing though: the code isn't in this PR, but in your PartitionStream implementation for PyArrowStreamPartition, inside the try_stream!, you are calling Python::attach, which basically acquires the GIL, and the GIL can only be held by one thread at a time. I think it's essentially acting as a mutex guard. So I think (I'm not sure about this, but you can probably test it on a decent-sized Zarr store and check the core usage) that while you are indeed creating partitions that will run in parallel with this, in practice you might only have one partition reading data at a time, with the others waiting.

Now, even if that is the case, I think what you have here, with partitions, would still have a big performance advantage over a single partition, because if you create multiple partitions from the start, downstream execution plans can leverage that. For example, say you have 4 partitions: partition 1 reads some data and streams it, then partition 2 reads and streams its data, sequentially; but while partition 2 is reading, whatever operations you have downstream can already start working on the data streamed from partition 1. So you might have a "slow" start, but as data is being read, the query can start leveraging the partitions (assuming you have more operations after just reading the data).

On that note though, looking at the PartitionStream implementation, I think that right now one partition would acquire the GIL, read all its batches, and only then let another partition acquire the GIL. It would probably be better to allow partitions to be "interleaved", i.e. partition 1 reads one batch, partition 2 reads one batch, ..., back to partition 1 for its second batch, and so on. I think you can accomplish that by using allow_threads (https://pyo3.rs/v0.9.2/parallelism): even if just a little bit of Rust code runs within an allow_threads, it might let a different partition acquire the GIL.

Okay, done with my overly long ramblings! I'm not 100% sure about all of the above, but if you have some decent-sized data to test this on easily, it might be worth just trying allow_threads and seeing if it impacts performance (I think you'd have to include some compute-heavy operations that can run on individual batches to see the difference, though).
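To picture the GIL-as-mutex point, here is a stdlib-only Rust analogy (not the actual pyo3 code; Python::attach and allow_threads are the real mechanism): a shared lock stands in for the GIL, and each "partition" thread releases it between batches, which is what makes batch-level interleaving possible rather than one partition draining completely before the next starts.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the GIL: only one thread may "read a batch" at a time.
// Because each partition drops the lock between batches (the
// allow_threads pattern), batches from different partitions can
// interleave; holding the lock for the whole loop would serialize
// entire partitions instead.
fn read_partitions(n_partitions: usize, batches_per_partition: usize) -> Vec<(usize, usize)> {
    let gil = Arc::new(Mutex::new(()));
    let order = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..n_partitions)
        .map(|p| {
            let gil = Arc::clone(&gil);
            let order = Arc::clone(&order);
            thread::spawn(move || {
                for b in 0..batches_per_partition {
                    // Hold the "GIL" only while producing one batch...
                    let _guard = gil.lock().unwrap();
                    order.lock().unwrap().push((p, b));
                    // ...then the guard drops here, giving other
                    // partitions a chance to acquire it.
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    Arc::try_unwrap(order).unwrap().into_inner().unwrap()
}

fn main() {
    // The exact interleaving depends on the scheduler, but every
    // partition still emits its batches in order.
    let order = read_partitions(4, 3);
    println!("batch order: {:?}", order);
}
```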

Owner Author

I think I'll look into your GIL/parallelism enhancement in a future PR! Thank you so much for the suggestion!

I will add an acceptance test to see if the GIL acquisition is really slowing us down. From my experience so far, we acquire the GIL just to schedule the partitions, but since they are arrow streams ("send"-like), the processing happens in parallel in DF.

Collaborator

Hmm, yeah I'm not sure anymore then... in any case let me know what you find out if you test it out!

@maximedion2
Collaborator

> So this should allow for the creation of parallel partitions, right? After this is merged in, I can give this a try and compare with what I built so far?
>
> Yes and yes! I might have to defer to you for the best way to get a hotfix. I think after this lands, I may make a release for easier distribution.

As long as I can build the package locally, which is what I currently do with my implementation anyway, I should be able to run some tests. I'll let you know!

@maximedion2
Collaborator

> I'm realizing that I don't really have to build everything as one big crate/package with like one entry point that exposes everything I want to do.
>
> I would love to contribute to core components of this RDBMS with you. I especially have opinions about the query engine. I have a colleague who would be really down to contribute as well.
> WRT sedonadb, maybe @ljstrnadiii would like to join us in this effort.
>
> What did you have in mind exactly? I'm looking for an excuse to learn rust, more about this repo and how it might integrate with SedonaDB. In particular I'd like to see spatial joins, zonal stats, etc and predicate handling at the block level if spatial dims and transform are present for example.

I do have several things I'd like to try, including some optimizations and different implementations of spatial joins. Those are already in SedonaDB; I was looking at the code very recently, and I think it's possible to have separate implementations that you can register in SedonaDB to compare things.

Regarding predicate handling at the block level, do you mean something like filter pushdown? I do have that implemented in my crate/package: basically, I can skip reading chunks (actual Zarr chunks) if no data matches the filter predicates. But when it comes to spatial operations, it would only support filtering by a hard-coded polygon, for example; join predicates aren't pushed down to the reader, which is much more complicated (but very interesting).
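A common way to implement this kind of chunk skipping is per-chunk min/max statistics; whether the crate uses exactly this mechanism is an assumption, but as a hypothetical sketch of the idea:

```rust
// Hypothetical sketch of statistics-based chunk pruning: if a chunk's
// min/max range cannot intersect the filter predicate, skip reading it
// entirely. The actual crate may track different statistics; this only
// illustrates "skip chunks that can't possibly match".

#[derive(Debug)]
struct ChunkStats {
    min: f64,
    max: f64,
}

/// Predicate of the form `lo <= x <= hi`.
struct RangePredicate {
    lo: f64,
    hi: f64,
}

impl RangePredicate {
    /// A chunk might contain matching values only if the ranges overlap.
    fn might_match(&self, stats: &ChunkStats) -> bool {
        stats.max >= self.lo && stats.min <= self.hi
    }
}

/// Return the indices of chunks that actually need to be read.
fn chunks_to_read(stats: &[ChunkStats], pred: &RangePredicate) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter(|(_, s)| pred.might_match(s))
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    let stats = vec![
        ChunkStats { min: 0.0, max: 10.0 },
        ChunkStats { min: 50.0, max: 80.0 }, // pruned: cannot overlap [5, 12]
        ChunkStats { min: 9.0, max: 30.0 },
    ];
    let pred = RangePredicate { lo: 5.0, hi: 12.0 };
    println!("read chunks: {:?}", chunks_to_read(&stats, &pred)); // [0, 2]
}
```

The same shape generalizes to the spatial case he mentions: replace the min/max range with a per-chunk bounding box and the predicate with a polygon's bounding box.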

In any case, I will create issues in my crate for the various ideas I have. Once I start working on spatial joins, I'll be diving pretty deep into how things work in SedonaDB; we can discuss in more detail if you're interested!

@alxmrs alxmrs merged commit 01a5dba into main Feb 9, 2026
12 checks passed
@alxmrs alxmrs deleted the fix-106 branch February 9, 2026 21:41
@maximedion2
Collaborator

@ljstrnadiii I invited you as a contributor to the repo I'm working on. I'm taking a little break from Zarr stuff (I'll definitely come back to it) to try out a spatial join implementation I've been wanting to look at for a while, for now just for fun. I have a branch where I'm implementing this; let me know if you take a look or have any questions!



Development

Successfully merging this pull request may close these issues.

Support proper parallelism via proper partition handling on the rust/datafusion side.

3 participants