Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ It was initially started by the [Computational Radiation Physics Group](https://
The following people and institutions [contributed](https://github.com/openPMD/openPMD-api/graphs/contributors) to openPMD-api:

* [Axel Huebl (HZDR, now LBNL)](https://github.com/ax3l):
project lead, releases, documentation, automated CI/CD, Python bindings, installation & packaging, prior reference implementations
project lead, releases, documentation, automated CI/CD, Python bindings, Dask, installation & packaging, prior reference implementations
* [Fabian Koller (HZDR)](https://github.com/C0nsultant):
initial library design and implementation with HDF5 & ADIOS1 backend
* [Franz Poeschel (CASUS)](https://github.com/franzpoeschel):
Expand Down
15 changes: 15 additions & 0 deletions examples/11_particle_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,18 @@
# example 3: save all data to parquet files
delayed_save = delayed(df.to_parquet("electrons.parquet"))
delayed_save.compute()

# Dask example 2: meshes
# read the three Cartesian components of the electric field record "E"
# of iteration 400 and wrap each one in a lazily evaluated Dask array
# (guarded because dask is an optional dependency — see import above)
if found_dask:
    E_x = s.iterations[400].meshes["E"]["x"]
    E_y = s.iterations[400].meshes["E"]["y"]
    E_z = s.iterations[400].meshes["E"]["z"]
    # to_dask_array defers reading: chunks are only loaded on .compute()
    darr_x = E_x.to_dask_array()
    darr_y = E_y.to_dask_array()
    darr_z = E_z.to_dask_array()

    # example compute intensity
    # |E|^2 = E_x^2 + E_y^2 + E_z^2, evaluated element-wise and in parallel
    Intensity = darr_x * darr_x + darr_y * darr_y + darr_z * darr_z
    Intensity_max = Intensity.max().compute()
    # argwhere returns all index tuples matching the maximum; take the first
    idx_max = da.argwhere(Intensity == Intensity_max).compute()[0]
    print("maximum intensity I={} at index={}".format(Intensity_max, idx_max))
140 changes: 140 additions & 0 deletions src/binding/python/openpmd_api/DaskArray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
This file is part of the openPMD-api.

Copyright 2021 openPMD contributors
Authors: Axel Huebl
License: LGPLv3+
"""
import math
import numpy as np
try:
from dask.array import from_array
found_dask = True
except ImportError:
found_dask = False


class DaskRecordComponent:
    """Adapter that presents an openPMD Record_Component with the minimal
    numpy-like interface Dask's ``from_array`` needs:
    ``.shape``, ``.ndim``, ``.dtype`` and numpy-style slicing.
    """

    def __init__(self, record_component):
        """
        Parameters
        ----------
        record_component : openpmd_api.Record_Component
            The openPMD record component to wrap.
        """
        self.rc = record_component

    @property
    def shape(self):
        """Dataset extent as a tuple of ints."""
        # fixme: https://github.com/openPMD/openPMD-api/issues/808
        return tuple(self.rc.shape)

    @property
    def ndim(self):
        """Number of dataset dimensions."""
        return self.rc.ndim

    @property
    def dtype(self):
        """Numpy dtype of the dataset."""
        return self.rc.dtype

    def __getitem__(self, slices):
        """here we support what Record_Component implements: a tuple of slices,
        a slice or an index; we do not support fancy indexing
        """
        # bug fix: a bare slice or index is not iterable, so the zero-size
        # check below crashed for exactly the non-tuple arguments this
        # method documents as supported; normalize to a 1-tuple first
        if not isinstance(slices, tuple):
            slices = (slices,)

        # FIXME: implement handling of zero-slices in Record_Component
        # https://github.com/openPMD/openPMD-api/issues/957
        # a selection of only zero-length slices must not hit the backend:
        # short-circuit with an empty array of the right dtype
        all_zero = all(s == np.s_[0:0] for s in slices)
        if all_zero:
            return np.array([], dtype=self.dtype)

        data = self.rc[slices]
        # the openPMD read is deferred until the series is flushed
        self.rc.series_flush()
        # apply the SI scaling per chunk; skip the multiply when unitless
        if not math.isclose(1.0, self.rc.unit_SI):
            data = np.multiply(data, self.rc.unit_SI)

        return data


def record_component_to_daskarray(record_component):
    """
    Load a RecordComponent into a Dask.array.

    Parameters
    ----------
    record_component : openpmd_api.Record_Component
        A record component class in openPMD-api.

    Returns
    -------
    dask.array
        A dask array.

    Raises
    ------
    ImportError
        Raises an exception if dask is not installed

    See Also
    --------
    openpmd_api.BaseRecordComponent.available_chunks : available chunks that
        are used internally to parallelize reading
    dask.array : the (potentially distributed) array object created here
    """
    if not found_dask:
        # bug fix: message previously advertised "Dask DataFrame support",
        # copied from the DataFrame helper — this function builds an array
        raise ImportError("dask NOT found. Install dask for Dask array "
                          "support.")

    # get optimal chunks
    chunks = record_component.available_chunks()

    # sort and prepare the chunks for Dask's array API
    # https://docs.dask.org/en/latest/array-chunks.html
    # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
    # sorted and unique
    # transpose the per-chunk offsets into per-dimension offset lists
    offsets_per_dim = list(map(list, zip(*[chunk.offset for chunk in chunks])))
    offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim]

    # print("offsets_sorted_unique_per_dim=",
    #       list(offsets_sorted_unique_per_dim))

    # case 1: PIConGPU static load balancing (works with Dask assumptions,
    #         chunk option no. 3)
    #   all chunks in the same column have the same column width although
    #   individual columns have different widths
    # case 2: AMReX boxes
    #   all chunks are multiple of a common block size, offsets are a multiple
    #   of a common blocksize
    #   problem: too limited description in Dask
    #     https://github.com/dask/dask/issues/7475
    #   work-around: create smaller chunks (this incurs a read cost) by forcing
    #                into case 1
    #                (this can lead to larger blocks than using the gcd of the
    #                 extents aka AMReX block size)
    common_chunk_widths_per_dim = list()
    for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
        # print("d=", d, offsets_in_dim, record_component.shape[d])
        offsets_in_dim_arr = np.array(offsets_in_dim)
        # note: this is in the right order of rows/columns, contrary to a
        #       sorted extent list from chunks
        # each width is the distance to the next unique offset; the last
        # chunk extends to the end of the dataset in this dimension
        extents_in_dim = np.zeros_like(offsets_in_dim_arr)
        extents_in_dim[:-1] = offsets_in_dim_arr[1:]
        extents_in_dim[-1] = record_component.shape[d]
        if len(extents_in_dim) > 1:
            extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
        extents_in_dim[-1] -= offsets_in_dim_arr[-1]
        # print("extents_in_dim=", extents_in_dim)
        common_chunk_widths_per_dim.append(tuple(extents_in_dim))

    common_chunk_widths_per_dim = tuple(common_chunk_widths_per_dim)
    # print("common_chunk_widths_per_dim=", common_chunk_widths_per_dim)

    # wrap the record component in a lazy numpy-like adapter; Dask pulls
    # individual chunks through DaskRecordComponent.__getitem__ on demand
    da = from_array(
        DaskRecordComponent(record_component),
        chunks=common_chunk_widths_per_dim,
        # name=None,
        asarray=True,
        # no fancy indexing support in the adapter, only basic slicing
        fancy=False,
        # getitem=None,
        # meta=None,
        # inline_array=False
    )

    return da
2 changes: 2 additions & 0 deletions src/binding/python/openpmd_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .openpmd_api_cxx import * # noqa
from .DataFrame import particles_to_dataframe
from .DaskDataFrame import particles_to_daskdataframe
from .DaskArray import record_component_to_daskarray


__version__ = cxx.__version__
Expand All @@ -12,6 +13,7 @@
# extend CXX classes with extra methods
ParticleSpecies.to_df = particles_to_dataframe # noqa
ParticleSpecies.to_dask = particles_to_daskdataframe # noqa
Record_Component.to_dask_array = record_component_to_daskarray # noqa

# TODO remove in future versions (deprecated)
Access_Type = Access # noqa