diff --git a/conftest.py b/conftest.py new file mode 100644 index 00000000..19cb2998 --- /dev/null +++ b/conftest.py @@ -0,0 +1,31 @@ + +# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*- +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 +# +# PMDA +# Copyright (c) 2017 The MDAnalysis Development Team and contributors +# (see the file AUTHORS for the full list of names) +# +# Released under the GNU Public Licence, v2 or any higher version + +from dask import distributed, multiprocessing +import pytest + +@pytest.fixture(scope="session", params=(1, 2)) +def client(tmpdir_factory, request): + with tmpdir_factory.mktemp("dask_cluster").as_cwd(): + lc = distributed.LocalCluster(n_workers=request.param, processes=True) + client = distributed.Client(lc) + + yield client + + client.close() + lc.close() + + +@pytest.fixture(scope='session', params=('distributed', 'multiprocessing')) +def scheduler(request, client): + if request.param == 'distributed': + return client + else: + return multiprocessing diff --git a/pmda/custom.py b/pmda/custom.py index 7fda4c19..6a35329d 100644 --- a/pmda/custom.py +++ b/pmda/custom.py @@ -100,7 +100,7 @@ def _single_frame(self, ts, atomgroups): return self.function(*args, **self.kwargs) def _conclude(self): - self.results = np.hstack(self._results) + self.results = np.vstack(self._results) def analysis_class(function): diff --git a/pmda/parallel.py b/pmda/parallel.py index ef930018..7784391e 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -88,6 +88,16 @@ class ParallelAnalysisBase(object): This class will automatically take care of setting up the trajectory reader for iterating in parallel. + To parallelize the analysis ``ParallelAnalysisBase`` separates the + trajectory into work blocks containing multiple frames. The number of + blocks is equal to the number of available cores or dask workers. This + minimizes the number of python processes that are started during a + calculation. 
Accumulation of frames within a block happens in the + `self._reduce` function. A consequence when using dask is that adding + additional workers during a computation will not result in a reduction + of run-time. + + To define a new Analysis, :class:`~pmda.parallel.ParallelAnalysisBase` needs to be subclassed and @@ -95,7 +105,8 @@ class ParallelAnalysisBase(object): :meth:`~pmda.parallel.ParallelAnalysisBase._conclude` must be defined. It is also possible to define :meth:`~~pmda.parallel.ParallelAnalysisBase._prepare` for - pre-processing. See the example below. + pre-processing and :meth:`~~pmda.parallel.ParallelAnalysisBase._reduce` + for custom reduce operation on the work blocks. See the example below. .. code-block:: python @@ -121,7 +132,30 @@ def _conclude(self): # sensible new variable. self.results = np.hstack(self._results) # Apply normalisation and averaging to results here if wanted. - self.results /= np.sum(self.results) + self.results /= np.sum(self.results) + + @staticmethod + def _reduce(res, result_single_frame): + # NOT REQUIRED + # Called for every frame. ``res`` contains all the results + # before current time step, and ``result_single_frame`` is the + # result of self._single_frame for the current time step. The + # return value is the updated ``res``. The default is to append + # results to a python list. This approach is sufficient for + # time-series data. + res.append(result_single_frame) + # This is not suitable for every analysis. To add results over + # multiple frames this function can be overwritten. The default + # value for ``res`` is an empty list. Here we change the type to + # the return type of `self._single_frame`. Afterwards we can + # safely use addition to accumulate the results. + if res == []: + res = result_single_frame + else: + res += result_single_frame + # If you overwrite this function *always* return the updated + # ``res`` at the end. + return res Afterwards the new analysis can be run like this. 
@@ -322,9 +356,15 @@ def _dask_helper(self, start, stop, step, indices, top, traj): with timeit() as b_io: ts = u.trajectory[i] with timeit() as b_compute: - res.append(self._single_frame(ts, agroups)) + res = self._reduce(res, self._single_frame(ts, agroups)) times_io.append(b_io.elapsed) times_compute.append(b_compute.elapsed) return np.asarray(res), np.asarray(times_io), np.asarray( times_compute), b_universe.elapsed + + @staticmethod + def _reduce(res, result_single_frame): + """ 'append' action for a time series""" + res.append(result_single_frame) + return res diff --git a/pmda/rdf.py b/pmda/rdf.py index b01486e4..40aebc39 100644 --- a/pmda/rdf.py +++ b/pmda/rdf.py @@ -30,8 +30,7 @@ import numpy as np -from MDAnalysis.lib.distances import distance_array -from MDAnalysis.lib.util import blocks_of +from MDAnalysis.lib import distances from .parallel import ParallelAnalysisBase @@ -117,33 +116,31 @@ def _prepare(self): # Need to know average volume self.volume = 0.0 + # Set the max range to filter the search radius + self._maxrange = self.rdf_settings['range'][1] def _single_frame(self, ts, atomgroups): g1, g2 = atomgroups u = g1.universe - d = distance_array(g1.positions, g2.positions, - box=u.dimensions) + pairs, dist = distances.capped_distance(g1.positions, + g2.positions, + self._maxrange, + box=u.dimensions) # If provided exclusions, create a mask of _result which - # lets us take these out - exclusion_mask = None + # lets us take these out. 
if self._exclusion_block is not None: - exclusion_mask = blocks_of(d, *self._exclusion_block) - maxrange = self.rdf_settings['range'][1] + 1.0 - # Maybe exclude same molecule distances - if exclusion_mask is not None: - exclusion_mask[:] = maxrange - count = [] - count = np.histogram(d, **self.rdf_settings)[0] + idxA = pairs[:, 0]//self._exclusion_block[0] + idxB = pairs[:, 1]//self._exclusion_block[1] + mask = np.where(idxA != idxB)[0] + dist = dist[mask] + count = np.histogram(dist, **self.rdf_settings)[0] volume = u.trajectory.ts.volume - return {'count': count, 'volume': volume} + return np.array([count, np.array(volume, dtype=np.float64)]) def _conclude(self, ): - for block in self._results: - for data in block: - self.count += data['count'] - self.volume += data['volume'] - + self.count = np.sum(self._results[:, 0]) + self.volume = np.sum(self._results[:, 1]) # Number of each selection N = self.nA * self.nB @@ -163,3 +160,15 @@ def _conclude(self, ): rdf = self.count / (density * vol * self.nf) self.rdf = rdf + + @staticmethod + def _reduce(res, result_single_frame): + """ 'add' action for an accumulator""" + if isinstance(res, list) and len(res) == 0: + # Convert res from an empty list to a numpy array + # which has the same shape as the single frame result + res = result_single_frame + else: + # Add two numpy arrays + res += result_single_frame + return res diff --git a/pmda/test/test_custom.py b/pmda/test/test_custom.py index 5ec6d2e2..e29dcb97 100644 --- a/pmda/test/test_custom.py +++ b/pmda/test/test_custom.py @@ -8,32 +8,33 @@ # Released under the GNU Public Licence, v2 or any higher version from __future__ import absolute_import, division -import pytest import numpy as np - -from numpy.testing import assert_equal - import MDAnalysis as mda -from pmda import custom - from MDAnalysisTests.datafiles import PSF, DCD from MDAnalysisTests.util import no_deprecated_call +import pytest +from numpy.testing import assert_equal + +from pmda import custom def 
custom_function(mobile): return mobile.center_of_geometry() -def test_AnalysisFromFunction(): +def test_AnalysisFromFunction(scheduler): u = mda.Universe(PSF, DCD) step = 2 - ana1 = custom.AnalysisFromFunction(custom_function, u, - u.atoms).run(step=step) - ana2 = custom.AnalysisFromFunction(custom_function, u, - u.atoms).run(step=step) - ana3 = custom.AnalysisFromFunction(custom_function, u, - u.atoms).run(step=step) + ana1 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( + step=step, scheduler=scheduler + ) + ana2 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( + step=step, scheduler=scheduler + ) + ana3 = custom.AnalysisFromFunction(custom_function, u, u.atoms).run( + step=step, scheduler=scheduler + ) results = [] for ts in u.trajectory[::step]: @@ -53,8 +54,9 @@ def test_AnalysisFromFunction_otherAgs(): u2 = mda.Universe(PSF, DCD) u3 = mda.Universe(PSF, DCD) step = 2 - ana = custom.AnalysisFromFunction(custom_function_2, u, u.atoms, u2.atoms, - u3.atoms).run(step=step) + ana = custom.AnalysisFromFunction( + custom_function_2, u, u.atoms, u2.atoms, u3.atoms + ).run(step=step) results = [] for ts in u.trajectory[::step]: diff --git a/pmda/test/test_parallel.py b/pmda/test/test_parallel.py index 15863e58..1fcb6cc0 100644 --- a/pmda/test/test_parallel.py +++ b/pmda/test/test_parallel.py @@ -78,26 +78,6 @@ def test_sub_frames(analysis, n_jobs): np.testing.assert_almost_equal(analysis.res, [10, 20, 30, 40]) -@pytest.fixture(scope="session") -def client(tmpdir_factory): - with tmpdir_factory.mktemp("dask_cluster").as_cwd(): - lc = distributed.LocalCluster(n_workers=2, processes=True) - client = distributed.Client(lc) - - yield client - - client.close() - lc.close() - - -@pytest.fixture(scope='session', params=('distributed', 'multiprocessing')) -def scheduler(request, client): - if request.param == 'distributed': - return client - else: - return multiprocessing - - def test_scheduler(analysis, scheduler): 
analysis.run(scheduler=scheduler) @@ -130,3 +110,13 @@ def test_attrlock(): # Outside of lock context setting should again work pab.thing2 = 100 assert pab.thing2 == 100 + + +def test_reduce(): + res = [] + u = mda.Universe(PSF, DCD) + ana = NoneAnalysis(u.atoms) + res = ana._reduce(res, [1]) + res = ana._reduce(res, [1]) + # Should see res become a list with 2 elements. + assert res == [[1], [1]] diff --git a/pmda/test/test_rdf.py b/pmda/test/test_rdf.py index c1f2d835..2dfec024 100644 --- a/pmda/test/test_rdf.py +++ b/pmda/test/test_rdf.py @@ -24,6 +24,7 @@ import pytest import MDAnalysis as mda +import numpy as np from pmda.rdf import InterRDF from MDAnalysis.analysis import rdf @@ -90,8 +91,20 @@ def test_same_result(sels): s1, s2 = sels nrdf = rdf.InterRDF(s1, s2).run() prdf = InterRDF(s1, s2).run() - assert_almost_equal(nrdf.rdf, prdf.rdf) assert_almost_equal(nrdf.count, prdf.count) + assert_almost_equal(nrdf.rdf, prdf.rdf) + + +def test_reduce(sels): + # should see numpy.array addition + s1, s2 = sels + rdf = InterRDF(s1, s2) + res = [] + single_frame = np.array([np.array([1, 2]), np.array([3])]) + res = rdf._reduce(res, single_frame) + res = rdf._reduce(res, single_frame) + assert_almost_equal(res[0], np.array([2, 4])) + assert_almost_equal(res[1], np.array([6])) @pytest.mark.parametrize('exclusion_block, value', [