From f974a805baab1b442744b001f5c628e1d86b0b03 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Tue, 12 Jan 2016 15:15:49 +0100 Subject: [PATCH 1/2] Modified base.py to work with parallel_job.py, added total dipole moment test analysis (electromagnetism.py), added parallel_jobs.py module, added test for the parallel_jobs.py module --- package/MDAnalysis/analysis/base.py | 48 ++-- .../MDAnalysis/analysis/electromagnetism.py | 59 +++++ package/MDAnalysis/analysis/parallel_jobs.py | 244 ++++++++++++++++++ .../MDAnalysisTests/test_parallel_jobs.py | 33 +++ 4 files changed, 359 insertions(+), 25 deletions(-) create mode 100644 package/MDAnalysis/analysis/electromagnetism.py create mode 100644 package/MDAnalysis/analysis/parallel_jobs.py create mode 100644 testsuite/MDAnalysisTests/test_parallel_jobs.py diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index f3a722b985d..22a237602b1 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -17,11 +17,8 @@ """ Analysis building blocks --- :mod:`MDAnalysis.analysis.base` ============================================================ - A collection of useful building blocks for creating Analysis classes. - - """ import numpy as np import logging @@ -32,12 +29,9 @@ class AnalysisBase(object): """Base class for defining multi frame analysis - The analysis base class is designed as a template for creating multiframe analysis. - The class implements the following methods: - _setup_frames(trajectory, start=None, stop=None, step=None) Pass a Reader object and define the desired iteration pattern through the trajectory @@ -45,32 +39,30 @@ class AnalysisBase(object): run The user facing run method. Calls the analysis methods defined below - Your analysis can implement the following methods, which are called from run: - _prepare Called before iteration on the trajectory has begun. Data structures can be set up at this time, however most error checking should be done in the __init__ - _single_frame Called after the trajectory is moved onto each new frame. - _conclude Called once iteration on the trajectory is finished. Apply normalisation and averaging to results here. - """ - def _setup_frames(self, trajectory, start=None, - stop=None, step=None): - self._trajectory = trajectory - start, stop, step = trajectory.check_slice_indices( - start, stop, step) - self.start = start - self.stop = stop - self.step = step - self.nframes = len(xrange(start, stop, step)) + def _setup_frames(self, trajectory, start=None, stop=None, step=None): + + if trajectory is None: + pass + else: + self._trajectory = trajectory + + start, stop, step = trajectory.check_slice_indices(start, stop, step) + self.start = start + self.stop = stop + self.step = step + self.nframes = len(xrange(start, stop, step)) def _single_frame(self): """Calculate data from a single frame of trajectory @@ -85,21 +77,27 @@ def _prepare(self): def _conclude(self): """Finalise the results you've gathered. - Called at the end of the run() method to finish everything up. """ pass - + def run(self): """Perform the calculation""" logger.info("Starting preparation") self._prepare() for i, ts in enumerate( self._trajectory[self.start:self.stop:self.step]): - self._ts = ts - #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) + logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) self._single_frame() logger.info("Finishing up") self._conclude() - + #def __iadd__(self,other): + # return self + + def __getstate__(self): + state = dict(self.__dict__) + key = '_trajectory' + if key in state: + del state ['_trajectory'] + return state diff --git a/package/MDAnalysis/analysis/electromagnetism.py b/package/MDAnalysis/analysis/electromagnetism.py new file mode 100644 index 00000000000..ed22c61c095 --- /dev/null +++ b/package/MDAnalysis/analysis/electromagnetism.py @@ -0,0 +1,59 @@ +from base import AnalysisBase +import numpy as np + +class TotalDipole(AnalysisBase): + + def __init__(self, trajectory=None, filename='order.dat', selection=None, start=None, stop=None, step=None): + + if selection is None: + raise RuntimeError('In class OrientationalOrder: constructur requires a selection') + else: + self.selection = selection + + self.filename = filename + self.time = [] + + self._setup_frames(trajectory, start, stop, step) + self.dipoles = [] + + def _single_frame(self): + selection = self.selection + + selection.wrap(compound='residues') + + dipole = np.zeros(3) + + for residue in selection.residues: + dipole += MolecularDipole(residue) + + self.dipoles.append(dipole) + + def _conclude(self): + total_dipole = np.sum(self.dipoles, axis=0) + print "Average dipole:", total_dipole/self.nframes + return total_dipole/self.nframes + + def __iadd__(self,other): + self.dipoles += other.dipoles + self.nframes += other.nframes + return self + + +#--- Functions ---# +def MolecularDipole(residue): + charges = residue.charges + abscharges = np.absolute(charges) + charge_sum = np.sum(abscharges) + positions = residue.positions + + charge_center = [] + + for coord in [0,1,2]: + charge_center.append(np.sum(np.multiply(abscharges,positions[:,coord]))/charge_sum) + + dipole = [] + # 4.803 is to convert in debyes + for coord in [0,1,2]: + dipole.append(np.sum(np.multiply(charges,positions[:,coord]-charge_center[coord]))*4.803) + + return dipole diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py new file mode 100644 index 00000000000..717aa635b52 --- /dev/null +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -0,0 +1,244 @@ +import multiprocessing as mp +import copy +from operator import itemgetter + +# For fancy progressbar +try: + from blessings import Terminal + from progressive.bar import Bar + from progressive.tree import ProgressTree, Value, BarDescriptor +except: + pass + +class ParallelProcessor(): + + def __init__(self, list_of_jobs, trajectory, start=None, stop=None, step=None, threads=None, + progressbar=True): + self._trajectory = trajectory + + start, stop, step = trajectory.check_slice_indices(start, stop, step) + + self.start = start + self.stop = stop + self.step = step + self.nframes = len(xrange(start, stop, step)) + + self.list_of_jobs = list_of_jobs + + if threads == None: + self.threads = mp.cpu_count() + elif threads > mp.cpu_count(): + self.threads = mp.cpu_count() + else: + self.threads = threads + + self.progressbar = progressbar + + + def slices(self): + # This function returns a list containing the start and + # last configuration to be analyzed from each thread + + threads = self.threads # Get number of threads from initialization + step = self.step + configs = (self.stop-self.start)/step + + # Check whether the number of threads is higher than + # the number of trajectories to be analyzed + while (configs/threads == 0): + threads -= 1 + + self.threads = threads # Update the number of threads + + print "Total configurations: "+str(configs) + print "Analysis running on ", threads, " threads.\n" + + # Number of cfgs for each thread, and remainder to be added + thread_cfg = configs/threads + reminder = configs%threads + + slices = [] + beg = self.start + + # Compute the start and last configurations + for thread in range(0,threads): + if thread < reminder: + end = beg+step*thread_cfg + else: + end = beg+step*(thread_cfg-1) + + slices.append([beg, end+1]) + beg = end+step + + # Print on screen the configurations assigned to each thread + for thread in range(threads): + confs = 1+(slices[thread][1]-1-slices[thread][0])/step + digits = len(str(self.stop)) + line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \ + +str(slices[thread][0]).rjust(digits)+"/" \ + +str(slices[thread][1]-1).rjust(digits) \ + +" | Configurations: "+str(confs).rjust(1+len(str(thread_cfg))) + print line + + print + return slices + + def job_copies(self): + # This function creates n=threads copies of the analysis + # objects submitted from the user + threads = self.threads + job_list = [] + + for thread in range(threads): + job_list.append([ copy.deepcopy(job) for job in self.list_of_jobs]) + + return job_list + + def compute(self,job_list,start,stop,step, out_queue, order,progress,error): + # Run the single_frame method for each analysis object for all + # the trajectories in the batch + count = 0 # just to track IO errors + + try: + for i, ts in enumerate(self._trajectory[start:stop:step]): + count = i + for job in job_list: + job._single_frame() + progress.put(order) # Updates the progress bar + + out_queue.put((job_list,order)) # Returns the results + except: + error.put([order, start+count*step]) + + def prepare(self,pool,slices): + # This function runs the setup_frame method from AnalysisBase + # and the prepare method from the analysis object. Each job + # object is initialized using start and stop from the slice function + for thread, elem in zip(pool,slices): + for job in thread: + job._setup_frames(self._trajectory,start=elem[0],stop=elem[1],step=self.step) + job._prepare() + + def conclude(self,list_of_jobs): + # Run conclude for each job object + for job in list_of_jobs: + job._conclude() + + def parallel_run(self): + # Returns indices for splitting the trajectory + slices = self.slices() + + # Create copies of the original object to be + # dispatched to multiprocess + pool = self.job_copies() + threads = self.threads + + self.prepare(pool,slices) + + # Queues for the communication between parent and child processes + out_queue = mp.Manager().Queue() + progress = mp.Manager().Queue() + error = mp.Manager().Queue() + + # Prepare multiprocess objects + processes = [ mp.Process(target=self.compute, args=(batch, elem[0],elem[1],self.step,out_queue,order,progress,error)) + for order, (batch, elem) in enumerate(zip(pool,slices)) ] + + + # Run processes + for p in processes: p.start() + + thread_configs = [ 1+(elem[1]-elem[0]-1)/self.step for elem in slices ] + + if self.progressbar: + try: + # Initialize progress bar + readings = [ Value(0) for i in range(threads) ] + total_cfg = Value(0) + bd_defaults = [ dict(type=Bar, kwargs=dict(max_value=cfg)) for cfg in thread_configs ] + bd_defaults.append( dict(type=Bar, kwargs=dict(max_value=sum(thread_configs))) ) # add total cfgs for total counter + + data_tuple = [ ("Core "+str(thread+1).rjust(len(str(threads))), + BarDescriptor(value=readings[thread], **bd_defaults[thread])) for thread in range(threads) ] + + data = dict( (key,value) for (key,value) in data_tuple ) + + data.update({'Total': BarDescriptor(value=total_cfg, **bd_defaults[-1])}) + + # Create blessings.Terminal instance + t = Terminal() + # Initialize a ProgressTree instance + n = ProgressTree(term=t) + # Make room for the progress bar + n.make_room(data) + + # Updates progress bar + while any([ p.is_alive() for p in processes ]): + while not progress.empty(): + core = progress.get() + n.cursor.restore() + readings[core].value += 1 + total_cfg.value += 1 + n.draw(data, BarDescriptor(bd_defaults)) + + print # lets leave some free space + except: + data = {} + for thread in range(threads): + data[thread]=0 + while any([ p.is_alive() for p in processes ]): + while not progress.empty(): + core = progress.get() + data[core] += 1 + + for thread in range(threads): + print "Core "+str(thread)+": "+str(data[thread])+"/"+str(thread_configs[thread]) + + print "\r", + + print "\033["+str(threads+1)+"A" + + print threads*"\n" + + + # Exit the completed processes + for p in processes: p.join() + + unread_configurations = [] + + # Collect errors from queues + while not error.empty(): + unread_configurations.append(error.get()) + + if len(unread_configurations) != 0: + unread_counter = 0 + for error in unread_configurations: + unread_counter += (slices[error[0]][1]-error[1])/self.step + print "Sorry, there were", unread_counter, "unread configurations." + for error in sorted(unread_configurations, key=itemgetter(0)): + print "Core", error[0]+1, "did not read from configuration", error[1]+1,\ + "to", slices[error[0]][1] + + print "\n" + + + results = [] + + # Collects results from the queue + while not out_queue.empty(): + results.append(out_queue.get()) + + jobs_num = len(self.list_of_jobs) + + result_list = [] + + # Sum the job objects from each thread + for job in range(jobs_num): + for order, thread in enumerate(sorted(results, key=itemgetter(1))): + if order == 0: + result_list.append(thread[0][job]) + else: + result_list[job] += thread[0][job] + + # Run the conclude function for each job + self.conclude(result_list) diff --git a/testsuite/MDAnalysisTests/test_parallel_jobs.py b/testsuite/MDAnalysisTests/test_parallel_jobs.py new file mode 100644 index 00000000000..0258e0cea2b --- /dev/null +++ b/testsuite/MDAnalysisTests/test_parallel_jobs.py @@ -0,0 +1,33 @@ +import MDAnalysis as mda +import numpy as np +import MDAnalysis.analysis.parallel_jobs as pj +import MDAnalysis.analysis.electromagnetism as electromagnetism + +from numpy.testing import * +import time + +from MDAnalysisTests.datafiles import (DCD,PSF) + +class TestParallel(TestCase): + def setUp(self): + self.system = mda.Universe(PSF,DCD) + self.selection = self.system.select_atoms('all') + self.traj = self.system.trajectory + + # Single thread analysis + print "Single thread analysis:" + start_time = time.time() + single_analysis = electromagnetism.TotalDipole(self.traj,selection=self.selection) + self.single_result = single_analysis.run() + print("--- %s seconds ---" % (time.time() - start_time)) + + def test_parallel(self): + print "Parallel analysis:" + start_time = time.time() + jobs = [ electromagnetism.TotalDipole(selection = self.selection) ] + process = pj.ParallelProcessor(jobs,self.traj,progressbar=True) + assert_equal(self.single_result, process.parallel_run()) + print("--- %s seconds ---" % (time.time() - start_time)) + + def tearDown(self): + del self.system From 015a6ead890eafe81fb0e4c126dc229f7b31a260 Mon Sep 17 00:00:00 2001 From: mattihappy Date: Tue, 12 Jan 2016 15:24:43 +0100 Subject: [PATCH 2/2] Minor modifications --- package/MDAnalysis/analysis/base.py | 18 ++++++++++++------ .../MDAnalysis/analysis/electromagnetism.py | 4 ++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index 22a237602b1..4a879d8633d 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -20,6 +20,7 @@ A collection of useful building blocks for creating Analysis classes. """ + import numpy as np import logging @@ -29,9 +30,12 @@ class AnalysisBase(object): """Base class for defining multi frame analysis + The analysis base class is designed as a template for creating - multiframe analysis. + multiframe analysis. + The class implements the following methods: + _setup_frames(trajectory, start=None, stop=None, step=None) Pass a Reader object and define the desired iteration pattern through the trajectory @@ -39,17 +43,22 @@ class AnalysisBase(object): run The user facing run method. Calls the analysis methods defined below + Your analysis can implement the following methods, which are called from run: + _prepare Called before iteration on the trajectory has begun. Data structures can be set up at this time, however most error checking should be done in the __init__ + _single_frame Called after the trajectory is moved onto each new frame. _conclude + Called once iteration on the trajectory is finished. Apply normalisation and averaging to results here. + """ def _setup_frames(self, trajectory, start=None, stop=None, step=None): @@ -80,21 +89,18 @@ def _conclude(self): Called at the end of the run() method to finish everything up. """ pass - + def run(self): """Perform the calculation""" logger.info("Starting preparation") self._prepare() for i, ts in enumerate( self._trajectory[self.start:self.stop:self.step]): - logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) + #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) self._single_frame() logger.info("Finishing up") self._conclude() - #def __iadd__(self,other): - # return self - def __getstate__(self): state = dict(self.__dict__) key = '_trajectory' diff --git a/package/MDAnalysis/analysis/electromagnetism.py b/package/MDAnalysis/analysis/electromagnetism.py index ed22c61c095..fe450c6c3b8 100644 --- a/package/MDAnalysis/analysis/electromagnetism.py +++ b/package/MDAnalysis/analysis/electromagnetism.py @@ -6,7 +6,7 @@ class TotalDipole(AnalysisBase): def __init__(self, trajectory=None, filename='order.dat', selection=None, start=None, stop=None, step=None): if selection is None: - raise RuntimeError('In class OrientationalOrder: constructur requires a selection') + raise RuntimeError('In class TotalDipole: constroctur requires a selection') else: self.selection = selection @@ -52,7 +52,7 @@ def MolecularDipole(residue): charge_center.append(np.sum(np.multiply(abscharges,positions[:,coord]))/charge_sum) dipole = [] - # 4.803 is to convert in debyes + # 4.803 converts to debyes for coord in [0,1,2]: dipole.append(np.sum(np.multiply(charges,positions[:,coord]-charge_center[coord]))*4.803)