From cbe2af25ce1586e0922a6e85fc4c7fb03a14951c Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Sun, 19 Aug 2018 16:07:17 -0600 Subject: [PATCH 01/19] Setting up parallel leaflet finder --- pmda/leaflet.py | 242 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 pmda/leaflet.py diff --git a/pmda/leaflet.py b/pmda/leaflet.py new file mode 100644 index 00000000..54458c11 --- /dev/null +++ b/pmda/leaflet.py @@ -0,0 +1,242 @@ +# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*- +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 +# +# PMDA +# Copyright (c) 2018 The MDAnalysis Development Team and contributors +# (see the file AUTHORS for the full list of names) +# +# Released under the GNU Public Licence, v2 or any higher version +""" +LeafletFInder Analysis tool --- :mod:`pmda.leaflet` +======================================= + +This module contains parallel versions of analysis tasks in +:mod:`MDAnalysis.analysis.leaflet`. + +.. autoclass:: LeafletFinder + :members: + :undoc-members: + :inherited-members: + +""" +from __future__ import absolute_import + +import numpy as np +import dask.bag as db +import networkx as nx +from sklearn.neighbors import BallTree + +from .parallel import ParallelAnalysisBase + + +class LeafletFinder(ParallelAnalysisBase): + """Parallel Leaflet Finder analysis. + + Identify atoms in the same leaflet of a lipid bilayer. + This class implements and parallelizes the *LeafletFinder* algorithm [Michaud-Agrawal2011]_. + + Attributes + ---------- + + + Parameters + ---------- + + Note + ---- + At the moment, this class has far fewer features than the serial + version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`. + + """ + def _prepare(self): + + + def _find_parcc(self,data,cutoff=15.0): + window,index = data[0] + num = window[0].shape[0] + i_index = index[0] + j_index = index[1] + graph = nx.Graph() + if i_index == j_index: + train = window[0] + test = window[1] + else: + train = np.vstack([window[0],window[1]]) + test = np.vstack([window[0],window[1]]) + tree = BallTree(train, leaf_size=40) + edges = tree.query_radius(test, cutoff) + edge_list=[list(zip(np.repeat(idx, len(dest_list)), \ + dest_list)) for idx, dest_list in enumerate(edges)] + + edge_list_flat = np.array([list(item) \ + for sublist in edge_list for item in sublist]) + if i_index == j_index: + res = edge_list_flat.transpose() + res[0] = res[0] + i_index - 1 + res[1] = res[1] + j_index - 1 + else: + removed_elements = list() + for i in range(edge_list_flat.shape[0]): + if (edge_list_flat[i,0]>=0 and edge_list_flat[i,0]<=num-1) and (edge_list_flat[i,1]>=0 and + edge_list_flat[i,1]<=num-1) or\ + (edge_list_flat[i,0]>=num and edge_list_flat[i,0]<=2*num-1) and (edge_list_flat[i,1]>=num and edge_list_flat[i,1]<=2*num-1): + removed_elements.append(i) + res = np.delete(edge_list_flat,removed_elements,axis=0).transpose() + res[0] = res[0] + i_index - 1 + res[1] = res[1] -num + j_index - 1 + + edges=[(res[0,k],res[1,k]) for k in range(0,res.shape[1])] + graph.add_edges_from(edges) + + # partial connected components + + subgraphs = nx.connected_components(graph) + comp = [g for g in subgraphs] + return comp + + + def _single_frame(self, ts, atomgroups,scheduler_kwargs,cutoff=15.0): + """Perform computation on a single trajectory frame. + + Must return computed values as a list. You can only **read** + from member variables stored in ``self``. Changing them during + a run will result in undefined behavior. `ts` and any of the + atomgroups can be changed (but changes will be overwritten + when the next time step is read). + + Parameters + ---------- + ts : :class:`~MDAnalysis.coordinates.base.Timestep` + The current coordinate frame (time step) in the + trajectory. + atomgroups : tuple + Tuple of :class:`~MDAnalysis.core.groups.AtomGroup` + instances that are updated to the current frame. + + Returns + ------- + values : anything + The output from the computation over a single frame must + be returned. The `value` will be added to a list for each + block and the list of blocks is stored as :attr:`_results` + before :meth:`_conclude` is run. In order to simplify + processing, the `values` should be "simple" shallow data + structures such as arrays or lists of numbers. + + """ + + start_time = time() + atoms = np.load(args.filename) + matrix_size = ts.shape[0] + arraged_coord = list() + for i in range(1,matrix_size+1,part_size): + for j in range(1,matrix_size+1,part_size): + arraged_coord.append(([atoms[i-1:i-1+part_size],atoms[j-1:j-1+part_size]],[i,j])) + + parAtoms = db.from_sequence(arraged_coord,npartitions=len(arraged_coord)) + parAtomsMap = parAtoms.map_partitions(find_parcc) + Components = parAtomsMap.compute(get=client.get) + + edge_list_time = time() + result = list(Components) + while len(result)!=0: + item1 = result[0] + result.pop(0) + ind = [] + for i, item2 in enumerate(Components): + if item1.intersection(item2): + item1=item1.union(item2) + ind.append(i) + ind.reverse() + [Components.pop(j) for j in ind] + Components.append(item1) + + connComp = time() + indices = [np.sort(list(g)) for g in Components] + return indices + + + def run(self, + start=None, + stop=None, + step=None, + scheduler=None, + n_jobs=1, + n_blocks=None): + """Perform the calculation + + Parameters + ---------- + start : int, optional + start frame of analysis + stop : int, optional + stop frame of analysis + step : int, optional + number of frames to skip between each analysed frame + scheduler : dask scheduler, optional + Use dask scheduler, defaults to multiprocessing. This can be used + to spread work to a distributed scheduler + n_jobs : int, optional + number of tasks to start, if `-1` use number of logical cpu cores. + This argument will be ignored when the distributed scheduler is + used + n_blocks : int, optional + number of partitions to divide trajectory frame into. If ``None`` set equal + to sqrt(n_jobs) or number of available workers in scheduler. + + """ + if scheduler is None: + scheduler = multiprocessing + + if n_jobs == -1: + n_jobs = cpu_count() + + if n_blocks is None: + if scheduler == multiprocessing: + n_blocks = n_jobs + elif isinstance(scheduler, distributed.Client): + n_blocks = len(scheduler.ncores()) + else: + raise ValueError( + "Couldn't guess ideal number of blocks from scheduler." + "Please provide `n_blocks` in call to method.") + + scheduler_kwargs = {'get': scheduler.get} + if scheduler == multiprocessing: + scheduler_kwargs['num_workers'] = n_jobs + + start, stop, step = self._trajectory.check_slice_indices( + start, stop, step) + n_frames = len(range(start, stop, step)) + + with timeit() as total: + with timeit() as prepare: + self._prepare() + + with timeit() as total: + with timeit() as prepare: + self._prepare() + time_prepare = prepare.elapsed + blocks = [] + with self.readonly_attributes(): + for b in range(n_blocks): + task = delayed( + self._dask_helper, pure=False)( + b * bsize * step + start, + min(stop, (b + 1) * bsize * step + start), + step, + self._indices, + self._top, + self._traj, ) + blocks.append(task) + blocks = delayed(blocks) + res = blocks.compute(**scheduler_kwargs) + self._results = np.asarray([el[0] for el in res]) + with timeit() as conclude: + self._conclude() + + self.timing = Timing( + np.hstack([el[1] for el in res]), + np.hstack([el[2] for el in res]), total.elapsed, + np.array([el[3] for el in res]), time_prepare, conclude.elapsed) + return self \ No newline at end of file From df3b72cde5e6b20c073873f4b49dacdd602aa2a8 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Sun, 30 Sep 2018 17:50:25 -0400 Subject: [PATCH 02/19] Changed run method --- pmda/leaflet.py | 70 ++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 54458c11..9478c903 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -48,8 +48,6 @@ class LeafletFinder(ParallelAnalysisBase): version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`. """ - def _prepare(self): - def _find_parcc(self,data,cutoff=15.0): window,index = data[0] @@ -95,7 +93,7 @@ def _find_parcc(self,data,cutoff=15.0): return comp - def _single_frame(self, ts, atomgroups,scheduler_kwargs,cutoff=15.0): + def _single_frame(self, atomgroups,scheduler_kwargs,n_blocks,cutoff=15.0): """Perform computation on a single trajectory frame. Must return computed values as a list. You can only **read** @@ -106,12 +104,13 @@ def _single_frame(self, ts, atomgroups,scheduler_kwargs,cutoff=15.0): Parameters ---------- - ts : :class:`~MDAnalysis.coordinates.base.Timestep` - The current coordinate frame (time step) in the - trajectory. atomgroups : tuple Tuple of :class:`~MDAnalysis.core.groups.AtomGroup` instances that are updated to the current frame. + scheduler_kwargs : Dask Scheduler parameters. + cutoff : float (optional) + head group-defining atoms within a distance of `cutoff` + Angstroms are deemed to be in the same leaflet [15.0] Returns ------- @@ -125,20 +124,26 @@ def _single_frame(self, ts, atomgroups,scheduler_kwargs,cutoff=15.0): """ - start_time = time() - atoms = np.load(args.filename) - matrix_size = ts.shape[0] + # Get positions of the atoms in the atomgroup and find their number. + atoms = atomgroups.positions + matrix_size = atoms.shape[0] arraged_coord = list() + part_size = matrix_size/n_blocks + # Partition the data based on a 2-dimensional partitioning for i in range(1,matrix_size+1,part_size): for j in range(1,matrix_size+1,part_size): arraged_coord.append(([atoms[i-1:i-1+part_size],atoms[j-1:j-1+part_size]],[i,j])) + # Distribute the data over the available cores, apply the map function + # and execute. parAtoms = db.from_sequence(arraged_coord,npartitions=len(arraged_coord)) parAtomsMap = parAtoms.map_partitions(find_parcc) - Components = parAtomsMap.compute(get=client.get) + Components = parAtomsMap.compute(**scheduler_kwargs) - edge_list_time = time() + # Gather the results and start the reduction. TODO: think if it can go to + # the private _reduce method of the based class. result = list(Components) + # Create the overall connected components of the graph while len(result)!=0: item1 = result[0] result.pop(0) @@ -151,7 +156,7 @@ def _single_frame(self, ts, atomgroups,scheduler_kwargs,cutoff=15.0): [Components.pop(j) for j in ind] Components.append(item1) - connComp = time() + # Change output for and return. indices = [np.sort(list(g)) for g in Components] return indices @@ -162,7 +167,8 @@ def run(self, step=None, scheduler=None, n_jobs=1, - n_blocks=None): + n_blocks=None, + cutoff=15.0): """Perform the calculation Parameters @@ -201,6 +207,9 @@ def run(self, "Couldn't guess ideal number of blocks from scheduler." "Please provide `n_blocks` in call to method.") + + universe = mda.Universe(self._top, self._traj) + atomgroup = u.atoms[self._indices] scheduler_kwargs = {'get': scheduler.get} if scheduler == multiprocessing: scheduler_kwargs['num_workers'] = n_jobs @@ -208,30 +217,16 @@ def run(self, start, stop, step = self._trajectory.check_slice_indices( start, stop, step) n_frames = len(range(start, stop, step)) - with timeit() as total: - with timeit() as prepare: - self._prepare() - - with timeit() as total: - with timeit() as prepare: - self._prepare() - time_prepare = prepare.elapsed - blocks = [] + frames = [] with self.readonly_attributes(): - for b in range(n_blocks): - task = delayed( - self._dask_helper, pure=False)( - b * bsize * step + start, - min(stop, (b + 1) * bsize * step + start), - step, - self._indices, - self._top, - self._traj, ) - blocks.append(task) - blocks = delayed(blocks) - res = blocks.compute(**scheduler_kwargs) - self._results = np.asarray([el[0] for el in res]) + for frame in range(start, stop, step): + leaflet = self._single_frame(atomgroups=atomgroup, + scheduler_kwargs=scheduler_kwargs, + n_blocks=n_blocks, + cutoff=cutoff) + frames.append(leaflet[0:1]) + self._results = frames with timeit() as conclude: self._conclude() @@ -239,4 +234,7 @@ def run(self, np.hstack([el[1] for el in res]), np.hstack([el[2] for el in res]), total.elapsed, np.array([el[3] for el in res]), time_prepare, conclude.elapsed) - return self \ No newline at end of file + return self + + def _conclude(self): + self.results = np.hstack(self._results) \ No newline at end of file From 140616ba4c17a9613a2b51f88a04ecb7a126447e Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Sun, 30 Sep 2018 19:08:36 -0400 Subject: [PATCH 03/19] Adding errors in LF and adding test --- pmda/leaflet.py | 50 +++++++++++++++++++++++++++++---------- pmda/test/test_leaflet.py | 40 +++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 12 deletions(-) create mode 100644 pmda/test/test_leaflet.py diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 9478c903..f3216ca1 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -24,9 +24,14 @@ import numpy as np import dask.bag as db import networkx as nx + +import MDAnalysis as mda from sklearn.neighbors import BallTree +from dask import distributed, multiprocessing +from joblib import cpu_count -from .parallel import ParallelAnalysisBase +from .parallel import ParallelAnalysisBase, Timing +from .util import timeit class LeafletFinder(ParallelAnalysisBase): @@ -49,6 +54,29 @@ class LeafletFinder(ParallelAnalysisBase): """ + def __init__(self, universe, atomgroup): + """Parameters + ---------- + Universe : :class:`~MDAnalysis.core.groups.Universe` + a :class:`MDAnalysis.core.groups.Universe` (the + `atomgroups` must belong to this Universe) + + atomgroups : tuple of :class:`~MDAnalysis.core.groups.AtomGroup` + atomgroups that are iterated in parallel + + Attributes + ---------- + _results : list + The raw data from each process are stored as a list of + lists, with each sublist containing the return values from + :meth:`pmda.parallel.ParallelAnalysisBase._single_frame`. + + """ + self._trajectory = universe.trajectory + self._top = universe.filename + self._traj = universe.trajectory.filename + self._atomgroup = atomgroup + def _find_parcc(self,data,cutoff=15.0): window,index = data[0] num = window[0].shape[0] @@ -93,7 +121,7 @@ def _find_parcc(self,data,cutoff=15.0): return comp - def _single_frame(self, atomgroups,scheduler_kwargs,n_blocks,cutoff=15.0): + def _single_frame(self, scheduler_kwargs,n_blocks,cutoff=15.0): """Perform computation on a single trajectory frame. Must return computed values as a list. You can only **read** @@ -125,7 +153,7 @@ def _single_frame(self, atomgroups,scheduler_kwargs,n_blocks,cutoff=15.0): """ # Get positions of the atoms in the atomgroup and find their number. - atoms = atomgroups.positions + atoms = self._atomgroup.positions matrix_size = atoms.shape[0] arraged_coord = list() part_size = matrix_size/n_blocks @@ -137,7 +165,7 @@ def _single_frame(self, atomgroups,scheduler_kwargs,n_blocks,cutoff=15.0): # Distribute the data over the available cores, apply the map function # and execute. parAtoms = db.from_sequence(arraged_coord,npartitions=len(arraged_coord)) - parAtomsMap = parAtoms.map_partitions(find_parcc) + parAtomsMap = parAtoms.map_partitions(self._find_parcc) Components = parAtomsMap.compute(**scheduler_kwargs) # Gather the results and start the reduction. TODO: think if it can go to @@ -209,7 +237,6 @@ def run(self, universe = mda.Universe(self._top, self._traj) - atomgroup = u.atoms[self._indices] scheduler_kwargs = {'get': scheduler.get} if scheduler == multiprocessing: scheduler_kwargs['num_workers'] = n_jobs @@ -221,19 +248,18 @@ def run(self, frames = [] with self.readonly_attributes(): for frame in range(start, stop, step): - leaflet = self._single_frame(atomgroups=atomgroup, - scheduler_kwargs=scheduler_kwargs, + leaflet = self._single_frame(scheduler_kwargs=scheduler_kwargs, n_blocks=n_blocks, cutoff=cutoff) frames.append(leaflet[0:1]) self._results = frames with timeit() as conclude: self._conclude() - - self.timing = Timing( - np.hstack([el[1] for el in res]), - np.hstack([el[2] for el in res]), total.elapsed, - np.array([el[3] for el in res]), time_prepare, conclude.elapsed) + # TODO: Fix timinns + #self.timing = Timing( + # np.hstack([el[1] for el in res]), + # np.hstack([el[2] for el in res]), total.elapsed, + # np.array([el[3] for el in res]), time_prepare, conclude.elapsed) return self def _conclude(self): diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py new file mode 100644 index 00000000..67708f28 --- /dev/null +++ b/pmda/test/test_leaflet.py @@ -0,0 +1,40 @@ +from __future__ import absolute_import, division, print_function + +import pytest +from numpy.testing import (assert_almost_equal, assert_array_equal, + assert_array_almost_equal) + +import MDAnalysis +from MDAnalysisTests.datafiles import (PSF, DCD) + +from pmda import leaflet + + +class TestLeafLet(object): + + @pytest.fixture() + def universe(self): + return MDAnalysis.Universe(PSF, DCD) + + @pytest.fixture() + def correct_values(self): + pass + + @pytest.fixture() + def correct_values_frame_5(self): + return [range(214)] + + def test_leaflet(self, universe): + pass + + def test_leaflet_step(self, universe, correct_values): + + pass + + def test_leaflet_single_frame(self, universe, correct_values_frame_5): + ca = universe.select_atoms("name CA") + universe.trajectory.rewind() + leaflets = leaflet.LeafletFinder(universe,ca).run(start=5,stop=6) + print(leaflets.results) + assert_almost_equal(leaflets.results, correct_values_frame_5, err_msg="error: leaflets should match " + + "test values") \ No newline at end of file From 5d1d946e54850fbc919704dbe516e23296e700e4 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 3 Oct 2018 10:20:54 -0400 Subject: [PATCH 04/19] Adding Scipy and networkx in setup. Changed balltree with ckdtree --- pmda/leaflet.py | 20 +++++++++++++++----- setup.py | 4 +++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index f3216ca1..596f0b9f 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -8,7 +8,7 @@ # Released under the GNU Public Licence, v2 or any higher version """ LeafletFInder Analysis tool --- :mod:`pmda.leaflet` -======================================= +========================================================== This module contains parallel versions of analysis tasks in :mod:`MDAnalysis.analysis.leaflet`. @@ -24,9 +24,9 @@ import numpy as np import dask.bag as db import networkx as nx +from scipy.spatial import cKDTree import MDAnalysis as mda -from sklearn.neighbors import BallTree from dask import distributed, multiprocessing from joblib import cpu_count @@ -39,13 +39,23 @@ class LeafletFinder(ParallelAnalysisBase): Identify atoms in the same leaflet of a lipid bilayer. This class implements and parallelizes the *LeafletFinder* algorithm [Michaud-Agrawal2011]_. + The parallelization is done based on: + + "Task-parallel Analysis of Molecular Dynamics Trajectories", + Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan, George Chantzialexiou, + Thomas E. Cheatham, Oliver Beckstein, Geoffrey C. Fox and Shantenu Jha + 47th International Conference on Parallel Processing (ICPP 2018) Attributes ---------- - Parameters ---------- + Universe : :class:`~MDAnalysis.core.groups.Universe` + a :class:`MDAnalysis.core.groups.Universe` (the + `atomgroup` must belong to this Universe) + atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup` + atomgroups that are iterated in parallel Note ---- @@ -89,8 +99,8 @@ def _find_parcc(self,data,cutoff=15.0): else: train = np.vstack([window[0],window[1]]) test = np.vstack([window[0],window[1]]) - tree = BallTree(train, leaf_size=40) - edges = tree.query_radius(test, cutoff) + tree = cKDTree(train, leaf_size=40) + edges = tree.query_ball_point(test, cutoff) edge_list=[list(zip(np.repeat(idx, len(dest_list)), \ dest_list)) for idx, dest_list in enumerate(edges)] diff --git a/setup.py b/setup.py index 5a14ee9d..2ea46299 100644 --- a/setup.py +++ b/setup.py @@ -51,7 +51,9 @@ 'MDAnalysis>=0.18', 'dask', 'six', - 'joblib', # cpu_count func currently + 'joblib', + 'networkx', + 'scipy', # cpu_count func currently ], tests_require=[ 'pytest', From ab81b9093050fb50f784d24d139b5dedf3f7686f Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 3 Oct 2018 14:10:48 -0400 Subject: [PATCH 05/19] Single frame test passes, PEP8 both class and test. --- pmda/leaflet.py | 100 ++++++++++++++++++++------------------ pmda/test/test_leaflet.py | 34 +++++++------ 2 files changed, 73 insertions(+), 61 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 596f0b9f..b59c22a8 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -38,12 +38,14 @@ class LeafletFinder(ParallelAnalysisBase): """Parallel Leaflet Finder analysis. Identify atoms in the same leaflet of a lipid bilayer. - This class implements and parallelizes the *LeafletFinder* algorithm [Michaud-Agrawal2011]_. - The parallelization is done based on: - + This class implements and parallelizes the *LeafletFinder* algorithm + [Michaud-Agrawal2011]_. + The parallelization is done based on: + "Task-parallel Analysis of Molecular Dynamics Trajectories", - Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan, George Chantzialexiou, - Thomas E. Cheatham, Oliver Beckstein, Geoffrey C. Fox and Shantenu Jha + Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan, George + Chantzialexiou, Thomas E. Cheatham, Oliver Beckstein, Geoffrey C. Fox and + Shantenu Jha 47th International Conference on Parallel Processing (ICPP 2018) Attributes @@ -69,26 +71,23 @@ def __init__(self, universe, atomgroup): ---------- Universe : :class:`~MDAnalysis.core.groups.Universe` a :class:`MDAnalysis.core.groups.Universe` (the - `atomgroups` must belong to this Universe) + `atomgroup` must belong to this Universe) - atomgroups : tuple of :class:`~MDAnalysis.core.groups.AtomGroup` - atomgroups that are iterated in parallel + atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup` + atomgroup that are iterated in parallel Attributes ---------- - _results : list - The raw data from each process are stored as a list of - lists, with each sublist containing the return values from - :meth:`pmda.parallel.ParallelAnalysisBase._single_frame`. """ + #super(ParallelAnalysisBase, self).__init__() self._trajectory = universe.trajectory self._top = universe.filename self._traj = universe.trajectory.filename self._atomgroup = atomgroup - def _find_parcc(self,data,cutoff=15.0): - window,index = data[0] + def _find_parcc(self, data, cutoff=15.0): + window, index = data[0] num = window[0].shape[0] i_index = index[0] j_index = index[1] @@ -97,15 +96,15 @@ def _find_parcc(self,data,cutoff=15.0): train = window[0] test = window[1] else: - train = np.vstack([window[0],window[1]]) - test = np.vstack([window[0],window[1]]) - tree = cKDTree(train, leaf_size=40) + train = np.vstack([window[0], window[1]]) + test = np.vstack([window[0], window[1]]) + tree = cKDTree(train, leafsize=40) edges = tree.query_ball_point(test, cutoff) - edge_list=[list(zip(np.repeat(idx, len(dest_list)), \ - dest_list)) for idx, dest_list in enumerate(edges)] + edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list)) + for idx, dest_list in enumerate(edges)] - edge_list_flat = np.array([list(item) \ - for sublist in edge_list for item in sublist]) + edge_list_flat = np.array([list(item) for sublist in edge_list for + item in sublist]) if i_index == j_index: res = edge_list_flat.transpose() res[0] = res[0] + i_index - 1 @@ -113,15 +112,21 @@ def _find_parcc(self,data,cutoff=15.0): else: removed_elements = list() for i in range(edge_list_flat.shape[0]): - if (edge_list_flat[i,0]>=0 and edge_list_flat[i,0]<=num-1) and (edge_list_flat[i,1]>=0 and - edge_list_flat[i,1]<=num-1) or\ - (edge_list_flat[i,0]>=num and edge_list_flat[i,0]<=2*num-1) and (edge_list_flat[i,1]>=num and edge_list_flat[i,1]<=2*num-1): - removed_elements.append(i) - res = np.delete(edge_list_flat,removed_elements,axis=0).transpose() + if (edge_list_flat[i, 0] >= 0 and + edge_list_flat[i, 0] <= num - 1) and \ + (edge_list_flat[i, 1] >= 0 and + edge_list_flat[i, 1] <= num - 1) or \ + (edge_list_flat[i, 0] >= num and + edge_list_flat[i, 0] <= 2 * num - 1) and \ + (edge_list_flat[i, 1] >= num and + edge_list_flat[i, 1] <= 2 * num - 1): + removed_elements.append(i) + res = np.delete(edge_list_flat, removed_elements, + axis=0).transpose() res[0] = res[0] + i_index - 1 - res[1] = res[1] -num + j_index - 1 + res[1] = res[1] - num + j_index - 1 - edges=[(res[0,k],res[1,k]) for k in range(0,res.shape[1])] + edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])] graph.add_edges_from(edges) # partial connected components @@ -130,8 +135,7 @@ def _find_parcc(self,data,cutoff=15.0): comp = [g for g in subgraphs] return comp - - def _single_frame(self, scheduler_kwargs,n_blocks,cutoff=15.0): + def _single_frame(self, scheduler_kwargs, n_blocks, cutoff=15.0): """Perform computation on a single trajectory frame. Must return computed values as a list. You can only **read** @@ -166,29 +170,32 @@ def _single_frame(self, scheduler_kwargs,n_blocks,cutoff=15.0): atoms = self._atomgroup.positions matrix_size = atoms.shape[0] arraged_coord = list() - part_size = matrix_size/n_blocks + part_size = int(matrix_size / n_blocks) # Partition the data based on a 2-dimensional partitioning - for i in range(1,matrix_size+1,part_size): - for j in range(1,matrix_size+1,part_size): - arraged_coord.append(([atoms[i-1:i-1+part_size],atoms[j-1:j-1+part_size]],[i,j])) + for i in range(1, matrix_size + 1, part_size): + for j in range(1, matrix_size + 1, part_size): + arraged_coord.append(([atoms[i - 1:i - 1 + part_size], + atoms[j - 1:j - 1 + part_size]], + [i, j])) # Distribute the data over the available cores, apply the map function # and execute. - parAtoms = db.from_sequence(arraged_coord,npartitions=len(arraged_coord)) + parAtoms = db.from_sequence(arraged_coord, + npartitions=len(arraged_coord)) parAtomsMap = parAtoms.map_partitions(self._find_parcc) Components = parAtomsMap.compute(**scheduler_kwargs) - # Gather the results and start the reduction. TODO: think if it can go to - # the private _reduce method of the based class. + # Gather the results and start the reduction. TODO: think if it can go + # to the private _reduce method of the based class. result = list(Components) # Create the overall connected components of the graph - while len(result)!=0: + while len(result) != 0: item1 = result[0] result.pop(0) ind = [] for i, item2 in enumerate(Components): if item1.intersection(item2): - item1=item1.union(item2) + item1 = item1.union(item2) ind.append(i) ind.reverse() [Components.pop(j) for j in ind] @@ -198,7 +205,6 @@ def _single_frame(self, scheduler_kwargs,n_blocks,cutoff=15.0): indices = [np.sort(list(g)) for g in Components] return indices - def run(self, start=None, stop=None, @@ -225,8 +231,9 @@ def run(self, This argument will be ignored when the distributed scheduler is used n_blocks : int, optional - number of partitions to divide trajectory frame into. If ``None`` set equal - to sqrt(n_jobs) or number of available workers in scheduler. + number of partitions to divide trajectory frame into. If ``None`` + set equal to sqrt(n_jobs) or number of available workers in + scheduler. """ if scheduler is None: @@ -245,12 +252,11 @@ def run(self, "Couldn't guess ideal number of blocks from scheduler." "Please provide `n_blocks` in call to method.") - universe = mda.Universe(self._top, self._traj) scheduler_kwargs = {'get': scheduler.get} if scheduler == multiprocessing: scheduler_kwargs['num_workers'] = n_jobs - + start, stop, step = self._trajectory.check_slice_indices( start, stop, step) n_frames = len(range(start, stop, step)) @@ -261,16 +267,16 @@ def run(self, leaflet = self._single_frame(scheduler_kwargs=scheduler_kwargs, n_blocks=n_blocks, cutoff=cutoff) - frames.append(leaflet[0:1]) + frames.append(leaflet[0:2]) self._results = frames with timeit() as conclude: self._conclude() # TODO: Fix timinns - #self.timing = Timing( + # self.timing = Timing( # np.hstack([el[1] for el in res]), # np.hstack([el[2] for el in res]), total.elapsed, # np.array([el[3] for el in res]), time_prepare, conclude.elapsed) return self def _conclude(self): - self.results = np.hstack(self._results) \ No newline at end of file + self.results = [self._atomgroup[i] for i in self._results] diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py index 67708f28..51621034 100644 --- a/pmda/test/test_leaflet.py +++ b/pmda/test/test_leaflet.py @@ -2,39 +2,45 @@ import pytest from numpy.testing import (assert_almost_equal, assert_array_equal, - assert_array_almost_equal) + assert_array_almost_equal) import MDAnalysis -from MDAnalysisTests.datafiles import (PSF, DCD) - +from MDAnalysisTests.datafiles import Martini_membrane_gro +from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT from pmda import leaflet class TestLeafLet(object): + @pytest.fixture() + def u_one_frame(self): + return MDAnalysis.Universe(Martini_membrane_gro) + @pytest.fixture() def universe(self): - return MDAnalysis.Universe(PSF, DCD) + return MDAnalysis.Universe(Martini_membrane_gro) @pytest.fixture() def correct_values(self): pass @pytest.fixture() - def correct_values_frame_5(self): - return [range(214)] + def correct_values_single_frame(self): + return [range(1, 2150, 12), range(2521, 4670, 12)] def test_leaflet(self, universe): pass def test_leaflet_step(self, universe, correct_values): - pass - def test_leaflet_single_frame(self, universe, correct_values_frame_5): - ca = universe.select_atoms("name CA") - universe.trajectory.rewind() - leaflets = leaflet.LeafletFinder(universe,ca).run(start=5,stop=6) - print(leaflets.results) - assert_almost_equal(leaflets.results, correct_values_frame_5, err_msg="error: leaflets should match " + - "test values") \ No newline at end of file + def test_leaflet_single_frame(self, + u_one_frame, + correct_values_single_frame): + lipid_heads = u_one_frame.select_atoms("name PO4") + u_one_frame.trajectory.rewind() + leaflets = leaflet.LeafletFinder(u_one_frame, lipid_heads).run(start=0, + stop=1) + assert_almost_equal(leaflets.results[0].indices, + correct_values_single_frame, err_msg="error: " + + "leaflets should match test values") From bb65a59becdfa3224627be06c8ae7d49777992c0 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 3 Oct 2018 14:56:37 -0400 Subject: [PATCH 06/19] Removing code comments based on PEP8 messages --- pmda/leaflet.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index b59c22a8..76eccc1e 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -80,7 +80,6 @@ def __init__(self, universe, atomgroup): ---------- """ - #super(ParallelAnalysisBase, self).__init__() self._trajectory = universe.trajectory self._top = universe.filename self._traj = universe.trajectory.filename @@ -169,6 +168,7 @@ def _single_frame(self, scheduler_kwargs, n_blocks, cutoff=15.0): # Get positions of the atoms in the atomgroup and find their number. atoms = self._atomgroup.positions matrix_size = atoms.shape[0] + print(matrix_size) arraged_coord = list() part_size = int(matrix_size / n_blocks) # Partition the data based on a 2-dimensional partitioning @@ -264,9 +264,10 @@ def run(self, frames = [] with self.readonly_attributes(): for frame in range(start, stop, step): - leaflet = self._single_frame(scheduler_kwargs=scheduler_kwargs, - n_blocks=n_blocks, - cutoff=cutoff) + leaflet = self. \ + _single_frame(scheduler_kwargs=scheduler_kwargs, + n_blocks=n_blocks, + cutoff=cutoff) frames.append(leaflet[0:2]) self._results = frames with timeit() as conclude: From 56f39ac787e5ca7ad94cccc57ff6896b425b0770 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Fri, 5 Oct 2018 10:19:54 -0400 Subject: [PATCH 07/19] trajectory level testing and pep8 changes --- pmda/leaflet.py | 16 +++++++--------- pmda/test/test_leaflet.py | 35 +++++++++++++++++++++++++---------- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 76eccc1e..8d0516e4 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -84,6 +84,7 @@ def __init__(self, universe, atomgroup): self._top = universe.filename self._traj = universe.trajectory.filename self._atomgroup = atomgroup + self._results = list() def _find_parcc(self, data, cutoff=15.0): window, index = data[0] @@ -145,9 +146,6 @@ def _single_frame(self, scheduler_kwargs, n_blocks, cutoff=15.0): Parameters ---------- - atomgroups : tuple - Tuple of :class:`~MDAnalysis.core.groups.AtomGroup` - instances that are updated to the current frame. scheduler_kwargs : Dask Scheduler parameters. cutoff : float (optional) head group-defining atoms within a distance of `cutoff` @@ -168,7 +166,6 @@ def _single_frame(self, scheduler_kwargs, n_blocks, cutoff=15.0): # Get positions of the atoms in the atomgroup and find their number. atoms = self._atomgroup.positions matrix_size = atoms.shape[0] - print(matrix_size) arraged_coord = list() part_size = int(matrix_size / n_blocks) # Partition the data based on a 2-dimensional partitioning @@ -261,15 +258,16 @@ def run(self, start, stop, step) n_frames = len(range(start, stop, step)) with timeit() as total: - frames = [] with self.readonly_attributes(): + frames = list() for frame in range(start, stop, step): - leaflet = self. \ + components = self. \ _single_frame(scheduler_kwargs=scheduler_kwargs, n_blocks=n_blocks, cutoff=cutoff) - frames.append(leaflet[0:2]) - self._results = frames + leaflet1 = self._atomgroup[components[0]] + leaflet2 = self._atomgroup[components[1]] + self._results.append([leaflet1,leaflet2]) with timeit() as conclude: self._conclude() # TODO: Fix timinns @@ -280,4 +278,4 @@ def run(self, return self def _conclude(self): - self.results = [self._atomgroup[i] for i in self._results] + self.results = self._results diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py index 51621034..c9cd88d8 100644 --- a/pmda/test/test_leaflet.py +++ b/pmda/test/test_leaflet.py @@ -8,6 +8,7 @@ from MDAnalysisTests.datafiles import Martini_membrane_gro from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT from pmda import leaflet +import numpy as np class TestLeafLet(object): @@ -18,21 +19,34 @@ def u_one_frame(self): @pytest.fixture() def universe(self): - return MDAnalysis.Universe(Martini_membrane_gro) + return MDAnalysis.Universe(GRO_MEMPROT, XTC_MEMPROT) @pytest.fixture() def correct_values(self): - pass + return [np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36634]), + np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36634]), + np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36634]), + np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36634]), + np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36634])] @pytest.fixture() def correct_values_single_frame(self): - return [range(1, 2150, 12), range(2521, 4670, 12)] - - def test_leaflet(self, universe): - pass - - def test_leaflet_step(self, universe, correct_values): - pass + return [np.arange(1, 2150, 12), np.arange(2521, 4670, 12)] + + def test_leaflet(self, universe, correct_values): + lipid_heads = universe.select_atoms("name P and resname POPG") + universe.trajectory.rewind() + leaflets = leaflet.LeafletFinder(universe, lipid_heads).run() + results = [atoms.indices for atomgroup in leaflets.results + for atoms in atomgroup] + [assert_almost_equal(x, y, err_msg="error: leaflets should match " + + "test values") for x, y in + zip(results, correct_values)] def test_leaflet_single_frame(self, u_one_frame, @@ -41,6 +55,7 @@ def test_leaflet_single_frame(self, u_one_frame.trajectory.rewind() leaflets = leaflet.LeafletFinder(u_one_frame, lipid_heads).run(start=0, stop=1) - assert_almost_equal(leaflets.results[0].indices, + assert_almost_equal([atoms.indices for atomgroup in leaflets.results + for atoms in atomgroup], correct_values_single_frame, err_msg="error: " + "leaflets should match test values") From 146dcaab4119e63d29dfe554c81888ab21830be7 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 10 Oct 2018 12:07:19 -0400 Subject: [PATCH 08/19] Changing where the paper is mentioned --- docs/references.rst | 9 +++++++++ pmda/leaflet.py | 31 +++++++++++++++---------------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/docs/references.rst b/docs/references.rst index 4655c4dd..eb020c1d 100644 --- a/docs/references.rst +++ b/docs/references.rst @@ -34,3 +34,12 @@ TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_ .. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a + +.. [Paraskevakos2018] Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan, + George Chantzialexiou, Thomas E. Cheatham, Oliver + Beckstein, Geoffrey C. Fox and Shantenu Jha. Task- + parallel Analysis of Molecular Dynamics Trajectories. In + 47th International Conference on Parallel Processing + (ICPP 2018). doi: `10.1145/3225058.32251281`_ + +.. _`10.1145/3225058.3225128` : https://doi.org/10.1145/3225058.3225128 \ No newline at end of file diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 8d0516e4..5bc052bf 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -40,13 +40,8 @@ class LeafletFinder(ParallelAnalysisBase): Identify atoms in the same leaflet of a lipid bilayer. This class implements and parallelizes the *LeafletFinder* algorithm [Michaud-Agrawal2011]_. - The parallelization is done based on: - "Task-parallel Analysis of Molecular Dynamics Trajectories", - Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan, George - Chantzialexiou, Thomas E. Cheatham, Oliver Beckstein, Geoffrey C. Fox and - Shantenu Jha - 47th International Conference on Parallel Processing (ICPP 2018) + The parallelization is done based on [Paraskevakos2018]_. Attributes ---------- @@ -64,6 +59,10 @@ class LeafletFinder(ParallelAnalysisBase): At the moment, this class has far fewer features than the serial version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`. + This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest + Neighbor and Parallel-Connected Com- ponents (Tree-Search)") in + [Paraskevakos2018]_. + """ def __init__(self, universe, atomgroup): @@ -120,7 +119,7 @@ def _find_parcc(self, data, cutoff=15.0): edge_list_flat[i, 0] <= 2 * num - 1) and \ (edge_list_flat[i, 1] >= num and edge_list_flat[i, 1] <= 2 * num - 1): - removed_elements.append(i) + removed_elements.append(i) res = np.delete(edge_list_flat, removed_elements, axis=0).transpose() res[0] = res[0] + i_index - 1 @@ -179,7 +178,7 @@ def _single_frame(self, scheduler_kwargs, n_blocks, cutoff=15.0): # and execute. parAtoms = db.from_sequence(arraged_coord, npartitions=len(arraged_coord)) - parAtomsMap = parAtoms.map_partitions(self._find_parcc) + parAtomsMap = parAtoms.map_partitions(self._find_parcc,cutoff=cutoff) Components = parAtomsMap.compute(**scheduler_kwargs) # Gather the results and start the reduction. TODO: think if it can go @@ -253,28 +252,28 @@ def run(self, scheduler_kwargs = {'get': scheduler.get} if scheduler == multiprocessing: scheduler_kwargs['num_workers'] = n_jobs - + start, stop, step = self._trajectory.check_slice_indices( start, stop, step) n_frames = len(range(start, stop, step)) with timeit() as total: with self.readonly_attributes(): frames = list() + timings = list() for frame in range(start, stop, step): - components = self. \ + with timeit() as b_compute: + components = self. \ _single_frame(scheduler_kwargs=scheduler_kwargs, n_blocks=n_blocks, cutoff=cutoff) leaflet1 = self._atomgroup[components[0]] leaflet2 = self._atomgroup[components[1]] - self._results.append([leaflet1,leaflet2]) + timings.append(b_compute.elapsed) + self._results.append([leaflet1, leaflet2]) with timeit() as conclude: self._conclude() - # TODO: Fix timinns - # self.timing = Timing( - # np.hstack([el[1] for el in res]), - # np.hstack([el[2] for el in res]), total.elapsed, - # np.array([el[3] for el in res]), time_prepare, conclude.elapsed) + self.timing = Timing( + np.hstack(timings), total.elapsed, conclude.elapsed) return self def _conclude(self): From 0ad29988c80965cdc9a4728c5221d9d93b54a393 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 10 Oct 2018 14:10:22 -0400 Subject: [PATCH 09/19] Updated Changelog --- CHANGELOG | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG b/CHANGELOG index 21572956..6fbf16e0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -21,6 +21,7 @@ Enhancements * add add timing for _conclude and _prepare (Issue #49) * add parallel particle-particle RDF calculation module pmda.rdf (Issue #41) * add readonly_attributes context manager to ParallelAnalysisBase + * add parallel implementation of Leaflet Finder (Issue #47) 06/07/18 orbeckst From d5813acf01385dde7f7fc1e25756e59998495143 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 11 Oct 2018 00:56:45 -0400 Subject: [PATCH 10/19] Continuing PR review resolve --- pmda/leaflet.py | 112 +++++++++++++++++++++++--------------- pmda/test/test_leaflet.py | 6 +- setup.py | 4 +- 3 files changed, 75 insertions(+), 47 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 5bc052bf..3a63d4b9 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -25,6 +25,7 @@ import dask.bag as db import networkx as nx from scipy.spatial import cKDTree +from sklearn.neighbors import BallTree import MDAnalysis as mda from dask import distributed, multiprocessing @@ -59,33 +60,46 @@ class LeafletFinder(ParallelAnalysisBase): At the moment, this class has far fewer features than the serial version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`. - This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest - Neighbor and Parallel-Connected Com- ponents (Tree-Search)") in + This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest + Neighbor and Parallel-Connected Com- ponents (Tree-Search)") in [Paraskevakos2018]_. - """ - - def __init__(self, universe, atomgroup): - """Parameters - ---------- - Universe : :class:`~MDAnalysis.core.groups.Universe` - a :class:`MDAnalysis.core.groups.Universe` (the - `atomgroup` must belong to this Universe) + Currently, periodic boundaries are not taken into account. - atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup` - atomgroup that are iterated in parallel + The calculation is parallelized on a per-frame basis [Paraskevakos2018]_; + at the moment, no parallelization over trajectory blocks is performed. - Attributes - ---------- + """ - """ + def __init__(self, universe, atomgroup): self._trajectory = universe.trajectory self._top = universe.filename self._traj = universe.trajectory.filename self._atomgroup = atomgroup self._results = list() - def _find_parcc(self, data, cutoff=15.0): + def _find_connected_components(self, data, cutoff=15.0): + """Perform the Connected Components discovery for the atoms in data. + + Parameters + ---------- + data : Tuple of lists of Numpy arrays + This is a data and index tuple. The data are organized as + `([AtomPositions1,AtomPositions2], + [index1,index2])`. `index1` and `index2` are showing the + position of the `AtomPosition` in the adjacency matrix and + allows to correct the node number of the produced graph. + cutoff : float (optional) + head group-defining atoms within a distance of `cutoff` + Angstroms are deemed to be in the same leaflet [15.0] + + Returns + ------- + values : list. + A list of all the connected components of the graph that is + generated from `data` + + """ window, index = data[0] num = window[0].shape[0] i_index = index[0] @@ -97,6 +111,7 @@ def _find_parcc(self, data, cutoff=15.0): else: train = np.vstack([window[0], window[1]]) test = np.vstack([window[0], window[1]]) + tree = cKDTree(train, leafsize=40) edges = tree.query_ball_point(test, cutoff) edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list)) @@ -104,6 +119,7 @@ def _find_parcc(self, data, cutoff=15.0): edge_list_flat = np.array([list(item) for sublist in edge_list for item in sublist]) + if i_index == j_index: res = edge_list_flat.transpose() res[0] = res[0] + i_index - 1 @@ -118,12 +134,18 @@ def _find_parcc(self, data, cutoff=15.0): (edge_list_flat[i, 0] >= num and edge_list_flat[i, 0] <= 2 * num - 1) and \ (edge_list_flat[i, 1] >= num and - edge_list_flat[i, 1] <= 2 * num - 1): + edge_list_flat[i, 1] <= 2 * num - 1) or \ + (edge_list_flat[i, 0] >= num and + edge_list_flat[i, 0] <= 2 * num - 1) and \ + (edge_list_flat[i, 1] >= 0 and + edge_list_flat[i, 1] <= num - 1): removed_elements.append(i) res = np.delete(edge_list_flat, removed_elements, axis=0).transpose() res[0] = res[0] + i_index - 1 res[1] = res[1] - num + j_index - 1 + if res.shape[1] == 0: + res = np.zeros((2, 1), dtype=np.int) edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])] graph.add_edges_from(edges) @@ -134,7 +156,7 @@ def _find_parcc(self, data, cutoff=15.0): comp = [g for g in subgraphs] return comp - def _single_frame(self, scheduler_kwargs, n_blocks, cutoff=15.0): + def _single_frame(self, scheduler_kwargs, n_jobs, cutoff=15.0): """Perform computation on a single trajectory frame. Must return computed values as a list. You can only **read** @@ -165,25 +187,27 @@ def _single_frame(self, scheduler_kwargs, n_blocks, cutoff=15.0): # Get positions of the atoms in the atomgroup and find their number. atoms = self._atomgroup.positions matrix_size = atoms.shape[0] - arraged_coord = list() - part_size = int(matrix_size / n_blocks) + arranged_coord = list() + part_size = int(matrix_size / n_jobs) + # Partition the data based on a 2-dimensional partitioning for i in range(1, matrix_size + 1, part_size): for j in range(1, matrix_size + 1, part_size): - arraged_coord.append(([atoms[i - 1:i - 1 + part_size], + arranged_coord.append(([atoms[i - 1:i - 1 + part_size], atoms[j - 1:j - 1 + part_size]], [i, j])) - # Distribute the data over the available cores, apply the map function # and execute. - parAtoms = db.from_sequence(arraged_coord, - npartitions=len(arraged_coord)) - parAtomsMap = parAtoms.map_partitions(self._find_parcc,cutoff=cutoff) + parAtoms = db.from_sequence(arranged_coord, + npartitions=len(arranged_coord)) + parAtomsMap = parAtoms.map_partitions(self._find_connected_components, + cutoff=cutoff) Components = parAtomsMap.compute(**scheduler_kwargs) # Gather the results and start the reduction. TODO: think if it can go # to the private _reduce method of the based class. result = list(Components) + # Create the overall connected components of the graph while len(result) != 0: item1 = result[0] @@ -207,7 +231,6 @@ def run(self, step=None, scheduler=None, n_jobs=1, - n_blocks=None, cutoff=15.0): """Perform the calculation @@ -226,54 +249,57 @@ def run(self, number of tasks to start, if `-1` use number of logical cpu cores. This argument will be ignored when the distributed scheduler is used - n_blocks : int, optional - number of partitions to divide trajectory frame into. If ``None`` - set equal to sqrt(n_jobs) or number of available workers in - scheduler. """ if scheduler is None: scheduler = multiprocessing if n_jobs == -1: - n_jobs = cpu_count() - - if n_blocks is None: if scheduler == multiprocessing: - n_blocks = n_jobs + n_jobs = cpu_count() elif isinstance(scheduler, distributed.Client): - n_blocks = len(scheduler.ncores()) + n_jobs = len(scheduler.ncores()) else: raise ValueError( - "Couldn't guess ideal number of blocks from scheduler." - "Please provide `n_blocks` in call to method.") + "Couldn't guess ideal number of jobs from scheduler." + "Please provide `n_jobs` in call to method.") + + with timeit() as b_universe: + universe = mda.Universe(self._top, self._traj) - universe = mda.Universe(self._top, self._traj) scheduler_kwargs = {'get': scheduler.get} if scheduler == multiprocessing: scheduler_kwargs['num_workers'] = n_jobs start, stop, step = self._trajectory.check_slice_indices( start, stop, step) - n_frames = len(range(start, stop, step)) with timeit() as total: + with timeit() as prepare: + self._prepare() + with self.readonly_attributes(): frames = list() timings = list() + times_io = [] for frame in range(start, stop, step): + with timeit() as b_io: + ts = universe.trajectory[frame] + times_io.append(b_io.elapsed) with timeit() as b_compute: components = self. \ _single_frame(scheduler_kwargs=scheduler_kwargs, - n_blocks=n_blocks, + n_jobs=n_jobs, cutoff=cutoff) + + timings.append(b_compute.elapsed) leaflet1 = self._atomgroup[components[0]] leaflet2 = self._atomgroup[components[1]] - timings.append(b_compute.elapsed) self._results.append([leaflet1, leaflet2]) with timeit() as conclude: self._conclude() - self.timing = Timing( - np.hstack(timings), total.elapsed, conclude.elapsed) + self.timing = Timing(times_io, + np.hstack(timings), total.elapsed, b_universe.elapsed, + prepare.elapsed, conclude.elapsed) return self def _conclude(self): diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py index c9cd88d8..1d28ead5 100644 --- a/pmda/test/test_leaflet.py +++ b/pmda/test/test_leaflet.py @@ -53,8 +53,10 @@ def test_leaflet_single_frame(self, correct_values_single_frame): lipid_heads = u_one_frame.select_atoms("name PO4") u_one_frame.trajectory.rewind() - leaflets = leaflet.LeafletFinder(u_one_frame, lipid_heads).run(start=0, - stop=1) + leaflets = leaflet.LeafletFinder(u_one_frame, + lipid_heads).run(start=0, stop=1, + n_jobs=2) + assert_almost_equal([atoms.indices for atomgroup in leaflets.results for atoms in atomgroup], correct_values_single_frame, err_msg="error: " + diff --git a/setup.py b/setup.py index 2ea46299..67ab3d22 100644 --- a/setup.py +++ b/setup.py @@ -51,9 +51,9 @@ 'MDAnalysis>=0.18', 'dask', 'six', - 'joblib', + 'joblib', # cpu_count func currently 'networkx', - 'scipy', # cpu_count func currently + 'scipy', ], tests_require=[ 'pytest', From 063b76ac339bcf142bb70a680f2294ce6fa292b4 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 11 Oct 2018 00:58:08 -0400 Subject: [PATCH 11/19] Reduced number of tasks --- pmda/leaflet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 3a63d4b9..31521323 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -192,7 +192,7 @@ def _single_frame(self, scheduler_kwargs, n_jobs, cutoff=15.0): # Partition the data based on a 2-dimensional partitioning for i in range(1, matrix_size + 1, part_size): - for j in range(1, matrix_size + 1, part_size): + for j in range(i, matrix_size + 1, part_size): arranged_coord.append(([atoms[i - 1:i - 1 + part_size], atoms[j - 1:j - 1 + part_size]], [i, j])) From 4765fd93421b67e53e83245016525df88a80aadf Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 11 Oct 2018 20:37:29 -0400 Subject: [PATCH 12/19] Removing redundant import --- pmda/leaflet.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 31521323..07f0425e 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -25,7 +25,6 @@ import dask.bag as db import networkx as nx from scipy.spatial import cKDTree -from sklearn.neighbors import BallTree import MDAnalysis as mda from dask import distributed, multiprocessing From 594b19e30fd545bf5b852292c5e1a4db3bf88097 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 11 Oct 2018 21:33:56 -0400 Subject: [PATCH 13/19] PEP8 corrections --- pmda/leaflet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 07f0425e..3fd43364 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -296,8 +296,8 @@ def run(self, self._results.append([leaflet1, leaflet2]) with timeit() as conclude: self._conclude() - self.timing = Timing(times_io, - np.hstack(timings), total.elapsed, b_universe.elapsed, + self.timing = Timing(times_io, + np.hstack(timings), total.elapsed, b_universe.elapsed, prepare.elapsed, conclude.elapsed) return self From 4ddc64f16861becf0ddea6a8aa068d79ae4adecd Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 11 Oct 2018 22:10:11 -0400 Subject: [PATCH 14/19] Further PEP8 corrections --- pmda/leaflet.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 3fd43364..3242a67a 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -283,7 +283,7 @@ def run(self, for frame in range(start, stop, step): with timeit() as b_io: ts = universe.trajectory[frame] - times_io.append(b_io.elapsed) + times_io.append(b_io.elapsed) with timeit() as b_compute: components = self. \ _single_frame(scheduler_kwargs=scheduler_kwargs, @@ -297,8 +297,9 @@ def run(self, with timeit() as conclude: self._conclude() self.timing = Timing(times_io, - np.hstack(timings), total.elapsed, b_universe.elapsed, - prepare.elapsed, conclude.elapsed) + np.hstack(timings), total.elapsed, + b_universe.elapsed, prepare.elapsed, + conclude.elapsed) return self def _conclude(self): From b7d99f7891dfc4e185c34f700d579f99caa7daec Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Fri, 12 Oct 2018 10:37:29 -0400 Subject: [PATCH 15/19] Debug, references, increased coverage and PEP8. --- docs/references.rst | 2 +- pmda/leaflet.py | 17 ++++++++++------- pmda/test/test_leaflet.py | 15 ++++++++------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/docs/references.rst b/docs/references.rst index eb020c1d..d008b1e4 100644 --- a/docs/references.rst +++ b/docs/references.rst @@ -40,6 +40,6 @@ Beckstein, Geoffrey C. Fox and Shantenu Jha. Task- parallel Analysis of Molecular Dynamics Trajectories. In 47th International Conference on Parallel Processing - (ICPP 2018). doi: `10.1145/3225058.32251281`_ + (ICPP 2018). doi: `10.1145/3225058.3225128`_ .. _`10.1145/3225058.3225128` : https://doi.org/10.1145/3225058.3225128 \ No newline at end of file diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 3242a67a..139260c3 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -104,6 +104,7 @@ def _find_connected_components(self, data, cutoff=15.0): i_index = index[0] j_index = index[1] graph = nx.Graph() + if i_index == j_index: train = window[0] test = window[1] @@ -155,7 +156,8 @@ def _find_connected_components(self, data, cutoff=15.0): comp = [g for g in subgraphs] return comp - def _single_frame(self, scheduler_kwargs, n_jobs, cutoff=15.0): + def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, + cutoff=15.0): """Perform computation on a single trajectory frame. Must return computed values as a list. You can only **read** @@ -184,11 +186,10 @@ def _single_frame(self, scheduler_kwargs, n_jobs, cutoff=15.0): """ # Get positions of the atoms in the atomgroup and find their number. - atoms = self._atomgroup.positions + atoms = ts.positions[atomgroups.indices] matrix_size = atoms.shape[0] arranged_coord = list() part_size = int(matrix_size / n_jobs) - # Partition the data based on a 2-dimensional partitioning for i in range(1, matrix_size + 1, part_size): for j in range(i, matrix_size + 1, part_size): @@ -217,7 +218,8 @@ def _single_frame(self, scheduler_kwargs, n_jobs, cutoff=15.0): item1 = item1.union(item2) ind.append(i) ind.reverse() - [Components.pop(j) for j in ind] + for j in ind: + Components.pop(j) Components.append(item1) # Change output for and return. @@ -229,7 +231,7 @@ def run(self, stop=None, step=None, scheduler=None, - n_jobs=1, + n_jobs=-1, cutoff=15.0): """Perform the calculation @@ -277,7 +279,6 @@ def run(self, self._prepare() with self.readonly_attributes(): - frames = list() timings = list() times_io = [] for frame in range(start, stop, step): @@ -286,7 +287,9 @@ def run(self, times_io.append(b_io.elapsed) with timeit() as b_compute: components = self. \ - _single_frame(scheduler_kwargs=scheduler_kwargs, + _single_frame(ts=ts, + atomgroups=self._atomgroup, + scheduler_kwargs=scheduler_kwargs, n_jobs=n_jobs, cutoff=cutoff) diff --git a/pmda/test/test_leaflet.py b/pmda/test/test_leaflet.py index 1d28ead5..cf16b57b 100644 --- a/pmda/test/test_leaflet.py +++ b/pmda/test/test_leaflet.py @@ -7,6 +7,7 @@ import MDAnalysis from MDAnalysisTests.datafiles import Martini_membrane_gro from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT +from dask import multiprocessing from pmda import leaflet import numpy as np @@ -25,13 +26,13 @@ def universe(self): def correct_values(self): return [np.array([36507, 36761, 37523, 37650, 38031, 38285]), np.array([36634]), - np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36507, 36761, 38285, 39174]), np.array([36634]), - np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36507, 36761, 37650, 38285, 39174, 39936]), np.array([36634]), - np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36507, 36761, 37650, 38285, 39174, 39428, 39936]), np.array([36634]), - np.array([36507, 36761, 37523, 37650, 38031, 38285]), + np.array([36507, 36761]), np.array([36634])] @pytest.fixture() @@ -41,7 +42,8 @@ def correct_values_single_frame(self): def test_leaflet(self, universe, correct_values): lipid_heads = universe.select_atoms("name P and resname POPG") universe.trajectory.rewind() - leaflets = leaflet.LeafletFinder(universe, lipid_heads).run() + leaflets = leaflet.LeafletFinder(universe, lipid_heads) + leaflets.run(scheduler=multiprocessing, n_jobs=1) results = [atoms.indices for atomgroup in leaflets.results for atoms in atomgroup] [assert_almost_equal(x, y, err_msg="error: leaflets should match " + @@ -54,8 +56,7 @@ def test_leaflet_single_frame(self, lipid_heads = u_one_frame.select_atoms("name PO4") u_one_frame.trajectory.rewind() leaflets = leaflet.LeafletFinder(u_one_frame, - lipid_heads).run(start=0, stop=1, - n_jobs=2) + lipid_heads).run(start=0, stop=1) assert_almost_equal([atoms.indices for atomgroup in leaflets.results for atoms in atomgroup], From 8cfcb19b7c2575a4c2bdc5e39ba4049a5519f24a Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Fri, 12 Oct 2018 13:59:13 -0400 Subject: [PATCH 16/19] Docs added for Leaflet Finder --- docs/api.rst | 1 + docs/references.rst | 6 +++--- pmda/leaflet.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 7d2e3668..00522c40 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -42,3 +42,4 @@ also function as examples for how to implement your own functions with api/rms api/contacts api/rdf + api/leaflet diff --git a/docs/references.rst b/docs/references.rst index d008b1e4..9f9a2a1d 100644 --- a/docs/references.rst +++ b/docs/references.rst @@ -31,15 +31,15 @@ Huff, David Lippa, Dillon Niederhut, and M. Pacer, editors, Proceedings of the 16th Python in Science Conference, pages 64–72, Austin, - TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_ + TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_. -.. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a +.. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a .. [Paraskevakos2018] Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan, George Chantzialexiou, Thomas E. Cheatham, Oliver Beckstein, Geoffrey C. Fox and Shantenu Jha. Task- parallel Analysis of Molecular Dynamics Trajectories. In 47th International Conference on Parallel Processing - (ICPP 2018). doi: `10.1145/3225058.3225128`_ + (ICPP 2018). doi: `10.1145/3225058.3225128`_. .. _`10.1145/3225058.3225128` : https://doi.org/10.1145/3225058.3225128 \ No newline at end of file diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 139260c3..06518a73 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -65,7 +65,7 @@ class LeafletFinder(ParallelAnalysisBase): Currently, periodic boundaries are not taken into account. - The calculation is parallelized on a per-frame basis [Paraskevakos2018]_; + The calculation is parallelized on a per-frame basis; at the moment, no parallelization over trajectory blocks is performed. """ From 042e5bc1981b5096a36771162cf250fc54719bb4 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Fri, 12 Oct 2018 14:42:20 -0400 Subject: [PATCH 17/19] Addind missing file --- docs/api/leaflet.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 docs/api/leaflet.rst diff --git a/docs/api/leaflet.rst b/docs/api/leaflet.rst new file mode 100644 index 00000000..11767989 --- /dev/null +++ b/docs/api/leaflet.rst @@ -0,0 +1,2 @@ +.. automodule:: pmda.leaflet + From df641551660501171d153d8898d4fc77fdac7562 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Mon, 15 Oct 2018 13:45:46 -0400 Subject: [PATCH 18/19] Trying to fix linter's messages --- pmda/leaflet.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pmda/leaflet.py b/pmda/leaflet.py index 06518a73..3f879ae5 100644 --- a/pmda/leaflet.py +++ b/pmda/leaflet.py @@ -19,7 +19,7 @@ :inherited-members: """ -from __future__ import absolute_import +from __future__ import absolute_import, division import numpy as np import dask.bag as db @@ -70,13 +70,12 @@ class LeafletFinder(ParallelAnalysisBase): """ - def __init__(self, universe, atomgroup): - self._trajectory = universe.trajectory - self._top = universe.filename - self._traj = universe.trajectory.filename - self._atomgroup = atomgroup + def __init__(self, universe, atomgroups): + self._atomgroup = atomgroups self._results = list() + super(LeafletFinder, self).__init__(universe, (atomgroups,)) + def _find_connected_components(self, data, cutoff=15.0): """Perform the Connected Components discovery for the atoms in data. @@ -156,6 +155,7 @@ def _find_connected_components(self, data, cutoff=15.0): comp = [g for g in subgraphs] return comp + # pylint: disable=arguments-differ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, cutoff=15.0): """Perform computation on a single trajectory frame. @@ -226,6 +226,7 @@ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, indices = [np.sort(list(g)) for g in Components] return indices + # pylint: disable=arguments-differ def run(self, start=None, stop=None, From ce5d471b928f2d77982a7804e0c19ccc6340ef33 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 17 Oct 2018 16:21:03 -0400 Subject: [PATCH 19/19] Adding my name on AUTHORS, changelog and conf.py --- AUTHORS | 1 + CHANGELOG | 2 +- docs/conf.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/AUTHORS b/AUTHORS index d283ee33..8c75971a 100644 --- a/AUTHORS +++ b/AUTHORS @@ -22,4 +22,5 @@ Chronological list of authors 2018 - Shujie Fan - Richard J Gowers + - Ioannis Paraskevakos diff --git a/CHANGELOG b/CHANGELOG index 6fbf16e0..7611e8ad 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -13,7 +13,7 @@ The rules for this file: * release numbers follow "Semantic Versioning" http://semver.org ------------------------------------------------------------------------------ -xx/xx/18 VOD555, richardjgowers +xx/xx/18 VOD555, richardjgowers, iparask * 0.2.0 diff --git a/docs/conf.py b/docs/conf.py index d9fb6398..a918e6b7 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -57,7 +57,7 @@ # General information about the project. project = u'PMDA' -author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein' +author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein, Ioannis Paraskevakos' copyright = u'2018, ' + author