From fd2019a1e4ad85b28a36acaf0ae899b24c501974 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 9 Aug 2021 15:21:34 -0700 Subject: [PATCH 01/19] Introducing FilePatternToChunks: IO with Pangeo-Forge's FilePattern interface. This if the first of a few changes that will let users read in datasets using Pangeo-Forge's `FilePattern` interface [0]. Here, users can describe how data is stored along concat and merge dimensions. This transform will read in the datasets into chunks (and optionally, smaller `sub_chunks`). This module can be leveraged in pipelines to convert natively formatted datasets to Zarr. To make use of this transform, the user will need to install `pangeo-forge-recipes` separately. This dependency is included in the test dependencies. As on now, this transform is not exposed to the user (i.e., not included in the primary `__init__.py`). I plan to do this (and update the docs) once the module is tested and feature complete (#29). [0]: https://pangeo-forge.readthedocs.io/en/latest/file_patterns.html --- setup.py | 1 + xarray_beam/_src/pangeo.py | 150 ++++++++++++++++++++++ xarray_beam/_src/pangeo_test.py | 219 ++++++++++++++++++++++++++++++++ 3 files changed, 370 insertions(+) create mode 100644 xarray_beam/_src/pangeo.py create mode 100644 xarray_beam/_src/pangeo_test.py diff --git a/setup.py b/setup.py index baaf8d7..bf1b91d 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ 'absl-py', 'pandas', 'pytest', + 'pangeo-forge-recipes', ] setuptools.setup( diff --git a/xarray_beam/_src/pangeo.py b/xarray_beam/_src/pangeo.py new file mode 100644 index 0000000..e182b63 --- /dev/null +++ b/xarray_beam/_src/pangeo.py @@ -0,0 +1,150 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""IO with Pangeo-Forge.""" +from typing import ( + Dict, + Iterator, + Optional, + Mapping, + Tuple, + cast, +) + +import apache_beam as beam +import xarray +from apache_beam.io.filesystems import FileSystems + +from xarray_beam._src import core + + +def _zero_dimensions(dataset: xarray.Dataset) -> Mapping[str, int]: + return {dim: 0 for dim in dataset.dims.keys()} + + +def _expand_dimensions_by_key( + dataset: xarray.Dataset, + key: core.Key, + index: Tuple[int, ...], + pattern: 'FilePattern' +) -> xarray.Dataset: + """Expand the dimensions of the `Dataset` by offsets found in the `Key`.""" + combine_dims_by_name = { + combine_dim.name: (i, combine_dim) + for i, combine_dim in enumerate(pattern.combine_dims) + } + + if not combine_dims_by_name: + return dataset + + for dim_key in key.offsets.keys(): + # skip expanding dimensions if they already exist + if dim_key in dataset.dims: + continue + + dim_idx, combine_dim = combine_dims_by_name.get(dim_key, (-1, None)) + if dim_idx == -1: + raise ValueError( + f"could not find CombineDim named {dim_key!r} in pattern {pattern!r}." 
+ ) + + dim_val = combine_dim.keys[index[dim_idx]] + dataset = dataset.expand_dims(**{dim_key: [dim_val]}) + + return dataset + + +class FilePatternToChunks(beam.PTransform): + """Open data described by a Pangeo-Forge `FilePattern` into keyed chunks.""" + + from pangeo_forge_recipes.patterns import FilePattern + + def __init__( + self, + pattern: 'FilePattern', + sub_chunks: Optional[Mapping[str, int]] = None, + xarray_open_kwargs: Optional[Dict] = None + ): + """Initialize FilePatternToChunks. + + TODO(#29): Currently, `MergeDim`s are not supported. + + Args: + pattern: a `FilePattern` describing a dataset. + sub_chunks: split each open dataset into smaller chunks. If not set, each + chunk will open the full dataset. + xarray_open_kwargs: keyword arguments to pass to `xarray.open_dataset()`. + """ + self.pattern = pattern + self.sub_chunks = sub_chunks or -1 + self.xarray_open_kwargs = xarray_open_kwargs or {} + + def _prechunk(self) -> Iterator[Tuple[core.Key, Tuple[int, ...]]]: + """Converts `FilePattern` items into keyed indexes.""" + input_chunks = {k: v or 1 for k, v in self.pattern.nitems_per_input.items()} + dim_sizes = { + k: v or self.pattern.dims[k] + for k, v, in self.pattern.concat_sequence_lens.items() + } + chunks = core.normalize_expanded_chunks(input_chunks, dim_sizes) + for key, (index, _) in zip(core.iter_chunk_keys(chunks), + self.pattern.items()): + yield key, index + + def _open_chunks( + self, + key: core.Key, + index: Tuple[int, ...] + ) -> Iterator[Tuple[core.Key, xarray.Dataset]]: + """Open datasets into chunks with XArray.""" + path = self.pattern[index] + with FileSystems().open(path) as file: + dataset = xarray.open_dataset( + file, chunks=self.sub_chunks, **self.xarray_open_kwargs + ) + dataset = _expand_dimensions_by_key(dataset, key, index, self.pattern) + + base_key = core.Key(_zero_dimensions(dataset)).with_offsets(**key.offsets) + + num_threads = len(dataset.data_vars) + + if self.sub_chunks == -1: + yield base_key, dataset.compute(num_workers=num_threads) + return + + dim_sizes = {dim: dataset.dims[dim] for dim in self.sub_chunks.keys()} + norm_sub_chunks = core.normalize_expanded_chunks( + self.sub_chunks, cast(Mapping[str, int], dim_sizes) + ) + offset_index = core.compute_offset_index( + core._chunks_to_offsets(norm_sub_chunks) + ) + for sub_key in core.iter_chunk_keys(norm_sub_chunks): + sizes = { + dim: norm_sub_chunks[dim][offset_index[dim][offset]] + for dim, offset in sub_key.offsets.items() + } + slices = core.offsets_to_slices( + sub_key.offsets, sizes=sizes, base=base_key.offsets + ) + chunk = dataset.isel(slices) + + new_key = base_key.with_offsets(**sub_key.offsets) + yield new_key, chunk.chunk().compute(num_workers=num_threads) + + def expand(self, pcoll): + return ( + pcoll + | beam.Create(self._prechunk()) + | beam.FlatMapTuple(self._open_chunks) + ) diff --git a/xarray_beam/_src/pangeo_test.py b/xarray_beam/_src/pangeo_test.py new file mode 100644 index 0000000..89d49ba --- /dev/null +++ b/xarray_beam/_src/pangeo_test.py @@ -0,0 +1,219 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for xarray_beam._src.pangeo.""" + +import contextlib +import itertools +import tempfile + +import numpy as np +from pangeo_forge_recipes.patterns import FilePattern, ConcatDim + +from xarray_beam._src import core +from xarray_beam._src import test_util +from xarray_beam._src.pangeo import FilePatternToChunks, _zero_dimensions, \ + _expand_dimensions_by_key + + +class ExpandDimensionsByKeyTest(test_util.TestCase): + TEST_DATA = test_util.dummy_era5_surface_dataset() + LEVEL = ConcatDim("level", list(range(91, 100))) + PATTERN = FilePattern(lambda level: f"gs://dir/{level}.nc", LEVEL) + + def test_expands_dimensions(self): + key = core.Key(offsets={"time": 0, "level": 0}) + + for i, (index, _) in enumerate(self.PATTERN.items()): + actual = _expand_dimensions_by_key( + self.TEST_DATA, key, index, self.PATTERN + ) + + expected_dims = dict(self.TEST_DATA.dims) + expected_dims.update({"level": 1}) + + self.assertEqual(expected_dims, dict(actual.dims)) + self.assertEqual(np.array([self.LEVEL.keys[i]]), actual["level"]) + + def test_raises_error_when_dataset_is_not_found(self): + key = core.Key({"time": 0, "boat": 0}) + index = (0,) + with self.assertRaises(ValueError) as e: + _expand_dimensions_by_key( + self.TEST_DATA, key, index, self.PATTERN + ) + self.assertIn("boat", e.exception.args[0]) + + +class FilePatternToChunksTest(test_util.TestCase): + TEST_DATA = test_util.dummy_era5_surface_dataset() + + @contextlib.contextmanager + def pattern_from_testdata(self, test_data=None) -> FilePattern: + """Produces a FilePattern for a temporary NetCDF file of test data.""" + if test_data is None: + test_data = self.TEST_DATA + + try: + with tempfile.TemporaryDirectory() as tmpdir: + target = f'{tmpdir}/era5.nc' + test_data.to_netcdf(target) + yield FilePattern(lambda: target) + finally: + pass + + @contextlib.contextmanager + def multifile_pattern( + self, + time_step: int = 479, + longitude_step: int = 47 + ) -> FilePattern: + """Produces a FilePattern for a temporary NetCDF file of test data.""" + test_data = self.TEST_DATA + + time_dim = ConcatDim('time', list(range(0, 360 * 4, time_step))) + longitude_dim = ConcatDim('longitude', list(range(0, 144, longitude_step))) + + try: + with tempfile.TemporaryDirectory() as tmpdir: + def make_path(time: int, longitude: int) -> str: + return f'{tmpdir}/era5-{time}-{longitude}.nc' + + for time in time_dim.keys: + for long in longitude_dim.keys: + chunk = test_data.isel( + time=slice(time, time + time_step), + longitude=slice(long, long + longitude_step) + ) + chunk.to_netcdf(make_path(time, long)) + yield FilePattern(make_path, time_dim, longitude_dim) + finally: + pass + + def test_prechunk_converts_correctly(self): + pattern = FilePattern( + lambda time: f"gs://bucket/{time:02d}/{time:02d}.nc", + ConcatDim("time", list(range(1, 31))), + ) + + transform = FilePatternToChunks(pattern) + + expected = [core.Key({"time": i}) for i in range(0, 30)] + actual = [key for key, _ in transform._prechunk()] + + self.assertEqual(expected, actual) + + def test_prechunk_with_two_dims_converts_correctly(self): + pattern = FilePattern( + lambda time, level: f"gs://bucket/{time:02d}/{level:02d}.nc", + ConcatDim("time", list(range(1, 31))), + ConcatDim("level", list(range(5))), + ) + + transform = FilePatternToChunks(pattern) + + expected = [core.Key({"time": i, "level": j}) + for i, j in itertools.product(range(30), range(5))] + actual = [key for key, _ in 
transform._prechunk()] + + self.assertEqual(expected, actual) + + def test_prechunk_from_pattern_with_nitemsper_converts_correctly(self): + pattern = FilePattern( + lambda time, level: f"gs://bucket/{time:02d}/{level:02d}.nc", + ConcatDim("time", list(range(1, 31)), nitems_per_file=24), + ConcatDim("level", list(range(5))), + ) + + transform = FilePatternToChunks(pattern) + + expected = [core.Key({"time": i, "level": j}) + for i, j in itertools.product(range(0, 30 * 24, 24), range(5))] + actual = [key for key, _ in transform._prechunk()] + + self.assertEqual(expected, actual) + + def test_no_subchunks_returns_single_dataset(self): + expected = [(core.Key(_zero_dimensions(self.TEST_DATA)), self.TEST_DATA)] + with self.pattern_from_testdata() as pattern: + actual = test_util.EagerPipeline() | FilePatternToChunks(pattern) + + self.assertIdenticalChunks(actual, expected) + + def test_single_subchunks_returns_multiple_datasets(self): + base_key = core.Key(_zero_dimensions(self.TEST_DATA)) + + with self.pattern_from_testdata() as pattern: + result = ( + test_util.EagerPipeline() + | FilePatternToChunks(pattern, sub_chunks={"longitude": 48}) + ) + + expected_keys = [base_key.with_offsets(longitude=i) + for i in range(0, 144, 48)] + expected_sizes = [{"time": 365 * 4, "latitude": 73, "longitude": 48} + for _ in range(3)] + actual_keys = [key for key, _ in result] + actual_sizes = [dict(ds.sizes) for _, ds in result] + + self.assertEqual(expected_keys, actual_keys) + self.assertEqual(expected_sizes, actual_sizes) + + def test_multiple_subchunks_returns_multiple_datasets(self): + base_key = core.Key(_zero_dimensions(self.TEST_DATA)) + + with self.pattern_from_testdata() as pattern: + result = ( + test_util.EagerPipeline() + | FilePatternToChunks(pattern, + sub_chunks={"longitude": 48, "latitude": 24}) + ) + + expected_keys = [ + base_key.with_offsets(longitude=o, latitude=a) + for o, a in itertools.product(range(0, 144, 48), range(0, 73, 24)) + ] + expected_sizes = [ + {"time": 365 * 4, "longitude": o, "latitude": a} + for o, a, in itertools.product([48, 48, 48], [24, 24, 24, 1]) + ] + actual_keys = [key for key, _ in result] + actual_sizes = [dict(ds.sizes) for _, ds in result] + + self.assertEqual(expected_keys, actual_keys) + self.assertEqual(expected_sizes, actual_sizes) + + def test_single_subchunks_over_multiple_files_returns_multiple_datasets(self): + base_key = core.Key(_zero_dimensions(self.TEST_DATA)) + + with self.multifile_pattern() as pattern: + result = ( + test_util.EagerPipeline() + | FilePatternToChunks(pattern, sub_chunks={"latitude": 24}) + ) + + expected_keys = [ + base_key.with_offsets(latitude=a, longitude=o, time=t) + for t, o, a in itertools.product(range(4), range(4), range(0, 73, 24)) + ] + expected_sizes = [ + {"time": t, "latitude": a, "longitude": o} + for t, o, a, in + itertools.product([479, 479, 479, 23], [47, 47, 47, 3], [24, 24, 24, 1]) + ] + + actual_keys = [key for key, _ in result] + actual_sizes = [dict(ds.sizes) for _, ds in result] + + self.assertEqual(expected_keys, actual_keys) + self.assertEqual(expected_sizes, actual_sizes) From 48162d28d7a4b9949a80ca2876750034905e82d2 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 9 Aug 2021 16:21:28 -0700 Subject: [PATCH 02/19] Including scipy as test dependency (so we can write NetCDF files with XArray). 
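For context, writing the test fixtures requires a NetCDF backend for XArray;
with scipy installed, `to_netcdf()` can fall back to the netCDF3 writer. A
minimal sketch of what the tests rely on (the file name is illustrative):

    import numpy as np
    import xarray

    ds = xarray.Dataset({"t2m": (("time",), np.zeros(3))})
    # With scipy available, xarray can write netCDF3 files without netCDF4.
    ds.to_netcdf("era5.nc", engine="scipy")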
--- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index bf1b91d..1d8db03 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ 'pandas', 'pytest', 'pangeo-forge-recipes', + 'scipy', ] setuptools.setup( From f211072b57c4c9867b115b3a9ea107c5feb5d380 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 14 Sep 2021 16:30:29 -0700 Subject: [PATCH 03/19] Initial PR feedback, WIP --- .../_src/{pangeo.py => pangeo_forge.py} | 89 ++++++++--------- .../{pangeo_test.py => pangeo_forge_test.py} | 97 ++++++++----------- 2 files changed, 83 insertions(+), 103 deletions(-) rename xarray_beam/_src/{pangeo.py => pangeo_forge.py} (60%) rename xarray_beam/_src/{pangeo_test.py => pangeo_forge_test.py} (71%) diff --git a/xarray_beam/_src/pangeo.py b/xarray_beam/_src/pangeo_forge.py similarity index 60% rename from xarray_beam/_src/pangeo.py rename to xarray_beam/_src/pangeo_forge.py index e182b63..c932477 100644 --- a/xarray_beam/_src/pangeo.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -13,19 +13,19 @@ # limitations under the License. """IO with Pangeo-Forge.""" from typing import ( - Dict, - Iterator, - Optional, - Mapping, - Tuple, - cast, + Dict, + Iterator, + Optional, + Mapping, + Tuple, + cast, ) import apache_beam as beam import xarray from apache_beam.io.filesystems import FileSystems -from xarray_beam._src import core +from xarray_beam._src import core, rechunk def _zero_dimensions(dataset: xarray.Dataset) -> Mapping[str, int]: @@ -89,62 +89,53 @@ def __init__( self.sub_chunks = sub_chunks or -1 self.xarray_open_kwargs = xarray_open_kwargs or {} - def _prechunk(self) -> Iterator[Tuple[core.Key, Tuple[int, ...]]]: + if pattern.merge_dims: + raise ValueError("patters with `MergeDim`s are not supported.") + + def _prechunk(self) -> Iterator[Tuple[core.Key, Tuple[int, ...], str]]: """Converts `FilePattern` items into keyed indexes.""" - input_chunks = {k: v or 1 for k, v in self.pattern.nitems_per_input.items()} + # Default to 1 chunk per item, even though there may be more on reading. + # We discover the actual items-per-input in the _open_chunks() phase. + initial_chunks = {k: v or 1 + for k, v in self.pattern.nitems_per_input.items()} dim_sizes = { k: v or self.pattern.dims[k] for k, v, in self.pattern.concat_sequence_lens.items() } - chunks = core.normalize_expanded_chunks(input_chunks, dim_sizes) - for key, (index, _) in zip(core.iter_chunk_keys(chunks), - self.pattern.items()): - yield key, index + chunks = core.normalize_expanded_chunks(initial_chunks, dim_sizes) + for key, (index, path) in zip(core.iter_chunk_keys(chunks), + self.pattern.items()): + yield key, index, path - def _open_chunks( - self, - key: core.Key, - index: Tuple[int, ...] 
- ) -> Iterator[Tuple[core.Key, xarray.Dataset]]: + def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: """Open datasets into chunks with XArray.""" - path = self.pattern[index] - with FileSystems().open(path) as file: - dataset = xarray.open_dataset( - file, chunks=self.sub_chunks, **self.xarray_open_kwargs - ) - dataset = _expand_dimensions_by_key(dataset, key, index, self.pattern) + for index, path in self.pattern.items(): + with FileSystems().open(path) as file: + key = core.Key(index) + base_key = core.Key(_zero_dimensions(dataset)).with_offsets( + **key.offsets + ) - base_key = core.Key(_zero_dimensions(dataset)).with_offsets(**key.offsets) + dataset = xarray.open_dataset( + file, chunks=self.sub_chunks, **self.xarray_open_kwargs + ) + dataset = _expand_dimensions_by_key(dataset, key, index, self.pattern) - num_threads = len(dataset.data_vars) - if self.sub_chunks == -1: - yield base_key, dataset.compute(num_workers=num_threads) - return + num_threads = len(dataset.data_vars) - dim_sizes = {dim: dataset.dims[dim] for dim in self.sub_chunks.keys()} - norm_sub_chunks = core.normalize_expanded_chunks( - self.sub_chunks, cast(Mapping[str, int], dim_sizes) - ) - offset_index = core.compute_offset_index( - core._chunks_to_offsets(norm_sub_chunks) - ) - for sub_key in core.iter_chunk_keys(norm_sub_chunks): - sizes = { - dim: norm_sub_chunks[dim][offset_index[dim][offset]] - for dim, offset in sub_key.offsets.items() - } - slices = core.offsets_to_slices( - sub_key.offsets, sizes=sizes, base=base_key.offsets - ) - chunk = dataset.isel(slices) + if self.sub_chunks == -1: + yield base_key, dataset.compute(num_workers=num_threads) + return - new_key = base_key.with_offsets(**sub_key.offsets) - yield new_key, chunk.chunk().compute(num_workers=num_threads) + for new_key, chunk in rechunk.split_chunks(base_key, dataset, + self.sub_chunks): + yield new_key, chunk.compute(num_workers=num_threads) def expand(self, pcoll): return ( pcoll - | beam.Create(self._prechunk()) - | beam.FlatMapTuple(self._open_chunks) + | beam.Create([None]) + | beam.FlatMap(self._open_chunks) ) + diff --git a/xarray_beam/_src/pangeo_test.py b/xarray_beam/_src/pangeo_forge_test.py similarity index 71% rename from xarray_beam/_src/pangeo_test.py rename to xarray_beam/_src/pangeo_forge_test.py index 89d49ba..1019aea 100644 --- a/xarray_beam/_src/pangeo_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -22,55 +22,54 @@ from xarray_beam._src import core from xarray_beam._src import test_util -from xarray_beam._src.pangeo import FilePatternToChunks, _zero_dimensions, \ - _expand_dimensions_by_key +from xarray_beam._src.pangeo_forge import ( + FilePatternToChunks, + _expand_dimensions_by_key +) class ExpandDimensionsByKeyTest(test_util.TestCase): - TEST_DATA = test_util.dummy_era5_surface_dataset() - LEVEL = ConcatDim("level", list(range(91, 100))) - PATTERN = FilePattern(lambda level: f"gs://dir/{level}.nc", LEVEL) + + def setUp(self): + self.test_data = test_util.dummy_era5_surface_dataset() + self.level = ConcatDim("level", list(range(91, 100))) + self.pattern = FilePattern(lambda level: f"gs://dir/{level}.nc", self.level) def test_expands_dimensions(self): key = core.Key(offsets={"time": 0, "level": 0}) - for i, (index, _) in enumerate(self.PATTERN.items()): + for i, (index, _) in enumerate(self.pattern.items()): actual = _expand_dimensions_by_key( - self.TEST_DATA, key, index, self.PATTERN + self.test_data, key, index, self.pattern ) - expected_dims = dict(self.TEST_DATA.dims) + expected_dims = 
dict(self.test_data.dims) expected_dims.update({"level": 1}) self.assertEqual(expected_dims, dict(actual.dims)) - self.assertEqual(np.array([self.LEVEL.keys[i]]), actual["level"]) + self.assertEqual(np.array([self.level.keys[i]]), actual["level"]) def test_raises_error_when_dataset_is_not_found(self): key = core.Key({"time": 0, "boat": 0}) index = (0,) - with self.assertRaises(ValueError) as e: + with self.assertRaisesRegex(ValueError, "boat") as e: _expand_dimensions_by_key( - self.TEST_DATA, key, index, self.PATTERN + self.test_data, key, index, self.pattern ) - self.assertIn("boat", e.exception.args[0]) class FilePatternToChunksTest(test_util.TestCase): - TEST_DATA = test_util.dummy_era5_surface_dataset() + + def setUp(self): + self.test_data = test_util.dummy_era5_surface_dataset() @contextlib.contextmanager - def pattern_from_testdata(self, test_data=None) -> FilePattern: + def pattern_from_testdata(self) -> FilePattern: """Produces a FilePattern for a temporary NetCDF file of test data.""" - if test_data is None: - test_data = self.TEST_DATA - - try: - with tempfile.TemporaryDirectory() as tmpdir: - target = f'{tmpdir}/era5.nc' - test_data.to_netcdf(target) - yield FilePattern(lambda: target) - finally: - pass + with tempfile.TemporaryDirectory() as tmpdir: + target = f'{tmpdir}/era5.nc' + self.test_data.to_netcdf(target) + yield FilePattern(lambda: target) @contextlib.contextmanager def multifile_pattern( @@ -79,26 +78,21 @@ def multifile_pattern( longitude_step: int = 47 ) -> FilePattern: """Produces a FilePattern for a temporary NetCDF file of test data.""" - test_data = self.TEST_DATA - time_dim = ConcatDim('time', list(range(0, 360 * 4, time_step))) longitude_dim = ConcatDim('longitude', list(range(0, 144, longitude_step))) - try: - with tempfile.TemporaryDirectory() as tmpdir: - def make_path(time: int, longitude: int) -> str: - return f'{tmpdir}/era5-{time}-{longitude}.nc' - - for time in time_dim.keys: - for long in longitude_dim.keys: - chunk = test_data.isel( - time=slice(time, time + time_step), - longitude=slice(long, long + longitude_step) - ) - chunk.to_netcdf(make_path(time, long)) - yield FilePattern(make_path, time_dim, longitude_dim) - finally: - pass + with tempfile.TemporaryDirectory() as tmpdir: + def make_path(time: int, longitude: int) -> str: + return f'{tmpdir}/era5-{time}-{longitude}.nc' + + for time in time_dim.keys: + for long in longitude_dim.keys: + chunk = self.test_data.isel( + time=slice(time, time + time_step), + longitude=slice(long, long + longitude_step) + ) + chunk.to_netcdf(make_path(time, long)) + yield FilePattern(make_path, time_dim, longitude_dim) def test_prechunk_converts_correctly(self): pattern = FilePattern( @@ -109,7 +103,7 @@ def test_prechunk_converts_correctly(self): transform = FilePatternToChunks(pattern) expected = [core.Key({"time": i}) for i in range(0, 30)] - actual = [key for key, _ in transform._prechunk()] + actual = [key for key, *_ in transform._prechunk()] self.assertEqual(expected, actual) @@ -124,7 +118,7 @@ def test_prechunk_with_two_dims_converts_correctly(self): expected = [core.Key({"time": i, "level": j}) for i, j in itertools.product(range(30), range(5))] - actual = [key for key, _ in transform._prechunk()] + actual = [key for key, *_ in transform._prechunk()] self.assertEqual(expected, actual) @@ -139,27 +133,26 @@ def test_prechunk_from_pattern_with_nitemsper_converts_correctly(self): expected = [core.Key({"time": i, "level": j}) for i, j in itertools.product(range(0, 30 * 24, 24), range(5))] - actual = [key 
for key, _ in transform._prechunk()] + actual = [key for key, *_ in transform._prechunk()] self.assertEqual(expected, actual) def test_no_subchunks_returns_single_dataset(self): - expected = [(core.Key(_zero_dimensions(self.TEST_DATA)), self.TEST_DATA)] + expected = [(core.Key({"time": 0, "latitude": 0, "longitude": 0}), + self.test_data)] with self.pattern_from_testdata() as pattern: actual = test_util.EagerPipeline() | FilePatternToChunks(pattern) self.assertIdenticalChunks(actual, expected) def test_single_subchunks_returns_multiple_datasets(self): - base_key = core.Key(_zero_dimensions(self.TEST_DATA)) - with self.pattern_from_testdata() as pattern: result = ( test_util.EagerPipeline() | FilePatternToChunks(pattern, sub_chunks={"longitude": 48}) ) - expected_keys = [base_key.with_offsets(longitude=i) + expected_keys = [core.Key({"time": 0, "latitude": 0, "longitude": i}) for i in range(0, 144, 48)] expected_sizes = [{"time": 365 * 4, "latitude": 73, "longitude": 48} for _ in range(3)] @@ -170,8 +163,6 @@ def test_single_subchunks_returns_multiple_datasets(self): self.assertEqual(expected_sizes, actual_sizes) def test_multiple_subchunks_returns_multiple_datasets(self): - base_key = core.Key(_zero_dimensions(self.TEST_DATA)) - with self.pattern_from_testdata() as pattern: result = ( test_util.EagerPipeline() @@ -180,7 +171,7 @@ def test_multiple_subchunks_returns_multiple_datasets(self): ) expected_keys = [ - base_key.with_offsets(longitude=o, latitude=a) + core.Key({"time": 0, "latitude": a, "longitude": o}) for o, a in itertools.product(range(0, 144, 48), range(0, 73, 24)) ] expected_sizes = [ @@ -194,8 +185,6 @@ def test_multiple_subchunks_returns_multiple_datasets(self): self.assertEqual(expected_sizes, actual_sizes) def test_single_subchunks_over_multiple_files_returns_multiple_datasets(self): - base_key = core.Key(_zero_dimensions(self.TEST_DATA)) - with self.multifile_pattern() as pattern: result = ( test_util.EagerPipeline() @@ -203,7 +192,7 @@ def test_single_subchunks_over_multiple_files_returns_multiple_datasets(self): ) expected_keys = [ - base_key.with_offsets(latitude=a, longitude=o, time=t) + core.Key({"time": t, "latitude": a, "longitude": o}) for t, o, a in itertools.product(range(4), range(4), range(0, 73, 24)) ] expected_sizes = [ From c12536465327f6f13556679a2df9eaa530fa5e02 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 14 Sep 2021 17:23:12 -0700 Subject: [PATCH 04/19] Better translation of FilePatternIndex to Key. Code is passing tests; cleanup and e2e testing is still needed. 
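For illustration, the intended mapping from a pangeo-forge `FilePatternIndex`
to an xarray-beam `Key` (a sketch assuming the `DimIndex`/`CombineOp` API from
pangeo-forge-recipes; the concrete values below are made up):

    from pangeo_forge_recipes.patterns import CombineOp, DimIndex

    index = (
        DimIndex('time', 2, 10, CombineOp.CONCAT),
        DimIndex('level', 1, 5, CombineOp.CONCAT),
    )
    # _pattern_index_to_key(index) -> Key(offsets={'time': 2, 'level': 1})
    # A MergeDim entry (CombineOp.MERGE) raises ValueError instead.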
--- xarray_beam/_src/pangeo_forge.py | 46 ++++++++++++++++++++------- xarray_beam/_src/pangeo_forge_test.py | 5 +-- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index c932477..dfdbac5 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -35,13 +35,15 @@ def _zero_dimensions(dataset: xarray.Dataset) -> Mapping[str, int]: def _expand_dimensions_by_key( dataset: xarray.Dataset, key: core.Key, - index: Tuple[int, ...], + index: 'FilePatternIndex', pattern: 'FilePattern' ) -> xarray.Dataset: """Expand the dimensions of the `Dataset` by offsets found in the `Key`.""" combine_dims_by_name = { - combine_dim.name: (i, combine_dim) - for i, combine_dim in enumerate(pattern.combine_dims) + combine_dim.name: combine_dim for combine_dim in pattern.combine_dims + } + index_by_name = { + idx.name: idx for idx in index } if not combine_dims_by_name: @@ -52,22 +54,42 @@ def _expand_dimensions_by_key( if dim_key in dataset.dims: continue - dim_idx, combine_dim = combine_dims_by_name.get(dim_key, (-1, None)) - if dim_idx == -1: + try: + combine_dim = combine_dims_by_name[dim_key] + except KeyError: raise ValueError( f"could not find CombineDim named {dim_key!r} in pattern {pattern!r}." ) - dim_val = combine_dim.keys[index[dim_idx]] + dim_val = combine_dim.keys[index_by_name[dim_key].index] dataset = dataset.expand_dims(**{dim_key: [dim_val]}) return dataset +def _pattern_index_to_key(index: 'FilePatternIndex') -> core.Key: + """Translate a `FilePatternIndex` to a `Key`.""" + from pangeo_forge_recipes.patterns import CombineOp + + offsets = {} + for dim in index: + if dim.operation is CombineOp.MERGE: + raise ValueError("patterns with `MergeDim`s are not supported.") + elif dim.operation is CombineOp.CONCAT: + offsets[dim.name] = dim.index + else: + raise ValueError("only concat `CombineOp`s are supported.") + + return core.Key(offsets=offsets) + + class FilePatternToChunks(beam.PTransform): """Open data described by a Pangeo-Forge `FilePattern` into keyed chunks.""" - from pangeo_forge_recipes.patterns import FilePattern + from pangeo_forge_recipes.patterns import ( + FilePattern, + FilePatternIndex, + ) def __init__( self, @@ -90,7 +112,7 @@ def __init__( self.xarray_open_kwargs = xarray_open_kwargs or {} if pattern.merge_dims: - raise ValueError("patters with `MergeDim`s are not supported.") + raise ValueError("patterns with `MergeDim`s are not supported.") def _prechunk(self) -> Iterator[Tuple[core.Key, Tuple[int, ...], str]]: """Converts `FilePattern` items into keyed indexes.""" @@ -111,16 +133,16 @@ def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: """Open datasets into chunks with XArray.""" for index, path in self.pattern.items(): with FileSystems().open(path) as file: - key = core.Key(index) - base_key = core.Key(_zero_dimensions(dataset)).with_offsets( - **key.offsets - ) + key = _pattern_index_to_key(index) dataset = xarray.open_dataset( file, chunks=self.sub_chunks, **self.xarray_open_kwargs ) dataset = _expand_dimensions_by_key(dataset, key, index, self.pattern) + base_key = core.Key(_zero_dimensions(dataset)).with_offsets( + **key.offsets + ) num_threads = len(dataset.data_vars) diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index 1019aea..82687d2 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -18,7 +18,8 @@ import tempfile import numpy as np -from 
pangeo_forge_recipes.patterns import FilePattern, ConcatDim +from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, DimIndex, \ + CombineOp from xarray_beam._src import core from xarray_beam._src import test_util @@ -51,7 +52,7 @@ def test_expands_dimensions(self): def test_raises_error_when_dataset_is_not_found(self): key = core.Key({"time": 0, "boat": 0}) - index = (0,) + index = (DimIndex('time', 0, 1, CombineOp.CONCAT),) with self.assertRaisesRegex(ValueError, "boat") as e: _expand_dimensions_by_key( self.test_data, key, index, self.pattern From 5896d454099bd000680f04e2d8787dacb65b3715 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 14 Sep 2021 17:37:48 -0700 Subject: [PATCH 05/19] Initial cleanup; fixing broken CI. --- setup.py | 1 + xarray_beam/_src/pangeo_forge.py | 31 ++++--------------- xarray_beam/_src/pangeo_forge_test.py | 43 --------------------------- 3 files changed, 7 insertions(+), 68 deletions(-) diff --git a/setup.py b/setup.py index 1d8db03..663f0b3 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ 'pytest', 'pangeo-forge-recipes', 'scipy', + 'h5netcdf' ] setuptools.setup( diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index dfdbac5..86338b0 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -13,12 +13,11 @@ # limitations under the License. """IO with Pangeo-Forge.""" from typing import ( - Dict, - Iterator, - Optional, - Mapping, - Tuple, - cast, + Dict, + Iterator, + Optional, + Mapping, + Tuple, ) import apache_beam as beam @@ -86,10 +85,7 @@ def _pattern_index_to_key(index: 'FilePatternIndex') -> core.Key: class FilePatternToChunks(beam.PTransform): """Open data described by a Pangeo-Forge `FilePattern` into keyed chunks.""" - from pangeo_forge_recipes.patterns import ( - FilePattern, - FilePatternIndex, - ) + from pangeo_forge_recipes.patterns import FilePattern def __init__( self, @@ -114,21 +110,6 @@ def __init__( if pattern.merge_dims: raise ValueError("patterns with `MergeDim`s are not supported.") - def _prechunk(self) -> Iterator[Tuple[core.Key, Tuple[int, ...], str]]: - """Converts `FilePattern` items into keyed indexes.""" - # Default to 1 chunk per item, even though there may be more on reading. - # We discover the actual items-per-input in the _open_chunks() phase. 
- initial_chunks = {k: v or 1 - for k, v in self.pattern.nitems_per_input.items()} - dim_sizes = { - k: v or self.pattern.dims[k] - for k, v, in self.pattern.concat_sequence_lens.items() - } - chunks = core.normalize_expanded_chunks(initial_chunks, dim_sizes) - for key, (index, path) in zip(core.iter_chunk_keys(chunks), - self.pattern.items()): - yield key, index, path - def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: """Open datasets into chunks with XArray.""" for index, path in self.pattern.items(): diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index 82687d2..318aa88 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -95,49 +95,6 @@ def make_path(time: int, longitude: int) -> str: chunk.to_netcdf(make_path(time, long)) yield FilePattern(make_path, time_dim, longitude_dim) - def test_prechunk_converts_correctly(self): - pattern = FilePattern( - lambda time: f"gs://bucket/{time:02d}/{time:02d}.nc", - ConcatDim("time", list(range(1, 31))), - ) - - transform = FilePatternToChunks(pattern) - - expected = [core.Key({"time": i}) for i in range(0, 30)] - actual = [key for key, *_ in transform._prechunk()] - - self.assertEqual(expected, actual) - - def test_prechunk_with_two_dims_converts_correctly(self): - pattern = FilePattern( - lambda time, level: f"gs://bucket/{time:02d}/{level:02d}.nc", - ConcatDim("time", list(range(1, 31))), - ConcatDim("level", list(range(5))), - ) - - transform = FilePatternToChunks(pattern) - - expected = [core.Key({"time": i, "level": j}) - for i, j in itertools.product(range(30), range(5))] - actual = [key for key, *_ in transform._prechunk()] - - self.assertEqual(expected, actual) - - def test_prechunk_from_pattern_with_nitemsper_converts_correctly(self): - pattern = FilePattern( - lambda time, level: f"gs://bucket/{time:02d}/{level:02d}.nc", - ConcatDim("time", list(range(1, 31)), nitems_per_file=24), - ConcatDim("level", list(range(5))), - ) - - transform = FilePatternToChunks(pattern) - - expected = [core.Key({"time": i, "level": j}) - for i, j in itertools.product(range(0, 30 * 24, 24), range(5))] - actual = [key for key, *_ in transform._prechunk()] - - self.assertEqual(expected, actual) - def test_no_subchunks_returns_single_dataset(self): expected = [(core.Key({"time": 0, "latitude": 0, "longitude": 0}), self.test_data)] From e0f22df997c96dbed314529dfede99df3c939d67 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 14 Sep 2021 22:45:03 -0700 Subject: [PATCH 06/19] `expand` step is splittable (only uses a `create` fn). Added comment to explain early return. --- xarray_beam/_src/pangeo_forge.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index 86338b0..b848c8d 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -110,7 +110,7 @@ def __init__( if pattern.merge_dims: raise ValueError("patterns with `MergeDim`s are not supported.") - def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: + def _open_chunks(self) -> Iterator[Tuple[core.Key, xarray.Dataset]]: """Open datasets into chunks with XArray.""" for index, path in self.pattern.items(): with FileSystems().open(path) as file: @@ -127,6 +127,8 @@ def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: num_threads = len(dataset.data_vars) + # If sub_chunks is not set by the user, treat the dataset as a single + # chunk. 
if self.sub_chunks == -1: yield base_key, dataset.compute(num_workers=num_threads) return @@ -136,9 +138,6 @@ def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: yield new_key, chunk.compute(num_workers=num_threads) def expand(self, pcoll): - return ( - pcoll - | beam.Create([None]) - | beam.FlatMap(self._open_chunks) - ) + return pcoll | beam.Create(self._open_chunks) + From 83fe58245aa238c68167a9af119d9a575728f6c5 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 14 Sep 2021 22:49:19 -0700 Subject: [PATCH 07/19] Clean up file whitespace --- xarray_beam/_src/pangeo_forge.py | 2 -- xarray_beam/_src/pangeo_forge_test.py | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index b848c8d..c1becca 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -139,5 +139,3 @@ def _open_chunks(self) -> Iterator[Tuple[core.Key, xarray.Dataset]]: def expand(self, pcoll): return pcoll | beam.Create(self._open_chunks) - - diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index 318aa88..676a1cb 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -24,8 +24,8 @@ from xarray_beam._src import core from xarray_beam._src import test_util from xarray_beam._src.pangeo_forge import ( - FilePatternToChunks, - _expand_dimensions_by_key + FilePatternToChunks, + _expand_dimensions_by_key ) From 8cf71179ba48ce8942274b0e3f31fcd9da95fb31 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 14 Sep 2021 23:21:02 -0700 Subject: [PATCH 08/19] Revert create strategy. --- xarray_beam/_src/pangeo_forge.py | 9 +++++++-- xarray_beam/_src/pangeo_forge_test.py | 15 ++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index c1becca..ee9a52c 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -110,7 +110,7 @@ def __init__( if pattern.merge_dims: raise ValueError("patterns with `MergeDim`s are not supported.") - def _open_chunks(self) -> Iterator[Tuple[core.Key, xarray.Dataset]]: + def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: """Open datasets into chunks with XArray.""" for index, path in self.pattern.items(): with FileSystems().open(path) as file: @@ -138,4 +138,9 @@ def _open_chunks(self) -> Iterator[Tuple[core.Key, xarray.Dataset]]: yield new_key, chunk.compute(num_workers=num_threads) def expand(self, pcoll): - return pcoll | beam.Create(self._open_chunks) + return ( + pcoll + | beam.Create([None]) + | beam.FlatMap(self._open_chunks) + + ) diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index 676a1cb..d3509a5 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -18,8 +18,12 @@ import tempfile import numpy as np -from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, DimIndex, \ +from pangeo_forge_recipes.patterns import ( + FilePattern, + ConcatDim, + DimIndex, CombineOp +) from xarray_beam._src import core from xarray_beam._src import test_util @@ -112,13 +116,14 @@ def test_single_subchunks_returns_multiple_datasets(self): expected_keys = [core.Key({"time": 0, "latitude": 0, "longitude": i}) for i in range(0, 144, 48)] - expected_sizes = [{"time": 365 * 4, "latitude": 73, "longitude": 48} - for _ in range(3)] + expected_datasets = [ + 
self.test_data.isel({'longitude': i}) for i in range(0, 144, 48) + ] actual_keys = [key for key, _ in result] - actual_sizes = [dict(ds.sizes) for _, ds in result] + actual_datasets = [ds for _, ds in result] self.assertEqual(expected_keys, actual_keys) - self.assertEqual(expected_sizes, actual_sizes) + self.assertEqual(expected_datasets, actual_datasets) def test_multiple_subchunks_returns_multiple_datasets(self): with self.pattern_from_testdata() as pattern: From 8d393748caae160afb20ed3382518dc1496cedab Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 20 Sep 2021 10:45:28 -0700 Subject: [PATCH 09/19] Simplified FilePatternToChunks transform -- no split_chunks / sub_chunks. This transform will now only open file pattern datasets as whole chunks. Re-chunk (i.e. "sub_chunk"s) can be delegated to a SplitChunk() transform layered after this one. --- xarray_beam/_src/pangeo_forge.py | 50 ++++----------- xarray_beam/_src/pangeo_forge_test.py | 87 ++++++++------------------- 2 files changed, 36 insertions(+), 101 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index ee9a52c..115da93 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -24,7 +24,7 @@ import xarray from apache_beam.io.filesystems import FileSystems -from xarray_beam._src import core, rechunk +from xarray_beam._src import core def _zero_dimensions(dataset: xarray.Dataset) -> Mapping[str, int]: @@ -33,7 +33,6 @@ def _zero_dimensions(dataset: xarray.Dataset) -> Mapping[str, int]: def _expand_dimensions_by_key( dataset: xarray.Dataset, - key: core.Key, index: 'FilePatternIndex', pattern: 'FilePattern' ) -> xarray.Dataset: @@ -48,7 +47,7 @@ def _expand_dimensions_by_key( if not combine_dims_by_name: return dataset - for dim_key in key.offsets.keys(): + for dim_key in index_by_name.keys(): # skip expanding dimensions if they already exist if dim_key in dataset.dims: continue @@ -66,22 +65,6 @@ def _expand_dimensions_by_key( return dataset -def _pattern_index_to_key(index: 'FilePatternIndex') -> core.Key: - """Translate a `FilePatternIndex` to a `Key`.""" - from pangeo_forge_recipes.patterns import CombineOp - - offsets = {} - for dim in index: - if dim.operation is CombineOp.MERGE: - raise ValueError("patterns with `MergeDim`s are not supported.") - elif dim.operation is CombineOp.CONCAT: - offsets[dim.name] = dim.index - else: - raise ValueError("only concat `CombineOp`s are supported.") - - return core.Key(offsets=offsets) - - class FilePatternToChunks(beam.PTransform): """Open data described by a Pangeo-Forge `FilePattern` into keyed chunks.""" @@ -90,7 +73,6 @@ class FilePatternToChunks(beam.PTransform): def __init__( self, pattern: 'FilePattern', - sub_chunks: Optional[Mapping[str, int]] = None, xarray_open_kwargs: Optional[Dict] = None ): """Initialize FilePatternToChunks. @@ -99,12 +81,9 @@ def __init__( Args: pattern: a `FilePattern` describing a dataset. - sub_chunks: split each open dataset into smaller chunks. If not set, each - chunk will open the full dataset. xarray_open_kwargs: keyword arguments to pass to `xarray.open_dataset()`. 
""" self.pattern = pattern - self.sub_chunks = sub_chunks or -1 self.xarray_open_kwargs = xarray_open_kwargs or {} if pattern.merge_dims: @@ -112,35 +91,28 @@ def __init__( def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: """Open datasets into chunks with XArray.""" + max_size_idx = {} for index, path in self.pattern.items(): with FileSystems().open(path) as file: - key = _pattern_index_to_key(index) - dataset = xarray.open_dataset( - file, chunks=self.sub_chunks, **self.xarray_open_kwargs - ) - dataset = _expand_dimensions_by_key(dataset, key, index, self.pattern) + dataset = xarray.open_dataset(file, **self.xarray_open_kwargs) + dataset = _expand_dimensions_by_key(dataset, index, self.pattern) + + if not max_size_idx: + max_size_idx = dataset.sizes base_key = core.Key(_zero_dimensions(dataset)).with_offsets( - **key.offsets + **{dim.name: max_size_idx[dim.name] * dim.index for dim in index} ) num_threads = len(dataset.data_vars) - # If sub_chunks is not set by the user, treat the dataset as a single - # chunk. - if self.sub_chunks == -1: - yield base_key, dataset.compute(num_workers=num_threads) - return - - for new_key, chunk in rechunk.split_chunks(base_key, dataset, - self.sub_chunks): - yield new_key, chunk.compute(num_workers=num_threads) + yield base_key, dataset.compute(num_workers=num_threads) def expand(self, pcoll): return ( pcoll | beam.Create([None]) | beam.FlatMap(self._open_chunks) - ) + diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index d3509a5..c926de3 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -18,6 +18,7 @@ import tempfile import numpy as np +from absl.testing import parameterized from pangeo_forge_recipes.patterns import ( FilePattern, ConcatDim, @@ -99,73 +100,35 @@ def make_path(time: int, longitude: int) -> str: chunk.to_netcdf(make_path(time, long)) yield FilePattern(make_path, time_dim, longitude_dim) - def test_no_subchunks_returns_single_dataset(self): - expected = [(core.Key({"time": 0, "latitude": 0, "longitude": 0}), - self.test_data)] + def test_returns_single_dataset(self): + expected = [ + (core.Key({"time": 0, "latitude": 0, "longitude": 0}), self.test_data) + ] with self.pattern_from_testdata() as pattern: actual = test_util.EagerPipeline() | FilePatternToChunks(pattern) self.assertIdenticalChunks(actual, expected) - def test_single_subchunks_returns_multiple_datasets(self): - with self.pattern_from_testdata() as pattern: - result = ( - test_util.EagerPipeline() - | FilePatternToChunks(pattern, sub_chunks={"longitude": 48}) - ) - - expected_keys = [core.Key({"time": 0, "latitude": 0, "longitude": i}) - for i in range(0, 144, 48)] - expected_datasets = [ - self.test_data.isel({'longitude': i}) for i in range(0, 144, 48) - ] - actual_keys = [key for key, _ in result] - actual_datasets = [ds for _, ds in result] - - self.assertEqual(expected_keys, actual_keys) - self.assertEqual(expected_datasets, actual_datasets) - - def test_multiple_subchunks_returns_multiple_datasets(self): - with self.pattern_from_testdata() as pattern: - result = ( - test_util.EagerPipeline() - | FilePatternToChunks(pattern, - sub_chunks={"longitude": 48, "latitude": 24}) + @parameterized.parameters( + dict(time_step=479, longitude_step=47), + dict(time_step=365, longitude_step=72), + dict(time_step=292, longitude_step=71), + dict(time_step=291, longitude_step=48), + ) + def test_returns_multiple_datasets(self, time_step: int, longitude_step: int): + expected = [ 
+ ( + core.Key({"time": t, "latitude": 0, "longitude": o}), + self.test_data.isel( + time=slice(t, t + time_step), + longitude=slice(o, o + longitude_step) + ) + ) for t, o in itertools.product( + range(0, 360 * 4, time_step), + range(0, 144, longitude_step) ) - - expected_keys = [ - core.Key({"time": 0, "latitude": a, "longitude": o}) - for o, a in itertools.product(range(0, 144, 48), range(0, 73, 24)) - ] - expected_sizes = [ - {"time": 365 * 4, "longitude": o, "latitude": a} - for o, a, in itertools.product([48, 48, 48], [24, 24, 24, 1]) ] - actual_keys = [key for key, _ in result] - actual_sizes = [dict(ds.sizes) for _, ds in result] - - self.assertEqual(expected_keys, actual_keys) - self.assertEqual(expected_sizes, actual_sizes) - - def test_single_subchunks_over_multiple_files_returns_multiple_datasets(self): - with self.multifile_pattern() as pattern: - result = ( - test_util.EagerPipeline() - | FilePatternToChunks(pattern, sub_chunks={"latitude": 24}) - ) - - expected_keys = [ - core.Key({"time": t, "latitude": a, "longitude": o}) - for t, o, a in itertools.product(range(4), range(4), range(0, 73, 24)) - ] - expected_sizes = [ - {"time": t, "latitude": a, "longitude": o} - for t, o, a, in - itertools.product([479, 479, 479, 23], [47, 47, 47, 3], [24, 24, 24, 1]) - ] - - actual_keys = [key for key, _ in result] - actual_sizes = [dict(ds.sizes) for _, ds in result] + with self.multifile_pattern(time_step, longitude_step) as pattern: + actual = test_util.EagerPipeline() | FilePatternToChunks(pattern) - self.assertEqual(expected_keys, actual_keys) - self.assertEqual(expected_sizes, actual_sizes) + self.assertIdenticalChunks(actual, expected) From bb4675bd60b57dce2f005f9ae9b0bcc7ecd5e6d6 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 20 Sep 2021 11:41:45 -0700 Subject: [PATCH 10/19] Fixed broken unit tests. --- xarray_beam/_src/pangeo_forge_test.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index c926de3..a4ab61b 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -42,11 +42,9 @@ def setUp(self): self.pattern = FilePattern(lambda level: f"gs://dir/{level}.nc", self.level) def test_expands_dimensions(self): - key = core.Key(offsets={"time": 0, "level": 0}) - for i, (index, _) in enumerate(self.pattern.items()): actual = _expand_dimensions_by_key( - self.test_data, key, index, self.pattern + self.test_data, index, self.pattern ) expected_dims = dict(self.test_data.dims) @@ -56,11 +54,10 @@ def test_expands_dimensions(self): self.assertEqual(np.array([self.level.keys[i]]), actual["level"]) def test_raises_error_when_dataset_is_not_found(self): - key = core.Key({"time": 0, "boat": 0}) - index = (DimIndex('time', 0, 1, CombineOp.CONCAT),) - with self.assertRaisesRegex(ValueError, "boat") as e: + index = (DimIndex('boat', 0, 1, CombineOp.CONCAT),) + with self.assertRaisesRegex(ValueError, "boat"): _expand_dimensions_by_key( - self.test_data, key, index, self.pattern + self.test_data, index, self.pattern ) From 1df6e817e2b90e508f0e54d0e8ef4b4ad40ccf2f Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 20 Sep 2021 11:46:04 -0700 Subject: [PATCH 11/19] Using an "all close" instead of an "identical" assert. 
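For reference, the distinction this change relies on (a sketch in terms of
`xarray.testing`; the `test_util` assertions are assumed to wrap equivalents):

    import numpy as np
    import xarray
    import xarray.testing

    a = xarray.Dataset({"t2m": ("time", np.array([1.0, 2.0, 3.0]))})
    b = a.copy(deep=True)
    b["t2m"] += 1e-12  # tiny float drift, e.g. from a NetCDF round trip

    xarray.testing.assert_allclose(a, b)  # passes: equal within tolerance
    # assert_identical(a, b) would fail: values are not bit-for-bit equal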
--- xarray_beam/_src/pangeo_forge_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index a4ab61b..f13695a 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -128,4 +128,4 @@ def test_returns_multiple_datasets(self, time_step: int, longitude_step: int): with self.multifile_pattern(time_step, longitude_step) as pattern: actual = test_util.EagerPipeline() | FilePatternToChunks(pattern) - self.assertIdenticalChunks(actual, expected) + self.assertAllCloseChunks(actual, expected) From b23167302a86d0cdd8cdff355ec108591f8e7c17 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 20 Sep 2021 11:50:15 -0700 Subject: [PATCH 12/19] Single chunks are also now using "all close". --- xarray_beam/_src/pangeo_forge_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index f13695a..86c3f87 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -104,7 +104,7 @@ def test_returns_single_dataset(self): with self.pattern_from_testdata() as pattern: actual = test_util.EagerPipeline() | FilePatternToChunks(pattern) - self.assertIdenticalChunks(actual, expected) + self.assertAllCloseChunks(actual, expected) @parameterized.parameters( dict(time_step=479, longitude_step=47), From c9d9f8cda7b665a01cc6866c121c5d8c68c6d504 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 20 Sep 2021 22:33:44 -0700 Subject: [PATCH 13/19] Updating file open capability to support grib files. As a backup to the `FileSystems().open(...)` method, we use fsspec to create a local copy of the data for opening with `xr.open_dataset(...)`. --- xarray_beam/_src/pangeo_forge.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index 115da93..fd0d653 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """IO with Pangeo-Forge.""" +import contextlib from typing import ( Dict, Iterator, @@ -21,6 +22,7 @@ ) import apache_beam as beam +import fsspec import xarray from apache_beam.io.filesystems import FileSystems @@ -89,13 +91,33 @@ def __init__( if pattern.merge_dims: raise ValueError("patterns with `MergeDim`s are not supported.") + @contextlib.contextmanager + def _open_dataset(self, path: str) -> xarray.Dataset: + """Open as an XArray Dataset, sometimes with local caching.""" + fs_file = None + with FileSystems().open(path) as file: + try: + dataset = xarray.open_dataset(file, **self.xarray_open_kwargs) + except (TypeError, OSError): + # The cfgrib engine (and others) may fail with the FileSystems method of + # opening with BufferedReaders. Here, we open the data locally to make + # it easier to work with XArray. 
+ fs_file = fsspec.open_local(f"simplecache::{path}", + simplecache={'cache_storage': '/tmp/files'}) + dataset = xarray.open_dataset(fs_file, **self.xarray_open_kwargs) + + yield dataset + + if fs_file: + fs_file.close() + def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: """Open datasets into chunks with XArray.""" max_size_idx = {} + for index, path in self.pattern.items(): - with FileSystems().open(path) as file: + with self._open_dataset(path) as dataset: - dataset = xarray.open_dataset(file, **self.xarray_open_kwargs) dataset = _expand_dimensions_by_key(dataset, index, self.pattern) if not max_size_idx: @@ -115,4 +137,3 @@ def expand(self, pcoll): | beam.Create([None]) | beam.FlatMap(self._open_chunks) ) - From dd14005b5f1add44e3699bcade91ef0cb7238f95 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Tue, 21 Sep 2021 17:31:27 -0700 Subject: [PATCH 14/19] Added back sub-chunks; `Create` + `FlatMap` is now splittable. --- xarray_beam/_src/pangeo_forge.py | 46 ++++++++++------ xarray_beam/_src/pangeo_forge_test.py | 76 ++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 17 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index fd0d653..7fc619a 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -26,7 +26,7 @@ import xarray from apache_beam.io.filesystems import FileSystems -from xarray_beam._src import core +from xarray_beam._src import core, rechunk def _zero_dimensions(dataset: xarray.Dataset) -> Mapping[str, int]: @@ -70,11 +70,12 @@ def _expand_dimensions_by_key( class FilePatternToChunks(beam.PTransform): """Open data described by a Pangeo-Forge `FilePattern` into keyed chunks.""" - from pangeo_forge_recipes.patterns import FilePattern + from pangeo_forge_recipes.patterns import FilePattern, FilePatternIndex def __init__( self, pattern: 'FilePattern', + sub_chunks: Optional[Mapping[str, int]] = None, xarray_open_kwargs: Optional[Dict] = None ): """Initialize FilePatternToChunks. @@ -83,10 +84,14 @@ def __init__( Args: pattern: a `FilePattern` describing a dataset. + sub_chunks: split each open dataset into smaller chunks. If not set, the + transform will return one file per chunk. xarray_open_kwargs: keyword arguments to pass to `xarray.open_dataset()`. 
""" self.pattern = pattern + self.sub_chunks = sub_chunks self.xarray_open_kwargs = xarray_open_kwargs or {} + self._max_size_idx = {} if pattern.merge_dims: raise ValueError("patterns with `MergeDim`s are not supported.") @@ -111,29 +116,38 @@ def _open_dataset(self, path: str) -> xarray.Dataset: if fs_file: fs_file.close() - def _open_chunks(self, _) -> Iterator[Tuple[core.Key, xarray.Dataset]]: + def _open_chunks( + self, + index: 'FilePatternIndex', + path: str + ) -> Iterator[Tuple[core.Key, xarray.Dataset]]: """Open datasets into chunks with XArray.""" - max_size_idx = {} - - for index, path in self.pattern.items(): - with self._open_dataset(path) as dataset: + with self._open_dataset(path) as dataset: - dataset = _expand_dimensions_by_key(dataset, index, self.pattern) + dataset = _expand_dimensions_by_key(dataset, index, self.pattern) - if not max_size_idx: - max_size_idx = dataset.sizes + if not self._max_size_idx: + self._max_size_idx = dataset.sizes - base_key = core.Key(_zero_dimensions(dataset)).with_offsets( - **{dim.name: max_size_idx[dim.name] * dim.index for dim in index} - ) + base_key = core.Key(_zero_dimensions(dataset)).with_offsets( + **{dim.name: self._max_size_idx[dim.name] * dim.index for dim in index} + ) - num_threads = len(dataset.data_vars) + num_threads = len(dataset.data_vars) + # If sub_chunks is not set by the user, treat the dataset as a single + # chunk. + if self.sub_chunks is None: yield base_key, dataset.compute(num_workers=num_threads) + return + + for new_key, chunk in rechunk.split_chunks(base_key, dataset, + self.sub_chunks): + yield new_key, chunk.compute(num_workers=num_threads) def expand(self, pcoll): return ( pcoll - | beam.Create([None]) - | beam.FlatMap(self._open_chunks) + | beam.Create(list(self.pattern.items())) + | beam.FlatMapTuple(self._open_chunks) ) diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index 86c3f87..b567697 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -16,6 +16,7 @@ import contextlib import itertools import tempfile +from typing import Dict import numpy as np from absl.testing import parameterized @@ -26,6 +27,7 @@ CombineOp ) +from xarray_beam import split_chunks from xarray_beam._src import core from xarray_beam._src import test_util from xarray_beam._src.pangeo_forge import ( @@ -106,13 +108,52 @@ def test_returns_single_dataset(self): self.assertAllCloseChunks(actual, expected) + def test_single_subchunks_returns_multiple_datasets(self): + with self.pattern_from_testdata() as pattern: + result = ( + test_util.EagerPipeline() + | FilePatternToChunks(pattern, sub_chunks={"longitude": 48}) + ) + + expected = [ + ( + core.Key({"time": 0, "latitude": 0, "longitude": i}), + self.test_data.isel(longitude=slice(i, i + 48)) + ) + for i in range(0, 144, 48) + ] + self.assertAllCloseChunks(result, expected) + + def test_multiple_subchunks_returns_multiple_datasets(self): + with self.pattern_from_testdata() as pattern: + result = ( + test_util.EagerPipeline() + | FilePatternToChunks(pattern, + sub_chunks={"longitude": 48, "latitude": 24}) + ) + + expected = [ + ( + core.Key({"time": 0, "longitude": o, "latitude": a}), + self.test_data.isel(longitude=slice(o, o + 48), + latitude=slice(a, a + 24)) + ) + for o, a in itertools.product(range(0, 144, 48), range(0, 73, 24)) + ] + + self.assertAllCloseChunks(result, expected) + @parameterized.parameters( dict(time_step=479, longitude_step=47), dict(time_step=365, longitude_step=72), 
dict(time_step=292, longitude_step=71), dict(time_step=291, longitude_step=48), ) - def test_returns_multiple_datasets(self, time_step: int, longitude_step: int): + def test_multiple_datasets_returns_multiple_datasets( + self, + time_step: int, + longitude_step: int + ): expected = [ ( core.Key({"time": t, "latitude": 0, "longitude": o}), @@ -129,3 +170,36 @@ def test_returns_multiple_datasets(self, time_step: int, longitude_step: int): actual = test_util.EagerPipeline() | FilePatternToChunks(pattern) self.assertAllCloseChunks(actual, expected) + + @parameterized.parameters( + dict(time_step=365, longitude_step=72, sub_chunks={"latitude": 36}), + dict(time_step=365, longitude_step=72, sub_chunks={"longitude": 36}), + dict(time_step=365, longitude_step=72, + sub_chunks={"longitude": 36, "latitude": 66}), + ) + def test_multiple_datasets_with_subchunks_returns_multiple_datasets( + self, + time_step: int, + longitude_step: int, + sub_chunks: Dict[str, int], + ): + + expected = [] + for t, o in itertools.product(range(0, 360 * 4, time_step), + range(0, 144, longitude_step)): + expected.extend( + split_chunks( + core.Key({"latitude": 0, "longitude": o, "time": t}), + self.test_data.isel( + time=slice(t, t + time_step), + longitude=slice(o, o + longitude_step) + ), + sub_chunks) + ) + with self.multifile_pattern(time_step, longitude_step) as pattern: + actual = test_util.EagerPipeline() | FilePatternToChunks( + pattern, + sub_chunks=sub_chunks + ) + + self.assertAllCloseChunks(actual, expected) From 73671d2995c4c96b3800b18bb21f029efe5cecf8 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 22 Sep 2021 09:56:26 -0700 Subject: [PATCH 15/19] Renaming 'sub_chunks' to 'chunks'. --- xarray_beam/_src/pangeo_forge.py | 13 ++++++------- xarray_beam/_src/pangeo_forge_test.py | 16 ++++++++-------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index 7fc619a..59d88d3 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -75,7 +75,7 @@ class FilePatternToChunks(beam.PTransform): def __init__( self, pattern: 'FilePattern', - sub_chunks: Optional[Mapping[str, int]] = None, + chunks: Optional[Mapping[str, int]] = None, xarray_open_kwargs: Optional[Dict] = None ): """Initialize FilePatternToChunks. @@ -84,12 +84,12 @@ def __init__( Args: pattern: a `FilePattern` describing a dataset. - sub_chunks: split each open dataset into smaller chunks. If not set, the + chunks: split each open dataset into smaller chunks. If not set, the transform will return one file per chunk. xarray_open_kwargs: keyword arguments to pass to `xarray.open_dataset()`. """ self.pattern = pattern - self.sub_chunks = sub_chunks + self.chunks = chunks self.xarray_open_kwargs = xarray_open_kwargs or {} self._max_size_idx = {} @@ -135,14 +135,13 @@ def _open_chunks( num_threads = len(dataset.data_vars) - # If sub_chunks is not set by the user, treat the dataset as a single - # chunk. - if self.sub_chunks is None: + # If chunks is not set by the user, treat the dataset as a single chunk. 
+ if self.chunks is None: yield base_key, dataset.compute(num_workers=num_threads) return for new_key, chunk in rechunk.split_chunks(base_key, dataset, - self.sub_chunks): + self.chunks): yield new_key, chunk.compute(num_workers=num_threads) def expand(self, pcoll): diff --git a/xarray_beam/_src/pangeo_forge_test.py b/xarray_beam/_src/pangeo_forge_test.py index b567697..eb3b7d4 100644 --- a/xarray_beam/_src/pangeo_forge_test.py +++ b/xarray_beam/_src/pangeo_forge_test.py @@ -112,7 +112,7 @@ def test_single_subchunks_returns_multiple_datasets(self): with self.pattern_from_testdata() as pattern: result = ( test_util.EagerPipeline() - | FilePatternToChunks(pattern, sub_chunks={"longitude": 48}) + | FilePatternToChunks(pattern, chunks={"longitude": 48}) ) expected = [ @@ -129,7 +129,7 @@ def test_multiple_subchunks_returns_multiple_datasets(self): result = ( test_util.EagerPipeline() | FilePatternToChunks(pattern, - sub_chunks={"longitude": 48, "latitude": 24}) + chunks={"longitude": 48, "latitude": 24}) ) expected = [ @@ -172,16 +172,16 @@ def test_multiple_datasets_returns_multiple_datasets( self.assertAllCloseChunks(actual, expected) @parameterized.parameters( - dict(time_step=365, longitude_step=72, sub_chunks={"latitude": 36}), - dict(time_step=365, longitude_step=72, sub_chunks={"longitude": 36}), + dict(time_step=365, longitude_step=72, chunks={"latitude": 36}), + dict(time_step=365, longitude_step=72, chunks={"longitude": 36}), dict(time_step=365, longitude_step=72, - sub_chunks={"longitude": 36, "latitude": 66}), + chunks={"longitude": 36, "latitude": 66}), ) def test_multiple_datasets_with_subchunks_returns_multiple_datasets( self, time_step: int, longitude_step: int, - sub_chunks: Dict[str, int], + chunks: Dict[str, int], ): expected = [] @@ -194,12 +194,12 @@ def test_multiple_datasets_with_subchunks_returns_multiple_datasets( time=slice(t, t + time_step), longitude=slice(o, o + longitude_step) ), - sub_chunks) + chunks) ) with self.multifile_pattern(time_step, longitude_step) as pattern: actual = test_util.EagerPipeline() | FilePatternToChunks( pattern, - sub_chunks=sub_chunks + chunks=chunks ) self.assertAllCloseChunks(actual, expected) From 209632d8475603ff6a8c10ccd3c8f4572e225ff7 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 22 Sep 2021 10:01:39 -0700 Subject: [PATCH 16/19] _open_dataset() has error handling for open_local call. Co-authored-by: Stephan Hoyer --- xarray_beam/_src/pangeo_forge.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index 7fc619a..608d20f 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -99,22 +99,16 @@ def __init__( @contextlib.contextmanager def _open_dataset(self, path: str) -> xarray.Dataset: """Open as an XArray Dataset, sometimes with local caching.""" - fs_file = None with FileSystems().open(path) as file: try: - dataset = xarray.open_dataset(file, **self.xarray_open_kwargs) + yield xarray.open_dataset(file, **self.xarray_open_kwargs) except (TypeError, OSError): # The cfgrib engine (and others) may fail with the FileSystems method of # opening with BufferedReaders. Here, we open the data locally to make # it easier to work with XArray. 
- fs_file = fsspec.open_local(f"simplecache::{path}", - simplecache={'cache_storage': '/tmp/files'}) - dataset = xarray.open_dataset(fs_file, **self.xarray_open_kwargs) - - yield dataset - - if fs_file: - fs_file.close() + with fsspec.open_local(f"simplecache::{path}", + simplecache={'cache_storage': '/tmp/files'}) as fs_file: + yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs) def _open_chunks( self, From c3a668aa305e803692e27c585eb44772be825081 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 22 Sep 2021 10:11:16 -0700 Subject: [PATCH 17/19] Added 'local_copy' option. --- xarray_beam/_src/pangeo_forge.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index 48eb2df..8fd4662 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -76,6 +76,7 @@ def __init__( self, pattern: 'FilePattern', chunks: Optional[Mapping[str, int]] = None, + local_copy: bool = True, xarray_open_kwargs: Optional[Dict] = None ): """Initialize FilePatternToChunks. @@ -86,10 +87,12 @@ def __init__( pattern: a `FilePattern` describing a dataset. chunks: split each open dataset into smaller chunks. If not set, the transform will return one file per chunk. + local_copy: allow creating local copies of data found in the file pattern. xarray_open_kwargs: keyword arguments to pass to `xarray.open_dataset()`. """ self.pattern = pattern self.chunks = chunks + self.local_copy = local_copy self.xarray_open_kwargs = xarray_open_kwargs or {} self._max_size_idx = {} @@ -102,12 +105,18 @@ def _open_dataset(self, path: str) -> xarray.Dataset: with FileSystems().open(path) as file: try: yield xarray.open_dataset(file, **self.xarray_open_kwargs) - except (TypeError, OSError): + except (TypeError, OSError) as e: + + if not self.local_copy: + raise ValueError(f'cannot open {path!r} with buffering.') from e + # The cfgrib engine (and others) may fail with the FileSystems method of # opening with BufferedReaders. Here, we open the data locally to make # it easier to work with XArray. - with fsspec.open_local(f"simplecache::{path}", - simplecache={'cache_storage': '/tmp/files'}) as fs_file: + with fsspec.open_local( + f"simplecache::{path}", + simplecache={'cache_storage': '/tmp/files'} + ) as fs_file: yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs) def _open_chunks( From d7f284b5e67f05ec678cad62c355b7ba4a620339 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 22 Sep 2021 10:40:25 -0700 Subject: [PATCH 18/19] Imperative 'local_copy' flag instead of a fallback. --- xarray_beam/_src/pangeo_forge.py | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index 8fd4662..34c8d86 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -76,7 +76,7 @@ def __init__( self, pattern: 'FilePattern', chunks: Optional[Mapping[str, int]] = None, - local_copy: bool = True, + local_copy: bool = False, xarray_open_kwargs: Optional[Dict] = None ): """Initialize FilePatternToChunks. @@ -87,7 +87,8 @@ def __init__( pattern: a `FilePattern` describing a dataset. chunks: split each open dataset into smaller chunks. If not set, the transform will return one file per chunk. - local_copy: allow creating local copies of data found in the file pattern. + local_copy: Open files from the pattern with local copies instead of a + buffered reader. 
xarray_open_kwargs: keyword arguments to pass to `xarray.open_dataset()`. """ self.pattern = pattern @@ -102,22 +103,15 @@ def __init__( @contextlib.contextmanager def _open_dataset(self, path: str) -> xarray.Dataset: """Open as an XArray Dataset, sometimes with local caching.""" - with FileSystems().open(path) as file: - try: - yield xarray.open_dataset(file, **self.xarray_open_kwargs) - except (TypeError, OSError) as e: - - if not self.local_copy: - raise ValueError(f'cannot open {path!r} with buffering.') from e - - # The cfgrib engine (and others) may fail with the FileSystems method of - # opening with BufferedReaders. Here, we open the data locally to make - # it easier to work with XArray. - with fsspec.open_local( - f"simplecache::{path}", - simplecache={'cache_storage': '/tmp/files'} - ) as fs_file: - yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs) + if self.local_copy: + with fsspec.open_local( + f"simplecache::{path}", + simplecache={'cache_storage': '/tmp/files'} + ) as fs_file: + yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs) + else: + with FileSystems().open(path) as file: + yield xarray.open_dataset(file, **self.xarray_open_kwargs) def _open_chunks( self, From 0ee8980bacffbab030e5c790acc3601b4da6a931 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 22 Sep 2021 11:07:11 -0700 Subject: [PATCH 19/19] `open_local` downloads file to a temporary directory. The old contextmanager approach wasn't applicable, since `open_local` returns a string (path to the open file). --- xarray_beam/_src/pangeo_forge.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/xarray_beam/_src/pangeo_forge.py b/xarray_beam/_src/pangeo_forge.py index 34c8d86..d065a4a 100644 --- a/xarray_beam/_src/pangeo_forge.py +++ b/xarray_beam/_src/pangeo_forge.py @@ -13,6 +13,7 @@ # limitations under the License. """IO with Pangeo-Forge.""" import contextlib +import tempfile from typing import ( Dict, Iterator, @@ -104,11 +105,12 @@ def __init__( def _open_dataset(self, path: str) -> xarray.Dataset: """Open as an XArray Dataset, sometimes with local caching.""" if self.local_copy: - with fsspec.open_local( + with tempfile.TemporaryDirectory() as tmpdir: + local_file = fsspec.open_local( f"simplecache::{path}", - simplecache={'cache_storage': '/tmp/files'} - ) as fs_file: - yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs) + simplecache={'cache_storage': tmpdir} + ) + yield xarray.open_dataset(local_file, **self.xarray_open_kwargs) else: with FileSystems().open(path) as file: yield xarray.open_dataset(file, **self.xarray_open_kwargs)
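
Taken together, the series leaves `FilePatternToChunks` accepting a `FilePattern`, an
optional `chunks` mapping, a `local_copy` flag, and `xarray_open_kwargs`. The sketch
below shows how the transform, as it stands after patch 19/19, might be wired into a
pipeline that writes Zarr. The bucket layout, file naming, year range, chunk sizes, and
output store are hypothetical, and the import comes from the private module (as in the
tests) because the transform is not yet exported from the package `__init__.py`; reading
`gs://` paths also assumes the relevant fsspec/gcsfs plugins are installed.

import apache_beam as beam
import xarray_beam as xbeam
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern

from xarray_beam._src.pangeo_forge import FilePatternToChunks


def make_path(time: int) -> str:
  # Hypothetical layout: one NetCDF file per year.
  return f"gs://my-bucket/era5/{time}.nc"


# One file per entry of the "time" concat dimension.
pattern = FilePattern(make_path, ConcatDim("time", keys=list(range(2010, 2020))))

with beam.Pipeline() as p:
  (
      p
      # Open each file and emit (Key, xarray.Dataset) pairs, splitting every file
      # along longitude. `local_copy=True` caches each file locally via fsspec's
      # simplecache, which helps with engines (e.g. cfgrib) that cannot read from
      # buffered readers.
      | FilePatternToChunks(pattern, chunks={"longitude": 48}, local_copy=True)
      # Assumed sink: xarray-beam's ChunksToZarr writing the keyed chunks out.
      | xbeam.ChunksToZarr("gs://my-bucket/era5.zarr")
  )

Here `ChunksToZarr` is only an assumed sink; any transform that consumes the keyed
(Key, Dataset) pairs produced by `FilePatternToChunks` would work in its place.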