Merged
1 change: 1 addition & 0 deletions AUTHORS
@@ -22,4 +22,5 @@ Chronological list of authors
2018
- Shujie Fan
- Richard J Gowers
- Ioannis Paraskevakos

3 changes: 2 additions & 1 deletion CHANGELOG
@@ -13,14 +13,15 @@ The rules for this file:
* release numbers follow "Semantic Versioning" http://semver.org

------------------------------------------------------------------------------
xx/xx/18 VOD555, richardjgowers
xx/xx/18 VOD555, richardjgowers, iparask

* 0.2.0

Enhancements
* add timing for _conclude and _prepare (Issue #49)
* add parallel particle-particle RDF calculation module pmda.rdf (Issue #41)
* add readonly_attributes context manager to ParallelAnalysisBase
* add parallel implementation of Leaflet Finder (Issue #47)
Member:

add your GitHub name at the top of 0.2.0!

add yourself to AUTHORS and the authors line in doc/conf.py

Collaborator (Author):

Okay I will!


06/07/18 orbeckst
1 change: 1 addition & 0 deletions docs/api.rst
@@ -42,3 +42,4 @@ also function as examples for how to implement your own functions with
api/rms
api/contacts
api/rdf
api/leaflet
2 changes: 2 additions & 0 deletions docs/api/leaflet.rst
@@ -0,0 +1,2 @@
.. automodule:: pmda.leaflet

2 changes: 1 addition & 1 deletion docs/conf.py
@@ -57,7 +57,7 @@

# General information about the project.
project = u'PMDA'
author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein'
author = u'Max Linke, Shujie Fan, Richard J. Gowers, Oliver Beckstein, Ioannis Paraskevakos'
copyright = u'2018, ' + author


13 changes: 11 additions & 2 deletions docs/references.rst
@@ -31,6 +31,15 @@
Huff, David Lippa, Dillon Niederhut, and M. Pacer,
editors, Proceedings of the 16th Python in Science
Conference, pages 64–72, Austin,
TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_
TX, 2017. SciPy. doi:`10.25080/shinma-7f4c6e7-00a`_.

.. _`10.25080/shinma-7f4c6e7-00a`: https://doi.org/10.25080/shinma-7f4c6e7-00a

.. [Paraskevakos2018] Ioannis Paraskevakos, Andre Luckow, Mahzad Khoshlessan,
George Chantzialexiou, Thomas E. Cheatham, Oliver
Beckstein, Geoffrey C. Fox and Shantenu Jha. Task-
parallel Analysis of Molecular Dynamics Trajectories. In
47th International Conference on Parallel Processing
(ICPP 2018). doi: `10.1145/3225058.3225128`_.

.. _`10.1145/3225058.3225128`: https://doi.org/10.1145/3225058.3225128
310 changes: 310 additions & 0 deletions pmda/leaflet.py
@@ -0,0 +1,310 @@
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# PMDA
# Copyright (c) 2018 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
"""
LeafletFinder Analysis tool --- :mod:`pmda.leaflet`
===================================================

This module contains parallel versions of analysis tasks in
:mod:`MDAnalysis.analysis.leaflet`.

.. autoclass:: LeafletFinder
:members:
:undoc-members:
:inherited-members:

"""
from __future__ import absolute_import, division

import numpy as np
import dask.bag as db
import networkx as nx
from scipy.spatial import cKDTree

import MDAnalysis as mda
from dask import distributed, multiprocessing
from joblib import cpu_count

from .parallel import ParallelAnalysisBase, Timing
from .util import timeit


class LeafletFinder(ParallelAnalysisBase):
"""Parallel Leaflet Finder analysis.

Identify atoms in the same leaflet of a lipid bilayer.
This class implements and parallelizes the *LeafletFinder* algorithm
[Michaud-Agrawal2011]_.

The parallelization is done based on [Paraskevakos2018]_.

Attributes
----------

Parameters
----------
Universe : :class:`~MDAnalysis.core.groups.Universe`
a :class:`MDAnalysis.core.groups.Universe` (the
`atomgroup` must belong to this Universe)
atomgroup : tuple of :class:`~MDAnalysis.core.groups.AtomGroup`
atomgroups that are iterated in parallel

Note
----
At the moment, this class has far fewer features than the serial
version :class:`MDAnalysis.analysis.leaflet.LeafletFinder`.

Member:

Additional notes to add:

  • Currently, periodic boundaries are not taken into account.
  • The calculation is parallelized on a per-frame basis [Paraskevakos2018]_; at the moment, no parallelization over trajectory blocks is performed.

Collaborator (Author):

Done

This version offers Leaflet Finder algorithm 4 ("Tree-based Nearest
Neighbor and Parallel-Connected Components (Tree-Search)") in
[Paraskevakos2018]_.

Currently, periodic boundaries are not taken into account.

The calculation is parallelized on a per-frame basis;
at the moment, no parallelization over trajectory blocks is performed.

"""

def __init__(self, universe, atomgroups):
self._atomgroup = atomgroups
self._results = list()

super(LeafletFinder, self).__init__(universe, (atomgroups,))

def _find_connected_components(self, data, cutoff=15.0):
"""Perform the Connected Components discovery for the atoms in data.

Parameters
----------
data : Tuple of lists of Numpy arrays
This is a data and index tuple. The data are organized as
`([AtomPositions1<NumpyArray>,AtomPositions2<NumpyArray>],
[index1,index2])`. `index1` and `index2` are showing the
position of the `AtomPosition` in the adjacency matrix and
allows to correct the node number of the produced graph.
cutoff : float (optional)
head group-defining atoms within a distance of `cutoff`
Angstroms are deemed to be in the same leaflet [15.0]

Returns
-------
values : list.
A list of all the connected components of the graph that is
generated from `data`

"""
window, index = data[0]
num = window[0].shape[0]
i_index = index[0]
j_index = index[1]
graph = nx.Graph()

if i_index == j_index:
train = window[0]
test = window[1]
else:
train = np.vstack([window[0], window[1]])
test = np.vstack([window[0], window[1]])

tree = cKDTree(train, leafsize=40)
Member:

Please add a comment here that this does not respect PBC; possible TODO: work with capped_distances() instead

Collaborator (Author):

What do you mean by PBC?

Member:

https://en.wikipedia.org/wiki/Periodic_boundary_conditions

Like Pacman geometry: particles are simulated within a finite box of size [Lx, Ly, Lz], so particles near x=0 and x=Lx are actually very close to each other.

The leaflet is therefore an infinite sheet and has no "edge". Without considering periodic boundaries you'll still identify the leaflets, but you'll miss some of the edges in the graph representation, which might be important for some analyses.

Collaborator (Author):

Okay, I see. May I suggest the following: have a working implementation that does not take PBCs into account and does not break the trajectory into multiple blocks. As soon as that is done and this PR can be accepted, merge it so that we do not miss the release this is intended for.

The next version of the implementation will take PBCs into account and also look at how to parallelize over trajectory blocks.

@orbeckst, @richardjgowers do you agree with that?

Member:

I am ok with an initial version that does not do PBC if

  • the docs mention this
  • we have an issue open for it

Breaking the trajectory into blocks can also wait, in my opinion.
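The minimum-image idea behind the periodic-boundary discussion above can be sketched in plain NumPy. This is an illustrative aside, not part of this PR: the function name is hypothetical, it only handles orthorhombic boxes, and in practice MDAnalysis's capped_distances() would do this work.

```python
import numpy as np

def minimum_image_distance(a, b, box):
    """Distance between points a and b under orthorhombic PBC.

    Each displacement component is wrapped into [-L/2, L/2] so that
    particles near opposite faces of the box are recognized as close.
    (Hypothetical sketch; orthorhombic boxes only.)
    """
    d = np.asarray(b, dtype=float) - np.asarray(a, dtype=float)
    d -= box * np.round(d / box)  # wrap into the nearest periodic image
    return float(np.linalg.norm(d))

# Particles near x=0 and x=Lx are neighbors once the box is considered.
box = np.array([100.0, 100.0, 100.0])
print(minimum_image_distance([1.0, 0.0, 0.0], [99.0, 0.0, 0.0], box))  # 2.0, not 98.0
```

A plain cKDTree query cannot see these wrapped neighbors, which is exactly why some graph edges go missing without PBC handling.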

edges = tree.query_ball_point(test, cutoff)
edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
for idx, dest_list in enumerate(edges)]

edge_list_flat = np.array([list(item) for sublist in edge_list for
item in sublist])

if i_index == j_index:
res = edge_list_flat.transpose()
res[0] = res[0] + i_index - 1
res[1] = res[1] + j_index - 1
else:
removed_elements = list()
for i in range(edge_list_flat.shape[0]):
if (edge_list_flat[i, 0] >= 0 and
edge_list_flat[i, 0] <= num - 1) and \
(edge_list_flat[i, 1] >= 0 and
edge_list_flat[i, 1] <= num - 1) or \
(edge_list_flat[i, 0] >= num and
edge_list_flat[i, 0] <= 2 * num - 1) and \
(edge_list_flat[i, 1] >= num and
edge_list_flat[i, 1] <= 2 * num - 1) or \
(edge_list_flat[i, 0] >= num and
edge_list_flat[i, 0] <= 2 * num - 1) and \
(edge_list_flat[i, 1] >= 0 and
edge_list_flat[i, 1] <= num - 1):
removed_elements.append(i)
res = np.delete(edge_list_flat, removed_elements,
axis=0).transpose()
res[0] = res[0] + i_index - 1
res[1] = res[1] - num + j_index - 1
if res.shape[1] == 0:
res = np.zeros((2, 1), dtype=int)

edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
graph.add_edges_from(edges)

# partial connected components

subgraphs = nx.connected_components(graph)
Member:

iirc networkx is wholly Python.

This function in MDAnalysis:

https://github.com/MDAnalysis/mdanalysis/blob/develop/package/MDAnalysis/lib/_cutil.pyx#L322

is a C++ function which does the same; pass it node and edge indices and it will return a list of the indices in each subgraph.

Collaborator (Author):

If I understand your comment correctly, you are suggesting to change from networkx to the function MDAnalysis implements, right?

When I started the implementation of the Leaflet Finder for my paper I used the class that exists in MDAnalysis (link), where networkx is being used. In addition, I was careful when I built my version to use widely used Python packages for things I did not want to reimplement.

Did you reimplement a connected-component calculator, or did that pre-exist Leaflet Finder?

Also, I am extremely bad with abbreviations and I have to google them all the time. Can you expand iirc?

Member:

Sorry, iirc = if I remember correctly.

Yeah, I think the MDAnalysis function will be faster. It was added after the original leaflet finder class.

If you want to keep this the same for a closer comparison to the original class, that's fine.

Member:

@richardjgowers did you benchmark find_fragments() against networkx?

Member:

(Also, we should update the serial LeafletFinder with capped_distances and find_fragments...)

Member:

I suggest we use networkx for the initial version because this is what was described in the paper.

We can then open a new issue to optimize the parallel LeafletFinder and add PBC.
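For readers unfamiliar with what nx.connected_components (or MDAnalysis's find_fragments) computes here, a self-contained union-find sketch produces the same grouping. This is purely illustrative, assuming integer node labels; the PR itself keeps networkx.

```python
def connected_components(n_nodes, edges):
    """Group node indices 0..n_nodes-1 into connected components
    using union-find (illustrative stand-in for nx.connected_components)."""
    parent = list(range(n_nodes))

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps trees shallow
            x = parent[x]
        return x

    for a, b in edges:
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[ra] = rb  # merge the two components

    comps = {}
    for node in range(n_nodes):
        comps.setdefault(find(node), []).append(node)
    return list(comps.values())

# Two "leaflets": nodes {0,1,2} linked together and nodes {3,4} linked together.
print(connected_components(5, [(0, 1), (1, 2), (3, 4)]))  # [[0, 1, 2], [3, 4]]
```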

comp = [g for g in subgraphs]
return comp

# pylint: disable=arguments-differ
def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
cutoff=15.0):
"""Perform computation on a single trajectory frame.

Must return computed values as a list. You can only **read**
from member variables stored in ``self``. Changing them during
a run will result in undefined behavior. `ts` and any of the
atomgroups can be changed (but changes will be overwritten
when the next time step is read).

Parameters
----------
scheduler_kwargs : dict
parameters passed to the Dask scheduler
cutoff : float (optional)
head group-defining atoms within a distance of `cutoff`
Angstroms are deemed to be in the same leaflet [15.0]

Returns
-------
values : anything
The output from the computation over a single frame must
be returned. The `value` will be added to a list for each
block and the list of blocks is stored as :attr:`_results`
before :meth:`_conclude` is run. In order to simplify
processing, the `values` should be "simple" shallow data
structures such as arrays or lists of numbers.

"""

# Get positions of the atoms in the atomgroup and find their number.
atoms = ts.positions[atomgroups.indices]
matrix_size = atoms.shape[0]
arranged_coord = list()
part_size = int(matrix_size / n_jobs)
# Partition the data based on a 2-dimensional partitioning
for i in range(1, matrix_size + 1, part_size):
for j in range(i, matrix_size + 1, part_size):
arranged_coord.append(([atoms[i - 1:i - 1 + part_size],
atoms[j - 1:j - 1 + part_size]],
[i, j]))
# Distribute the data over the available cores, apply the map function
# and execute.
parAtoms = db.from_sequence(arranged_coord,
npartitions=len(arranged_coord))
parAtomsMap = parAtoms.map_partitions(self._find_connected_components,
cutoff=cutoff)
Components = parAtomsMap.compute(**scheduler_kwargs)

# Gather the results and start the reduction. TODO: think if it can go
# to the private _reduce method of the base class.
result = list(Components)

# Create the overall connected components of the graph
while len(result) != 0:
item1 = result[0]
result.pop(0)
ind = []
for i, item2 in enumerate(Components):
if item1.intersection(item2):
item1 = item1.union(item2)
ind.append(i)
ind.reverse()
for j in ind:
Components.pop(j)
Components.append(item1)

# Change output format and return.
indices = [np.sort(list(g)) for g in Components]
return indices
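The 2-dimensional partitioning loop in _single_frame above enumerates upper-triangular pairs of position blocks. A hypothetical standalone sketch of that indexing (the function name and slice-pair representation are illustrative, not from the PR):

```python
def partition_pairs(n_atoms, n_jobs):
    """Enumerate (block_i, block_j) slice ranges with j >= i, mirroring the
    1-based loop in _single_frame. Each pair is a ((start, stop), (start, stop))
    tuple of 0-based indices into the position array."""
    part = n_atoms // n_jobs
    pairs = []
    for i in range(1, n_atoms + 1, part):
        for j in range(i, n_atoms + 1, part):
            pairs.append(((i - 1, i - 1 + part), (j - 1, j - 1 + part)))
    return pairs

# 8 atoms split across 2 jobs -> blocks of 4, giving 3 block pairs.
print(partition_pairs(8, 2))
# [((0, 4), (0, 4)), ((0, 4), (4, 8)), ((4, 8), (4, 8))]
```

Only the upper triangle is needed because the neighbor relation is symmetric; diagonal pairs (i == j) search within a block, off-diagonal pairs search between blocks.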

# pylint: disable=arguments-differ
def run(self,
start=None,
stop=None,
step=None,
scheduler=None,
n_jobs=-1,
cutoff=15.0):
"""Perform the calculation

Parameters
----------
start : int, optional
start frame of analysis
stop : int, optional
stop frame of analysis
step : int, optional
number of frames to skip between each analysed frame
scheduler : dask scheduler, optional
Use dask scheduler, defaults to multiprocessing. This can be used
to spread work to a distributed scheduler
n_jobs : int, optional
number of tasks to start, if `-1` use number of logical cpu cores.
This argument will be ignored when the distributed scheduler is
used

"""
if scheduler is None:
scheduler = multiprocessing

if n_jobs == -1:
if scheduler == multiprocessing:
n_jobs = cpu_count()
elif isinstance(scheduler, distributed.Client):
n_jobs = len(scheduler.ncores())
else:
raise ValueError(
"Couldn't guess ideal number of jobs from scheduler. "
"Please provide `n_jobs` in call to method.")

with timeit() as b_universe:
universe = mda.Universe(self._top, self._traj)

scheduler_kwargs = {'get': scheduler.get}
if scheduler == multiprocessing:
scheduler_kwargs['num_workers'] = n_jobs

start, stop, step = self._trajectory.check_slice_indices(
start, stop, step)
with timeit() as total:
with timeit() as prepare:
self._prepare()

with self.readonly_attributes():
timings = list()
times_io = []
for frame in range(start, stop, step):
with timeit() as b_io:
ts = universe.trajectory[frame]
times_io.append(b_io.elapsed)
with timeit() as b_compute:
components = self._single_frame(ts=ts,
atomgroups=self._atomgroup,
scheduler_kwargs=scheduler_kwargs,
n_jobs=n_jobs,
cutoff=cutoff)

timings.append(b_compute.elapsed)
leaflet1 = self._atomgroup[components[0]]
leaflet2 = self._atomgroup[components[1]]
self._results.append([leaflet1, leaflet2])
with timeit() as conclude:
self._conclude()
self.timing = Timing(times_io,
np.hstack(timings), total.elapsed,
b_universe.elapsed, prepare.elapsed,
conclude.elapsed)
return self

def _conclude(self):
self.results = self._results