Implemented Zarr / Xarray Catalog Provider for Multiple Tables #141
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,6 +88,7 @@ dev = [ | |
| "py-spy>=0.4.0", | ||
| "pyink>=24.10.1", | ||
| "maturin>=1.9.1", | ||
| "pre-commit>=4.5.1", | ||
| ] | ||
|
|
||
| [tool.uv] | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |||||||||
|
|
||||||||||
| import pyarrow as pa | ||||||||||
| import xarray as xr | ||||||||||
| import datafusion as dfn | ||||||||||
|
Owner
A more typical import pattern is to import from datafusion: We don't have a convention of import as |
||||||||||
|
|
||||||||||
| from .df import ( | ||||||||||
| Block, | ||||||||||
|
|
@@ -272,3 +273,106 @@ def partition_pairs(): | |||||||||
| yield make_partition_factory(block), _block_metadata(coord_arrays, block) | ||||||||||
|
|
||||||||||
| return LazyArrowStreamTable(partition_pairs(), schema) | ||||||||||
|
|
||||||||||
| def group_vars_by_dims(ds): | ||||||||||
|
Owner
Because this isn't a public API, I think this should be made private (i.e., prefix with a
Owner
Let's also add type annotations to the inputs and outputs of this function. |
||||||||||
| """ | ||||||||||
| Group variables in the dataset based on shared dims | ||||||||||
|
Comment on lines +278 to +279
Owner
The style I prefer is that this all exists on one line and ends in a full stop.
Suggested change
|
||||||||||
|
|
||||||||||
| ("time", "lat", "lon"): ["temperature_2m", "wind_speed"], | ||||||||||
| ("time", "lat", "lon", "level"): ["pressure", "humidity"] | ||||||||||
|
Comment on lines +281 to +282
Owner
This is pretty close to a doctest. Can we format the comment like so? We don't have to actually run doctests: https://docs.python.org/3/library/doctest.html |
||||||||||
| """ | ||||||||||
| groups = {} | ||||||||||
|
Owner
🐑 (the sheep means a sheepish suggestion, i.e. optional): Consider using a |
||||||||||
|
|
||||||||||
| for var_name, var in ds.data_vars.items(): | ||||||||||
| dims = var.dims | ||||||||||
|
|
||||||||||
| if dims not in groups: | ||||||||||
| groups[dims] = [] | ||||||||||
|
|
||||||||||
| groups[dims].append(var_name) | ||||||||||
|
|
||||||||||
| return groups | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def dims_to_table_name(dims): | ||||||||||
|
Owner
This probably can also be made private. And, let's add type annotations to the inputs & outputs.
Owner
I could see value in making this a method in the class below. That way, it would have access to class variables.
Owner
If we did this, we could rename it |
||||||||||
| """ | ||||||||||
| "time", "lat", "lon" -> "time_lat_lon" | ||||||||||
| """ | ||||||||||
| return "_".join(dims) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class XarraySchemaProvider(dfn.catalog.SchemaProvider): | ||||||||||
| """ | ||||||||||
| Custom datafusion schema that holds the tables | ||||||||||
| """ | ||||||||||
|
Comment on lines +305 to +307
Owner
Suggested change
Owner
I think we can improve this description. It only tells us it's custom, it doesn't say how it presents tables or schemas from Xarray datasets. |
||||||||||
|
|
||||||||||
| def __init__(self, ds, groups, chunks): | ||||||||||
|
Owner
Let's add types here. |
||||||||||
| # dictionary to store the tables | ||||||||||
|
Owner
Let's omit this comment. |
||||||||||
| self.tables = {} | ||||||||||
|
|
||||||||||
| # create a table for each group of vars | ||||||||||
| for dims, var_names in groups.items(): | ||||||||||
| table_name = dims_to_table_name(dims) | ||||||||||
| subset = ds[var_names] | ||||||||||
| self.tables[table_name] = read_xarray_table(subset, chunks) | ||||||||||
|
|
||||||||||
| def table_names(self): | ||||||||||
| return set(self.tables.keys()) | ||||||||||
|
|
||||||||||
| def table(self, name): | ||||||||||
| return self.tables.get(name) | ||||||||||
|
|
||||||||||
| def table_exist(self, name): | ||||||||||
| return name in self.tables | ||||||||||
|
|
||||||||||
| def register_table(self, name, table): | ||||||||||
| self.tables[name] = table | ||||||||||
|
Comment on lines +322 to +329
Owner
Let's add types to these methods. |
||||||||||
|
|
||||||||||
| def deregister_table(self, name, cascade=True): | ||||||||||
|
Owner
What is cascade supposed to do? Maybe we can add a TODO and/or create an issue to implement this feature properly; my guess is that it is more involved. |
||||||||||
| del self.tables[name] | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class XarrayCatalogProvider(dfn.catalog.CatalogProvider): | ||||||||||
| """ | ||||||||||
| Custom datafusion catalog that holds the schemas | ||||||||||
| """ | ||||||||||
|
Comment on lines +336 to +338
Owner
Suggested change
Owner
This fact is apparent from the type CatalogProvider, IIUC. What is special about this specific subclass? |
||||||||||
|
|
||||||||||
| # Constructor | ||||||||||
|
Owner
Let's omit this comment. |
||||||||||
| def __init__(self, ds, schema_name, chunks): | ||||||||||
| groups = group_vars_by_dims(ds) | ||||||||||
|
|
||||||||||
| # dictionary to store schemas using previous schema class | ||||||||||
| """ | ||||||||||
| "data": { | ||||||||||
| "time_lat_lon": [temperature_2m, wind_speed], | ||||||||||
| "time_lat_lon_level": [pressure, humidity] | ||||||||||
| } | ||||||||||
| """ | ||||||||||
|
Comment on lines +342 to +350
Owner
Instead of a comment, how about we create a custom type annotation (with a docstring) to help us understand the structure of the output? |
||||||||||
| self.schemas = {schema_name: XarraySchemaProvider(ds, groups, chunks)} | ||||||||||
|
Owner
Shouldn't there be multiple schemas, one per table? I think we may need to pull some logic up.
Owner
I believe there should be multiple schemas, probably one per group that you calculate. |
||||||||||
|
|
||||||||||
| """ | ||||||||||
| Other methods from test_catalog.py | ||||||||||
| """ | ||||||||||
|
|
||||||||||
| def schema_names(self): | ||||||||||
| return set(self.schemas.keys()) | ||||||||||
|
|
||||||||||
| def schema(self, name): | ||||||||||
| return self.schemas.get(name) | ||||||||||
|
|
||||||||||
| def register_schema(self, name, schema): | ||||||||||
| self.schemas[name] = schema | ||||||||||
|
|
||||||||||
| def deregister_schema(self, name, cascade=True): | ||||||||||
| del self.schemas[name] | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def register_catalog_from_dataset( | ||||||||||
| ctx, ds, catalog_name="xarray", schema_name="data", chunks=None | ||||||||||
|
Owner
Instead of |
||||||||||
| ): | ||||||||||
| """ | ||||||||||
| Main function. Takes an xarray dataset and registers it | ||||||||||
| with DataFusion so you can query it with SQL. | ||||||||||
|
Comment on lines +374 to +375
Owner
Please explain how we are creating multiple tables from a single Xarray dataset. This docstring will soon become part of our public API. Further, please follow this style guide for writing docstrings: https://google.github.io/styleguide/pyguide.html#383-functions-and-methods
Owner
In fact, I believe this function should live as a new method in the XarrayContext in sql.py. |
||||||||||
| """ | ||||||||||
| catalog = XarrayCatalogProvider(ds, schema_name, chunks) | ||||||||||
| ctx.register_catalog_provider(catalog_name, catalog) | ||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,11 +2,14 @@ | |
| from datafusion import SessionContext | ||
|
|
||
| from .df import Chunks | ||
| from .reader import read_xarray_table | ||
| from .reader import read_xarray_table, register_catalog_from_dataset | ||
|
|
||
|
|
||
| class XarrayContext(SessionContext): | ||
| """A datafusion `SessionContext` that also supports `xarray.Dataset`s.""" | ||
| """ | ||
| A regular DataFusion SessionContext but with an extra method | ||
| for registering xarray datasets. | ||
| """ | ||
|
Comment on lines +9 to +12
Owner
Please revert my docstring for now. |
||
|
|
||
| def from_dataset( | ||
| self, | ||
|
|
@@ -16,3 +19,8 @@ def from_dataset( | |
| ): | ||
| table = read_xarray_table(input_table, chunks) | ||
| self.register_table(table_name, table) | ||
|
|
||
| def register_catalog_from_dataset( | ||
| self, ds, catalog_name="xarray", schema_name="data", chunks=None | ||
| ): | ||
| register_catalog_from_dataset(self, ds, catalog_name, schema_name, chunks) | ||
|
Owner
Ah, I think the body of this helper fn should just live in this method. |
||
This is a good idea, thanks.
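For reference, folding the helper's body into the `XarrayContext` method, as suggested above, might look like the sketch below. It is illustrative only. `SessionContext` and `XarrayCatalogProvider` here are small stand-ins (assumptions, not the real datafusion or PR classes) so the snippet runs on its own. The type annotations and the method docstring are also additions, and the class docstring is the original one the reviewer asked to restore.

```python
from typing import Any, Optional


class SessionContext:
    """Stand-in for `datafusion.SessionContext` (assumption for this sketch)."""

    def __init__(self) -> None:
        self.catalogs: dict[str, Any] = {}

    def register_catalog_provider(self, name: str, provider: Any) -> None:
        # The real method hands the provider to DataFusion; here we only record it.
        self.catalogs[name] = provider


class XarrayCatalogProvider:
    """Stand-in for the PR's catalog provider; it only stores its arguments."""

    def __init__(self, ds: Any, schema_name: str, chunks: Any) -> None:
        self.ds = ds
        self.schema_name = schema_name
        self.chunks = chunks


class XarrayContext(SessionContext):
    """A datafusion `SessionContext` that also supports `xarray.Dataset`s."""

    def register_catalog_from_dataset(
        self,
        ds: Any,
        catalog_name: str = "xarray",
        schema_name: str = "data",
        chunks: Optional[Any] = None,
    ) -> None:
        """Registers `ds` as a catalog with one table per group of shared dims."""
        # Body moved here from the module-level helper in reader.py.
        catalog = XarrayCatalogProvider(ds, schema_name, chunks)
        self.register_catalog_provider(catalog_name, catalog)
```

With this shape, `reader.py` no longer needs to export `register_catalog_from_dataset`, and `sql.py` would import the provider class instead.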