Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions package/MDAnalysis/analysis/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
"""
Analysis building blocks --- :mod:`MDAnalysis.analysis.base`
============================================================

A collection of useful building blocks for creating Analysis
classes.


"""

import numpy as np
import logging

Expand All @@ -34,7 +32,7 @@ class AnalysisBase(object):
"""Base class for defining multi frame analysis

The analysis base class is designed as a template for creating
multiframe analysis.
multiframe analysis.

The class implements the following methods:

Expand All @@ -56,21 +54,24 @@ class AnalysisBase(object):

_single_frame
Called after the trajectory is moved onto each new frame.

_conclude

Called once iteration on the trajectory is finished.
Apply normalisation and averaging to results here.

"""
def _setup_frames(self, trajectory, start=None,
stop=None, step=None):
self._trajectory = trajectory
start, stop, step = trajectory.check_slice_indices(
start, stop, step)
self.start = start
self.stop = stop
self.step = step
self.nframes = len(xrange(start, stop, step))
def _setup_frames(self, trajectory, start=None, stop=None, step=None):

if trajectory is None:
pass
else:
self._trajectory = trajectory

start, stop, step = trajectory.check_slice_indices(start, stop, step)
self.start = start
self.stop = stop
self.step = step
self.nframes = len(xrange(start, stop, step))

def _single_frame(self):
"""Calculate data from a single frame of trajectory
Expand All @@ -85,7 +86,6 @@ def _prepare(self):

def _conclude(self):
"""Finalise the results you've gathered.

Called at the end of the run() method to finish everything up.
"""
pass
def run(self):
    """Perform the calculation: prepare, iterate the frame slice, conclude."""
    self._prepare()
    frames = self._trajectory[self.start:self.stop:self.step]
    for i, ts in enumerate(frames):
        # Keep a handle on the current timestep for _single_frame().
        self._ts = ts
        self._single_frame()
    logger.info("Finishing up")
    self._conclude()


def __getstate__(self):
state = dict(self.__dict__)
key = '_trajectory'
if key in state:
del state ['_trajectory']
return state
59 changes: 59 additions & 0 deletions package/MDAnalysis/analysis/electromagnetism.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from base import AnalysisBase
import numpy as np

class TotalDipole(AnalysisBase):

def __init__(self, trajectory=None, filename='order.dat', selection=None, start=None, stop=None, step=None):

if selection is None:
raise RuntimeError('In class TotalDipole: constroctur requires a selection')
else:
self.selection = selection

self.filename = filename
self.time = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two last lines should be
(reduced space)

self.filename = filename
self.time = []

and so on for other lines (if needed).


self._setup_frames(trajectory, start, stop, step)
self.dipoles = []

def _single_frame(self):
    """Accumulate the total system dipole for the current frame."""
    sel = self.selection
    # Make molecules whole across periodic boundaries before computing
    # per-residue dipoles.
    sel.wrap(compound='residues')

    frame_dipole = np.zeros(3)
    for res in sel.residues:
        frame_dipole += MolecularDipole(res)

    self.dipoles.append(frame_dipole)

def _conclude(self):
total_dipole = np.sum(self.dipoles, axis=0)
print "Average dipole:", total_dipole/self.nframes
return total_dipole/self.nframes

def __iadd__(self,other):
self.dipoles += other.dipoles
self.nframes += other.nframes
return self


#--- Functions ---#
def MolecularDipole(residue):
    """Return the dipole moment of one residue, in debye, as ``[dx, dy, dz]``.

    The dipole is taken about the centre of absolute charge, which makes the
    result origin-independent even for residues carrying a net charge.

    Parameters
    ----------
    residue : Residue
        Must expose ``charges`` (per-atom charges, e) and ``positions``
        (per-atom coordinates, Angstrom) as numpy arrays.
    """
    charges = residue.charges
    abscharges = np.absolute(charges)
    positions = residue.positions

    # |q|-weighted mean position; abscharges[:, None] broadcasts the 1-D
    # charge array over the (n_atoms, 3) position array.
    charge_center = (positions * abscharges[:, None]).sum(axis=0) / np.sum(abscharges)

    # 4.803 converts e*Angstrom to debye.
    dipole = 4.803 * np.sum(charges[:, None] * (positions - charge_center), axis=0)
    # Return a plain list, matching the original per-coordinate interface.
    return dipole.tolist()
244 changes: 244 additions & 0 deletions package/MDAnalysis/analysis/parallel_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
import multiprocessing as mp
import copy
from operator import itemgetter

# For fancy progressbar
try:
from blessings import Terminal
from progressive.bar import Bar
from progressive.tree import ProgressTree, Value, BarDescriptor
except:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should avoid bare except. Use

except ImportError:
    ...

https://docs.python.org/2/howto/doanddont.html#except

pass

class ParallelProcessor():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be

class ParallelProcessor(object) # new style


def __init__(self, list_of_jobs, trajectory, start=None, stop=None, step=None, threads=None,
progressbar=True):
self._trajectory = trajectory

start, stop, step = trajectory.check_slice_indices(start, stop, step)

self.start = start
self.stop = stop
self.step = step
self.nframes = len(xrange(start, stop, step))

self.list_of_jobs = list_of_jobs

if threads == None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might use 'threads is None`

self.threads = mp.cpu_count()
elif threads > mp.cpu_count():
self.threads = mp.cpu_count()
else:
self.threads = threads

self.progressbar = progressbar


def slices(self):
    """Partition the frame range into per-thread index pairs.

    Shrinks ``self.threads`` if there are more threads than configurations,
    prints the assignment table, and returns a list of ``[beg, end + 1]``
    pairs (inclusive start, exclusive stop) usable as trajectory slice
    bounds for each worker.
    """
    threads = self.threads  # Get number of threads from initialization
    step = self.step
    # Floor division keeps Python 2's integer semantics and also works
    # under Python 3 (where / would give a float).
    configs = (self.stop - self.start) // step

    # Never use more threads than there are configurations to analyse.
    while configs // threads == 0:
        threads -= 1

    self.threads = threads  # Update the number of threads

    # Parenthesised prints are Python 2/3 compatible (the original used
    # Python 2 print statements).
    print("Total configurations: " + str(configs))
    print("Analysis running on " + str(threads) + " threads.\n")

    # Base number of configurations per thread; the first `remainder`
    # threads each take one extra.
    thread_cfg = configs // threads
    remainder = configs % threads

    slices = []
    beg = self.start

    # Compute the first and last configuration for each thread.
    for thread in range(0, threads):
        if thread < remainder:
            end = beg + step * thread_cfg
        else:
            end = beg + step * (thread_cfg - 1)

        slices.append([beg, end + 1])
        beg = end + step

    # Print on screen the configurations assigned to each thread.
    for thread in range(threads):
        confs = 1 + (slices[thread][1] - 1 - slices[thread][0]) // step
        digits = len(str(self.stop))
        line = "Thread " + str(thread + 1).rjust(len(str(threads))) + ": " \
               + str(slices[thread][0]).rjust(digits) + "/" \
               + str(slices[thread][1] - 1).rjust(digits) \
               + " | Configurations: " + str(confs).rjust(1 + len(str(thread_cfg)))
        print(line)

    print()
    return slices

def job_copies(self):
    """Return one independent deep copy of the job list per worker thread.

    Each worker mutates its own copies during the run, so the user's
    original job objects are left untouched.
    """
    return [[copy.deepcopy(job) for job in self.list_of_jobs]
            for _ in range(self.threads)]

def compute(self, job_list, start, stop, step, out_queue, order, progress, error):
    """Worker entry point: run every job over one slice of the trajectory.

    Parameters
    ----------
    job_list : list
        Deep-copied job objects belonging to this worker.
    start, stop, step : int
        Slice bounds for this worker's batch of frames.
    out_queue : queue-like
        Receives ``(job_list, order)`` with the finished jobs.
    order : int
        This worker's index; tags progress ticks and results.
    progress, error : queue-like
        Progress ticks (one per frame) and failure reports.
    """
    count = 0  # last frame index reached; reported if the loop fails

    try:
        for i, ts in enumerate(self._trajectory[start:stop:step]):
            count = i
            for job in job_list:
                job._single_frame()
            progress.put(order)  # Updates the progress bar

        out_queue.put((job_list, order))  # Returns the results
    except Exception:
        # Narrowed from a bare `except:` (reviewer-flagged) so that
        # KeyboardInterrupt/SystemExit still propagate; the parent is told
        # which configuration this worker stopped at.
        error.put([order, start + count * step])

def prepare(self, pool, slices):
    """Initialise every copied job with its thread's slice of the trajectory.

    For each job in each thread's batch, runs ``_setup_frames`` (with the
    per-thread start/stop bounds and the shared step) followed by the job's
    own ``_prepare`` hook.
    """
    for batch, (first, last) in zip(pool, slices):
        for job in batch:
            job._setup_frames(self._trajectory, start=first, stop=last,
                              step=self.step)
            job._prepare()

def conclude(self, list_of_jobs):
    """Run the finalisation (``_conclude``) step of each merged job."""
    for finished_job in list_of_jobs:
        finished_job._conclude()

def parallel_run(self):
# Orchestrates the whole parallel analysis: slice the trajectory, deep-copy
# the jobs per worker, fork one process per slice, drive a progress bar
# while workers run, then collect errors and results and merge them.
# NOTE(review): written in Python 2 syntax (print statements, integer /),
# so this method cannot run unmodified under Python 3.
# Returns indices for splitting the trajectory
slices = self.slices()

# Create copies of the original object to be
# dispatched to multiprocess
pool = self.job_copies()
threads = self.threads

self.prepare(pool,slices)

# Queues for the communication between parent and child processes
out_queue = mp.Manager().Queue()
progress = mp.Manager().Queue()
error = mp.Manager().Queue()

# Prepare multiprocess objects
processes = [ mp.Process(target=self.compute, args=(batch, elem[0],elem[1],self.step,out_queue,order,progress,error))
for order, (batch, elem) in enumerate(zip(pool,slices)) ]


# Run processes
for p in processes: p.start()

# Per-thread frame counts; `/` assumes Python 2 integer division.
thread_configs = [ 1+(elem[1]-elem[0]-1)/self.step for elem in slices ]

if self.progressbar:
try:
# Initialize progress bar
readings = [ Value(0) for i in range(threads) ]
total_cfg = Value(0)
bd_defaults = [ dict(type=Bar, kwargs=dict(max_value=cfg)) for cfg in thread_configs ]
bd_defaults.append( dict(type=Bar, kwargs=dict(max_value=sum(thread_configs))) ) # add total cfgs for total counter

data_tuple = [ ("Core "+str(thread+1).rjust(len(str(threads))),
BarDescriptor(value=readings[thread], **bd_defaults[thread])) for thread in range(threads) ]

data = dict( (key,value) for (key,value) in data_tuple )

data.update({'Total': BarDescriptor(value=total_cfg, **bd_defaults[-1])})

# Create blessings.Terminal instance
t = Terminal()
# Initialize a ProgressTree instance
n = ProgressTree(term=t)
# Make room for the progress bar
n.make_room(data)

# Updates progress bar
while any([ p.is_alive() for p in processes ]):
while not progress.empty():
core = progress.get()
n.cursor.restore()
readings[core].value += 1
total_cfg.value += 1
# NOTE(review): BarDescriptor(bd_defaults) wraps the whole defaults
# list in one descriptor — looks suspicious; confirm against the
# progressive API.
n.draw(data, BarDescriptor(bd_defaults))

print # lets leave some free space
# NOTE(review): bare except — hides any real error in the fancy
# progress bar path and silently falls back to plain-text output.
except:
data = {}
for thread in range(threads):
data[thread]=0
while any([ p.is_alive() for p in processes ]):
while not progress.empty():
core = progress.get()
data[core] += 1

for thread in range(threads):
print "Core "+str(thread)+": "+str(data[thread])+"/"+str(thread_configs[thread])

print "\r",

# ANSI escape: move the cursor up to redraw the counters in place.
print "\033["+str(threads+1)+"A"

print threads*"\n"


# Exit the completed processes
for p in processes: p.join()

unread_configurations = []

# Collect errors from queues
while not error.empty():
unread_configurations.append(error.get())

# Each error entry is [worker_order, last_frame_reached] from compute().
if len(unread_configurations) != 0:
unread_counter = 0
# NOTE(review): the loop variable `error` shadows the (already drained)
# error queue from here on.
for error in unread_configurations:
unread_counter += (slices[error[0]][1]-error[1])/self.step
print "Sorry, there were", unread_counter, "unread configurations."
for error in sorted(unread_configurations, key=itemgetter(0)):
print "Core", error[0]+1, "did not read from configuration", error[1]+1,\
"to", slices[error[0]][1]

print "\n"


results = []

# Collects results from the queue
while not out_queue.empty():
results.append(out_queue.get())

jobs_num = len(self.list_of_jobs)

result_list = []

# Sum the job objects from each thread: sort batches by worker order,
# seed result_list with the first batch's job, then merge the rest via
# each job's __iadd__.
for job in range(jobs_num):
for order, thread in enumerate(sorted(results, key=itemgetter(1))):
if order == 0:
result_list.append(thread[0][job])
else:
result_list[job] += thread[0][job]

# Run the conclude function for each job
self.conclude(result_list)
Loading