From f974a805baab1b442744b001f5c628e1d86b0b03 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Tue, 12 Jan 2016 15:15:49 +0100 Subject: [PATCH 01/14] Modified base.py to work with parallel_job.py, added total dipole moment test analysis (electromagnetism.py), added parallel_jobs.py module, added test for the parallel_jobs.py module --- package/MDAnalysis/analysis/base.py | 48 ++-- .../MDAnalysis/analysis/electromagnetism.py | 59 +++++ package/MDAnalysis/analysis/parallel_jobs.py | 244 ++++++++++++++++++ .../MDAnalysisTests/test_parallel_jobs.py | 33 +++ 4 files changed, 359 insertions(+), 25 deletions(-) create mode 100644 package/MDAnalysis/analysis/electromagnetism.py create mode 100644 package/MDAnalysis/analysis/parallel_jobs.py create mode 100644 testsuite/MDAnalysisTests/test_parallel_jobs.py diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index f3a722b985d..22a237602b1 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -17,11 +17,8 @@ """ Analysis building blocks --- :mod:`MDAnalysis.analysis.base` ============================================================ - A collection of useful building blocks for creating Analysis classes. - - """ import numpy as np import logging @@ -32,12 +29,9 @@ class AnalysisBase(object): """Base class for defining multi frame analysis - The analysis base class is designed as a template for creating multiframe analysis. - The class implements the following methods: - _setup_frames(trajectory, start=None, stop=None, step=None) Pass a Reader object and define the desired iteration pattern through the trajectory @@ -45,32 +39,30 @@ class AnalysisBase(object): run The user facing run method. Calls the analysis methods defined below - Your analysis can implement the following methods, which are called from run: - _prepare Called before iteration on the trajectory has begun. Data structures can be set up at this time, however most error checking should be done in the __init__ - _single_frame Called after the trajectory is moved onto each new frame. - _conclude Called once iteration on the trajectory is finished. Apply normalisation and averaging to results here. - """ - def _setup_frames(self, trajectory, start=None, - stop=None, step=None): - self._trajectory = trajectory - start, stop, step = trajectory.check_slice_indices( - start, stop, step) - self.start = start - self.stop = stop - self.step = step - self.nframes = len(xrange(start, stop, step)) + def _setup_frames(self, trajectory, start=None, stop=None, step=None): + + if trajectory is None: + pass + else: + self._trajectory = trajectory + + start, stop, step = trajectory.check_slice_indices(start, stop, step) + self.start = start + self.stop = stop + self.step = step + self.nframes = len(xrange(start, stop, step)) def _single_frame(self): """Calculate data from a single frame of trajectory @@ -85,21 +77,27 @@ def _prepare(self): def _conclude(self): """Finalise the results you've gathered. - Called at the end of the run() method to finish everything up. """ pass - + def run(self): """Perform the calculation""" logger.info("Starting preparation") self._prepare() for i, ts in enumerate( self._trajectory[self.start:self.stop:self.step]): - self._ts = ts - #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) + logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) self._single_frame() logger.info("Finishing up") self._conclude() - + #def __iadd__(self,other): + # return self + + def __getstate__(self): + state = dict(self.__dict__) + key = '_trajectory' + if key in state: + del state ['_trajectory'] + return state diff --git a/package/MDAnalysis/analysis/electromagnetism.py b/package/MDAnalysis/analysis/electromagnetism.py new file mode 100644 index 00000000000..ed22c61c095 --- /dev/null +++ b/package/MDAnalysis/analysis/electromagnetism.py @@ -0,0 +1,59 @@ +from base import AnalysisBase +import numpy as np + +class TotalDipole(AnalysisBase): + + def __init__(self, trajectory=None, filename='order.dat', selection=None, start=None, stop=None, step=None): + + if selection is None: + raise RuntimeError('In class OrientationalOrder: constructur requires a selection') + else: + self.selection = selection + + self.filename = filename + self.time = [] + + self._setup_frames(trajectory, start, stop, step) + self.dipoles = [] + + def _single_frame(self): + selection = self.selection + + selection.wrap(compound='residues') + + dipole = np.zeros(3) + + for residue in selection.residues: + dipole += MolecularDipole(residue) + + self.dipoles.append(dipole) + + def _conclude(self): + total_dipole = np.sum(self.dipoles, axis=0) + print "Average dipole:", total_dipole/self.nframes + return total_dipole/self.nframes + + def __iadd__(self,other): + self.dipoles += other.dipoles + self.nframes += other.nframes + return self + + +#--- Functions ---# +def MolecularDipole(residue): + charges = residue.charges + abscharges = np.absolute(charges) + charge_sum = np.sum(abscharges) + positions = residue.positions + + charge_center = [] + + for coord in [0,1,2]: + charge_center.append(np.sum(np.multiply(abscharges,positions[:,coord]))/charge_sum) + + dipole = [] + # 4.803 is to convert in debyes + for coord in [0,1,2]: + dipole.append(np.sum(np.multiply(charges,positions[:,coord]-charge_center[coord]))*4.803) + + return dipole diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py new file mode 100644 index 00000000000..717aa635b52 --- /dev/null +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -0,0 +1,244 @@ +import multiprocessing as mp +import copy +from operator import itemgetter + +# For fancy progressbar +try: + from blessings import Terminal + from progressive.bar import Bar + from progressive.tree import ProgressTree, Value, BarDescriptor +except: + pass + +class ParallelProcessor(): + + def __init__(self, list_of_jobs, trajectory, start=None, stop=None, step=None, threads=None, + progressbar=True): + self._trajectory = trajectory + + start, stop, step = trajectory.check_slice_indices(start, stop, step) + + self.start = start + self.stop = stop + self.step = step + self.nframes = len(xrange(start, stop, step)) + + self.list_of_jobs = list_of_jobs + + if threads == None: + self.threads = mp.cpu_count() + elif threads > mp.cpu_count(): + self.threads = mp.cpu_count() + else: + self.threads = threads + + self.progressbar = progressbar + + + def slices(self): + # This function returns a list containing the start and + # last configuration to be analyzed from each thread + + threads = self.threads # Get number of threads from initialization + step = self.step + configs = (self.stop-self.start)/step + + # Check whether the number of threads is higher than + # the number of trajectories to be analyzed + while (configs/threads == 0): + threads -= 1 + + self.threads = threads # Update the number of threads + + print "Total configurations: "+str(configs) + print "Analysis running on ", threads, " threads.\n" + + # Number of cfgs for each thread, and remainder to be added + thread_cfg = configs/threads + reminder = configs%threads + + slices = [] + beg = self.start + + # Compute the start and last configurations + for thread in range(0,threads): + if thread < reminder: + end = beg+step*thread_cfg + else: + end = beg+step*(thread_cfg-1) + + slices.append([beg, end+1]) + beg = end+step + + # Print on screen the configurations assigned to each thread + for thread in range(threads): + confs = 1+(slices[thread][1]-1-slices[thread][0])/step + digits = len(str(self.stop)) + line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \ + +str(slices[thread][0]).rjust(digits)+"/" \ + +str(slices[thread][1]-1).rjust(digits) \ + +" | Configurations: "+str(confs).rjust(1+len(str(thread_cfg))) + print line + + print + return slices + + def job_copies(self): + # This function creates n=threads copies of the analysis + # objects submitted from the user + threads = self.threads + job_list = [] + + for thread in range(threads): + job_list.append([ copy.deepcopy(job) for job in self.list_of_jobs]) + + return job_list + + def compute(self,job_list,start,stop,step, out_queue, order,progress,error): + # Run the single_frame method for each analysis object for all + # the trajectories in the batch + count = 0 # just to track IO errors + + try: + for i, ts in enumerate(self._trajectory[start:stop:step]): + count = i + for job in job_list: + job._single_frame() + progress.put(order) # Updates the progress bar + + out_queue.put((job_list,order)) # Returns the results + except: + error.put([order, start+count*step]) + + def prepare(self,pool,slices): + # This function runs the setup_frame method from AnalysisBase + # and the prepare method from the analysis object. Each job + # object is initialized using start and stop from the slice function + for thread, elem in zip(pool,slices): + for job in thread: + job._setup_frames(self._trajectory,start=elem[0],stop=elem[1],step=self.step) + job._prepare() + + def conclude(self,list_of_jobs): + # Run conclude for each job object + for job in list_of_jobs: + job._conclude() + + def parallel_run(self): + # Returns indices for splitting the trajectory + slices = self.slices() + + # Create copies of the original object to be + # dispatched to multiprocess + pool = self.job_copies() + threads = self.threads + + self.prepare(pool,slices) + + # Queues for the communication between parent and child processes + out_queue = mp.Manager().Queue() + progress = mp.Manager().Queue() + error = mp.Manager().Queue() + + # Prepare multiprocess objects + processes = [ mp.Process(target=self.compute, args=(batch, elem[0],elem[1],self.step,out_queue,order,progress,error)) + for order, (batch, elem) in enumerate(zip(pool,slices)) ] + + + # Run processes + for p in processes: p.start() + + thread_configs = [ 1+(elem[1]-elem[0]-1)/self.step for elem in slices ] + + if self.progressbar: + try: + # Initialize progress bar + readings = [ Value(0) for i in range(threads) ] + total_cfg = Value(0) + bd_defaults = [ dict(type=Bar, kwargs=dict(max_value=cfg)) for cfg in thread_configs ] + bd_defaults.append( dict(type=Bar, kwargs=dict(max_value=sum(thread_configs))) ) # add total cfgs for total counter + + data_tuple = [ ("Core "+str(thread+1).rjust(len(str(threads))), + BarDescriptor(value=readings[thread], **bd_defaults[thread])) for thread in range(threads) ] + + data = dict( (key,value) for (key,value) in data_tuple ) + + data.update({'Total': BarDescriptor(value=total_cfg, **bd_defaults[-1])}) + + # Create blessings.Terminal instance + t = Terminal() + # Initialize a ProgressTree instance + n = ProgressTree(term=t) + # Make room for the progress bar + n.make_room(data) + + # Updates progress bar + while any([ p.is_alive() for p in processes ]): + while not progress.empty(): + core = progress.get() + n.cursor.restore() + readings[core].value += 1 + total_cfg.value += 1 + n.draw(data, BarDescriptor(bd_defaults)) + + print # lets leave some free space + except: + data = {} + for thread in range(threads): + data[thread]=0 + while any([ p.is_alive() for p in processes ]): + while not progress.empty(): + core = progress.get() + data[core] += 1 + + for thread in range(threads): + print "Core "+str(thread)+": "+str(data[thread])+"/"+str(thread_configs[thread]) + + print "\r", + + print "\033["+str(threads+1)+"A" + + print threads*"\n" + + + # Exit the completed processes + for p in processes: p.join() + + unread_configurations = [] + + # Collect errors from queues + while not error.empty(): + unread_configurations.append(error.get()) + + if len(unread_configurations) != 0: + unread_counter = 0 + for error in unread_configurations: + unread_counter += (slices[error[0]][1]-error[1])/self.step + print "Sorry, there were", unread_counter, "unread configurations." + for error in sorted(unread_configurations, key=itemgetter(0)): + print "Core", error[0]+1, "did not read from configuration", error[1]+1,\ + "to", slices[error[0]][1] + + print "\n" + + + results = [] + + # Collects results from the queue + while not out_queue.empty(): + results.append(out_queue.get()) + + jobs_num = len(self.list_of_jobs) + + result_list = [] + + # Sum the job objects from each thread + for job in range(jobs_num): + for order, thread in enumerate(sorted(results, key=itemgetter(1))): + if order == 0: + result_list.append(thread[0][job]) + else: + result_list[job] += thread[0][job] + + # Run the conclude function for each job + self.conclude(result_list) diff --git a/testsuite/MDAnalysisTests/test_parallel_jobs.py b/testsuite/MDAnalysisTests/test_parallel_jobs.py new file mode 100644 index 00000000000..0258e0cea2b --- /dev/null +++ b/testsuite/MDAnalysisTests/test_parallel_jobs.py @@ -0,0 +1,33 @@ +import MDAnalysis as mda +import numpy as np +import MDAnalysis.analysis.parallel_jobs as pj +import MDAnalysis.analysis.electromagnetism as electromagnetism + +from numpy.testing import * +import time + +from MDAnalysisTests.datafiles import (DCD,PSF) + +class TestParallel(TestCase): + def setUp(self): + self.system = mda.Universe(PSF,DCD) + self.selection = self.system.select_atoms('all') + self.traj = self.system.trajectory + + # Single thread analysis + print "Single thread analysis:" + start_time = time.time() + single_analysis = electromagnetism.TotalDipole(self.traj,selection=self.selection) + self.single_result = single_analysis.run() + print("--- %s seconds ---" % (time.time() - start_time)) + + def test_parallel(self): + print "Parallel analysis:" + start_time = time.time() + jobs = [ electromagnetism.TotalDipole(selection = self.selection) ] + process = pj.ParallelProcessor(jobs,self.traj,progressbar=True) + assert_equal(self.single_result, process.parallel_run()) + print("--- %s seconds ---" % (time.time() - start_time)) + + def tearDown(self): + del self.system From 015a6ead890eafe81fb0e4c126dc229f7b31a260 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Tue, 12 Jan 2016 15:24:43 +0100 Subject: [PATCH 02/14] Minor modifications --- package/MDAnalysis/analysis/base.py | 18 ++++++++++++------ .../MDAnalysis/analysis/electromagnetism.py | 4 ++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index 22a237602b1..4a879d8633d 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -20,6 +20,7 @@ A collection of useful building blocks for creating Analysis classes. """ + import numpy as np import logging @@ -29,9 +30,12 @@ class AnalysisBase(object): """Base class for defining multi frame analysis + The analysis base class is designed as a template for creating - multiframe analysis. + multiframe analysis. + The class implements the following methods: + _setup_frames(trajectory, start=None, stop=None, step=None) Pass a Reader object and define the desired iteration pattern through the trajectory @@ -39,17 +43,22 @@ class AnalysisBase(object): run The user facing run method. Calls the analysis methods defined below + Your analysis can implement the following methods, which are called from run: + _prepare Called before iteration on the trajectory has begun. Data structures can be set up at this time, however most error checking should be done in the __init__ + _single_frame Called after the trajectory is moved onto each new frame. _conclude + Called once iteration on the trajectory is finished. Apply normalisation and averaging to results here. + """ def _setup_frames(self, trajectory, start=None, stop=None, step=None): @@ -80,21 +89,18 @@ def _conclude(self): Called at the end of the run() method to finish everything up. """ pass - + def run(self): """Perform the calculation""" logger.info("Starting preparation") self._prepare() for i, ts in enumerate( self._trajectory[self.start:self.stop:self.step]): - logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) + #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) self._single_frame() logger.info("Finishing up") self._conclude() - #def __iadd__(self,other): - # return self - def __getstate__(self): state = dict(self.__dict__) key = '_trajectory' diff --git a/package/MDAnalysis/analysis/electromagnetism.py b/package/MDAnalysis/analysis/electromagnetism.py index ed22c61c095..fe450c6c3b8 100644 --- a/package/MDAnalysis/analysis/electromagnetism.py +++ b/package/MDAnalysis/analysis/electromagnetism.py @@ -6,7 +6,7 @@ class TotalDipole(AnalysisBase): def __init__(self, trajectory=None, filename='order.dat', selection=None, start=None, stop=None, step=None): if selection is None: - raise RuntimeError('In class OrientationalOrder: constructur requires a selection') + raise RuntimeError('In class TotalDipole: constroctur requires a selection') else: self.selection = selection @@ -52,7 +52,7 @@ def MolecularDipole(residue): charge_center.append(np.sum(np.multiply(abscharges,positions[:,coord]))/charge_sum) dipole = [] - # 4.803 is to convert in debyes + # 4.803 converts to debyes for coord in [0,1,2]: dipole.append(np.sum(np.multiply(charges,positions[:,coord]-charge_center[coord]))*4.803) From 5fddd1e0fd1bddfe182e04a98994c8200967c18d Mon Sep 17 00:00:00 2001 From: mattihappy Date: Wed, 13 Jan 2016 12:48:16 +0100 Subject: [PATCH 03/14] New AnalysisBase and ParallelProcessor classes. Now each child process has its own universe, eliminating the problem of accessing to arrays that have been already deallocated. Adapted base.py, parallel_jobs.py and test_parallel_jobs.py according to PEP8 style. --- package/MDAnalysis/analysis/base.py | 48 ++- .../MDAnalysis/analysis/electromagnetism.py | 12 +- package/MDAnalysis/analysis/parallel_jobs.py | 278 ++++++++---------- .../MDAnalysisTests/test_parallel_jobs.py | 22 +- 4 files changed, 153 insertions(+), 207 deletions(-) diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index 4a879d8633d..458c8e06ac1 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -1,5 +1,5 @@ # -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*- -# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 # # MDAnalysis --- http://www.MDAnalysis.org # Copyright (c) 2006-2015 Naveen Michaud-Agrawal, Elizabeth J. Denning, Oliver Beckstein @@ -21,7 +21,6 @@ classes. """ -import numpy as np import logging @@ -30,52 +29,49 @@ class AnalysisBase(object): """Base class for defining multi frame analysis - The analysis base class is designed as a template for creating multiframe analysis. - The class implements the following methods: - _setup_frames(trajectory, start=None, stop=None, step=None) Pass a Reader object and define the desired iteration pattern through the trajectory - + run The user facing run method. Calls the analysis methods defined below - Your analysis can implement the following methods, which are called from run: - _prepare Called before iteration on the trajectory has begun. Data structures can be set up at this time, however most error checking should be done in the __init__ - _single_frame Called after the trajectory is moved onto each new frame. _conclude - Called once iteration on the trajectory is finished. Apply normalisation and averaging to results here. - """ - def _setup_frames(self, trajectory, start=None, stop=None, step=None): - - if trajectory is None: - pass + def _setup_frames(self, universe, start=None, stop=None, step=None): + """ + Add method docstring + """ + if universe is None: + pass else: - self._trajectory = trajectory - - start, stop, step = trajectory.check_slice_indices(start, stop, step) - self.start = start - self.stop = stop - self.step = step + self._universe = universe + self._trajectory = self._universe.trajectory + + start, stop, step = self._trajectory.check_slice_indices(start, + stop, + step) + self.start = start + self.stop = stop + self.step = step self.nframes = len(xrange(start, stop, step)) def _single_frame(self): """Calculate data from a single frame of trajectory - + Don't worry about normalising, just deal with a single frame. """ pass @@ -98,12 +94,12 @@ def run(self): self._trajectory[self.start:self.stop:self.step]): #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) self._single_frame() - logger.info("Finishing up") + #logger.info("Finishing up") self._conclude() def __getstate__(self): state = dict(self.__dict__) - key = '_trajectory' - if key in state: - del state ['_trajectory'] + for key in ['_universe', '_trajectory']: + if key in state: + del state[key] return state diff --git a/package/MDAnalysis/analysis/electromagnetism.py b/package/MDAnalysis/analysis/electromagnetism.py index fe450c6c3b8..64cfb1f955b 100644 --- a/package/MDAnalysis/analysis/electromagnetism.py +++ b/package/MDAnalysis/analysis/electromagnetism.py @@ -3,23 +3,23 @@ class TotalDipole(AnalysisBase): - def __init__(self, trajectory=None, filename='order.dat', selection=None, start=None, stop=None, step=None): + def __init__(self, universe=None, filename='order.dat', selection=None, start=None, stop=None, step=None): if selection is None: raise RuntimeError('In class TotalDipole: constroctur requires a selection') else: - self.selection = selection + self.selection_string = selection + self._universe = universe + #self._trajectory = self._universe.trajectory self.filename = filename self.time = [] - self._setup_frames(trajectory, start, stop, step) + self._setup_frames(self._universe, start, stop, step) self.dipoles = [] def _single_frame(self): - selection = self.selection - - selection.wrap(compound='residues') + selection = self._universe.select_atoms(self.selection_string) dipole = np.zeros(3) diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py index 717aa635b52..70c0d9e7637 100644 --- a/package/MDAnalysis/analysis/parallel_jobs.py +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -1,72 +1,74 @@ +""" Add a docstring later + +""" +import MDAnalysis as mda import multiprocessing as mp import copy from operator import itemgetter -# For fancy progressbar -try: - from blessings import Terminal - from progressive.bar import Bar - from progressive.tree import ProgressTree, Value, BarDescriptor -except: - pass - -class ParallelProcessor(): - - def __init__(self, list_of_jobs, trajectory, start=None, stop=None, step=None, threads=None, - progressbar=True): - self._trajectory = trajectory - - start, stop, step = trajectory.check_slice_indices(start, stop, step) - - self.start = start - self.stop = stop - self.step = step +class ParallelProcessor(object): + """ Add class docstring later + """ + def __init__(self, jobs_list, universe, start=None, stop=None, + step=None, threads=None): + self._universe = universe + self.topology = universe.filename + self.trajname = universe._trajectory.filename + + start, stop, step = universe.trajectory.check_slice_indices(start, + stop, step) + + self.start = start + self.stop = stop + self.step = step self.nframes = len(xrange(start, stop, step)) - self.list_of_jobs = list_of_jobs + self.jobs_list = jobs_list - if threads == None: + if threads is None: self.threads = mp.cpu_count() - elif threads > mp.cpu_count(): + elif threads > mp.cpu_count(): self.threads = mp.cpu_count() else: self.threads = threads - - self.progressbar = progressbar + self.slices = self.compute_slices() - def slices(self): - # This function returns a list containing the start and - # last configuration to be analyzed from each thread + def compute_slices(self): + """ + This function returns a list containing the start and + last configuration to be analyzed from each thread + """ + threads = self.threads # Get number of threads from initialization + step = self.step + configs = (self.stop-self.start)/step - threads = self.threads # Get number of threads from initialization - step = self.step - configs = (self.stop-self.start)/step + self.nframes = configs # Check whether the number of threads is higher than # the number of trajectories to be analyzed - while (configs/threads == 0): + while configs/threads == 0: threads -= 1 - + self.threads = threads # Update the number of threads print "Total configurations: "+str(configs) print "Analysis running on ", threads, " threads.\n" # Number of cfgs for each thread, and remainder to be added - thread_cfg = configs/threads - reminder = configs%threads + thread_cfg = configs/threads + reminder = configs%threads - slices = [] + slices = [] beg = self.start # Compute the start and last configurations - for thread in range(0,threads): + for thread in range(0, threads): if thread < reminder: end = beg+step*thread_cfg else: end = beg+step*(thread_cfg-1) - + slices.append([beg, end+1]) beg = end+step @@ -77,150 +79,102 @@ def slices(self): line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \ +str(slices[thread][0]).rjust(digits)+"/" \ +str(slices[thread][1]-1).rjust(digits) \ - +" | Configurations: "+str(confs).rjust(1+len(str(thread_cfg))) + +" | Configurations: "\ + +str(confs).rjust(1+len(str(thread_cfg))) print line - print return slices - def job_copies(self): - # This function creates n=threads copies of the analysis - # objects submitted from the user - threads = self.threads - job_list = [] - for thread in range(threads): - job_list.append([ copy.deepcopy(job) for job in self.list_of_jobs]) - - return job_list - - def compute(self,job_list,start,stop,step, out_queue, order,progress,error): - # Run the single_frame method for each analysis object for all - # the trajectories in the batch - count = 0 # just to track IO errors - - try: - for i, ts in enumerate(self._trajectory[start:stop:step]): - count = i - for job in job_list: - job._single_frame() - progress.put(order) # Updates the progress bar - - out_queue.put((job_list,order)) # Returns the results - except: - error.put([order, start+count*step]) - - def prepare(self,pool,slices): - # This function runs the setup_frame method from AnalysisBase - # and the prepare method from the analysis object. Each job - # object is initialized using start and stop from the slice function - for thread, elem in zip(pool,slices): - for job in thread: - job._setup_frames(self._trajectory,start=elem[0],stop=elem[1],step=self.step) - job._prepare() - - def conclude(self,list_of_jobs): - # Run conclude for each job object - for job in list_of_jobs: + def compute(self, out_queue, order, progress): + """ + Run the single_frame method for each analysis object for all + the trajectories in the batch + """ + start = self.slices[order][0] + stop = self.slices[order][1] + step = self.step + + jobs_list = [] + universe = mda.Universe(self.topology, self.trajname) + traj = universe.trajectory + + for job in self.jobs_list: + jobs_list.append(copy.deepcopy(job)) + + for job in jobs_list: + # Initialize job objects. start, stop and step are + # given so that self.nframes is computed correctly + job._setup_frames(universe=universe, start=start, + stop=stop, step=self.step) + job._prepare() + + for timestep in traj[start:stop:step]: + for job in jobs_list: + job._single_frame() + progress.put(order) # Updates the progress bar + + out_queue.put((jobs_list, order)) # Returns the results + + def conclude(self, jobs_list): + """ + Run conclude for each job object + """ + for job in jobs_list: job._conclude() - - def parallel_run(self): - # Returns indices for splitting the trajectory - slices = self.slices() - # Create copies of the original object to be - # dispatched to multiprocess - pool = self.job_copies() + def parallel_run(self): + """ + Create copies of the original object to be + dispatched to multiprocess + """ threads = self.threads - self.prepare(pool,slices) - # Queues for the communication between parent and child processes out_queue = mp.Manager().Queue() - progress = mp.Manager().Queue() - error = mp.Manager().Queue() + progress = mp.Manager().Queue() # Prepare multiprocess objects - processes = [ mp.Process(target=self.compute, args=(batch, elem[0],elem[1],self.step,out_queue,order,progress,error)) - for order, (batch, elem) in enumerate(zip(pool,slices)) ] + processes = [mp.Process(target=self.compute, + args=(out_queue, order, progress)) + for order in range(threads)] # Run processes - for p in processes: p.start() - - thread_configs = [ 1+(elem[1]-elem[0]-1)/self.step for elem in slices ] - - if self.progressbar: - try: - # Initialize progress bar - readings = [ Value(0) for i in range(threads) ] - total_cfg = Value(0) - bd_defaults = [ dict(type=Bar, kwargs=dict(max_value=cfg)) for cfg in thread_configs ] - bd_defaults.append( dict(type=Bar, kwargs=dict(max_value=sum(thread_configs))) ) # add total cfgs for total counter - - data_tuple = [ ("Core "+str(thread+1).rjust(len(str(threads))), - BarDescriptor(value=readings[thread], **bd_defaults[thread])) for thread in range(threads) ] - - data = dict( (key,value) for (key,value) in data_tuple ) - - data.update({'Total': BarDescriptor(value=total_cfg, **bd_defaults[-1])}) - - # Create blessings.Terminal instance - t = Terminal() - # Initialize a ProgressTree instance - n = ProgressTree(term=t) - # Make room for the progress bar - n.make_room(data) - - # Updates progress bar - while any([ p.is_alive() for p in processes ]): - while not progress.empty(): - core = progress.get() - n.cursor.restore() - readings[core].value += 1 - total_cfg.value += 1 - n.draw(data, BarDescriptor(bd_defaults)) - - print # lets leave some free space - except: - data = {} - for thread in range(threads): - data[thread]=0 - while any([ p.is_alive() for p in processes ]): - while not progress.empty(): - core = progress.get() - data[core] += 1 - - for thread in range(threads): - print "Core "+str(thread)+": "+str(data[thread])+"/"+str(thread_configs[thread]) - - print "\r", - - print "\033["+str(threads+1)+"A" - - print threads*"\n" - + for process in processes: + process.start() - # Exit the completed processes - for p in processes: p.join() + thread_configs = [1+(elem[1]-elem[0]-1)/self.step + for elem in self.slices] - unread_configurations = [] + data = {} - # Collect errors from queues - while not error.empty(): - unread_configurations.append(error.get()) + for thread in range(threads): + data[thread] = 0 + + total = 0 + print + while any([process.is_alive() for process in processes]): + while not progress.empty(): + core = progress.get() + data[core] += 1 + total += 1 + + for thread in range(threads): + print "Core "+str(thread)+": " \ + +str(data[thread])+"/" \ + +str(thread_configs[thread]) - if len(unread_configurations) != 0: - unread_counter = 0 - for error in unread_configurations: - unread_counter += (slices[error[0]][1]-error[1])/self.step - print "Sorry, there were", unread_counter, "unread configurations." - for error in sorted(unread_configurations, key=itemgetter(0)): - print "Core", error[0]+1, "did not read from configuration", error[1]+1,\ - "to", slices[error[0]][1] - - print "\n" + print "Total: "+str(total)+"/"\ + +str(self.nframes) + print "\033["+str(threads+2)+"A" + + print (threads+1)*"\n" + + # Exit the completed processes + for process in processes: + process.join() results = [] @@ -228,8 +182,8 @@ def parallel_run(self): while not out_queue.empty(): results.append(out_queue.get()) - jobs_num = len(self.list_of_jobs) - + jobs_num = len(self.jobs_list) + result_list = [] # Sum the job objects from each thread diff --git a/testsuite/MDAnalysisTests/test_parallel_jobs.py b/testsuite/MDAnalysisTests/test_parallel_jobs.py index 0258e0cea2b..cab177e2023 100644 --- a/testsuite/MDAnalysisTests/test_parallel_jobs.py +++ b/testsuite/MDAnalysisTests/test_parallel_jobs.py @@ -1,33 +1,29 @@ import MDAnalysis as mda import numpy as np import MDAnalysis.analysis.parallel_jobs as pj -import MDAnalysis.analysis.electromagnetism as electromagnetism +import MDAnalysis.analysis.electromagnetism as em from numpy.testing import * import time -from MDAnalysisTests.datafiles import (DCD,PSF) +from MDAnalysisTests.datafiles import (DCD, PSF) class TestParallel(TestCase): def setUp(self): - self.system = mda.Universe(PSF,DCD) - self.selection = self.system.select_atoms('all') - self.traj = self.system.trajectory + self.universe = mda.Universe(PSF, DCD) + self.selection_string = 'all' # Single thread analysis - print "Single thread analysis:" start_time = time.time() - single_analysis = electromagnetism.TotalDipole(self.traj,selection=self.selection) + single_analysis = em.TotalDipole(universe=self.universe, + selection=self.selection_string) self.single_result = single_analysis.run() - print("--- %s seconds ---" % (time.time() - start_time)) def test_parallel(self): - print "Parallel analysis:" start_time = time.time() - jobs = [ electromagnetism.TotalDipole(selection = self.selection) ] - process = pj.ParallelProcessor(jobs,self.traj,progressbar=True) + jobs = [em.TotalDipole(selection=self.selection_string)] + process = pj.ParallelProcessor(jobs,self.universe) assert_equal(self.single_result, process.parallel_run()) - print("--- %s seconds ---" % (time.time() - start_time)) def tearDown(self): - del self.system + del self.universe From ba9568f472860d75c59742f594aa83749653fc94 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Wed, 13 Jan 2016 16:14:31 +0100 Subject: [PATCH 04/14] _single_frame in AnalysisBase now is called with timestep as argument --- package/MDAnalysis/analysis/base.py | 2 +- package/MDAnalysis/analysis/electromagnetism.py | 2 +- package/MDAnalysis/analysis/parallel_jobs.py | 2 +- testsuite/MDAnalysisTests/test_parallel_jobs.py | 3 +-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index 458c8e06ac1..e9d77919be0 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -93,7 +93,7 @@ def run(self): for i, ts in enumerate( self._trajectory[self.start:self.stop:self.step]): #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) - self._single_frame() + self._single_frame(ts) #logger.info("Finishing up") self._conclude() diff --git a/package/MDAnalysis/analysis/electromagnetism.py b/package/MDAnalysis/analysis/electromagnetism.py index 64cfb1f955b..05a3a445cda 100644 --- a/package/MDAnalysis/analysis/electromagnetism.py +++ b/package/MDAnalysis/analysis/electromagnetism.py @@ -18,7 +18,7 @@ def __init__(self, universe=None, filename='order.dat', selection=None, start=No self._setup_frames(self._universe, start, stop, step) self.dipoles = [] - def _single_frame(self): + def _single_frame(self,timestep): selection = self._universe.select_atoms(self.selection_string) dipole = np.zeros(3) diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py index 70c0d9e7637..c984a363f90 100644 --- a/package/MDAnalysis/analysis/parallel_jobs.py +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -111,7 +111,7 @@ def compute(self, out_queue, order, progress): for timestep in traj[start:stop:step]: for job in jobs_list: - job._single_frame() + job._single_frame(timestep) progress.put(order) # Updates the progress bar out_queue.put((jobs_list, order)) # Returns the results diff --git a/testsuite/MDAnalysisTests/test_parallel_jobs.py b/testsuite/MDAnalysisTests/test_parallel_jobs.py index cab177e2023..d5792907305 100644 --- a/testsuite/MDAnalysisTests/test_parallel_jobs.py +++ b/testsuite/MDAnalysisTests/test_parallel_jobs.py @@ -15,8 +15,7 @@ def setUp(self): # Single thread analysis start_time = time.time() - single_analysis = em.TotalDipole(universe=self.universe, - selection=self.selection_string) + single_analysis = em.TotalDipole(universe=self.universe, selection=self.selection_string) self.single_result = single_analysis.run() def test_parallel(self): From 447f306759da7bdfea76c26d07dce47fbfef0043 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Sun, 17 Jan 2016 04:57:13 +0100 Subject: [PATCH 05/14] Added parallel feature to AnalysisBase class. Added a small progress bar to track parallel computations. --- package/MDAnalysis/analysis/base.py | 165 ++++++++++++++++++- package/MDAnalysis/analysis/parallel_jobs.py | 29 +--- package/MDAnalysis/analysis/utils/prog.py | 98 +++++++++++ 3 files changed, 264 insertions(+), 28 deletions(-) create mode 100644 package/MDAnalysis/analysis/utils/prog.py diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index e9d77919be0..96c9aad68b2 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -22,6 +22,12 @@ """ import logging +import MDAnalysis as mda +import multiprocessing as mp +import copy +import time +from operator import itemgetter +import prog logger = logging.getLogger(__name__) @@ -86,17 +92,162 @@ def _conclude(self): """ pass - def run(self): + def run(self, parallel=None, threads=None): """Perform the calculation""" logger.info("Starting preparation") - self._prepare() - for i, ts in enumerate( - self._trajectory[self.start:self.stop:self.step]): - #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) - self._single_frame(ts) - #logger.info("Finishing up") + if parallel is None: + self._prepare() + for i, ts in enumerate( + self._trajectory[self.start:self.stop:self.step]): + #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) + self._single_frame(ts) + #logger.info("Finishing up") + self._conclude() + + else: + if threads is None: + self.threads = mp.cpu_count()-1 + elif threads > mp.cpu_count(): + self.threads = mp.cpu_count()-1 + else: + self.threads = threads + + self.slices = self.compute_slices() + self.topology = self._universe.filename + self.trajname = self._universe._trajectory.filename + self.nframes = 0 + self._parallel_run() + + def compute_slices(self): + """ + This function returns a list containing the start and + last configuration to be analyzed from each thread + """ + threads = self.threads # Get number of threads from initialization + step = self.step + configs = 1+(self.stop-self.start-1)/step + + #self.nframes = configs + + # Check whether the number of threads is higher than + # the number of trajectories to be analyzed + while configs/threads == 0: + threads -= 1 + + self.threads = threads # Update the number of threads + + print "Total configurations: "+str(configs) + print "Analysis running on ", threads, " threads.\n" + + # Number of cfgs for each thread, and remainder to be added + thread_cfg = configs/threads + reminder = configs%threads + + slices = [] + beg = self.start + + # Compute the start and last configurations + for thread in range(0, threads): + if thread < reminder: + end = beg+step*thread_cfg + else: + end = beg+step*(thread_cfg-1) + + slices.append([beg, end+1]) + beg = end+step + + # Print on screen the configurations assigned to each thread + for thread in range(threads): + confs = 1+(slices[thread][1]-1-slices[thread][0])/step + digits = len(str(self.stop)) + line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \ + +str(slices[thread][0]).rjust(digits)+"/" \ + +str(slices[thread][1]-1).rjust(digits) \ + +" | Configurations: "\ + +str(confs).rjust(1+len(str(thread_cfg))) + print line + + return slices + + def _parallel_run(self): + """ + Create copies of the original object to be + dispatched to multiprocess + """ + + threads = self.threads + + # Queues for the communication between parent and child processes + out_queue = mp.Manager().Queue() + progress = mp.Manager().Queue() + + # Prepare multiprocess objects + processes = [mp.Process(target=self.compute, + args=(out_queue, order, progress)) + for order in range(threads)] + + # Run processes + for process in processes: + process.start() + + thread_configs = [1+(elem[1]-elem[0]-1)/self.step + for elem in self.slices] + + name = self._type() + + pb = prog.ProgressbarMulticore(thread_configs,bar_length=50, name=name) + + while any(process.is_alive() for process in processes): + time.sleep(1) + pb.timer(1) + while not progress.empty(): + core = progress.get() + pb.update(core) + print + + # Exit the completed processes + for process in processes: + process.join() + + results = [] + + # Collects results from the queue + while not out_queue.empty(): + results.append(out_queue.get()) + + for thread in sorted(results, key=itemgetter(1)): + self += thread[0] + self._conclude() + def compute(self, out_queue, order, progress): + """ + Run the single_frame method for each analysis object for all + the trajectories in the batch + """ + start = self.slices[order][0] + stop = self.slices[order][1] + step = self.step + + universe = mda.Universe(self.topology, self.trajname) + traj = universe.trajectory + + analysis_object = copy.deepcopy(self) + analysis_object._setup_frames(universe=universe, start=start, + stop=stop, step=self.step) + + analysis_object._prepare() + + progress.put(order) + for timestep in traj[start:stop:step]: + analysis_object._single_frame(timestep) + progress.put(order) # Updates the progress bar + + out_queue.put((analysis_object, order)) # Returns the results + + def _type(self): + return self.__class__.__name__ + def __getstate__(self): state = dict(self.__dict__) for key in ['_universe', '_trajectory']: diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py index c984a363f90..54ac5dd6daf 100644 --- a/package/MDAnalysis/analysis/parallel_jobs.py +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -5,6 +5,7 @@ import multiprocessing as mp import copy from operator import itemgetter +import prog class ParallelProcessor(object): """ Add class docstring later @@ -41,7 +42,7 @@ def compute_slices(self): """ threads = self.threads # Get number of threads from initialization step = self.step - configs = (self.stop-self.start)/step + configs = 1+(self.stop-self.start-1)/step self.nframes = configs @@ -109,6 +110,7 @@ def compute(self, out_queue, order, progress): stop=stop, step=self.step) job._prepare() + progress.put(order) for timestep in traj[start:stop:step]: for job in jobs_list: job._single_frame(timestep) @@ -147,30 +149,15 @@ def parallel_run(self): thread_configs = [1+(elem[1]-elem[0]-1)/self.step for elem in self.slices] - data = {} + pb = prog.ProgressbarMulticore(thread_configs,bar_length=50) - for thread in range(threads): - data[thread] = 0 - - total = 0 - print while any([process.is_alive() for process in processes]): + time.sleep(1) + pb.timer(1) while not progress.empty(): core = progress.get() - data[core] += 1 - total += 1 - - for thread in range(threads): - print "Core "+str(thread)+": " \ - +str(data[thread])+"/" \ - +str(thread_configs[thread]) - - print "Total: "+str(total)+"/"\ - +str(self.nframes) - - print "\033["+str(threads+2)+"A" - - print (threads+1)*"\n" + pb.update(core) + print # Exit the completed processes for process in processes: diff --git a/package/MDAnalysis/analysis/utils/prog.py b/package/MDAnalysis/analysis/utils/prog.py new file mode 100644 index 00000000000..b49a7041326 --- /dev/null +++ b/package/MDAnalysis/analysis/utils/prog.py @@ -0,0 +1,98 @@ +""" Add docstring later +""" + +import time +import datetime +import numpy as np +import sys +import math + +class ProgressbarMulticore(object): + """ Add class docstring later + """ + def __init__(self,list_of_totals,name="Analysis", bar_length=40): + self.threads_compute_units = np.array(list_of_totals) + self.total_configs = np.sum(self.threads_compute_units) + self.threads = len(self.threads_compute_units) + self.working_threads = self.threads + + self.has_started = [False] * self.threads + self.counter = 0 + self.last_update = np.zeros(self.threads) # times of last update + self.cumulative_time = 0 + + self.eta = 0 + self.elaps = 0 + self.speed = 0 + + self.remaining_cfgs = np.amax(self.threads_compute_units) + self.remaining_changed = False + + # cosmectics + self.name = name + self.bar_length = bar_length + self.dots = 0 + self.eta_started = False + self.cfg_len = len(str(self.total_configs)) + + def _update_timings(self,core_id): + istant = time.time() + + if self.has_started[core_id]: + self.counter += 1 + self.threads_compute_units[core_id] -= 1 + self.cumulative_time += istant-self.last_update[core_id] + self.last_update[core_id] = istant + else: + self.has_started[core_id] = True + self.last_update[core_id] = istant + + remainings = np.amax(self.threads_compute_units) + + if remainings != self.remaining_cfgs: + self.remaining_changed = True + self.remaining_cfgs = remainings + + def _compute_eta(self): + if self.remaining_changed: + self.speed = self.cumulative_time/self.counter + self.eta = self.speed*self.remaining_cfgs + self.remaining_changed = False + + def update(self,core_id): + self._update_timings(core_id) + self._compute_eta() + + def _print_bar(self): + percentage=self.counter*100./self.total_configs + bars = int(percentage/100.*self.bar_length) + empty = self.bar_length-bars + + #timing = " ("+str(round(self.speed,1))+" s/cfg)" + + eta = "" + + left_cfgs = " "+str(self.total_configs-self.counter).rjust(self.cfg_len)+"/"+str(self.total_configs).rjust(self.cfg_len) + + if (self.eta < 1 and self.eta_started is False): + eta=str(self.dots*'.')+str((3-self.dots)*' ') + self.dots += 1 + timing = "" + if self.dots > 3: + self.dots = 0 + else: + self.eta_started = True + eta=str(datetime.timedelta(seconds=int(self.eta))) + + print self.name+" ["+str(bars*"=")+str(empty*" ")+"] "+str(round(percentage,1)).rjust(4)+"% Elapsed: "+str(datetime.timedelta(seconds=self.elaps))+" ETA: "+eta+left_cfgs+"\r", + sys.stdout.flush() + + def timer(self,diff): + if self.eta > diff: + self.eta -= diff + self._print_bar() + else: + self._print_bar() + + if any(self.has_started): + self.elaps += diff From dc29f26f2f63d49eebe491f12dc54cb91204e2fa Mon Sep 17 00:00:00 2001 From: mattihappy Date: Sun, 17 Jan 2016 14:42:03 +0100 Subject: [PATCH 06/14] Number of read configurations with AnalysisBase run(parallel=True) and with class ParallelProcessor has been corrected. Other minor fixes --- package/MDAnalysis/analysis/base.py | 2 +- package/MDAnalysis/analysis/parallel_jobs.py | 5 +++-- package/MDAnalysis/analysis/utils/__init__.py | 0 testsuite/MDAnalysisTests/test_parallel_jobs.py | 7 ++++--- 4 files changed, 8 insertions(+), 6 deletions(-) create mode 100644 package/MDAnalysis/analysis/utils/__init__.py diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index 96c9aad68b2..f84157cf2eb 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -27,7 +27,7 @@ import copy import time from operator import itemgetter -import prog +import utils.prog as prog logger = logging.getLogger(__name__) diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py index 54ac5dd6daf..80b496afe82 100644 --- a/package/MDAnalysis/analysis/parallel_jobs.py +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -4,8 +4,9 @@ import MDAnalysis as mda import multiprocessing as mp import copy +import time from operator import itemgetter -import prog +import utils.prog as prog class ParallelProcessor(object): """ Add class docstring later @@ -157,7 +158,7 @@ def parallel_run(self): while not progress.empty(): core = progress.get() pb.update(core) - print + print # Exit the completed processes for process in processes: diff --git a/package/MDAnalysis/analysis/utils/__init__.py b/package/MDAnalysis/analysis/utils/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/testsuite/MDAnalysisTests/test_parallel_jobs.py b/testsuite/MDAnalysisTests/test_parallel_jobs.py index d5792907305..6b22784b8d4 100644 --- a/testsuite/MDAnalysisTests/test_parallel_jobs.py +++ b/testsuite/MDAnalysisTests/test_parallel_jobs.py @@ -4,7 +4,6 @@ import MDAnalysis.analysis.electromagnetism as em from numpy.testing import * -import time from MDAnalysisTests.datafiles import (DCD, PSF) @@ -14,15 +13,17 @@ def setUp(self): self.selection_string = 'all' # Single thread analysis - start_time = time.time() single_analysis = em.TotalDipole(universe=self.universe, selection=self.selection_string) self.single_result = single_analysis.run() def test_parallel(self): - start_time = time.time() jobs = [em.TotalDipole(selection=self.selection_string)] process = pj.ParallelProcessor(jobs,self.universe) assert_equal(self.single_result, process.parallel_run()) + def test_parallel_base(self): + single_analysis = em.TotalDipole(universe=self.universe, selection=self.selection_string) + assert_equal(self.single_result, single_analysis.run(parallel=True)) + def tearDown(self): del self.universe From 59ce29e79da6afc04716cd4fde0cc5a5dd3f67ba Mon Sep 17 00:00:00 2001 From: mattihappy Date: Sun, 17 Jan 2016 15:19:38 +0100 Subject: [PATCH 07/14] Progressbar now outputs to stderr --- package/MDAnalysis/analysis/base.py | 3 ++- package/MDAnalysis/analysis/parallel_jobs.py | 3 ++- package/MDAnalysis/analysis/utils/prog.py | 6 +++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index f84157cf2eb..a967c35e3bf 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -203,7 +203,8 @@ def _parallel_run(self): while not progress.empty(): core = progress.get() pb.update(core) - print + + pb.summary() # Exit the completed processes for process in processes: diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py index 80b496afe82..a0ecd157389 100644 --- a/package/MDAnalysis/analysis/parallel_jobs.py +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -158,7 +158,8 @@ def parallel_run(self): while not progress.empty(): core = progress.get() pb.update(core) - print + + pb.summary() # Exit the completed processes for process in processes: diff --git a/package/MDAnalysis/analysis/utils/prog.py b/package/MDAnalysis/analysis/utils/prog.py index b49a7041326..12a146631da 100644 --- a/package/MDAnalysis/analysis/utils/prog.py +++ b/package/MDAnalysis/analysis/utils/prog.py @@ -84,7 +84,7 @@ def _print_bar(self): self.eta_started = True eta=str(datetime.timedelta(seconds=int(self.eta))) - print self.name+" ["+str(bars*"=")+str(empty*" ")+"] "+str(round(percentage,1)).rjust(4)+"% Elapsed: "+str(datetime.timedelta(seconds=self.elaps))+" ETA: "+eta+left_cfgs+"\r", + sys.stderr.write("\r"+self.name+" ["+str(bars*"=")+str(empty*" ")+"] "+str(round(percentage,1)).rjust(4)+"% Elapsed: "+str(datetime.timedelta(seconds=self.elaps))+" ETA: "+eta+left_cfgs,) sys.stdout.flush() def timer(self,diff): @@ -96,3 +96,7 @@ def timer(self,diff): if any(self.has_started): self.elaps += diff + + def summary(self): + sys.stderr.write("\n") + print self.name+": analyzed "+str(self.total_configs)+" configurations in "+str(datetime.timedelta(seconds=self.elaps))+"\n" From fc9460690ad92b02aaaf169653a17187f7d592ef Mon Sep 17 00:00:00 2001 From: mattihappy Date: Mon, 18 Jan 2016 23:10:51 +0100 Subject: [PATCH 08/14] Now progressbar works in threading mode (thus also for serial jobs). Moved the code to lib.log --- package/MDAnalysis/analysis/utils/__init__.py | 0 package/MDAnalysis/analysis/utils/prog.py | 102 ---------------- package/MDAnalysis/lib/log.py | 113 ++++++++++++++++++ 3 files changed, 113 insertions(+), 102 deletions(-) delete mode 100644 package/MDAnalysis/analysis/utils/__init__.py delete mode 100644 package/MDAnalysis/analysis/utils/prog.py diff --git a/package/MDAnalysis/analysis/utils/__init__.py b/package/MDAnalysis/analysis/utils/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/package/MDAnalysis/analysis/utils/prog.py b/package/MDAnalysis/analysis/utils/prog.py deleted file mode 100644 index 12a146631da..00000000000 --- a/package/MDAnalysis/analysis/utils/prog.py +++ /dev/null @@ -1,102 +0,0 @@ -""" Add docstring later -""" - -import time -import datetime -import numpy as np -import sys -import math - -class ProgressbarMulticore(object): - """ Add class docstring later - """ - def __init__(self,list_of_totals,name="Analysis", bar_length=40): - self.threads_compute_units = np.array(list_of_totals) - self.total_configs = np.sum(self.threads_compute_units) - self.threads = len(self.threads_compute_units) - self.working_threads = self.threads - - self.has_started = [False] * self.threads - self.counter = 0 - self.last_update = np.zeros(self.threads) # times of last update - self.cumulative_time = 0 - - self.eta = 0 - self.elaps = 0 - self.speed = 0 - - self.remaining_cfgs = np.amax(self.threads_compute_units) - self.remaining_changed = False - - # cosmectics - self.name = name - self.bar_length = bar_length - self.dots = 0 - self.eta_started = False - self.cfg_len = len(str(self.total_configs)) - - def _update_timings(self,core_id): - istant = time.time() - - if self.has_started[core_id]: - self.counter += 1 - self.threads_compute_units[core_id] -= 1 - self.cumulative_time += istant-self.last_update[core_id] - self.last_update[core_id] = istant - else: - self.has_started[core_id] = True - self.last_update[core_id] = istant - - remainings = np.amax(self.threads_compute_units) - - if remainings != self.remaining_cfgs: - self.remaining_changed = True - self.remaining_cfgs = remainings - - def _compute_eta(self): - if self.remaining_changed: - self.speed = self.cumulative_time/self.counter - self.eta = self.speed*self.remaining_cfgs - self.remaining_changed = False - - def update(self,core_id): - self._update_timings(core_id) - self._compute_eta() - - def _print_bar(self): - percentage=self.counter*100./self.total_configs - bars = int(percentage/100.*self.bar_length) - empty = self.bar_length-bars - - #timing = " ("+str(round(self.speed,1))+" s/cfg)" - - eta = "" - - left_cfgs = " "+str(self.total_configs-self.counter).rjust(self.cfg_len)+"/"+str(self.total_configs).rjust(self.cfg_len) - - if (self.eta < 1 and self.eta_started is False): - eta=str(self.dots*'.')+str((3-self.dots)*' ') - self.dots += 1 - timing = "" - if self.dots > 3: - self.dots = 0 - else: - self.eta_started = True - eta=str(datetime.timedelta(seconds=int(self.eta))) - - sys.stderr.write("\r"+self.name+" ["+str(bars*"=")+str(empty*" ")+"] "+str(round(percentage,1)).rjust(4)+"% Elapsed: "+str(datetime.timedelta(seconds=self.elaps))+" ETA: "+eta+left_cfgs,) - sys.stdout.flush() - - def timer(self,diff): - if self.eta > diff: - self.eta -= diff - self._print_bar() - else: - self._print_bar() - - if any(self.has_started): - self.elaps += diff - - def summary(self): - sys.stderr.write("\n") - print self.name+": analyzed "+str(self.total_configs)+" configurations in "+str(datetime.timedelta(seconds=self.elaps))+"\n" diff --git a/package/MDAnalysis/lib/log.py b/package/MDAnalysis/lib/log.py index 3f8d6eba708..ee1c9755c68 100644 --- a/package/MDAnalysis/lib/log.py +++ b/package/MDAnalysis/lib/log.py @@ -75,6 +75,10 @@ import sys import logging +import time +import datetime +import numpy as np +import threading from .. import version @@ -303,3 +307,112 @@ def echo(self, step, **kwargs): else: return echo(format % vars(self)) + +class Progressbar(threading.Thread): + """ Add class docstring later + """ + def __init__(self,list_of_totals,name="Analysis", bar_length=40, refresh=1): + threading.Thread.__init__(self) + # Number of units per thread, total of units and number of threads + self.threads_compute_units = np.array(list_of_totals) + self.total_units = np.sum(self.threads_compute_units) + self.threads = len(self.threads_compute_units) + + self.has_started = [False] * self.threads + self.counter = 0 # number of processed units + self.last_update = np.zeros(self.threads) # times of last update + self.cumulative_time = 0 + + self.daemon = True # kills bar if main thread died + self.eta = 0 # estimated time of accomplishment + self.elaps = 0 # elapsed time + self.speed = 0 # seconds per unit + self.freq = refresh # frequency of bar refreshing + + self.remaining_units = np.amax(self.threads_compute_units) + self.remaining_changed = False + + # Bar-related variables + self.name = name + self.bar_length = bar_length + self.dots = 0 + self.eta_started = False + self.cfg_len = len(str(self.total_units)) + + def _update_timings(self,core_id): + istant = time.time() + + # Update statistics each time a new unit has been completed + if self.has_started[core_id]: + self.counter += 1 + self.threads_compute_units[core_id] -= 1 + self.cumulative_time += istant-self.last_update[core_id] + self.last_update[core_id] = istant + else: + self.has_started[core_id] = True + self.last_update[core_id] = istant + + # Update eta only if the highest number of units left has + # decreased (prevents eta from changind all the time) + remainings = np.amax(self.threads_compute_units) + + if remainings != self.remaining_units: + self.remaining_changed = True + self.remaining_units = remainings + + def _compute_eta(self): + # Only update eta if the highest number of units left has changed + if self.remaining_changed: + self.speed = (self.cumulative_time/self.counter) + self.eta = self.speed*self.remaining_units + self.remaining_changed = False + + def update(self,core_id): + self._update_timings(core_id) + self._compute_eta() + + def _print_bar(self): + percentage=self.counter*100./self.total_units + bars = int(percentage/100.*self.bar_length) + empty = self.bar_length-bars + + eta = "" + + left_cfgs = " "+str(self.total_units-self.counter).rjust(self.cfg_len)+"/"+str(self.total_units).rjust(self.cfg_len) + + # Only start timing if at least one unit has arrived + if (self.eta < 1 and self.eta_started is False): + eta=str(self.dots*'.')+str((3-self.dots)*' ') + self.dots += 1 + timing = "" + if self.dots > 3: + self.dots = 0 + else: + self.eta_started = True + eta=str(datetime.timedelta(seconds=int(self.eta))) + + # Output bar to stderr + print "\033[2A" # move cursor one line up + sys.stderr.write(self.name+" ["+str(bars*"=")+str(empty*" ")+"] "+str(round(percentage,1)).rjust(4)+"% Elapsed: "+str(datetime.timedelta(seconds=self.elaps))+" ETA: "+eta+left_cfgs+"\n") + sys.stdout.flush() + + def run(self): + # Avoids negative ETA + print "\n" # avoid overwriting previous line in terminal + while self.remaining_units > 0: + if self.eta > self.freq: + self.eta -= self.freq + self._print_bar() + else: + self._print_bar() + # Update elaps time only if at least one unit has arrived + if any(self.has_started): + self.elaps += self.freq + + time.sleep(self.freq) + # Print a summary at the end + self._summary() + + def _summary(self): + sys.stderr.write("\n") + print self.name+": analyzed "+str(self.total_units)+" units in "+str(datetime.timedelta(seconds=self.elaps))+"\n" From 7a54fad2ffb4d1353f88b921bdd3884b9984af49 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Tue, 19 Jan 2016 00:33:50 +0100 Subject: [PATCH 09/14] Code check with pylint and more meaningful naming of variables --- package/MDAnalysis/lib/log.py | 79 ++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/package/MDAnalysis/lib/log.py b/package/MDAnalysis/lib/log.py index ee1c9755c68..2bef24b05cd 100644 --- a/package/MDAnalysis/lib/log.py +++ b/package/MDAnalysis/lib/log.py @@ -77,8 +77,8 @@ import logging import time import datetime -import numpy as np import threading +import numpy as np from .. import version @@ -311,25 +311,26 @@ def echo(self, step, **kwargs): class Progressbar(threading.Thread): """ Add class docstring later """ - def __init__(self,list_of_totals,name="Analysis", bar_length=40, refresh=1): + def __init__(self, list_of_totals, name="Analysis", + bar_length=40, refresh=1): threading.Thread.__init__(self) - # Number of units per thread, total of units and number of threads - self.threads_compute_units = np.array(list_of_totals) - self.total_units = np.sum(self.threads_compute_units) - self.threads = len(self.threads_compute_units) + # Number of units per job, total of units and number of jobs + self.jobs_compute_units = np.array(list_of_totals) + self.total_units = np.sum(self.jobs_compute_units) + self.jobs = len(self.jobs_compute_units) - self.has_started = [False] * self.threads + self.has_started = [False] * self.jobs self.counter = 0 # number of processed units - self.last_update = np.zeros(self.threads) # times of last update + self.last_update = np.zeros(self.jobs) # times of last update self.cumulative_time = 0 - + self.daemon = True # kills bar if main thread died self.eta = 0 # estimated time of accomplishment self.elaps = 0 # elapsed time self.speed = 0 # seconds per unit self.freq = refresh # frequency of bar refreshing - self.remaining_units = np.amax(self.threads_compute_units) + self.remaining_units = np.amax(self.jobs_compute_units) self.remaining_changed = False # Bar-related variables @@ -338,24 +339,24 @@ def __init__(self,list_of_totals,name="Analysis", bar_length=40, refresh=1): self.dots = 0 self.eta_started = False self.cfg_len = len(str(self.total_units)) - - def _update_timings(self,core_id): + + def _update_timings(self, job_id): istant = time.time() # Update statistics each time a new unit has been completed - if self.has_started[core_id]: + if self.has_started[job_id]: self.counter += 1 - self.threads_compute_units[core_id] -= 1 - self.cumulative_time += istant-self.last_update[core_id] - self.last_update[core_id] = istant + self.jobs_compute_units[job_id] -= 1 + self.cumulative_time += istant-self.last_update[job_id] + self.last_update[job_id] = istant else: - self.has_started[core_id] = True - self.last_update[core_id] = istant + self.has_started[job_id] = True + self.last_update[job_id] = istant # Update eta only if the highest number of units left has # decreased (prevents eta from changind all the time) - remainings = np.amax(self.threads_compute_units) - + remainings = np.amax(self.jobs_compute_units) + if remainings != self.remaining_units: self.remaining_changed = True self.remaining_units = remainings @@ -367,33 +368,50 @@ def _compute_eta(self): self.eta = self.speed*self.remaining_units self.remaining_changed = False - def update(self,core_id): - self._update_timings(core_id) + def update(self, job_id): + """Update progressbar by sending the list index corresponding to the + job you're willing to update. + + Example: two jobs, job1 has 10 work units and job2 15. To + initialize Progressbar you created the object as: + progressbar = MDAnalysis.lib.log.Progressbar([10, 15]) + + If job1 finished one unit, you should run: + progressbar.update(0) + + while if job2 finished one unit, you should run: + progressbar.update(1) + + """ + self._update_timings(job_id) self._compute_eta() def _print_bar(self): - percentage=self.counter*100./self.total_units + percentage = self.counter*100./self.total_units bars = int(percentage/100.*self.bar_length) empty = self.bar_length-bars eta = "" - left_cfgs = " "+str(self.total_units-self.counter).rjust(self.cfg_len)+"/"+str(self.total_units).rjust(self.cfg_len) + left_cfgs = " "+str(self.total_units-self.counter).rjust(self.cfg_len)+"/" \ + +str(self.total_units).rjust(self.cfg_len) # Only start timing if at least one unit has arrived - if (self.eta < 1 and self.eta_started is False): - eta=str(self.dots*'.')+str((3-self.dots)*' ') + if self.eta < 1 and self.eta_started is False: + eta = str(self.dots*'.')+str((3-self.dots)*' ') self.dots += 1 - timing = "" if self.dots > 3: self.dots = 0 else: self.eta_started = True - eta=str(datetime.timedelta(seconds=int(self.eta))) + eta = str(datetime.timedelta(seconds=int(self.eta))) # Output bar to stderr print "\033[2A" # move cursor one line up - sys.stderr.write(self.name+" ["+str(bars*"=")+str(empty*" ")+"] "+str(round(percentage,1)).rjust(4)+"% Elapsed: "+str(datetime.timedelta(seconds=self.elaps))+" ETA: "+eta+left_cfgs+"\n") + sys.stderr.write(self.name+" ["+str(bars*"=")+str(empty*" ")+"] " \ + +str(round(percentage, 1)).rjust(4)+"% Elapsed: " \ + +str(datetime.timedelta(seconds=self.elaps)) \ + +" ETA: "+eta+left_cfgs+"\n") sys.stdout.flush() def run(self): @@ -415,4 +433,5 @@ def run(self): def _summary(self): sys.stderr.write("\n") - print self.name+": analyzed "+str(self.total_units)+" units in "+str(datetime.timedelta(seconds=self.elaps))+"\n" + print self.name+": computed "+str(self.total_units) \ + +" units in "+str(datetime.timedelta(seconds=self.elaps))+"\n" From 4fccebd78d286e60600f61f2bf8580e64468a2ef Mon Sep 17 00:00:00 2001 From: mattihappy Date: Tue, 19 Jan 2016 00:36:30 +0100 Subject: [PATCH 10/14] Modified method run() of AnalysisBase class so that it redirects to appropriare serial or parallel method. Adapted AnalysisBase and ParallelProcessor to run with new progressbar, which now works also for serial analyses --- package/MDAnalysis/analysis/base.py | 167 ++++++++++--------- package/MDAnalysis/analysis/parallel_jobs.py | 19 +-- 2 files changed, 93 insertions(+), 93 deletions(-) diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index a967c35e3bf..98a1d083ced 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -22,12 +22,10 @@ """ import logging -import MDAnalysis as mda import multiprocessing as mp import copy -import time from operator import itemgetter -import utils.prog as prog +import MDAnalysis as mda logger = logging.getLogger(__name__) @@ -68,14 +66,14 @@ def _setup_frames(self, universe, start=None, stop=None, step=None): self._trajectory = self._universe.trajectory start, stop, step = self._trajectory.check_slice_indices(start, - stop, - step) + stop, + step) self.start = start self.stop = stop self.step = step self.nframes = len(xrange(start, stop, step)) - def _single_frame(self): + def _single_frame(self, timestep): """Calculate data from a single frame of trajectory Don't worry about normalising, just deal with a single frame. @@ -93,32 +91,89 @@ def _conclude(self): pass def run(self, parallel=None, threads=None): + """ Chooses whether to run analysis in serial or parallel + mode depending on user input""" + if parallel is None: + self._serial_run() + else: + self._parallel_run(threads) + + def _serial_run(self): """Perform the calculation""" logger.info("Starting preparation") - if parallel is None: - self._prepare() - for i, ts in enumerate( - self._trajectory[self.start:self.stop:self.step]): - #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) - self._single_frame(ts) - #logger.info("Finishing up") - self._conclude() + prog = mda.lib.log.Progressbar([self.nframes]) + prog.start() + self._prepare() + for i, ts in enumerate( + self._trajectory[self.start:self.stop:self.step]): + #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) + self._single_frame(ts) + prog.update(0) + #logger.info("Finishing up") + self._conclude() + + def _parallel_run(self, threads=None): + """ + Create copies of the original object to be + dispatched to multiprocess + """ + if threads is None: + self.threads = mp.cpu_count()-1 + elif threads > mp.cpu_count(): + self.threads = mp.cpu_count()-1 else: - if threads is None: - self.threads = mp.cpu_count()-1 - elif threads > mp.cpu_count(): - self.threads = mp.cpu_count()-1 - else: - self.threads = threads - - self.slices = self.compute_slices() - self.topology = self._universe.filename - self.trajname = self._universe._trajectory.filename - self.nframes = 0 - self._parallel_run() - - def compute_slices(self): + self.threads = threads + + self.slices = self._compute_slices() + self.topology = self._universe.filename + self.trajname = self._universe._trajectory.filename + self.nframes = 0 + + threads = self.threads + + # Queues for the communication between parent and child processes + out_queue = mp.Manager().Queue() + progress = mp.Manager().Queue() + + # Prepare multiprocess objects + processes = [mp.Process(target=self._compute, + args=(out_queue, order, progress)) + for order in range(threads)] + + # Run processes + for process in processes: + process.start() + + thread_configs = [1+(elem[1]-elem[0]-1)/self.step + for elem in self.slices] + + name = self._type() + + prog = mda.lib.log.Progressbar(thread_configs, bar_length=50, name=name) + prog.start() + + while any(process.is_alive() for process in processes): + while not progress.empty(): + core = progress.get() + prog.update(core) + + # Exit the completed processes + for process in processes: + process.join() + + results = [] + + # Collects results from the queue + while not out_queue.empty(): + results.append(out_queue.get()) + + for thread in sorted(results, key=itemgetter(1)): + self += thread[0] + + self._conclude() + + def _compute_slices(self): """ This function returns a list containing the start and last configuration to be analyzed from each thread @@ -167,61 +222,9 @@ def compute_slices(self): +str(confs).rjust(1+len(str(thread_cfg))) print line - return slices - - def _parallel_run(self): - """ - Create copies of the original object to be - dispatched to multiprocess - """ - - threads = self.threads - - # Queues for the communication between parent and child processes - out_queue = mp.Manager().Queue() - progress = mp.Manager().Queue() - - # Prepare multiprocess objects - processes = [mp.Process(target=self.compute, - args=(out_queue, order, progress)) - for order in range(threads)] - - # Run processes - for process in processes: - process.start() - - thread_configs = [1+(elem[1]-elem[0]-1)/self.step - for elem in self.slices] - - name = self._type() - - pb = prog.ProgressbarMulticore(thread_configs,bar_length=50, name=name) - - while any(process.is_alive() for process in processes): - time.sleep(1) - pb.timer(1) - while not progress.empty(): - core = progress.get() - pb.update(core) - - pb.summary() - - # Exit the completed processes - for process in processes: - process.join() - - results = [] - - # Collects results from the queue - while not out_queue.empty(): - results.append(out_queue.get()) - - for thread in sorted(results, key=itemgetter(1)): - self += thread[0] - - self._conclude() + return slices - def compute(self, out_queue, order, progress): + def _compute(self, out_queue, order, progress): """ Run the single_frame method for each analysis object for all the trajectories in the batch diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py index a0ecd157389..b704d58e621 100644 --- a/package/MDAnalysis/analysis/parallel_jobs.py +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -1,12 +1,11 @@ """ Add a docstring later """ -import MDAnalysis as mda import multiprocessing as mp import copy import time from operator import itemgetter -import utils.prog as prog +import MDAnalysis as mda class ParallelProcessor(object): """ Add class docstring later @@ -18,7 +17,7 @@ def __init__(self, jobs_list, universe, start=None, stop=None, self.trajname = universe._trajectory.filename start, stop, step = universe.trajectory.check_slice_indices(start, - stop, step) + stop, step) self.start = start self.stop = stop @@ -139,8 +138,8 @@ def parallel_run(self): # Prepare multiprocess objects processes = [mp.Process(target=self.compute, - args=(out_queue, order, progress)) - for order in range(threads)] + args=(out_queue, order, progress)) + for order in range(threads)] # Run processes @@ -150,16 +149,14 @@ def parallel_run(self): thread_configs = [1+(elem[1]-elem[0]-1)/self.step for elem in self.slices] - pb = prog.ProgressbarMulticore(thread_configs,bar_length=50) + prog = mda.lib.log.Progressbar(thread_configs, bar_length=50, + name="ParallelProcessor") + prog.start() while any([process.is_alive() for process in processes]): - time.sleep(1) - pb.timer(1) while not progress.empty(): core = progress.get() - pb.update(core) - - pb.summary() + prog.update(core) # Exit the completed processes for process in processes: From d83a821ca250bcb1c2c540370e6f8595fcbbdaf2 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Wed, 20 Jan 2016 14:01:46 +0100 Subject: [PATCH 11/14] Modified tests and analysis modules to work with new AnalysisBase (which now takes a universe and not a trajectory, and requires method _single_frame to take a timestep as argument --- package/MDAnalysis/analysis/polymer.py | 4 ++-- package/MDAnalysis/analysis/rdf.py | 6 +++--- testsuite/MDAnalysisTests/analysis/test_base.py | 17 ++++++++--------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/package/MDAnalysis/analysis/polymer.py b/package/MDAnalysis/analysis/polymer.py index 77a4c05d60e..10708cf5a99 100644 --- a/package/MDAnalysis/analysis/polymer.py +++ b/package/MDAnalysis/analysis/polymer.py @@ -73,12 +73,12 @@ def __init__(self, atomgroups, if not all( l == chainlength for l in lens): raise ValueError("Not all AtomGroups were the same size") - self._setup_frames(atomgroups[0].universe.trajectory, + self._setup_frames(atomgroups[0].universe, start, stop, step) self._results = np.zeros(chainlength - 1, dtype=np.float32) - def _single_frame(self): + def _single_frame(self,timestep): # could optimise this by writing a "self dot array" # we're only using the upper triangle of np.inner # function would accept a bunch of coordinates and spit out the diff --git a/package/MDAnalysis/analysis/rdf.py b/package/MDAnalysis/analysis/rdf.py index 946e2ae8e9e..14bf712bf1f 100644 --- a/package/MDAnalysis/analysis/rdf.py +++ b/package/MDAnalysis/analysis/rdf.py @@ -84,7 +84,7 @@ def __init__(self, g1, g2, self.g2 = g2 self.u = g1.universe - self._setup_frames(self.u.trajectory, + self._setup_frames(self.u, start=start, stop=stop, step=step) @@ -115,7 +115,7 @@ def __init__(self, g1, g2, self._exclusion_block = None self._exclusion_mask = None - def _single_frame(self): + def _single_frame(self,timestep): distances.distance_array(self.g1.positions, self.g2.positions, box=self.u.dimensions, result=self._result) # Maybe exclude same molecule distances @@ -125,7 +125,7 @@ def _single_frame(self): count = np.histogram(self._result, **self.rdf_settings)[0] self.count += count - self.volume += self._ts.volume + self.volume += timestep.volume def _conclude(self): # Number of each selection diff --git a/testsuite/MDAnalysisTests/analysis/test_base.py b/testsuite/MDAnalysisTests/analysis/test_base.py index 5e9c1bbd725..541cda71f1d 100644 --- a/testsuite/MDAnalysisTests/analysis/test_base.py +++ b/testsuite/MDAnalysisTests/analysis/test_base.py @@ -27,17 +27,16 @@ class FrameAnalysis(AnalysisBase): """Just grabs frame numbers of frames it goes over""" - def __init__(self, reader, start=None, stop=None, step=None): - self.traj = reader - self._setup_frames(reader, + def __init__(self, universe, start=None, stop=None, step=None): + self._setup_frames(universe, start=start, stop=stop, step=step) self.frames = [] - def _single_frame(self): - self.frames.append(self._ts.frame) + def _single_frame(self, timestep): + self.frames.append(timestep.frame) class TestAnalysisBase(object): @@ -49,28 +48,28 @@ def tearDown(self): del self.u def test_default(self): - an = FrameAnalysis(self.u.trajectory) + an = FrameAnalysis(self.u) assert_(an.nframes == len(self.u.trajectory)) an.run() assert_(an.frames == range(len(self.u.trajectory))) def test_start(self): - an = FrameAnalysis(self.u.trajectory, start=20) + an = FrameAnalysis(self.u, start=20) assert_(an.nframes == len(self.u.trajectory) - 20) an.run() assert_(an.frames == range(20, len(self.u.trajectory))) def test_stop(self): - an = FrameAnalysis(self.u.trajectory, stop=20) + an = FrameAnalysis(self.u, stop=20) assert_(an.nframes == 20) an.run() assert_(an.frames == range(20)) def test_step(self): - an = FrameAnalysis(self.u.trajectory, step=20) + an = FrameAnalysis(self.u, step=20) assert_(an.nframes == 5) an.run() From 747b486aaa5b91e4e14a8f359822f9ef3e3e2925 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Wed, 20 Jan 2016 19:25:38 +0100 Subject: [PATCH 12/14] Modified AnalysisBase, selections should now be passed within a list. --- package/MDAnalysis/analysis/base.py | 41 ++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index 98a1d083ced..8c8057a3f89 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -39,7 +39,6 @@ class AnalysisBase(object): _setup_frames(trajectory, start=None, stop=None, step=None) Pass a Reader object and define the desired iteration pattern through the trajectory - run The user facing run method. Calls the analysis methods defined below @@ -75,7 +74,6 @@ def _setup_frames(self, universe, start=None, stop=None, step=None): def _single_frame(self, timestep): """Calculate data from a single frame of trajectory - Don't worry about normalising, just deal with a single frame. """ pass @@ -233,12 +231,10 @@ def _compute(self, out_queue, order, progress): stop = self.slices[order][1] step = self.step - universe = mda.Universe(self.topology, self.trajname) - traj = universe.trajectory - analysis_object = copy.deepcopy(self) - analysis_object._setup_frames(universe=universe, start=start, - stop=stop, step=self.step) + + analysis_object.nframes = len(xrange(start, stop, step)) + traj = analysis_object._universe.trajectory analysis_object._prepare() @@ -247,6 +243,10 @@ def _compute(self, out_queue, order, progress): analysis_object._single_frame(timestep) progress.put(order) # Updates the progress bar + # Avoid initializing universe again when results are sent back + analysis_object._universe.filename = None + analysis_object._universe.trajectory.filename = None + out_queue.put((analysis_object, order)) # Returns the results def _type(self): @@ -254,7 +254,28 @@ def _type(self): def __getstate__(self): state = dict(self.__dict__) - for key in ['_universe', '_trajectory']: - if key in state: - del state[key] + # Replace the _ags entry with indices + # pop removes the _ag key, or returns [] (empty list) if the Key didn't exist + state['ag indices'] = [ag.indices for ag in state.pop('_ag', [])] + try: + state['universe filenames'] = self._universe.filename, self._universe.trajectory.filename + except: + pass + state.pop('_universe', None) + state.pop('_trajectory', None) + return state + + + def __setstate__(self, state): + self.__dict__ = state + # Create my local Universe + try: + # Create my local Universe + self._universe = mda.Universe(*state['universe filenames']) + # Create my local AGs + self._ag = [self._universe.atoms[idx] for idx in state['ag indices']] + except: + pass + + From a67c12087dd395f99e78a90389b9cd203f3ad11f Mon Sep 17 00:00:00 2001 From: Richard Gowers Date: Wed, 20 Jan 2016 22:17:07 +0000 Subject: [PATCH 13/14] Proposed improvements to parallelbase Only the results of copies of Analysis base are passed back (saves repickling the object) Uses future division (to save headaches later) --- package/MDAnalysis/analysis/base.py | 118 +++++++++++----------------- package/MDAnalysis/analysis/rdf.py | 40 ++++++---- 2 files changed, 72 insertions(+), 86 deletions(-) diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index 8c8057a3f89..c2aaf6ae667 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -20,11 +20,13 @@ A collection of useful building blocks for creating Analysis classes. """ +from __future__ import division import logging import multiprocessing as mp import copy from operator import itemgetter + import MDAnalysis as mda @@ -64,9 +66,8 @@ def _setup_frames(self, universe, start=None, stop=None, step=None): self._universe = universe self._trajectory = self._universe.trajectory - start, stop, step = self._trajectory.check_slice_indices(start, - stop, - step) + start, stop, step = self._trajectory.check_slice_indices( + start, stop, step) self.start = start self.stop = stop self.step = step @@ -88,13 +89,13 @@ def _conclude(self): """ pass - def run(self, parallel=None, threads=None): + def run(self, parallel=None, nthreads=None): """ Chooses whether to run analysis in serial or parallel mode depending on user input""" - if parallel is None: + if not parallel: self._serial_run() else: - self._parallel_run(threads) + self._parallel_run(nthreads) def _serial_run(self): """Perform the calculation""" @@ -110,25 +111,18 @@ def _serial_run(self): #logger.info("Finishing up") self._conclude() - def _parallel_run(self, threads=None): + def _parallel_run(self, nthreads=None): """ Create copies of the original object to be dispatched to multiprocess """ - - if threads is None: - self.threads = mp.cpu_count()-1 - elif threads > mp.cpu_count(): - self.threads = mp.cpu_count()-1 + if nthreads is None: + self.nthreads = mp.cpu_count() - 1 else: - self.threads = threads + # Cap number of threads + self.nthreads = min([mp.cpu_count(), nthreads, self.nframes]) self.slices = self._compute_slices() - self.topology = self._universe.filename - self.trajname = self._universe._trajectory.filename - self.nframes = 0 - - threads = self.threads # Queues for the communication between parent and child processes out_queue = mp.Manager().Queue() @@ -137,18 +131,17 @@ def _parallel_run(self, threads=None): # Prepare multiprocess objects processes = [mp.Process(target=self._compute, args=(out_queue, order, progress)) - for order in range(threads)] + for order in range(self.nthreads)] # Run processes for process in processes: process.start() - thread_configs = [1+(elem[1]-elem[0]-1)/self.step + thread_configs = [1+(elem[1]-elem[0]-1) // self.step for elem in self.slices] - name = self._type() - - prog = mda.lib.log.Progressbar(thread_configs, bar_length=50, name=name) + prog = mda.lib.log.Progressbar( + thread_configs, bar_length=50, name=self.__class__.__name__) prog.start() while any(process.is_alive() for process in processes): @@ -161,14 +154,15 @@ def _parallel_run(self, threads=None): process.join() results = [] - # Collects results from the queue while not out_queue.empty(): results.append(out_queue.get()) - for thread in sorted(results, key=itemgetter(1)): - self += thread[0] + # Sort results, then collate them + for other_results in sorted(results, key=itemgetter(1)): + self._add_other_results(other_results[0]) + # Averaging here self._conclude() def _compute_slices(self): @@ -176,44 +170,34 @@ def _compute_slices(self): This function returns a list containing the start and last configuration to be analyzed from each thread """ - threads = self.threads # Get number of threads from initialization step = self.step - configs = 1+(self.stop-self.start-1)/step - - #self.nframes = configs - - # Check whether the number of threads is higher than - # the number of trajectories to be analyzed - while configs/threads == 0: - threads -= 1 - - self.threads = threads # Update the number of threads + configs = 1 + (self.stop - self.start - 1) // step - print "Total configurations: "+str(configs) - print "Analysis running on ", threads, " threads.\n" + print "Total configurations: {}".format(configs) + print "Analysis running on {} threads.\n".format(self.nthreads) # Number of cfgs for each thread, and remainder to be added - thread_cfg = configs/threads - reminder = configs%threads + thread_cfg = configs // self.nthreads + reminder = configs % self.nthreads slices = [] beg = self.start # Compute the start and last configurations - for thread in range(0, threads): + for thread in range(0, self.nthreads): if thread < reminder: - end = beg+step*thread_cfg + end = beg + step * thread_cfg else: - end = beg+step*(thread_cfg-1) + end = beg + step * (thread_cfg-1) slices.append([beg, end+1]) - beg = end+step + beg = end + step # Print on screen the configurations assigned to each thread - for thread in range(threads): + for thread in range(self.nthreads): confs = 1+(slices[thread][1]-1-slices[thread][0])/step digits = len(str(self.stop)) - line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \ + line = "Thread "+str(thread+1).rjust(len(str(self.nthreads)))+": " \ +str(slices[thread][0]).rjust(digits)+"/" \ +str(slices[thread][1]-1).rjust(digits) \ +" | Configurations: "\ @@ -226,11 +210,16 @@ def _compute(self, out_queue, order, progress): """ Run the single_frame method for each analysis object for all the trajectories in the batch + + order - my id among all the parallel versions + out_queue - where to put my results + progress - the progressbar to update """ start = self.slices[order][0] stop = self.slices[order][1] step = self.step + # Create a local version of the analysis object analysis_object = copy.deepcopy(self) analysis_object.nframes = len(xrange(start, stop, step)) @@ -243,39 +232,28 @@ def _compute(self, out_queue, order, progress): analysis_object._single_frame(timestep) progress.put(order) # Updates the progress bar - # Avoid initializing universe again when results are sent back - analysis_object._universe.filename = None - analysis_object._universe.trajectory.filename = None - - out_queue.put((analysis_object, order)) # Returns the results - - def _type(self): - return self.__class__.__name__ + # Returns the results along with our order index + out_queue.put((analysis_object.results, order)) def __getstate__(self): state = dict(self.__dict__) # Replace the _ags entry with indices # pop removes the _ag key, or returns [] (empty list) if the Key didn't exist - state['ag indices'] = [ag.indices for ag in state.pop('_ag', [])] - try: - state['universe filenames'] = self._universe.filename, self._universe.trajectory.filename - except: - pass + ag_indices = [ag.indices for ag in state.pop('_ags', [])] + universe_filenames = (self._universe.filename, self._universe.trajectory.filename) + state.pop('_ags', None) state.pop('_universe', None) state.pop('_trajectory', None) - return state - + return state, universe_filenames, ag_indices def __setstate__(self, state): - self.__dict__ = state + statedict, universe_filenames, ag_indices = state + self.__dict__ = statedict # Create my local Universe - try: - # Create my local Universe - self._universe = mda.Universe(*state['universe filenames']) - # Create my local AGs - self._ag = [self._universe.atoms[idx] for idx in state['ag indices']] - except: - pass + self._universe = mda.Universe(*universe_filenames) + self._ags = [self._universe.atoms[idx] + for idx in ag_indices] + diff --git a/package/MDAnalysis/analysis/rdf.py b/package/MDAnalysis/analysis/rdf.py index 14bf712bf1f..e0a6c677805 100644 --- a/package/MDAnalysis/analysis/rdf.py +++ b/package/MDAnalysis/analysis/rdf.py @@ -80,11 +80,10 @@ class InterRDF(AnalysisBase): def __init__(self, g1, g2, nbins=75, range=(0.0, 15.0), exclusion_block=None, start=None, stop=None, step=None): - self.g1 = g1 - self.g2 = g2 - self.u = g1.universe + self._ags = [g1, g2] + self._universe = g1.universe - self._setup_frames(self.u, + self._setup_frames(self._universe, start=start, stop=stop, step=step) @@ -92,19 +91,22 @@ def __init__(self, g1, g2, self.rdf_settings = {'bins':nbins, 'range':range} + self.results = {} + # Empty histogram to store the RDF count, edges = np.histogram([-1], **self.rdf_settings) count = count.astype(np.float64) count *= 0.0 - self.count = count + self.results['count'] = count self.edges = edges self.bins = 0.5 * (edges[:-1] + edges[1:]) # Need to know average volume - self.volume = 0.0 + self.results['volume'] = 0.0 # Allocate a results array which we will reuse - self._result = np.zeros((len(self.g1), len(self.g2)), dtype=np.float64) + self._result = np.zeros((len(self._ags[0]), len(self._ags[1])), + dtype=np.float64) # If provided exclusions, create a mask of _result which # lets us take these out if not exclusion_block is None: @@ -115,22 +117,28 @@ def __init__(self, g1, g2, self._exclusion_block = None self._exclusion_mask = None + def _add_other_results(self, other): + for k in ['count', 'volume']: + self.results[k] += other[k] + def _single_frame(self,timestep): - distances.distance_array(self.g1.positions, self.g2.positions, - box=self.u.dimensions, result=self._result) + distances.distance_array( + self._ags[0].positions, + self._ags[1].positions, + box=self._ags[0].dimensions, + result=self._result) # Maybe exclude same molecule distances if not self._exclusion_mask is None: self._exclusion_mask[:] = self._maxrange count = np.histogram(self._result, **self.rdf_settings)[0] - self.count += count - - self.volume += timestep.volume + self.results['count'] += count + self.results['volume'] += timestep.volume def _conclude(self): # Number of each selection - nA = len(self.g1) - nB = len(self.g2) + nA = len(self._ags[0]) + nB = len(self._ags[1]) N = nA * nB # If we had exclusions, take these into account @@ -144,10 +152,10 @@ def _conclude(self): vol *= 4/3.0 * np.pi # Average number density - box_vol = self.volume / self.nframes + box_vol = self.results['volume'] / self.nframes density = N / box_vol - rdf = self.count / (density * vol * self.nframes) + rdf = self.results['count'] / (density * vol * self.nframes) self.rdf = rdf From d6a77f94dad1600cb29d599803cbe6828f551478 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Fri, 22 Jan 2016 23:09:00 +0100 Subject: [PATCH 14/14] Added tests for parallel usage of AnalysisBase --- .../MDAnalysisTests/analysis/test_base.py | 46 +++++++++++++++---- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/testsuite/MDAnalysisTests/analysis/test_base.py b/testsuite/MDAnalysisTests/analysis/test_base.py index 541cda71f1d..e02b542f05d 100644 --- a/testsuite/MDAnalysisTests/analysis/test_base.py +++ b/testsuite/MDAnalysisTests/analysis/test_base.py @@ -18,6 +18,7 @@ from numpy.testing import ( assert_, ) +from multiprocessing import cpu_count import MDAnalysis as mda from MDAnalysis.analysis.base import AnalysisBase @@ -33,44 +34,71 @@ def __init__(self, universe, start=None, stop=None, step=None): stop=stop, step=step) - self.frames = [] + self.results = {} + self.results['frames'] = [] def _single_frame(self, timestep): - self.frames.append(timestep.frame) + self.results['frames'].append(timestep.frame) + def _add_other_results(self, other_result): + self.results['frames'] += other_result['frames'] class TestAnalysisBase(object): def setUp(self): # has 98 frames self.u = mda.Universe(PSF, DCD) + self.frames = len(self.u.trajectory) def tearDown(self): del self.u def test_default(self): an = FrameAnalysis(self.u) - assert_(an.nframes == len(self.u.trajectory)) - + assert_(an.nframes == self.frames) an.run() - assert_(an.frames == range(len(self.u.trajectory))) + assert_(an.results['frames'] == range(self.frames)) + + for cores in range(1, cpu_count() + 1): + an_par = FrameAnalysis(self.u) + assert_(an_par.nframes == self.frames) + an_par.run(parallel=True, nthreads=cores) + assert_(an_par.results['frames'] == range(self.frames)) def test_start(self): an = FrameAnalysis(self.u, start=20) - assert_(an.nframes == len(self.u.trajectory) - 20) + assert_(an.nframes == self.frames - 20) an.run() - assert_(an.frames == range(20, len(self.u.trajectory))) + assert_(an.results['frames'] == range(20, self.frames)) + + for cores in range(1, cpu_count() + 1): + an_par = FrameAnalysis(self.u, start=20) + assert_(an_par.nframes == self.frames - 20) + an_par.run(parallel=True, nthreads=cores) + assert_(an_par.results['frames'] == range(20, self.frames)) def test_stop(self): an = FrameAnalysis(self.u, stop=20) assert_(an.nframes == 20) an.run() - assert_(an.frames == range(20)) + assert_(an.results['frames'] == range(20)) + + for cores in range(1, cpu_count() + 1): + an_par = FrameAnalysis(self.u, stop=20) + assert_(an_par.nframes == 20) + an_par.run(parallel=True, nthreads=cores) + assert_(an_par.results['frames'] == range(20)) def test_step(self): an = FrameAnalysis(self.u, step=20) assert_(an.nframes == 5) an.run() - assert_(an.frames == range(98)[::20]) + assert_(an.results['frames'] == range(98)[::20]) + + for cores in range(1, cpu_count() + 1): + an_par = FrameAnalysis(self.u, step=20) + assert_(an_par.nframes == 5) + an_par.run(parallel=True, nthreads=cores) + assert_(an_par.results['frames'] == range(98)[::20])