diff --git a/.github/workflows/test_s3_minio.yml b/.github/workflows/test_s3_minio.yml
new file mode 100644
index 00000000..0e805b04
--- /dev/null
+++ b/.github/workflows/test_s3_minio.yml
@@ -0,0 +1,68 @@
+# adapted GA workflow from https://github.com/stackhpc/s3-active-storage-rs
+---
+name: S3/Minio Exploratory Test
+
+on:
+  push:
+    branches:
+      - main
+      - real_world_s3_tests
+  schedule:
+    - cron: '0 0 * * *' # nightly
+
+jobs:
+  linux-test:
+    runs-on: "ubuntu-latest"
+    strategy:
+      matrix:
+        python-version: ["3.9", "3.10", "3.11"]
+      fail-fast: false
+    name: Linux Python ${{ matrix.python-version }}
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+      - uses: conda-incubator/setup-miniconda@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+          miniforge-version: "latest"
+          miniforge-variant: Mambaforge
+          use-mamba: true
+      - shell: bash -l {0}
+        run: conda --version
+      - shell: bash -l {0}
+        run: python -V
+      - name: Export proxy
+        run: |
+          echo 'PROXY_URL = "http://localhost:8080"' >> config.py
+      - name: Start minio object storage
+        run: tests/s3_exploratory/minio_scripts/minio-start
+      - name: Wait for minio object storage to start
+        run: |
+          until curl -if http://localhost:9001; do
+            sleep 1;
+          done
+      - name: Run S3ActiveStorage container
+        run: docker run -it --detach --rm --net=host --name s3-active-storage ghcr.io/stackhpc/s3-active-storage-rs:latest
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+      - uses: conda-incubator/setup-miniconda@v2
+        with:
+          activate-environment: activestorage-minio
+          environment-file: environment.yml
+          python-version: ${{ matrix.python-version }}
+          miniforge-version: "latest"
+          miniforge-variant: Mambaforge
+          use-mamba: true
+      - name: Install PyActiveStorage and run tests
+        shell: bash -l {0}
+        run: |
+          conda --version
+          python -V
+          which python
+          pip install -e .
+          pytest tests/s3_exploratory/test_s3_reduction.py
+      - name: Stop minio object storage
+        run: tests/s3_exploratory/minio_scripts/minio-stop
+        if: always()
diff --git a/activestorage/active.py b/activestorage/active.py
index 102d5f88..f612821b 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -2,6 +2,9 @@
 import numpy as np
 import pathlib
 
+import h5netcdf
+import s3fs
+
 #FIXME: Consider using h5py throughout, for more generality
 from netCDF4 import Dataset
 from zarr.indexing import (
@@ -13,6 +16,31 @@
 from activestorage import netcdf_to_zarr as nz
 
 
+def load_from_s3(uri):
+    """
+    Load a netCDF4-like object from S3.
+
+    First, set up an S3 filesystem with s3fs.S3FileSystem.
+    Then open the uri with this FS -> s3file
+    s3file is a File-like object: a memory view but with all the metadata
+    gubbins inside it (no data!)
+    calling >> ds = netCDF4.Dataset(s3file) <<
+    will throw a FileNotFoundError because the netCDF4 library is always looking for
+    a local file, resulting in [Errno 2] No such file or directory:
+    ''
+    instead, we use h5netcdf: https://github.com/h5netcdf/h5netcdf
+    a Python binding straight to the HDF5-netCDF4 interface that doesn't need a "local" file
+    """
+    fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,  # eg "minioadmin" for Minio
+                           secret=S3_SECRET_KEY,  # eg "minioadmin" for Minio
+                           client_kwargs={'endpoint_url': S3_URL})  # eg "http://localhost:9000" for Minio
+    s3file = fs.open(uri, 'rb')  # keep the handle open: h5netcdf reads from it lazily
+    ds = h5netcdf.File(s3file, 'r', invalid_netcdf=True)
+    print(f"Dataset loaded from S3 via h5netcdf: {ds}")
+
+    return ds
+
+
 class Active:
     """
     Instantiates an interface to active storage which contains either zarr files
@@ -37,7 +65,7 @@ def __new__(cls, *args, **kwargs):
         }
         return instance
 
-    def __init__(self, uri, ncvar, missing_value=None, _FillValue=None, valid_min=None, valid_max=None):
+    def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue=None, valid_min=None, valid_max=None):
         """
         Instantiate with a NetCDF4 dataset and the variable of interest within that file.
         (We need the variable, because we need variable specific metadata from within that
@@ -48,7 +76,10 @@ def __init__(self, uri, ncvar, missing_value=None, _FillValue=None, valid_min=No
         self.uri = uri
         if self.uri is None:
             raise ValueError(f"Must use a valid file for uri. Got {self.uri}")
-        if not os.path.isfile(self.uri):
+        self.storage_type = storage_type
+        if self.storage_type == "s3":
+            USE_S3 = True
+        if not os.path.isfile(self.uri) and not self.storage_type:
            raise ValueError(f"Must use existing file for uri. {self.uri} not found")
         self.ncvar = ncvar
         if self.ncvar is None:
@@ -65,14 +96,21 @@ def __init__(self, uri, ncvar, missing_value=None, _FillValue=None, valid_min=No
         # If the user actually wrote the data with no fill value, or the
         # default fill value is in play, then this might go wrong.
if (missing_value, _FillValue, valid_min, valid_max) == (None, None, None, None): - ds = Dataset(uri) + if storage_type is None: + ds = Dataset(uri) + elif storage_type == "s3": + ds = load_from_s3(uri) try: ds_var = ds[ncvar] except IndexError as exc: print(f"Dataset {ds} does not contain ncvar {ncvar!r}.") raise exc - self._filters = ds_var.filters() + try: + self._filters = ds_var.filters() + # ds from h5netcdf may not have _filters and other such metadata + except AttributeError: + self._filters = None self._missing = getattr(ds_var, 'missing_value', None) self._fillvalue = getattr(ds_var, '_FillValue', None) valid_min = getattr(ds_var, 'valid_min', None) @@ -229,7 +267,11 @@ def _via_kerchunk(self, index): """ # FIXME: Order of calls is hardcoded' if self.zds is None: - ds = nz.load_netcdf_zarr_generic(self.uri, self.ncvar) + print(f"Kerchunking file {self.uri} with variable " + f"{self.ncvar} for storage type {self.storage_type}") + ds = nz.load_netcdf_zarr_generic(self.uri, + self.ncvar, + self.storage_type) # The following is a hangove from exploration # and is needed if using the original doing it ourselves # self.zds = make_an_array_instance_active(ds) @@ -336,6 +378,10 @@ def _process_chunk(self, fsref, chunk_coords, chunk_selection, out, counts, key = f"{self.ncvar}/{coord}" rfile, offset, size = tuple(fsref[key]) + if self.storage_type == "s3": + USE_S3 = True + else: + USE_S3 = False if USE_S3: object = os.path.basename(rfile) tmp, count = s3_reduce_chunk(S3_ACTIVE_STORAGE_URL, S3_ACCESS_KEY, diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 44f78c1c..c0605419 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -3,7 +3,9 @@ import zarr import ujson import fsspec +import s3fs +from activestorage.config import * from kerchunk.hdf import SingleHdf5ToZarr @@ -11,6 +13,8 @@ def gen_json(file_url, fs, fs2, varname, **so): """Generate a json file that contains the kerchunk-ed data for Zarr.""" # set some name for the output json file fname = os.path.splitext(file_url)[0] + if "s3:" in fname: + fname = os.path.basename(fname) outf = f'{fname}_{varname}.json' # vanilla file name # write it out if it's not there @@ -61,13 +65,25 @@ def open_zarr_group(out_json, varname): return zarr_array -def load_netcdf_zarr_generic(fileloc, varname, build_dummy=True): +def load_netcdf_zarr_generic(fileloc, varname, storage_type, build_dummy=True): """Pass a netCDF4 file to be shaped as Zarr file by kerchunk.""" - so = dict(mode='rb', anon=True, default_fill_cache=False, - default_cache_type='first') # args to fs.open() - # default_fill_cache=False avoids caching data in between - # file chunks to lower memory usage - fs = fsspec.filesystem('') # local, for S3: ('s3', anon=True) + print(f"Storage type {storage_type}") + object_filesystems = ["s3"] + + # "local"/POSIX files; use a local FS with fsspec + if storage_type not in object_filesystems: + so = dict(mode='rb', anon=True, default_fill_cache=False, + default_cache_type='first') # args to fs.open() + # default_fill_cache=False avoids caching data in between + # file chunks to lower memory usage + fs = fsspec.filesystem('') + # open file in memory view mode straight from the S3 object storage + elif storage_type == "s3": + fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, # eg "minioadmin" for Minio + secret=S3_SECRET_KEY, # eg "minioadmin" for Minio + client_kwargs={'endpoint_url': S3_URL}) # eg "http://localhost:9000" for Minio + so = {} + fs2 = fsspec.filesystem('') # local file 
system to save final json to out_json = gen_json(fileloc, fs, fs2, varname) @@ -76,5 +92,3 @@ def load_netcdf_zarr_generic(fileloc, varname, build_dummy=True): ref_ds = open_zarr_group(out_json, varname) return ref_ds - - diff --git a/environment.yml b/environment.yml index 20d90712..debd6755 100644 --- a/environment.yml +++ b/environment.yml @@ -8,6 +8,7 @@ dependencies: - python >=3.9 - dask - fsspec + - h5netcdf - h5py # needed by Kerchunk - kerchunk - netcdf4 diff --git a/setup.cfg b/setup.cfg index 8bbd9484..728ce2d3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,6 +2,7 @@ addopts = # --doctest-modules --ignore=old_code/ + --ignore=tests/s3_exploratory --cov=activestorage --cov-report=xml:test-reports/coverage.xml --cov-report=html:test-reports/coverage_html diff --git a/setup.py b/setup.py index 44c12b50..bbc48d44 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ 'install': [ 'dask', 'fsspec', + 'h5netcdf', 'h5py', # needed by Kerchunk 'kerchunk', 'netcdf4', diff --git a/tests/s3_exploratory/config_minio.py b/tests/s3_exploratory/config_minio.py new file mode 100644 index 00000000..9a004aba --- /dev/null +++ b/tests/s3_exploratory/config_minio.py @@ -0,0 +1,19 @@ +# This file contains configuration for PyActiveStorage. + +# Force True for S3 exploratory tests +USE_S3 = True + +# URL of S3 Active Storage server. +S3_ACTIVE_STORAGE_URL = "http://localhost:8080" + +# URL of S3 object store. +S3_URL = "http://localhost:9000" + +# S3 access key / username. +S3_ACCESS_KEY = "minioadmin" + +# S3 secret key / password. +S3_SECRET_KEY = "minioadmin" + +# S3 bucket. +S3_BUCKET = "pyactivestorage" diff --git a/tests/s3_exploratory/minio_scripts/minio-start b/tests/s3_exploratory/minio_scripts/minio-start new file mode 100755 index 00000000..16aecaa4 --- /dev/null +++ b/tests/s3_exploratory/minio_scripts/minio-start @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# Use anon storage volume so that test data is removed when container is stopped +exec docker run --detach --rm -p 9000:9000 -p 9001:9001 -v :/data --name minio minio/minio server data --console-address ":9001" diff --git a/tests/s3_exploratory/minio_scripts/minio-stop b/tests/s3_exploratory/minio_scripts/minio-stop new file mode 100755 index 00000000..663b8b4e --- /dev/null +++ b/tests/s3_exploratory/minio_scripts/minio-stop @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +exec docker stop minio diff --git a/tests/s3_exploratory/test_data/cesm2_native.nc b/tests/s3_exploratory/test_data/cesm2_native.nc new file mode 100644 index 00000000..61a4ecf3 Binary files /dev/null and b/tests/s3_exploratory/test_data/cesm2_native.nc differ diff --git a/tests/s3_exploratory/test_s3_reduction.py b/tests/s3_exploratory/test_s3_reduction.py new file mode 100644 index 00000000..c7ecaa04 --- /dev/null +++ b/tests/s3_exploratory/test_s3_reduction.py @@ -0,0 +1,157 @@ +import os +import numpy as np +import pytest +import s3fs +import tempfile + +from activestorage.active import Active +from activestorage.dummy_data import make_vanilla_ncdata +import activestorage.storage as st +from activestorage.s3 import reduce_chunk as s3_reduce_chunk +from numpy.testing import assert_array_equal +from pathlib import Path + +from config_minio import * + + +def make_tempfile(): + """Make dummy data.""" + temp_folder = tempfile.mkdtemp() + s3_testfile = os.path.join(temp_folder, + 's3_test_bizarre.nc') # Bryan likes this name + print(f"S3 Test file is {s3_testfile}") + if not os.path.exists(s3_testfile): + make_vanilla_ncdata(filename=s3_testfile) + + local_testfile = 
os.path.join(temp_folder, + 'local_test_bizarre.nc') # Bryan again + print(f"Local Test file is {local_testfile}") + if not os.path.exists(local_testfile): + make_vanilla_ncdata(filename=local_testfile) + + return s3_testfile, local_testfile + + +def upload_to_s3(server, username, password, bucket, object, rfile): + """Upload a file to an S3 object store.""" + s3_fs = s3fs.S3FileSystem(key=username, secret=password, client_kwargs={'endpoint_url': server}) + # Make sure s3 bucket exists + try: + s3_fs.mkdir(bucket) + except FileExistsError: + pass + + s3_fs.put_file(rfile, os.path.join(bucket, object)) + + return os.path.join(bucket, object) + + +def test_Active(): + """ + Shows what we expect an active example test to achieve and provides "the right answer" + Done twice: POSIX active and S3 active; we compare results. + + identical to tests/test_harness.py::testActive() + + """ + # make dummy data + s3_testfile, local_testfile = make_tempfile() + + # put s3 dummy data onto S3. then rm from local + object = os.path.basename(s3_testfile) + bucket_file = upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY, + S3_BUCKET, object, s3_testfile) + os.remove(s3_testfile) + s3_testfile_uri = os.path.join("s3://", bucket_file) + print("S3 file uri", s3_testfile_uri) + + # run Active on s3 file + active = Active(s3_testfile_uri, "data", "s3") + active.method = "mean" + result1 = active[0:2, 4:6, 7:9] + print(result1) + + # run Active on local file + active = Active(local_testfile, "data") + active._version = 2 + active.method = "mean" + active.components = True + result2 = active[0:2, 4:6, 7:9] + print(result2) + + assert_array_equal(result1, result2["sum"]/result2["n"]) + + +@pytest.fixture +def test_data_path(): + """Path to test data for CMOR fixes.""" + return Path(__file__).resolve().parent / 'test_data' + + +def test_with_valid_netCDF_file(test_data_path): + """ + Test as above but with an actual netCDF4 file. + Also, this has _FillValue and missing_value + + identical to tests/test_bigger_data.py::test_cesm2_native + + """ + ncfile = str(test_data_path / "cesm2_native.nc") + + # run POSIX (local) Active + active = Active(ncfile, "TREFHT") + active._version = 2 + active.method = "mean" + active.components = True + result2 = active[4:5, 1:2] + print(result2) + + # put data onto S3. 
then rm from local + object = os.path.basename(ncfile) + bucket_file = upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY, + S3_BUCKET, object, ncfile) + os.remove(ncfile) + s3_testfile_uri = os.path.join("s3://", bucket_file) + print("S3 file uri", s3_testfile_uri) + + # run Active on s3 file + active = Active(s3_testfile_uri, "TREFHT", "s3") + active._version = 2 + active.method = "mean" + active.components = True + result1 = active[4:5, 1:2] + print(result1) + + # expect {'sum': array([[[2368.3232]]], dtype=float32), 'n': array([[[8]]])} + # check for typing and structure + np.testing.assert_array_equal(result1["sum"], np.array([[[2368.3232]]], dtype="float32")) + np.testing.assert_array_equal(result1["n"], np.array([[[8]]])) + + assert_array_equal(result1, result2) + + +def test_s3_reduce_chunk(): + """Unit test for s3_reduce_chunk.""" + rfile = "tests/test_data/cesm2_native.nc" + offset = 2 + size = 128 + object = os.path.basename(rfile) + + # create bucket and upload to Minio's S3 bucket + upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY, + S3_BUCKET, object, rfile) + + # remove file during test session to be sure + # workflow uses uploaded file to S3 bucket + os.remove(rfile) + + # call s3_reduce_chunk + tmp, count = s3_reduce_chunk(S3_ACTIVE_STORAGE_URL, S3_ACCESS_KEY, + S3_SECRET_KEY, S3_URL, S3_BUCKET, + object, offset, size, + None, None, [], + np.dtype("int32"), (32, ), + "C", [slice(0, 2, 1), ], + "min") + assert tmp == 134351386 + assert count == None diff --git a/tests/test_missing.py b/tests/test_missing.py index 4c236698..96bad655 100644 --- a/tests/test_missing.py +++ b/tests/test_missing.py @@ -1,6 +1,7 @@ import os from this import d import numpy as np +import pytest import shutil import tempfile import unittest @@ -9,65 +10,49 @@ from activestorage import dummy_data as dd -class TestActive(unittest.TestCase): +def _doit(testfile): """ - Test basic functionality + Compare and contrast vanilla mean with actual means """ - - def setUp(self): - """ - Ensure there is test data - """ - self.temp_folder = tempfile.mkdtemp() - - def tearDown(self): - """Remove temp folder.""" - shutil.rmtree(self.temp_folder) - - - def _doit(self, testfile): - """ - Compare and contrast vanilla mean with actual means - """ - active = Active(testfile, "data") - active._version = 0 - d = active[0:2, 4:6, 7:9] - mean_result = np.mean(d) - - active = Active(testfile, "data") - active._version = 2 - active.method = "mean" - active.components = True - result2 = active[0:2, 4:6, 7:9] - self.assertEqual(mean_result, result2["sum"]/result2["n"]) - - - def test_partially_missing_data(self): - testfile = os.path.join(self.temp_folder, 'test_partially_missing_data.nc') - r = dd.make_partially_missing_ncdata(testfile) - self._doit(testfile) - - def test_missing(self): - testfile = os.path.join(self.temp_folder, 'test_missing.nc') - r = dd.make_partially_missing_ncdata(testfile) - self._doit(testfile) - - def test_fillvalue(self): - testfile = os.path.join(self.temp_folder, 'test_fillvalue.nc') - r = dd.make_fillvalue_ncdata(testfile) - self._doit(testfile) - - def test_validmin(self): - testfile = os.path.join(self.temp_folder, 'test_validmin.nc') - r = dd.make_validmin_ncdata(testfile) - self._doit(testfile) - - def test_validmax(self): - testfile = os.path.join(self.temp_folder, 'test_validmax.nc') - r = dd.make_validmax_ncdata(testfile) - self._doit(testfile) - - def test_validrange(self): - testfile = os.path.join(self.temp_folder, 'test_validrange.nc') - r = dd.make_validrange_ncdata(testfile) - 
self._doit(testfile) + active = Active(testfile, "data") + active._version = 0 + d = active[0:2, 4:6, 7:9] + mean_result = np.mean(d) + + active = Active(testfile, "data") + active._version = 2 + active.method = "mean" + active.components = True + result2 = active[0:2, 4:6, 7:9] + np.testing.assert_array_equal(mean_result, result2["sum"]/result2["n"]) + + +def test_partially_missing_data(tmp_path): + testfile = str(tmp_path / 'test_partially_missing_data.nc') + r = dd.make_partially_missing_ncdata(testfile) + _doit(testfile) + +def test_missing(tmp_path): + testfile = str(tmp_path / 'test_missing.nc') + r = dd.make_partially_missing_ncdata(testfile) + _doit(testfile) + +def test_fillvalue(tmp_path): + testfile = str(tmp_path / 'test_fillvalue.nc') + r = dd.make_fillvalue_ncdata(testfile) + _doit(testfile) + +def test_validmin(tmp_path): + testfile = str(tmp_path / 'test_validmin.nc') + r = dd.make_validmin_ncdata(testfile) + _doit(testfile) + +def test_validmax(tmp_path): + testfile = str(tmp_path / 'test_validmax.nc') + r = dd.make_validmax_ncdata(testfile) + _doit(testfile) + +def test_validrange(tmp_path): + testfile = str(tmp_path / 'test_validrange.nc') + r = dd.make_validrange_ncdata(testfile) + _doit(testfile) diff --git a/tests/unit/test_s3.py b/tests/unit/test_s3.py index 3537dc85..81f7d23a 100644 --- a/tests/unit/test_s3.py +++ b/tests/unit/test_s3.py @@ -51,7 +51,7 @@ def test_s3_reduce_chunk(mock_request): assert tmp == result # count is None; no missing data yet in S3 - assert count == None + assert count is None expected_url = f"{active_url}/v1/{operation}/" expected_data = { diff --git a/tests/unit/test_storage_types.py b/tests/unit/test_storage_types.py new file mode 100644 index 00000000..d265d059 --- /dev/null +++ b/tests/unit/test_storage_types.py @@ -0,0 +1,15 @@ +import botocore +import os +import numpy as np +import pytest + +from activestorage.active import Active + + +def test_s3_active(): + """Test stack when call to Active contains storage_type == s3.""" + active_url = "https://s3.example.com" + s3_testfile = "s3_test_bizarre.nc" + + with pytest.raises(botocore.exceptions.ParamValidationError): + Active(os.path.join(active_url, s3_testfile), "data", "s3")
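
Below, a minimal sketch (not part of the patch) of the behaviour the load_from_s3 docstring describes: netCDF4.Dataset needs a local path, while h5netcdf happily reads the file-like object that s3fs returns. It assumes a MinIO endpoint and credentials as in tests/s3_exploratory/config_minio.py, and an object already uploaded to the pyactivestorage bucket; the object name here is illustrative.

import h5netcdf
import s3fs

fs = s3fs.S3FileSystem(key="minioadmin", secret="minioadmin",
                       client_kwargs={"endpoint_url": "http://localhost:9000"})

# fs.open() gives a file-like view of the S3 object (metadata, no bulk download);
# netCDF4.Dataset(s3file) would raise FileNotFoundError ([Errno 2]) because it
# expects a path on the local filesystem.
s3file = fs.open("pyactivestorage/s3_test_bizarre.nc", "rb")
ds = h5netcdf.File(s3file, "r", invalid_netcdf=True)  # accepts file-like objects
print(ds["data"].shape)
ds.close()
s3file.close()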
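
And an end-to-end usage sketch for the new storage_type argument to Active, mirroring tests/s3_exploratory/test_s3_reduction.py::test_Active. It assumes the MinIO container (minio-start) and the S3 active storage proxy from the workflow above are running locally, with activestorage.config pointing at them as in config_minio.py; file and object names are illustrative only.

import os
import tempfile

import numpy as np
import s3fs

from activestorage.active import Active
from activestorage.dummy_data import make_vanilla_ncdata

S3_URL = "http://localhost:9000"  # MinIO object store
S3_ACCESS_KEY = "minioadmin"
S3_SECRET_KEY = "minioadmin"
S3_BUCKET = "pyactivestorage"

# make a small dummy netCDF file and upload it to the MinIO bucket
ncfile = os.path.join(tempfile.mkdtemp(), "sketch.nc")
make_vanilla_ncdata(filename=ncfile)
fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, secret=S3_SECRET_KEY,
                       client_kwargs={"endpoint_url": S3_URL})
if not fs.exists(S3_BUCKET):
    fs.mkdir(S3_BUCKET)
fs.put_file(ncfile, f"{S3_BUCKET}/sketch.nc")

# active mean on the S3 copy: storage_type="s3" sends chunk reads through
# s3fs/kerchunk and the reductions through the S3 active storage proxy
s3_active = Active(f"s3://{S3_BUCKET}/sketch.nc", "data", "s3")
s3_active.method = "mean"
s3_result = s3_active[0:2, 4:6, 7:9]

# the same reduction on the local copy, as a reference
local_active = Active(ncfile, "data")
local_active._version = 2
local_active.method = "mean"
local_active.components = True
local_result = local_active[0:2, 4:6, 7:9]

np.testing.assert_array_equal(s3_result, local_result["sum"] / local_result["n"])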