Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# PMDA
# Copyright (c) 2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version

from dask import distributed, multiprocessing
import pytest

@pytest.fixture(scope="session", params=(1, 2))
def client(tmpdir_factory, request):
    """Session-scoped dask ``distributed`` client.

    Parametrized over 1 and 2 workers so every test using this fixture
    runs against both a serial-like and a parallel local cluster.  The
    cluster's scratch files live in a session temp dir.
    """
    with tmpdir_factory.mktemp("dask_cluster").as_cwd():
        cluster = distributed.LocalCluster(n_workers=request.param,
                                           processes=True)
        dask_client = distributed.Client(cluster)
        yield dask_client
        # Tear down in reverse order of creation.
        dask_client.close()
        cluster.close()


@pytest.fixture(scope='session', params=('distributed', 'multiprocessing'))
def scheduler(request, client):
    """Return the dask scheduler backend selected by the fixture param.

    Either the session-wide ``distributed`` client or the dask
    ``multiprocessing`` scheduler module.
    """
    return client if request.param == 'distributed' else multiprocessing
2 changes: 1 addition & 1 deletion pmda/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def _single_frame(self, ts, atomgroups):
return self.function(*args, **self.kwargs)

def _conclude(self):
self.results = np.hstack(self._results)
self.results = np.vstack(self._results)


def analysis_class(function):
Expand Down
46 changes: 43 additions & 3 deletions pmda/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,25 @@ class ParallelAnalysisBase(object):
This class will automatically take care of setting up the trajectory
reader for iterating in parallel.

To parallelize the analysis ``ParallelAnalysisBase`` separates the
trajectory into work blocks containing multiple frames. The number of
blocks is equal to the number of available cores or dask workers. This
minimizes the number of python processes that are started during a
calculation. Accumulation of frames within a block happens in the
`self._reduce` function. A consequence when using dask is that adding
additional workers during a computation will not result in a reduction
of run-time.


To define a new Analysis,
:class:`~pmda.parallel.ParallelAnalysisBase` needs to be
subclassed and
:meth:`~pmda.parallel.ParallelAnalysisBase._single_frame` and
:meth:`~pmda.parallel.ParallelAnalysisBase._conclude` must be
defined. It is also possible to define
:meth:`~pmda.parallel.ParallelAnalysisBase._prepare` for
pre-processing. See the example below.
pre-processing and :meth:`~pmda.parallel.ParallelAnalysisBase._reduce`
for custom reduce operation on the work blocks. See the example below.

.. code-block:: python

Expand All @@ -121,7 +132,30 @@ def _conclude(self):
# sensible new variable.
self.results = np.hstack(self._results)
# Apply normalisation and averaging to results here if wanted.
self.results /= np.sum(self.results)
self.results /= np.sum(self.results)

@staticmethod
def _reduce(res, result_single_frame):
# NOT REQUIRED
# Called for every frame. ``res`` contains all the results
# before current time step, and ``result_single_frame`` is the
# result of self._single_frame for the current time step. The
# return value is the updated ``res``. The default is to append
# results to a python list. This approach is sufficient for
# time-series data.
res.append(result_single_frame)
# This is not suitable for every analysis. To add results over
# multiple frames this function can be overwritten. The default
# value for ``res`` is an empty list. Here we change the type to
# the return type of `self._single_frame`. Afterwards we can
# safely use addition to accumulate the results.
if res == []:
res = result_single_frame
else:
res += result_single_frame
# If you overwrite this function *always* return the updated
# ``res`` at the end.
return res

Afterwards the new analysis can be run like this.

Expand Down Expand Up @@ -322,9 +356,15 @@ def _dask_helper(self, start, stop, step, indices, top, traj):
with timeit() as b_io:
ts = u.trajectory[i]
with timeit() as b_compute:
res.append(self._single_frame(ts, agroups))
res = self._reduce(res, self._single_frame(ts, agroups))
times_io.append(b_io.elapsed)
times_compute.append(b_compute.elapsed)

return np.asarray(res), np.asarray(times_io), np.asarray(
times_compute), b_universe.elapsed

@staticmethod
def _reduce(res, result_single_frame):
    """'append' action for a time series.

    Default reduction: collect the result of each frame by appending
    it to ``res`` (mutated in place) and return the updated list.
    Subclasses may override this to accumulate instead of append.
    """
    res.append(result_single_frame)
    return res
47 changes: 28 additions & 19 deletions pmda/rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@

import numpy as np

from MDAnalysis.lib.distances import distance_array
from MDAnalysis.lib.util import blocks_of
from MDAnalysis.lib import distances

from .parallel import ParallelAnalysisBase

Expand Down Expand Up @@ -117,33 +116,31 @@ def _prepare(self):

# Need to know average volume
self.volume = 0.0
# Set the max range to filter the search radius
self._maxrange = self.rdf_settings['range'][1]

def _single_frame(self, ts, atomgroups):
    """Histogram g1-g2 pair distances for a single frame.

    Returns an object array ``[count, volume]`` where ``count`` is the
    per-bin histogram and ``volume`` the frame's box volume.
    """
    # NOTE(review): this block reconstructs the merged version of the
    # diff; the superseded distance_array/blocks_of lines are dropped.
    g1, g2 = atomgroups
    u = g1.universe
    # capped_distance only returns pairs closer than self._maxrange,
    # avoiding the full (N1, N2) distance matrix.
    pairs, dist = distances.capped_distance(g1.positions,
                                            g2.positions,
                                            self._maxrange,
                                            box=u.dimensions)
    # If provided exclusions, drop pairs whose atoms fall in the same
    # exclusion block (e.g. belong to the same molecule).
    if self._exclusion_block is not None:
        idxA = pairs[:, 0] // self._exclusion_block[0]
        idxB = pairs[:, 1] // self._exclusion_block[1]
        mask = np.where(idxA != idxB)[0]
        dist = dist[mask]
    count = np.histogram(dist, **self.rdf_settings)[0]
    volume = u.trajectory.ts.volume

    return np.array([count, np.array(volume, dtype=np.float64)])

def _conclude(self, ):
for block in self._results:
for data in block:
self.count += data['count']
self.volume += data['volume']

self.count = np.sum(self._results[:, 0])
self.volume = np.sum(self._results[:, 1])
# Number of each selection
N = self.nA * self.nB

Expand All @@ -163,3 +160,15 @@ def _conclude(self, ):

rdf = self.count / (density * vol * self.nf)
self.rdf = rdf

@staticmethod
def _reduce(res, result_single_frame):
""" 'add' action for an accumulator"""
if isinstance(res, list) and len(res) == 0:
# Convert res from an empty list to a numpy array
# which has the same shape as the single frame result
res = result_single_frame
else:
# Add two numpy arrays
res += result_single_frame
return res
32 changes: 17 additions & 15 deletions pmda/test/test_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,33 @@
# Released under the GNU Public Licence, v2 or any higher version
from __future__ import absolute_import, division

import pytest

import numpy as np

from numpy.testing import assert_equal

import MDAnalysis as mda
from pmda import custom

from MDAnalysisTests.datafiles import PSF, DCD
from MDAnalysisTests.util import no_deprecated_call
import pytest
from numpy.testing import assert_equal

from pmda import custom


def custom_function(mobile):
    """Return the center of geometry of the given group."""
    center = mobile.center_of_geometry()
    return center


def test_AnalysisFromFunction():
def test_AnalysisFromFunction(scheduler):
u = mda.Universe(PSF, DCD)
step = 2
ana1 = custom.AnalysisFromFunction(custom_function, u,
u.atoms).run(step=step)
ana2 = custom.AnalysisFromFunction(custom_function, u,
u.atoms).run(step=step)
ana3 = custom.AnalysisFromFunction(custom_function, u,
u.atoms).run(step=step)
ana1 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run(
step=step, scheduler=scheduler
)
ana2 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run(
step=step, scheduler=scheduler
)
ana3 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run(
step=step, scheduler=scheduler
)

results = []
for ts in u.trajectory[::step]:
Expand All @@ -53,8 +54,9 @@ def test_AnalysisFromFunction_otherAgs():
u2 = mda.Universe(PSF, DCD)
u3 = mda.Universe(PSF, DCD)
step = 2
ana = custom.AnalysisFromFunction(custom_function_2, u, u.atoms, u2.atoms,
u3.atoms).run(step=step)
ana = custom.AnalysisFromFunction(
custom_function_2, u, u.atoms, u2.atoms, u3.atoms
).run(step=step)

results = []
for ts in u.trajectory[::step]:
Expand Down
30 changes: 10 additions & 20 deletions pmda/test/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,6 @@ def test_sub_frames(analysis, n_jobs):
np.testing.assert_almost_equal(analysis.res, [10, 20, 30, 40])


@pytest.fixture(scope="session")
def client(tmpdir_factory):
with tmpdir_factory.mktemp("dask_cluster").as_cwd():
lc = distributed.LocalCluster(n_workers=2, processes=True)
client = distributed.Client(lc)

yield client

client.close()
lc.close()


@pytest.fixture(scope='session', params=('distributed', 'multiprocessing'))
def scheduler(request, client):
if request.param == 'distributed':
return client
else:
return multiprocessing


def test_scheduler(analysis, scheduler):
    # Smoke test: the analysis must run under every backend provided by
    # the ``scheduler`` fixture (distributed client and multiprocessing).
    analysis.run(scheduler=scheduler)

Expand Down Expand Up @@ -130,3 +110,13 @@ def test_attrlock():
# Outside of lock context setting should again work
pab.thing2 = 100
assert pab.thing2 == 100


def test_reduce():
    """The default ``_reduce`` appends each frame result to a list."""
    u = mda.Universe(PSF, DCD)
    ana = NoneAnalysis(u.atoms)
    accumulated = ana._reduce([], [1])
    accumulated = ana._reduce(accumulated, [1])
    # Two reductions of ``[1]`` yield a two-element list of lists.
    assert accumulated == [[1], [1]]
15 changes: 14 additions & 1 deletion pmda/test/test_rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import pytest

import MDAnalysis as mda
import numpy as np
from pmda.rdf import InterRDF
from MDAnalysis.analysis import rdf

Expand Down Expand Up @@ -90,8 +91,20 @@ def test_same_result(sels):
s1, s2 = sels
nrdf = rdf.InterRDF(s1, s2).run()
prdf = InterRDF(s1, s2).run()
assert_almost_equal(nrdf.rdf, prdf.rdf)
assert_almost_equal(nrdf.count, prdf.count)
assert_almost_equal(nrdf.rdf, prdf.rdf)


def test_reduce(sels):
    # InterRDF._reduce should perform element-wise numpy array addition.
    s1, s2 = sels
    # Renamed local from ``rdf`` to ``prdf``: the old name shadowed the
    # MDAnalysis.analysis.rdf module imported at the top of this file.
    prdf = InterRDF(s1, s2)
    res = []
    single_frame = np.array([np.array([1, 2]), np.array([3])])
    res = prdf._reduce(res, single_frame)
    res = prdf._reduce(res, single_frame)
    assert_almost_equal(res[0], np.array([2, 4]))
    assert_almost_equal(res[1], np.array([6]))


@pytest.mark.parametrize('exclusion_block, value', [
Expand Down