diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py
index f3a722b985d..4a879d8633d 100644
--- a/package/MDAnalysis/analysis/base.py
+++ b/package/MDAnalysis/analysis/base.py
@@ -17,12 +17,10 @@
 """
 Analysis building blocks --- :mod:`MDAnalysis.analysis.base`
 ============================================================
-
 A collection of useful building blocks for creating Analysis classes.
-
-
 """
+
 import numpy as np
 
 import logging
@@ -34,7 +32,7 @@ class AnalysisBase(object):
     """Base class for defining multi frame analysis
 
     The analysis base class is designed as a template for creating
-    multiframe analysis.
+    multi-frame analyses.
 
     The class implements the following methods:
@@ -56,21 +54,24 @@ class AnalysisBase(object):
     _single_frame
         Called after the trajectory is moved onto each new frame.
 
-    _conclude
+    _conclude
+        Called once iteration on the trajectory is finished. Apply
+        normalisation and averaging to results here.
     """
-    def _setup_frames(self, trajectory, start=None,
-                      stop=None, step=None):
-        self._trajectory = trajectory
-        start, stop, step = trajectory.check_slice_indices(
-            start, stop, step)
-        self.start = start
-        self.stop = stop
-        self.step = step
-        self.nframes = len(xrange(start, stop, step))
+
+    def _setup_frames(self, trajectory, start=None, stop=None, step=None):
+        # A parallel driver may build the job without a trajectory and call
+        # _setup_frames again once the per-process slice is known.
+        if trajectory is None:
+            return
+
+        self._trajectory = trajectory
+        start, stop, step = trajectory.check_slice_indices(start, stop, step)
+        self.start = start
+        self.stop = stop
+        self.step = step
+        self.nframes = len(xrange(start, stop, step))
 
     def _single_frame(self):
         """Calculate data from a single frame of trajectory
@@ -85,7 +86,6 @@ def _prepare(self):
     def _conclude(self):
         """Finalise the results you've gathered.
-
         Called at the end of the run() method to finish everything up.
         """
         pass
@@ -96,10 +96,14 @@ def run(self):
         self._prepare()
         for i, ts in enumerate(
                 self._trajectory[self.start:self.stop:self.step]):
-            self._ts = ts
             #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes))
             self._single_frame()
         logger.info("Finishing up")
-        self._conclude()
-
+        return self._conclude()
+
+    def __getstate__(self):
+        # Drop the un-picklable trajectory reader so the analysis object
+        # can be sent to a worker process.
+        state = dict(self.__dict__)
+        state.pop('_trajectory', None)
+        return state
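Editorial sketch (not part of the patch): the point of __getstate__ is that an analysis object can now cross a multiprocessing boundary, because the un-picklable trajectory reader is stripped before pickling while the frame bookkeeping survives. The snippet calls the private _setup_frames directly purely for illustration and assumes the MDAnalysis test data files are installed.

    import pickle
    import MDAnalysis as mda
    from MDAnalysis.analysis.base import AnalysisBase
    from MDAnalysisTests.datafiles import PSF, DCD

    u = mda.Universe(PSF, DCD)
    job = AnalysisBase()
    job._setup_frames(u.trajectory)

    # The pickled copy keeps start/stop/step/nframes but not the reader
    clone = pickle.loads(pickle.dumps(job))
    print hasattr(job, '_trajectory'), hasattr(clone, '_trajectory')   # True False
    print clone.nframes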
""" pass @@ -96,10 +96,14 @@ def run(self): self._prepare() for i, ts in enumerate( self._trajectory[self.start:self.stop:self.step]): - self._ts = ts #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) self._single_frame() logger.info("Finishing up") self._conclude() - + def __getstate__(self): + state = dict(self.__dict__) + key = '_trajectory' + if key in state: + del state ['_trajectory'] + return state diff --git a/package/MDAnalysis/analysis/electromagnetism.py b/package/MDAnalysis/analysis/electromagnetism.py new file mode 100644 index 00000000000..fe450c6c3b8 --- /dev/null +++ b/package/MDAnalysis/analysis/electromagnetism.py @@ -0,0 +1,59 @@ +from base import AnalysisBase +import numpy as np + +class TotalDipole(AnalysisBase): + + def __init__(self, trajectory=None, filename='order.dat', selection=None, start=None, stop=None, step=None): + + if selection is None: + raise RuntimeError('In class TotalDipole: constroctur requires a selection') + else: + self.selection = selection + + self.filename = filename + self.time = [] + + self._setup_frames(trajectory, start, stop, step) + self.dipoles = [] + + def _single_frame(self): + selection = self.selection + + selection.wrap(compound='residues') + + dipole = np.zeros(3) + + for residue in selection.residues: + dipole += MolecularDipole(residue) + + self.dipoles.append(dipole) + + def _conclude(self): + total_dipole = np.sum(self.dipoles, axis=0) + print "Average dipole:", total_dipole/self.nframes + return total_dipole/self.nframes + + def __iadd__(self,other): + self.dipoles += other.dipoles + self.nframes += other.nframes + return self + + +#--- Functions ---# +def MolecularDipole(residue): + charges = residue.charges + abscharges = np.absolute(charges) + charge_sum = np.sum(abscharges) + positions = residue.positions + + charge_center = [] + + for coord in [0,1,2]: + charge_center.append(np.sum(np.multiply(abscharges,positions[:,coord]))/charge_sum) + + dipole = [] + # 4.803 converts to debyes + for coord in [0,1,2]: + dipole.append(np.sum(np.multiply(charges,positions[:,coord]-charge_center[coord]))*4.803) + + return dipole diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py new file mode 100644 index 00000000000..717aa635b52 --- /dev/null +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -0,0 +1,244 @@ +import multiprocessing as mp +import copy +from operator import itemgetter + +# For fancy progressbar +try: + from blessings import Terminal + from progressive.bar import Bar + from progressive.tree import ProgressTree, Value, BarDescriptor +except: + pass + +class ParallelProcessor(): + + def __init__(self, list_of_jobs, trajectory, start=None, stop=None, step=None, threads=None, + progressbar=True): + self._trajectory = trajectory + + start, stop, step = trajectory.check_slice_indices(start, stop, step) + + self.start = start + self.stop = stop + self.step = step + self.nframes = len(xrange(start, stop, step)) + + self.list_of_jobs = list_of_jobs + + if threads == None: + self.threads = mp.cpu_count() + elif threads > mp.cpu_count(): + self.threads = mp.cpu_count() + else: + self.threads = threads + + self.progressbar = progressbar + + + def slices(self): + # This function returns a list containing the start and + # last configuration to be analyzed from each thread + + threads = self.threads # Get number of threads from initialization + step = self.step + configs = (self.stop-self.start)/step + + # Check whether the number of threads is higher than + # the 
diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py
new file mode 100644
index 00000000000..717aa635b52
--- /dev/null
+++ b/package/MDAnalysis/analysis/parallel_jobs.py
@@ -0,0 +1,244 @@
+import multiprocessing as mp
+import copy
+from operator import itemgetter
+
+# Optional fancy progress bar; fall back to the plain-text report below
+try:
+    from blessings import Terminal
+    from progressive.bar import Bar
+    from progressive.tree import ProgressTree, Value, BarDescriptor
+except ImportError:
+    pass
+
+
+class ParallelProcessor(object):
+
+    def __init__(self, list_of_jobs, trajectory, start=None, stop=None,
+                 step=None, threads=None, progressbar=True):
+        self._trajectory = trajectory
+
+        start, stop, step = trajectory.check_slice_indices(start, stop, step)
+        self.start = start
+        self.stop = stop
+        self.step = step
+        self.nframes = len(xrange(start, stop, step))
+
+        self.list_of_jobs = list_of_jobs
+
+        # Never ask for more threads than there are cores
+        if threads is None or threads > mp.cpu_count():
+            self.threads = mp.cpu_count()
+        else:
+            self.threads = threads
+
+        self.progressbar = progressbar
+
+    def slices(self):
+        # Return, for each thread, the first and last configuration
+        # it has to analyse
+        threads = self.threads  # number of threads from initialisation
+        step = self.step
+        configs = (self.stop-self.start)/step
+
+        # Make sure there are no more threads than configurations to analyse
+        while configs/threads == 0:
+            threads -= 1
+        self.threads = threads  # update the number of threads
+
+        print "Total configurations: "+str(configs)
+        print "Analysis running on "+str(threads)+" threads.\n"
+
+        # Configurations per thread, plus the remainder to be distributed
+        thread_cfg = configs/threads
+        remainder = configs%threads
+
+        slices = []
+        beg = self.start
+
+        # Compute the first and last configuration of every thread; the
+        # first `remainder` threads take one extra configuration each
+        for thread in range(threads):
+            if thread < remainder:
+                end = beg+step*thread_cfg
+            else:
+                end = beg+step*(thread_cfg-1)
+
+            slices.append([beg, end+1])
+            beg = end+step
+
+        # Report the configurations assigned to each thread
+        for thread in range(threads):
+            confs = 1+(slices[thread][1]-1-slices[thread][0])/step
+            digits = len(str(self.stop))
+            line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \
+                   +str(slices[thread][0]).rjust(digits)+"/" \
+                   +str(slices[thread][1]-1).rjust(digits) \
+                   +" | Configurations: "+str(confs).rjust(1+len(str(thread_cfg)))
+            print line
+
+        print
+        return slices
+
+    def job_copies(self):
+        # Create one deep copy of every submitted analysis object per thread
+        threads = self.threads
+        job_list = []
+
+        for thread in range(threads):
+            job_list.append([copy.deepcopy(job) for job in self.list_of_jobs])
+
+        return job_list
+
+    def compute(self, job_list, start, stop, step, out_queue, order, progress, error):
+        # Run _single_frame for every analysis object on every frame of
+        # this batch of the trajectory
+        count = 0  # last frame reached, used to report read errors
+
+        try:
+            for i, ts in enumerate(self._trajectory[start:stop:step]):
+                count = i
+                for job in job_list:
+                    job._single_frame()
+                progress.put(order)  # update the progress bar
+
+            out_queue.put((job_list, order))  # return the results
+        except Exception:
+            error.put([order, start+count*step])
+
+    def prepare(self, pool, slices):
+        # Run _setup_frames (from AnalysisBase) and _prepare on every job
+        # copy, using the start and stop produced by slices()
+        for thread, elem in zip(pool, slices):
+            for job in thread:
+                job._setup_frames(self._trajectory, start=elem[0], stop=elem[1], step=self.step)
+                job._prepare()
+
+    def conclude(self, list_of_jobs):
+        # Run _conclude on every merged job and hand back the results
+        return [job._conclude() for job in list_of_jobs]
+
+    def parallel_run(self):
+        # Indices for splitting the trajectory
+        slices = self.slices()
+
+        # Copies of the original jobs to be dispatched to the processes
+        pool = self.job_copies()
+        threads = self.threads
+
+        self.prepare(pool, slices)
+
+        # Queues for communication between parent and child processes
+        out_queue = mp.Manager().Queue()
+        progress = mp.Manager().Queue()
+        error = mp.Manager().Queue()
+
+        # Prepare the worker processes
+        processes = [mp.Process(target=self.compute,
+                                args=(batch, elem[0], elem[1], self.step,
+                                      out_queue, order, progress, error))
+                     for order, (batch, elem) in enumerate(zip(pool, slices))]
+
+        # Run the processes
+        for p in processes:
+            p.start()
+
+        thread_configs = [1+(elem[1]-elem[0]-1)/self.step for elem in slices]
+
+        if self.progressbar:
+            try:
+                # Initialise the progress bars (one per core plus a total)
+                readings = [Value(0) for i in range(threads)]
+                total_cfg = Value(0)
+                bd_defaults = [dict(type=Bar, kwargs=dict(max_value=cfg)) for cfg in thread_configs]
+                bd_defaults.append(dict(type=Bar, kwargs=dict(max_value=sum(thread_configs))))
+
+                data_tuple = [("Core "+str(thread+1).rjust(len(str(threads))),
+                               BarDescriptor(value=readings[thread], **bd_defaults[thread]))
+                              for thread in range(threads)]
+
+                data = dict((key, value) for (key, value) in data_tuple)
+                data.update({'Total': BarDescriptor(value=total_cfg, **bd_defaults[-1])})
+
+                # Create a blessings.Terminal instance and a ProgressTree
+                t = Terminal()
+                n = ProgressTree(term=t)
+                # Make room for the progress bar
+                n.make_room(data)
+
+                # Update the progress bars while the workers are alive
+                while any([p.is_alive() for p in processes]):
+                    while not progress.empty():
+                        core = progress.get()
+                        n.cursor.restore()
+                        readings[core].value += 1
+                        total_cfg.value += 1
+                        n.draw(data, BarDescriptor(bd_defaults))
+
+                print  # leave some free space below the bars
+            except Exception:
+                # Plain-text fallback when blessings/progressive are missing
+                data = {}
+                for thread in range(threads):
+                    data[thread] = 0
+                while any([p.is_alive() for p in processes]):
+                    while not progress.empty():
+                        core = progress.get()
+                        data[core] += 1
+
+                    for thread in range(threads):
+                        print "Core "+str(thread)+": "+str(data[thread])+"/"+str(thread_configs[thread])
+
+                    print "\r",
+                    print "\033["+str(threads+1)+"A"
+
+                print threads*"\n"
+
+        # Wait for the completed processes
+        for p in processes:
+            p.join()
+
+        unread_configurations = []
+
+        # Collect errors from the error queue
+        while not error.empty():
+            unread_configurations.append(error.get())
+
+        if len(unread_configurations) != 0:
+            unread_counter = 0
+            for err in unread_configurations:
+                unread_counter += (slices[err[0]][1]-err[1])/self.step
+            print "Sorry, there were", unread_counter, "unread configurations."
+
+            for err in sorted(unread_configurations, key=itemgetter(0)):
+                print "Core", err[0]+1, "did not read from configuration", err[1]+1,\
+                      "to", slices[err[0]][1]
+            print "\n"
+
+        results = []
+
+        # Collect the per-thread results from the queue
+        while not out_queue.empty():
+            results.append(out_queue.get())
+
+        jobs_num = len(self.list_of_jobs)
+        result_list = []
+
+        # Merge (via __iadd__) the copies of each job coming from the
+        # different threads, in thread order
+        for job in range(jobs_num):
+            for order, thread in enumerate(sorted(results, key=itemgetter(1))):
+                if order == 0:
+                    result_list.append(thread[0][job])
+                else:
+                    result_list[job] += thread[0][job]
+
+        # Run _conclude on each merged job and return one result per job
+        return self.conclude(result_list)
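The frame-splitting arithmetic in slices() is easiest to follow with concrete numbers. The stand-alone snippet below (editorial, hypothetical values, Python 2 integer division as in the module) reproduces it for 98 configurations on 4 threads: the first two cores take 25 frames each, the last two take 24.

    start, stop, step, threads = 0, 98, 1, 4

    configs = (stop - start)/step      # 98
    thread_cfg = configs/threads       # 24 configurations per thread
    remainder = configs%threads        # 2 threads take one extra frame

    slices, beg = [], start
    for thread in range(threads):
        end = beg + step*(thread_cfg if thread < remainder else thread_cfg - 1)
        slices.append([beg, end + 1])
        beg = end + step

    print slices    # [[0, 25], [25, 50], [50, 74], [74, 98]]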
diff --git a/testsuite/MDAnalysisTests/test_parallel_jobs.py b/testsuite/MDAnalysisTests/test_parallel_jobs.py
new file mode 100644
index 00000000000..0258e0cea2b
--- /dev/null
+++ b/testsuite/MDAnalysisTests/test_parallel_jobs.py
@@ -0,0 +1,33 @@
+import time
+
+import MDAnalysis as mda
+import MDAnalysis.analysis.parallel_jobs as pj
+import MDAnalysis.analysis.electromagnetism as electromagnetism
+
+from numpy.testing import TestCase, assert_almost_equal
+
+from MDAnalysisTests.datafiles import DCD, PSF
+
+
+class TestParallel(TestCase):
+    def setUp(self):
+        self.system = mda.Universe(PSF, DCD)
+        self.selection = self.system.select_atoms('all')
+        self.traj = self.system.trajectory
+
+        # Serial reference analysis
+        print "Single thread analysis:"
+        start_time = time.time()
+        single_analysis = electromagnetism.TotalDipole(self.traj, selection=self.selection)
+        self.single_result = single_analysis.run()
+        print("--- %s seconds ---" % (time.time() - start_time))
+
+    def test_parallel(self):
+        print "Parallel analysis:"
+        start_time = time.time()
+        jobs = [electromagnetism.TotalDipole(selection=self.selection)]
+        process = pj.ParallelProcessor(jobs, self.traj, progressbar=True)
+        # parallel_run() returns one result per job
+        assert_almost_equal(self.single_result, process.parallel_run()[0])
+        print("--- %s seconds ---" % (time.time() - start_time))
+
+    def tearDown(self):
+        del self.system
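For completeness, a sketch of the intended multi-job use (editorial, not part of the patch): several analyses share a single pass over the trajectory, which is the main reason ParallelProcessor takes a list of jobs. The selections are placeholders, and parallel_run() is assumed to return one result per job in input order, as in the module above.

    import MDAnalysis as mda
    import MDAnalysis.analysis.parallel_jobs as pj
    import MDAnalysis.analysis.electromagnetism as em
    from MDAnalysisTests.datafiles import PSF, DCD

    u = mda.Universe(PSF, DCD)
    jobs = [em.TotalDipole(selection=u.select_atoms('all')),
            em.TotalDipole(selection=u.select_atoms('not name H*'))]

    processor = pj.ParallelProcessor(jobs, u.trajectory, step=2, threads=2)
    results = processor.parallel_run()    # one average dipole per job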