Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions pmda/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,25 @@ class ParallelAnalysisBase(object):
This class will automatically take care of setting up the trajectory
reader for iterating in parallel.

To parallelize the analysis ``ParallelAnalysisBase`` separates the
trajectory into work blocks containing multiple frames. The number of
blocks is equal to the number of available cores or dask workers. This
minimizes the number of python processes that are started during a
calculation. Accumulation of frames within a block happens in the
`self._reduce` function. A consequence when using dask is that adding
additional workers during a computation will not result in an reduction
of run-time.


To define a new Analysis,
:class:`~pmda.parallel.ParallelAnalysisBase` needs to be
subclassed and
:meth:`~pmda.parallel.ParallelAnalysisBase._single_frame` and
:meth:`~pmda.parallel.ParallelAnalysisBase._conclude` must be
defined. It is also possible to define
:meth:`~~pmda.parallel.ParallelAnalysisBase._prepare` for
pre-processing. See the example below.
pre-processing and :meth:`~~pmda.parallel.ParallelAnalysisBase._reduce`
for custom reduce operation on the work blocks. See the example below.

.. code-block:: python

Expand All @@ -121,7 +132,30 @@ def _conclude(self):
# sensible new variable.
self.results = np.hstack(self._results)
# Apply normalisation and averaging to results here if wanted.
self.results /= np.sum(self.results)
self.results /= np.sum(self.results

@staticmethod
def _reduce(res, result_single_frame):
# NOT REQUIRED
# Called for every frame. ``res`` contains all the results
# before current time step, and ``result_single_frame`` is the
# result of self._single_frame for the current time step. The
# return value is the updated ``res``. The default is to append
# results to a python list. This approach is sufficient for
# time-series data.
res.append(results_single_frame)
# This is not suitable for every analysis. To add results over
# multiple frames this function can be overwritten. The default
# value for ``res`` is an empty list. Here we change the type to
# the return type of `self._single_frame`. Afterwards we can
# safely use addition to accumulate the results.
if res == []:
res = result_single_frame
else:
res += result_single_frame
# If you overwrite this function *always* return the updated
# ``res`` at the end.
return res

Afterwards the new analysis can be run like this.

Expand Down Expand Up @@ -322,9 +356,15 @@ def _dask_helper(self, start, stop, step, indices, top, traj):
with timeit() as b_io:
ts = u.trajectory[i]
with timeit() as b_compute:
res.append(self._single_frame(ts, agroups))
res = self._reduce(res, self._single_frame(ts, agroups))
times_io.append(b_io.elapsed)
times_compute.append(b_compute.elapsed)

return np.asarray(res), np.asarray(times_io), np.asarray(
times_compute), b_universe.elapsed

@staticmethod
def _reduce(res, result_single_frame):
""" 'append' action for a time series"""
res.append(result_single_frame)
return res
21 changes: 15 additions & 6 deletions pmda/rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,11 @@ def _single_frame(self, ts, atomgroups):
count = np.histogram(d, **self.rdf_settings)[0]
volume = u.trajectory.ts.volume

return {'count': count, 'volume': volume}
return np.array([count, np.array(volume, dtype=np.float64)])

def _conclude(self, ):
for block in self._results:
for data in block:
self.count += data['count']
self.volume += data['volume']

self.count = np.sum(self._results[:, 0])
self.volume = np.sum(self._results[:, 1])
# Number of each selection
N = self.nA * self.nB

Expand All @@ -163,3 +160,15 @@ def _conclude(self, ):

rdf = self.count / (density * vol * self.nf)
self.rdf = rdf

@staticmethod
def _reduce(res, result_single_frame):
""" 'add' action for an accumulator"""
if res == []:
# Convert res from an empty list to a numpy array
# which has the same shape as the single frame result
res = result_single_frame
else:
# Add two numpy arrays
res += result_single_frame
return res
10 changes: 10 additions & 0 deletions pmda/test/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,13 @@ def test_attrlock():
# Outside of lock context setting should again work
pab.thing2 = 100
assert pab.thing2 == 100


def test_reduce():
res = []
u = mda.Universe(PSF, DCD)
ana = NoneAnalysis(u.atoms)
res = ana._reduce(res, [1])
res = ana._reduce(res, [1])
# Should see res become a list with 2 elements.
assert res == [[1], [1]]
15 changes: 14 additions & 1 deletion pmda/test/test_rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import pytest

import MDAnalysis as mda
import numpy as np
from pmda.rdf import InterRDF
from MDAnalysis.analysis import rdf

Expand Down Expand Up @@ -90,8 +91,20 @@ def test_same_result(sels):
s1, s2 = sels
nrdf = rdf.InterRDF(s1, s2).run()
prdf = InterRDF(s1, s2).run()
assert_almost_equal(nrdf.rdf, prdf.rdf)
assert_almost_equal(nrdf.count, prdf.count)
assert_almost_equal(nrdf.rdf, prdf.rdf)


def test_reduce(sels):
# should see numpy.array addtion
s1, s2 = sels
rdf = InterRDF(s1, s2)
res = []
single_frame = np.array([np.array([1, 2]), np.array([3])])
res = rdf._reduce(res, single_frame)
res = rdf._reduce(res, single_frame)
assert_almost_equal(res[0], np.array([2, 4]))
assert_almost_equal(res[1], np.array([6]))


@pytest.mark.parametrize('exclusion_block, value', [
Expand Down