Merged
Changes from all commits · 61 commits
9f7783d
ignore s3 exploratory tests in normal runs
valeriupredoi Jun 9, 2023
8069cef
add s3 test run GA workflow
valeriupredoi Jun 9, 2023
d854997
change GA test feature branch
valeriupredoi Jun 9, 2023
ea2cf98
add s3 exploratory test repo
valeriupredoi Jun 9, 2023
e84f21b
correct path to minio scripts
valeriupredoi Jun 9, 2023
fc5766a
run s3 AS container
valeriupredoi Jun 9, 2023
37ca643
run Active component too
valeriupredoi Jun 9, 2023
cccc8d2
temp deactivate check on physical file in active class
valeriupredoi Jun 9, 2023
d0b9805
shove in use s3
valeriupredoi Jun 9, 2023
1c10fb8
run with s3 file
valeriupredoi Jun 9, 2023
405b5eb
try load netCDF from s3
valeriupredoi Jun 9, 2023
60e1cb3
pass correct s3 uri
valeriupredoi Jun 9, 2023
bf280ab
of course I forgot import
valeriupredoi Jun 9, 2023
68a7777
try without s3 prefit for 3 file path
valeriupredoi Jun 9, 2023
0fa23f2
possible correct s3 path
valeriupredoi Jun 9, 2023
66dce92
possible correct s3 path
valeriupredoi Jun 9, 2023
de1939d
try with context manager
valeriupredoi Jun 9, 2023
6c78fce
use s3fs directly
valeriupredoi Jun 9, 2023
fee58b3
add client server arg
valeriupredoi Jun 9, 2023
a1d3ac0
try without localhost
valeriupredoi Jun 9, 2023
f076c6c
add xarray for test
valeriupredoi Jun 9, 2023
1efe1e7
add xarray for test
valeriupredoi Jun 9, 2023
6981740
use xarray for test
valeriupredoi Jun 9, 2023
482ffb5
wrong engine
valeriupredoi Jun 9, 2023
f17d08f
try mfdataset
valeriupredoi Jun 9, 2023
ab0674b
forget about xarray
valeriupredoi Jun 9, 2023
77bfef9
forget about xarray
valeriupredoi Jun 9, 2023
5b4f64e
hat does that s3 file like look like
valeriupredoi Jun 9, 2023
6003b74
how does that s3 file look like
valeriupredoi Jun 9, 2023
71051e4
asdd h5netcdf to env
valeriupredoi Jun 12, 2023
624c849
asdd h5netcdf to env
valeriupredoi Jun 12, 2023
30c33d3
try h5netcdf layer
valeriupredoi Jun 12, 2023
514b1b6
dont call netcdf4
valeriupredoi Jun 12, 2023
9dcb03d
skeip filers xtraction
valeriupredoi Jun 12, 2023
1e0ee8d
fix test
valeriupredoi Jun 12, 2023
83e3e85
tweak call
valeriupredoi Jun 12, 2023
04ee231
switch to s3fs
valeriupredoi Jun 12, 2023
b8f2761
give it a slash free name
valeriupredoi Jun 12, 2023
4af7c4c
bleghh
valeriupredoi Jun 12, 2023
c7b4fef
pytested test
valeriupredoi Jun 12, 2023
f2a1478
refine a bit
valeriupredoi Jun 12, 2023
1a43bf1
corrected check for file
valeriupredoi Jun 12, 2023
20a806e
posix to str paths
valeriupredoi Jun 12, 2023
4b5c2d8
made an s3 loader function
valeriupredoi Jun 12, 2023
770a9d0
blegh colon dude
valeriupredoi Jun 12, 2023
621f85d
return something too
valeriupredoi Jun 12, 2023
945b32f
load s3 connection params from config
valeriupredoi Jun 12, 2023
0d3492f
add test data file
valeriupredoi Jun 14, 2023
ea91080
add netCDF real file test
valeriupredoi Jun 14, 2023
d1fa7fb
missing import
valeriupredoi Jun 14, 2023
5272d7d
pass correct var
valeriupredoi Jun 14, 2023
4c5b311
fail to visualize results
valeriupredoi Jun 14, 2023
5ae2666
fixed test
valeriupredoi Jun 14, 2023
b0dc572
correct version of test
valeriupredoi Jun 14, 2023
4ef10cb
add to local s3 unit test
valeriupredoi Jun 14, 2023
2fb70fd
add mention in test
valeriupredoi Jun 14, 2023
273f034
correct file passed eh
valeriupredoi Jun 14, 2023
8bf1454
simple raiser test
valeriupredoi Jun 14, 2023
04f29a4
remove unnecessary test
valeriupredoi Jun 15, 2023
5c7f86b
start a unit test for storage types
valeriupredoi Jun 15, 2023
f401deb
Merge branch 'main' into real_world_s3_tests
valeriupredoi Jun 15, 2023
68 changes: 68 additions & 0 deletions .github/workflows/test_s3_minio.yml
@@ -0,0 +1,68 @@
# adapted GA workflow from https://github.com/stackhpc/s3-active-storage-rs
---
name: S3/Minio Exploratory Test

on:
  push:
    branches:
      - main
      - real_world_s3_tests
  schedule:
    - cron: '0 0 * * *'  # nightly

jobs:
  linux-test:
    runs-on: "ubuntu-latest"
    strategy:
      matrix:
        python-version: ["3.9", "3.10", "3.11"]
      fail-fast: false
    name: Linux Python ${{ matrix.python-version }}
    steps:
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0
      - uses: conda-incubator/setup-miniconda@v2
        with:
          python-version: ${{ matrix.python-version }}
          miniforge-version: "latest"
          miniforge-variant: Mambaforge
          use-mamba: true
      - shell: bash -l {0}
        run: conda --version
      - shell: bash -l {0}
        run: python -V
      - name: Export proxy
        run: |
          echo 'PROXY_URL = "http://localhost:8080"' >> config.py
      - name: Start minio object storage
        run: tests/s3_exploratory/minio_scripts/minio-start
      - name: Wait for minio object storage to start
        run: |
          until curl -if http://localhost:9001; do
            sleep 1;
          done
      - name: Run S3ActiveStorage container
        run: docker run -it --detach --rm --net=host --name s3-active-storage ghcr.io/stackhpc/s3-active-storage-rs:latest
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0
      - uses: conda-incubator/setup-miniconda@v2
        with:
          activate-environment: activestorage-minio
          environment-file: environment.yml
          python-version: ${{ matrix.python-version }}
          miniforge-version: "latest"
          miniforge-variant: Mambaforge
          use-mamba: true
      - name: Install PyActiveStorage and run tests
        shell: bash -l {0}
        run: |
          conda --version
          python -V
          which python
          pip install -e .
          pytest tests/s3_exploratory/test_s3_reduction.py
      - name: Stop minio object storage
        run: tests/s3_exploratory/minio_scripts/minio-stop
        if: always()
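
For orientation: once Minio is up, the exploratory tests need the test data to exist in the object store. Below is a minimal sketch of that upload step with s3fs, using the connection values from tests/s3_exploratory/config_minio.py; the snippet is illustrative, not the exact test fixture from this PR.

import s3fs

# Connection values mirror tests/s3_exploratory/config_minio.py (Minio defaults)
fs = s3fs.S3FileSystem(key="minioadmin", secret="minioadmin",
                       client_kwargs={'endpoint_url': "http://localhost:9000"})
if not fs.exists("pyactivestorage"):
    fs.mkdir("pyactivestorage")  # create the test bucket on the fresh Minio instance
# hypothetical upload of the bundled test file; the real test may arrange this differently
fs.put("tests/s3_exploratory/test_data/cesm2_native.nc",
       "pyactivestorage/cesm2_native.nc")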
56 changes: 51 additions & 5 deletions activestorage/active.py
@@ -2,6 +2,9 @@
import numpy as np
import pathlib

import h5netcdf
import s3fs

#FIXME: Consider using h5py throughout, for more generality
from netCDF4 import Dataset
from zarr.indexing import (
@@ -13,6 +16,31 @@
from activestorage import netcdf_to_zarr as nz


def load_from_s3(uri):
    """
    Load a netCDF4-like object from S3.

    First, set up an S3 filesystem with s3fs.S3FileSystem, then open
    the uri through that filesystem -> s3file.
    s3file is a file-like object: a memory view with all the metadata
    gubbins inside it (but no data!).
    Calling >> ds = netCDF4.Dataset(s3file) << would throw a
    FileNotFoundError, because the netCDF4 library always looks for a
    local file, resulting in [Errno 2] No such file or directory:
    '<File-like object S3FileSystem, pyactivestorage/s3_test_bizarre.nc>'
    Instead, we use h5netcdf (https://github.com/h5netcdf/h5netcdf),
    a Python binding straight to the HDF5-netCDF4 interface that does
    not need a "local" file.
    """
    fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,  # eg "minioadmin" for Minio
                           secret=S3_SECRET_KEY,  # eg "minioadmin" for Minio
                           client_kwargs={'endpoint_url': S3_URL})  # eg "http://localhost:9000" for Minio
    with fs.open(uri, 'rb') as s3file:
        ds = h5netcdf.File(s3file, 'r', invalid_netcdf=True)
        print(f"Dataset loaded from S3 via h5netcdf: {ds}")

    return ds


class Active:
"""
Instantiates an interface to active storage which contains either zarr files
@@ -37,7 +65,7 @@ def __new__(cls, *args, **kwargs):
        }
        return instance

    def __init__(self, uri, ncvar, missing_value=None, _FillValue=None, valid_min=None, valid_max=None):
    def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue=None, valid_min=None, valid_max=None):
        """
        Instantiate with a NetCDF4 dataset and the variable of interest within that file.
        (We need the variable, because we need variable specific metadata from within that
@@ -48,7 +76,10 @@ def __init__(self, uri, ncvar, missing_value=None, _FillValue=None, valid_min=No
        self.uri = uri
        if self.uri is None:
            raise ValueError(f"Must use a valid file for uri. Got {self.uri}")
        if not os.path.isfile(self.uri):
        self.storage_type = storage_type
        if self.storage_type == "s3":
            USE_S3 = True
        if not os.path.isfile(self.uri) and not self.storage_type:
            raise ValueError(f"Must use existing file for uri. {self.uri} not found")
        self.ncvar = ncvar
        if self.ncvar is None:
@@ -65,14 +96,21 @@ def __init__(self, uri, ncvar, missing_value=None, _FillValue=None, valid_min=No
        # If the user actually wrote the data with no fill value, or the
        # default fill value is in play, then this might go wrong.
        if (missing_value, _FillValue, valid_min, valid_max) == (None, None, None, None):
            ds = Dataset(uri)
            if storage_type is None:
                ds = Dataset(uri)
            elif storage_type == "s3":
                ds = load_from_s3(uri)
            try:
                ds_var = ds[ncvar]
            except IndexError as exc:
                print(f"Dataset {ds} does not contain ncvar {ncvar!r}.")
                raise exc

            self._filters = ds_var.filters()
            try:
                self._filters = ds_var.filters()
            # ds from h5netcdf may not have _filters and other such metadata
            except AttributeError:
                self._filters = None
            self._missing = getattr(ds_var, 'missing_value', None)
            self._fillvalue = getattr(ds_var, '_FillValue', None)
            valid_min = getattr(ds_var, 'valid_min', None)
@@ -229,7 +267,11 @@ def _via_kerchunk(self, index):
"""
# FIXME: Order of calls is hardcoded'
if self.zds is None:
ds = nz.load_netcdf_zarr_generic(self.uri, self.ncvar)
print(f"Kerchunking file {self.uri} with variable "
f"{self.ncvar} for storage type {self.storage_type}")
ds = nz.load_netcdf_zarr_generic(self.uri,
self.ncvar,
self.storage_type)
# The following is a hangove from exploration
# and is needed if using the original doing it ourselves
# self.zds = make_an_array_instance_active(ds)
@@ -336,6 +378,10 @@ def _process_chunk(self, fsref, chunk_coords, chunk_selection, out, counts,
        key = f"{self.ncvar}/{coord}"
        rfile, offset, size = tuple(fsref[key])

        if self.storage_type == "s3":
            USE_S3 = True
        else:
            USE_S3 = False
        if USE_S3:
            object = os.path.basename(rfile)
            tmp, count = s3_reduce_chunk(S3_ACTIVE_STORAGE_URL, S3_ACCESS_KEY,
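
Taken together, the active.py changes let an Active instance point at an object in S3: metadata loading goes through load_from_s3(), kerchunking through an s3fs filesystem, and chunk reductions through the S3 active storage proxy via s3_reduce_chunk. A hedged usage sketch follows; the object URI, variable name, reduction interface and selection are assumptions for illustration, not taken verbatim from this PR.

from activestorage.active import Active

# "TREFHT" is a made-up variable name; storage_type="s3" is the new code path
active = Active("s3://pyactivestorage/cesm2_native.nc", "TREFHT", storage_type="s3")
active.method = "mean"     # assumes Active's usual reduction-method interface
result = active[0:2, 0:4]  # reduce only the selected slab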
30 changes: 22 additions & 8 deletions activestorage/netcdf_to_zarr.py
@@ -3,14 +3,18 @@
import zarr
import ujson
import fsspec
import s3fs

from activestorage.config import *
from kerchunk.hdf import SingleHdf5ToZarr


def gen_json(file_url, fs, fs2, varname, **so):
    """Generate a json file that contains the kerchunk-ed data for Zarr."""
    # set some name for the output json file
    fname = os.path.splitext(file_url)[0]
    if "s3:" in fname:
        fname = os.path.basename(fname)
    outf = f'{fname}_{varname}.json'  # vanilla file name

    # write it out if it's not there
@@ -61,13 +65,25 @@ def open_zarr_group(out_json, varname):
    return zarr_array


def load_netcdf_zarr_generic(fileloc, varname, build_dummy=True):
def load_netcdf_zarr_generic(fileloc, varname, storage_type, build_dummy=True):
    """Pass a netCDF4 file to be shaped as Zarr file by kerchunk."""
    so = dict(mode='rb', anon=True, default_fill_cache=False,
              default_cache_type='first')  # args to fs.open()
    # default_fill_cache=False avoids caching data in between
    # file chunks to lower memory usage
    fs = fsspec.filesystem('')  # local, for S3: ('s3', anon=True)
    print(f"Storage type {storage_type}")
    object_filesystems = ["s3"]

    # "local"/POSIX files; use a local FS with fsspec
    if storage_type not in object_filesystems:
        so = dict(mode='rb', anon=True, default_fill_cache=False,
                  default_cache_type='first')  # args to fs.open()
        # default_fill_cache=False avoids caching data in between
        # file chunks to lower memory usage
        fs = fsspec.filesystem('')
    # open file in memory view mode straight from the S3 object storage
    elif storage_type == "s3":
        fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,  # eg "minioadmin" for Minio
                               secret=S3_SECRET_KEY,  # eg "minioadmin" for Minio
                               client_kwargs={'endpoint_url': S3_URL})  # eg "http://localhost:9000" for Minio
        so = {}

    fs2 = fsspec.filesystem('')  # local file system to save final json to
    out_json = gen_json(fileloc, fs, fs2, varname)

@@ -76,5 +92,3 @@ def load_netcdf_zarr_generic(fileloc, varname, build_dummy=True):
    ref_ds = open_zarr_group(out_json, varname)

    return ref_ds


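The crux of the S3 branch above is that kerchunk only needs a file-like object, so gen_json() works unchanged once fs.open() yields an S3 stream instead of a local one. A standalone sketch of that mechanism; the bucket, object name and credentials are the Minio placeholder values used throughout this PR.

import s3fs
import ujson
from kerchunk.hdf import SingleHdf5ToZarr

fs = s3fs.S3FileSystem(key="minioadmin", secret="minioadmin",
                       client_kwargs={'endpoint_url': "http://localhost:9000"})
s3_uri = "s3://pyactivestorage/cesm2_native.nc"  # hypothetical object

# kerchunk scans the HDF5 metadata through the S3 file-like object and emits
# a JSON reference set that zarr can open later without copying the data
with fs.open(s3_uri, 'rb') as s3file:
    refs = SingleHdf5ToZarr(s3file, s3_uri).translate()
with open("cesm2_native_refs.json", "w") as f:
    f.write(ujson.dumps(refs))
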
1 change: 1 addition & 0 deletions environment.yml
@@ -8,6 +8,7 @@ dependencies:
  - python >=3.9
  - dask
  - fsspec
  - h5netcdf
  - h5py  # needed by Kerchunk
  - kerchunk
  - netcdf4
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
@@ -2,6 +2,7 @@
addopts =
    # --doctest-modules
    --ignore=old_code/
    --ignore=tests/s3_exploratory
    --cov=activestorage
    --cov-report=xml:test-reports/coverage.xml
    --cov-report=html:test-reports/coverage_html
Expand Down
1 change: 1 addition & 0 deletions setup.py
@@ -20,6 +20,7 @@
        'install': [
            'dask',
            'fsspec',
            'h5netcdf',
            'h5py',  # needed by Kerchunk
            'kerchunk',
            'netcdf4',
Expand Down
19 changes: 19 additions & 0 deletions tests/s3_exploratory/config_minio.py
@@ -0,0 +1,19 @@
# This file contains configuration for PyActiveStorage.

# Force True for S3 exploratory tests
USE_S3 = True

# URL of S3 Active Storage server.
S3_ACTIVE_STORAGE_URL = "http://localhost:8080"

# URL of S3 object store.
S3_URL = "http://localhost:9000"

# S3 access key / username.
S3_ACCESS_KEY = "minioadmin"

# S3 secret key / password.
S3_SECRET_KEY = "minioadmin"

# S3 bucket.
S3_BUCKET = "pyactivestorage"
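
A quick smoke test of these settings (a sketch, assuming the Minio container from the scripts below is already running; not part of the PR's test suite):

import s3fs

# Values from this config file: Minio's out-of-the-box credentials
fs = s3fs.S3FileSystem(key="minioadmin", secret="minioadmin",
                       client_kwargs={'endpoint_url': "http://localhost:9000"})
print(fs.ls("pyactivestorage"))  # raises if the test bucket does not exist yet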
4 changes: 4 additions & 0 deletions tests/s3_exploratory/minio_scripts/minio-start
@@ -0,0 +1,4 @@
#!/usr/bin/env bash

# Use an anonymous storage volume so that test data is removed when the container is stopped
exec docker run --detach --rm -p 9000:9000 -p 9001:9001 -v /data --name minio minio/minio server /data --console-address ":9001"
3 changes: 3 additions & 0 deletions tests/s3_exploratory/minio_scripts/minio-stop
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

exec docker stop minio
Binary file added tests/s3_exploratory/test_data/cesm2_native.nc
Binary file not shown.