-
Notifications
You must be signed in to change notification settings - Fork 824
Parallel trajectory analysis (revisited) #618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f974a80
015a6ea
5fddd1e
ba9568f
447f306
dc29f26
59ce29e
fc94606
7a54fad
4fccebd
d83a821
747b486
a67c120
e9535f6
d6a77f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| # -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*- | ||
| # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 | ||
| # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 | ||
| # | ||
| # MDAnalysis --- http://www.MDAnalysis.org | ||
| # Copyright (c) 2006-2015 Naveen Michaud-Agrawal, Elizabeth J. Denning, Oliver Beckstein | ||
|
|
@@ -17,64 +17,64 @@ | |
| """ | ||
| Analysis building blocks --- :mod:`MDAnalysis.analysis.base` | ||
| ============================================================ | ||
|
|
||
| A collection of useful building blocks for creating Analysis | ||
| classes. | ||
|
|
||
|
|
||
| """ | ||
| import numpy as np | ||
| from __future__ import division | ||
|
|
||
| import logging | ||
| import multiprocessing as mp | ||
| import copy | ||
| from operator import itemgetter | ||
|
|
||
| import MDAnalysis as mda | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class AnalysisBase(object): | ||
| """Base class for defining multi frame analysis | ||
|
|
||
| The analysis base class is designed as a template for creating | ||
| multiframe analysis. | ||
|
|
||
| multiframe analysis. | ||
| The class implements the following methods: | ||
|
|
||
| _setup_frames(trajectory, start=None, stop=None, step=None) | ||
| Pass a Reader object and define the desired iteration pattern | ||
| through the trajectory | ||
|
|
||
| run | ||
| The user facing run method. Calls the analysis methods | ||
| defined below | ||
|
|
||
| Your analysis can implement the following methods, which are | ||
| called from run: | ||
|
|
||
| _prepare | ||
| Called before iteration on the trajectory has begun. | ||
| Data structures can be set up at this time, however most | ||
| error checking should be done in the __init__ | ||
|
|
||
| _single_frame | ||
| Called after the trajectory is moved onto each new frame. | ||
|
|
||
| _conclude | ||
| Called once iteration on the trajectory is finished. | ||
| Apply normalisation and averaging to results here. | ||
|
|
||
| """ | ||
| def _setup_frames(self, trajectory, start=None, | ||
| stop=None, step=None): | ||
| self._trajectory = trajectory | ||
| start, stop, step = trajectory.check_slice_indices( | ||
| start, stop, step) | ||
| self.start = start | ||
| self.stop = stop | ||
| self.step = step | ||
| self.nframes = len(xrange(start, stop, step)) | ||
|
|
||
| def _single_frame(self): | ||
| def _setup_frames(self, universe, start=None, stop=None, step=None): | ||
| """ | ||
| Add method docstring | ||
| """ | ||
| if universe is None: | ||
| pass | ||
| else: | ||
| self._universe = universe | ||
| self._trajectory = self._universe.trajectory | ||
|
|
||
| start, stop, step = self._trajectory.check_slice_indices( | ||
| start, stop, step) | ||
| self.start = start | ||
| self.stop = stop | ||
| self.step = step | ||
| self.nframes = len(xrange(start, stop, step)) | ||
|
|
||
| def _single_frame(self, timestep): | ||
| """Calculate data from a single frame of trajectory | ||
|
|
||
| Don't worry about normalising, just deal with a single frame. | ||
| """ | ||
| pass | ||
|
|
@@ -85,21 +85,175 @@ def _prepare(self): | |
|
|
||
| def _conclude(self): | ||
| """Finalise the results you've gathered. | ||
|
|
||
| Called at the end of the run() method to finish everything up. | ||
| """ | ||
| pass | ||
|
|
||
| def run(self): | ||
| def run(self, parallel=None, nthreads=None): | ||
| """ Chooses whether to run analysis in serial or parallel | ||
| mode depending on user input""" | ||
| if not parallel: | ||
| self._serial_run() | ||
| else: | ||
| self._parallel_run(nthreads) | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason why this line has to go? Could we call for i, ts in enumerate(self._traj[etc]):
self._single_frame(ts)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, calling _single_frame with ts as the argument looks much cleaner. |
||
| def _serial_run(self): | ||
| """Perform the calculation""" | ||
| logger.info("Starting preparation") | ||
| prog = mda.lib.log.Progressbar([self.nframes]) | ||
| prog.start() | ||
| self._prepare() | ||
| for i, ts in enumerate( | ||
| self._trajectory[self.start:self.stop:self.step]): | ||
| self._ts = ts | ||
| #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) | ||
| self._single_frame() | ||
| logger.info("Finishing up") | ||
| self._single_frame(ts) | ||
| prog.update(0) | ||
| #logger.info("Finishing up") | ||
| self._conclude() | ||
|
|
||
| def _parallel_run(self, nthreads=None): | ||
| """ | ||
| Create copies of the original object to be | ||
| dispatched to multiprocess | ||
| """ | ||
| if nthreads is None: | ||
| self.nthreads = mp.cpu_count() - 1 | ||
| else: | ||
| # Cap number of threads | ||
| self.nthreads = min([mp.cpu_count(), nthreads, self.nframes]) | ||
|
|
||
| self.slices = self._compute_slices() | ||
|
|
||
| # Queues for the communication between parent and child processes | ||
| out_queue = mp.Manager().Queue() | ||
| progress = mp.Manager().Queue() | ||
|
|
||
| # Prepare multiprocess objects | ||
| processes = [mp.Process(target=self._compute, | ||
| args=(out_queue, order, progress)) | ||
| for order in range(self.nthreads)] | ||
|
|
||
| # Run processes | ||
| for process in processes: | ||
| process.start() | ||
|
|
||
| thread_configs = [1+(elem[1]-elem[0]-1) // self.step | ||
| for elem in self.slices] | ||
|
|
||
| prog = mda.lib.log.Progressbar( | ||
| thread_configs, bar_length=50, name=self.__class__.__name__) | ||
| prog.start() | ||
|
|
||
| while any(process.is_alive() for process in processes): | ||
| while not progress.empty(): | ||
| core = progress.get() | ||
| prog.update(core) | ||
|
|
||
| # Exit the completed processes | ||
| for process in processes: | ||
| process.join() | ||
|
|
||
| results = [] | ||
| # Collects results from the queue | ||
| while not out_queue.empty(): | ||
| results.append(out_queue.get()) | ||
|
|
||
| # Sort results, then collate them | ||
| for other_results in sorted(results, key=itemgetter(1)): | ||
| self._add_other_results(other_results[0]) | ||
|
|
||
| # Averaging here | ||
| self._conclude() | ||
|
|
||
| def _compute_slices(self): | ||
| """ | ||
| This function returns a list containing the start and | ||
| last configuration to be analyzed from each thread | ||
| """ | ||
| step = self.step | ||
| configs = 1 + (self.stop - self.start - 1) // step | ||
|
|
||
| print "Total configurations: {}".format(configs) | ||
| print "Analysis running on {} threads.\n".format(self.nthreads) | ||
|
|
||
| # Number of cfgs for each thread, and remainder to be added | ||
| thread_cfg = configs // self.nthreads | ||
| reminder = configs % self.nthreads | ||
|
|
||
| slices = [] | ||
| beg = self.start | ||
|
|
||
| # Compute the start and last configurations | ||
| for thread in range(0, self.nthreads): | ||
| if thread < reminder: | ||
| end = beg + step * thread_cfg | ||
| else: | ||
| end = beg + step * (thread_cfg-1) | ||
|
|
||
| slices.append([beg, end+1]) | ||
| beg = end + step | ||
|
|
||
| # Print on screen the configurations assigned to each thread | ||
| for thread in range(self.nthreads): | ||
| confs = 1+(slices[thread][1]-1-slices[thread][0])/step | ||
| digits = len(str(self.stop)) | ||
| line = "Thread "+str(thread+1).rjust(len(str(self.nthreads)))+": " \ | ||
| +str(slices[thread][0]).rjust(digits)+"/" \ | ||
| +str(slices[thread][1]-1).rjust(digits) \ | ||
| +" | Configurations: "\ | ||
| +str(confs).rjust(1+len(str(thread_cfg))) | ||
| print line | ||
|
|
||
| return slices | ||
|
|
||
| def _compute(self, out_queue, order, progress): | ||
| """ | ||
| Run the single_frame method for each analysis object for all | ||
| the trajectories in the batch | ||
|
|
||
| order - my id among all the parallel versions | ||
| out_queue - where to put my results | ||
| progress - the progressbar to update | ||
| """ | ||
| start = self.slices[order][0] | ||
| stop = self.slices[order][1] | ||
| step = self.step | ||
|
|
||
| # Create a local version of the analysis object | ||
| analysis_object = copy.deepcopy(self) | ||
|
|
||
| analysis_object.nframes = len(xrange(start, stop, step)) | ||
| traj = analysis_object._universe.trajectory | ||
|
|
||
| analysis_object._prepare() | ||
|
|
||
| progress.put(order) | ||
| for timestep in traj[start:stop:step]: | ||
| analysis_object._single_frame(timestep) | ||
| progress.put(order) # Updates the progress bar | ||
|
|
||
| # Returns the results along with our order index | ||
| out_queue.put((analysis_object.results, order)) | ||
|
|
||
| def __getstate__(self): | ||
| state = dict(self.__dict__) | ||
| # Replace the _ags entry with indices | ||
| # pop removes the _ag key, or returns [] (empty list) if the Key didn't exist | ||
| ag_indices = [ag.indices for ag in state.pop('_ags', [])] | ||
| universe_filenames = (self._universe.filename, self._universe.trajectory.filename) | ||
| state.pop('_ags', None) | ||
| state.pop('_universe', None) | ||
| state.pop('_trajectory', None) | ||
|
|
||
| return state, universe_filenames, ag_indices | ||
|
|
||
| def __setstate__(self, state): | ||
| statedict, universe_filenames, ag_indices = state | ||
| self.__dict__ = statedict | ||
| # Create my local Universe | ||
| self._universe = mda.Universe(*universe_filenames) | ||
| self._ags = [self._universe.atoms[idx] | ||
| for idx in ag_indices] | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| from base import AnalysisBase | ||
| import numpy as np | ||
|
|
||
| class TotalDipole(AnalysisBase): | ||
|
|
||
| def __init__(self, universe=None, filename='order.dat', selection=None, start=None, stop=None, step=None): | ||
|
|
||
| if selection is None: | ||
| raise RuntimeError('In class TotalDipole: constroctur requires a selection') | ||
| else: | ||
| self.selection_string = selection | ||
|
|
||
| self._universe = universe | ||
| #self._trajectory = self._universe.trajectory | ||
| self.filename = filename | ||
| self.time = [] | ||
|
|
||
| self._setup_frames(self._universe, start, stop, step) | ||
| self.dipoles = [] | ||
|
|
||
| def _single_frame(self,timestep): | ||
| selection = self._universe.select_atoms(self.selection_string) | ||
|
|
||
| dipole = np.zeros(3) | ||
|
|
||
| for residue in selection.residues: | ||
| dipole += MolecularDipole(residue) | ||
|
|
||
| self.dipoles.append(dipole) | ||
|
|
||
| def _conclude(self): | ||
| total_dipole = np.sum(self.dipoles, axis=0) | ||
| print "Average dipole:", total_dipole/self.nframes | ||
| return total_dipole/self.nframes | ||
|
|
||
| def __iadd__(self,other): | ||
| self.dipoles += other.dipoles | ||
| self.nframes += other.nframes | ||
| return self | ||
|
|
||
|
|
||
| #--- Functions ---# | ||
| def MolecularDipole(residue): | ||
| charges = residue.charges | ||
| abscharges = np.absolute(charges) | ||
| charge_sum = np.sum(abscharges) | ||
| positions = residue.positions | ||
|
|
||
| charge_center = [] | ||
|
|
||
| for coord in [0,1,2]: | ||
| charge_center.append(np.sum(np.multiply(abscharges,positions[:,coord]))/charge_sum) | ||
|
|
||
| dipole = [] | ||
| # 4.803 converts to debyes | ||
| for coord in [0,1,2]: | ||
| dipole.append(np.sum(np.multiply(charges,positions[:,coord]-charge_center[coord]))*4.803) | ||
|
|
||
| return dipole |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this hack any more right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean the
if universe is None:? Yep I think it should go. I'll commit a modified version right away.