Introducing FilePatternToChunks: IO with Pangeo-Forge's FilePattern interface.#31

Merged
copybara-service[bot] merged 20 commits into google:main from alxmrs:pangeo-fp on Sep 22, 2021
Conversation

@alxmrs
Contributor

@alxmrs alxmrs commented Aug 9, 2021

This is the first of a few changes that will let users read in datasets using Pangeo-Forge's `FilePattern` interface [0]. Here, users can describe how data is stored along concat and merge dimensions. This transform will read the datasets into chunks. This module can be leveraged in pipelines to convert natively formatted datasets to Zarr.

To make use of this transform, the user will need to install `pangeo-forge-recipes` separately. This dependency is included in the test dependencies.

As of now, this transform is not exposed to the user (i.e., not included in the primary `__init__.py`). I plan to do this (and update the docs) once the module is tested and feature complete (#29).

[0]: https://pangeo-forge.readthedocs.io/en/latest/file_patterns.html
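As a rough illustration of the concept (this is plain Python, not the real pangeo-forge API; the bucket layout and names are hypothetical), a file pattern maps positions along merge and concat dimensions to concrete file paths:

```python
from itertools import product

# Hypothetical layout, for illustration only: one file per (variable, time).
variables = ['temperature', 'precipitation']  # a "merge" dimension
times = ['2021-01-01', '2021-01-02']          # a "concat" dimension

def make_path(variable, time):
    return f'gs://my-bucket/{variable}/{time}.nc'

# Analogue of indexing a FilePattern: (merge index, concat index) -> path.
pattern = {
    (vi, ti): make_path(v, t)
    for (vi, v), (ti, t) in product(enumerate(variables), enumerate(times))
}
```

The transform then opens each such path and emits one chunk per file, keyed by its position along these dimensions.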

@google-cla google-cla Bot added the cla: yes label Aug 9, 2021
Comment thread xarray_beam/_src/pangeo.py Outdated
```python
) -> Iterator[Tuple[core.Key, xarray.Dataset]]:
  """Open datasets into chunks with XArray."""
  path = self.pattern[index]
  with FileSystems().open(path) as file:
```
Contributor


Do Beam's filesystems really all work out of the box with Xarray? If so, that's awesome!

Can you verify that it works with both netCDF3 and netCDF4 files? These would be using different underlying storage backends (scipy vs h5netcdf).

To be honest, I'm a little skeptical that this will work well. I suspect we'll end up needing to copy temporary files to local disk (but I'd love to be proven wrong!)
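A quick way to sanity-check the file-like path, assuming xarray and the scipy backend are installed (this covers only the netCDF3 case; the netCDF4/h5netcdf case would need a separate check):

```python
import io

import numpy as np
import xarray as xr

ds = xr.Dataset({'x': ('t', np.arange(3))})

# netCDF3 bytes written via the scipy backend; xarray's scipy engine accepts
# file-like objects, mimicking what Beam's FileSystems().open() returns.
buf = io.BytesIO(ds.to_netcdf(format='NETCDF3_CLASSIC'))
roundtripped = xr.open_dataset(buf, engine='scipy')
```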

Contributor Author


Let me experiment and see how this works. In my tests in the previous iteration of this change, this worked well with GCS's IO objects.

Contributor


I experimented a bit more with this based on @mjwillson's suggestion.

Amazingly, it seems that using file-like objects in Xarray actually does work as used here, though making a local copy might still have better performance.

What doesn't work yet -- but hopefully with small upstream changes to Xarray could work -- is passing xarray datasets opened with these file-like objects into a Beam pipeline. That could let us do the actual data loading from netCDF in separate workers, which could be quite a win!

Contributor Author


It's a bit unclear to me how this would not work in a Beam pipeline (or, what needs to be done to get this win). Can you explain a bit more?

Is this a correct understanding: With the change you're referring to, we could pickle the XArray open command (with the file-like object) as PCollections, which would allow us to split the open across workers?

Contributor


Is this a correct understanding: With the change you're referring to, we could pickle the XArray open command (with the file-like object) as PCollections, which would allow us to split the open across workers?

With this change, we could pickle lazy xarray.Dataset objects corresponding to open netCDF files and pass them between stages in a Beam pipeline.

Some data would still need to get loaded on the worker on which xarray.open_dataset() is called, but this could be much less data than the entire file (e.g., only the "metadata" part of the file). The bulk of the loading work could be split across multiple workers, which could be quite useful for processing large (GB+) netCDF files.
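The idea could be sketched roughly as follows (plain xarray, no Beam; the splitting helper here is a hypothetical stand-in for the pipeline's chunking logic): one worker inspects the dataset's metadata and emits index ranges, and each downstream worker loads only its slice.

```python
import numpy as np
import xarray as xr

# Stand-in for a lazily opened netCDF dataset.
ds = xr.Dataset({'x': ('t', np.arange(10))})

def split_offsets(size, chunk_size):
    # (start, stop) pairs covering [0, size) in chunk_size steps.
    return [(i, min(i + chunk_size, size)) for i in range(0, size, chunk_size)]

# Emitted by the "metadata" worker; each pair could become a PCollection
# element loaded independently on another worker.
offsets = split_offsets(ds.sizes['t'], 4)
pieces = [ds.isel(t=slice(a, b)) for a, b in offsets]
```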

Comment thread xarray_beam/_src/pangeo.py
Comment thread xarray_beam/_src/pangeo.py Outdated
Comment thread xarray_beam/_src/pangeo.py Outdated
Comment thread xarray_beam/_src/pangeo.py Outdated
Comment thread xarray_beam/_src/pangeo.py Outdated
Comment thread xarray_beam/_src/pangeo_test.py Outdated
Comment thread xarray_beam/_src/pangeo_test.py Outdated
Comment thread xarray_beam/_src/pangeo_test.py Outdated
Comment thread xarray_beam/_src/pangeo_test.py Outdated
Comment thread xarray_beam/_src/pangeo_forge.py Outdated
Comment thread xarray_beam/_src/pangeo_forge.py Outdated
…nks.

This transform now only opens file pattern datasets as whole chunks. Re-chunking (i.e., producing "sub_chunks") can be delegated to a SplitChunk() transform layered after this one.
As a backup to the `FileSystems().open(...)` method, we use fsspec to create a local copy of the data for opening with `xr.open_dataset(...)`.
Contributor

@shoyer shoyer left a comment


Looks great, thanks Alex!

Comment thread xarray_beam/_src/pangeo_forge.py Outdated
Comment thread xarray_beam/_src/pangeo_forge.py Outdated
@shoyer shoyer added the pull ready Ready for Copybara import and testing label Sep 22, 2021
Comment thread xarray_beam/_src/pangeo_forge.py Outdated
Comment on lines +106 to +120
```python
try:
  yield xarray.open_dataset(file, **self.xarray_open_kwargs)
except (TypeError, OSError) as e:
  if not self.local_copy:
    raise ValueError(f'cannot open {path!r} with buffering.') from e

  # The cfgrib engine (and others) may fail with the FileSystems method of
  # opening with BufferedReaders. Here, we open the data locally to make
  # it easier to work with XArray.
  with fsspec.open_local(
      f"simplecache::{path}",
      simplecache={'cache_storage': '/tmp/files'}
  ) as fs_file:
    yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs)
```
Contributor


Rather than using local_copy as a fall-back, can we just use an if statement?

Suggested change

```diff
-try:
-  yield xarray.open_dataset(file, **self.xarray_open_kwargs)
-except (TypeError, OSError) as e:
-  if not self.local_copy:
-    raise ValueError(f'cannot open {path!r} with buffering.') from e
-  # The cfgrib engine (and others) may fail with the FileSystems method of
-  # opening with BufferedReaders. Here, we open the data locally to make
-  # it easier to work with XArray.
-  with fsspec.open_local(
-      f"simplecache::{path}",
-      simplecache={'cache_storage': '/tmp/files'}
-  ) as fs_file:
-    yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs)
+if self.local_copy:
+  # The cfgrib engine (and others) may fail with the FileSystems method of
+  # opening with BufferedReaders. Here, we open the data locally to make
+  # it easier to work with XArray.
+  with fsspec.open_local(
+      f"simplecache::{path}",
+      simplecache={'cache_storage': '/tmp/files'}
+  ) as fs_file:
+    yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs)
+else:
+  yield xarray.open_dataset(file, **self.xarray_open_kwargs)
```

The old contextmanager approach wasn't applicable, since `open_local` returns a string (path to the open file).
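To illustrate that point (a hedged sketch using a temporary local file as a stand-in for remote data), `fsspec.open_local` hands back a path string rather than an open file object:

```python
import tempfile

import fsspec

# Hypothetical local file standing in for a remote dataset path.
with tempfile.NamedTemporaryFile(suffix='.nc', delete=False) as tmp:
    tmp.write(b'netcdf-bytes')

# open_local caches the file locally and returns a *path string*, not a
# file object, so it cannot be used as a context manager.
local = fsspec.open_local(
    f'simplecache::file://{tmp.name}',
    simplecache={'cache_storage': tempfile.gettempdir()},
)
```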