diff --git a/README.md b/README.md index b2e83598b4..d995a91362 100644 --- a/README.md +++ b/README.md @@ -365,7 +365,7 @@ It was initially started by the [Computational Radiation Physics Group](https:// The following people and institutions [contributed](https://github.com/openPMD/openPMD-api/graphs/contributors) to openPMD-api: * [Axel Huebl (HZDR, now LBNL)](https://github.com/ax3l): - project lead, releases, documentation, automated CI/CD, Python bindings, installation & packaging, prior reference implementations + project lead, releases, documentation, automated CI/CD, Python bindings, Dask, installation & packaging, prior reference implementations * [Fabian Koller (HZDR)](https://github.com/C0nsultant): initial library design and implementation with HDF5 & ADIOS1 backend * [Franz Poeschel (CASUS)](https://github.com/franzpoeschel): diff --git a/examples/11_particle_dataframe.py b/examples/11_particle_dataframe.py index 6af14ee8d5..ba0b86e9a8 100755 --- a/examples/11_particle_dataframe.py +++ b/examples/11_particle_dataframe.py @@ -57,3 +57,18 @@ # example3: save all data data to parquet files delayed_save = delayed(df.to_parquet("electrons.parquet")) delayed_save.compute() + +# Dask example 2: meshes +if found_dask: + E_x = s.iterations[400].meshes["E"]["x"] + E_y = s.iterations[400].meshes["E"]["y"] + E_z = s.iterations[400].meshes["E"]["z"] + darr_x = E_x.to_dask_array() + darr_y = E_y.to_dask_array() + darr_z = E_z.to_dask_array() + + # example compute intensity + Intensity = darr_x * darr_x + darr_y * darr_y + darr_z * darr_z + Intensity_max = Intensity.max().compute() + idx_max = da.argwhere(Intensity == Intensity_max).compute()[0] + print("maximum intensity I={} at index={}".format(Intensity_max, idx_max)) diff --git a/src/binding/python/openpmd_api/DaskArray.py b/src/binding/python/openpmd_api/DaskArray.py new file mode 100644 index 0000000000..1cb4260700 --- /dev/null +++ b/src/binding/python/openpmd_api/DaskArray.py @@ -0,0 +1,140 @@ +""" +This file 
is part of the openPMD-api. + +Copyright 2021 openPMD contributors +Authors: Axel Huebl +License: LGPLv3+ +""" +import math +import numpy as np +try: + from dask.array import from_array + found_dask = True +except ImportError: + found_dask = False + + +class DaskRecordComponent: + # shape, .ndim, .dtype and support numpy-style slicing + def __init__(self, record_component): + self.rc = record_component + + @property + def shape(self): + # fixme: https://github.com/openPMD/openPMD-api/issues/808 + return tuple(self.rc.shape) + + @property + def ndim(self): + return self.rc.ndim + + @property + def dtype(self): + return self.rc.dtype + + def __getitem__(self, slices): + """here we support what Record_Component implements: a tuple of slices, + a slice or an index; we do not support fancy indexing + """ + # FIXME: implement handling of zero-slices in Record_Component + # https://github.com/openPMD/openPMD-api/issues/957 + all_zero = True + for s in slices: + if s != np.s_[0:0]: + all_zero = False + if all_zero: + return np.array([], dtype=self.dtype) + + data = self.rc[slices] + self.rc.series_flush() + if not math.isclose(1.0, self.rc.unit_SI): + data = np.multiply(data, self.rc.unit_SI) + + return data + + +def record_component_to_daskarray(record_component): + """ + Load a RecordComponent into a Dask.array. + + Parameters + ---------- + record_component : openpmd_api.Record_Component + A record component class in openPMD-api. + + Returns + ------- + dask.array + A dask array. + + Raises + ------ + ImportError + Raises an exception if dask is not installed + + See Also + -------- + openpmd_api.BaseRecordComponent.available_chunks : available chunks that + are used internally to parallelize reading + dask.array : the (potentially distributed) array object created here + """ + if not found_dask: + raise ImportError("dask NOT found. 
Install dask for Dask DataFrame " + "support.") + + # get optimal chunks + chunks = record_component.available_chunks() + + # sort and prepare the chunks for Dask's array API + # https://docs.dask.org/en/latest/array-chunks.html + # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions + # sorted and unique + offsets_per_dim = list(map(list, zip(*[chunk.offset for chunk in chunks]))) + offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim] + + # print("offsets_sorted_unique_per_dim=", + # list(offsets_sorted_unique_per_dim)) + + # case 1: PIConGPU static load balancing (works with Dask assumptions, + # chunk option no. 3) + # all chunks in the same column have the same column width although + # individual columns have different widths + # case 2: AMReX boxes + # all chunks are multiple of a common block size, offsets are a multiple + # of a common blocksize + # problem: too limited description in Dask + # https://github.com/dask/dask/issues/7475 + # work-around: create smaller chunks (this incurs a read cost) by forcing + # into case 1 + # (this can lead to larger blocks than using the gcd of the + # extents aka AMReX block size) + common_chunk_widths_per_dim = list() + for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim): + # print("d=", d, offsets_in_dim, record_component.shape[d]) + offsets_in_dim_arr = np.array(offsets_in_dim) + # note: this is in the right order of rows/columns, contrary to a + # sorted extent list from chunks + extents_in_dim = np.zeros_like(offsets_in_dim_arr) + extents_in_dim[:-1] = offsets_in_dim_arr[1:] + extents_in_dim[-1] = record_component.shape[d] + if len(extents_in_dim) > 1: + extents_in_dim[:-1] -= offsets_in_dim_arr[:-1] + extents_in_dim[-1] -= offsets_in_dim_arr[-1] + # print("extents_in_dim=", extents_in_dim) + common_chunk_widths_per_dim.append(tuple(extents_in_dim)) + + common_chunk_widths_per_dim = tuple(common_chunk_widths_per_dim) + # 
    # print("common_chunk_widths_per_dim=", common_chunk_widths_per_dim)

    # hand the per-dimension chunk widths derived above to Dask;
    # asarray=True wraps each read in np.asarray, fancy=False because
    # DaskRecordComponent does not support fancy indexing
    da = from_array(
        DaskRecordComponent(record_component),
        chunks=common_chunk_widths_per_dim,
        # name=None,
        asarray=True,
        fancy=False,
        # getitem=None,
        # meta=None,
        # inline_array=False
    )

    return da
diff --git a/src/binding/python/openpmd_api/__init__.py b/src/binding/python/openpmd_api/__init__.py index c7102c873d..af045c871c 100644 --- a/src/binding/python/openpmd_api/__init__.py +++ b/src/binding/python/openpmd_api/__init__.py @@ -2,6 +2,7 @@ from .openpmd_api_cxx import * # noqa from .DataFrame import particles_to_dataframe from .DaskDataFrame import particles_to_daskdataframe +from .DaskArray import record_component_to_daskarray __version__ = cxx.__version__ @@ -12,6 +13,7 @@ # extend CXX classes with extra methods ParticleSpecies.to_df = particles_to_dataframe # noqa ParticleSpecies.to_dask = particles_to_daskdataframe # noqa +Record_Component.to_dask_array = record_component_to_daskarray # noqa # TODO remove in future versions (deprecated) Access_Type = Access # noqa