
WIP: Parallel trajectory analysis#617

Closed
mattiafelice-palermo wants to merge 2 commits into MDAnalysis:develop from mattiafelice-palermo:develop

Conversation

@mattiafelice-palermo
Contributor

With reference to the MDAnalysis Discussion --> link
Branch on my repository --> link

I added a new module parallel_jobs.py in MDAnalysis/analysis, which contains a class ParallelProcessor that should make it possible to distribute the computations of AnalysisBase objects across multiple threads on multicore machines.

A typical setup for a parallel analysis should look as follows:

import MDAnalysis as mda
from MDAnalysisTests.datafiles import DCD, PSF
import parallel_jobs as pj
import electromagnetism

system    = mda.Universe(PSF, DCD)
selection = system.select_atoms('all')
traj      = system.trajectory

jobs = [electromagnetism.TotalDipole(selection=selection)]
process = pj.ParallelProcessor(jobs, traj, threads=4, progressbar=True)
process.parallel_run()

I made some minor modifications to the AnalysisBase class so it works with ParallelProcessor.
The first is to allow analysis objects to be initialized without specifying a trajectory; the trajectory is supplied later, when ParallelProcessor is initialized.

The second is to prevent the trajectory object from being pickled when child processes are spawned via the Process class of the multiprocessing module:

def __getstate__(self):
    # Copy the instance dict and drop the (unpicklable) trajectory reader
    state = dict(self.__dict__)
    if '_trajectory' in state:
        del state['_trajectory']
    return state
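To illustrate the effect, here is a minimal, self-contained sketch (DummyAnalysis and its trajectory object are made up for the example, not actual MDAnalysis classes) showing that the trajectory reference is dropped when the object is pickled for a child process, while the rest of the state survives:

```python
import pickle

class DummyAnalysis:
    """Stand-in for an AnalysisBase subclass (hypothetical example)."""
    def __init__(self, trajectory):
        self._trajectory = trajectory  # unpicklable in the real case
        self.nframes = 0

    def __getstate__(self):
        # Copy the instance dict and drop the trajectory reader
        state = dict(self.__dict__)
        if '_trajectory' in state:
            del state['_trajectory']
        return state

job = DummyAnalysis(trajectory=object())  # object() itself cannot be pickled
clone = pickle.loads(pickle.dumps(job))   # works: __getstate__ removed it
print(hasattr(clone, '_trajectory'))      # False: the reader was not pickled
print(clone.nframes)                      # 0: the rest of the state survives
```

The child process then has to re-attach a trajectory before running, which is what ParallelProcessor's later trajectory assignment is for.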

In order to work with the ParallelProcessor class, analysis objects should define an __iadd__ method, since at the end of the analysis the results from the child processes are merged into a single object with the += operator.

# From parallel_jobs.py
from operator import itemgetter

# Sum the job objects returned by each thread
for job in range(jobs_num):
    for order, thread in enumerate(sorted(results, key=itemgetter(1))):
        if order == 0:
            result_list.append(thread[0][job])
        else:
            result_list[job] += thread[0][job]

# Example __iadd__ method for an analysis object
def __iadd__(self, other):
    self.dipoles += other.dipoles
    self.nframes += other.nframes
    return self
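As a toy illustration of this reduction step (MiniDipole is a made-up stand-in for a real analysis class, and the numbers are arbitrary), each worker returns a partial result and they are folded together with +=:

```python
class MiniDipole:
    """Made-up stand-in for an analysis object holding partial results."""
    def __init__(self, dipoles, nframes):
        self.dipoles = dipoles
        self.nframes = nframes

    def __iadd__(self, other):
        self.dipoles += other.dipoles
        self.nframes += other.nframes
        return self

# Each worker returns a partial result; merge them in order
partials = [MiniDipole(1.5, 10), MiniDipole(2.5, 10), MiniDipole(1.0, 5)]
total = partials[0]
for partial in partials[1:]:
    total += partial
print(total.dipoles, total.nframes)  # 5.0 25
```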

The class is already functional, but it seems to be affected by problems when a high number of concurrent reads occur, so any input on this issue is welcome. The problem shows up randomly in my tests; to reproduce it, you need a combination of running an analysis on a high number of threads and using a small selection of molecules (I guess because this leads to very short, and thus frequent, read operations on the file).

I created an example module with an analysis class TotalDipole which is compatible with the class ParallelProcessor. Running it on the DCD test data from MDAnalysisTests as a single analysis object and through the ParallelProcessor class (four threads, on my laptop) gives an execution time of 2.7 seconds for the former and 0.9 seconds for the latter. Of course, with more cpu intensive jobs on larger trajectories, the performance gain is even higher.

Let me know if you think this could be added to the MDAnalysis package, or if you have any suggestion to improve the code, the interface or any idea on how to fix the problem of concurrent reads.

…ent test analysis (electromagnetism.py), added parallel_jobs.py module, added test for the parallel_jobs.py module
Member

This can be replaced by:

charge_center = (positions * abscharges[:, None]).sum(axis=0) / charge_sum

where the [:, None] makes the smaller array broadcast against the larger one.
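For example, the broadcasting form gives the same result as an explicit per-atom loop (the values here are arbitrary illustration data, not from the PR):

```python
import numpy as np

positions = np.array([[0.0, 0.0, 0.0],
                      [1.0, 0.0, 0.0],
                      [0.0, 2.0, 0.0]])
abscharges = np.array([0.5, 1.0, 0.5])
charge_sum = abscharges.sum()

# abscharges[:, None] has shape (3, 1), so it broadcasts over
# the (3, 3) positions array, scaling each row by its charge
charge_center = (positions * abscharges[:, None]).sum(axis=0) / charge_sum

# Equivalent explicit loop
loop_center = sum(q * r for q, r in zip(abscharges, positions)) / charge_sum
print(np.allclose(charge_center, loop_center))  # True
```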

Contributor Author

Thanks for the tip! Unfortunately I'm quite new to both python and numpy (I mainly work with Fortran...) so I apologize for the awkward code here and there.

Member

No worries, Fortran is my first language too

@richardjgowers
Member

The concurrency problem isn't surprising as currently each trajectory reader fills the coordinate arrays in place and AtomGroups then just have a view onto this array. So for example...

Job 1                         Job 2
-----                         -----
Move to frame 1
Read my coordinates
Do some analysis              Move to frame 2
                              Read my coordinates
Read some more coordinates    Do some analysis

When job1 does its second read of coordinates, the frame has been changed by job2, so the data isn't what it should be for job1.

I'm not sure what the easiest or best solution to this is, but it's something we want to get working. I'll try to get a quick, hackish fix working for now.
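The shared-buffer problem can be mimicked without MDAnalysis. In this simplified sketch (not the actual reader code), a job that keeps a view onto the shared frame buffer sees its data silently change when the reader moves on, while an explicit copy is safe:

```python
import numpy as np

frame_buffer = np.zeros(3)  # shared coordinate array, filled in place

def read_frame(n):
    """Stand-in reader: overwrites the shared buffer in place."""
    frame_buffer[:] = n

read_frame(1)
job1_view = frame_buffer          # job 1 keeps a view onto the buffer
job1_copy = frame_buffer.copy()   # safe: a private snapshot of frame 1

read_frame(2)                     # another job moves the reader to frame 2
print(job1_view[0])  # 2.0 -- job 1's "frame 1" data was clobbered
print(job1_copy[0])  # 1.0 -- the copy still holds frame 1
```

Copying positions per frame (or giving each process its own reader) avoids the race, at some memory cost.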

Apart from that, I'd prefer if the parallel run method was just a method attached to AnalysisBase, so you could just do:

td = TotalDipole(etc etc etc)

td.run()
# OR
td.parallel_run()
# OR
td.run(parallel=True)

@mattiafelice-palermo
Contributor Author

OK, I understand; it makes sense. So when it works (surprisingly often), it's because the coordinate array is still there and wasn't modified by another child process. Let me know if you can come up with a hack for that; unfortunately, I don't know the internals of the program well enough to be of help.

As for attaching the method to AnalysisBase, I like the syntax you propose, but my approach of using a "Manager" class was an attempt to optimize the computation. By batching different jobs together, each job could in theory access the coordinates of the frame from the array stored in memory, without re-reading them from disk each time. Access from an in-memory array is definitely faster than reading coordinates from a file on disk for each job, and would make the analysis much faster. So my question is, during the following cycle:

for i, ts in enumerate(self._trajectory[start:stop:step]):
    for job in job_list:
        job._single_frame()

when executing each single_frame() method, are positions, charges, masses, etc. accessed from temporary arrays stored in memory or are they re-read from file each time for each job in job_list?

Anyway, it could be a good idea to have both the method on the analysis object and the "Manager" class; what do you think?

@richardjgowers
Member

So for your code snippet above, you're correct that with your method each frame would be read only once. But I think reading is usually quite fast compared to the actual analysis (benchmark this and check!). Another problem is that you're assuming each analysis task takes a similar amount of time; if one takes much longer, it will slow down all the other tasks in the manager object.

Contributor

The last two lines should be (with reduced spacing):

self.filename = filename
self.time = []

and so on for the other lines (if needed).

@richardjgowers
Member

Ah right, yeah that does sound like it'll work nicely.

If the code is an extra commit after this one then just push to your repo and it'll appear here, if it's a big rewrite then just make a new PR.

@richardjgowers richardjgowers changed the title Parallel trajectory analysis WIP: Parallel trajectory analysis Jan 13, 2016
@richardjgowers
Member

@mattihappy by the way, we're putting together a release soon, so if you want to create a separate pull request for just the electromagnetism module in serial, we could include that.

@richardjgowers richardjgowers self-assigned this Jan 14, 2016
@mattiafelice-palermo
Contributor Author

@richardjgowers I'm not sure; it's still a very basic module and not really optimized, and I wrote it just so that a test could be run. Truth be told, I was planning to use it to build an analysis class for computing the dielectric constant, so perhaps it's better to wait until the module is a bit more complete (and performs better).

Probably this isn't the right place to discuss it, but shouldn't the dipole moment (and maybe also the total charge?) of a group of atoms be included among the basic properties/methods of an AtomGroup, together with e.g. mass, center_of_mass, centroid, etc.? If it's off topic I can create an issue (or, if it's not a difficult task, implement them myself).

@richardjgowers
Member

Yeah, it could and should be a method of AtomGroup. That said, we're actually moving how things like this are loaded into a more flexible system: to avoid AtomGroup containing every possible method it might require, we instead define various Attributes which, if loaded from the input files, augment the various classes in MDAnalysis.

So the Charge Attribute is here:
https://github.com/MDAnalysis/mdanalysis/blob/issue-363/package/MDAnalysis/core/topologyattrs.py#L448

And an example of how center_of_mass now works is here:
https://github.com/MDAnalysis/mdanalysis/blob/issue-363/package/MDAnalysis/core/topologyattrs.py#L360

So if an input file defines masses, then the center_of_mass method there gets transplanted into AtomGroup.

If you wanted to add dipole into the Charge attribute that'd be awesome, you'd need to checkout the issue-363 branch and submit a PR onto that branch (not develop).
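The transplant idea can be sketched in plain Python (the class and function names here are illustrative, not the actual MDAnalysis internals): an attribute carries methods that are only attached to the group class when the corresponding per-atom data is present:

```python
class AtomGroup:
    """Minimal stand-in for MDAnalysis's AtomGroup."""
    def __init__(self, charges=None):
        self.charges = charges

def total_charge(self):
    # Method to transplant only when the topology defines charges
    return sum(self.charges)

def load_attribute(cls, name, method):
    # "Transplant" the method onto the class at load time
    setattr(cls, name, method)

# Only when the input file provides charges does AtomGroup gain the method
load_attribute(AtomGroup, 'total_charge', total_charge)
ag = AtomGroup(charges=[0.5, -1.0, 0.5])
print(ag.total_charge())  # 0.0
```

A dipole method would follow the same pattern, living alongside the Charge attribute so it only appears when charges are available.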
